001package io.prometheus.metrics.exporter.pushgateway; 002 003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP; 004import static io.prometheus.metrics.model.snapshots.PrometheusNaming.escapeName; 005import static java.util.Objects.requireNonNull; 006 007import io.prometheus.metrics.config.EscapingScheme; 008import io.prometheus.metrics.config.ExporterPushgatewayProperties; 009import io.prometheus.metrics.config.PrometheusProperties; 010import io.prometheus.metrics.config.PrometheusPropertiesException; 011import io.prometheus.metrics.expositionformats.ExpositionFormatWriter; 012import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter; 013import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; 014import io.prometheus.metrics.model.registry.Collector; 015import io.prometheus.metrics.model.registry.MultiCollector; 016import io.prometheus.metrics.model.registry.PrometheusRegistry; 017import java.io.ByteArrayOutputStream; 018import java.io.IOException; 019import java.io.InputStream; 020import java.io.OutputStream; 021import java.io.UnsupportedEncodingException; 022import java.net.HttpURLConnection; 023import java.net.InetAddress; 024import java.net.MalformedURLException; 025import java.net.URI; 026import java.net.URL; 027import java.net.URLEncoder; 028import java.net.UnknownHostException; 029import java.nio.charset.StandardCharsets; 030import java.util.Base64; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.Map; 034import java.util.TreeMap; 035import javax.annotation.Nullable; 036 037/** 038 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus 039 * Pushgateway</a> 040 * 041 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to 042 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead 043 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link 044 * PrometheusRegistry} to a Pushgateway. 045 * 046 * <p>Example usage: 047 * 048 * <pre>{@code 049 * void executeBatchJob() throws Exception { 050 * PrometheusRegistry registry = new PrometheusRegistry(); 051 * Gauge duration = Gauge.builder() 052 * .name("my_batch_job_duration_seconds") 053 * .help("Duration of my batch job in seconds.") 054 * .register(registry); 055 * Timer durationTimer = duration.startTimer(); 056 * try { 057 * // Your code here. 058 * 059 * // This is only added to the registry after success, 060 * // so that a previous success in the Pushgateway isn't overwritten on failure. 061 * Gauge lastSuccess = Gauge.builder() 062 * .name("my_batch_job_last_success") 063 * .help("Last time my batch job succeeded, in unixtime.") 064 * .register(registry); 065 * lastSuccess.set(System.currentTimeMillis()); 066 * } finally { 067 * durationTimer.observeDuration(); 068 * PushGateway pg = PushGateway.builder() 069 * .address("127.0.0.1:9091") 070 * .job("my_batch_job") 071 * .registry(registry) 072 * .build(); 073 * pg.pushAdd(); 074 * } 075 * } 076 * }</pre> 077 * 078 * <p>See <a 079 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>. 080 */ 081public class PushGateway { 082 083 private static final int MILLISECONDS_PER_SECOND = 1000; 084 085 private final URL url; 086 private final ExpositionFormatWriter writer; 087 private final boolean prometheusTimestampsInMs; 088 private final Map<String, String> requestHeaders; 089 private final PrometheusRegistry registry; 090 private final HttpConnectionFactory connectionFactory; 091 private final EscapingScheme escapingScheme; 092 093 private PushGateway( 094 PrometheusRegistry registry, 095 Format format, 096 URL url, 097 HttpConnectionFactory connectionFactory, 098 Map<String, String> requestHeaders, 099 boolean prometheusTimestampsInMs, 100 EscapingScheme escapingScheme) { 101 this.registry = registry; 102 this.url = url; 103 this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders)); 104 this.connectionFactory = connectionFactory; 105 this.prometheusTimestampsInMs = prometheusTimestampsInMs; 106 this.escapingScheme = escapingScheme; 107 writer = getWriter(format); 108 if (!writer.isAvailable()) { 109 throw new RuntimeException(writer.getClass() + " is not available"); 110 } 111 } 112 113 @SuppressWarnings("deprecation") 114 private ExpositionFormatWriter getWriter(Format format) { 115 if (format == Format.PROMETHEUS_TEXT) { 116 return PrometheusTextFormatWriter.builder() 117 .setTimestampsInMs(this.prometheusTimestampsInMs) 118 .build(); 119 } else { 120 // use reflection to avoid a compile-time dependency on the expositionformats module 121 return new PrometheusProtobufWriter(); 122 } 123 } 124 125 /** 126 * Push all metrics. All metrics with the same job and grouping key are replaced. 127 * 128 * <p>This uses the PUT HTTP method. 129 */ 130 public void push() throws IOException { 131 doRequest(registry, "PUT"); 132 } 133 134 /** 135 * Push a single metric. All metrics with the same job and grouping key are replaced. 136 * 137 * <p>This is useful for pushing a single Gauge. 138 * 139 * <p>This uses the PUT HTTP method. 140 */ 141 public void push(Collector collector) throws IOException { 142 PrometheusRegistry registry = new PrometheusRegistry(); 143 registry.register(collector); 144 doRequest(registry, "PUT"); 145 } 146 147 /** 148 * Push a single collector. All metrics with the same job and grouping key are replaced. 149 * 150 * <p>This uses the PUT HTTP method. 151 */ 152 public void push(MultiCollector collector) throws IOException { 153 PrometheusRegistry registry = new PrometheusRegistry(); 154 registry.register(collector); 155 doRequest(registry, "PUT"); 156 } 157 158 /** 159 * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are 160 * replaced. 161 * 162 * <p>This uses the POST HTTP method. 163 */ 164 public void pushAdd() throws IOException { 165 doRequest(registry, "POST"); 166 } 167 168 /** 169 * Like {@link #push(Collector)}, but only the specified metric will be replaced. 170 * 171 * <p>This uses the POST HTTP method. 172 */ 173 public void pushAdd(Collector collector) throws IOException { 174 PrometheusRegistry registry = new PrometheusRegistry(); 175 registry.register(collector); 176 doRequest(registry, "POST"); 177 } 178 179 /** 180 * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced. 181 * 182 * <p>This uses the POST HTTP method. 183 */ 184 public void pushAdd(MultiCollector collector) throws IOException { 185 PrometheusRegistry registry = new PrometheusRegistry(); 186 registry.register(collector); 187 doRequest(registry, "POST"); 188 } 189 190 /** 191 * Deletes metrics from the Pushgateway. 192 * 193 * <p>This uses the DELETE HTTP method. 194 */ 195 public void delete() throws IOException { 196 doRequest(null, "DELETE"); 197 } 198 199 private void doRequest(@Nullable PrometheusRegistry registry, String method) throws IOException { 200 try { 201 HttpURLConnection connection = connectionFactory.create(url); 202 requestHeaders.forEach(connection::setRequestProperty); 203 connection.setRequestProperty("Content-Type", writer.getContentType()); 204 if (!method.equals("DELETE")) { 205 connection.setDoOutput(true); 206 } 207 connection.setRequestMethod(method); 208 209 connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND); 210 connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND); 211 connection.connect(); 212 213 try { 214 if (!method.equals("DELETE")) { 215 OutputStream outputStream = connection.getOutputStream(); 216 writer.write(outputStream, requireNonNull(registry).scrape(), this.escapingScheme); 217 outputStream.flush(); 218 outputStream.close(); 219 } 220 221 int response = connection.getResponseCode(); 222 if (response / 100 != 2) { 223 String errorMessage; 224 InputStream errorStream = connection.getErrorStream(); 225 if (errorStream != null) { 226 String errBody = readFromStream(errorStream); 227 errorMessage = 228 "Response code from " + url + " was " + response + ", response body: " + errBody; 229 } else { 230 errorMessage = "Response code from " + url + " was " + response; 231 } 232 throw new IOException(errorMessage); 233 } 234 235 } finally { 236 connection.disconnect(); 237 } 238 } catch (IOException e) { 239 String baseUrl = url.getProtocol() + "://" + url.getHost(); 240 if (url.getPort() != -1) { 241 baseUrl += ":" + url.getPort(); 242 } 243 throw new IOException( 244 "Failed to push metrics to the Prometheus Pushgateway on " 245 + baseUrl 246 + ": " 247 + e.getMessage(), 248 e); 249 } 250 } 251 252 private static String readFromStream(InputStream is) throws IOException { 253 ByteArrayOutputStream result = new ByteArrayOutputStream(); 254 byte[] buffer = new byte[1024]; 255 int length; 256 while ((length = is.read(buffer)) != -1) { 257 result.write(buffer, 0, length); 258 } 259 return result.toString("UTF-8"); 260 } 261 262 public static Builder builder() { 263 return builder(PrometheusProperties.get()); 264 } 265 266 /** 267 * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}. 268 */ 269 public static Builder builder(PrometheusProperties config) { 270 return new Builder(config); 271 } 272 273 public static class Builder { 274 275 private final PrometheusProperties config; 276 @Nullable private Format format; 277 @Nullable private String address; 278 @Nullable private Scheme scheme; 279 @Nullable private String job; 280 private boolean prometheusTimestampsInMs; 281 private final Map<String, String> requestHeaders = new HashMap<>(); 282 private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry; 283 private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory(); 284 private final Map<String, String> groupingKey = new TreeMap<>(); 285 @Nullable private EscapingScheme escapingScheme; 286 287 private Builder(PrometheusProperties config) { 288 this.config = config; 289 } 290 291 /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */ 292 public Builder format(Format format) { 293 this.format = requireNonNull(format, "format must not be null"); 294 return this; 295 } 296 297 /** 298 * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}. 299 * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address} 300 * property. 301 */ 302 public Builder address(String address) { 303 this.address = requireNonNull(address, "address must not be null"); 304 return this; 305 } 306 307 /** Username and password for HTTP basic auth when pushing to the Pushgateway. */ 308 public Builder basicAuth(String user, String password) { 309 byte[] credentialsBytes = 310 (requireNonNull(user, "user must not be null") 311 + ":" 312 + requireNonNull(password, "password must not be null")) 313 .getBytes(StandardCharsets.UTF_8); 314 String encoded = Base64.getEncoder().encodeToString(credentialsBytes); 315 requestHeaders.put("Authorization", String.format("Basic %s", encoded)); 316 return this; 317 } 318 319 /** Bearer token authorization when pushing to the Pushgateway. */ 320 public Builder bearerToken(String token) { 321 requestHeaders.put( 322 "Authorization", 323 String.format("Bearer %s", requireNonNull(token, "token must not be null"))); 324 return this; 325 } 326 327 /** 328 * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten 329 * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property. 330 */ 331 public Builder scheme(Scheme scheme) { 332 this.scheme = requireNonNull(scheme, "scheme must not be null"); 333 return this; 334 } 335 336 /** 337 * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}. 338 * 339 * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example 340 * of a custom connection factory that skips SSL certificate validation for HTTPS connections. 341 */ 342 public Builder connectionFactory(HttpConnectionFactory connectionFactory) { 343 this.connectionFactory = 344 requireNonNull(connectionFactory, "connectionFactory must not be null"); 345 return this; 346 } 347 348 /** 349 * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR 350 * file will be used by default. Can be overwritten at runtime with the {@code 351 * io.prometheus.exporter.pushgateway.job} property. 352 */ 353 public Builder job(String job) { 354 this.job = requireNonNull(job, "job must not be null"); 355 return this; 356 } 357 358 /** 359 * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for 360 * adding multiple grouping keys. 361 */ 362 public Builder groupingKey(String name, String value) { 363 groupingKey.put( 364 requireNonNull(name, "name must not be null"), 365 requireNonNull(value, "value must not be null")); 366 return this; 367 } 368 369 /** Convenience method for adding the current IP address as an "instance" label. */ 370 public Builder instanceIpGroupingKey() throws UnknownHostException { 371 return groupingKey("instance", InetAddress.getLocalHost().getHostAddress()); 372 } 373 374 /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */ 375 public Builder registry(PrometheusRegistry registry) { 376 this.registry = requireNonNull(registry, "registry must not be null"); 377 return this; 378 } 379 380 /** 381 * Specify the escaping scheme to be used when pushing metrics. Default is {@link 382 * EscapingScheme#UNDERSCORE_ESCAPING}. 383 */ 384 public Builder escapingScheme(EscapingScheme escapingScheme) { 385 this.escapingScheme = requireNonNull(escapingScheme, "escapingScheme must not be null"); 386 return this; 387 } 388 389 /** 390 * Use milliseconds for timestamps in text format? Default is {@code false}. Can be overwritten 391 * at runtime with the {@code io.prometheus.exporter.timestampsInMs} property. 392 */ 393 public Builder prometheusTimestampsInMs(boolean prometheusTimestampsInMs) { 394 this.prometheusTimestampsInMs = prometheusTimestampsInMs; 395 return this; 396 } 397 398 private boolean getPrometheusTimestampsInMs() { 399 // accept either to opt in to timestamps in milliseconds 400 return config.getExporterProperties().getPrometheusTimestampsInMs() 401 || this.prometheusTimestampsInMs; 402 } 403 404 private Scheme getScheme(@Nullable ExporterPushgatewayProperties properties) { 405 if (properties != null && properties.getScheme() != null) { 406 return Scheme.valueOf(properties.getScheme()); 407 } else if (this.scheme != null) { 408 return this.scheme; 409 } else { 410 return HTTP; 411 } 412 } 413 414 private String getAddress(@Nullable ExporterPushgatewayProperties properties) { 415 if (properties != null && properties.getAddress() != null) { 416 return properties.getAddress(); 417 } else if (this.address != null) { 418 return this.address; 419 } else { 420 return "localhost:9091"; 421 } 422 } 423 424 private String getJob(@Nullable ExporterPushgatewayProperties properties) { 425 if (properties != null && properties.getJob() != null) { 426 return properties.getJob(); 427 } else if (this.job != null) { 428 return this.job; 429 } else { 430 return DefaultJobLabelDetector.getDefaultJobLabel(); 431 } 432 } 433 434 private EscapingScheme getEscapingScheme(@Nullable ExporterPushgatewayProperties properties) { 435 if (properties != null && properties.getEscapingScheme() != null) { 436 return properties.getEscapingScheme(); 437 } else if (this.escapingScheme != null) { 438 return this.escapingScheme; 439 } 440 return EscapingScheme.UNDERSCORE_ESCAPING; 441 } 442 443 private Format getFormat() { 444 // currently not configurable via properties 445 if (this.format != null) { 446 return this.format; 447 } 448 return Format.PROMETHEUS_PROTOBUF; 449 } 450 451 private URL makeUrl(@Nullable ExporterPushgatewayProperties properties) 452 throws UnsupportedEncodingException, MalformedURLException { 453 StringBuilder url = 454 new StringBuilder(getScheme(properties) + "://" + getAddress(properties) + "/metrics/"); 455 String job = getJob(properties); 456 if (job.contains("/")) { 457 url.append("job@base64/").append(base64url(job)); 458 } else { 459 url.append("job/").append(URLEncoder.encode(job, "UTF-8")); 460 } 461 for (Map.Entry<String, String> entry : groupingKey.entrySet()) { 462 if (entry.getValue().isEmpty()) { 463 url.append("/") 464 .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING)) 465 .append("@base64/="); 466 } else if (entry.getValue().contains("/")) { 467 url.append("/") 468 .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING)) 469 .append("@base64/") 470 .append(base64url(entry.getValue())); 471 } else { 472 url.append("/") 473 .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING)) 474 .append("/") 475 .append(URLEncoder.encode(entry.getValue(), "UTF-8")); 476 } 477 } 478 return URI.create(url.toString()).normalize().toURL(); 479 } 480 481 private String base64url(String v) { 482 return Base64.getEncoder() 483 .encodeToString(v.getBytes(StandardCharsets.UTF_8)) 484 .replace("+", "-") 485 .replace("/", "_"); 486 } 487 488 public PushGateway build() { 489 ExporterPushgatewayProperties properties = 490 config == null ? null : config.getExporterPushgatewayProperties(); 491 try { 492 return new PushGateway( 493 registry, 494 getFormat(), 495 makeUrl(properties), 496 connectionFactory, 497 requestHeaders, 498 getPrometheusTimestampsInMs(), 499 getEscapingScheme(properties)); 500 } catch (MalformedURLException e) { 501 throw new PrometheusPropertiesException( 502 address + ": Invalid address. Expecting <host>:<port>"); 503 } catch (UnsupportedEncodingException e) { 504 throw new RuntimeException(e); // cannot happen, UTF-8 is always supported 505 } 506 } 507 } 508}