001package io.prometheus.metrics.exporter.pushgateway;
002
003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP;
004import static io.prometheus.metrics.model.snapshots.PrometheusNaming.escapeName;
005import static java.util.Objects.requireNonNull;
006
007import io.prometheus.metrics.config.EscapingScheme;
008import io.prometheus.metrics.config.ExporterPushgatewayProperties;
009import io.prometheus.metrics.config.PrometheusProperties;
010import io.prometheus.metrics.config.PrometheusPropertiesException;
011import io.prometheus.metrics.expositionformats.ExpositionFormatWriter;
012import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter;
013import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter;
014import io.prometheus.metrics.model.registry.Collector;
015import io.prometheus.metrics.model.registry.MultiCollector;
016import io.prometheus.metrics.model.registry.PrometheusRegistry;
017import java.io.ByteArrayOutputStream;
018import java.io.IOException;
019import java.io.InputStream;
020import java.io.OutputStream;
021import java.io.UnsupportedEncodingException;
022import java.net.HttpURLConnection;
023import java.net.InetAddress;
024import java.net.MalformedURLException;
025import java.net.URI;
026import java.net.URL;
027import java.net.URLEncoder;
028import java.net.UnknownHostException;
029import java.nio.charset.StandardCharsets;
030import java.time.Duration;
031import java.util.Base64;
032import java.util.Collections;
033import java.util.HashMap;
034import java.util.Map;
035import java.util.TreeMap;
036import javax.annotation.Nullable;
037
038/**
039 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus
040 * Pushgateway</a>
041 *
042 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to
043 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead
044 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link
045 * PrometheusRegistry} to a Pushgateway.
046 *
047 * <p>Example usage:
048 *
049 * <pre>{@code
050 * void executeBatchJob() throws Exception {
051 *     PrometheusRegistry registry = new PrometheusRegistry();
052 *     Gauge duration = Gauge.builder()
053 *             .name("my_batch_job_duration_seconds")
054 *             .help("Duration of my batch job in seconds.")
055 *             .register(registry);
056 *     Timer durationTimer = duration.startTimer();
057 *     try {
058 *         // Your code here.
059 *
060 *         // This is only added to the registry after success,
061 *         // so that a previous success in the Pushgateway isn't overwritten on failure.
062 *         Gauge lastSuccess = Gauge.builder()
063 *                 .name("my_batch_job_last_success")
064 *                 .help("Last time my batch job succeeded, in unixtime.")
065 *                 .register(registry);
066 *         lastSuccess.set(System.currentTimeMillis());
067 *     } finally {
068 *         durationTimer.observeDuration();
069 *         PushGateway pg = PushGateway.builder()
070 *                 .address("127.0.0.1:9091")
071 *                 .job("my_batch_job")
072 *                 .registry(registry)
073 *                 .build();
074 *         pg.pushAdd();
075 *     }
076 * }
077 * }</pre>
078 *
079 * <p>See <a
080 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>.
081 */
082public class PushGateway {
083  private final URL url;
084  private final ExpositionFormatWriter writer;
085  private final boolean prometheusTimestampsInMs;
086  private final Map<String, String> requestHeaders;
087  private final PrometheusRegistry registry;
088  private final HttpConnectionFactory connectionFactory;
089  private final EscapingScheme escapingScheme;
090  private final Duration connectionTimeout;
091  private final Duration readTimeout;
092
093  private PushGateway(
094      PrometheusRegistry registry,
095      Format format,
096      URL url,
097      HttpConnectionFactory connectionFactory,
098      Map<String, String> requestHeaders,
099      boolean prometheusTimestampsInMs,
100      EscapingScheme escapingScheme,
101      Duration connectionTimeout,
102      Duration readTimeout) {
103    this.registry = registry;
104    this.url = url;
105    this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders));
106    this.connectionFactory = connectionFactory;
107    this.prometheusTimestampsInMs = prometheusTimestampsInMs;
108    this.escapingScheme = escapingScheme;
109    this.connectionTimeout = connectionTimeout;
110    this.readTimeout = readTimeout;
111    writer = getWriter(format);
112    if (!writer.isAvailable()) {
113      throw new RuntimeException(writer.getClass() + " is not available");
114    }
115  }
116
117  @SuppressWarnings("deprecation")
118  private ExpositionFormatWriter getWriter(Format format) {
119    if (format == Format.PROMETHEUS_TEXT) {
120      return PrometheusTextFormatWriter.builder()
121          .setTimestampsInMs(this.prometheusTimestampsInMs)
122          .build();
123    } else {
124      // use reflection to avoid a compile-time dependency on the expositionformats module
125      return new PrometheusProtobufWriter();
126    }
127  }
128
129  /**
130   * Push all metrics. All metrics with the same job and grouping key are replaced.
131   *
132   * <p>This uses the PUT HTTP method.
133   */
134  public void push() throws IOException {
135    doRequest(registry, "PUT");
136  }
137
138  /**
139   * Push a single metric. All metrics with the same job and grouping key are replaced.
140   *
141   * <p>This is useful for pushing a single Gauge.
142   *
143   * <p>This uses the PUT HTTP method.
144   */
145  public void push(Collector collector) throws IOException {
146    PrometheusRegistry registry = new PrometheusRegistry();
147    registry.register(collector);
148    doRequest(registry, "PUT");
149  }
150
151  /**
152   * Push a single collector. All metrics with the same job and grouping key are replaced.
153   *
154   * <p>This uses the PUT HTTP method.
155   */
156  public void push(MultiCollector collector) throws IOException {
157    PrometheusRegistry registry = new PrometheusRegistry();
158    registry.register(collector);
159    doRequest(registry, "PUT");
160  }
161
162  /**
163   * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are
164   * replaced.
165   *
166   * <p>This uses the POST HTTP method.
167   */
168  public void pushAdd() throws IOException {
169    doRequest(registry, "POST");
170  }
171
172  /**
173   * Like {@link #push(Collector)}, but only the specified metric will be replaced.
174   *
175   * <p>This uses the POST HTTP method.
176   */
177  public void pushAdd(Collector collector) throws IOException {
178    PrometheusRegistry registry = new PrometheusRegistry();
179    registry.register(collector);
180    doRequest(registry, "POST");
181  }
182
183  /**
184   * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced.
185   *
186   * <p>This uses the POST HTTP method.
187   */
188  public void pushAdd(MultiCollector collector) throws IOException {
189    PrometheusRegistry registry = new PrometheusRegistry();
190    registry.register(collector);
191    doRequest(registry, "POST");
192  }
193
194  /**
195   * Deletes metrics from the Pushgateway.
196   *
197   * <p>This uses the DELETE HTTP method.
198   */
199  public void delete() throws IOException {
200    doRequest(null, "DELETE");
201  }
202
203  private void doRequest(@Nullable PrometheusRegistry registry, String method) throws IOException {
204    try {
205      HttpURLConnection connection = connectionFactory.create(url);
206      requestHeaders.forEach(connection::setRequestProperty);
207      connection.setRequestProperty("Content-Type", writer.getContentType());
208      if (!method.equals("DELETE")) {
209        connection.setDoOutput(true);
210      }
211      connection.setRequestMethod(method);
212
213      connection.setConnectTimeout((int) this.connectionTimeout.toMillis());
214      connection.setReadTimeout((int) this.readTimeout.toMillis());
215      connection.connect();
216
217      try {
218        if (!method.equals("DELETE")) {
219          OutputStream outputStream = connection.getOutputStream();
220          writer.write(outputStream, requireNonNull(registry).scrape(), this.escapingScheme);
221          outputStream.flush();
222          outputStream.close();
223        }
224
225        int response = connection.getResponseCode();
226        if (response / 100 != 2) {
227          String errorMessage;
228          InputStream errorStream = connection.getErrorStream();
229          if (errorStream != null) {
230            String errBody = readFromStream(errorStream);
231            errorMessage =
232                "Response code from " + url + " was " + response + ", response body: " + errBody;
233          } else {
234            errorMessage = "Response code from " + url + " was " + response;
235          }
236          throw new IOException(errorMessage);
237        }
238
239      } finally {
240        connection.disconnect();
241      }
242    } catch (IOException e) {
243      String baseUrl = url.getProtocol() + "://" + url.getHost();
244      if (url.getPort() != -1) {
245        baseUrl += ":" + url.getPort();
246      }
247      throw new IOException(
248          "Failed to push metrics to the Prometheus Pushgateway on "
249              + baseUrl
250              + ": "
251              + e.getMessage(),
252          e);
253    }
254  }
255
256  private static String readFromStream(InputStream is) throws IOException {
257    ByteArrayOutputStream result = new ByteArrayOutputStream();
258    byte[] buffer = new byte[1024];
259    int length;
260    while ((length = is.read(buffer)) != -1) {
261      result.write(buffer, 0, length);
262    }
263    return result.toString("UTF-8");
264  }
265
266  public static Builder builder() {
267    return builder(PrometheusProperties.get());
268  }
269
270  /**
271   * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}.
272   */
273  public static Builder builder(PrometheusProperties config) {
274    return new Builder(config);
275  }
276
277  public static class Builder {
278
279    private final PrometheusProperties config;
280    @Nullable private Format format;
281    @Nullable private String address;
282    @Nullable private Scheme scheme;
283    @Nullable private String job;
284    @Nullable private Duration connectionTimeout;
285    @Nullable private Duration readTimeout;
286    private boolean prometheusTimestampsInMs;
287    private final Map<String, String> requestHeaders = new HashMap<>();
288    private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry;
289    private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory();
290    private final Map<String, String> groupingKey = new TreeMap<>();
291    @Nullable private EscapingScheme escapingScheme;
292
293    private Builder(PrometheusProperties config) {
294      this.config = config;
295    }
296
297    /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */
298    public Builder format(Format format) {
299      this.format = requireNonNull(format, "format must not be null");
300      return this;
301    }
302
303    /**
304     * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}.
305     * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address}
306     * property.
307     */
308    public Builder address(String address) {
309      this.address = requireNonNull(address, "address must not be null");
310      return this;
311    }
312
313    /** Username and password for HTTP basic auth when pushing to the Pushgateway. */
314    public Builder basicAuth(String user, String password) {
315      byte[] credentialsBytes =
316          (requireNonNull(user, "user must not be null")
317                  + ":"
318                  + requireNonNull(password, "password must not be null"))
319              .getBytes(StandardCharsets.UTF_8);
320      String encoded = Base64.getEncoder().encodeToString(credentialsBytes);
321      requestHeaders.put("Authorization", String.format("Basic %s", encoded));
322      return this;
323    }
324
325    /** Bearer token authorization when pushing to the Pushgateway. */
326    public Builder bearerToken(String token) {
327      requestHeaders.put(
328          "Authorization",
329          String.format("Bearer %s", requireNonNull(token, "token must not be null")));
330      return this;
331    }
332
333    /**
334     * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten
335     * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property.
336     */
337    public Builder scheme(Scheme scheme) {
338      this.scheme = requireNonNull(scheme, "scheme must not be null");
339      return this;
340    }
341
342    /**
343     * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}.
344     *
345     * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example
346     * of a custom connection factory that skips SSL certificate validation for HTTPS connections.
347     */
348    public Builder connectionFactory(HttpConnectionFactory connectionFactory) {
349      this.connectionFactory =
350          requireNonNull(connectionFactory, "connectionFactory must not be null");
351      return this;
352    }
353
354    /**
355     * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR
356     * file will be used by default. Can be overwritten at runtime with the {@code
357     * io.prometheus.exporter.pushgateway.job} property.
358     */
359    public Builder job(String job) {
360      this.job = requireNonNull(job, "job must not be null");
361      return this;
362    }
363
364    /**
365     * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for
366     * adding multiple grouping keys.
367     */
368    public Builder groupingKey(String name, String value) {
369      groupingKey.put(
370          requireNonNull(name, "name must not be null"),
371          requireNonNull(value, "value must not be null"));
372      return this;
373    }
374
375    /** Convenience method for adding the current IP address as an "instance" label. */
376    public Builder instanceIpGroupingKey() throws UnknownHostException {
377      return groupingKey("instance", InetAddress.getLocalHost().getHostAddress());
378    }
379
380    /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */
381    public Builder registry(PrometheusRegistry registry) {
382      this.registry = requireNonNull(registry, "registry must not be null");
383      return this;
384    }
385
386    /**
387     * Specify the escaping scheme to be used when pushing metrics. Default is {@link
388     * EscapingScheme#UNDERSCORE_ESCAPING}.
389     */
390    public Builder escapingScheme(EscapingScheme escapingScheme) {
391      this.escapingScheme = requireNonNull(escapingScheme, "escapingScheme must not be null");
392      return this;
393    }
394
395    /**
396     * Use milliseconds for timestamps in text format? Default is {@code false}. Can be overwritten
397     * at runtime with the {@code io.prometheus.exporter.timestampsInMs} property.
398     */
399    public Builder prometheusTimestampsInMs(boolean prometheusTimestampsInMs) {
400      this.prometheusTimestampsInMs = prometheusTimestampsInMs;
401      return this;
402    }
403
404    /**
405     * Specify the connection timeout for HTTP connections to the PushGateway. Default is 10
406     * seconds.
407     *
408     * @param connectionTimeout timeout value
409     * @return this {@link Builder} instance
410     */
411    public Builder connectionTimeout(Duration connectionTimeout) {
412      this.connectionTimeout = connectionTimeout;
413      return this;
414    }
415
416    private Duration getConnectionTimeout(@Nullable ExporterPushgatewayProperties properties) {
417      if (properties != null && properties.getConnectTimeout() != null) {
418        return properties.getConnectTimeout();
419      } else if (this.connectionTimeout != null) {
420        return this.connectionTimeout;
421      } else {
422        return Duration.ofSeconds(10);
423      }
424    }
425
426    /**
427     * Specify the read timeout for HTTP connections to the PushGateway. Default is 10 seconds.
428     *
429     * @param readTimeout timeout value
430     * @return this {@link Builder} instance
431     */
432    public Builder readTimeout(Duration readTimeout) {
433      this.readTimeout = readTimeout;
434      return this;
435    }
436
437    private Duration getReadTimeout(@Nullable ExporterPushgatewayProperties properties) {
438      if (properties != null && properties.getReadTimeout() != null) {
439        return properties.getReadTimeout();
440      } else if (this.readTimeout != null) {
441        return this.readTimeout;
442      } else {
443        return Duration.ofSeconds(10);
444      }
445    }
446
447    private boolean getPrometheusTimestampsInMs() {
448      // accept either to opt in to timestamps in milliseconds
449      return config.getExporterProperties().getPrometheusTimestampsInMs()
450          || this.prometheusTimestampsInMs;
451    }
452
453    private Scheme getScheme(@Nullable ExporterPushgatewayProperties properties) {
454      if (properties != null && properties.getScheme() != null) {
455        return Scheme.valueOf(properties.getScheme());
456      } else if (this.scheme != null) {
457        return this.scheme;
458      } else {
459        return HTTP;
460      }
461    }
462
463    private String getAddress(@Nullable ExporterPushgatewayProperties properties) {
464      if (properties != null && properties.getAddress() != null) {
465        return properties.getAddress();
466      } else if (this.address != null) {
467        return this.address;
468      } else {
469        return "localhost:9091";
470      }
471    }
472
473    private String getJob(@Nullable ExporterPushgatewayProperties properties) {
474      if (properties != null && properties.getJob() != null) {
475        return properties.getJob();
476      } else if (this.job != null) {
477        return this.job;
478      } else {
479        return DefaultJobLabelDetector.getDefaultJobLabel();
480      }
481    }
482
483    private EscapingScheme getEscapingScheme(@Nullable ExporterPushgatewayProperties properties) {
484      if (properties != null && properties.getEscapingScheme() != null) {
485        return properties.getEscapingScheme();
486      } else if (this.escapingScheme != null) {
487        return this.escapingScheme;
488      }
489      return EscapingScheme.UNDERSCORE_ESCAPING;
490    }
491
492    private Format getFormat() {
493      // currently not configurable via properties
494      if (this.format != null) {
495        return this.format;
496      }
497      return Format.PROMETHEUS_PROTOBUF;
498    }
499
500    private URL makeUrl(@Nullable ExporterPushgatewayProperties properties)
501        throws UnsupportedEncodingException, MalformedURLException {
502      StringBuilder url =
503          new StringBuilder(getScheme(properties) + "://" + getAddress(properties) + "/metrics/");
504      String job = getJob(properties);
505      if (job.contains("/")) {
506        url.append("job@base64/").append(base64url(job));
507      } else {
508        url.append("job/").append(URLEncoder.encode(job, "UTF-8"));
509      }
510      for (Map.Entry<String, String> entry : groupingKey.entrySet()) {
511        if (entry.getValue().isEmpty()) {
512          url.append("/")
513              .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING))
514              .append("@base64/=");
515        } else if (entry.getValue().contains("/")) {
516          url.append("/")
517              .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING))
518              .append("@base64/")
519              .append(base64url(entry.getValue()));
520        } else {
521          url.append("/")
522              .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING))
523              .append("/")
524              .append(URLEncoder.encode(entry.getValue(), "UTF-8"));
525        }
526      }
527      return URI.create(url.toString()).normalize().toURL();
528    }
529
530    private String base64url(String v) {
531      return Base64.getEncoder()
532          .encodeToString(v.getBytes(StandardCharsets.UTF_8))
533          .replace("+", "-")
534          .replace("/", "_");
535    }
536
537    public PushGateway build() {
538      ExporterPushgatewayProperties properties =
539          config == null ? null : config.getExporterPushgatewayProperties();
540      try {
541        return new PushGateway(
542            registry,
543            getFormat(),
544            makeUrl(properties),
545            connectionFactory,
546            requestHeaders,
547            getPrometheusTimestampsInMs(),
548            getEscapingScheme(properties),
549            getConnectionTimeout(properties),
550            getReadTimeout(properties));
551      } catch (MalformedURLException e) {
552        throw new PrometheusPropertiesException(
553            address + ": Invalid address. Expecting <host>:<port>");
554      } catch (UnsupportedEncodingException e) {
555        throw new RuntimeException(e); // cannot happen, UTF-8 is always supported
556      }
557    }
558  }
559}