001package io.prometheus.metrics.exporter.pushgateway;
002
003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP;
004import static io.prometheus.metrics.model.snapshots.PrometheusNaming.escapeName;
005import static java.util.Objects.requireNonNull;
006
007import io.prometheus.metrics.config.EscapingScheme;
008import io.prometheus.metrics.config.ExporterPushgatewayProperties;
009import io.prometheus.metrics.config.PrometheusProperties;
010import io.prometheus.metrics.config.PrometheusPropertiesException;
011import io.prometheus.metrics.expositionformats.ExpositionFormatWriter;
012import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter;
013import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter;
014import io.prometheus.metrics.model.registry.Collector;
015import io.prometheus.metrics.model.registry.MultiCollector;
016import io.prometheus.metrics.model.registry.PrometheusRegistry;
017import java.io.ByteArrayOutputStream;
018import java.io.IOException;
019import java.io.InputStream;
020import java.io.OutputStream;
021import java.io.UnsupportedEncodingException;
022import java.net.HttpURLConnection;
023import java.net.InetAddress;
024import java.net.MalformedURLException;
025import java.net.URI;
026import java.net.URL;
027import java.net.URLEncoder;
028import java.net.UnknownHostException;
029import java.nio.charset.StandardCharsets;
030import java.time.Duration;
031import java.util.Base64;
032import java.util.Collections;
033import java.util.HashMap;
034import java.util.Map;
035import java.util.TreeMap;
036import javax.annotation.Nullable;
037
038/**
039 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus
040 * Pushgateway</a>
041 *
042 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to
043 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead
044 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link
045 * PrometheusRegistry} to a Pushgateway.
046 *
047 * <p>Example usage:
048 *
049 * <pre>{@code
050 * void executeBatchJob() throws Exception {
051 *     PrometheusRegistry registry = new PrometheusRegistry();
052 *     Gauge duration = Gauge.builder()
053 *             .name("my_batch_job_duration_seconds")
054 *             .help("Duration of my batch job in seconds.")
055 *             .register(registry);
056 *     Timer durationTimer = duration.startTimer();
057 *     try {
058 *         // Your code here.
059 *
060 *         // This is only added to the registry after success,
061 *         // so that a previous success in the Pushgateway isn't overwritten on failure.
062 *         Gauge lastSuccess = Gauge.builder()
063 *                 .name("my_batch_job_last_success")
064 *                 .help("Last time my batch job succeeded, in unixtime.")
065 *                 .register(registry);
066 *         lastSuccess.set(System.currentTimeMillis());
067 *     } finally {
068 *         durationTimer.observeDuration();
069 *         PushGateway pg = PushGateway.builder()
070 *                 .address("127.0.0.1:9091")
071 *                 .job("my_batch_job")
072 *                 .registry(registry)
073 *                 .build();
074 *         pg.pushAdd();
075 *     }
076 * }
077 * }</pre>
078 *
079 * <p>See <a
080 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>.
081 */
082public class PushGateway {
083  private final URL url;
084  private final ExpositionFormatWriter writer;
085  private final boolean prometheusTimestampsInMs;
086  private final Map<String, String> requestHeaders;
087  private final PrometheusRegistry registry;
088  private final HttpConnectionFactory connectionFactory;
089  private final EscapingScheme escapingScheme;
090  private final Duration connectionTimeout;
091  private final Duration readTimeout;
092
093  private PushGateway(
094      PrometheusRegistry registry,
095      Format format,
096      URL url,
097      HttpConnectionFactory connectionFactory,
098      Map<String, String> requestHeaders,
099      boolean prometheusTimestampsInMs,
100      EscapingScheme escapingScheme,
101      Duration connectionTimeout,
102      Duration readTimeout) {
103    this.registry = registry;
104    this.url = url;
105    this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders));
106    this.connectionFactory = connectionFactory;
107    this.prometheusTimestampsInMs = prometheusTimestampsInMs;
108    this.escapingScheme = escapingScheme;
109    this.connectionTimeout = connectionTimeout;
110    this.readTimeout = readTimeout;
111    writer = getWriter(format);
112    if (!writer.isAvailable()) {
113      throw new RuntimeException(writer.getClass() + " is not available");
114    }
115  }
116
117  @SuppressWarnings("deprecation")
118  private ExpositionFormatWriter getWriter(Format format) {
119    if (format == Format.PROMETHEUS_TEXT) {
120      return PrometheusTextFormatWriter.builder()
121          .setTimestampsInMs(this.prometheusTimestampsInMs)
122          .build();
123    } else {
124      // use reflection to avoid a compile-time dependency on the expositionformats module
125      return new PrometheusProtobufWriter();
126    }
127  }
128
129  /**
130   * Push all metrics. All metrics with the same job and grouping key are replaced.
131   *
132   * <p>This uses the PUT HTTP method.
133   */
134  public void push() throws IOException {
135    doRequest(registry, "PUT");
136  }
137
138  /**
139   * Push a single metric. All metrics with the same job and grouping key are replaced.
140   *
141   * <p>This is useful for pushing a single Gauge.
142   *
143   * <p>This uses the PUT HTTP method.
144   */
145  public void push(Collector collector) throws IOException {
146    PrometheusRegistry registry = new PrometheusRegistry();
147    registry.register(collector);
148    doRequest(registry, "PUT");
149  }
150
151  /**
152   * Push a single collector. All metrics with the same job and grouping key are replaced.
153   *
154   * <p>This uses the PUT HTTP method.
155   */
156  public void push(MultiCollector collector) throws IOException {
157    PrometheusRegistry registry = new PrometheusRegistry();
158    registry.register(collector);
159    doRequest(registry, "PUT");
160  }
161
162  /**
163   * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are
164   * replaced.
165   *
166   * <p>This uses the POST HTTP method.
167   */
168  public void pushAdd() throws IOException {
169    doRequest(registry, "POST");
170  }
171
172  /**
173   * Like {@link #push(Collector)}, but only the specified metric will be replaced.
174   *
175   * <p>This uses the POST HTTP method.
176   */
177  public void pushAdd(Collector collector) throws IOException {
178    PrometheusRegistry registry = new PrometheusRegistry();
179    registry.register(collector);
180    doRequest(registry, "POST");
181  }
182
183  /**
184   * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced.
185   *
186   * <p>This uses the POST HTTP method.
187   */
188  public void pushAdd(MultiCollector collector) throws IOException {
189    PrometheusRegistry registry = new PrometheusRegistry();
190    registry.register(collector);
191    doRequest(registry, "POST");
192  }
193
194  /**
195   * Deletes metrics from the Pushgateway.
196   *
197   * <p>This uses the DELETE HTTP method.
198   */
199  public void delete() throws IOException {
200    doRequest(null, "DELETE");
201  }
202
203  private void doRequest(@Nullable PrometheusRegistry registry, String method) throws IOException {
204    try {
205      HttpURLConnection connection = connectionFactory.create(url);
206      requestHeaders.forEach(connection::setRequestProperty);
207      connection.setRequestProperty("Content-Type", writer.getContentType());
208      if (!method.equals("DELETE")) {
209        connection.setDoOutput(true);
210      }
211      connection.setRequestMethod(method);
212
213      connection.setConnectTimeout((int) this.connectionTimeout.toMillis());
214      connection.setReadTimeout((int) this.readTimeout.toMillis());
215      connection.connect();
216
217      try {
218        if (!method.equals("DELETE")) {
219          OutputStream outputStream = connection.getOutputStream();
220          writer.write(outputStream, requireNonNull(registry).scrape(), this.escapingScheme);
221          outputStream.flush();
222          outputStream.close();
223        }
224
225        int response = connection.getResponseCode();
226        if (response / 100 != 2) {
227          String errorMessage;
228          InputStream errorStream = connection.getErrorStream();
229          if (errorStream != null) {
230            String errBody = readFromStream(errorStream);
231            errorMessage =
232                "Response code from " + url + " was " + response + ", response body: " + errBody;
233          } else {
234            errorMessage = "Response code from " + url + " was " + response;
235          }
236          throw new IOException(errorMessage);
237        }
238
239      } finally {
240        connection.disconnect();
241      }
242    } catch (IOException e) {
243      String baseUrl = url.getProtocol() + "://" + url.getHost();
244      if (url.getPort() != -1) {
245        baseUrl += ":" + url.getPort();
246      }
247      throw new IOException(
248          "Failed to push metrics to the Prometheus Pushgateway on "
249              + baseUrl
250              + ": "
251              + e.getMessage(),
252          e);
253    }
254  }
255
256  // toString with Charset is only available in Java 10+, but we want to support Java 8
257  @SuppressWarnings("JdkObsolete")
258  private static String readFromStream(InputStream is) throws IOException {
259    ByteArrayOutputStream result = new ByteArrayOutputStream();
260    byte[] buffer = new byte[1024];
261    int length;
262    while ((length = is.read(buffer)) != -1) {
263      result.write(buffer, 0, length);
264    }
265    return result.toString("UTF-8");
266  }
267
268  public static Builder builder() {
269    return builder(PrometheusProperties.get());
270  }
271
272  /**
273   * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}.
274   */
275  public static Builder builder(PrometheusProperties config) {
276    return new Builder(config);
277  }
278
279  public static class Builder {
280
281    private final PrometheusProperties config;
282    @Nullable private Format format;
283    @Nullable private String address;
284    @Nullable private Scheme scheme;
285    @Nullable private String job;
286    @Nullable private Duration connectionTimeout;
287    @Nullable private Duration readTimeout;
288    private boolean prometheusTimestampsInMs;
289    private final Map<String, String> requestHeaders = new HashMap<>();
290    private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry;
291    private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory();
292    private final Map<String, String> groupingKey = new TreeMap<>();
293    @Nullable private EscapingScheme escapingScheme;
294
295    private Builder(PrometheusProperties config) {
296      this.config = config;
297    }
298
299    /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */
300    public Builder format(Format format) {
301      this.format = requireNonNull(format, "format must not be null");
302      return this;
303    }
304
305    /**
306     * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}.
307     * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address}
308     * property.
309     */
310    public Builder address(String address) {
311      this.address = requireNonNull(address, "address must not be null");
312      return this;
313    }
314
315    /** Username and password for HTTP basic auth when pushing to the Pushgateway. */
316    public Builder basicAuth(String user, String password) {
317      byte[] credentialsBytes =
318          (requireNonNull(user, "user must not be null")
319                  + ":"
320                  + requireNonNull(password, "password must not be null"))
321              .getBytes(StandardCharsets.UTF_8);
322      String encoded = Base64.getEncoder().encodeToString(credentialsBytes);
323      requestHeaders.put("Authorization", String.format("Basic %s", encoded));
324      return this;
325    }
326
327    /** Bearer token authorization when pushing to the Pushgateway. */
328    public Builder bearerToken(String token) {
329      requestHeaders.put(
330          "Authorization",
331          String.format("Bearer %s", requireNonNull(token, "token must not be null")));
332      return this;
333    }
334
335    /**
336     * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten
337     * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property.
338     */
339    public Builder scheme(Scheme scheme) {
340      this.scheme = requireNonNull(scheme, "scheme must not be null");
341      return this;
342    }
343
344    /**
345     * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}.
346     *
347     * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example
348     * of a custom connection factory that skips SSL certificate validation for HTTPS connections.
349     */
350    public Builder connectionFactory(HttpConnectionFactory connectionFactory) {
351      this.connectionFactory =
352          requireNonNull(connectionFactory, "connectionFactory must not be null");
353      return this;
354    }
355
356    /**
357     * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR
358     * file will be used by default. Can be overwritten at runtime with the {@code
359     * io.prometheus.exporter.pushgateway.job} property.
360     */
361    public Builder job(String job) {
362      this.job = requireNonNull(job, "job must not be null");
363      return this;
364    }
365
366    /**
367     * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for
368     * adding multiple grouping keys.
369     */
370    public Builder groupingKey(String name, String value) {
371      groupingKey.put(
372          requireNonNull(name, "name must not be null"),
373          requireNonNull(value, "value must not be null"));
374      return this;
375    }
376
377    /** Convenience method for adding the current IP address as an "instance" label. */
378    public Builder instanceIpGroupingKey() throws UnknownHostException {
379      return groupingKey("instance", InetAddress.getLocalHost().getHostAddress());
380    }
381
382    /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */
383    public Builder registry(PrometheusRegistry registry) {
384      this.registry = requireNonNull(registry, "registry must not be null");
385      return this;
386    }
387
388    /**
389     * Specify the escaping scheme to be used when pushing metrics. Default is {@link
390     * EscapingScheme#UNDERSCORE_ESCAPING}.
391     */
392    public Builder escapingScheme(EscapingScheme escapingScheme) {
393      this.escapingScheme = requireNonNull(escapingScheme, "escapingScheme must not be null");
394      return this;
395    }
396
397    /**
398     * Use milliseconds for timestamps in text format? Default is {@code false}. Can be overwritten
399     * at runtime with the {@code io.prometheus.exporter.timestamps_in_ms} property.
400     */
401    public Builder prometheusTimestampsInMs(boolean prometheusTimestampsInMs) {
402      this.prometheusTimestampsInMs = prometheusTimestampsInMs;
403      return this;
404    }
405
406    /**
407     * Specify the connection timeout for HTTP connections to the PushGateway. Default is 10
408     * seconds.
409     *
410     * @param connectionTimeout timeout value
411     * @return this {@link Builder} instance
412     */
413    public Builder connectionTimeout(Duration connectionTimeout) {
414      this.connectionTimeout = connectionTimeout;
415      return this;
416    }
417
418    private Duration getConnectionTimeout(@Nullable ExporterPushgatewayProperties properties) {
419      if (properties != null && properties.getConnectTimeout() != null) {
420        return properties.getConnectTimeout();
421      } else if (this.connectionTimeout != null) {
422        return this.connectionTimeout;
423      } else {
424        return Duration.ofSeconds(10);
425      }
426    }
427
428    /**
429     * Specify the read timeout for HTTP connections to the PushGateway. Default is 10 seconds.
430     *
431     * @param readTimeout timeout value
432     * @return this {@link Builder} instance
433     */
434    public Builder readTimeout(Duration readTimeout) {
435      this.readTimeout = readTimeout;
436      return this;
437    }
438
439    private Duration getReadTimeout(@Nullable ExporterPushgatewayProperties properties) {
440      if (properties != null && properties.getReadTimeout() != null) {
441        return properties.getReadTimeout();
442      } else if (this.readTimeout != null) {
443        return this.readTimeout;
444      } else {
445        return Duration.ofSeconds(10);
446      }
447    }
448
449    private boolean getPrometheusTimestampsInMs() {
450      // accept either to opt in to timestamps in milliseconds
451      return config.getExporterProperties().getPrometheusTimestampsInMs()
452          || this.prometheusTimestampsInMs;
453    }
454
455    private Scheme getScheme(@Nullable ExporterPushgatewayProperties properties) {
456      if (properties != null && properties.getScheme() != null) {
457        return Scheme.valueOf(properties.getScheme());
458      } else if (this.scheme != null) {
459        return this.scheme;
460      } else {
461        return HTTP;
462      }
463    }
464
465    private String getAddress(@Nullable ExporterPushgatewayProperties properties) {
466      if (properties != null && properties.getAddress() != null) {
467        return properties.getAddress();
468      } else if (this.address != null) {
469        return this.address;
470      } else {
471        return "localhost:9091";
472      }
473    }
474
475    private String getJob(@Nullable ExporterPushgatewayProperties properties) {
476      if (properties != null && properties.getJob() != null) {
477        return properties.getJob();
478      } else if (this.job != null) {
479        return this.job;
480      } else {
481        return DefaultJobLabelDetector.getDefaultJobLabel();
482      }
483    }
484
485    private EscapingScheme getEscapingScheme(@Nullable ExporterPushgatewayProperties properties) {
486      if (properties != null && properties.getEscapingScheme() != null) {
487        return properties.getEscapingScheme();
488      } else if (this.escapingScheme != null) {
489        return this.escapingScheme;
490      }
491      return EscapingScheme.UNDERSCORE_ESCAPING;
492    }
493
494    private Format getFormat() {
495      // currently not configurable via properties
496      if (this.format != null) {
497        return this.format;
498      }
499      return Format.PROMETHEUS_PROTOBUF;
500    }
501
502    // encode with Charset is only available in Java 10+, but we want to support Java 8
503    @SuppressWarnings("JdkObsolete")
504    private URL makeUrl(@Nullable ExporterPushgatewayProperties properties)
505        throws UnsupportedEncodingException, MalformedURLException {
506      StringBuilder url =
507          new StringBuilder(getScheme(properties) + "://" + getAddress(properties) + "/metrics/");
508      String job = getJob(properties);
509      if (job.contains("/")) {
510        url.append("job@base64/").append(base64url(job));
511      } else {
512        url.append("job/").append(URLEncoder.encode(job, "UTF-8"));
513      }
514      for (Map.Entry<String, String> entry : groupingKey.entrySet()) {
515        if (entry.getValue().isEmpty()) {
516          url.append("/")
517              .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING))
518              .append("@base64/=");
519        } else if (entry.getValue().contains("/")) {
520          url.append("/")
521              .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING))
522              .append("@base64/")
523              .append(base64url(entry.getValue()));
524        } else {
525          url.append("/")
526              .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING))
527              .append("/")
528              .append(URLEncoder.encode(entry.getValue(), "UTF-8"));
529        }
530      }
531      return URI.create(url.toString()).normalize().toURL();
532    }
533
534    private String base64url(String v) {
535      return Base64.getEncoder()
536          .encodeToString(v.getBytes(StandardCharsets.UTF_8))
537          .replace("+", "-")
538          .replace("/", "_");
539    }
540
541    public PushGateway build() {
542      ExporterPushgatewayProperties properties =
543          config == null ? null : config.getExporterPushgatewayProperties();
544      try {
545        return new PushGateway(
546            registry,
547            getFormat(),
548            makeUrl(properties),
549            connectionFactory,
550            requestHeaders,
551            getPrometheusTimestampsInMs(),
552            getEscapingScheme(properties),
553            getConnectionTimeout(properties),
554            getReadTimeout(properties));
555      } catch (MalformedURLException e) {
556        throw new PrometheusPropertiesException(
557            address + ": Invalid address. Expecting <host>:<port>");
558      } catch (UnsupportedEncodingException e) {
559        throw new RuntimeException(e); // cannot happen, UTF-8 is always supported
560      }
561    }
562  }
563}