001package io.prometheus.metrics.exporter.pushgateway;
002
003import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP;
004
005import io.prometheus.metrics.config.ExporterPushgatewayProperties;
006import io.prometheus.metrics.config.PrometheusProperties;
007import io.prometheus.metrics.config.PrometheusPropertiesException;
008import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter;
009import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter;
010import io.prometheus.metrics.model.registry.Collector;
011import io.prometheus.metrics.model.registry.MultiCollector;
012import io.prometheus.metrics.model.registry.PrometheusRegistry;
013import java.io.ByteArrayOutputStream;
014import java.io.IOException;
015import java.io.InputStream;
016import java.io.OutputStream;
017import java.io.UnsupportedEncodingException;
018import java.net.HttpURLConnection;
019import java.net.InetAddress;
020import java.net.MalformedURLException;
021import java.net.URI;
022import java.net.URL;
023import java.net.URLEncoder;
024import java.net.UnknownHostException;
025import java.nio.charset.StandardCharsets;
026import java.util.Base64;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.Map;
030import java.util.TreeMap;
031
032/**
033 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus
034 * Pushgateway</a>
035 *
036 * <p>The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to
037 * Prometheus. Since these kinds of jobs may not exist long enough to be scraped, they can instead
038 * push their metrics to a Pushgateway. This Java class allows pushing the contents of a {@link
039 * PrometheusRegistry} to a Pushgateway.
040 *
041 * <p>Example usage:
042 *
043 * <pre>{@code
044 * void executeBatchJob() throws Exception {
045 *     PrometheusRegistry registry = new PrometheusRegistry();
046 *     Gauge duration = Gauge.builder()
047 *             .name("my_batch_job_duration_seconds")
048 *             .help("Duration of my batch job in seconds.")
049 *             .register(registry);
050 *     Timer durationTimer = duration.startTimer();
051 *     try {
052 *         // Your code here.
053 *
054 *         // This is only added to the registry after success,
055 *         // so that a previous success in the Pushgateway isn't overwritten on failure.
056 *         Gauge lastSuccess = Gauge.builder()
057 *                 .name("my_batch_job_last_success")
058 *                 .help("Last time my batch job succeeded, in unixtime.")
059 *                 .register(registry);
060 *         lastSuccess.set(System.currentTimeMillis());
061 *     } finally {
062 *         durationTimer.observeDuration();
063 *         PushGateway pg = PushGateway.builder()
064 *                 .address("127.0.0.1:9091")
065 *                 .job("my_batch_job")
066 *                 .registry(registry)
067 *                 .build();
068 *         pg.pushAdd();
069 *     }
070 * }
071 * }</pre>
072 *
073 * <p>See <a
074 * href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>.
075 */
076public class PushGateway {
077
078  private static final int MILLISECONDS_PER_SECOND = 1000;
079
080  private final URL url;
081  private final Format format;
082  private final Map<String, String> requestHeaders;
083  private final PrometheusRegistry registry;
084  private final HttpConnectionFactory connectionFactory;
085
086  private PushGateway(
087      PrometheusRegistry registry,
088      Format format,
089      URL url,
090      HttpConnectionFactory connectionFactory,
091      Map<String, String> requestHeaders) {
092    this.registry = registry;
093    this.format = format;
094    this.url = url;
095    this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders));
096    this.connectionFactory = connectionFactory;
097  }
098
099  /**
100   * Push all metrics. All metrics with the same job and grouping key are replaced.
101   *
102   * <p>This uses the PUT HTTP method.
103   */
104  public void push() throws IOException {
105    doRequest(registry, "PUT");
106  }
107
108  /**
109   * Push a single metric. All metrics with the same job and grouping key are replaced.
110   *
111   * <p>This is useful for pushing a single Gauge.
112   *
113   * <p>This uses the PUT HTTP method.
114   */
115  public void push(Collector collector) throws IOException {
116    PrometheusRegistry registry = new PrometheusRegistry();
117    registry.register(collector);
118    doRequest(registry, "PUT");
119  }
120
121  /**
122   * Push a single collector. All metrics with the same job and grouping key are replaced.
123   *
124   * <p>This uses the PUT HTTP method.
125   */
126  public void push(MultiCollector collector) throws IOException {
127    PrometheusRegistry registry = new PrometheusRegistry();
128    registry.register(collector);
129    doRequest(registry, "PUT");
130  }
131
132  /**
133   * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are
134   * replaced.
135   *
136   * <p>This uses the POST HTTP method.
137   */
138  public void pushAdd() throws IOException {
139    doRequest(registry, "POST");
140  }
141
142  /**
143   * Like {@link #push(Collector)}, but only the specified metric will be replaced.
144   *
145   * <p>This uses the POST HTTP method.
146   */
147  public void pushAdd(Collector collector) throws IOException {
148    PrometheusRegistry registry = new PrometheusRegistry();
149    registry.register(collector);
150    doRequest(registry, "POST");
151  }
152
153  /**
154   * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced.
155   *
156   * <p>This uses the POST HTTP method.
157   */
158  public void pushAdd(MultiCollector collector) throws IOException {
159    PrometheusRegistry registry = new PrometheusRegistry();
160    registry.register(collector);
161    doRequest(registry, "POST");
162  }
163
164  /**
165   * Deletes metrics from the Pushgateway.
166   *
167   * <p>This uses the DELETE HTTP method.
168   */
169  public void delete() throws IOException {
170    doRequest(null, "DELETE");
171  }
172
173  private void doRequest(PrometheusRegistry registry, String method) throws IOException {
174    try {
175      HttpURLConnection connection = connectionFactory.create(url);
176      requestHeaders.forEach(connection::setRequestProperty);
177      if (format == Format.PROMETHEUS_TEXT) {
178        connection.setRequestProperty("Content-Type", PrometheusTextFormatWriter.CONTENT_TYPE);
179      } else {
180        connection.setRequestProperty("Content-Type", PrometheusProtobufWriter.CONTENT_TYPE);
181      }
182      if (!method.equals("DELETE")) {
183        connection.setDoOutput(true);
184      }
185      connection.setRequestMethod(method);
186
187      connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND);
188      connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND);
189      connection.connect();
190
191      try {
192        if (!method.equals("DELETE")) {
193          OutputStream outputStream = connection.getOutputStream();
194          if (format == Format.PROMETHEUS_TEXT) {
195            new PrometheusTextFormatWriter(false).write(outputStream, registry.scrape());
196          } else {
197            new PrometheusProtobufWriter().write(outputStream, registry.scrape());
198          }
199          outputStream.flush();
200          outputStream.close();
201        }
202
203        int response = connection.getResponseCode();
204        if (response / 100 != 2) {
205          String errorMessage;
206          InputStream errorStream = connection.getErrorStream();
207          if (errorStream != null) {
208            String errBody = readFromStream(errorStream);
209            errorMessage =
210                "Response code from " + url + " was " + response + ", response body: " + errBody;
211          } else {
212            errorMessage = "Response code from " + url + " was " + response;
213          }
214          throw new IOException(errorMessage);
215        }
216
217      } finally {
218        connection.disconnect();
219      }
220    } catch (IOException e) {
221      String baseUrl = url.getProtocol() + "://" + url.getHost();
222      if (url.getPort() != -1) {
223        baseUrl += ":" + url.getPort();
224      }
225      throw new IOException(
226          "Failed to push metrics to the Prometheus Pushgateway on "
227              + baseUrl
228              + ": "
229              + e.getMessage(),
230          e);
231    }
232  }
233
234  private static String readFromStream(InputStream is) throws IOException {
235    ByteArrayOutputStream result = new ByteArrayOutputStream();
236    byte[] buffer = new byte[1024];
237    int length;
238    while ((length = is.read(buffer)) != -1) {
239      result.write(buffer, 0, length);
240    }
241    return result.toString("UTF-8");
242  }
243
244  public static Builder builder() {
245    return builder(PrometheusProperties.get());
246  }
247
248  /**
249   * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}.
250   */
251  public static Builder builder(PrometheusProperties config) {
252    return new Builder(config);
253  }
254
255  public static class Builder {
256
257    private final PrometheusProperties config;
258    private Format format;
259    private String address;
260    private Scheme scheme;
261    private String job;
262    private final Map<String, String> requestHeaders = new HashMap<>();
263    private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry;
264    private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory();
265    private Map<String, String> groupingKey = new TreeMap<>();
266
267    private Builder(PrometheusProperties config) {
268      this.config = config;
269    }
270
271    /** Default is {@link Format#PROMETHEUS_PROTOBUF}. */
272    public Builder format(Format format) {
273      if (format == null) {
274        throw new NullPointerException();
275      }
276      this.format = format;
277      return this;
278    }
279
280    /**
281     * Address of the Pushgateway in format {@code host:port}. Default is {@code localhost:9091}.
282     * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address}
283     * property.
284     */
285    public Builder address(String address) {
286      if (address == null) {
287        throw new NullPointerException();
288      }
289      this.address = address;
290      return this;
291    }
292
293    /** Username and password for HTTP basic auth when pushing to the Pushgateway. */
294    public Builder basicAuth(String user, String password) {
295      if (user == null || password == null) {
296        throw new NullPointerException();
297      }
298      byte[] credentialsBytes = (user + ":" + password).getBytes(StandardCharsets.UTF_8);
299      String encoded = Base64.getEncoder().encodeToString(credentialsBytes);
300      requestHeaders.put("Authorization", String.format("Basic %s", encoded));
301      return this;
302    }
303
304    /** Bearer token authorization when pushing to the Pushgateway. */
305    public Builder bearerToken(String token) {
306      if (token == null) {
307        throw new NullPointerException();
308      }
309      requestHeaders.put("Authorization", String.format("Bearer %s", token));
310      return this;
311    }
312
313    /**
314     * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP. Can be overwritten
315     * at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property.
316     */
317    public Builder scheme(Scheme scheme) {
318      if (scheme == null) {
319        throw new NullPointerException();
320      }
321      this.scheme = scheme;
322      return this;
323    }
324
325    /**
326     * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}.
327     *
328     * <p>The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example
329     * of a custom connection factory that skips SSL certificate validation for HTTPS connections.
330     */
331    public Builder connectionFactory(HttpConnectionFactory connectionFactory) {
332      if (connectionFactory == null) {
333        throw new NullPointerException();
334      }
335      this.connectionFactory = connectionFactory;
336      return this;
337    }
338
339    /**
340     * The {@code job} label to be used when pushing metrics. If not provided, the name of the JAR
341     * file will be used by default. Can be overwritten at runtime with the {@code
342     * io.prometheus.exporter.pushgateway.job} property.
343     */
344    public Builder job(String job) {
345      if (job == null) {
346        throw new NullPointerException();
347      }
348      this.job = job;
349      return this;
350    }
351
352    /**
353     * Grouping keys to be used when pushing/deleting metrics. Call this method multiple times for
354     * adding multiple grouping keys.
355     */
356    public Builder groupingKey(String name, String value) {
357      if (name == null || value == null) {
358        throw new NullPointerException();
359      }
360      groupingKey.put(name, value);
361      return this;
362    }
363
364    /** Convenience method for adding the current IP address as an "instance" label. */
365    public Builder instanceIpGroupingKey() throws UnknownHostException {
366      return groupingKey("instance", InetAddress.getLocalHost().getHostAddress());
367    }
368
369    /** Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}. */
370    public Builder registry(PrometheusRegistry registry) {
371      if (registry == null) {
372        throw new NullPointerException();
373      }
374      this.registry = registry;
375      return this;
376    }
377
378    private Scheme getScheme(ExporterPushgatewayProperties properties) {
379      if (properties != null && properties.getScheme() != null) {
380        return Scheme.valueOf(properties.getScheme());
381      } else if (this.scheme != null) {
382        return this.scheme;
383      } else {
384        return HTTP;
385      }
386    }
387
388    private String getAddress(ExporterPushgatewayProperties properties) {
389      if (properties != null && properties.getAddress() != null) {
390        return properties.getAddress();
391      } else if (this.address != null) {
392        return this.address;
393      } else {
394        return "localhost:9091";
395      }
396    }
397
398    private String getJob(ExporterPushgatewayProperties properties) {
399      if (properties != null && properties.getJob() != null) {
400        return properties.getJob();
401      } else if (this.job != null) {
402        return this.job;
403      } else {
404        return DefaultJobLabelDetector.getDefaultJobLabel();
405      }
406    }
407
408    private Format getFormat() {
409      // currently not configurable via properties
410      if (this.format != null) {
411        return this.format;
412      }
413      return Format.PROMETHEUS_PROTOBUF;
414    }
415
416    private URL makeUrl(ExporterPushgatewayProperties properties)
417        throws UnsupportedEncodingException, MalformedURLException {
418      String url = getScheme(properties) + "://" + getAddress(properties) + "/metrics/";
419      String job = getJob(properties);
420      if (job.contains("/")) {
421        url += "job@base64/" + base64url(job);
422      } else {
423        url += "job/" + URLEncoder.encode(job, "UTF-8");
424      }
425      if (groupingKey != null) {
426        for (Map.Entry<String, String> entry : groupingKey.entrySet()) {
427          if (entry.getValue().isEmpty()) {
428            url += "/" + entry.getKey() + "@base64/=";
429          } else if (entry.getValue().contains("/")) {
430            url += "/" + entry.getKey() + "@base64/" + base64url(entry.getValue());
431          } else {
432            url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8");
433          }
434        }
435      }
436      return URI.create(url).normalize().toURL();
437    }
438
439    private String base64url(String v) {
440      return Base64.getEncoder()
441          .encodeToString(v.getBytes(StandardCharsets.UTF_8))
442          .replace("+", "-")
443          .replace("/", "_");
444    }
445
446    public PushGateway build() {
447      ExporterPushgatewayProperties properties =
448          config == null ? null : config.getExporterPushgatewayProperties();
449      try {
450        return new PushGateway(
451            registry, getFormat(), makeUrl(properties), connectionFactory, requestHeaders);
452      } catch (MalformedURLException e) {
453        throw new PrometheusPropertiesException(
454            address + ": Invalid address. Expecting <host>:<port>");
455      } catch (UnsupportedEncodingException e) {
456        throw new RuntimeException(e); // cannot happen, UTF-8 is always supported
457      }
458    }
459  }
460}