001package io.prometheus.metrics.exporter.pushgateway;
002
003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP;
004import static io.prometheus.metrics.model.snapshots.PrometheusNaming.escapeName;
005import static java.util.Objects.requireNonNull;
006
007import io.prometheus.metrics.annotations.StableApi;
008import io.prometheus.metrics.config.EscapingScheme;
009import io.prometheus.metrics.config.ExporterPushgatewayProperties;
010import io.prometheus.metrics.config.PrometheusProperties;
011import io.prometheus.metrics.config.PrometheusPropertiesException;
012import io.prometheus.metrics.expositionformats.ExpositionFormatWriter;
013import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter;
014import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter;
015import io.prometheus.metrics.model.registry.Collector;
016import io.prometheus.metrics.model.registry.MultiCollector;
017import io.prometheus.metrics.model.registry.PrometheusRegistry;
018import java.io.ByteArrayOutputStream;
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.OutputStream;
022import java.io.UnsupportedEncodingException;
023import java.net.HttpURLConnection;
024import java.net.InetAddress;
025import java.net.MalformedURLException;
026import java.net.URI;
027import java.net.URL;
028import java.net.URLEncoder;
029import java.net.UnknownHostException;
030import java.nio.charset.StandardCharsets;
031import java.time.Duration;
032import java.util.Base64;
033import java.util.Collections;
034import java.util.HashMap;
035import java.util.Map;
036import java.util.TreeMap;
037import javax.annotation.Nullable;
038
039/**
040 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus
041 * Pushgateway</a>
042 *
043 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to
044 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead
045 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link
046 * PrometheusRegistry} to a Pushgateway.
047 *
048 * <p>Example usage:
049 *
050 * <pre>{@code
051 * void executeBatchJob() throws Exception {
052 *     PrometheusRegistry registry = new PrometheusRegistry();
053 *     Gauge duration = Gauge.builder()
054 *             .name("my_batch_job_duration_seconds")
055 *             .help("Duration of my batch job in seconds.")
056 *             .register(registry);
057 *     Timer durationTimer = duration.startTimer();
058 *     try {
059 *         // Your code here.
060 *
061 *         // This is only added to the registry after success,
062 *         // so that a previous success in the Pushgateway isn't overwritten on failure.
063 *         Gauge lastSuccess = Gauge.builder()
064 *                 .name("my_batch_job_last_success")
065 *                 .help("Last time my batch job succeeded, in unixtime.")
066 *                 .register(registry);
067 *         lastSuccess.set(System.currentTimeMillis());
068 *     } finally {
069 *         durationTimer.observeDuration();
070 *         PushGateway pg = PushGateway.builder()
071 *                 .address("127.0.0.1:9091")
072 *                 .job("my_batch_job")
073 *                 .registry(registry)
074 *                 .build();
075 *         pg.pushAdd();
076 *     }
077 * }
078 * }</pre>
079 *
080 * <p>See <a
081 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>.
082 */
083@StableApi
084public class PushGateway {
085  private final URL url;
086  private final ExpositionFormatWriter writer;
087  private final boolean prometheusTimestampsInMs;
088  private final Map<String, String> requestHeaders;
089  private final PrometheusRegistry registry;
090  private final HttpConnectionFactory connectionFactory;
091  private final EscapingScheme escapingScheme;
092  private final Duration connectionTimeout;
093  private final Duration readTimeout;
094
095  private PushGateway(
096      PrometheusRegistry registry,
097      Format format,
098      URL url,
099      HttpConnectionFactory connectionFactory,
100      Map<String, String> requestHeaders,
101      boolean prometheusTimestampsInMs,
102      EscapingScheme escapingScheme,
103      Duration connectionTimeout,
104      Duration readTimeout) {
105    this.registry = registry;
106    this.url = url;
107    this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders));
108    this.connectionFactory = connectionFactory;
109    this.prometheusTimestampsInMs = prometheusTimestampsInMs;
110    this.escapingScheme = escapingScheme;
111    this.connectionTimeout = connectionTimeout;
112    this.readTimeout = readTimeout;
113    writer = getWriter(format);
114    if (!writer.isAvailable()) {
115      throw new RuntimeException(writer.getClass() + " is not available");
116    }
117  }
118
119  @SuppressWarnings("deprecation")
120  private ExpositionFormatWriter getWriter(Format format) {
121    if (format == Format.PROMETHEUS_TEXT) {
122      return PrometheusTextFormatWriter.builder()
123          .setTimestampsInMs(this.prometheusTimestampsInMs)
124          .build();
125    } else {
126      // use reflection to avoid a compile-time dependency on the expositionformats module
127      return new PrometheusProtobufWriter();
128    }
129  }
130
131  /**
132   * Push all metrics. All metrics with the same job and grouping key are replaced.
133   *
134   * <p>This uses the PUT HTTP method.
135   */
136  public void push() throws IOException {
137    doRequest(registry, "PUT");
138  }
139
140  /**
141   * Push a single metric. All metrics with the same job and grouping key are replaced.
142   *
143   * <p>This is useful for pushing a single Gauge.
144   *
145   * <p>This uses the PUT HTTP method.
146   */
147  public void push(Collector collector) throws IOException {
148    PrometheusRegistry registry = new PrometheusRegistry();
149    registry.register(collector);
150    doRequest(registry, "PUT");
151  }
152
153  /**
154   * Push a single collector. All metrics with the same job and grouping key are replaced.
155   *
156   * <p>This uses the PUT HTTP method.
157   */
158  public void push(MultiCollector collector) throws IOException {
159    PrometheusRegistry registry = new PrometheusRegistry();
160    registry.register(collector);
161    doRequest(registry, "PUT");
162  }
163
164  /**
165   * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are
166   * replaced.
167   *
168   * <p>This uses the POST HTTP method.
169   */
170  public void pushAdd() throws IOException {
171    doRequest(registry, "POST");
172  }
173
174  /**
175   * Like {@link #push(Collector)}, but only the specified metric will be replaced.
176   *
177   * <p>This uses the POST HTTP method.
178   */
179  public void pushAdd(Collector collector) throws IOException {
180    PrometheusRegistry registry = new PrometheusRegistry();
181    registry.register(collector);
182    doRequest(registry, "POST");
183  }
184
185  /**
186   * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced.
187   *
188   * <p>This uses the POST HTTP method.
189   */
190  public void pushAdd(MultiCollector collector) throws IOException {
191    PrometheusRegistry registry = new PrometheusRegistry();
192    registry.register(collector);
193    doRequest(registry, "POST");
194  }
195
196  /**
197   * Deletes metrics from the Pushgateway.
198   *
199   * <p>This uses the DELETE HTTP method.
200   */
201  public void delete() throws IOException {
202    doRequest(null, "DELETE");
203  }
204
205  private void doRequest(@Nullable PrometheusRegistry registry, String method) throws IOException {
206    try {
207      HttpURLConnection connection = connectionFactory.create(url);
208      requestHeaders.forEach(connection::setRequestProperty);
209      connection.setRequestProperty("Content-Type", writer.getContentType());
210      if (!method.equals("DELETE")) {
211        connection.setDoOutput(true);
212      }
213      connection.setRequestMethod(method);
214
215      connection.setConnectTimeout((int) this.connectionTimeout.toMillis());
216      connection.setReadTimeout((int) this.readTimeout.toMillis());
217      connection.connect();
218
219      try {
220        if (!method.equals("DELETE")) {
221          OutputStream outputStream = connection.getOutputStream();
222          writer.write(outputStream, requireNonNull(registry).scrape(), this.escapingScheme);
223          outputStream.flush();
224          outputStream.close();
225        }
226
227        int response = connection.getResponseCode();
228        if (response / 100 != 2) {
229          String errorMessage;
230          InputStream errorStream = connection.getErrorStream();
231          if (errorStream != null) {
232            String errBody = readFromStream(errorStream);
233            errorMessage =
234                "Response code from " + url + " was " + response + ", response body: " + errBody;
235          } else {
236            errorMessage = "Response code from " + url + " was " + response;
237          }
238          throw new IOException(errorMessage);
239        }
240
241      } finally {
242        connection.disconnect();
243      }
244    } catch (IOException e) {
245      String baseUrl = url.getProtocol() + "://" + url.getHost();
246      if (url.getPort() != -1) {
247        baseUrl += ":" + url.getPort();
248      }
249      throw new IOException(
250          "Failed to push metrics to the Prometheus Pushgateway on "
251              + baseUrl
252              + ": "
253              + e.getMessage(),
254          e);
255    }
256  }
257
258  // toString with Charset is only available in Java 10+, but we want to support Java 8
259  @SuppressWarnings("JdkObsolete")
260  private static String readFromStream(InputStream is) throws IOException {
261    ByteArrayOutputStream result = new ByteArrayOutputStream();
262    byte[] buffer = new byte[1024];
263    int length;
264    while ((length = is.read(buffer)) != -1) {
265      result.write(buffer, 0, length);
266    }
267    return result.toString("UTF-8");
268  }
269
270  public static Builder builder() {
271    return builder(PrometheusProperties.get());
272  }
273
274  /**
275   * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}.
276   */
277  public static Builder builder(PrometheusProperties config) {
278    return new Builder(config);
279  }
280
281  public static class Builder {
282
283    private final PrometheusProperties config;
284    @Nullable private Format format;
285    @Nullable private String address;
286    @Nullable private Scheme scheme;
287    @Nullable private String job;
288    @Nullable private Duration connectionTimeout;
289    @Nullable private Duration readTimeout;
290    private boolean prometheusTimestampsInMs;
291    private final Map<String, String> requestHeaders = new HashMap<>();
292    private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry;
293    private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory();
294    private final Map<String, String> groupingKey = new TreeMap<>();
295    @Nullable private EscapingScheme escapingScheme;
296
297    private Builder(PrometheusProperties config) {
298      this.config = config;
299    }
300
301    /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */
302    public Builder format(Format format) {
303      this.format = requireNonNull(format, "format must not be null");
304      return this;
305    }
306
307    /**
308     * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}.
309     * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address}
310     * property.
311     */
312    public Builder address(String address) {
313      this.address = requireNonNull(address, "address must not be null");
314      return this;
315    }
316
317    /** Username and password for HTTP basic auth when pushing to the Pushgateway. */
318    public Builder basicAuth(String user, String password) {
319      byte[] credentialsBytes =
320          (requireNonNull(user, "user must not be null")
321                  + ":"
322                  + requireNonNull(password, "password must not be null"))
323              .getBytes(StandardCharsets.UTF_8);
324      String encoded = Base64.getEncoder().encodeToString(credentialsBytes);
325      requestHeaders.put("Authorization", String.format("Basic %s", encoded));
326      return this;
327    }
328
329    /** Bearer token authorization when pushing to the Pushgateway. */
330    public Builder bearerToken(String token) {
331      requestHeaders.put(
332          "Authorization",
333          String.format("Bearer %s", requireNonNull(token, "token must not be null")));
334      return this;
335    }
336
337    /**
338     * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten
339     * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property.
340     */
341    public Builder scheme(Scheme scheme) {
342      this.scheme = requireNonNull(scheme, "scheme must not be null");
343      return this;
344    }
345
346    /**
347     * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}.
348     *
349     * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example
350     * of a custom connection factory that skips SSL certificate validation for HTTPS connections.
351     */
352    public Builder connectionFactory(HttpConnectionFactory connectionFactory) {
353      this.connectionFactory =
354          requireNonNull(connectionFactory, "connectionFactory must not be null");
355      return this;
356    }
357
358    /**
359     * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR
360     * file will be used by default. Can be overwritten at runtime with the {@code
361     * io.prometheus.exporter.pushgateway.job} property.
362     */
363    public Builder job(String job) {
364      this.job = requireNonNull(job, "job must not be null");
365      return this;
366    }
367
368    /**
369     * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for
370     * adding multiple grouping keys.
371     */
372    public Builder groupingKey(String name, String value) {
373      groupingKey.put(
374          requireNonNull(name, "name must not be null"),
375          requireNonNull(value, "value must not be null"));
376      return this;
377    }
378
379    /** Convenience method for adding the current IP address as an "instance" label. */
380    public Builder instanceIpGroupingKey() throws UnknownHostException {
381      return groupingKey("instance", InetAddress.getLocalHost().getHostAddress());
382    }
383
384    /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */
385    public Builder registry(PrometheusRegistry registry) {
386      this.registry = requireNonNull(registry, "registry must not be null");
387      return this;
388    }
389
390    /**
391     * Specify the escaping scheme to be used when pushing metrics. Default is {@link
392     * EscapingScheme#UNDERSCORE_ESCAPING}.
393     */
394    public Builder escapingScheme(EscapingScheme escapingScheme) {
395      this.escapingScheme = requireNonNull(escapingScheme, "escapingScheme must not be null");
396      return this;
397    }
398
399    /**
400     * Use milliseconds for timestamps in text format? Default is {@code false}. Can be overwritten
401     * at runtime with the {@code io.prometheus.exporter.timestamps_in_ms} property.
402     */
403    public Builder prometheusTimestampsInMs(boolean prometheusTimestampsInMs) {
404      this.prometheusTimestampsInMs = prometheusTimestampsInMs;
405      return this;
406    }
407
408    /**
409     * Specify the connection timeout for HTTP connections to the PushGateway. Default is 10
410     * seconds.
411     *
412     * @param connectionTimeout timeout value
413     * @return this {@link Builder} instance
414     */
415    public Builder connectionTimeout(Duration connectionTimeout) {
416      this.connectionTimeout = connectionTimeout;
417      return this;
418    }
419
420    private Duration getConnectionTimeout(@Nullable ExporterPushgatewayProperties properties) {
421      if (properties != null && properties.getConnectTimeout() != null) {
422        return properties.getConnectTimeout();
423      } else if (this.connectionTimeout != null) {
424        return this.connectionTimeout;
425      } else {
426        return Duration.ofSeconds(10);
427      }
428    }
429
430    /**
431     * Specify the read timeout for HTTP connections to the PushGateway. Default is 10 seconds.
432     *
433     * @param readTimeout timeout value
434     * @return this {@link Builder} instance
435     */
436    public Builder readTimeout(Duration readTimeout) {
437      this.readTimeout = readTimeout;
438      return this;
439    }
440
441    private Duration getReadTimeout(@Nullable ExporterPushgatewayProperties properties) {
442      if (properties != null && properties.getReadTimeout() != null) {
443        return properties.getReadTimeout();
444      } else if (this.readTimeout != null) {
445        return this.readTimeout;
446      } else {
447        return Duration.ofSeconds(10);
448      }
449    }
450
451    private boolean getPrometheusTimestampsInMs() {
452      // accept either to opt in to timestamps in milliseconds
453      return config.getExporterProperties().getPrometheusTimestampsInMs()
454          || this.prometheusTimestampsInMs;
455    }
456
457    private Scheme getScheme(@Nullable ExporterPushgatewayProperties properties) {
458      if (properties != null && properties.getScheme() != null) {
459        return Scheme.valueOf(properties.getScheme());
460      } else if (this.scheme != null) {
461        return this.scheme;
462      } else {
463        return HTTP;
464      }
465    }
466
467    private String getAddress(@Nullable ExporterPushgatewayProperties properties) {
468      if (properties != null && properties.getAddress() != null) {
469        return properties.getAddress();
470      } else if (this.address != null) {
471        return this.address;
472      } else {
473        return "localhost:9091";
474      }
475    }
476
477    private String getJob(@Nullable ExporterPushgatewayProperties properties) {
478      if (properties != null && properties.getJob() != null) {
479        return properties.getJob();
480      } else if (this.job != null) {
481        return this.job;
482      } else {
483        return DefaultJobLabelDetector.getDefaultJobLabel();
484      }
485    }
486
487    private EscapingScheme getEscapingScheme(@Nullable ExporterPushgatewayProperties properties) {
488      if (properties != null && properties.getEscapingScheme() != null) {
489        return properties.getEscapingScheme();
490      } else if (this.escapingScheme != null) {
491        return this.escapingScheme;
492      }
493      return EscapingScheme.UNDERSCORE_ESCAPING;
494    }
495
496    private Format getFormat() {
497      // currently not configurable via properties
498      if (this.format != null) {
499        return this.format;
500      }
501      return Format.PROMETHEUS_PROTOBUF;
502    }
503
504    // encode with Charset is only available in Java 10+, but we want to support Java 8
505    @SuppressWarnings("JdkObsolete")
506    private URL makeUrl(@Nullable ExporterPushgatewayProperties properties)
507        throws UnsupportedEncodingException, MalformedURLException {
508      StringBuilder url =
509          new StringBuilder(getScheme(properties) + "://" + getAddress(properties) + "/metrics/");
510      String job = getJob(properties);
511      if (job.contains("/")) {
512        url.append("job@base64/").append(base64url(job));
513      } else {
514        url.append("job/").append(URLEncoder.encode(job, "UTF-8"));
515      }
516      for (Map.Entry<String, String> entry : groupingKey.entrySet()) {
517        if (entry.getValue().isEmpty()) {
518          url.append("/")
519              .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING))
520              .append("@base64/=");
521        } else if (entry.getValue().contains("/")) {
522          url.append("/")
523              .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING))
524              .append("@base64/")
525              .append(base64url(entry.getValue()));
526        } else {
527          url.append("/")
528              .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING))
529              .append("/")
530              .append(URLEncoder.encode(entry.getValue(), "UTF-8"));
531        }
532      }
533      return URI.create(url.toString()).normalize().toURL();
534    }
535
536    private String base64url(String v) {
537      return Base64.getEncoder()
538          .encodeToString(v.getBytes(StandardCharsets.UTF_8))
539          .replace("+", "-")
540          .replace("/", "_");
541    }
542
543    public PushGateway build() {
544      ExporterPushgatewayProperties properties =
545          config == null ? null : config.getExporterPushgatewayProperties();
546      try {
547        return new PushGateway(
548            registry,
549            getFormat(),
550            makeUrl(properties),
551            connectionFactory,
552            requestHeaders,
553            getPrometheusTimestampsInMs(),
554            getEscapingScheme(properties),
555            getConnectionTimeout(properties),
556            getReadTimeout(properties));
557      } catch (MalformedURLException e) {
558        throw new PrometheusPropertiesException(
559            address + ": Invalid address. Expecting <host>:<port>");
560      } catch (UnsupportedEncodingException e) {
561        throw new RuntimeException(e); // cannot happen, UTF-8 is always supported
562      }
563    }
564  }
565}