diff options
Diffstat (limited to 'prometheus_client')
-rw-r--r-- | prometheus_client/__init__.py | 49 | ||||
-rw-r--r-- | prometheus_client/bridge/__init__.py | 0 | ||||
-rw-r--r-- | prometheus_client/bridge/graphite.py | 80 | ||||
-rw-r--r-- | prometheus_client/core.py | 679 | ||||
-rw-r--r-- | prometheus_client/exposition.py | 131 | ||||
-rw-r--r-- | prometheus_client/parser.py | 224 | ||||
-rw-r--r-- | prometheus_client/process_collector.py | 95 |
7 files changed, 1258 insertions, 0 deletions
diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py new file mode 100644 index 0000000..80424db --- /dev/null +++ b/prometheus_client/__init__.py @@ -0,0 +1,49 @@ +#!/usr/bin/python + +from . import core +from . import exposition +from . import process_collector + +__all__ = ['Counter', 'Gauge', 'Summary', 'Histogram'] +# http://stackoverflow.com/questions/19913653/no-unicode-in-all-for-a-packages-init +__all__ = [n.encode('ascii') for n in __all__] + +CollectorRegistry = core.CollectorRegistry +REGISTRY = core.REGISTRY +Metric = core.Metric +Counter = core.Counter +Gauge = core.Gauge +Summary = core.Summary +Histogram = core.Histogram + +CONTENT_TYPE_LATEST = exposition.CONTENT_TYPE_LATEST +generate_latest = exposition.generate_latest +MetricsHandler = exposition.MetricsHandler +start_http_server = exposition.start_http_server +write_to_textfile = exposition.write_to_textfile +push_to_gateway = exposition.push_to_gateway +pushadd_to_gateway = exposition.pushadd_to_gateway +delete_from_gateway = exposition.delete_from_gateway +instance_ip_grouping_key = exposition.instance_ip_grouping_key + +ProcessCollector = process_collector.ProcessCollector +PROCESS_COLLECTOR = process_collector.PROCESS_COLLECTOR + + +if __name__ == '__main__': + c = Counter('cc', 'A counter') + c.inc() + + g = Gauge('gg', 'A gauge') + g.set(17) + + s = Summary('ss', 'A summary', ['a', 'b']) + s.labels('c', 'd').observe(17) + + h = Histogram('hh', 'A histogram') + h.observe(.6) + + start_http_server(8000) + import time + while True: + time.sleep(1) diff --git a/prometheus_client/bridge/__init__.py b/prometheus_client/bridge/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/prometheus_client/bridge/__init__.py diff --git a/prometheus_client/bridge/graphite.py b/prometheus_client/bridge/graphite.py new file mode 100644 index 0000000..a01c312 --- /dev/null +++ b/prometheus_client/bridge/graphite.py @@ -0,0 +1,80 @@ +#!/usr/bin/python +from __future__ import unicode_literals + +import logging +import re +import socket +import time +import threading + +from .. import core + +# Roughly, have to keep to what works as a file name. +# We also remove periods, so labels can be distinguished. +_INVALID_GRAPHITE_CHARS = re.compile(r"[^a-zA-Z0-9_-]") + + +def _sanitize(s): + return _INVALID_GRAPHITE_CHARS.sub('_', s) + + +class _RegularPush(threading.Thread): + def __init__(self, pusher, interval, prefix): + super(_RegularPush, self).__init__() + self._pusher = pusher + self._interval = interval + self._prefix = prefix + + def run(self): + wait_until = time.time() + while True: + while True: + now = time.time() + if now >= wait_until: + # May need to skip some pushes. + while wait_until < now: + wait_until += self._interval + break + # time.sleep can return early. + time.sleep(wait_until - now) + try: + self._pusher.push(prefix=self._prefix) + except IOError: + logging.exception("Push failed") + + +class GraphiteBridge(object): + def __init__(self, address, registry=core.REGISTRY, timeout_seconds=30, _time=time): + self._address = address + self._registry = registry + self._timeout = timeout_seconds + self._time = _time + + def push(self, prefix=''): + now = int(self._time.time()) + output = [] + + prefixstr = '' + if prefix: + prefixstr = prefix + '.' + + for metric in self._registry.collect(): + for name, labels, value in metric.samples: + if labels: + labelstr = '.' + '.'.join( + ['{0}.{1}'.format( + _sanitize(k), _sanitize(v)) + for k, v in sorted(labels.items())]) + else: + labelstr = '' + output.append('{0}{1}{2} {3} {4}\n'.format( + prefixstr, _sanitize(name), labelstr, float(value), now)) + + conn = socket.create_connection(self._address, self._timeout) + conn.sendall(''.join(output).encode('ascii')) + conn.close() + + def start(self, interval=60.0, prefix=''): + t = _RegularPush(self, interval, prefix) + t.daemon = True + t.start() diff --git a/prometheus_client/core.py b/prometheus_client/core.py new file mode 100644 index 0000000..14b5394 --- /dev/null +++ b/prometheus_client/core.py @@ -0,0 +1,679 @@ +#!/usr/bin/python + +from __future__ import unicode_literals + +import copy +import math +import re +import time +import types + +try: + from BaseHTTPServer import BaseHTTPRequestHandler +except ImportError: + # Python 3 + unicode = str + +from functools import wraps +from threading import Lock + +_METRIC_NAME_RE = re.compile(r'^[a-zA-Z_:][a-zA-Z0-9_:]*$') +_METRIC_LABEL_NAME_RE = re.compile(r'^[a-zA-Z_:][a-zA-Z0-9_:]*$') +_RESERVED_METRIC_LABEL_NAME_RE = re.compile(r'^__.*$') +_INF = float("inf") +_MINUS_INF = float("-inf") + + +class CollectorRegistry(object): + '''Metric collector registry. + + Collectors must have a no-argument method 'collect' that returns a list of + Metric objects. The returned metrics should be consistent with the Prometheus + exposition formats. + ''' + def __init__(self): + self._collectors = set() + self._lock = Lock() + + def register(self, collector): + '''Add a collector to the registry.''' + with self._lock: + self._collectors.add(collector) + + def unregister(self, collector): + '''Remove a collector from the registry.''' + with self._lock: + self._collectors.remove(collector) + + def collect(self): + '''Yields metrics from the collectors in the registry.''' + collectors = None + with self._lock: + collectors = copy.copy(self._collectors) + for collector in collectors: + for metric in collector.collect(): + yield metric + + def get_sample_value(self, name, labels=None): + '''Returns the sample value, or None if not found. + + This is inefficient, and intended only for use in unittests. + ''' + if labels is None: + labels = {} + for metric in self.collect(): + for n, l, value in metric.samples: + if n == name and l == labels: + return value + return None + + +REGISTRY = CollectorRegistry() +'''The default registry.''' + +_METRIC_TYPES = ('counter', 'gauge', 'summary', 'histogram', 'untyped') + + +class Metric(object): + '''A single metric family and its samples. + + This is intended only for internal use by the instrumentation client. + + Custom collectors should use GaugeMetricFamily, CounterMetricFamily + and SummaryMetricFamily instead. + ''' + def __init__(self, name, documentation, typ): + self.name = name + self.documentation = documentation + if typ not in _METRIC_TYPES: + raise ValueError('Invalid metric type: ' + typ) + self.type = typ + self.samples = [] + + def add_sample(self, name, labels, value): + '''Add a sample to the metric. + + Internal-only, do not use.''' + self.samples.append((name, labels, value)) + + def __eq__(self, other): + return (isinstance(other, Metric) + and self.name == other.name + and self.documentation == other.documentation + and self.type == other.type + and self.samples == other.samples) + + +class CounterMetricFamily(Metric): + '''A single counter and its samples. + + For use by custom collectors. + ''' + def __init__(self, name, documentation, value=None, labels=None): + Metric.__init__(self, name, documentation, 'counter') + if labels is not None and value is not None: + raise ValueError('Can only specify at most one of value and labels.') + if labels is None: + labels = [] + self._labelnames = labels + if value is not None: + self.add_metric([], value) + + def add_metric(self, labels, value): + '''Add a metric to the metric family. + + Args: + labels: A list of label values + value: The value of the metric. + ''' + self.samples.append((self.name, dict(zip(self._labelnames, labels)), value)) + + +class GaugeMetricFamily(Metric): + '''A single gauge and its samples. + + For use by custom collectors. + ''' + def __init__(self, name, documentation, value=None, labels=None): + Metric.__init__(self, name, documentation, 'gauge') + if labels is not None and value is not None: + raise ValueError('Can only specify at most one of value and labels.') + if labels is None: + labels = [] + self._labelnames = labels + if value is not None: + self.add_metric([], value) + + def add_metric(self, labels, value): + '''Add a metric to the metric family. + + Args: + labels: A list of label values + value: A float + ''' + self.samples.append((self.name, dict(zip(self._labelnames, labels)), value)) + + +class SummaryMetricFamily(Metric): + '''A single summary and its samples. + + For use by custom collectors. + ''' + def __init__(self, name, documentation, count_value=None, sum_value=None, labels=None): + Metric.__init__(self, name, documentation, 'summary') + if (sum_value is None) != (count_value is None): + raise ValueError('count_value and sum_value must be provided together.') + if labels is not None and count_value is not None: + raise ValueError('Can only specify at most one of value and labels.') + if labels is None: + labels = [] + self._labelnames = labels + if count_value is not None: + self.add_metric([], count_value, sum_value) + + def add_metric(self, labels, count_value, sum_value): + '''Add a metric to the metric family. + + Args: + labels: A list of label values + count_value: The count value of the metric. + sum_value: The sum value of the metric. + ''' + self.samples.append((self.name + '_count', dict(zip(self._labelnames, labels)), count_value)) + self.samples.append((self.name + '_sum', dict(zip(self._labelnames, labels)), sum_value)) + + +class HistogramMetricFamily(Metric): + '''A single histogram and its samples. + + For use by custom collectors. + ''' + def __init__(self, name, documentation, buckets=None, sum_value=None, labels=None): + Metric.__init__(self, name, documentation, 'histogram') + if (sum_value is None) != (buckets is None): + raise ValueError('buckets and sum_value must be provided together.') + if labels is not None and buckets is not None: + raise ValueError('Can only specify at most one of buckets and labels.') + if labels is None: + labels = [] + self._labelnames = labels + if buckets is not None: + self.add_metric([], buckets, sum_value) + + def add_metric(self, labels, buckets, sum_value): + '''Add a metric to the metric family. + + Args: + labels: A list of label values + buckets: A list of pairs of bucket names and values. + The buckets must be sorted, and +Inf present. + sum_value: The sum value of the metric. + ''' + for bucket, value in buckets: + self.samples.append((self.name + '_bucket', dict(list(zip(self._labelnames, labels)) + [('le', bucket)]), value)) + # +Inf is last and provides the count value. + self.samples.append((self.name + '_count', dict(zip(self._labelnames, labels)), buckets[-1][1])) + self.samples.append((self.name + '_sum', dict(zip(self._labelnames, labels)), sum_value)) + + +class _MutexValue(object): + '''A float protected by a mutex.''' + + def __init__(self, name, labelnames, labelvalues): + self._value = 0.0 + self._lock = Lock() + + def inc(self, amount): + with self._lock: + self._value += amount + + def set(self, value): + with self._lock: + self._value = value + + def get(self): + with self._lock: + return self._value + +_ValueClass = _MutexValue + + +class _LabelWrapper(object): + '''Handles labels for the wrapped metric.''' + def __init__(self, wrappedClass, name, labelnames, **kwargs): + self._wrappedClass = wrappedClass + self._type = wrappedClass._type + self._name = name + self._labelnames = labelnames + self._kwargs = kwargs + self._lock = Lock() + self._metrics = {} + + for l in labelnames: + if l.startswith('__'): + raise ValueError('Invalid label metric name: ' + l) + + def labels(self, *labelvalues): + '''Return the child for the given labelset. + + Labels can be provided as a tuple or as a dict: + c = Counter('c', 'counter', ['l', 'm']) + # Set labels by position + c.labels('0', '1').inc() + # Set labels by name + c.labels({'l': '0', 'm': '1'}).inc() + ''' + if len(labelvalues) == 1 and type(labelvalues[0]) == dict: + if sorted(labelvalues[0].keys()) != sorted(self._labelnames): + raise ValueError('Incorrect label names') + labelvalues = tuple([unicode(labelvalues[0][l]) for l in self._labelnames]) + else: + if len(labelvalues) != len(self._labelnames): + raise ValueError('Incorrect label count') + labelvalues = tuple([unicode(l) for l in labelvalues]) + with self._lock: + if labelvalues not in self._metrics: + self._metrics[labelvalues] = self._wrappedClass(self._name, self._labelnames, labelvalues, **self._kwargs) + return self._metrics[labelvalues] + + def remove(self, *labelvalues): + '''Remove the given labelset from the metric.''' + if len(labelvalues) != len(self._labelnames): + raise ValueError('Incorrect label count') + labelvalues = tuple([unicode(l) for l in labelvalues]) + with self._lock: + del self._metrics[labelvalues] + + def _samples(self): + with self._lock: + metrics = self._metrics.copy() + for labels, metric in metrics.items(): + series_labels = list(dict(zip(self._labelnames, labels)).items()) + for suffix, sample_labels, value in metric._samples(): + yield (suffix, dict(series_labels + list(sample_labels.items())), value) + + +def _MetricWrapper(cls): + '''Provides common functionality for metrics.''' + def init(name, documentation, labelnames=(), namespace='', subsystem='', registry=REGISTRY, **kwargs): + full_name = '' + if namespace: + full_name += namespace + '_' + if subsystem: + full_name += subsystem + '_' + full_name += name + + if labelnames: + labelnames = tuple(labelnames) + for l in labelnames: + if not _METRIC_LABEL_NAME_RE.match(l): + raise ValueError('Invalid label metric name: ' + l) + if _RESERVED_METRIC_LABEL_NAME_RE.match(l): + raise ValueError('Reserved label metric name: ' + l) + if l in cls._reserved_labelnames: + raise ValueError('Reserved label metric name: ' + l) + collector = _LabelWrapper(cls, name, labelnames, **kwargs) + else: + collector = cls(name, labelnames, (), **kwargs) + + if not _METRIC_NAME_RE.match(full_name): + raise ValueError('Invalid metric name: ' + full_name) + + def collect(): + metric = Metric(full_name, documentation, cls._type) + for suffix, labels, value in collector._samples(): + metric.add_sample(full_name + suffix, labels, value) + return [metric] + collector.collect = collect + + if registry: + registry.register(collector) + return collector + + return init + + +@_MetricWrapper +class Counter(object): + '''A Counter tracks counts of events or running totals. + + Example use cases for Counters: + - Number of requests processed + - Number of items that were inserted into a queue + - Total amount of data that a system has processed + + Counters can only go up (and be reset when the process restarts). If your use case can go down, + you should use a Gauge instead. + + An example for a Counter: + + from prometheus_client import Counter + c = Counter('my_failures_total', 'Description of counter') + c.inc() # Increment by 1 + c.inc(1.6) # Increment by given value + ''' + _type = 'counter' + _reserved_labelnames = [] + + def __init__(self, name, labelnames, labelvalues): + self._value = _ValueClass(name, labelnames, labelvalues) + + def inc(self, amount=1): + '''Increment counter by the given amount.''' + if amount < 0: + raise ValueError('Counters can only be incremented by non-negative amounts.') + self._value.inc(amount) + + def count_exceptions(self, exception=Exception): + '''Count exceptions in a block of code or function. + + Can be used as a function decorator or context manager. + Increments the counter when an exception of the given + type is raised up out of the code. + ''' + + class ExceptionCounter(object): + def __init__(self, counter): + self._counter = counter + + def __enter__(self): + pass + + def __exit__(self, typ, value, traceback): + if isinstance(value, exception): + self._counter.inc() + + def __call__(self, f): + @wraps(f) + def wrapped(*args, **kwargs): + with self: + return f(*args, **kwargs) + return wrapped + + return ExceptionCounter(self) + + def _samples(self): + return (('', {}, self._value.get()), ) + + +@_MetricWrapper +class Gauge(object): + '''Gauge metric, to report instantaneous values. + + Examples of Gauges include: + Inprogress requests + Number of items in a queue + Free memory + Total memory + Temperature + + Gauges can go both up and down. + + from prometheus_client import Gauge + g = Gauge('my_inprogress_requests', 'Description of gauge') + g.inc() # Increment by 1 + g.dec(10) # Decrement by given value + g.set(4.2) # Set to a given value + ''' + _type = 'gauge' + _reserved_labelnames = [] + + def __init__(self, name, labelnames, labelvalues): + self._value = _ValueClass(name, labelnames, labelvalues) + + def inc(self, amount=1): + '''Increment gauge by the given amount.''' + self._value.inc(amount) + + def dec(self, amount=1): + '''Decrement gauge by the given amount.''' + self._value.inc(-amount) + + def set(self, value): + '''Set gauge to the given value.''' + self._value.set(float(value)) + + def set_to_current_time(self): + '''Set gauge to the current unixtime.''' + self.set(time.time()) + + def track_inprogress(self): + '''Track inprogress blocks of code or functions. + + Can be used as a function decorator or context manager. + Increments the gauge when the code is entered, + and decrements when it is exited. + ''' + + class InprogressTracker(object): + def __init__(self, gauge): + self._gauge = gauge + + def __enter__(self): + self._gauge.inc() + + def __exit__(self, typ, value, traceback): + self._gauge.dec() + + def __call__(self, f): + @wraps(f) + def wrapped(*args, **kwargs): + with self: + return f(*args, **kwargs) + return wrapped + + return InprogressTracker(self) + + def time(self): + '''Time a block of code or function, and set the duration in seconds. + + Can be used as a function decorator or context manager. + ''' + + class Timer(object): + def __init__(self, gauge): + self._gauge = gauge + + def __enter__(self): + self._start = time.time() + + def __exit__(self, typ, value, traceback): + # Time can go backwards. + self._gauge.set(max(time.time() - self._start, 0)) + + def __call__(self, f): + @wraps(f) + def wrapped(*args, **kwargs): + with self: + return f(*args, **kwargs) + return wrapped + + return Timer(self) + + def set_function(self, f): + '''Call the provided function to return the Gauge value. + + The function must return a float, and may be called from + multiple threads. + All other methods of the Gauge become NOOPs. + ''' + def samples(self): + return (('', {}, float(f())), ) + self._samples = types.MethodType(samples, self) + + def _samples(self): + return (('', {}, self._value.get()), ) + + +@_MetricWrapper +class Summary(object): + '''A Summary tracks the size and number of events. + + Example use cases for Summaries: + - Response latency + - Request size + + Example for a Summary: + + from prometheus_client import Summary + s = Summary('request_size_bytes', 'Request size (bytes)') + s.observe(512) # Observe 512 (bytes) + + Example for a Summary using time: + from prometheus_client import Summary + REQUEST_TIME = Summary('response_latency_seconds', 'Response latency (seconds)') + + @REQUEST_TIME.time() + def create_response(request): + """A dummy function""" + time.sleep(1) + + ''' + _type = 'summary' + _reserved_labelnames = ['quantile'] + + def __init__(self, name, labelnames, labelvalues): + self._count = _ValueClass(name + '_count', labelnames, labelvalues) + self._sum = _ValueClass(name + '_sum', labelnames, labelvalues) + + def observe(self, amount): + '''Observe the given amount.''' + self._count.inc(1) + self._sum.inc(amount) + + def time(self): + '''Time a block of code or function, and observe the duration in seconds. + + Can be used as a function decorator or context manager. + ''' + + class Timer(object): + def __init__(self, summary): + self._summary = summary + + def __enter__(self): + self._start = time.time() + + def __exit__(self, typ, value, traceback): + # Time can go backwards. + self._summary.observe(max(time.time() - self._start, 0)) + + def __call__(self, f): + @wraps(f) + def wrapped(*args, **kwargs): + with self: + return f(*args, **kwargs) + return wrapped + + return Timer(self) + + def _samples(self): + return ( + ('_count', {}, self._count.get()), + ('_sum', {}, self._sum.get())) + + +def _floatToGoString(d): + if d == _INF: + return '+Inf' + elif d == _MINUS_INF: + return '-Inf' + elif math.isnan(d): + return 'NaN' + else: + return repr(float(d)) + + +@_MetricWrapper +class Histogram(object): + '''A Histogram tracks the size and number of events in buckets. + + You can use Histograms for aggregatable calculation of quantiles. + + Example use cases: + - Response latency + - Request size + + Example for a Histogram: + + from prometheus_client import Histogram + h = Histogram('request_size_bytes', 'Request size (bytes)') + h.observe(512) # Observe 512 (bytes) + + + Example for a Histogram using time: + from prometheus_client import Histogram + REQUEST_TIME = Histogram('response_latency_seconds', 'Response latency (seconds)') + + @REQUEST_TIME.time() + def create_response(request): + """A dummy function""" + time.sleep(1) + + The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds. + They can be overridden by passing `buckets` keyword argument to `Histogram`. + ''' + _type = 'histogram' + _reserved_labelnames = ['histogram'] + + def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, _INF)): + self._sum = _ValueClass(name + '_sum', labelnames, labelvalues) + buckets = [float(b) for b in buckets] + if buckets != sorted(buckets): + # This is probably an error on the part of the user, + # so raise rather than sorting for them. + raise ValueError('Buckets not in sorted order') + if buckets and buckets[-1] != _INF: + buckets.append(_INF) + if len(buckets) < 2: + raise ValueError('Must have at least two buckets') + self._upper_bounds = buckets + self._buckets = [] + bucket_labelnames = labelnames + ('le',) + for b in buckets: + self._buckets.append(_ValueClass(name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),))) + + def observe(self, amount): + '''Observe the given amount.''' + self._sum.inc(amount) + for i, bound in enumerate(self._upper_bounds): + if amount <= bound: + self._buckets[i].inc(1) + break + + def time(self): + '''Time a block of code or function, and observe the duration in seconds. + + Can be used as a function decorator or context manager. + ''' + + class Timer(object): + def __init__(self, histogram): + self._histogram = histogram + + def __enter__(self): + self._start = time.time() + + def __exit__(self, typ, value, traceback): + # Time can go backwards. + self._histogram.observe(max(time.time() - self._start, 0)) + + def __call__(self, f): + @wraps(f) + def wrapped(*args, **kwargs): + with self: + return f(*args, **kwargs) + return wrapped + + return Timer(self) + + def _samples(self): + samples = [] + acc = 0 + for i, bound in enumerate(self._upper_bounds): + acc += self._buckets[i].get() + samples.append(('_bucket', {'le': _floatToGoString(bound)}, acc)) + samples.append(('_count', {}, acc)) + samples.append(('_sum', {}, self._sum.get())) + return tuple(samples) + diff --git a/prometheus_client/exposition.py b/prometheus_client/exposition.py new file mode 100644 index 0000000..3c4795d --- /dev/null +++ b/prometheus_client/exposition.py @@ -0,0 +1,131 @@ +#!/usr/bin/python + +from __future__ import unicode_literals + +import os +import socket +import time +import threading +from contextlib import closing + +from . import core +try: + from BaseHTTPServer import BaseHTTPRequestHandler + from BaseHTTPServer import HTTPServer + from urllib2 import build_opener, Request, HTTPHandler + from urllib import quote_plus +except ImportError: + # Python 3 + unicode = str + from http.server import BaseHTTPRequestHandler + from http.server import HTTPServer + from urllib.request import build_opener, Request, HTTPHandler + from urllib.parse import quote_plus + + +CONTENT_TYPE_LATEST = 'text/plain; version=0.0.4; charset=utf-8' +'''Content type of the latest text format''' + + +def generate_latest(registry=core.REGISTRY): + '''Returns the metrics from the registry in latest text format as a string.''' + output = [] + for metric in registry.collect(): + output.append('# HELP {0} {1}'.format( + metric.name, metric.documentation.replace('\\', r'\\').replace('\n', r'\n'))) + output.append('\n# TYPE {0} {1}\n'.format(metric.name, metric.type)) + for name, labels, value in metric.samples: + if labels: + labelstr = '{{{0}}}'.format(','.join( + ['{0}="{1}"'.format( + k, v.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')) + for k, v in sorted(labels.items())])) + else: + labelstr = '' + output.append('{0}{1} {2}\n'.format(name, labelstr, core._floatToGoString(value))) + return ''.join(output).encode('utf-8') + + +class MetricsHandler(BaseHTTPRequestHandler): + def do_GET(self): + self.send_response(200) + self.send_header('Content-Type', CONTENT_TYPE_LATEST) + self.end_headers() + self.wfile.write(generate_latest(core.REGISTRY)) + + def log_message(self, format, *args): + return + + +def start_http_server(port, addr=''): + """Starts a HTTP server for prometheus metrics as a daemon thread.""" + class PrometheusMetricsServer(threading.Thread): + def run(self): + httpd = HTTPServer((addr, port), MetricsHandler) + httpd.serve_forever() + t = PrometheusMetricsServer() + t.daemon = True + t.start() + + +def write_to_textfile(path, registry): + '''Write metrics to the given path. + + This is intended for use with the Node exporter textfile collector. + The path must end in .prom for the textfile collector to process it.''' + tmppath = '%s.%s.%s' % (path, os.getpid(), threading.current_thread().ident) + with open(tmppath, 'wb') as f: + f.write(generate_latest(registry)) + # rename(2) is atomic. + os.rename(tmppath, path) + + +def push_to_gateway(gateway, job, registry, grouping_key=None, timeout=None): + '''Push metrics to the given pushgateway. + + This overwrites all metrics with the same job and grouping_key. + This uses the PUT HTTP method.''' + _use_gateway('PUT', gateway, job, registry, grouping_key, timeout) + + +def pushadd_to_gateway(gateway, job, registry, grouping_key=None, timeout=None): + '''PushAdd metrics to the given pushgateway. + + This replaces metrics with the same name, job and grouping_key. + This uses the POST HTTP method.''' + _use_gateway('POST', gateway, job, registry, grouping_key, timeout) + + +def delete_from_gateway(gateway, job, grouping_key=None, timeout=None): + '''Delete metrics from the given pushgateway. + + This deletes metrics with the given job and grouping_key. + This uses the DELETE HTTP method.''' + _use_gateway('DELETE', gateway, job, None, grouping_key, timeout) + + +def _use_gateway(method, gateway, job, registry, grouping_key, timeout): + url = 'http://{0}/metrics/job/{1}'.format(gateway, quote_plus(job)) + + data = b'' + if method != 'DELETE': + data = generate_latest(registry) + + if grouping_key is None: + grouping_key = {} + url = url + ''.join(['/{0}/{1}'.format(quote_plus(str(k)), quote_plus(str(v))) + for k, v in sorted(grouping_key.items())]) + + request = Request(url, data=data) + request.add_header('Content-Type', CONTENT_TYPE_LATEST) + request.get_method = lambda: method + resp = build_opener(HTTPHandler).open(request, timeout=timeout) + if resp.code >= 400: + raise IOError("error talking to pushgateway: {0} {1}".format( + resp.code, resp.msg)) + +def instance_ip_grouping_key(): + '''Grouping key with instance set to the IP Address of this host.''' + with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s: + s.connect(('localhost', 0)) + return {'instance': s.getsockname()[0]} diff --git a/prometheus_client/parser.py b/prometheus_client/parser.py new file mode 100644 index 0000000..4ca3d7c --- /dev/null +++ b/prometheus_client/parser.py @@ -0,0 +1,224 @@ +#!/usr/bin/python + +from __future__ import unicode_literals + +try: + import StringIO +except ImportError: + # Python 3 + import io as StringIO + +from . import core + + +def text_string_to_metric_families(text): + """Parse Prometheus text format from a string. + + See text_fd_to_metric_families. + """ + for metric_family in text_fd_to_metric_families(StringIO.StringIO(text)): + yield metric_family + + +def _unescape_help(text): + result = [] + slash = False + + for char in text: + if slash: + if char == '\\': + result.append('\\') + elif char == 'n': + result.append('\n') + else: + result.append('\\' + char) + slash = False + else: + if char == '\\': + slash = True + else: + result.append(char) + + if slash: + result.append('\\') + + return ''.join(result) + + +def _parse_sample(text): + name = [] + labelname = [] + labelvalue = [] + value = [] + labels = {} + + state = 'name' + + for char in text: + if state == 'name': + if char == '{': + state = 'startoflabelname' + elif char == ' ' or char == '\t': + state = 'endofname' + else: + name.append(char) + elif state == 'endofname': + if char == ' ' or char == '\t': + pass + elif char == '{': + state = 'startoflabelname' + else: + value.append(char) + state = 'value' + elif state == 'startoflabelname': + if char == ' ' or char == '\t': + pass + elif char == '}': + state = 'endoflabels' + else: + state = 'labelname' + labelname.append(char) + elif state == 'labelname': + if char == '=': + state = 'labelvaluequote' + elif char == ' ' or char == '\t': + state = 'labelvalueequals' + else: + labelname.append(char) + elif state == 'labelvalueequals': + if char == '=': + state = 'labelvaluequote' + elif char == ' ' or char == '\t': + pass + else: + raise ValueError("Invalid line: " + text) + elif state == 'labelvaluequote': + if char == '"': + state = 'labelvalue' + elif char == ' ' or char == '\t': + pass + else: + raise ValueError("Invalid line: " + text) + elif state == 'labelvalue': + if char == '\\': + state = 'labelvalueslash' + elif char == '"': + labels[''.join(labelname)] = ''.join(labelvalue) + labelname = [] + labelvalue = [] + state = 'nextlabel' + else: + labelvalue.append(char) + elif state == 'labelvalueslash': + state = 'labelvalue' + if char == '\\': + labelvalue.append('\\') + elif char == 'n': + labelvalue.append('\n') + elif char == '"': + labelvalue.append('"') + else: + labelvalue.append('\\' + char) + elif state == 'nextlabel': + if char == ',': + state = 'labelname' + elif char == '}': + state = 'endoflabels' + elif char == ' ' or char == '\t': + pass + else: + raise ValueError("Invalid line: " + text) + elif state == 'endoflabels': + if char == ' ' or char == '\t': + pass + else: + value.append(char) + state = 'value' + elif state == 'value': + if char == ' ' or char == '\t': + # Timestamps are not supported, halt + break + else: + value.append(char) + return (''.join(name), labels, float(''.join(value))) + + +def text_fd_to_metric_families(fd): + """Parse Prometheus text format from a file descriptor. + + This is a laxer parser than the main Go parser, + so successful parsing does not imply that the parsed + text meets the specification. + + Yields core.Metric's. + """ + name = '' + documentation = '' + typ = 'untyped' + samples = [] + allowed_names = [] + + def build_metric(name, documentation, typ, samples): + metric = core.Metric(name, documentation, typ) + metric.samples = samples + return metric + + for line in fd: + line = line.strip() + + if line.startswith('#'): + parts = line.split(None, 3) + if len(parts) < 2: + continue + if parts[1] == 'HELP': + if parts[2] != name: + if name != '': + yield build_metric(name, documentation, typ, samples) + # New metric + name = parts[2] + typ = 'untyped' + samples = [] + allowed_names = [parts[2]] + if len(parts) == 4: + documentation = _unescape_help(parts[3]) + else: + documentation = '' + elif parts[1] == 'TYPE': + if parts[2] != name: + if name != '': + yield build_metric(name, documentation, typ, samples) + # New metric + name = parts[2] + documentation = '' + samples = [] + typ = parts[3] + allowed_names = { + 'counter': [''], + 'gauge': [''], + 'summary': ['_count', '_sum', ''], + 'histogram': ['_count', '_sum', '_bucket'], + }.get(typ, [parts[2]]) + allowed_names = [name + n for n in allowed_names] + else: + # Ignore other comment tokens + pass + elif line == '': + # Ignore blank lines + pass + else: + sample = _parse_sample(line) + if sample[0] not in allowed_names: + if name != '': + yield build_metric(name, documentation, typ, samples) + # New metric, yield immediately as untyped singleton + name = '' + documentation = '' + typ = 'untyped' + samples = [] + allowed_names = [] + yield build_metric(sample[0], documentation, typ, [sample]) + else: + samples.append(sample) + + if name != '': + yield build_metric(name, documentation, typ, samples) diff --git a/prometheus_client/process_collector.py b/prometheus_client/process_collector.py new file mode 100644 index 0000000..5c906c9 --- /dev/null +++ b/prometheus_client/process_collector.py @@ -0,0 +1,95 @@ +#!/usr/bin/python + +from __future__ import unicode_literals + +import os +import time +import threading + +from . import core +try: + import resource + _PAGESIZE = resource.getpagesize() +except ImportError: + # Not Unix + _PAGESIZE = 4096 + + +class ProcessCollector(object): + """Collector for Standard Exports such as cpu and memory.""" + def __init__(self, namespace='', pid=lambda: 'self', proc='/proc', registry=core.REGISTRY): + self._namespace = namespace + self._pid = pid + self._proc = proc + if namespace: + self._prefix = namespace + '_process_' + else: + self._prefix = 'process_' + self._ticks = 100.0 + try: + self._ticks = os.sysconf('SC_CLK_TCK') + except (ValueError, TypeError, AttributeError): + pass + + # This is used to test if we can access /proc. + self._btime = 0 + try: + self._btime = self._boot_time() + except IOError: + pass + if registry: + registry.register(self) + + def _boot_time(self): + with open(os.path.join(self._proc, 'stat')) as stat: + for line in stat: + if line.startswith('btime '): + return float(line.split()[1]) + + def collect(self): + if not self._btime: + return [] + + try: + pid = os.path.join(self._proc, str(self._pid()).strip()) + except: + # File likely didn't exist, fail silently. + raise + return [] + + result = [] + try: + with open(os.path.join(pid, 'stat')) as stat: + parts = (stat.read().split(')')[-1].split()) + vmem = core.GaugeMetricFamily(self._prefix + 'virtual_memory_bytes', + 'Virtual memory size in bytes', value=float(parts[20])) + rss = core.GaugeMetricFamily(self._prefix + 'resident_memory_bytes', 'Resident memory size in bytes', value=float(parts[21]) * _PAGESIZE) + start_time_secs = float(parts[19]) / self._ticks + start_time = core.GaugeMetricFamily(self._prefix + 'start_time_seconds', + 'Start time of the process since unix epoch in seconds.', value=start_time_secs + self._btime) + utime = float(parts[11]) / self._ticks + stime = float(parts[12]) / self._ticks + cpu = core.CounterMetricFamily(self._prefix + 'cpu_seconds_total', + 'Total user and system CPU time spent in seconds.', value=utime + stime) + result.extend([vmem, rss, start_time, cpu]) + except IOError: + pass + + try: + with open(os.path.join(pid, 'limits')) as limits: + for line in limits: + if line.startswith('Max open file'): + max_fds = core.GaugeMetricFamily(self._prefix + 'max_fds', + 'Maximum number of open file descriptors.', value=float(line.split()[3])) + break + open_fds = core.GaugeMetricFamily(self._prefix + 'open_fds', + 'Number of open file descriptors.', len(os.listdir(os.path.join(pid, 'fd')))) + result.extend([open_fds, max_fds]) + except IOError: + pass + + return result + + +PROCESS_COLLECTOR = ProcessCollector() +"""Default ProcessCollector in default Registry REGISTRY.""" |