001package io.prometheus.metrics.exporter.pushgateway; 002 003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP; 004 005import io.prometheus.metrics.config.ExporterPushgatewayProperties; 006import io.prometheus.metrics.config.PrometheusProperties; 007import io.prometheus.metrics.config.PrometheusPropertiesException; 008import io.prometheus.metrics.expositionformats.ExpositionFormatWriter; 009import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter; 010import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; 011import io.prometheus.metrics.model.registry.Collector; 012import io.prometheus.metrics.model.registry.MultiCollector; 013import io.prometheus.metrics.model.registry.PrometheusRegistry; 014import java.io.ByteArrayOutputStream; 015import java.io.IOException; 016import java.io.InputStream; 017import java.io.OutputStream; 018import java.io.UnsupportedEncodingException; 019import java.net.HttpURLConnection; 020import java.net.InetAddress; 021import java.net.MalformedURLException; 022import java.net.URI; 023import java.net.URL; 024import java.net.URLEncoder; 025import java.net.UnknownHostException; 026import java.nio.charset.StandardCharsets; 027import java.util.Base64; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.Map; 031import java.util.TreeMap; 032 033/** 034 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus 035 * Pushgateway</a> 036 * 037 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to 038 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead 039 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link 040 * PrometheusRegistry} to a Pushgateway. 041 * 042 * <p>Example usage: 043 * 044 * <pre>{@code 045 * void executeBatchJob() throws Exception { 046 * PrometheusRegistry registry = new PrometheusRegistry(); 047 * Gauge duration = Gauge.builder() 048 * .name("my_batch_job_duration_seconds") 049 * .help("Duration of my batch job in seconds.") 050 * .register(registry); 051 * Timer durationTimer = duration.startTimer(); 052 * try { 053 * // Your code here. 054 * 055 * // This is only added to the registry after success, 056 * // so that a previous success in the Pushgateway isn't overwritten on failure. 057 * Gauge lastSuccess = Gauge.builder() 058 * .name("my_batch_job_last_success") 059 * .help("Last time my batch job succeeded, in unixtime.") 060 * .register(registry); 061 * lastSuccess.set(System.currentTimeMillis()); 062 * } finally { 063 * durationTimer.observeDuration(); 064 * PushGateway pg = PushGateway.builder() 065 * .address("127.0.0.1:9091") 066 * .job("my_batch_job") 067 * .registry(registry) 068 * .build(); 069 * pg.pushAdd(); 070 * } 071 * } 072 * }</pre> 073 * 074 * <p>See <a 075 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>. 076 */ 077public class PushGateway { 078 079 private static final int MILLISECONDS_PER_SECOND = 1000; 080 081 private final URL url; 082 private final ExpositionFormatWriter writer; 083 private final Map<String, String> requestHeaders; 084 private final PrometheusRegistry registry; 085 private final HttpConnectionFactory connectionFactory; 086 087 private PushGateway( 088 PrometheusRegistry registry, 089 Format format, 090 URL url, 091 HttpConnectionFactory connectionFactory, 092 Map<String, String> requestHeaders) { 093 this.registry = registry; 094 this.url = url; 095 this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders)); 096 this.connectionFactory = connectionFactory; 097 writer = getWriter(format); 098 if (!writer.isAvailable()) { 099 throw new RuntimeException(writer.getClass() + " is not available"); 100 } 101 } 102 103 private ExpositionFormatWriter getWriter(Format format) { 104 if (format == Format.PROMETHEUS_TEXT) { 105 return new PrometheusTextFormatWriter(false); 106 } else { 107 // use reflection to avoid a compile-time dependency on the expositionformats module 108 return new PrometheusProtobufWriter(); 109 } 110 } 111 112 /** 113 * Push all metrics. All metrics with the same job and grouping key are replaced. 114 * 115 * <p>This uses the PUT HTTP method. 116 */ 117 public void push() throws IOException { 118 doRequest(registry, "PUT"); 119 } 120 121 /** 122 * Push a single metric. All metrics with the same job and grouping key are replaced. 123 * 124 * <p>This is useful for pushing a single Gauge. 125 * 126 * <p>This uses the PUT HTTP method. 127 */ 128 public void push(Collector collector) throws IOException { 129 PrometheusRegistry registry = new PrometheusRegistry(); 130 registry.register(collector); 131 doRequest(registry, "PUT"); 132 } 133 134 /** 135 * Push a single collector. All metrics with the same job and grouping key are replaced. 136 * 137 * <p>This uses the PUT HTTP method. 138 */ 139 public void push(MultiCollector collector) throws IOException { 140 PrometheusRegistry registry = new PrometheusRegistry(); 141 registry.register(collector); 142 doRequest(registry, "PUT"); 143 } 144 145 /** 146 * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are 147 * replaced. 148 * 149 * <p>This uses the POST HTTP method. 150 */ 151 public void pushAdd() throws IOException { 152 doRequest(registry, "POST"); 153 } 154 155 /** 156 * Like {@link #push(Collector)}, but only the specified metric will be replaced. 157 * 158 * <p>This uses the POST HTTP method. 159 */ 160 public void pushAdd(Collector collector) throws IOException { 161 PrometheusRegistry registry = new PrometheusRegistry(); 162 registry.register(collector); 163 doRequest(registry, "POST"); 164 } 165 166 /** 167 * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced. 168 * 169 * <p>This uses the POST HTTP method. 170 */ 171 public void pushAdd(MultiCollector collector) throws IOException { 172 PrometheusRegistry registry = new PrometheusRegistry(); 173 registry.register(collector); 174 doRequest(registry, "POST"); 175 } 176 177 /** 178 * Deletes metrics from the Pushgateway. 179 * 180 * <p>This uses the DELETE HTTP method. 181 */ 182 public void delete() throws IOException { 183 doRequest(null, "DELETE"); 184 } 185 186 private void doRequest(PrometheusRegistry registry, String method) throws IOException { 187 try { 188 HttpURLConnection connection = connectionFactory.create(url); 189 requestHeaders.forEach(connection::setRequestProperty); 190 connection.setRequestProperty("Content-Type", writer.getContentType()); 191 if (!method.equals("DELETE")) { 192 connection.setDoOutput(true); 193 } 194 connection.setRequestMethod(method); 195 196 connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND); 197 connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND); 198 connection.connect(); 199 200 try { 201 if (!method.equals("DELETE")) { 202 OutputStream outputStream = connection.getOutputStream(); 203 writer.write(outputStream, registry.scrape()); 204 outputStream.flush(); 205 outputStream.close(); 206 } 207 208 int response = connection.getResponseCode(); 209 if (response / 100 != 2) { 210 String errorMessage; 211 InputStream errorStream = connection.getErrorStream(); 212 if (errorStream != null) { 213 String errBody = readFromStream(errorStream); 214 errorMessage = 215 "Response code from " + url + " was " + response + ", response body: " + errBody; 216 } else { 217 errorMessage = "Response code from " + url + " was " + response; 218 } 219 throw new IOException(errorMessage); 220 } 221 222 } finally { 223 connection.disconnect(); 224 } 225 } catch (IOException e) { 226 String baseUrl = url.getProtocol() + "://" + url.getHost(); 227 if (url.getPort() != -1) { 228 baseUrl += ":" + url.getPort(); 229 } 230 throw new IOException( 231 "Failed to push metrics to the Prometheus Pushgateway on " 232 + baseUrl 233 + ": " 234 + e.getMessage(), 235 e); 236 } 237 } 238 239 private static String readFromStream(InputStream is) throws IOException { 240 ByteArrayOutputStream result = new ByteArrayOutputStream(); 241 byte[] buffer = new byte[1024]; 242 int length; 243 while ((length = is.read(buffer)) != -1) { 244 result.write(buffer, 0, length); 245 } 246 return result.toString("UTF-8"); 247 } 248 249 public static Builder builder() { 250 return builder(PrometheusProperties.get()); 251 } 252 253 /** 254 * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}. 255 */ 256 public static Builder builder(PrometheusProperties config) { 257 return new Builder(config); 258 } 259 260 public static class Builder { 261 262 private final PrometheusProperties config; 263 private Format format; 264 private String address; 265 private Scheme scheme; 266 private String job; 267 private final Map<String, String> requestHeaders = new HashMap<>(); 268 private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry; 269 private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory(); 270 private Map<String, String> groupingKey = new TreeMap<>(); 271 272 private Builder(PrometheusProperties config) { 273 this.config = config; 274 } 275 276 /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */ 277 public Builder format(Format format) { 278 if (format == null) { 279 throw new NullPointerException(); 280 } 281 this.format = format; 282 return this; 283 } 284 285 /** 286 * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}. 287 * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address} 288 * property. 289 */ 290 public Builder address(String address) { 291 if (address == null) { 292 throw new NullPointerException(); 293 } 294 this.address = address; 295 return this; 296 } 297 298 /** Username and password for HTTP basic auth when pushing to the Pushgateway. */ 299 public Builder basicAuth(String user, String password) { 300 if (user == null || password == null) { 301 throw new NullPointerException(); 302 } 303 byte[] credentialsBytes = (user + ":" + password).getBytes(StandardCharsets.UTF_8); 304 String encoded = Base64.getEncoder().encodeToString(credentialsBytes); 305 requestHeaders.put("Authorization", String.format("Basic %s", encoded)); 306 return this; 307 } 308 309 /** Bearer token authorization when pushing to the Pushgateway. */ 310 public Builder bearerToken(String token) { 311 if (token == null) { 312 throw new NullPointerException(); 313 } 314 requestHeaders.put("Authorization", String.format("Bearer %s", token)); 315 return this; 316 } 317 318 /** 319 * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten 320 * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property. 321 */ 322 public Builder scheme(Scheme scheme) { 323 if (scheme == null) { 324 throw new NullPointerException(); 325 } 326 this.scheme = scheme; 327 return this; 328 } 329 330 /** 331 * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}. 332 * 333 * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example 334 * of a custom connection factory that skips SSL certificate validation for HTTPS connections. 335 */ 336 public Builder connectionFactory(HttpConnectionFactory connectionFactory) { 337 if (connectionFactory == null) { 338 throw new NullPointerException(); 339 } 340 this.connectionFactory = connectionFactory; 341 return this; 342 } 343 344 /** 345 * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR 346 * file will be used by default. Can be overwritten at runtime with the {@code 347 * io.prometheus.exporter.pushgateway.job} property. 348 */ 349 public Builder job(String job) { 350 if (job == null) { 351 throw new NullPointerException(); 352 } 353 this.job = job; 354 return this; 355 } 356 357 /** 358 * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for 359 * adding multiple grouping keys. 360 */ 361 public Builder groupingKey(String name, String value) { 362 if (name == null || value == null) { 363 throw new NullPointerException(); 364 } 365 groupingKey.put(name, value); 366 return this; 367 } 368 369 /** Convenience method for adding the current IP address as an "instance" label. */ 370 public Builder instanceIpGroupingKey() throws UnknownHostException { 371 return groupingKey("instance", InetAddress.getLocalHost().getHostAddress()); 372 } 373 374 /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */ 375 public Builder registry(PrometheusRegistry registry) { 376 if (registry == null) { 377 throw new NullPointerException(); 378 } 379 this.registry = registry; 380 return this; 381 } 382 383 private Scheme getScheme(ExporterPushgatewayProperties properties) { 384 if (properties != null && properties.getScheme() != null) { 385 return Scheme.valueOf(properties.getScheme()); 386 } else if (this.scheme != null) { 387 return this.scheme; 388 } else { 389 return HTTP; 390 } 391 } 392 393 private String getAddress(ExporterPushgatewayProperties properties) { 394 if (properties != null && properties.getAddress() != null) { 395 return properties.getAddress(); 396 } else if (this.address != null) { 397 return this.address; 398 } else { 399 return "localhost:9091"; 400 } 401 } 402 403 private String getJob(ExporterPushgatewayProperties properties) { 404 if (properties != null && properties.getJob() != null) { 405 return properties.getJob(); 406 } else if (this.job != null) { 407 return this.job; 408 } else { 409 return DefaultJobLabelDetector.getDefaultJobLabel(); 410 } 411 } 412 413 private Format getFormat() { 414 // currently not configurable via properties 415 if (this.format != null) { 416 return this.format; 417 } 418 return Format.PROMETHEUS_PROTOBUF; 419 } 420 421 private URL makeUrl(ExporterPushgatewayProperties properties) 422 throws UnsupportedEncodingException, MalformedURLException { 423 String url = getScheme(properties) + "://" + getAddress(properties) + "/metrics/"; 424 String job = getJob(properties); 425 if (job.contains("/")) { 426 url += "job@base64/" + base64url(job); 427 } else { 428 url += "job/" + URLEncoder.encode(job, "UTF-8"); 429 } 430 if (groupingKey != null) { 431 for (Map.Entry<String, String> entry : groupingKey.entrySet()) { 432 if (entry.getValue().isEmpty()) { 433 url += "/" + entry.getKey() + "@base64/="; 434 } else if (entry.getValue().contains("/")) { 435 url += "/" + entry.getKey() + "@base64/" + base64url(entry.getValue()); 436 } else { 437 url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8"); 438 } 439 } 440 } 441 return URI.create(url).normalize().toURL(); 442 } 443 444 private String base64url(String v) { 445 return Base64.getEncoder() 446 .encodeToString(v.getBytes(StandardCharsets.UTF_8)) 447 .replace("+", "-") 448 .replace("/", "_"); 449 } 450 451 public PushGateway build() { 452 ExporterPushgatewayProperties properties = 453 config == null ? null : config.getExporterPushgatewayProperties(); 454 try { 455 return new PushGateway( 456 registry, getFormat(), makeUrl(properties), connectionFactory, requestHeaders); 457 } catch (MalformedURLException e) { 458 throw new PrometheusPropertiesException( 459 address + ": Invalid address. Expecting <host>:<port>"); 460 } catch (UnsupportedEncodingException e) { 461 throw new RuntimeException(e); // cannot happen, UTF-8 is always supported 462 } 463 } 464 } 465}