001package io.prometheus.metrics.exporter.pushgateway; 002 003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP; 004import static io.prometheus.metrics.model.snapshots.PrometheusNaming.escapeName; 005import static java.util.Objects.requireNonNull; 006 007import io.prometheus.metrics.annotations.StableApi; 008import io.prometheus.metrics.config.EscapingScheme; 009import io.prometheus.metrics.config.ExporterPushgatewayProperties; 010import io.prometheus.metrics.config.PrometheusProperties; 011import io.prometheus.metrics.config.PrometheusPropertiesException; 012import io.prometheus.metrics.expositionformats.ExpositionFormatWriter; 013import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter; 014import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; 015import io.prometheus.metrics.model.registry.Collector; 016import io.prometheus.metrics.model.registry.MultiCollector; 017import io.prometheus.metrics.model.registry.PrometheusRegistry; 018import java.io.ByteArrayOutputStream; 019import java.io.IOException; 020import java.io.InputStream; 021import java.io.OutputStream; 022import java.io.UnsupportedEncodingException; 023import java.net.HttpURLConnection; 024import java.net.InetAddress; 025import java.net.MalformedURLException; 026import java.net.URI; 027import java.net.URL; 028import java.net.URLEncoder; 029import java.net.UnknownHostException; 030import java.nio.charset.StandardCharsets; 031import java.time.Duration; 032import java.util.Base64; 033import java.util.Collections; 034import java.util.HashMap; 035import java.util.Map; 036import java.util.TreeMap; 037import javax.annotation.Nullable; 038 039/** 040 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus 041 * Pushgateway</a> 042 * 043 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to 044 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead 045 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link 046 * PrometheusRegistry} to a Pushgateway. 047 * 048 * <p>Example usage: 049 * 050 * <pre>{@code 051 * void executeBatchJob() throws Exception { 052 * PrometheusRegistry registry = new PrometheusRegistry(); 053 * Gauge duration = Gauge.builder() 054 * .name("my_batch_job_duration_seconds") 055 * .help("Duration of my batch job in seconds.") 056 * .register(registry); 057 * Timer durationTimer = duration.startTimer(); 058 * try { 059 * // Your code here. 060 * 061 * // This is only added to the registry after success, 062 * // so that a previous success in the Pushgateway isn't overwritten on failure. 063 * Gauge lastSuccess = Gauge.builder() 064 * .name("my_batch_job_last_success") 065 * .help("Last time my batch job succeeded, in unixtime.") 066 * .register(registry); 067 * lastSuccess.set(System.currentTimeMillis()); 068 * } finally { 069 * durationTimer.observeDuration(); 070 * PushGateway pg = PushGateway.builder() 071 * .address("127.0.0.1:9091") 072 * .job("my_batch_job") 073 * .registry(registry) 074 * .build(); 075 * pg.pushAdd(); 076 * } 077 * } 078 * }</pre> 079 * 080 * <p>See <a 081 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>. 082 */ 083@StableApi 084public class PushGateway { 085 private final URL url; 086 private final ExpositionFormatWriter writer; 087 private final boolean prometheusTimestampsInMs; 088 private final Map<String, String> requestHeaders; 089 private final PrometheusRegistry registry; 090 private final HttpConnectionFactory connectionFactory; 091 private final EscapingScheme escapingScheme; 092 private final Duration connectionTimeout; 093 private final Duration readTimeout; 094 095 private PushGateway( 096 PrometheusRegistry registry, 097 Format format, 098 URL url, 099 HttpConnectionFactory connectionFactory, 100 Map<String, String> requestHeaders, 101 boolean prometheusTimestampsInMs, 102 EscapingScheme escapingScheme, 103 Duration connectionTimeout, 104 Duration readTimeout) { 105 this.registry = registry; 106 this.url = url; 107 this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders)); 108 this.connectionFactory = connectionFactory; 109 this.prometheusTimestampsInMs = prometheusTimestampsInMs; 110 this.escapingScheme = escapingScheme; 111 this.connectionTimeout = connectionTimeout; 112 this.readTimeout = readTimeout; 113 writer = getWriter(format); 114 if (!writer.isAvailable()) { 115 throw new RuntimeException(writer.getClass() + " is not available"); 116 } 117 } 118 119 @SuppressWarnings("deprecation") 120 private ExpositionFormatWriter getWriter(Format format) { 121 if (format == Format.PROMETHEUS_TEXT) { 122 return PrometheusTextFormatWriter.builder() 123 .setTimestampsInMs(this.prometheusTimestampsInMs) 124 .build(); 125 } else { 126 // use reflection to avoid a compile-time dependency on the expositionformats module 127 return new PrometheusProtobufWriter(); 128 } 129 } 130 131 /** 132 * Push all metrics. All metrics with the same job and grouping key are replaced. 133 * 134 * <p>This uses the PUT HTTP method. 135 */ 136 public void push() throws IOException { 137 doRequest(registry, "PUT"); 138 } 139 140 /** 141 * Push a single metric. All metrics with the same job and grouping key are replaced. 142 * 143 * <p>This is useful for pushing a single Gauge. 144 * 145 * <p>This uses the PUT HTTP method. 146 */ 147 public void push(Collector collector) throws IOException { 148 PrometheusRegistry registry = new PrometheusRegistry(); 149 registry.register(collector); 150 doRequest(registry, "PUT"); 151 } 152 153 /** 154 * Push a single collector. All metrics with the same job and grouping key are replaced. 155 * 156 * <p>This uses the PUT HTTP method. 157 */ 158 public void push(MultiCollector collector) throws IOException { 159 PrometheusRegistry registry = new PrometheusRegistry(); 160 registry.register(collector); 161 doRequest(registry, "PUT"); 162 } 163 164 /** 165 * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are 166 * replaced. 167 * 168 * <p>This uses the POST HTTP method. 169 */ 170 public void pushAdd() throws IOException { 171 doRequest(registry, "POST"); 172 } 173 174 /** 175 * Like {@link #push(Collector)}, but only the specified metric will be replaced. 176 * 177 * <p>This uses the POST HTTP method. 178 */ 179 public void pushAdd(Collector collector) throws IOException { 180 PrometheusRegistry registry = new PrometheusRegistry(); 181 registry.register(collector); 182 doRequest(registry, "POST"); 183 } 184 185 /** 186 * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced. 187 * 188 * <p>This uses the POST HTTP method. 189 */ 190 public void pushAdd(MultiCollector collector) throws IOException { 191 PrometheusRegistry registry = new PrometheusRegistry(); 192 registry.register(collector); 193 doRequest(registry, "POST"); 194 } 195 196 /** 197 * Deletes metrics from the Pushgateway. 198 * 199 * <p>This uses the DELETE HTTP method. 200 */ 201 public void delete() throws IOException { 202 doRequest(null, "DELETE"); 203 } 204 205 private void doRequest(@Nullable PrometheusRegistry registry, String method) throws IOException { 206 try { 207 HttpURLConnection connection = connectionFactory.create(url); 208 requestHeaders.forEach(connection::setRequestProperty); 209 connection.setRequestProperty("Content-Type", writer.getContentType()); 210 if (!method.equals("DELETE")) { 211 connection.setDoOutput(true); 212 } 213 connection.setRequestMethod(method); 214 215 connection.setConnectTimeout((int) this.connectionTimeout.toMillis()); 216 connection.setReadTimeout((int) this.readTimeout.toMillis()); 217 connection.connect(); 218 219 try { 220 if (!method.equals("DELETE")) { 221 OutputStream outputStream = connection.getOutputStream(); 222 writer.write(outputStream, requireNonNull(registry).scrape(), this.escapingScheme); 223 outputStream.flush(); 224 outputStream.close(); 225 } 226 227 int response = connection.getResponseCode(); 228 if (response / 100 != 2) { 229 String errorMessage; 230 InputStream errorStream = connection.getErrorStream(); 231 if (errorStream != null) { 232 String errBody = readFromStream(errorStream); 233 errorMessage = 234 "Response code from " + url + " was " + response + ", response body: " + errBody; 235 } else { 236 errorMessage = "Response code from " + url + " was " + response; 237 } 238 throw new IOException(errorMessage); 239 } 240 241 } finally { 242 connection.disconnect(); 243 } 244 } catch (IOException e) { 245 String baseUrl = url.getProtocol() + "://" + url.getHost(); 246 if (url.getPort() != -1) { 247 baseUrl += ":" + url.getPort(); 248 } 249 throw new IOException( 250 "Failed to push metrics to the Prometheus Pushgateway on " 251 + baseUrl 252 + ": " 253 + e.getMessage(), 254 e); 255 } 256 } 257 258 // toString with Charset is only available in Java 10+, but we want to support Java 8 259 @SuppressWarnings("JdkObsolete") 260 private static String readFromStream(InputStream is) throws IOException { 261 ByteArrayOutputStream result = new ByteArrayOutputStream(); 262 byte[] buffer = new byte[1024]; 263 int length; 264 while ((length = is.read(buffer)) != -1) { 265 result.write(buffer, 0, length); 266 } 267 return result.toString("UTF-8"); 268 } 269 270 public static Builder builder() { 271 return builder(PrometheusProperties.get()); 272 } 273 274 /** 275 * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}. 276 */ 277 public static Builder builder(PrometheusProperties config) { 278 return new Builder(config); 279 } 280 281 public static class Builder { 282 283 private final PrometheusProperties config; 284 @Nullable private Format format; 285 @Nullable private String address; 286 @Nullable private Scheme scheme; 287 @Nullable private String job; 288 @Nullable private Duration connectionTimeout; 289 @Nullable private Duration readTimeout; 290 private boolean prometheusTimestampsInMs; 291 private final Map<String, String> requestHeaders = new HashMap<>(); 292 private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry; 293 private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory(); 294 private final Map<String, String> groupingKey = new TreeMap<>(); 295 @Nullable private EscapingScheme escapingScheme; 296 297 private Builder(PrometheusProperties config) { 298 this.config = config; 299 } 300 301 /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */ 302 public Builder format(Format format) { 303 this.format = requireNonNull(format, "format must not be null"); 304 return this; 305 } 306 307 /** 308 * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}. 309 * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address} 310 * property. 311 */ 312 public Builder address(String address) { 313 this.address = requireNonNull(address, "address must not be null"); 314 return this; 315 } 316 317 /** Username and password for HTTP basic auth when pushing to the Pushgateway. */ 318 public Builder basicAuth(String user, String password) { 319 byte[] credentialsBytes = 320 (requireNonNull(user, "user must not be null") 321 + ":" 322 + requireNonNull(password, "password must not be null")) 323 .getBytes(StandardCharsets.UTF_8); 324 String encoded = Base64.getEncoder().encodeToString(credentialsBytes); 325 requestHeaders.put("Authorization", String.format("Basic %s", encoded)); 326 return this; 327 } 328 329 /** Bearer token authorization when pushing to the Pushgateway. */ 330 public Builder bearerToken(String token) { 331 requestHeaders.put( 332 "Authorization", 333 String.format("Bearer %s", requireNonNull(token, "token must not be null"))); 334 return this; 335 } 336 337 /** 338 * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten 339 * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property. 340 */ 341 public Builder scheme(Scheme scheme) { 342 this.scheme = requireNonNull(scheme, "scheme must not be null"); 343 return this; 344 } 345 346 /** 347 * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}. 348 * 349 * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example 350 * of a custom connection factory that skips SSL certificate validation for HTTPS connections. 351 */ 352 public Builder connectionFactory(HttpConnectionFactory connectionFactory) { 353 this.connectionFactory = 354 requireNonNull(connectionFactory, "connectionFactory must not be null"); 355 return this; 356 } 357 358 /** 359 * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR 360 * file will be used by default. Can be overwritten at runtime with the {@code 361 * io.prometheus.exporter.pushgateway.job} property. 362 */ 363 public Builder job(String job) { 364 this.job = requireNonNull(job, "job must not be null"); 365 return this; 366 } 367 368 /** 369 * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for 370 * adding multiple grouping keys. 371 */ 372 public Builder groupingKey(String name, String value) { 373 groupingKey.put( 374 requireNonNull(name, "name must not be null"), 375 requireNonNull(value, "value must not be null")); 376 return this; 377 } 378 379 /** Convenience method for adding the current IP address as an "instance" label. */ 380 public Builder instanceIpGroupingKey() throws UnknownHostException { 381 return groupingKey("instance", InetAddress.getLocalHost().getHostAddress()); 382 } 383 384 /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */ 385 public Builder registry(PrometheusRegistry registry) { 386 this.registry = requireNonNull(registry, "registry must not be null"); 387 return this; 388 } 389 390 /** 391 * Specify the escaping scheme to be used when pushing metrics. Default is {@link 392 * EscapingScheme#UNDERSCORE_ESCAPING}. 393 */ 394 public Builder escapingScheme(EscapingScheme escapingScheme) { 395 this.escapingScheme = requireNonNull(escapingScheme, "escapingScheme must not be null"); 396 return this; 397 } 398 399 /** 400 * Use milliseconds for timestamps in text format? Default is {@code false}. Can be overwritten 401 * at runtime with the {@code io.prometheus.exporter.timestamps_in_ms} property. 402 */ 403 public Builder prometheusTimestampsInMs(boolean prometheusTimestampsInMs) { 404 this.prometheusTimestampsInMs = prometheusTimestampsInMs; 405 return this; 406 } 407 408 /** 409 * Specify the connection timeout for HTTP connections to the PushGateway. Default is 10 410 * seconds. 411 * 412 * @param connectionTimeout timeout value 413 * @return this {@link Builder} instance 414 */ 415 public Builder connectionTimeout(Duration connectionTimeout) { 416 this.connectionTimeout = connectionTimeout; 417 return this; 418 } 419 420 private Duration getConnectionTimeout(@Nullable ExporterPushgatewayProperties properties) { 421 if (properties != null && properties.getConnectTimeout() != null) { 422 return properties.getConnectTimeout(); 423 } else if (this.connectionTimeout != null) { 424 return this.connectionTimeout; 425 } else { 426 return Duration.ofSeconds(10); 427 } 428 } 429 430 /** 431 * Specify the read timeout for HTTP connections to the PushGateway. Default is 10 seconds. 432 * 433 * @param readTimeout timeout value 434 * @return this {@link Builder} instance 435 */ 436 public Builder readTimeout(Duration readTimeout) { 437 this.readTimeout = readTimeout; 438 return this; 439 } 440 441 private Duration getReadTimeout(@Nullable ExporterPushgatewayProperties properties) { 442 if (properties != null && properties.getReadTimeout() != null) { 443 return properties.getReadTimeout(); 444 } else if (this.readTimeout != null) { 445 return this.readTimeout; 446 } else { 447 return Duration.ofSeconds(10); 448 } 449 } 450 451 private boolean getPrometheusTimestampsInMs() { 452 // accept either to opt in to timestamps in milliseconds 453 return config.getExporterProperties().getPrometheusTimestampsInMs() 454 || this.prometheusTimestampsInMs; 455 } 456 457 private Scheme getScheme(@Nullable ExporterPushgatewayProperties properties) { 458 if (properties != null && properties.getScheme() != null) { 459 return Scheme.valueOf(properties.getScheme()); 460 } else if (this.scheme != null) { 461 return this.scheme; 462 } else { 463 return HTTP; 464 } 465 } 466 467 private String getAddress(@Nullable ExporterPushgatewayProperties properties) { 468 if (properties != null && properties.getAddress() != null) { 469 return properties.getAddress(); 470 } else if (this.address != null) { 471 return this.address; 472 } else { 473 return "localhost:9091"; 474 } 475 } 476 477 private String getJob(@Nullable ExporterPushgatewayProperties properties) { 478 if (properties != null && properties.getJob() != null) { 479 return properties.getJob(); 480 } else if (this.job != null) { 481 return this.job; 482 } else { 483 return DefaultJobLabelDetector.getDefaultJobLabel(); 484 } 485 } 486 487 private EscapingScheme getEscapingScheme(@Nullable ExporterPushgatewayProperties properties) { 488 if (properties != null && properties.getEscapingScheme() != null) { 489 return properties.getEscapingScheme(); 490 } else if (this.escapingScheme != null) { 491 return this.escapingScheme; 492 } 493 return EscapingScheme.UNDERSCORE_ESCAPING; 494 } 495 496 private Format getFormat() { 497 // currently not configurable via properties 498 if (this.format != null) { 499 return this.format; 500 } 501 return Format.PROMETHEUS_PROTOBUF; 502 } 503 504 // encode with Charset is only available in Java 10+, but we want to support Java 8 505 @SuppressWarnings("JdkObsolete") 506 private URL makeUrl(@Nullable ExporterPushgatewayProperties properties) 507 throws UnsupportedEncodingException, MalformedURLException { 508 StringBuilder url = 509 new StringBuilder(getScheme(properties) + "://" + getAddress(properties) + "/metrics/"); 510 String job = getJob(properties); 511 if (job.contains("/")) { 512 url.append("job@base64/").append(base64url(job)); 513 } else { 514 url.append("job/").append(URLEncoder.encode(job, "UTF-8")); 515 } 516 for (Map.Entry<String, String> entry : groupingKey.entrySet()) { 517 if (entry.getValue().isEmpty()) { 518 url.append("/") 519 .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING)) 520 .append("@base64/="); 521 } else if (entry.getValue().contains("/")) { 522 url.append("/") 523 .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING)) 524 .append("@base64/") 525 .append(base64url(entry.getValue())); 526 } else { 527 url.append("/") 528 .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING)) 529 .append("/") 530 .append(URLEncoder.encode(entry.getValue(), "UTF-8")); 531 } 532 } 533 return URI.create(url.toString()).normalize().toURL(); 534 } 535 536 private String base64url(String v) { 537 return Base64.getEncoder() 538 .encodeToString(v.getBytes(StandardCharsets.UTF_8)) 539 .replace("+", "-") 540 .replace("/", "_"); 541 } 542 543 public PushGateway build() { 544 ExporterPushgatewayProperties properties = 545 config == null ? null : config.getExporterPushgatewayProperties(); 546 try { 547 return new PushGateway( 548 registry, 549 getFormat(), 550 makeUrl(properties), 551 connectionFactory, 552 requestHeaders, 553 getPrometheusTimestampsInMs(), 554 getEscapingScheme(properties), 555 getConnectionTimeout(properties), 556 getReadTimeout(properties)); 557 } catch (MalformedURLException e) { 558 throw new PrometheusPropertiesException( 559 address + ": Invalid address. Expecting <host>:<port>"); 560 } catch (UnsupportedEncodingException e) { 561 throw new RuntimeException(e); // cannot happen, UTF-8 is always supported 562 } 563 } 564 } 565}