001package io.prometheus.metrics.exporter.pushgateway;
002
003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP;
004
005import io.prometheus.metrics.config.ExporterPushgatewayProperties;
006import io.prometheus.metrics.config.PrometheusProperties;
007import io.prometheus.metrics.config.PrometheusPropertiesException;
008import io.prometheus.metrics.expositionformats.ExpositionFormatWriter;
009import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter;
010import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter;
011import io.prometheus.metrics.model.registry.Collector;
012import io.prometheus.metrics.model.registry.MultiCollector;
013import io.prometheus.metrics.model.registry.PrometheusRegistry;
014import java.io.ByteArrayOutputStream;
015import java.io.IOException;
016import java.io.InputStream;
017import java.io.OutputStream;
018import java.io.UnsupportedEncodingException;
019import java.net.HttpURLConnection;
020import java.net.InetAddress;
021import java.net.MalformedURLException;
022import java.net.URI;
023import java.net.URL;
024import java.net.URLEncoder;
025import java.net.UnknownHostException;
026import java.nio.charset.StandardCharsets;
027import java.util.Base64;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.Map;
031import java.util.TreeMap;
032
033/**
034 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus
035 * Pushgateway</a>
036 *
037 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to
038 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead
039 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link
040 * PrometheusRegistry} to a Pushgateway.
041 *
042 * <p>Example usage:
043 *
044 * <pre>{@code
045 * void executeBatchJob() throws Exception {
046 *     PrometheusRegistry registry = new PrometheusRegistry();
047 *     Gauge duration = Gauge.builder()
048 *             .name("my_batch_job_duration_seconds")
049 *             .help("Duration of my batch job in seconds.")
050 *             .register(registry);
051 *     Timer durationTimer = duration.startTimer();
052 *     try {
053 *         // Your code here.
054 *
055 *         // This is only added to the registry after success,
056 *         // so that a previous success in the Pushgateway isn't overwritten on failure.
057 *         Gauge lastSuccess = Gauge.builder()
058 *                 .name("my_batch_job_last_success")
059 *                 .help("Last time my batch job succeeded, in unixtime.")
060 *                 .register(registry);
061 *         lastSuccess.set(System.currentTimeMillis());
062 *     } finally {
063 *         durationTimer.observeDuration();
064 *         PushGateway pg = PushGateway.builder()
065 *                 .address("127.0.0.1:9091")
066 *                 .job("my_batch_job")
067 *                 .registry(registry)
068 *                 .build();
069 *         pg.pushAdd();
070 *     }
071 * }
072 * }</pre>
073 *
074 * <p>See <a
075 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>.
076 */
077public class PushGateway {
078
079  private static final int MILLISECONDS_PER_SECOND = 1000;
080
081  private final URL url;
082  private final ExpositionFormatWriter writer;
083  private final boolean prometheusTimestampsInMs;
084  private final Map<String, String> requestHeaders;
085  private final PrometheusRegistry registry;
086  private final HttpConnectionFactory connectionFactory;
087
088  private PushGateway(
089      PrometheusRegistry registry,
090      Format format,
091      URL url,
092      HttpConnectionFactory connectionFactory,
093      Map<String, String> requestHeaders,
094      boolean prometheusTimestampsInMs) {
095    this.registry = registry;
096    this.url = url;
097    this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders));
098    this.connectionFactory = connectionFactory;
099    this.prometheusTimestampsInMs = prometheusTimestampsInMs;
100    writer = getWriter(format);
101    if (!writer.isAvailable()) {
102      throw new RuntimeException(writer.getClass() + " is not available");
103    }
104  }
105
106  @SuppressWarnings("deprecation")
107  private ExpositionFormatWriter getWriter(Format format) {
108    if (format == Format.PROMETHEUS_TEXT) {
109      return PrometheusTextFormatWriter.builder()
110          .setTimestampsInMs(this.prometheusTimestampsInMs)
111          .build();
112    } else {
113      // use reflection to avoid a compile-time dependency on the expositionformats module
114      return new PrometheusProtobufWriter();
115    }
116  }
117
118  /**
119   * Push all metrics. All metrics with the same job and grouping key are replaced.
120   *
121   * <p>This uses the PUT HTTP method.
122   */
123  public void push() throws IOException {
124    doRequest(registry, "PUT");
125  }
126
127  /**
128   * Push a single metric. All metrics with the same job and grouping key are replaced.
129   *
130   * <p>This is useful for pushing a single Gauge.
131   *
132   * <p>This uses the PUT HTTP method.
133   */
134  public void push(Collector collector) throws IOException {
135    PrometheusRegistry registry = new PrometheusRegistry();
136    registry.register(collector);
137    doRequest(registry, "PUT");
138  }
139
140  /**
141   * Push a single collector. All metrics with the same job and grouping key are replaced.
142   *
143   * <p>This uses the PUT HTTP method.
144   */
145  public void push(MultiCollector collector) throws IOException {
146    PrometheusRegistry registry = new PrometheusRegistry();
147    registry.register(collector);
148    doRequest(registry, "PUT");
149  }
150
151  /**
152   * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are
153   * replaced.
154   *
155   * <p>This uses the POST HTTP method.
156   */
157  public void pushAdd() throws IOException {
158    doRequest(registry, "POST");
159  }
160
161  /**
162   * Like {@link #push(Collector)}, but only the specified metric will be replaced.
163   *
164   * <p>This uses the POST HTTP method.
165   */
166  public void pushAdd(Collector collector) throws IOException {
167    PrometheusRegistry registry = new PrometheusRegistry();
168    registry.register(collector);
169    doRequest(registry, "POST");
170  }
171
172  /**
173   * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced.
174   *
175   * <p>This uses the POST HTTP method.
176   */
177  public void pushAdd(MultiCollector collector) throws IOException {
178    PrometheusRegistry registry = new PrometheusRegistry();
179    registry.register(collector);
180    doRequest(registry, "POST");
181  }
182
183  /**
184   * Deletes metrics from the Pushgateway.
185   *
186   * <p>This uses the DELETE HTTP method.
187   */
188  public void delete() throws IOException {
189    doRequest(null, "DELETE");
190  }
191
192  private void doRequest(PrometheusRegistry registry, String method) throws IOException {
193    try {
194      HttpURLConnection connection = connectionFactory.create(url);
195      requestHeaders.forEach(connection::setRequestProperty);
196      connection.setRequestProperty("Content-Type", writer.getContentType());
197      if (!method.equals("DELETE")) {
198        connection.setDoOutput(true);
199      }
200      connection.setRequestMethod(method);
201
202      connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND);
203      connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND);
204      connection.connect();
205
206      try {
207        if (!method.equals("DELETE")) {
208          OutputStream outputStream = connection.getOutputStream();
209          writer.write(outputStream, registry.scrape());
210          outputStream.flush();
211          outputStream.close();
212        }
213
214        int response = connection.getResponseCode();
215        if (response / 100 != 2) {
216          String errorMessage;
217          InputStream errorStream = connection.getErrorStream();
218          if (errorStream != null) {
219            String errBody = readFromStream(errorStream);
220            errorMessage =
221                "Response code from " + url + " was " + response + ", response body: " + errBody;
222          } else {
223            errorMessage = "Response code from " + url + " was " + response;
224          }
225          throw new IOException(errorMessage);
226        }
227
228      } finally {
229        connection.disconnect();
230      }
231    } catch (IOException e) {
232      String baseUrl = url.getProtocol() + "://" + url.getHost();
233      if (url.getPort() != -1) {
234        baseUrl += ":" + url.getPort();
235      }
236      throw new IOException(
237          "Failed to push metrics to the Prometheus Pushgateway on "
238              + baseUrl
239              + ": "
240              + e.getMessage(),
241          e);
242    }
243  }
244
245  private static String readFromStream(InputStream is) throws IOException {
246    ByteArrayOutputStream result = new ByteArrayOutputStream();
247    byte[] buffer = new byte[1024];
248    int length;
249    while ((length = is.read(buffer)) != -1) {
250      result.write(buffer, 0, length);
251    }
252    return result.toString("UTF-8");
253  }
254
255  public static Builder builder() {
256    return builder(PrometheusProperties.get());
257  }
258
259  /**
260   * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}.
261   */
262  public static Builder builder(PrometheusProperties config) {
263    return new Builder(config);
264  }
265
266  public static class Builder {
267
268    private final PrometheusProperties config;
269    private Format format;
270    private String address;
271    private Scheme scheme;
272    private String job;
273    private boolean prometheusTimestampsInMs;
274    private final Map<String, String> requestHeaders = new HashMap<>();
275    private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry;
276    private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory();
277    private Map<String, String> groupingKey = new TreeMap<>();
278
279    private Builder(PrometheusProperties config) {
280      this.config = config;
281    }
282
283    /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */
284    public Builder format(Format format) {
285      if (format == null) {
286        throw new NullPointerException();
287      }
288      this.format = format;
289      return this;
290    }
291
292    /**
293     * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}.
294     * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address}
295     * property.
296     */
297    public Builder address(String address) {
298      if (address == null) {
299        throw new NullPointerException();
300      }
301      this.address = address;
302      return this;
303    }
304
305    /** Username and password for HTTP basic auth when pushing to the Pushgateway. */
306    public Builder basicAuth(String user, String password) {
307      if (user == null || password == null) {
308        throw new NullPointerException();
309      }
310      byte[] credentialsBytes = (user + ":" + password).getBytes(StandardCharsets.UTF_8);
311      String encoded = Base64.getEncoder().encodeToString(credentialsBytes);
312      requestHeaders.put("Authorization", String.format("Basic %s", encoded));
313      return this;
314    }
315
316    /** Bearer token authorization when pushing to the Pushgateway. */
317    public Builder bearerToken(String token) {
318      if (token == null) {
319        throw new NullPointerException();
320      }
321      requestHeaders.put("Authorization", String.format("Bearer %s", token));
322      return this;
323    }
324
325    /**
326     * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten
327     * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property.
328     */
329    public Builder scheme(Scheme scheme) {
330      if (scheme == null) {
331        throw new NullPointerException();
332      }
333      this.scheme = scheme;
334      return this;
335    }
336
337    /**
338     * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}.
339     *
340     * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example
341     * of a custom connection factory that skips SSL certificate validation for HTTPS connections.
342     */
343    public Builder connectionFactory(HttpConnectionFactory connectionFactory) {
344      if (connectionFactory == null) {
345        throw new NullPointerException();
346      }
347      this.connectionFactory = connectionFactory;
348      return this;
349    }
350
351    /**
352     * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR
353     * file will be used by default. Can be overwritten at runtime with the {@code
354     * io.prometheus.exporter.pushgateway.job} property.
355     */
356    public Builder job(String job) {
357      if (job == null) {
358        throw new NullPointerException();
359      }
360      this.job = job;
361      return this;
362    }
363
364    /**
365     * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for
366     * adding multiple grouping keys.
367     */
368    public Builder groupingKey(String name, String value) {
369      if (name == null || value == null) {
370        throw new NullPointerException();
371      }
372      groupingKey.put(name, value);
373      return this;
374    }
375
376    /** Convenience method for adding the current IP address as an "instance" label. */
377    public Builder instanceIpGroupingKey() throws UnknownHostException {
378      return groupingKey("instance", InetAddress.getLocalHost().getHostAddress());
379    }
380
381    /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */
382    public Builder registry(PrometheusRegistry registry) {
383      if (registry == null) {
384        throw new NullPointerException();
385      }
386      this.registry = registry;
387      return this;
388    }
389
390    /**
391     * Use milliseconds for timestamps in text format? Default is {@code false}. Can be overwritten
392     * at runtime with the {@code io.prometheus.exporter.timestampsInMs} property.
393     */
394    public Builder prometheusTimestampsInMs(boolean prometheusTimestampsInMs) {
395      this.prometheusTimestampsInMs = prometheusTimestampsInMs;
396      return this;
397    }
398
399    private boolean getPrometheusTimestampsInMs() {
400      // accept either to opt in to timestamps in milliseconds
401      return config.getExporterProperties().getPrometheusTimestampsInMs()
402          || this.prometheusTimestampsInMs;
403    }
404
405    private Scheme getScheme(ExporterPushgatewayProperties properties) {
406      if (properties != null && properties.getScheme() != null) {
407        return Scheme.valueOf(properties.getScheme());
408      } else if (this.scheme != null) {
409        return this.scheme;
410      } else {
411        return HTTP;
412      }
413    }
414
415    private String getAddress(ExporterPushgatewayProperties properties) {
416      if (properties != null && properties.getAddress() != null) {
417        return properties.getAddress();
418      } else if (this.address != null) {
419        return this.address;
420      } else {
421        return "localhost:9091";
422      }
423    }
424
425    private String getJob(ExporterPushgatewayProperties properties) {
426      if (properties != null && properties.getJob() != null) {
427        return properties.getJob();
428      } else if (this.job != null) {
429        return this.job;
430      } else {
431        return DefaultJobLabelDetector.getDefaultJobLabel();
432      }
433    }
434
435    private Format getFormat() {
436      // currently not configurable via properties
437      if (this.format != null) {
438        return this.format;
439      }
440      return Format.PROMETHEUS_PROTOBUF;
441    }
442
443    private URL makeUrl(ExporterPushgatewayProperties properties)
444        throws UnsupportedEncodingException, MalformedURLException {
445      String url = getScheme(properties) + "://" + getAddress(properties) + "/metrics/";
446      String job = getJob(properties);
447      if (job.contains("/")) {
448        url += "job@base64/" + base64url(job);
449      } else {
450        url += "job/" + URLEncoder.encode(job, "UTF-8");
451      }
452      if (groupingKey != null) {
453        for (Map.Entry<String, String> entry : groupingKey.entrySet()) {
454          if (entry.getValue().isEmpty()) {
455            url += "/" + entry.getKey() + "@base64/=";
456          } else if (entry.getValue().contains("/")) {
457            url += "/" + entry.getKey() + "@base64/" + base64url(entry.getValue());
458          } else {
459            url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8");
460          }
461        }
462      }
463      return URI.create(url).normalize().toURL();
464    }
465
466    private String base64url(String v) {
467      return Base64.getEncoder()
468          .encodeToString(v.getBytes(StandardCharsets.UTF_8))
469          .replace("+", "-")
470          .replace("/", "_");
471    }
472
473    public PushGateway build() {
474      ExporterPushgatewayProperties properties =
475          config == null ? null : config.getExporterPushgatewayProperties();
476      try {
477        return new PushGateway(
478            registry,
479            getFormat(),
480            makeUrl(properties),
481            connectionFactory,
482            requestHeaders,
483            getPrometheusTimestampsInMs());
484      } catch (MalformedURLException e) {
485        throw new PrometheusPropertiesException(
486            address + ": Invalid address. Expecting <host>:<port>");
487      } catch (UnsupportedEncodingException e) {
488        throw new RuntimeException(e); // cannot happen, UTF-8 is always supported
489      }
490    }
491  }
492}