001package io.prometheus.metrics.exporter.pushgateway; 002 003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP; 004import static io.prometheus.metrics.model.snapshots.PrometheusNaming.escapeName; 005import static java.util.Objects.requireNonNull; 006 007import io.prometheus.metrics.config.EscapingScheme; 008import io.prometheus.metrics.config.ExporterPushgatewayProperties; 009import io.prometheus.metrics.config.PrometheusProperties; 010import io.prometheus.metrics.config.PrometheusPropertiesException; 011import io.prometheus.metrics.expositionformats.ExpositionFormatWriter; 012import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter; 013import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; 014import io.prometheus.metrics.model.registry.Collector; 015import io.prometheus.metrics.model.registry.MultiCollector; 016import io.prometheus.metrics.model.registry.PrometheusRegistry; 017import java.io.ByteArrayOutputStream; 018import java.io.IOException; 019import java.io.InputStream; 020import java.io.OutputStream; 021import java.io.UnsupportedEncodingException; 022import java.net.HttpURLConnection; 023import java.net.InetAddress; 024import java.net.MalformedURLException; 025import java.net.URI; 026import java.net.URL; 027import java.net.URLEncoder; 028import java.net.UnknownHostException; 029import java.nio.charset.StandardCharsets; 030import java.time.Duration; 031import java.util.Base64; 032import java.util.Collections; 033import java.util.HashMap; 034import java.util.Map; 035import java.util.TreeMap; 036import javax.annotation.Nullable; 037 038/** 039 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus 040 * Pushgateway</a> 041 * 042 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to 043 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead 044 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link 045 * PrometheusRegistry} to a Pushgateway. 046 * 047 * <p>Example usage: 048 * 049 * <pre>{@code 050 * void executeBatchJob() throws Exception { 051 * PrometheusRegistry registry = new PrometheusRegistry(); 052 * Gauge duration = Gauge.builder() 053 * .name("my_batch_job_duration_seconds") 054 * .help("Duration of my batch job in seconds.") 055 * .register(registry); 056 * Timer durationTimer = duration.startTimer(); 057 * try { 058 * // Your code here. 059 * 060 * // This is only added to the registry after success, 061 * // so that a previous success in the Pushgateway isn't overwritten on failure. 062 * Gauge lastSuccess = Gauge.builder() 063 * .name("my_batch_job_last_success") 064 * .help("Last time my batch job succeeded, in unixtime.") 065 * .register(registry); 066 * lastSuccess.set(System.currentTimeMillis()); 067 * } finally { 068 * durationTimer.observeDuration(); 069 * PushGateway pg = PushGateway.builder() 070 * .address("127.0.0.1:9091") 071 * .job("my_batch_job") 072 * .registry(registry) 073 * .build(); 074 * pg.pushAdd(); 075 * } 076 * } 077 * }</pre> 078 * 079 * <p>See <a 080 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>. 081 */ 082public class PushGateway { 083 private final URL url; 084 private final ExpositionFormatWriter writer; 085 private final boolean prometheusTimestampsInMs; 086 private final Map<String, String> requestHeaders; 087 private final PrometheusRegistry registry; 088 private final HttpConnectionFactory connectionFactory; 089 private final EscapingScheme escapingScheme; 090 private final Duration connectionTimeout; 091 private final Duration readTimeout; 092 093 private PushGateway( 094 PrometheusRegistry registry, 095 Format format, 096 URL url, 097 HttpConnectionFactory connectionFactory, 098 Map<String, String> requestHeaders, 099 boolean prometheusTimestampsInMs, 100 EscapingScheme escapingScheme, 101 Duration connectionTimeout, 102 Duration readTimeout) { 103 this.registry = registry; 104 this.url = url; 105 this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders)); 106 this.connectionFactory = connectionFactory; 107 this.prometheusTimestampsInMs = prometheusTimestampsInMs; 108 this.escapingScheme = escapingScheme; 109 this.connectionTimeout = connectionTimeout; 110 this.readTimeout = readTimeout; 111 writer = getWriter(format); 112 if (!writer.isAvailable()) { 113 throw new RuntimeException(writer.getClass() + " is not available"); 114 } 115 } 116 117 @SuppressWarnings("deprecation") 118 private ExpositionFormatWriter getWriter(Format format) { 119 if (format == Format.PROMETHEUS_TEXT) { 120 return PrometheusTextFormatWriter.builder() 121 .setTimestampsInMs(this.prometheusTimestampsInMs) 122 .build(); 123 } else { 124 // use reflection to avoid a compile-time dependency on the expositionformats module 125 return new PrometheusProtobufWriter(); 126 } 127 } 128 129 /** 130 * Push all metrics. All metrics with the same job and grouping key are replaced. 131 * 132 * <p>This uses the PUT HTTP method. 133 */ 134 public void push() throws IOException { 135 doRequest(registry, "PUT"); 136 } 137 138 /** 139 * Push a single metric. All metrics with the same job and grouping key are replaced. 140 * 141 * <p>This is useful for pushing a single Gauge. 142 * 143 * <p>This uses the PUT HTTP method. 144 */ 145 public void push(Collector collector) throws IOException { 146 PrometheusRegistry registry = new PrometheusRegistry(); 147 registry.register(collector); 148 doRequest(registry, "PUT"); 149 } 150 151 /** 152 * Push a single collector. All metrics with the same job and grouping key are replaced. 153 * 154 * <p>This uses the PUT HTTP method. 155 */ 156 public void push(MultiCollector collector) throws IOException { 157 PrometheusRegistry registry = new PrometheusRegistry(); 158 registry.register(collector); 159 doRequest(registry, "PUT"); 160 } 161 162 /** 163 * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are 164 * replaced. 165 * 166 * <p>This uses the POST HTTP method. 167 */ 168 public void pushAdd() throws IOException { 169 doRequest(registry, "POST"); 170 } 171 172 /** 173 * Like {@link #push(Collector)}, but only the specified metric will be replaced. 174 * 175 * <p>This uses the POST HTTP method. 176 */ 177 public void pushAdd(Collector collector) throws IOException { 178 PrometheusRegistry registry = new PrometheusRegistry(); 179 registry.register(collector); 180 doRequest(registry, "POST"); 181 } 182 183 /** 184 * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced. 185 * 186 * <p>This uses the POST HTTP method. 187 */ 188 public void pushAdd(MultiCollector collector) throws IOException { 189 PrometheusRegistry registry = new PrometheusRegistry(); 190 registry.register(collector); 191 doRequest(registry, "POST"); 192 } 193 194 /** 195 * Deletes metrics from the Pushgateway. 196 * 197 * <p>This uses the DELETE HTTP method. 198 */ 199 public void delete() throws IOException { 200 doRequest(null, "DELETE"); 201 } 202 203 private void doRequest(@Nullable PrometheusRegistry registry, String method) throws IOException { 204 try { 205 HttpURLConnection connection = connectionFactory.create(url); 206 requestHeaders.forEach(connection::setRequestProperty); 207 connection.setRequestProperty("Content-Type", writer.getContentType()); 208 if (!method.equals("DELETE")) { 209 connection.setDoOutput(true); 210 } 211 connection.setRequestMethod(method); 212 213 connection.setConnectTimeout((int) this.connectionTimeout.toMillis()); 214 connection.setReadTimeout((int) this.readTimeout.toMillis()); 215 connection.connect(); 216 217 try { 218 if (!method.equals("DELETE")) { 219 OutputStream outputStream = connection.getOutputStream(); 220 writer.write(outputStream, requireNonNull(registry).scrape(), this.escapingScheme); 221 outputStream.flush(); 222 outputStream.close(); 223 } 224 225 int response = connection.getResponseCode(); 226 if (response / 100 != 2) { 227 String errorMessage; 228 InputStream errorStream = connection.getErrorStream(); 229 if (errorStream != null) { 230 String errBody = readFromStream(errorStream); 231 errorMessage = 232 "Response code from " + url + " was " + response + ", response body: " + errBody; 233 } else { 234 errorMessage = "Response code from " + url + " was " + response; 235 } 236 throw new IOException(errorMessage); 237 } 238 239 } finally { 240 connection.disconnect(); 241 } 242 } catch (IOException e) { 243 String baseUrl = url.getProtocol() + "://" + url.getHost(); 244 if (url.getPort() != -1) { 245 baseUrl += ":" + url.getPort(); 246 } 247 throw new IOException( 248 "Failed to push metrics to the Prometheus Pushgateway on " 249 + baseUrl 250 + ": " 251 + e.getMessage(), 252 e); 253 } 254 } 255 256 // toString with Charset is only available in Java 10+, but we want to support Java 8 257 @SuppressWarnings("JdkObsolete") 258 private static String readFromStream(InputStream is) throws IOException { 259 ByteArrayOutputStream result = new ByteArrayOutputStream(); 260 byte[] buffer = new byte[1024]; 261 int length; 262 while ((length = is.read(buffer)) != -1) { 263 result.write(buffer, 0, length); 264 } 265 return result.toString("UTF-8"); 266 } 267 268 public static Builder builder() { 269 return builder(PrometheusProperties.get()); 270 } 271 272 /** 273 * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}. 274 */ 275 public static Builder builder(PrometheusProperties config) { 276 return new Builder(config); 277 } 278 279 public static class Builder { 280 281 private final PrometheusProperties config; 282 @Nullable private Format format; 283 @Nullable private String address; 284 @Nullable private Scheme scheme; 285 @Nullable private String job; 286 @Nullable private Duration connectionTimeout; 287 @Nullable private Duration readTimeout; 288 private boolean prometheusTimestampsInMs; 289 private final Map<String, String> requestHeaders = new HashMap<>(); 290 private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry; 291 private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory(); 292 private final Map<String, String> groupingKey = new TreeMap<>(); 293 @Nullable private EscapingScheme escapingScheme; 294 295 private Builder(PrometheusProperties config) { 296 this.config = config; 297 } 298 299 /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */ 300 public Builder format(Format format) { 301 this.format = requireNonNull(format, "format must not be null"); 302 return this; 303 } 304 305 /** 306 * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}. 307 * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address} 308 * property. 309 */ 310 public Builder address(String address) { 311 this.address = requireNonNull(address, "address must not be null"); 312 return this; 313 } 314 315 /** Username and password for HTTP basic auth when pushing to the Pushgateway. */ 316 public Builder basicAuth(String user, String password) { 317 byte[] credentialsBytes = 318 (requireNonNull(user, "user must not be null") 319 + ":" 320 + requireNonNull(password, "password must not be null")) 321 .getBytes(StandardCharsets.UTF_8); 322 String encoded = Base64.getEncoder().encodeToString(credentialsBytes); 323 requestHeaders.put("Authorization", String.format("Basic %s", encoded)); 324 return this; 325 } 326 327 /** Bearer token authorization when pushing to the Pushgateway. */ 328 public Builder bearerToken(String token) { 329 requestHeaders.put( 330 "Authorization", 331 String.format("Bearer %s", requireNonNull(token, "token must not be null"))); 332 return this; 333 } 334 335 /** 336 * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten 337 * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property. 338 */ 339 public Builder scheme(Scheme scheme) { 340 this.scheme = requireNonNull(scheme, "scheme must not be null"); 341 return this; 342 } 343 344 /** 345 * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}. 346 * 347 * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example 348 * of a custom connection factory that skips SSL certificate validation for HTTPS connections. 349 */ 350 public Builder connectionFactory(HttpConnectionFactory connectionFactory) { 351 this.connectionFactory = 352 requireNonNull(connectionFactory, "connectionFactory must not be null"); 353 return this; 354 } 355 356 /** 357 * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR 358 * file will be used by default. Can be overwritten at runtime with the {@code 359 * io.prometheus.exporter.pushgateway.job} property. 360 */ 361 public Builder job(String job) { 362 this.job = requireNonNull(job, "job must not be null"); 363 return this; 364 } 365 366 /** 367 * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for 368 * adding multiple grouping keys. 369 */ 370 public Builder groupingKey(String name, String value) { 371 groupingKey.put( 372 requireNonNull(name, "name must not be null"), 373 requireNonNull(value, "value must not be null")); 374 return this; 375 } 376 377 /** Convenience method for adding the current IP address as an "instance" label. */ 378 public Builder instanceIpGroupingKey() throws UnknownHostException { 379 return groupingKey("instance", InetAddress.getLocalHost().getHostAddress()); 380 } 381 382 /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */ 383 public Builder registry(PrometheusRegistry registry) { 384 this.registry = requireNonNull(registry, "registry must not be null"); 385 return this; 386 } 387 388 /** 389 * Specify the escaping scheme to be used when pushing metrics. Default is {@link 390 * EscapingScheme#UNDERSCORE_ESCAPING}. 391 */ 392 public Builder escapingScheme(EscapingScheme escapingScheme) { 393 this.escapingScheme = requireNonNull(escapingScheme, "escapingScheme must not be null"); 394 return this; 395 } 396 397 /** 398 * Use milliseconds for timestamps in text format? Default is {@code false}. Can be overwritten 399 * at runtime with the {@code io.prometheus.exporter.timestamps_in_ms} property. 400 */ 401 public Builder prometheusTimestampsInMs(boolean prometheusTimestampsInMs) { 402 this.prometheusTimestampsInMs = prometheusTimestampsInMs; 403 return this; 404 } 405 406 /** 407 * Specify the connection timeout for HTTP connections to the PushGateway. Default is 10 408 * seconds. 409 * 410 * @param connectionTimeout timeout value 411 * @return this {@link Builder} instance 412 */ 413 public Builder connectionTimeout(Duration connectionTimeout) { 414 this.connectionTimeout = connectionTimeout; 415 return this; 416 } 417 418 private Duration getConnectionTimeout(@Nullable ExporterPushgatewayProperties properties) { 419 if (properties != null && properties.getConnectTimeout() != null) { 420 return properties.getConnectTimeout(); 421 } else if (this.connectionTimeout != null) { 422 return this.connectionTimeout; 423 } else { 424 return Duration.ofSeconds(10); 425 } 426 } 427 428 /** 429 * Specify the read timeout for HTTP connections to the PushGateway. Default is 10 seconds. 430 * 431 * @param readTimeout timeout value 432 * @return this {@link Builder} instance 433 */ 434 public Builder readTimeout(Duration readTimeout) { 435 this.readTimeout = readTimeout; 436 return this; 437 } 438 439 private Duration getReadTimeout(@Nullable ExporterPushgatewayProperties properties) { 440 if (properties != null && properties.getReadTimeout() != null) { 441 return properties.getReadTimeout(); 442 } else if (this.readTimeout != null) { 443 return this.readTimeout; 444 } else { 445 return Duration.ofSeconds(10); 446 } 447 } 448 449 private boolean getPrometheusTimestampsInMs() { 450 // accept either to opt in to timestamps in milliseconds 451 return config.getExporterProperties().getPrometheusTimestampsInMs() 452 || this.prometheusTimestampsInMs; 453 } 454 455 private Scheme getScheme(@Nullable ExporterPushgatewayProperties properties) { 456 if (properties != null && properties.getScheme() != null) { 457 return Scheme.valueOf(properties.getScheme()); 458 } else if (this.scheme != null) { 459 return this.scheme; 460 } else { 461 return HTTP; 462 } 463 } 464 465 private String getAddress(@Nullable ExporterPushgatewayProperties properties) { 466 if (properties != null && properties.getAddress() != null) { 467 return properties.getAddress(); 468 } else if (this.address != null) { 469 return this.address; 470 } else { 471 return "localhost:9091"; 472 } 473 } 474 475 private String getJob(@Nullable ExporterPushgatewayProperties properties) { 476 if (properties != null && properties.getJob() != null) { 477 return properties.getJob(); 478 } else if (this.job != null) { 479 return this.job; 480 } else { 481 return DefaultJobLabelDetector.getDefaultJobLabel(); 482 } 483 } 484 485 private EscapingScheme getEscapingScheme(@Nullable ExporterPushgatewayProperties properties) { 486 if (properties != null && properties.getEscapingScheme() != null) { 487 return properties.getEscapingScheme(); 488 } else if (this.escapingScheme != null) { 489 return this.escapingScheme; 490 } 491 return EscapingScheme.UNDERSCORE_ESCAPING; 492 } 493 494 private Format getFormat() { 495 // currently not configurable via properties 496 if (this.format != null) { 497 return this.format; 498 } 499 return Format.PROMETHEUS_PROTOBUF; 500 } 501 502 // encode with Charset is only available in Java 10+, but we want to support Java 8 503 @SuppressWarnings("JdkObsolete") 504 private URL makeUrl(@Nullable ExporterPushgatewayProperties properties) 505 throws UnsupportedEncodingException, MalformedURLException { 506 StringBuilder url = 507 new StringBuilder(getScheme(properties) + "://" + getAddress(properties) + "/metrics/"); 508 String job = getJob(properties); 509 if (job.contains("/")) { 510 url.append("job@base64/").append(base64url(job)); 511 } else { 512 url.append("job/").append(URLEncoder.encode(job, "UTF-8")); 513 } 514 for (Map.Entry<String, String> entry : groupingKey.entrySet()) { 515 if (entry.getValue().isEmpty()) { 516 url.append("/") 517 .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING)) 518 .append("@base64/="); 519 } else if (entry.getValue().contains("/")) { 520 url.append("/") 521 .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING)) 522 .append("@base64/") 523 .append(base64url(entry.getValue())); 524 } else { 525 url.append("/") 526 .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING)) 527 .append("/") 528 .append(URLEncoder.encode(entry.getValue(), "UTF-8")); 529 } 530 } 531 return URI.create(url.toString()).normalize().toURL(); 532 } 533 534 private String base64url(String v) { 535 return Base64.getEncoder() 536 .encodeToString(v.getBytes(StandardCharsets.UTF_8)) 537 .replace("+", "-") 538 .replace("/", "_"); 539 } 540 541 public PushGateway build() { 542 ExporterPushgatewayProperties properties = 543 config == null ? null : config.getExporterPushgatewayProperties(); 544 try { 545 return new PushGateway( 546 registry, 547 getFormat(), 548 makeUrl(properties), 549 connectionFactory, 550 requestHeaders, 551 getPrometheusTimestampsInMs(), 552 getEscapingScheme(properties), 553 getConnectionTimeout(properties), 554 getReadTimeout(properties)); 555 } catch (MalformedURLException e) { 556 throw new PrometheusPropertiesException( 557 address + ": Invalid address. Expecting <host>:<port>"); 558 } catch (UnsupportedEncodingException e) { 559 throw new RuntimeException(e); // cannot happen, UTF-8 is always supported 560 } 561 } 562 } 563}