001package io.prometheus.metrics.exporter.pushgateway;
002
003import io.prometheus.metrics.config.ExporterPushgatewayProperties;
004import io.prometheus.metrics.config.PrometheusProperties;
005import io.prometheus.metrics.config.PrometheusPropertiesException;
006import io.prometheus.metrics.expositionformats.PrometheusProtobufWriter;
007import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter;
008import io.prometheus.metrics.model.registry.Collector;
009import io.prometheus.metrics.model.registry.MultiCollector;
010import io.prometheus.metrics.model.registry.PrometheusRegistry;
011
012import java.io.*;
013import java.net.*;
014import java.nio.charset.StandardCharsets;
015import java.util.*;
016
017import static io.prometheus.metrics.exporter.pushgateway.Scheme.HTTP;
018
019/**
020 * Export metrics via the <a href="https://github.com/prometheus/pushgateway">Prometheus Pushgateway</a>
021 * <p>
022 * The Prometheus Pushgateway exists to allow ephemeral and batch jobs to expose their metrics to Prometheus.
023 * Since these kinds of jobs may not exist long enough to be scraped, they can instead push their metrics
024 * to a Pushgateway. This Java class allows pushing the contents of a {@link PrometheusRegistry} to a Pushgateway.
025 * <p>
026 * Example usage:
027 * <pre>
028 * {@code
029 * void executeBatchJob() throws Exception {
030 *     PrometheusRegistry registry = new PrometheusRegistry();
031 *     Gauge duration = Gauge.builder()
032 *             .name("my_batch_job_duration_seconds")
033 *             .help("Duration of my batch job in seconds.")
034 *             .register(registry);
035 *     Timer durationTimer = duration.startTimer();
036 *     try {
037 *         // Your code here.
038 *
039 *         // This is only added to the registry after success,
040 *         // so that a previous success in the Pushgateway isn't overwritten on failure.
041 *         Gauge lastSuccess = Gauge.builder()
042 *                 .name("my_batch_job_last_success")
043 *                 .help("Last time my batch job succeeded, in unixtime.")
044 *                 .register(registry);
 *         lastSuccess.set(System.currentTimeMillis() / 1000.0); // unixtime is in seconds
046 *     } finally {
047 *         durationTimer.observeDuration();
048 *         PushGateway pg = PushGateway.builder()
049 *                 .address("127.0.0.1:9091")
050 *                 .job("my_batch_job")
051 *                 .registry(registry)
052 *                 .build();
053 *         pg.pushAdd();
054 *     }
055 * }
056 * }
057 * </pre>
058 * <p>
059 * See <a href="https://github.com/prometheus/pushgateway">https://github.com/prometheus/pushgateway</a>.
060 */
061public class PushGateway {
062
063    private static final int MILLISECONDS_PER_SECOND = 1000;
064
065    private final URL url;
066    private final Format format;
067    private final Map<String, String> requestHeaders;
068    private final PrometheusRegistry registry;
069    private final HttpConnectionFactory connectionFactory;
070
071    private PushGateway(PrometheusRegistry registry, Format format, URL url, HttpConnectionFactory connectionFactory, Map<String, String> requestHeaders) {
072        this.registry = registry;
073        this.format = format;
074        this.url = url;
075        this.requestHeaders = Collections.unmodifiableMap(new HashMap<>(requestHeaders));
076        this.connectionFactory = connectionFactory;
077    }
078
079    /**
080     * Push all metrics. All metrics with the same job and grouping key are replaced.
081     * <p>
082     * This uses the PUT HTTP method.
083     */
084    public void push() throws IOException {
085        doRequest(registry, "PUT");
086    }
087
088    /**
089     * Push a single metric. All metrics with the same job and grouping key are replaced.
090     * <p>
091     * This is useful for pushing a single Gauge.
092     * <p>
093     * This uses the PUT HTTP method.
094     */
095    public void push(Collector collector) throws IOException {
096        PrometheusRegistry registry = new PrometheusRegistry();
097        registry.register(collector);
098        doRequest(registry, "PUT");
099    }
100
101    /**
102     * Push a single collector. All metrics with the same job and grouping key are replaced.
103     * <p>
104     * This uses the PUT HTTP method.
105     */
106    public void push(MultiCollector collector) throws IOException {
107        PrometheusRegistry registry = new PrometheusRegistry();
108        registry.register(collector);
109        doRequest(registry, "PUT");
110    }
111
112    /**
113     * Like {@link #push()}, but only metrics with the same name as the newly pushed metrics are replaced.
114     * <p>
115     * This uses the POST HTTP method.
116     */
117    public void pushAdd() throws IOException {
118        doRequest(registry, "POST");
119    }
120
121    /**
122     * Like {@link #push(Collector)}, but only the specified metric will be replaced.
123     * <p>
124     * This uses the POST HTTP method.
125     */
126    public void pushAdd(Collector collector) throws IOException {
127        PrometheusRegistry registry = new PrometheusRegistry();
128        registry.register(collector);
129        doRequest(registry, "POST");
130    }
131
132    /**
133     * Like {@link #push(MultiCollector)}, but only the metrics from the collector will be replaced.
134     * <p>
135     * This uses the POST HTTP method.
136     */
137    public void pushAdd(MultiCollector collector) throws IOException {
138        PrometheusRegistry registry = new PrometheusRegistry();
139        registry.register(collector);
140        doRequest(registry, "POST");
141    }
142
143    /**
144     * Deletes metrics from the Pushgateway.
145     * <p>
146     * This uses the DELETE HTTP method.
147     */
148    public void delete() throws IOException {
149        doRequest(null, "DELETE");
150    }
151
152    private void doRequest(PrometheusRegistry registry, String method) throws IOException {
153        try {
154            HttpURLConnection connection = connectionFactory.create(url);
155            requestHeaders.forEach(connection::setRequestProperty);
156            if (format == Format.PROMETHEUS_TEXT) {
157                connection.setRequestProperty("Content-Type", PrometheusTextFormatWriter.CONTENT_TYPE);
158            } else {
159                connection.setRequestProperty("Content-Type", PrometheusProtobufWriter.CONTENT_TYPE);
160            }
161            if (!method.equals("DELETE")) {
162                connection.setDoOutput(true);
163            }
164            connection.setRequestMethod(method);
165
166            connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND);
167            connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND);
168            connection.connect();
169
170            try {
171                if (!method.equals("DELETE")) {
172                    OutputStream outputStream = connection.getOutputStream();
173                    if (format == Format.PROMETHEUS_TEXT) {
174                        new PrometheusTextFormatWriter(false).write(outputStream, registry.scrape());
175                    } else {
176                        new PrometheusProtobufWriter().write(outputStream, registry.scrape());
177                    }
178                    outputStream.flush();
179                    outputStream.close();
180                }
181
182                int response = connection.getResponseCode();
183                if (response / 100 != 2) {
184                    String errorMessage;
185                    InputStream errorStream = connection.getErrorStream();
186                    if (errorStream != null) {
187                        String errBody = readFromStream(errorStream);
188                        errorMessage = "Response code from " + url + " was " + response + ", response body: " + errBody;
189                    } else {
190                        errorMessage = "Response code from " + url + " was " + response;
191                    }
192                    throw new IOException(errorMessage);
193                }
194
195            } finally {
196                connection.disconnect();
197            }
198        } catch (IOException e) {
199            String baseUrl = url.getProtocol() + "://" + url.getHost();
200            if (url.getPort() != -1) {
201                baseUrl += ":" + url.getPort();
202            }
203            throw new IOException("Failed to push metrics to the Prometheus Pushgateway on " + baseUrl + ": " + e.getMessage(), e);
204        }
205    }
206
207    private static String readFromStream(InputStream is) throws IOException {
208        ByteArrayOutputStream result = new ByteArrayOutputStream();
209        byte[] buffer = new byte[1024];
210        int length;
211        while ((length = is.read(buffer)) != -1) {
212            result.write(buffer, 0, length);
213        }
214        return result.toString("UTF-8");
215    }
216
217    public static Builder builder() {
218        return builder(PrometheusProperties.get());
219    }
220
221    /**
222     * The {@link PrometheusProperties} will be used to override what is set in the {@link Builder}.
223     */
224    public static Builder builder(PrometheusProperties config) {
225        return new Builder(config);
226    }
227
228    public static class Builder {
229
230        private final PrometheusProperties config;
231        private Format format;
232        private String address;
233        private Scheme scheme;
234        private String job;
235        private final Map<String, String> requestHeaders = new HashMap<>();
236        private PrometheusRegistry registry = PrometheusRegistry.defaultRegistry;
237        private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory();
238        private Map<String, String> groupingKey = new TreeMap<>();
239
240        private Builder(PrometheusProperties config) {
241            this.config = config;
242        }
243
244        /**
245         * Default is {@link Format#PROMETHEUS_PROTOBUF}.
246         */
247        public Builder format(Format format) {
248            if (format == null) {
249                throw new NullPointerException();
250            }
251            this.format = format;
252            return this;
253        }
254
255        /**
256         * Address of the Pushgateway in format {@code host:port}.
257         * Default is {@code localhost:9091}.
258         * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.address} property.
259         */
260        public Builder address(String address) {
261            if (address == null) {
262                throw new NullPointerException();
263            }
264            this.address = address;
265            return this;
266        }
267
268        /**
269         * Username and password for HTTP basic auth when pushing to the Pushgateway.
270         */
271        public Builder basicAuth(String user, String password) {
272            if (user == null || password == null) {
273                throw new NullPointerException();
274            }
275            byte[] credentialsBytes = (user + ":" + password).getBytes(StandardCharsets.UTF_8);
276            String encoded = Base64.getEncoder().encodeToString(credentialsBytes);
277            requestHeaders.put("Authorization", String.format("Basic %s", encoded));
278            return this;
279        }
280
281        /**
282         * Bearer token authorization when pushing to the Pushgateway.
283         */
284        public Builder bearerToken(String token) {
285            if (token == null)  {
286                throw new NullPointerException();
287            }
288            requestHeaders.put("Authorization", String.format("Bearer %s", token));
289            return this;
290        }
291
292        /**
293         * Specify if metrics should be pushed using HTTP or HTTPS. Default is HTTP.
294         * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.scheme} property.
295         */
296        public Builder scheme(Scheme scheme) {
297            if (scheme == null) {
298                throw new NullPointerException();
299            }
300            this.scheme = scheme;
301            return this;
302        }
303
304        /**
305         * Custom connection factory. Default is {@link DefaultHttpConnectionFactory}.
306         * <p>
307         * The {@code PushGatewayTestApp} in {@code integration-tests/it-pushgateway/} has an example of a custom
308         * connection factory that skips SSL certificate validation for HTTPS connections.
309         */
310        public Builder connectionFactory(HttpConnectionFactory connectionFactory) {
311            if (connectionFactory == null) {
312                throw new NullPointerException();
313            }
314            this.connectionFactory = connectionFactory;
315            return this;
316        }
317
318        /**
319         * The {@code job} label to be used when pushing metrics.
320         * If not provided, the name of the JAR file will be used by default.
321         * Can be overwritten at runtime with the {@code io.prometheus.exporter.pushgateway.job} property.
322         */
323        public Builder job(String job) {
324            if (job == null) {
325                throw new NullPointerException();
326            }
327            this.job = job;
328            return this;
329        }
330
331        /**
332         * Grouping keys to be used when pushing/deleting metrics.
333         * Call this method multiple times for adding multiple grouping keys.
334         */
335        public Builder groupingKey(String name, String value) {
336            if (name == null || value == null) {
337                throw new NullPointerException();
338            }
339            groupingKey.put(name, value);
340            return this;
341        }
342
343        /**
344         * Convenience method for adding the current IP address as an "instance" label.
345         */
346        public Builder instanceIpGroupingKey() throws UnknownHostException {
347            return groupingKey("instance", InetAddress.getLocalHost().getHostAddress());
348        }
349
350        /**
351         * Push metrics from this registry instead of {@link PrometheusRegistry#defaultRegistry}.
352         */
353        public Builder registry(PrometheusRegistry registry) {
354            if (registry == null) {
355                throw new NullPointerException();
356            }
357            this.registry = registry;
358            return this;
359        }
360
361        private Scheme getScheme(ExporterPushgatewayProperties properties) {
362            if (properties != null && properties.getScheme() != null) {
363                return Scheme.valueOf(properties.getScheme());
364            } else if (this.scheme != null) {
365                return this.scheme;
366            } else {
367                return HTTP;
368            }
369        }
370
371        private String getAddress(ExporterPushgatewayProperties properties) {
372            if (properties != null && properties.getAddress() != null) {
373                return properties.getAddress();
374            } else if (this.address != null) {
375                return this.address;
376            } else {
377                return "localhost:9091";
378            }
379        }
380
381        private String getJob(ExporterPushgatewayProperties properties) {
382            if (properties != null && properties.getJob() != null) {
383                return properties.getJob();
384            } else if (this.job != null) {
385                return this.job;
386            } else {
387                return DefaultJobLabelDetector.getDefaultJobLabel();
388            }
389        }
390
391        private Format getFormat(ExporterPushgatewayProperties properties) {
392            // currently not configurable via properties
393            if (this.format != null) {
394                return this.format;
395            }
396            return Format.PROMETHEUS_PROTOBUF;
397        }
398
399        private URL makeUrl(ExporterPushgatewayProperties properties) throws UnsupportedEncodingException, MalformedURLException {
400            String url = getScheme(properties) + "://" + getAddress(properties) + "/metrics/";
401            String job = getJob(properties);
402            if (job.contains("/")) {
403                url += "job@base64/" + base64url(job);
404            } else {
405                url += "job/" + URLEncoder.encode(job, "UTF-8");
406            }
407            if (groupingKey != null) {
408                for (Map.Entry<String, String> entry : groupingKey.entrySet()) {
409                    if (entry.getValue().isEmpty()) {
410                        url += "/" + entry.getKey() + "@base64/=";
411                    } else if (entry.getValue().contains("/")) {
412                        url += "/" + entry.getKey() + "@base64/" + base64url(entry.getValue());
413                    } else {
414                        url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8");
415                    }
416                }
417            }
418            return URI.create(url).normalize().toURL();
419        }
420
421        private String base64url(String v) {
422            return Base64.getEncoder().encodeToString(v.getBytes(StandardCharsets.UTF_8)).replace("+", "-").replace("/", "_");
423        }
424
425        public PushGateway build() {
426            ExporterPushgatewayProperties properties = config == null ? null : config.getExporterPushgatewayProperties();
427            try {
428                return new PushGateway(registry, getFormat(properties), makeUrl(properties), connectionFactory, requestHeaders);
429            } catch (MalformedURLException e) {
430                throw new PrometheusPropertiesException(address + ": Invalid address. Expecting <host>:<port>");
431            } catch (UnsupportedEncodingException e) {
432                throw new RuntimeException(e); // cannot happen, UTF-8 is always supported
433            }
434        }
435    }
436}