001package io.prometheus.metrics.exporter.pushgateway; 002 003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP; 004 005import io.prometheus.metrics.config.ExporterPushgatewayProperties; 006import io.prometheus.metrics.config.PrometheusProperties; 007import io.prometheus.metrics.config.PrometheusPropertiesException; 008import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter; 009import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; 010import io.prometheus.metrics.model.registry.Collector; 011import io.prometheus.metrics.model.registry.MultiCollector; 012import io.prometheus.metrics.model.registry.PrometheusRegistry; 013import java.io.ByteArrayOutputStream; 014import java.io.IOException; 015import java.io.InputStream; 016import java.io.OutputStream; 017import java.io.UnsupportedEncodingException; 018import java.net.HttpURLConnection; 019import java.net.InetAddress; 020import java.net.MalformedURLException; 021import java.net.URI; 022import java.net.URL; 023import java.net.URLEncoder; 024import java.net.UnknownHostException; 025import java.nio.charset.StandardCharsets; 026import java.util.Base64; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.Map; 030import java.util.TreeMap; 031 032/** 033 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus 034 * Pushgateway</a> 035 * 036 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to 037 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead 038 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link 039 * PrometheusRegistry} to a Pushgateway. 040 * 041 * <p>Example usage: 042 * 043 * <pre>{@code 044 * void executeBatchJob() throws Exception { 045 * PrometheusRegistry registry = new PrometheusRegistry(); 046 * Gauge duration = Gauge.builder() 047 * .name("my_batch_job_duration_seconds") 048 * .help("Duration of my batch job in seconds.") 049 * .register(registry); 050 * Timer durationTimer = duration.startTimer(); 051 * try { 052 * // Your code here. 053 * 054 * // This is only added to the registry after success, 055 * // so that a previous success in the Pushgateway isn't overwritten on failure. 056 * Gauge lastSuccess = Gauge.builder() 057 * .name("my_batch_job_last_success") 058 * .help("Last time my batch job succeeded, in unixtime.") 059 * .register(registry); 060 * lastSuccess.set(System.currentTimeMillis()); 061 * } finally { 062 * durationTimer.observeDuration(); 063 * PushGateway pg = PushGateway.builder() 064 * .address("127.0.0.1:9091") 065 * .job("my_batch_job") 066 * .registry(registry) 067 * .build(); 068 * pg.pushAdd(); 069 * } 070 * } 071 * }</pre> 072 * 073 * <p>See <a 074 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>. 075 */ 076public class PushGateway { 077 078 private static final int MILLISECONDS_PER_SECOND = 1000; 079 080 private final URL url; 081 private final Format format; 082 private final Map<String, String> requestHeaders; 083 private final PrometheusRegistry registry; 084 private final HttpConnectionFactory connectionFactory; 085 086 private PushGateway( 087 PrometheusRegistry registry, 088 Format format, 089 URL url, 090 HttpConnectionFactory connectionFactory, 091 Map<String, String> requestHeaders) { 092 this.registry = registry; 093 this.format = format; 094 this.url = url; 095 this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders)); 096 this.connectionFactory = connectionFactory; 097 } 098 099 /** 100 * Push all metrics. All metrics with the same job and grouping key are replaced. 101 * 102 * <p>This uses the PUT HTTP method. 103 */ 104 public void push() throws IOException { 105 doRequest(registry, "PUT"); 106 } 107 108 /** 109 * Push a single metric. All metrics with the same job and grouping key are replaced. 110 * 111 * <p>This is useful for pushing a single Gauge. 112 * 113 * <p>This uses the PUT HTTP method. 114 */ 115 public void push(Collector collector) throws IOException { 116 PrometheusRegistry registry = new PrometheusRegistry(); 117 registry.register(collector); 118 doRequest(registry, "PUT"); 119 } 120 121 /** 122 * Push a single collector. All metrics with the same job and grouping key are replaced. 123 * 124 * <p>This uses the PUT HTTP method. 125 */ 126 public void push(MultiCollector collector) throws IOException { 127 PrometheusRegistry registry = new PrometheusRegistry(); 128 registry.register(collector); 129 doRequest(registry, "PUT"); 130 } 131 132 /** 133 * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are 134 * replaced. 135 * 136 * <p>This uses the POST HTTP method. 137 */ 138 public void pushAdd() throws IOException { 139 doRequest(registry, "POST"); 140 } 141 142 /** 143 * Like {@link #push(Collector)}, but only the specified metric will be replaced. 144 * 145 * <p>This uses the POST HTTP method. 146 */ 147 public void pushAdd(Collector collector) throws IOException { 148 PrometheusRegistry registry = new PrometheusRegistry(); 149 registry.register(collector); 150 doRequest(registry, "POST"); 151 } 152 153 /** 154 * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced. 155 * 156 * <p>This uses the POST HTTP method. 157 */ 158 public void pushAdd(MultiCollector collector) throws IOException { 159 PrometheusRegistry registry = new PrometheusRegistry(); 160 registry.register(collector); 161 doRequest(registry, "POST"); 162 } 163 164 /** 165 * Deletes metrics from the Pushgateway. 166 * 167 * <p>This uses the DELETE HTTP method. 168 */ 169 public void delete() throws IOException { 170 doRequest(null, "DELETE"); 171 } 172 173 private void doRequest(PrometheusRegistry registry, String method) throws IOException { 174 try { 175 HttpURLConnection connection = connectionFactory.create(url); 176 requestHeaders.forEach(connection::setRequestProperty); 177 if (format == Format.PROMETHEUS_TEXT) { 178 connection.setRequestProperty("Content-Type", PrometheusTextFormatWriter.CONTENT_TYPE); 179 } else { 180 connection.setRequestProperty("Content-Type", PrometheusProtobufWriter.CONTENT_TYPE); 181 } 182 if (!method.equals("DELETE")) { 183 connection.setDoOutput(true); 184 } 185 connection.setRequestMethod(method); 186 187 connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND); 188 connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND); 189 connection.connect(); 190 191 try { 192 if (!method.equals("DELETE")) { 193 OutputStream outputStream = connection.getOutputStream(); 194 if (format == Format.PROMETHEUS_TEXT) { 195 new PrometheusTextFormatWriter(false).write(outputStream, registry.scrape()); 196 } else { 197 new PrometheusProtobufWriter().write(outputStream, registry.scrape()); 198 } 199 outputStream.flush(); 200 outputStream.close(); 201 } 202 203 int response = connection.getResponseCode(); 204 if (response / 100 != 2) { 205 String errorMessage; 206 InputStream errorStream = connection.getErrorStream(); 207 if (errorStream != null) { 208 String errBody = readFromStream(errorStream); 209 errorMessage = 210 "Response code from " + url + " was " + response + ", response body: " + errBody; 211 } else { 212 errorMessage = "Response code from " + url + " was " + response; 213 } 214 throw new IOException(errorMessage); 215 } 216 217 } finally { 218 connection.disconnect(); 219 } 220 } catch (IOException e) { 221 String baseUrl = url.getProtocol() + "://" + url.getHost(); 222 if (url.getPort() != -1) { 223 baseUrl += ":" + url.getPort(); 224 } 225 throw new IOException( 226 "Failed to push metrics to the Prometheus Pushgateway on " 227 + baseUrl 228 + ": " 229 + e.getMessage(), 230 e); 231 } 232 } 233 234 private static String readFromStream(InputStream is) throws IOException { 235 ByteArrayOutputStream result = new ByteArrayOutputStream(); 236 byte[] buffer = new byte[1024]; 237 int length; 238 while ((length = is.read(buffer)) != -1) { 239 result.write(buffer, 0, length); 240 } 241 return result.toString("UTF-8"); 242 } 243 244 public static Builder builder() { 245 return builder(PrometheusProperties.get()); 246 } 247 248 /** 249 * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}. 250 */ 251 public static Builder builder(PrometheusProperties config) { 252 return new Builder(config); 253 } 254 255 public static class Builder { 256 257 private final PrometheusProperties config; 258 private Format format; 259 private String address; 260 private Scheme scheme; 261 private String job; 262 private final Map<String, String> requestHeaders = new HashMap<>(); 263 private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry; 264 private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory(); 265 private Map<String, String> groupingKey = new TreeMap<>(); 266 267 private Builder(PrometheusProperties config) { 268 this.config = config; 269 } 270 271 /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */ 272 public Builder format(Format format) { 273 if (format == null) { 274 throw new NullPointerException(); 275 } 276 this.format = format; 277 return this; 278 } 279 280 /** 281 * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}. 282 * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address} 283 * property. 284 */ 285 public Builder address(String address) { 286 if (address == null) { 287 throw new NullPointerException(); 288 } 289 this.address = address; 290 return this; 291 } 292 293 /** Username and password for HTTP basic auth when pushing to the Pushgateway. */ 294 public Builder basicAuth(String user, String password) { 295 if (user == null || password == null) { 296 throw new NullPointerException(); 297 } 298 byte[] credentialsBytes = (user + ":" + password).getBytes(StandardCharsets.UTF_8); 299 String encoded = Base64.getEncoder().encodeToString(credentialsBytes); 300 requestHeaders.put("Authorization", String.format("Basic %s", encoded)); 301 return this; 302 } 303 304 /** Bearer token authorization when pushing to the Pushgateway. */ 305 public Builder bearerToken(String token) { 306 if (token == null) { 307 throw new NullPointerException(); 308 } 309 requestHeaders.put("Authorization", String.format("Bearer %s", token)); 310 return this; 311 } 312 313 /** 314 * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten 315 * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property. 316 */ 317 public Builder scheme(Scheme scheme) { 318 if (scheme == null) { 319 throw new NullPointerException(); 320 } 321 this.scheme = scheme; 322 return this; 323 } 324 325 /** 326 * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}. 327 * 328 * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example 329 * of a custom connection factory that skips SSL certificate validation for HTTPS connections. 330 */ 331 public Builder connectionFactory(HttpConnectionFactory connectionFactory) { 332 if (connectionFactory == null) { 333 throw new NullPointerException(); 334 } 335 this.connectionFactory = connectionFactory; 336 return this; 337 } 338 339 /** 340 * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR 341 * file will be used by default. Can be overwritten at runtime with the {@code 342 * io.prometheus.exporter.pushgateway.job} property. 343 */ 344 public Builder job(String job) { 345 if (job == null) { 346 throw new NullPointerException(); 347 } 348 this.job = job; 349 return this; 350 } 351 352 /** 353 * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for 354 * adding multiple grouping keys. 355 */ 356 public Builder groupingKey(String name, String value) { 357 if (name == null || value == null) { 358 throw new NullPointerException(); 359 } 360 groupingKey.put(name, value); 361 return this; 362 } 363 364 /** Convenience method for adding the current IP address as an "instance" label. */ 365 public Builder instanceIpGroupingKey() throws UnknownHostException { 366 return groupingKey("instance", InetAddress.getLocalHost().getHostAddress()); 367 } 368 369 /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */ 370 public Builder registry(PrometheusRegistry registry) { 371 if (registry == null) { 372 throw new NullPointerException(); 373 } 374 this.registry = registry; 375 return this; 376 } 377 378 private Scheme getScheme(ExporterPushgatewayProperties properties) { 379 if (properties != null && properties.getScheme() != null) { 380 return Scheme.valueOf(properties.getScheme()); 381 } else if (this.scheme != null) { 382 return this.scheme; 383 } else { 384 return HTTP; 385 } 386 } 387 388 private String getAddress(ExporterPushgatewayProperties properties) { 389 if (properties != null && properties.getAddress() != null) { 390 return properties.getAddress(); 391 } else if (this.address != null) { 392 return this.address; 393 } else { 394 return "localhost:9091"; 395 } 396 } 397 398 private String getJob(ExporterPushgatewayProperties properties) { 399 if (properties != null && properties.getJob() != null) { 400 return properties.getJob(); 401 } else if (this.job != null) { 402 return this.job; 403 } else { 404 return DefaultJobLabelDetector.getDefaultJobLabel(); 405 } 406 } 407 408 private Format getFormat() { 409 // currently not configurable via properties 410 if (this.format != null) { 411 return this.format; 412 } 413 return Format.PROMETHEUS_PROTOBUF; 414 } 415 416 private URL makeUrl(ExporterPushgatewayProperties properties) 417 throws UnsupportedEncodingException, MalformedURLException { 418 String url = getScheme(properties) + "://" + getAddress(properties) + "/metrics/"; 419 String job = getJob(properties); 420 if (job.contains("/")) { 421 url += "job@base64/" + base64url(job); 422 } else { 423 url += "job/" + URLEncoder.encode(job, "UTF-8"); 424 } 425 if (groupingKey != null) { 426 for (Map.Entry<String, String> entry : groupingKey.entrySet()) { 427 if (entry.getValue().isEmpty()) { 428 url += "/" + entry.getKey() + "@base64/="; 429 } else if (entry.getValue().contains("/")) { 430 url += "/" + entry.getKey() + "@base64/" + base64url(entry.getValue()); 431 } else { 432 url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8"); 433 } 434 } 435 } 436 return URI.create(url).normalize().toURL(); 437 } 438 439 private String base64url(String v) { 440 return Base64.getEncoder() 441 .encodeToString(v.getBytes(StandardCharsets.UTF_8)) 442 .replace("+", "-") 443 .replace("/", "_"); 444 } 445 446 public PushGateway build() { 447 ExporterPushgatewayProperties properties = 448 config == null ? null : config.getExporterPushgatewayProperties(); 449 try { 450 return new PushGateway( 451 registry, getFormat(), makeUrl(properties), connectionFactory, requestHeaders); 452 } catch (MalformedURLException e) { 453 throw new PrometheusPropertiesException( 454 address + ": Invalid address. Expecting <host>:<port>"); 455 } catch (UnsupportedEncodingException e) { 456 throw new RuntimeException(e); // cannot happen, UTF-8 is always supported 457 } 458 } 459 } 460}