001package io.prometheus.metrics.exporter.pushgateway; 002 003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP; 004import static io.prometheus.metrics.model.snapshots.PrometheusNaming.escapeName; 005import static java.util.Objects.requireNonNull; 006 007import io.prometheus.metrics.config.EscapingScheme; 008import io.prometheus.metrics.config.ExporterPushgatewayProperties; 009import io.prometheus.metrics.config.PrometheusProperties; 010import io.prometheus.metrics.config.PrometheusPropertiesException; 011import io.prometheus.metrics.expositionformats.ExpositionFormatWriter; 012import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter; 013import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; 014import io.prometheus.metrics.model.registry.Collector; 015import io.prometheus.metrics.model.registry.MultiCollector; 016import io.prometheus.metrics.model.registry.PrometheusRegistry; 017import java.io.ByteArrayOutputStream; 018import java.io.IOException; 019import java.io.InputStream; 020import java.io.OutputStream; 021import java.io.UnsupportedEncodingException; 022import java.net.HttpURLConnection; 023import java.net.InetAddress; 024import java.net.MalformedURLException; 025import java.net.URI; 026import java.net.URL; 027import java.net.URLEncoder; 028import java.net.UnknownHostException; 029import java.nio.charset.StandardCharsets; 030import java.time.Duration; 031import java.util.Base64; 032import java.util.Collections; 033import java.util.HashMap; 034import java.util.Map; 035import java.util.TreeMap; 036import javax.annotation.Nullable; 037 038/** 039 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus 040 * Pushgateway</a> 041 * 042 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to 043 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead 044 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link 045 * PrometheusRegistry} to a Pushgateway. 046 * 047 * <p>Example usage: 048 * 049 * <pre>{@code 050 * void executeBatchJob() throws Exception { 051 * PrometheusRegistry registry = new PrometheusRegistry(); 052 * Gauge duration = Gauge.builder() 053 * .name("my_batch_job_duration_seconds") 054 * .help("Duration of my batch job in seconds.") 055 * .register(registry); 056 * Timer durationTimer = duration.startTimer(); 057 * try { 058 * // Your code here. 059 * 060 * // This is only added to the registry after success, 061 * // so that a previous success in the Pushgateway isn't overwritten on failure. 062 * Gauge lastSuccess = Gauge.builder() 063 * .name("my_batch_job_last_success") 064 * .help("Last time my batch job succeeded, in unixtime.") 065 * .register(registry); 066 * lastSuccess.set(System.currentTimeMillis()); 067 * } finally { 068 * durationTimer.observeDuration(); 069 * PushGateway pg = PushGateway.builder() 070 * .address("127.0.0.1:9091") 071 * .job("my_batch_job") 072 * .registry(registry) 073 * .build(); 074 * pg.pushAdd(); 075 * } 076 * } 077 * }</pre> 078 * 079 * <p>See <a 080 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>. 081 */ 082public class PushGateway { 083 private final URL url; 084 private final ExpositionFormatWriter writer; 085 private final boolean prometheusTimestampsInMs; 086 private final Map<String, String> requestHeaders; 087 private final PrometheusRegistry registry; 088 private final HttpConnectionFactory connectionFactory; 089 private final EscapingScheme escapingScheme; 090 private final Duration connectionTimeout; 091 private final Duration readTimeout; 092 093 private PushGateway( 094 PrometheusRegistry registry, 095 Format format, 096 URL url, 097 HttpConnectionFactory connectionFactory, 098 Map<String, String> requestHeaders, 099 boolean prometheusTimestampsInMs, 100 EscapingScheme escapingScheme, 101 Duration connectionTimeout, 102 Duration readTimeout) { 103 this.registry = registry; 104 this.url = url; 105 this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders)); 106 this.connectionFactory = connectionFactory; 107 this.prometheusTimestampsInMs = prometheusTimestampsInMs; 108 this.escapingScheme = escapingScheme; 109 this.connectionTimeout = connectionTimeout; 110 this.readTimeout = readTimeout; 111 writer = getWriter(format); 112 if (!writer.isAvailable()) { 113 throw new RuntimeException(writer.getClass() + " is not available"); 114 } 115 } 116 117 @SuppressWarnings("deprecation") 118 private ExpositionFormatWriter getWriter(Format format) { 119 if (format == Format.PROMETHEUS_TEXT) { 120 return PrometheusTextFormatWriter.builder() 121 .setTimestampsInMs(this.prometheusTimestampsInMs) 122 .build(); 123 } else { 124 // use reflection to avoid a compile-time dependency on the expositionformats module 125 return new PrometheusProtobufWriter(); 126 } 127 } 128 129 /** 130 * Push all metrics. All metrics with the same job and grouping key are replaced. 131 * 132 * <p>This uses the PUT HTTP method. 133 */ 134 public void push() throws IOException { 135 doRequest(registry, "PUT"); 136 } 137 138 /** 139 * Push a single metric. All metrics with the same job and grouping key are replaced. 140 * 141 * <p>This is useful for pushing a single Gauge. 142 * 143 * <p>This uses the PUT HTTP method. 144 */ 145 public void push(Collector collector) throws IOException { 146 PrometheusRegistry registry = new PrometheusRegistry(); 147 registry.register(collector); 148 doRequest(registry, "PUT"); 149 } 150 151 /** 152 * Push a single collector. All metrics with the same job and grouping key are replaced. 153 * 154 * <p>This uses the PUT HTTP method. 155 */ 156 public void push(MultiCollector collector) throws IOException { 157 PrometheusRegistry registry = new PrometheusRegistry(); 158 registry.register(collector); 159 doRequest(registry, "PUT"); 160 } 161 162 /** 163 * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are 164 * replaced. 165 * 166 * <p>This uses the POST HTTP method. 167 */ 168 public void pushAdd() throws IOException { 169 doRequest(registry, "POST"); 170 } 171 172 /** 173 * Like {@link #push(Collector)}, but only the specified metric will be replaced. 174 * 175 * <p>This uses the POST HTTP method. 176 */ 177 public void pushAdd(Collector collector) throws IOException { 178 PrometheusRegistry registry = new PrometheusRegistry(); 179 registry.register(collector); 180 doRequest(registry, "POST"); 181 } 182 183 /** 184 * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced. 185 * 186 * <p>This uses the POST HTTP method. 187 */ 188 public void pushAdd(MultiCollector collector) throws IOException { 189 PrometheusRegistry registry = new PrometheusRegistry(); 190 registry.register(collector); 191 doRequest(registry, "POST"); 192 } 193 194 /** 195 * Deletes metrics from the Pushgateway. 196 * 197 * <p>This uses the DELETE HTTP method. 198 */ 199 public void delete() throws IOException { 200 doRequest(null, "DELETE"); 201 } 202 203 private void doRequest(@Nullable PrometheusRegistry registry, String method) throws IOException { 204 try { 205 HttpURLConnection connection = connectionFactory.create(url); 206 requestHeaders.forEach(connection::setRequestProperty); 207 connection.setRequestProperty("Content-Type", writer.getContentType()); 208 if (!method.equals("DELETE")) { 209 connection.setDoOutput(true); 210 } 211 connection.setRequestMethod(method); 212 213 connection.setConnectTimeout((int) this.connectionTimeout.toMillis()); 214 connection.setReadTimeout((int) this.readTimeout.toMillis()); 215 connection.connect(); 216 217 try { 218 if (!method.equals("DELETE")) { 219 OutputStream outputStream = connection.getOutputStream(); 220 writer.write(outputStream, requireNonNull(registry).scrape(), this.escapingScheme); 221 outputStream.flush(); 222 outputStream.close(); 223 } 224 225 int response = connection.getResponseCode(); 226 if (response / 100 != 2) { 227 String errorMessage; 228 InputStream errorStream = connection.getErrorStream(); 229 if (errorStream != null) { 230 String errBody = readFromStream(errorStream); 231 errorMessage = 232 "Response code from " + url + " was " + response + ", response body: " + errBody; 233 } else { 234 errorMessage = "Response code from " + url + " was " + response; 235 } 236 throw new IOException(errorMessage); 237 } 238 239 } finally { 240 connection.disconnect(); 241 } 242 } catch (IOException e) { 243 String baseUrl = url.getProtocol() + "://" + url.getHost(); 244 if (url.getPort() != -1) { 245 baseUrl += ":" + url.getPort(); 246 } 247 throw new IOException( 248 "Failed to push metrics to the Prometheus Pushgateway on " 249 + baseUrl 250 + ": " 251 + e.getMessage(), 252 e); 253 } 254 } 255 256 private static String readFromStream(InputStream is) throws IOException { 257 ByteArrayOutputStream result = new ByteArrayOutputStream(); 258 byte[] buffer = new byte[1024]; 259 int length; 260 while ((length = is.read(buffer)) != -1) { 261 result.write(buffer, 0, length); 262 } 263 return result.toString("UTF-8"); 264 } 265 266 public static Builder builder() { 267 return builder(PrometheusProperties.get()); 268 } 269 270 /** 271 * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}. 272 */ 273 public static Builder builder(PrometheusProperties config) { 274 return new Builder(config); 275 } 276 277 public static class Builder { 278 279 private final PrometheusProperties config; 280 @Nullable private Format format; 281 @Nullable private String address; 282 @Nullable private Scheme scheme; 283 @Nullable private String job; 284 @Nullable private Duration connectionTimeout; 285 @Nullable private Duration readTimeout; 286 private boolean prometheusTimestampsInMs; 287 private final Map<String, String> requestHeaders = new HashMap<>(); 288 private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry; 289 private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory(); 290 private final Map<String, String> groupingKey = new TreeMap<>(); 291 @Nullable private EscapingScheme escapingScheme; 292 293 private Builder(PrometheusProperties config) { 294 this.config = config; 295 } 296 297 /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */ 298 public Builder format(Format format) { 299 this.format = requireNonNull(format, "format must not be null"); 300 return this; 301 } 302 303 /** 304 * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}. 305 * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address} 306 * property. 307 */ 308 public Builder address(String address) { 309 this.address = requireNonNull(address, "address must not be null"); 310 return this; 311 } 312 313 /** Username and password for HTTP basic auth when pushing to the Pushgateway. */ 314 public Builder basicAuth(String user, String password) { 315 byte[] credentialsBytes = 316 (requireNonNull(user, "user must not be null") 317 + ":" 318 + requireNonNull(password, "password must not be null")) 319 .getBytes(StandardCharsets.UTF_8); 320 String encoded = Base64.getEncoder().encodeToString(credentialsBytes); 321 requestHeaders.put("Authorization", String.format("Basic %s", encoded)); 322 return this; 323 } 324 325 /** Bearer token authorization when pushing to the Pushgateway. */ 326 public Builder bearerToken(String token) { 327 requestHeaders.put( 328 "Authorization", 329 String.format("Bearer %s", requireNonNull(token, "token must not be null"))); 330 return this; 331 } 332 333 /** 334 * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten 335 * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property. 336 */ 337 public Builder scheme(Scheme scheme) { 338 this.scheme = requireNonNull(scheme, "scheme must not be null"); 339 return this; 340 } 341 342 /** 343 * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}. 344 * 345 * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example 346 * of a custom connection factory that skips SSL certificate validation for HTTPS connections. 347 */ 348 public Builder connectionFactory(HttpConnectionFactory connectionFactory) { 349 this.connectionFactory = 350 requireNonNull(connectionFactory, "connectionFactory must not be null"); 351 return this; 352 } 353 354 /** 355 * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR 356 * file will be used by default. Can be overwritten at runtime with the {@code 357 * io.prometheus.exporter.pushgateway.job} property. 358 */ 359 public Builder job(String job) { 360 this.job = requireNonNull(job, "job must not be null"); 361 return this; 362 } 363 364 /** 365 * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for 366 * adding multiple grouping keys. 367 */ 368 public Builder groupingKey(String name, String value) { 369 groupingKey.put( 370 requireNonNull(name, "name must not be null"), 371 requireNonNull(value, "value must not be null")); 372 return this; 373 } 374 375 /** Convenience method for adding the current IP address as an "instance" label. */ 376 public Builder instanceIpGroupingKey() throws UnknownHostException { 377 return groupingKey("instance", InetAddress.getLocalHost().getHostAddress()); 378 } 379 380 /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */ 381 public Builder registry(PrometheusRegistry registry) { 382 this.registry = requireNonNull(registry, "registry must not be null"); 383 return this; 384 } 385 386 /** 387 * Specify the escaping scheme to be used when pushing metrics. Default is {@link 388 * EscapingScheme#UNDERSCORE_ESCAPING}. 389 */ 390 public Builder escapingScheme(EscapingScheme escapingScheme) { 391 this.escapingScheme = requireNonNull(escapingScheme, "escapingScheme must not be null"); 392 return this; 393 } 394 395 /** 396 * Use milliseconds for timestamps in text format? Default is {@code false}. Can be overwritten 397 * at runtime with the {@code io.prometheus.exporter.timestampsInMs} property. 398 */ 399 public Builder prometheusTimestampsInMs(boolean prometheusTimestampsInMs) { 400 this.prometheusTimestampsInMs = prometheusTimestampsInMs; 401 return this; 402 } 403 404 /** 405 * Specify the connection timeout for HTTP connections to the PushGateway. Default is 10 406 * seconds. 407 * 408 * @param connectionTimeout timeout value 409 * @return this {@link Builder} instance 410 */ 411 public Builder connectionTimeout(Duration connectionTimeout) { 412 this.connectionTimeout = connectionTimeout; 413 return this; 414 } 415 416 private Duration getConnectionTimeout(@Nullable ExporterPushgatewayProperties properties) { 417 if (properties != null && properties.getConnectTimeout() != null) { 418 return properties.getConnectTimeout(); 419 } else if (this.connectionTimeout != null) { 420 return this.connectionTimeout; 421 } else { 422 return Duration.ofSeconds(10); 423 } 424 } 425 426 /** 427 * Specify the read timeout for HTTP connections to the PushGateway. Default is 10 seconds. 428 * 429 * @param readTimeout timeout value 430 * @return this {@link Builder} instance 431 */ 432 public Builder readTimeout(Duration readTimeout) { 433 this.readTimeout = readTimeout; 434 return this; 435 } 436 437 private Duration getReadTimeout(@Nullable ExporterPushgatewayProperties properties) { 438 if (properties != null && properties.getReadTimeout() != null) { 439 return properties.getReadTimeout(); 440 } else if (this.readTimeout != null) { 441 return this.readTimeout; 442 } else { 443 return Duration.ofSeconds(10); 444 } 445 } 446 447 private boolean getPrometheusTimestampsInMs() { 448 // accept either to opt in to timestamps in milliseconds 449 return config.getExporterProperties().getPrometheusTimestampsInMs() 450 || this.prometheusTimestampsInMs; 451 } 452 453 private Scheme getScheme(@Nullable ExporterPushgatewayProperties properties) { 454 if (properties != null && properties.getScheme() != null) { 455 return Scheme.valueOf(properties.getScheme()); 456 } else if (this.scheme != null) { 457 return this.scheme; 458 } else { 459 return HTTP; 460 } 461 } 462 463 private String getAddress(@Nullable ExporterPushgatewayProperties properties) { 464 if (properties != null && properties.getAddress() != null) { 465 return properties.getAddress(); 466 } else if (this.address != null) { 467 return this.address; 468 } else { 469 return "localhost:9091"; 470 } 471 } 472 473 private String getJob(@Nullable ExporterPushgatewayProperties properties) { 474 if (properties != null && properties.getJob() != null) { 475 return properties.getJob(); 476 } else if (this.job != null) { 477 return this.job; 478 } else { 479 return DefaultJobLabelDetector.getDefaultJobLabel(); 480 } 481 } 482 483 private EscapingScheme getEscapingScheme(@Nullable ExporterPushgatewayProperties properties) { 484 if (properties != null && properties.getEscapingScheme() != null) { 485 return properties.getEscapingScheme(); 486 } else if (this.escapingScheme != null) { 487 return this.escapingScheme; 488 } 489 return EscapingScheme.UNDERSCORE_ESCAPING; 490 } 491 492 private Format getFormat() { 493 // currently not configurable via properties 494 if (this.format != null) { 495 return this.format; 496 } 497 return Format.PROMETHEUS_PROTOBUF; 498 } 499 500 private URL makeUrl(@Nullable ExporterPushgatewayProperties properties) 501 throws UnsupportedEncodingException, MalformedURLException { 502 StringBuilder url = 503 new StringBuilder(getScheme(properties) + "://" + getAddress(properties) + "/metrics/"); 504 String job = getJob(properties); 505 if (job.contains("/")) { 506 url.append("job@base64/").append(base64url(job)); 507 } else { 508 url.append("job/").append(URLEncoder.encode(job, "UTF-8")); 509 } 510 for (Map.Entry<String, String> entry : groupingKey.entrySet()) { 511 if (entry.getValue().isEmpty()) { 512 url.append("/") 513 .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING)) 514 .append("@base64/="); 515 } else if (entry.getValue().contains("/")) { 516 url.append("/") 517 .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING)) 518 .append("@base64/") 519 .append(base64url(entry.getValue())); 520 } else { 521 url.append("/") 522 .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING)) 523 .append("/") 524 .append(URLEncoder.encode(entry.getValue(), "UTF-8")); 525 } 526 } 527 return URI.create(url.toString()).normalize().toURL(); 528 } 529 530 private String base64url(String v) { 531 return Base64.getEncoder() 532 .encodeToString(v.getBytes(StandardCharsets.UTF_8)) 533 .replace("+", "-") 534 .replace("/", "_"); 535 } 536 537 public PushGateway build() { 538 ExporterPushgatewayProperties properties = 539 config == null ? null : config.getExporterPushgatewayProperties(); 540 try { 541 return new PushGateway( 542 registry, 543 getFormat(), 544 makeUrl(properties), 545 connectionFactory, 546 requestHeaders, 547 getPrometheusTimestampsInMs(), 548 getEscapingScheme(properties), 549 getConnectionTimeout(properties), 550 getReadTimeout(properties)); 551 } catch (MalformedURLException e) { 552 throw new PrometheusPropertiesException( 553 address + ": Invalid address. Expecting <host>:<port>"); 554 } catch (UnsupportedEncodingException e) { 555 throw new RuntimeException(e); // cannot happen, UTF-8 is always supported 556 } 557 } 558 } 559}