001package io.prometheus.metrics.exporter.pushgateway;
002
003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP;
004
005import io.prometheus.metrics.config.ExporterPushgatewayProperties;
006import io.prometheus.metrics.config.PrometheusProperties;
007import io.prometheus.metrics.config.PrometheusPropertiesException;
008import io.prometheus.metrics.expositionformats.ExpositionFormatWriter;
009import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter;
010import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter;
011import io.prometheus.metrics.model.registry.Collector;
012import io.prometheus.metrics.model.registry.MultiCollector;
013import io.prometheus.metrics.model.registry.PrometheusRegistry;
014import java.io.ByteArrayOutputStream;
015import java.io.IOException;
016import java.io.InputStream;
017import java.io.OutputStream;
018import java.io.UnsupportedEncodingException;
019import java.net.HttpURLConnection;
020import java.net.InetAddress;
021import java.net.MalformedURLException;
022import java.net.URI;
023import java.net.URL;
024import java.net.URLEncoder;
025import java.net.UnknownHostException;
026import java.nio.charset.StandardCharsets;
027import java.util.Base64;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.Map;
031import java.util.TreeMap;
032
033/**
034 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus
035 * Pushgateway</a>
036 *
037 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to
038 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead
039 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link
040 * PrometheusRegistry} to a Pushgateway.
041 *
042 * <p>Example usage:
043 *
044 * <pre>{@code
045 * void executeBatchJob() throws Exception {
046 *     PrometheusRegistry registry = new PrometheusRegistry();
047 *     Gauge duration = Gauge.builder()
048 *             .name("my_batch_job_duration_seconds")
049 *             .help("Duration of my batch job in seconds.")
050 *             .register(registry);
051 *     Timer durationTimer = duration.startTimer();
052 *     try {
053 *         // Your code here.
054 *
055 *         // This is only added to the registry after success,
056 *         // so that a previous success in the Pushgateway isn't overwritten on failure.
057 *         Gauge lastSuccess = Gauge.builder()
058 *                 .name("my_batch_job_last_success")
059 *                 .help("Last time my batch job succeeded, in unixtime.")
060 *                 .register(registry);
061 *         lastSuccess.set(System.currentTimeMillis());
062 *     } finally {
063 *         durationTimer.observeDuration();
064 *         PushGateway pg = PushGateway.builder()
065 *                 .address("127.0.0.1:9091")
066 *                 .job("my_batch_job")
067 *                 .registry(registry)
068 *                 .build();
069 *         pg.pushAdd();
070 *     }
071 * }
072 * }</pre>
073 *
074 * <p>See <a
075 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>.
076 */
077public class PushGateway {
078
079  private static final int MILLISECONDS_PER_SECOND = 1000;
080
081  private final URL url;
082  private final ExpositionFormatWriter writer;
083  private final Map<String, String> requestHeaders;
084  private final PrometheusRegistry registry;
085  private final HttpConnectionFactory connectionFactory;
086
087  private PushGateway(
088      PrometheusRegistry registry,
089      Format format,
090      URL url,
091      HttpConnectionFactory connectionFactory,
092      Map<String, String> requestHeaders) {
093    this.registry = registry;
094    this.url = url;
095    this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders));
096    this.connectionFactory = connectionFactory;
097    writer = getWriter(format);
098    if (!writer.isAvailable()) {
099      throw new RuntimeException(writer.getClass() + " is not available");
100    }
101  }
102
103  private ExpositionFormatWriter getWriter(Format format) {
104    if (format == Format.PROMETHEUS_TEXT) {
105      return new PrometheusTextFormatWriter(false);
106    } else {
107      // use reflection to avoid a compile-time dependency on the expositionformats module
108      return new PrometheusProtobufWriter();
109    }
110  }
111
112  /**
113   * Push all metrics. All metrics with the same job and grouping key are replaced.
114   *
115   * <p>This uses the PUT HTTP method.
116   */
117  public void push() throws IOException {
118    doRequest(registry, "PUT");
119  }
120
121  /**
122   * Push a single metric. All metrics with the same job and grouping key are replaced.
123   *
124   * <p>This is useful for pushing a single Gauge.
125   *
126   * <p>This uses the PUT HTTP method.
127   */
128  public void push(Collector collector) throws IOException {
129    PrometheusRegistry registry = new PrometheusRegistry();
130    registry.register(collector);
131    doRequest(registry, "PUT");
132  }
133
134  /**
135   * Push a single collector. All metrics with the same job and grouping key are replaced.
136   *
137   * <p>This uses the PUT HTTP method.
138   */
139  public void push(MultiCollector collector) throws IOException {
140    PrometheusRegistry registry = new PrometheusRegistry();
141    registry.register(collector);
142    doRequest(registry, "PUT");
143  }
144
145  /**
146   * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are
147   * replaced.
148   *
149   * <p>This uses the POST HTTP method.
150   */
151  public void pushAdd() throws IOException {
152    doRequest(registry, "POST");
153  }
154
155  /**
156   * Like {@link #push(Collector)}, but only the specified metric will be replaced.
157   *
158   * <p>This uses the POST HTTP method.
159   */
160  public void pushAdd(Collector collector) throws IOException {
161    PrometheusRegistry registry = new PrometheusRegistry();
162    registry.register(collector);
163    doRequest(registry, "POST");
164  }
165
166  /**
167   * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced.
168   *
169   * <p>This uses the POST HTTP method.
170   */
171  public void pushAdd(MultiCollector collector) throws IOException {
172    PrometheusRegistry registry = new PrometheusRegistry();
173    registry.register(collector);
174    doRequest(registry, "POST");
175  }
176
177  /**
178   * Deletes metrics from the Pushgateway.
179   *
180   * <p>This uses the DELETE HTTP method.
181   */
182  public void delete() throws IOException {
183    doRequest(null, "DELETE");
184  }
185
186  private void doRequest(PrometheusRegistry registry, String method) throws IOException {
187    try {
188      HttpURLConnection connection = connectionFactory.create(url);
189      requestHeaders.forEach(connection::setRequestProperty);
190      connection.setRequestProperty("Content-Type", writer.getContentType());
191      if (!method.equals("DELETE")) {
192        connection.setDoOutput(true);
193      }
194      connection.setRequestMethod(method);
195
196      connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND);
197      connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND);
198      connection.connect();
199
200      try {
201        if (!method.equals("DELETE")) {
202          OutputStream outputStream = connection.getOutputStream();
203          writer.write(outputStream, registry.scrape());
204          outputStream.flush();
205          outputStream.close();
206        }
207
208        int response = connection.getResponseCode();
209        if (response / 100 != 2) {
210          String errorMessage;
211          InputStream errorStream = connection.getErrorStream();
212          if (errorStream != null) {
213            String errBody = readFromStream(errorStream);
214            errorMessage =
215                "Response code from " + url + " was " + response + ", response body: " + errBody;
216          } else {
217            errorMessage = "Response code from " + url + " was " + response;
218          }
219          throw new IOException(errorMessage);
220        }
221
222      } finally {
223        connection.disconnect();
224      }
225    } catch (IOException e) {
226      String baseUrl = url.getProtocol() + "://" + url.getHost();
227      if (url.getPort() != -1) {
228        baseUrl += ":" + url.getPort();
229      }
230      throw new IOException(
231          "Failed to push metrics to the Prometheus Pushgateway on "
232              + baseUrl
233              + ": "
234              + e.getMessage(),
235          e);
236    }
237  }
238
239  private static String readFromStream(InputStream is) throws IOException {
240    ByteArrayOutputStream result = new ByteArrayOutputStream();
241    byte[] buffer = new byte[1024];
242    int length;
243    while ((length = is.read(buffer)) != -1) {
244      result.write(buffer, 0, length);
245    }
246    return result.toString("UTF-8");
247  }
248
249  public static Builder builder() {
250    return builder(PrometheusProperties.get());
251  }
252
253  /**
254   * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}.
255   */
256  public static Builder builder(PrometheusProperties config) {
257    return new Builder(config);
258  }
259
260  public static class Builder {
261
262    private final PrometheusProperties config;
263    private Format format;
264    private String address;
265    private Scheme scheme;
266    private String job;
267    private final Map<String, String> requestHeaders = new HashMap<>();
268    private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry;
269    private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory();
270    private Map<String, String> groupingKey = new TreeMap<>();
271
272    private Builder(PrometheusProperties config) {
273      this.config = config;
274    }
275
276    /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */
277    public Builder format(Format format) {
278      if (format == null) {
279        throw new NullPointerException();
280      }
281      this.format = format;
282      return this;
283    }
284
285    /**
286     * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}.
287     * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address}
288     * property.
289     */
290    public Builder address(String address) {
291      if (address == null) {
292        throw new NullPointerException();
293      }
294      this.address = address;
295      return this;
296    }
297
298    /** Username and password for HTTP basic auth when pushing to the Pushgateway. */
299    public Builder basicAuth(String user, String password) {
300      if (user == null || password == null) {
301        throw new NullPointerException();
302      }
303      byte[] credentialsBytes = (user + ":" + password).getBytes(StandardCharsets.UTF_8);
304      String encoded = Base64.getEncoder().encodeToString(credentialsBytes);
305      requestHeaders.put("Authorization", String.format("Basic %s", encoded));
306      return this;
307    }
308
309    /** Bearer token authorization when pushing to the Pushgateway. */
310    public Builder bearerToken(String token) {
311      if (token == null) {
312        throw new NullPointerException();
313      }
314      requestHeaders.put("Authorization", String.format("Bearer %s", token));
315      return this;
316    }
317
318    /**
319     * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten
320     * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property.
321     */
322    public Builder scheme(Scheme scheme) {
323      if (scheme == null) {
324        throw new NullPointerException();
325      }
326      this.scheme = scheme;
327      return this;
328    }
329
330    /**
331     * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}.
332     *
333     * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example
334     * of a custom connection factory that skips SSL certificate validation for HTTPS connections.
335     */
336    public Builder connectionFactory(HttpConnectionFactory connectionFactory) {
337      if (connectionFactory == null) {
338        throw new NullPointerException();
339      }
340      this.connectionFactory = connectionFactory;
341      return this;
342    }
343
344    /**
345     * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR
346     * file will be used by default. Can be overwritten at runtime with the {@code
347     * io.prometheus.exporter.pushgateway.job} property.
348     */
349    public Builder job(String job) {
350      if (job == null) {
351        throw new NullPointerException();
352      }
353      this.job = job;
354      return this;
355    }
356
357    /**
358     * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for
359     * adding multiple grouping keys.
360     */
361    public Builder groupingKey(String name, String value) {
362      if (name == null || value == null) {
363        throw new NullPointerException();
364      }
365      groupingKey.put(name, value);
366      return this;
367    }
368
369    /** Convenience method for adding the current IP address as an "instance" label. */
370    public Builder instanceIpGroupingKey() throws UnknownHostException {
371      return groupingKey("instance", InetAddress.getLocalHost().getHostAddress());
372    }
373
374    /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */
375    public Builder registry(PrometheusRegistry registry) {
376      if (registry == null) {
377        throw new NullPointerException();
378      }
379      this.registry = registry;
380      return this;
381    }
382
383    private Scheme getScheme(ExporterPushgatewayProperties properties) {
384      if (properties != null && properties.getScheme() != null) {
385        return Scheme.valueOf(properties.getScheme());
386      } else if (this.scheme != null) {
387        return this.scheme;
388      } else {
389        return HTTP;
390      }
391    }
392
393    private String getAddress(ExporterPushgatewayProperties properties) {
394      if (properties != null && properties.getAddress() != null) {
395        return properties.getAddress();
396      } else if (this.address != null) {
397        return this.address;
398      } else {
399        return "localhost:9091";
400      }
401    }
402
403    private String getJob(ExporterPushgatewayProperties properties) {
404      if (properties != null && properties.getJob() != null) {
405        return properties.getJob();
406      } else if (this.job != null) {
407        return this.job;
408      } else {
409        return DefaultJobLabelDetector.getDefaultJobLabel();
410      }
411    }
412
413    private Format getFormat() {
414      // currently not configurable via properties
415      if (this.format != null) {
416        return this.format;
417      }
418      return Format.PROMETHEUS_PROTOBUF;
419    }
420
421    private URL makeUrl(ExporterPushgatewayProperties properties)
422        throws UnsupportedEncodingException, MalformedURLException {
423      String url = getScheme(properties) + "://" + getAddress(properties) + "/metrics/";
424      String job = getJob(properties);
425      if (job.contains("/")) {
426        url += "job@base64/" + base64url(job);
427      } else {
428        url += "job/" + URLEncoder.encode(job, "UTF-8");
429      }
430      if (groupingKey != null) {
431        for (Map.Entry<String, String> entry : groupingKey.entrySet()) {
432          if (entry.getValue().isEmpty()) {
433            url += "/" + entry.getKey() + "@base64/=";
434          } else if (entry.getValue().contains("/")) {
435            url += "/" + entry.getKey() + "@base64/" + base64url(entry.getValue());
436          } else {
437            url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8");
438          }
439        }
440      }
441      return URI.create(url).normalize().toURL();
442    }
443
444    private String base64url(String v) {
445      return Base64.getEncoder()
446          .encodeToString(v.getBytes(StandardCharsets.UTF_8))
447          .replace("+", "-")
448          .replace("/", "_");
449    }
450
451    public PushGateway build() {
452      ExporterPushgatewayProperties properties =
453          config == null ? null : config.getExporterPushgatewayProperties();
454      try {
455        return new PushGateway(
456            registry, getFormat(), makeUrl(properties), connectionFactory, requestHeaders);
457      } catch (MalformedURLException e) {
458        throw new PrometheusPropertiesException(
459            address + ": Invalid address. Expecting <host>:<port>");
460      } catch (UnsupportedEncodingException e) {
461        throw new RuntimeException(e); // cannot happen, UTF-8 is always supported
462      }
463    }
464  }
465}