001package io.prometheus.metrics.exporter.pushgateway;
002
003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP;
004import static io.prometheus.metrics.model.snapshots.PrometheusNaming.escapeName;
005import static java.util.Objects.requireNonNull;
006
007import io.prometheus.metrics.config.EscapingScheme;
008import io.prometheus.metrics.config.ExporterPushgatewayProperties;
009import io.prometheus.metrics.config.PrometheusProperties;
010import io.prometheus.metrics.config.PrometheusPropertiesException;
011import io.prometheus.metrics.expositionformats.ExpositionFormatWriter;
012import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter;
013import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter;
014import io.prometheus.metrics.model.registry.Collector;
015import io.prometheus.metrics.model.registry.MultiCollector;
016import io.prometheus.metrics.model.registry.PrometheusRegistry;
017import java.io.ByteArrayOutputStream;
018import java.io.IOException;
019import java.io.InputStream;
020import java.io.OutputStream;
021import java.io.UnsupportedEncodingException;
022import java.net.HttpURLConnection;
023import java.net.InetAddress;
024import java.net.MalformedURLException;
025import java.net.URI;
026import java.net.URL;
027import java.net.URLEncoder;
028import java.net.UnknownHostException;
029import java.nio.charset.StandardCharsets;
030import java.util.Base64;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.Map;
034import java.util.TreeMap;
035import javax.annotation.Nullable;
036
037/**
038 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus
039 * Pushgateway</a>
040 *
041 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to
042 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead
043 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link
044 * PrometheusRegistry} to a Pushgateway.
045 *
046 * <p>Example usage:
047 *
048 * <pre>{@code
049 * void executeBatchJob() throws Exception {
050 *     PrometheusRegistry registry = new PrometheusRegistry();
051 *     Gauge duration = Gauge.builder()
052 *             .name("my_batch_job_duration_seconds")
053 *             .help("Duration of my batch job in seconds.")
054 *             .register(registry);
055 *     Timer durationTimer = duration.startTimer();
056 *     try {
057 *         // Your code here.
058 *
059 *         // This is only added to the registry after success,
060 *         // so that a previous success in the Pushgateway isn't overwritten on failure.
061 *         Gauge lastSuccess = Gauge.builder()
062 *                 .name("my_batch_job_last_success")
063 *                 .help("Last time my batch job succeeded, in unixtime.")
064 *                 .register(registry);
 *         lastSuccess.set(System.currentTimeMillis() / 1000.0);
066 *     } finally {
067 *         durationTimer.observeDuration();
068 *         PushGateway pg = PushGateway.builder()
069 *                 .address("127.0.0.1:9091")
070 *                 .job("my_batch_job")
071 *                 .registry(registry)
072 *                 .build();
073 *         pg.pushAdd();
074 *     }
075 * }
076 * }</pre>
077 *
078 * <p>See <a
079 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>.
080 */
081public class PushGateway {
082
083  private static final int MILLISECONDS_PER_SECOND = 1000;
084
085  private final URL url;
086  private final ExpositionFormatWriter writer;
087  private final boolean prometheusTimestampsInMs;
088  private final Map<String, String> requestHeaders;
089  private final PrometheusRegistry registry;
090  private final HttpConnectionFactory connectionFactory;
091  private final EscapingScheme escapingScheme;
092
093  private PushGateway(
094      PrometheusRegistry registry,
095      Format format,
096      URL url,
097      HttpConnectionFactory connectionFactory,
098      Map<String, String> requestHeaders,
099      boolean prometheusTimestampsInMs,
100      EscapingScheme escapingScheme) {
101    this.registry = registry;
102    this.url = url;
103    this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders));
104    this.connectionFactory = connectionFactory;
105    this.prometheusTimestampsInMs = prometheusTimestampsInMs;
106    this.escapingScheme = escapingScheme;
107    writer = getWriter(format);
108    if (!writer.isAvailable()) {
109      throw new RuntimeException(writer.getClass() + " is not available");
110    }
111  }
112
113  @SuppressWarnings("deprecation")
114  private ExpositionFormatWriter getWriter(Format format) {
115    if (format == Format.PROMETHEUS_TEXT) {
116      return PrometheusTextFormatWriter.builder()
117          .setTimestampsInMs(this.prometheusTimestampsInMs)
118          .build();
119    } else {
120      // use reflection to avoid a compile-time dependency on the expositionformats module
121      return new PrometheusProtobufWriter();
122    }
123  }
124
125  /**
126   * Push all metrics. All metrics with the same job and grouping key are replaced.
127   *
128   * <p>This uses the PUT HTTP method.
129   */
130  public void push() throws IOException {
131    doRequest(registry, "PUT");
132  }
133
134  /**
135   * Push a single metric. All metrics with the same job and grouping key are replaced.
136   *
137   * <p>This is useful for pushing a single Gauge.
138   *
139   * <p>This uses the PUT HTTP method.
140   */
141  public void push(Collector collector) throws IOException {
142    PrometheusRegistry registry = new PrometheusRegistry();
143    registry.register(collector);
144    doRequest(registry, "PUT");
145  }
146
147  /**
148   * Push a single collector. All metrics with the same job and grouping key are replaced.
149   *
150   * <p>This uses the PUT HTTP method.
151   */
152  public void push(MultiCollector collector) throws IOException {
153    PrometheusRegistry registry = new PrometheusRegistry();
154    registry.register(collector);
155    doRequest(registry, "PUT");
156  }
157
158  /**
159   * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are
160   * replaced.
161   *
162   * <p>This uses the POST HTTP method.
163   */
164  public void pushAdd() throws IOException {
165    doRequest(registry, "POST");
166  }
167
168  /**
169   * Like {@link #push(Collector)}, but only the specified metric will be replaced.
170   *
171   * <p>This uses the POST HTTP method.
172   */
173  public void pushAdd(Collector collector) throws IOException {
174    PrometheusRegistry registry = new PrometheusRegistry();
175    registry.register(collector);
176    doRequest(registry, "POST");
177  }
178
179  /**
180   * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced.
181   *
182   * <p>This uses the POST HTTP method.
183   */
184  public void pushAdd(MultiCollector collector) throws IOException {
185    PrometheusRegistry registry = new PrometheusRegistry();
186    registry.register(collector);
187    doRequest(registry, "POST");
188  }
189
190  /**
191   * Deletes metrics from the Pushgateway.
192   *
193   * <p>This uses the DELETE HTTP method.
194   */
195  public void delete() throws IOException {
196    doRequest(null, "DELETE");
197  }
198
199  private void doRequest(@Nullable PrometheusRegistry registry, String method) throws IOException {
200    try {
201      HttpURLConnection connection = connectionFactory.create(url);
202      requestHeaders.forEach(connection::setRequestProperty);
203      connection.setRequestProperty("Content-Type", writer.getContentType());
204      if (!method.equals("DELETE")) {
205        connection.setDoOutput(true);
206      }
207      connection.setRequestMethod(method);
208
209      connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND);
210      connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND);
211      connection.connect();
212
213      try {
214        if (!method.equals("DELETE")) {
215          OutputStream outputStream = connection.getOutputStream();
216          writer.write(outputStream, requireNonNull(registry).scrape(), this.escapingScheme);
217          outputStream.flush();
218          outputStream.close();
219        }
220
221        int response = connection.getResponseCode();
222        if (response / 100 != 2) {
223          String errorMessage;
224          InputStream errorStream = connection.getErrorStream();
225          if (errorStream != null) {
226            String errBody = readFromStream(errorStream);
227            errorMessage =
228                "Response code from " + url + " was " + response + ", response body: " + errBody;
229          } else {
230            errorMessage = "Response code from " + url + " was " + response;
231          }
232          throw new IOException(errorMessage);
233        }
234
235      } finally {
236        connection.disconnect();
237      }
238    } catch (IOException e) {
239      String baseUrl = url.getProtocol() + "://" + url.getHost();
240      if (url.getPort() != -1) {
241        baseUrl += ":" + url.getPort();
242      }
243      throw new IOException(
244          "Failed to push metrics to the Prometheus Pushgateway on "
245              + baseUrl
246              + ": "
247              + e.getMessage(),
248          e);
249    }
250  }
251
252  private static String readFromStream(InputStream is) throws IOException {
253    ByteArrayOutputStream result = new ByteArrayOutputStream();
254    byte[] buffer = new byte[1024];
255    int length;
256    while ((length = is.read(buffer)) != -1) {
257      result.write(buffer, 0, length);
258    }
259    return result.toString("UTF-8");
260  }
261
262  public static Builder builder() {
263    return builder(PrometheusProperties.get());
264  }
265
266  /**
267   * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}.
268   */
269  public static Builder builder(PrometheusProperties config) {
270    return new Builder(config);
271  }
272
273  public static class Builder {
274
275    private final PrometheusProperties config;
276    @Nullable private Format format;
277    @Nullable private String address;
278    @Nullable private Scheme scheme;
279    @Nullable private String job;
280    private boolean prometheusTimestampsInMs;
281    private final Map<String, String> requestHeaders = new HashMap<>();
282    private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry;
283    private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory();
284    private final Map<String, String> groupingKey = new TreeMap<>();
285    @Nullable private EscapingScheme escapingScheme;
286
287    private Builder(PrometheusProperties config) {
288      this.config = config;
289    }
290
291    /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */
292    public Builder format(Format format) {
293      this.format = requireNonNull(format, "format must not be null");
294      return this;
295    }
296
297    /**
298     * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}.
299     * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address}
300     * property.
301     */
302    public Builder address(String address) {
303      this.address = requireNonNull(address, "address must not be null");
304      return this;
305    }
306
307    /** Username and password for HTTP basic auth when pushing to the Pushgateway. */
308    public Builder basicAuth(String user, String password) {
309      byte[] credentialsBytes =
310          (requireNonNull(user, "user must not be null")
311                  + ":"
312                  + requireNonNull(password, "password must not be null"))
313              .getBytes(StandardCharsets.UTF_8);
314      String encoded = Base64.getEncoder().encodeToString(credentialsBytes);
315      requestHeaders.put("Authorization", String.format("Basic %s", encoded));
316      return this;
317    }
318
319    /** Bearer token authorization when pushing to the Pushgateway. */
320    public Builder bearerToken(String token) {
321      requestHeaders.put(
322          "Authorization",
323          String.format("Bearer %s", requireNonNull(token, "token must not be null")));
324      return this;
325    }
326
327    /**
328     * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten
329     * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property.
330     */
331    public Builder scheme(Scheme scheme) {
332      this.scheme = requireNonNull(scheme, "scheme must not be null");
333      return this;
334    }
335
336    /**
337     * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}.
338     *
339     * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example
340     * of a custom connection factory that skips SSL certificate validation for HTTPS connections.
341     */
342    public Builder connectionFactory(HttpConnectionFactory connectionFactory) {
343      this.connectionFactory =
344          requireNonNull(connectionFactory, "connectionFactory must not be null");
345      return this;
346    }
347
348    /**
349     * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR
350     * file will be used by default. Can be overwritten at runtime with the {@code
351     * io.prometheus.exporter.pushgateway.job} property.
352     */
353    public Builder job(String job) {
354      this.job = requireNonNull(job, "job must not be null");
355      return this;
356    }
357
358    /**
359     * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for
360     * adding multiple grouping keys.
361     */
362    public Builder groupingKey(String name, String value) {
363      groupingKey.put(
364          requireNonNull(name, "name must not be null"),
365          requireNonNull(value, "value must not be null"));
366      return this;
367    }
368
369    /** Convenience method for adding the current IP address as an "instance" label. */
370    public Builder instanceIpGroupingKey() throws UnknownHostException {
371      return groupingKey("instance", InetAddress.getLocalHost().getHostAddress());
372    }
373
374    /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */
375    public Builder registry(PrometheusRegistry registry) {
376      this.registry = requireNonNull(registry, "registry must not be null");
377      return this;
378    }
379
380    /**
381     * Specify the escaping scheme to be used when pushing metrics. Default is {@link
382     * EscapingScheme#UNDERSCORE_ESCAPING}.
383     */
384    public Builder escapingScheme(EscapingScheme escapingScheme) {
385      this.escapingScheme = requireNonNull(escapingScheme, "escapingScheme must not be null");
386      return this;
387    }
388
389    /**
390     * Use milliseconds for timestamps in text format? Default is {@code false}. Can be overwritten
391     * at runtime with the {@code io.prometheus.exporter.timestampsInMs} property.
392     */
393    public Builder prometheusTimestampsInMs(boolean prometheusTimestampsInMs) {
394      this.prometheusTimestampsInMs = prometheusTimestampsInMs;
395      return this;
396    }
397
398    private boolean getPrometheusTimestampsInMs() {
399      // accept either to opt in to timestamps in milliseconds
400      return config.getExporterProperties().getPrometheusTimestampsInMs()
401          || this.prometheusTimestampsInMs;
402    }
403
404    private Scheme getScheme(@Nullable ExporterPushgatewayProperties properties) {
405      if (properties != null && properties.getScheme() != null) {
406        return Scheme.valueOf(properties.getScheme());
407      } else if (this.scheme != null) {
408        return this.scheme;
409      } else {
410        return HTTP;
411      }
412    }
413
414    private String getAddress(@Nullable ExporterPushgatewayProperties properties) {
415      if (properties != null && properties.getAddress() != null) {
416        return properties.getAddress();
417      } else if (this.address != null) {
418        return this.address;
419      } else {
420        return "localhost:9091";
421      }
422    }
423
424    private String getJob(@Nullable ExporterPushgatewayProperties properties) {
425      if (properties != null && properties.getJob() != null) {
426        return properties.getJob();
427      } else if (this.job != null) {
428        return this.job;
429      } else {
430        return DefaultJobLabelDetector.getDefaultJobLabel();
431      }
432    }
433
434    private EscapingScheme getEscapingScheme(@Nullable ExporterPushgatewayProperties properties) {
435      if (properties != null && properties.getEscapingScheme() != null) {
436        return properties.getEscapingScheme();
437      } else if (this.escapingScheme != null) {
438        return this.escapingScheme;
439      }
440      return EscapingScheme.UNDERSCORE_ESCAPING;
441    }
442
443    private Format getFormat() {
444      // currently not configurable via properties
445      if (this.format != null) {
446        return this.format;
447      }
448      return Format.PROMETHEUS_PROTOBUF;
449    }
450
451    private URL makeUrl(@Nullable ExporterPushgatewayProperties properties)
452        throws UnsupportedEncodingException, MalformedURLException {
453      StringBuilder url =
454          new StringBuilder(getScheme(properties) + "://" + getAddress(properties) + "/metrics/");
455      String job = getJob(properties);
456      if (job.contains("/")) {
457        url.append("job@base64/").append(base64url(job));
458      } else {
459        url.append("job/").append(URLEncoder.encode(job, "UTF-8"));
460      }
461      for (Map.Entry<String, String> entry : groupingKey.entrySet()) {
462        if (entry.getValue().isEmpty()) {
463          url.append("/")
464              .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING))
465              .append("@base64/=");
466        } else if (entry.getValue().contains("/")) {
467          url.append("/")
468              .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING))
469              .append("@base64/")
470              .append(base64url(entry.getValue()));
471        } else {
472          url.append("/")
473              .append(escapeName(entry.getKey(), EscapingScheme.VALUE_ENCODING_ESCAPING))
474              .append("/")
475              .append(URLEncoder.encode(entry.getValue(), "UTF-8"));
476        }
477      }
478      return URI.create(url.toString()).normalize().toURL();
479    }
480
481    private String base64url(String v) {
482      return Base64.getEncoder()
483          .encodeToString(v.getBytes(StandardCharsets.UTF_8))
484          .replace("+", "-")
485          .replace("/", "_");
486    }
487
488    public PushGateway build() {
489      ExporterPushgatewayProperties properties =
490          config == null ? null : config.getExporterPushgatewayProperties();
491      try {
492        return new PushGateway(
493            registry,
494            getFormat(),
495            makeUrl(properties),
496            connectionFactory,
497            requestHeaders,
498            getPrometheusTimestampsInMs(),
499            getEscapingScheme(properties));
500      } catch (MalformedURLException e) {
501        throw new PrometheusPropertiesException(
502            address + ": Invalid address. Expecting <host>:<port>");
503      } catch (UnsupportedEncodingException e) {
504        throw new RuntimeException(e); // cannot happen, UTF-8 is always supported
505      }
506    }
507  }
508}