001package io.prometheus.metrics.exporter.pushgateway; 002 003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP; 004 005import io.prometheus.metrics.config.ExporterPushgatewayProperties; 006import io.prometheus.metrics.config.PrometheusProperties; 007import io.prometheus.metrics.config.PrometheusPropertiesException; 008import io.prometheus.metrics.expositionformats.ExpositionFormatWriter; 009import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter; 010import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; 011import io.prometheus.metrics.model.registry.Collector; 012import io.prometheus.metrics.model.registry.MultiCollector; 013import io.prometheus.metrics.model.registry.PrometheusRegistry; 014import java.io.ByteArrayOutputStream; 015import java.io.IOException; 016import java.io.InputStream; 017import java.io.OutputStream; 018import java.io.UnsupportedEncodingException; 019import java.net.HttpURLConnection; 020import java.net.InetAddress; 021import java.net.MalformedURLException; 022import java.net.URI; 023import java.net.URL; 024import java.net.URLEncoder; 025import java.net.UnknownHostException; 026import java.nio.charset.StandardCharsets; 027import java.util.Base64; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.Map; 031import java.util.TreeMap; 032 033/** 034 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus 035 * Pushgateway</a> 036 * 037 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to 038 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead 039 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link 040 * PrometheusRegistry} to a Pushgateway. 041 * 042 * <p>Example usage: 043 * 044 * <pre>{@code 045 * void executeBatchJob() throws Exception { 046 * PrometheusRegistry registry = new PrometheusRegistry(); 047 * Gauge duration = Gauge.builder() 048 * .name("my_batch_job_duration_seconds") 049 * .help("Duration of my batch job in seconds.") 050 * .register(registry); 051 * Timer durationTimer = duration.startTimer(); 052 * try { 053 * // Your code here. 054 * 055 * // This is only added to the registry after success, 056 * // so that a previous success in the Pushgateway isn't overwritten on failure. 057 * Gauge lastSuccess = Gauge.builder() 058 * .name("my_batch_job_last_success") 059 * .help("Last time my batch job succeeded, in unixtime.") 060 * .register(registry); 061 * lastSuccess.set(System.currentTimeMillis()); 062 * } finally { 063 * durationTimer.observeDuration(); 064 * PushGateway pg = PushGateway.builder() 065 * .address("127.0.0.1:9091") 066 * .job("my_batch_job") 067 * .registry(registry) 068 * .build(); 069 * pg.pushAdd(); 070 * } 071 * } 072 * }</pre> 073 * 074 * <p>See <a 075 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>. 076 */ 077public class PushGateway { 078 079 private static final int MILLISECONDS_PER_SECOND = 1000; 080 081 private final URL url; 082 private final ExpositionFormatWriter writer; 083 private final boolean prometheusTimestampsInMs; 084 private final Map<String, String> requestHeaders; 085 private final PrometheusRegistry registry; 086 private final HttpConnectionFactory connectionFactory; 087 088 private PushGateway( 089 PrometheusRegistry registry, 090 Format format, 091 URL url, 092 HttpConnectionFactory connectionFactory, 093 Map<String, String> requestHeaders, 094 boolean prometheusTimestampsInMs) { 095 this.registry = registry; 096 this.url = url; 097 this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders)); 098 this.connectionFactory = connectionFactory; 099 this.prometheusTimestampsInMs = prometheusTimestampsInMs; 100 writer = getWriter(format); 101 if (!writer.isAvailable()) { 102 throw new RuntimeException(writer.getClass() + " is not available"); 103 } 104 } 105 106 @SuppressWarnings("deprecation") 107 private ExpositionFormatWriter getWriter(Format format) { 108 if (format == Format.PROMETHEUS_TEXT) { 109 return PrometheusTextFormatWriter.builder() 110 .setTimestampsInMs(this.prometheusTimestampsInMs) 111 .build(); 112 } else { 113 // use reflection to avoid a compile-time dependency on the expositionformats module 114 return new PrometheusProtobufWriter(); 115 } 116 } 117 118 /** 119 * Push all metrics. All metrics with the same job and grouping key are replaced. 120 * 121 * <p>This uses the PUT HTTP method. 122 */ 123 public void push() throws IOException { 124 doRequest(registry, "PUT"); 125 } 126 127 /** 128 * Push a single metric. All metrics with the same job and grouping key are replaced. 129 * 130 * <p>This is useful for pushing a single Gauge. 131 * 132 * <p>This uses the PUT HTTP method. 133 */ 134 public void push(Collector collector) throws IOException { 135 PrometheusRegistry registry = new PrometheusRegistry(); 136 registry.register(collector); 137 doRequest(registry, "PUT"); 138 } 139 140 /** 141 * Push a single collector. All metrics with the same job and grouping key are replaced. 142 * 143 * <p>This uses the PUT HTTP method. 144 */ 145 public void push(MultiCollector collector) throws IOException { 146 PrometheusRegistry registry = new PrometheusRegistry(); 147 registry.register(collector); 148 doRequest(registry, "PUT"); 149 } 150 151 /** 152 * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are 153 * replaced. 154 * 155 * <p>This uses the POST HTTP method. 156 */ 157 public void pushAdd() throws IOException { 158 doRequest(registry, "POST"); 159 } 160 161 /** 162 * Like {@link #push(Collector)}, but only the specified metric will be replaced. 163 * 164 * <p>This uses the POST HTTP method. 165 */ 166 public void pushAdd(Collector collector) throws IOException { 167 PrometheusRegistry registry = new PrometheusRegistry(); 168 registry.register(collector); 169 doRequest(registry, "POST"); 170 } 171 172 /** 173 * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced. 174 * 175 * <p>This uses the POST HTTP method. 176 */ 177 public void pushAdd(MultiCollector collector) throws IOException { 178 PrometheusRegistry registry = new PrometheusRegistry(); 179 registry.register(collector); 180 doRequest(registry, "POST"); 181 } 182 183 /** 184 * Deletes metrics from the Pushgateway. 185 * 186 * <p>This uses the DELETE HTTP method. 187 */ 188 public void delete() throws IOException { 189 doRequest(null, "DELETE"); 190 } 191 192 private void doRequest(PrometheusRegistry registry, String method) throws IOException { 193 try { 194 HttpURLConnection connection = connectionFactory.create(url); 195 requestHeaders.forEach(connection::setRequestProperty); 196 connection.setRequestProperty("Content-Type", writer.getContentType()); 197 if (!method.equals("DELETE")) { 198 connection.setDoOutput(true); 199 } 200 connection.setRequestMethod(method); 201 202 connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND); 203 connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND); 204 connection.connect(); 205 206 try { 207 if (!method.equals("DELETE")) { 208 OutputStream outputStream = connection.getOutputStream(); 209 writer.write(outputStream, registry.scrape()); 210 outputStream.flush(); 211 outputStream.close(); 212 } 213 214 int response = connection.getResponseCode(); 215 if (response / 100 != 2) { 216 String errorMessage; 217 InputStream errorStream = connection.getErrorStream(); 218 if (errorStream != null) { 219 String errBody = readFromStream(errorStream); 220 errorMessage = 221 "Response code from " + url + " was " + response + ", response body: " + errBody; 222 } else { 223 errorMessage = "Response code from " + url + " was " + response; 224 } 225 throw new IOException(errorMessage); 226 } 227 228 } finally { 229 connection.disconnect(); 230 } 231 } catch (IOException e) { 232 String baseUrl = url.getProtocol() + "://" + url.getHost(); 233 if (url.getPort() != -1) { 234 baseUrl += ":" + url.getPort(); 235 } 236 throw new IOException( 237 "Failed to push metrics to the Prometheus Pushgateway on " 238 + baseUrl 239 + ": " 240 + e.getMessage(), 241 e); 242 } 243 } 244 245 private static String readFromStream(InputStream is) throws IOException { 246 ByteArrayOutputStream result = new ByteArrayOutputStream(); 247 byte[] buffer = new byte[1024]; 248 int length; 249 while ((length = is.read(buffer)) != -1) { 250 result.write(buffer, 0, length); 251 } 252 return result.toString("UTF-8"); 253 } 254 255 public static Builder builder() { 256 return builder(PrometheusProperties.get()); 257 } 258 259 /** 260 * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}. 261 */ 262 public static Builder builder(PrometheusProperties config) { 263 return new Builder(config); 264 } 265 266 public static class Builder { 267 268 private final PrometheusProperties config; 269 private Format format; 270 private String address; 271 private Scheme scheme; 272 private String job; 273 private boolean prometheusTimestampsInMs; 274 private final Map<String, String> requestHeaders = new HashMap<>(); 275 private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry; 276 private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory(); 277 private Map<String, String> groupingKey = new TreeMap<>(); 278 279 private Builder(PrometheusProperties config) { 280 this.config = config; 281 } 282 283 /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */ 284 public Builder format(Format format) { 285 if (format == null) { 286 throw new NullPointerException(); 287 } 288 this.format = format; 289 return this; 290 } 291 292 /** 293 * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}. 294 * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address} 295 * property. 296 */ 297 public Builder address(String address) { 298 if (address == null) { 299 throw new NullPointerException(); 300 } 301 this.address = address; 302 return this; 303 } 304 305 /** Username and password for HTTP basic auth when pushing to the Pushgateway. */ 306 public Builder basicAuth(String user, String password) { 307 if (user == null || password == null) { 308 throw new NullPointerException(); 309 } 310 byte[] credentialsBytes = (user + ":" + password).getBytes(StandardCharsets.UTF_8); 311 String encoded = Base64.getEncoder().encodeToString(credentialsBytes); 312 requestHeaders.put("Authorization", String.format("Basic %s", encoded)); 313 return this; 314 } 315 316 /** Bearer token authorization when pushing to the Pushgateway. */ 317 public Builder bearerToken(String token) { 318 if (token == null) { 319 throw new NullPointerException(); 320 } 321 requestHeaders.put("Authorization", String.format("Bearer %s", token)); 322 return this; 323 } 324 325 /** 326 * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten 327 * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property. 328 */ 329 public Builder scheme(Scheme scheme) { 330 if (scheme == null) { 331 throw new NullPointerException(); 332 } 333 this.scheme = scheme; 334 return this; 335 } 336 337 /** 338 * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}. 339 * 340 * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example 341 * of a custom connection factory that skips SSL certificate validation for HTTPS connections. 342 */ 343 public Builder connectionFactory(HttpConnectionFactory connectionFactory) { 344 if (connectionFactory == null) { 345 throw new NullPointerException(); 346 } 347 this.connectionFactory = connectionFactory; 348 return this; 349 } 350 351 /** 352 * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR 353 * file will be used by default. Can be overwritten at runtime with the {@code 354 * io.prometheus.exporter.pushgateway.job} property. 355 */ 356 public Builder job(String job) { 357 if (job == null) { 358 throw new NullPointerException(); 359 } 360 this.job = job; 361 return this; 362 } 363 364 /** 365 * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for 366 * adding multiple grouping keys. 367 */ 368 public Builder groupingKey(String name, String value) { 369 if (name == null || value == null) { 370 throw new NullPointerException(); 371 } 372 groupingKey.put(name, value); 373 return this; 374 } 375 376 /** Convenience method for adding the current IP address as an "instance" label. */ 377 public Builder instanceIpGroupingKey() throws UnknownHostException { 378 return groupingKey("instance", InetAddress.getLocalHost().getHostAddress()); 379 } 380 381 /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */ 382 public Builder registry(PrometheusRegistry registry) { 383 if (registry == null) { 384 throw new NullPointerException(); 385 } 386 this.registry = registry; 387 return this; 388 } 389 390 /** 391 * Use milliseconds for timestamps in text format? Default is {@code false}. Can be overwritten 392 * at runtime with the {@code io.prometheus.exporter.timestampsInMs} property. 393 */ 394 public Builder prometheusTimestampsInMs(boolean prometheusTimestampsInMs) { 395 this.prometheusTimestampsInMs = prometheusTimestampsInMs; 396 return this; 397 } 398 399 private boolean getPrometheusTimestampsInMs() { 400 // accept either to opt in to timestamps in milliseconds 401 return config.getExporterProperties().getPrometheusTimestampsInMs() 402 || this.prometheusTimestampsInMs; 403 } 404 405 private Scheme getScheme(ExporterPushgatewayProperties properties) { 406 if (properties != null && properties.getScheme() != null) { 407 return Scheme.valueOf(properties.getScheme()); 408 } else if (this.scheme != null) { 409 return this.scheme; 410 } else { 411 return HTTP; 412 } 413 } 414 415 private String getAddress(ExporterPushgatewayProperties properties) { 416 if (properties != null && properties.getAddress() != null) { 417 return properties.getAddress(); 418 } else if (this.address != null) { 419 return this.address; 420 } else { 421 return "localhost:9091"; 422 } 423 } 424 425 private String getJob(ExporterPushgatewayProperties properties) { 426 if (properties != null && properties.getJob() != null) { 427 return properties.getJob(); 428 } else if (this.job != null) { 429 return this.job; 430 } else { 431 return DefaultJobLabelDetector.getDefaultJobLabel(); 432 } 433 } 434 435 private Format getFormat() { 436 // currently not configurable via properties 437 if (this.format != null) { 438 return this.format; 439 } 440 return Format.PROMETHEUS_PROTOBUF; 441 } 442 443 private URL makeUrl(ExporterPushgatewayProperties properties) 444 throws UnsupportedEncodingException, MalformedURLException { 445 String url = getScheme(properties) + "://" + getAddress(properties) + "/metrics/"; 446 String job = getJob(properties); 447 if (job.contains("/")) { 448 url += "job@base64/" + base64url(job); 449 } else { 450 url += "job/" + URLEncoder.encode(job, "UTF-8"); 451 } 452 if (groupingKey != null) { 453 for (Map.Entry<String, String> entry : groupingKey.entrySet()) { 454 if (entry.getValue().isEmpty()) { 455 url += "/" + entry.getKey() + "@base64/="; 456 } else if (entry.getValue().contains("/")) { 457 url += "/" + entry.getKey() + "@base64/" + base64url(entry.getValue()); 458 } else { 459 url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8"); 460 } 461 } 462 } 463 return URI.create(url).normalize().toURL(); 464 } 465 466 private String base64url(String v) { 467 return Base64.getEncoder() 468 .encodeToString(v.getBytes(StandardCharsets.UTF_8)) 469 .replace("+", "-") 470 .replace("/", "_"); 471 } 472 473 public PushGateway build() { 474 ExporterPushgatewayProperties properties = 475 config == null ? null : config.getExporterPushgatewayProperties(); 476 try { 477 return new PushGateway( 478 registry, 479 getFormat(), 480 makeUrl(properties), 481 connectionFactory, 482 requestHeaders, 483 getPrometheusTimestampsInMs()); 484 } catch (MalformedURLException e) { 485 throw new PrometheusPropertiesException( 486 address + ": Invalid address. Expecting <host>:<port>"); 487 } catch (UnsupportedEncodingException e) { 488 throw new RuntimeException(e); // cannot happen, UTF-8 is always supported 489 } 490 } 491 } 492}