001package io.prometheus.metrics.exporter.pushgateway; 002 003import io.prometheus.metrics.config.ExporterPushgatewayProperties; 004import io.prometheus.metrics.config.PrometheusProperties; 005import io.prometheus.metrics.config.PrometheusPropertiesException; 006import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter; 007import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; 008import io.prometheus.metrics.model.registry.Collector; 009import io.prometheus.metrics.model.registry.MultiCollector; 010import io.prometheus.metrics.model.registry.PrometheusRegistry; 011 012import java.io.*; 013import java.net.*; 014import java.nio.charset.StandardCharsets; 015import java.util.*; 016 017import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP; 018 019/** 020 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus Pushgateway</a> 021 * <p> 022 * The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to Prometheus. 023 * Since these kinds of jobs may not exist long enough to be scraped, they can instead push their metrics 024 * to a Pushgateway. This Java class allows pushing the contents of a {@link PrometheusRegistry} to a Pushgateway. 025 * <p> 026 * Example usage: 027 * <pre> 028 * {@code 029 * void executeBatchJob() throws Exception { 030 * PrometheusRegistry registry = new PrometheusRegistry(); 031 * Gauge duration = Gauge.builder() 032 * .name("my_batch_job_duration_seconds") 033 * .help("Duration of my batch job in seconds.") 034 * .register(registry); 035 * Timer durationTimer = duration.startTimer(); 036 * try { 037 * // Your code here. 038 * 039 * // This is only added to the registry after success, 040 * // so that a previous success in the Pushgateway isn't overwritten on failure. 041 * Gauge lastSuccess = Gauge.builder() 042 * .name("my_batch_job_last_success") 043 * .help("Last time my batch job succeeded, in unixtime.") 044 * .register(registry); 045 * lastSuccess.set(System.currentTimeMillis()); 046 * } finally { 047 * durationTimer.observeDuration(); 048 * PushGateway pg = PushGateway.builder() 049 * .address("127.0.0.1:9091") 050 * .job("my_batch_job") 051 * .registry(registry) 052 * .build(); 053 * pg.pushAdd(); 054 * } 055 * } 056 * } 057 * </pre> 058 * <p> 059 * See <a href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>. 060 */ 061public class PushGateway { 062 063 private static final int MILLISECONDS_PER_SECOND = 1000; 064 065 private final URL url; 066 private final Format format; 067 private final Map<String, String> requestHeaders; 068 private final PrometheusRegistry registry; 069 private final HttpConnectionFactory connectionFactory; 070 071 private PushGateway(PrometheusRegistry registry, Format format, URL url, HttpConnectionFactory connectionFactory, Map<String, String> requestHeaders) { 072 this.registry = registry; 073 this.format = format; 074 this.url = url; 075 this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders)); 076 this.connectionFactory = connectionFactory; 077 } 078 079 /** 080 * Push all metrics. All metrics with the same job and grouping key are replaced. 081 * <p> 082 * This uses the PUT HTTP method. 083 */ 084 public void push() throws IOException { 085 doRequest(registry, "PUT"); 086 } 087 088 /** 089 * Push a single metric. All metrics with the same job and grouping key are replaced. 090 * <p> 091 * This is useful for pushing a single Gauge. 092 * <p> 093 * This uses the PUT HTTP method. 094 */ 095 public void push(Collector collector) throws IOException { 096 PrometheusRegistry registry = new PrometheusRegistry(); 097 registry.register(collector); 098 doRequest(registry, "PUT"); 099 } 100 101 /** 102 * Push a single collector. All metrics with the same job and grouping key are replaced. 103 * <p> 104 * This uses the PUT HTTP method. 105 */ 106 public void push(MultiCollector collector) throws IOException { 107 PrometheusRegistry registry = new PrometheusRegistry(); 108 registry.register(collector); 109 doRequest(registry, "PUT"); 110 } 111 112 /** 113 * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are replaced. 114 * <p> 115 * This uses the POST HTTP method. 116 */ 117 public void pushAdd() throws IOException { 118 doRequest(registry, "POST"); 119 } 120 121 /** 122 * Like {@link #push(Collector)}, but only the specified metric will be replaced. 123 * <p> 124 * This uses the POST HTTP method. 125 */ 126 public void pushAdd(Collector collector) throws IOException { 127 PrometheusRegistry registry = new PrometheusRegistry(); 128 registry.register(collector); 129 doRequest(registry, "POST"); 130 } 131 132 /** 133 * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced. 134 * <p> 135 * This uses the POST HTTP method. 136 */ 137 public void pushAdd(MultiCollector collector) throws IOException { 138 PrometheusRegistry registry = new PrometheusRegistry(); 139 registry.register(collector); 140 doRequest(registry, "POST"); 141 } 142 143 /** 144 * Deletes metrics from the Pushgateway. 145 * <p> 146 * This uses the DELETE HTTP method. 147 */ 148 public void delete() throws IOException { 149 doRequest(null, "DELETE"); 150 } 151 152 private void doRequest(PrometheusRegistry registry, String method) throws IOException { 153 try { 154 HttpURLConnection connection = connectionFactory.create(url); 155 requestHeaders.forEach(connection::setRequestProperty); 156 if (format == Format.PROMETHEUS_TEXT) { 157 connection.setRequestProperty("Content-Type", PrometheusTextFormatWriter.CONTENT_TYPE); 158 } else { 159 connection.setRequestProperty("Content-Type", PrometheusProtobufWriter.CONTENT_TYPE); 160 } 161 if (!method.equals("DELETE")) { 162 connection.setDoOutput(true); 163 } 164 connection.setRequestMethod(method); 165 166 connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND); 167 connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND); 168 connection.connect(); 169 170 try { 171 if (!method.equals("DELETE")) { 172 OutputStream outputStream = connection.getOutputStream(); 173 if (format == Format.PROMETHEUS_TEXT) { 174 new PrometheusTextFormatWriter(false).write(outputStream, registry.scrape()); 175 } else { 176 new PrometheusProtobufWriter().write(outputStream, registry.scrape()); 177 } 178 outputStream.flush(); 179 outputStream.close(); 180 } 181 182 int response = connection.getResponseCode(); 183 if (response / 100 != 2) { 184 String errorMessage; 185 InputStream errorStream = connection.getErrorStream(); 186 if (errorStream != null) { 187 String errBody = readFromStream(errorStream); 188 errorMessage = "Response code from " + url + " was " + response + ", response body: " + errBody; 189 } else { 190 errorMessage = "Response code from " + url + " was " + response; 191 } 192 throw new IOException(errorMessage); 193 } 194 195 } finally { 196 connection.disconnect(); 197 } 198 } catch (IOException e) { 199 String baseUrl = url.getProtocol() + "://" + url.getHost(); 200 if (url.getPort() != -1) { 201 baseUrl += ":" + url.getPort(); 202 } 203 throw new IOException("Failed to push metrics to the Prometheus Pushgateway on " + baseUrl + ": " + e.getMessage(), e); 204 } 205 } 206 207 private static String readFromStream(InputStream is) throws IOException { 208 ByteArrayOutputStream result = new ByteArrayOutputStream(); 209 byte[] buffer = new byte[1024]; 210 int length; 211 while ((length = is.read(buffer)) != -1) { 212 result.write(buffer, 0, length); 213 } 214 return result.toString("UTF-8"); 215 } 216 217 public static Builder builder() { 218 return builder(PrometheusProperties.get()); 219 } 220 221 /** 222 * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}. 223 */ 224 public static Builder builder(PrometheusProperties config) { 225 return new Builder(config); 226 } 227 228 public static class Builder { 229 230 private final PrometheusProperties config; 231 private Format format; 232 private String address; 233 private Scheme scheme; 234 private String job; 235 private final Map<String, String> requestHeaders = new HashMap<>(); 236 private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry; 237 private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory(); 238 private Map<String, String> groupingKey = new TreeMap<>(); 239 240 private Builder(PrometheusProperties config) { 241 this.config = config; 242 } 243 244 /** 245 * Default is {@link Format#PROMETHEUS_PROTOBUF}. 246 */ 247 public Builder format(Format format) { 248 if (format == null) { 249 throw new NullPointerException(); 250 } 251 this.format = format; 252 return this; 253 } 254 255 /** 256 * Address of the Pushgateway in format {@code host:port}. 257 * Default is {@code localhost:9091}. 258 * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address} property. 259 */ 260 public Builder address(String address) { 261 if (address == null) { 262 throw new NullPointerException(); 263 } 264 this.address = address; 265 return this; 266 } 267 268 /** 269 * Username and password for HTTP basic auth when pushing to the Pushgateway. 270 */ 271 public Builder basicAuth(String user, String password) { 272 if (user == null || password == null) { 273 throw new NullPointerException(); 274 } 275 byte[] credentialsBytes = (user + ":" + password).getBytes(StandardCharsets.UTF_8); 276 String encoded = Base64.getEncoder().encodeToString(credentialsBytes); 277 requestHeaders.put("Authorization", String.format("Basic %s", encoded)); 278 return this; 279 } 280 281 /** 282 * Bearer token authorization when pushing to the Pushgateway. 283 */ 284 public Builder bearerToken(String token) { 285 if (token == null) { 286 throw new NullPointerException(); 287 } 288 requestHeaders.put("Authorization", String.format("Bearer %s", token)); 289 return this; 290 } 291 292 /** 293 * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. 294 * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property. 295 */ 296 public Builder scheme(Scheme scheme) { 297 if (scheme == null) { 298 throw new NullPointerException(); 299 } 300 this.scheme = scheme; 301 return this; 302 } 303 304 /** 305 * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}. 306 * <p> 307 * The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example of a custom 308 * connection factory that skips SSL certificate validation for HTTPS connections. 309 */ 310 public Builder connectionFactory(HttpConnectionFactory connectionFactory) { 311 if (connectionFactory == null) { 312 throw new NullPointerException(); 313 } 314 this.connectionFactory = connectionFactory; 315 return this; 316 } 317 318 /** 319 * The {@code job} label to be used when pushing metrics. 320 * If not provided, the name of the JAR file will be used by default. 321 * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.job} property. 322 */ 323 public Builder job(String job) { 324 if (job == null) { 325 throw new NullPointerException(); 326 } 327 this.job = job; 328 return this; 329 } 330 331 /** 332 * Grouping keys to be used when pushing/deleting metrics. 333 * Call this method multiple times for adding multiple grouping keys. 334 */ 335 public Builder groupingKey(String name, String value) { 336 if (name == null || value == null) { 337 throw new NullPointerException(); 338 } 339 groupingKey.put(name, value); 340 return this; 341 } 342 343 /** 344 * Convenience method for adding the current IP address as an "instance" label. 345 */ 346 public Builder instanceIpGroupingKey() throws UnknownHostException { 347 return groupingKey("instance", InetAddress.getLocalHost().getHostAddress()); 348 } 349 350 /** 351 * Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. 352 */ 353 public Builder registry(PrometheusRegistry registry) { 354 if (registry == null) { 355 throw new NullPointerException(); 356 } 357 this.registry = registry; 358 return this; 359 } 360 361 private Scheme getScheme(ExporterPushgatewayProperties properties) { 362 if (properties != null && properties.getScheme() != null) { 363 return Scheme.valueOf(properties.getScheme()); 364 } else if (this.scheme != null) { 365 return this.scheme; 366 } else { 367 return HTTP; 368 } 369 } 370 371 private String getAddress(ExporterPushgatewayProperties properties) { 372 if (properties != null && properties.getAddress() != null) { 373 return properties.getAddress(); 374 } else if (this.address != null) { 375 return this.address; 376 } else { 377 return "localhost:9091"; 378 } 379 } 380 381 private String getJob(ExporterPushgatewayProperties properties) { 382 if (properties != null && properties.getJob() != null) { 383 return properties.getJob(); 384 } else if (this.job != null) { 385 return this.job; 386 } else { 387 return DefaultJobLabelDetector.getDefaultJobLabel(); 388 } 389 } 390 391 private Format getFormat(ExporterPushgatewayProperties properties) { 392 // currently not configurable via properties 393 if (this.format != null) { 394 return this.format; 395 } 396 return Format.PROMETHEUS_PROTOBUF; 397 } 398 399 private URL makeUrl(ExporterPushgatewayProperties properties) throws UnsupportedEncodingException, MalformedURLException { 400 String url = getScheme(properties) + "://" + getAddress(properties) + "/metrics/"; 401 String job = getJob(properties); 402 if (job.contains("/")) { 403 url += "job@base64/" + base64url(job); 404 } else { 405 url += "job/" + URLEncoder.encode(job, "UTF-8"); 406 } 407 if (groupingKey != null) { 408 for (Map.Entry<String, String> entry : groupingKey.entrySet()) { 409 if (entry.getValue().isEmpty()) { 410 url += "/" + entry.getKey() + "@base64/="; 411 } else if (entry.getValue().contains("/")) { 412 url += "/" + entry.getKey() + "@base64/" + base64url(entry.getValue()); 413 } else { 414 url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8"); 415 } 416 } 417 } 418 return URI.create(url).normalize().toURL(); 419 } 420 421 private String base64url(String v) { 422 return Base64.getEncoder().encodeToString(v.getBytes(StandardCharsets.UTF_8)).replace("+", "-").replace("/", "_"); 423 } 424 425 public PushGateway build() { 426 ExporterPushgatewayProperties properties = config == null ? null : config.getExporterPushgatewayProperties(); 427 try { 428 return new PushGateway(registry, getFormat(properties), makeUrl(properties), connectionFactory, requestHeaders); 429 } catch (MalformedURLException e) { 430 throw new PrometheusPropertiesException(address + ": Invalid address. Expecting <host>:<port>"); 431 } catch (UnsupportedEncodingException e) { 432 throw new RuntimeException(e); // cannot happen, UTF-8 is always supported 433 } 434 } 435 } 436}