From 5bcf65dcff75a01a90b688626d97735372b594e2 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 24 Dec 2015 10:59:02 +0000 Subject: Initial commit --- prometheus_haproxy_log_exporter/__init__.py | 3 + prometheus_haproxy_log_exporter/__main__.py | 3 + prometheus_haproxy_log_exporter/cli.py | 258 +++++++++++++++++++++ prometheus_haproxy_log_exporter/exposition.py | 99 ++++++++ prometheus_haproxy_log_exporter/file/__init__.py | 1 + .../file/log_file_processor.py | 22 ++ .../journal/__init__.py | 1 + .../journal/journal_processor.py | 35 +++ prometheus_haproxy_log_exporter/log_processing.py | 64 +++++ prometheus_haproxy_log_exporter/metrics.py | 242 +++++++++++++++++++ prometheus_haproxy_log_exporter/stdin/__init__.py | 1 + .../stdin/stdin_processor.py | 22 ++ 12 files changed, 751 insertions(+) create mode 100644 prometheus_haproxy_log_exporter/__init__.py create mode 100644 prometheus_haproxy_log_exporter/__main__.py create mode 100644 prometheus_haproxy_log_exporter/cli.py create mode 100644 prometheus_haproxy_log_exporter/exposition.py create mode 100644 prometheus_haproxy_log_exporter/file/__init__.py create mode 100644 prometheus_haproxy_log_exporter/file/log_file_processor.py create mode 100644 prometheus_haproxy_log_exporter/journal/__init__.py create mode 100644 prometheus_haproxy_log_exporter/journal/journal_processor.py create mode 100644 prometheus_haproxy_log_exporter/log_processing.py create mode 100644 prometheus_haproxy_log_exporter/metrics.py create mode 100644 prometheus_haproxy_log_exporter/stdin/__init__.py create mode 100644 prometheus_haproxy_log_exporter/stdin/stdin_processor.py (limited to 'prometheus_haproxy_log_exporter') diff --git a/prometheus_haproxy_log_exporter/__init__.py b/prometheus_haproxy_log_exporter/__init__.py new file mode 100644 index 0000000..069bf5d --- /dev/null +++ b/prometheus_haproxy_log_exporter/__init__.py @@ -0,0 +1,3 @@ +__ver_major__ = 0 +__ver_minor__ = 1 +__version__ = "%d.%d" % (__ver_major__, 
__ver_minor__) diff --git a/prometheus_haproxy_log_exporter/__main__.py b/prometheus_haproxy_log_exporter/__main__.py new file mode 100644 index 0000000..4e28416 --- /dev/null +++ b/prometheus_haproxy_log_exporter/__main__.py @@ -0,0 +1,3 @@ +from .cli import main + +main() diff --git a/prometheus_haproxy_log_exporter/cli.py b/prometheus_haproxy_log_exporter/cli.py new file mode 100644 index 0000000..079b7fc --- /dev/null +++ b/prometheus_haproxy_log_exporter/cli.py @@ -0,0 +1,258 @@ +#!/usr/bin/python3 + +# Copyright (C) 2016 Christopher Baines +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import logging +import configargparse + +from os.path import join, dirname, normpath +from http.server import HTTPServer + +from . import __version__ +from . 
def _available_metric_names():
    """Return every metric name accepted by ``--enabled-metrics``.

    That is the four fixed counters/histograms plus one histogram per key
    of ``metrics.TIMERS``.  A new list is returned on each call so callers
    may keep or mutate it freely.
    """
    return [
        'requests_total',
        'bytes_read_total',
        'backend_queue_length',
        'server_queue_length',
    ] + list(metrics.TIMERS.keys())


def get_argument_parser():
    """Build the parser for command line, config file and environment.

    Exactly one log source (``--file``, ``--journal`` or ``--stdin``) is
    required.  For every metric a ``--<metric>-labels`` option is
    registered, and for every histogram additionally a
    ``--<metric>-buckets`` option.

    Returns:
        The configured ``configargparse.ArgParser``.
    """
    p = configargparse.ArgParser(
        prog="prometheus-haproxy-log-exporter",
        default_config_files=[
            '/etc/prometheus-haproxy-log-exporter/config',
        ],
    )

    p.add(
        '--version',
        action='version',
        version=__version__,
        help="Show the version",
    )

    p.add(
        '-c',
        '--config',
        is_config_file=True,
        help="config file path",
    )

    p.add(
        '--licence-location',
        default=join(dirname(dirname(normpath(__file__))), 'LICENSE'),
        help="The location of the licence, linked to through the web interface",
        env_var='LICENCE_LOCATION',
    )

    # Processor arguments
    processor = p.add_mutually_exclusive_group(required=True)
    processor.add_argument(
        '-f',
        '--file',
        help="read logs from a log file",
        type=configargparse.FileType('r'),
        action='store',
        dest='file',
        env_var='LOG_FILE',
    )
    processor.add_argument(
        '-j',
        '--journal',
        help="read logs from systemd journal",
        dest='journal',
        const="haproxy.service",
        nargs='?',
        action='store',
        env_var='JOURNAL_UNIT',
    )
    processor.add_argument(
        '-s',
        '--stdin',
        help="read logs from stdin",
        dest='stdin',
        action='store_true',
        env_var='STDIN',
    )

    p.add(
        '--enabled-metrics',
        nargs='+',
        default=_available_metric_names(),
        choices=_available_metric_names(),
        # The option takes one or more values (nargs='+') and covers all
        # metrics, not only timers, so the help text says so.
        help="Metrics to export",
        env_var='ENABLED_TIMERS',
    )

    for counter in (
        metrics.bytes_read_total,
        metrics.requests_total,
    ):
        name_with_hyphens = counter.__name__.replace('_', '-')

        p.add(
            '--%s-labels' % name_with_hyphens,
            nargs='+',
            default=['status_code', 'backend_name', 'server_name'],
            choices=metrics.REQUEST_LABELS,
            help="Labels to use for %s" % counter.__name__,
            env_var='%s_LABELS' % counter.__name__.upper(),
        )

    for timer_name in metrics.TIMERS:
        name_with_hyphens = timer_name.replace('_', '-')

        p.add_argument(
            '--%s-labels' % name_with_hyphens,
            nargs='+',
            default=[],
            choices=metrics.REQUEST_LABELS,
            help="Labels for the %s timer" % timer_name,
            env_var='%s_LABELS' % timer_name.upper(),
        )

        p.add_argument(
            '--%s-buckets' % name_with_hyphens,
            nargs='+',
            default=metrics.DEFAULT_TIMER_BUCKETS,
            help="Buckets for the %s timer" % timer_name,
            env_var='%s_BUCKETS' % timer_name.upper(),
        )

    for queue_histogram in (
        metrics.backend_queue_length,
        metrics.server_queue_length,
    ):
        name_with_hyphens = queue_histogram.__name__.replace('_', '-')

        p.add_argument(
            '--%s-labels' % name_with_hyphens,
            nargs='+',
            default=[],
            choices=metrics.REQUEST_LABELS,
            help="Labels for the %s metric" % queue_histogram.__name__,
            env_var='%s_LABELS' % queue_histogram.__name__.upper(),
        )

        p.add_argument(
            '--%s-buckets' % name_with_hyphens,
            nargs='+',
            default=metrics.DEFAULT_QUEUE_LENGTH_BUCKETS,
            help="Buckets for the %s metric" % queue_histogram.__name__,
            env_var='%s_BUCKETS' % queue_histogram.__name__.upper(),
        )

    return p


def create_log_processor(options, error):
    """Create the log processor thread selected by ``options``.

    Args:
        options: Parsed options as produced by ``get_argument_parser()``.
        error: Callable taking a message; invoked when no log source is
            selected (``main`` passes the parser's ``error`` method).

    Returns:
        An unstarted processor for stdin, the journal or a log file, or
        ``None`` if no source is selected and ``error`` returns.
    """
    metric_updaters = []

    for timer_name in metrics.TIMERS:
        if timer_name not in options.enabled_metrics:
            continue

        labelnames = getattr(options, '%s_labels' % timer_name)
        buckets = getattr(options, '%s_buckets' % timer_name)

        metric_updaters.append(
            metrics.timer(timer_name, labelnames, buckets),
        )

    for counter in (
        metrics.bytes_read_total,
        metrics.requests_total,
    ):
        if counter.__name__ not in options.enabled_metrics:
            continue

        labelnames = getattr(options, '%s_labels' % counter.__name__)

        metric_updaters.append(counter(labelnames))

    for queue_histogram in (
        metrics.backend_queue_length,
        metrics.server_queue_length,
    ):
        if queue_histogram.__name__ not in options.enabled_metrics:
            continue

        labelnames = getattr(options, '%s_labels' % queue_histogram.__name__)
        buckets = getattr(options, '%s_buckets' % queue_histogram.__name__)

        metric_updaters.append(queue_histogram(labelnames, buckets))

    # The processors are imported lazily so that only the dependencies of
    # the selected log source have to be installed.
    if options.stdin:
        from .stdin import StdinProcessor

        return StdinProcessor(metric_updaters)

    if options.journal:
        from .journal import JournalProcessor

        return JournalProcessor(
            metric_updaters=metric_updaters,
            unit=options.journal,
        )

    if options.file:
        from .file import LogFileProcessor

        return LogFileProcessor(
            metric_updaters=metric_updaters,
            path=options.file,
        )

    # Report the problem through the supplied callback instead of failing
    # with an unbound local variable.
    error("one of --file, --journal or --stdin is required")
    return None


def main():
    """Parse the configuration, start log processing and serve metrics.

    The HTTP server listens on 0.0.0.0:9129 until interrupted with
    Ctrl-C.
    """
    logging.basicConfig(level=logging.DEBUG)

    p = get_argument_parser()
    options = p.parse_args()

    logging.info(p.format_values())

    log_processor = create_log_processor(options, p.error)
    # Daemonise the worker so that leaving serve_forever() (Ctrl-C) ends
    # the process instead of waiting on a thread that never finishes.
    log_processor.daemon = True
    log_processor.start()

    host = '0.0.0.0'
    port = 9129

    httpd = HTTPServer(
        (host, port),
        create_request_handler(options.licence_location),
    )

    logging.info("Listening on %s:%d", host, port)

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket on the way out.
        httpd.server_close()
See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from prometheus_client.exposition import MetricsHandler + +index_page = """ + + + + + + + Prometheus HAProxy Log Exporter + + + +

HAProxy Log exporter for Prometheus

+ +

+ This is a highly configurable exporter for HAProxy. +

+ + View metrics + +

Example Configuration

+

+ You must configure Prometheus to scrape the metrics exported here. The port + is 9129, and the configuration should look something like the example + below. +

+

+    scrape_configs:
+      - job_name: haproxy_log
+        target_groups:
+          - targets:
+            - MACHINE_ADDRESS:9129
+  
+ +

Information

+

+ Copyright (C) 2016 Christopher Baines
+ View Licence +

+ +

+ The source may be obtained from + + this Git repository +

+ +

+ This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. +

+ +

+ This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. +

def create_request_handler(licence_location):
    """Create the HTTP request handler class used by the exporter.

    Args:
        licence_location: Path of the licence file served at ``/licence``.

    Returns:
        A ``MetricsHandler`` subclass which serves the metrics at
        ``/metrics``, the licence text at ``/licence`` and the index page
        for every other path.
    """
    class RequestHandler(MetricsHandler):
        def do_GET(self):
            if self.path == "/metrics":
                return super().do_GET()

            if self.path == "/licence":
                # Read the file before any status line is written, so an
                # unreadable licence yields a proper 404 instead of a
                # "200 OK" followed by a broken response.
                try:
                    with open(licence_location, 'rb') as licence:
                        body = licence.read()
                except OSError:
                    self.send_error(404, "Licence file not found")
                    return

                content_type = 'text/plain; charset=utf-8'
            else:
                body = index_page.encode('UTF-8')
                content_type = 'text/html; charset=utf-8'

            self.send_response(200)
            self.send_header('Content-Type', content_type)
            self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            self.wfile.write(body)

    return RequestHandler
class LogFileProcessor(AbstractLogProcessor):
    """Log processor which feeds the lines of a log file to the metrics.

    Args:
        path: The log file to read, given either as a filename or as an
            open file object (its ``name`` attribute is used then, which
            is what the ``--file`` option produces).
        *args, **kwargs: Passed on to ``AbstractLogProcessor``.
    """

    def __init__(self, *args, path=None, **kwargs):
        # ``path`` must not reach the base class: it forwards unknown
        # keyword arguments to ``threading.Thread``, which rejects them.
        super(LogFileProcessor, self).__init__(*args, **kwargs)

        # Pygtail is given a filename, so unwrap open file objects.
        self.path = getattr(path, 'name', path)

    def run(self, path=None):
        """Process the lines Pygtail yields for the log file.

        ``Thread.start()`` calls ``run()`` without arguments, so ``path``
        is optional and defaults to the path given to the constructor.
        """
        if path is None:
            path = self.path

        for line in Pygtail(path):
            self.update_metrics(line)
class JournalProcessor(AbstractLogProcessor):
    """Log processor which follows a unit's entries in the systemd journal.

    Args:
        unit: Value matched against the ``_SYSTEMD_UNIT`` journal field.
        *args, **kwargs: Passed on to ``AbstractLogProcessor``.
    """

    def __init__(self, unit, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.unit = unit

    def run(self):
        """Feed the MESSAGE field of each journal entry to the metrics."""
        with journal.Reader() as reader:
            reader.add_match(_SYSTEMD_UNIT=self.unit)

            # Position the reader at the end of the journal before
            # starting to iterate.
            reader.seek_tail()
            reader.get_previous()

            while True:
                for record in reader:
                    self.update_metrics(record['MESSAGE'])

                # Nothing left to read for now; block until the journal
                # changes, then iterate again.
                reader.wait()
# Matches the syslog style prefix in front of the HAProxy message, so that
# it can be removed before the line is parsed.
JOURNAL_REGEX = re.compile(
    # Dec 9
    r'\A\w+\s+\d+\s+'
    # 13:01:26
    r'\d+:\d+:\d+\s+'
    # localhost.localdomain haproxy[28029]:
    r'([\.a-zA-Z0-9_-]+)\s+\w+\[\d+\]:\s+',
)


class AbstractLogProcessor(threading.Thread):
    """Base class for the threads turning log lines into metric updates.

    Subclasses implement ``run()`` and call ``update_metrics()`` for each
    raw log line they read.

    Args:
        metric_updaters: Iterable of callables, each of which is called
            with every successfully parsed line.
        *args, **kwargs: Passed on to ``threading.Thread``.
    """

    def __init__(self, metric_updaters, *args, **kwargs):
        super(AbstractLogProcessor, self).__init__(*args, **kwargs)

        self.metric_updaters = metric_updaters

        # Counts lines which could not be parsed, as well as failures of
        # individual metric updaters.
        self.processing_errors = Counter(
            'processing_errors_total',
            "Total log lines which could not be processed",
            namespace=NAMESPACE,
        )

    def update_metrics(self, raw_line):
        """Parse one raw log line and pass it to every metric updater.

        Failures are counted in ``processing_errors`` and logged; they
        are never raised to the caller.
        """
        try:
            # The prefix is anchored at the start of the stripped line and
            # consumes the whitespace which follows it, so the result needs
            # no further stripping.
            raw_line = JOURNAL_REGEX.sub('', raw_line.strip())
            line = HaproxyLogLine(raw_line)
        except Exception as e:
            self.processing_errors.inc()
            logging.exception("%s (line parsing error): %s", e, raw_line)
            return

        if not line.valid:
            self.processing_errors.inc()
            logging.warning("Failed to parse line: %s", raw_line)
            return

        for metric_updater in self.metric_updaters:
            # Each updater is guarded on its own, so that one failing
            # metric does not stop the remaining metrics being updated.
            try:
                metric_updater(line)
            except Exception as e:
                self.processing_errors.inc()
                logging.exception(
                    "%s (error updating metrics): %s", e, raw_line,
                )
NAMESPACE = 'haproxy_log'

# Maps each timer metric name to the attribute of the parsed line holding
# its value, and to the documentation of the metric.
TIMERS = {
    'request_wait_milliseconds': (
        'time_wait_request',
        "Time spent waiting for the client to send the full HTTP request (Tq in HAProxy)",
    ),
    'server_tcp_connection_establish_milliseconds': (
        'time_connect_server',
        "Time in milliseconds to connect to the final server (Tc in HAProxy)",
    ),
    'request_queued_milliseconds': (
        'time_wait_queues',
        "Time that the request spend on HAProxy queues (Tw in HAProxy)",
    ),
    'response_processing_milliseconds': (
        'time_wait_response',
        "Time waiting the downstream server to send the full HTTP response (Tr in HAProxy)",
    ),
    'session_duration_milliseconds': (
        'total_time',
        "Time between accepting the HTTP request and sending back the HTTP response (Tt in HAProxy)",
    ),
}

# Name and documentation of the counter incremented instead of the timer
# when the timer value is -1.  The session duration has no such counter.
TIMER_ABORT_COUNTERS = {
    'request_wait_milliseconds': (  # Tq
        'request_abort_total',
        "Count of connections aborted before a complete request was received",
    ),
    'server_tcp_connection_establish_milliseconds': (  # Tc
        'request_pre_server_connection_abort',
        "Count of connections aborted before a connection to a server was established",
    ),
    'request_queued_milliseconds': (  # Tw
        'request_pre_queue_abort_total',
        "Count of connections aborted before reaching the queue",
    ),
    'response_processing_milliseconds': (  # Tr
        'request_response_abort_total',
        "Count of connections for which the last response header from the server was never received",
    ),
}

TIMER_NAMES = TIMERS.keys()

# These are attributes associated with each line processed, which can be used
# as labels on metrics
REQUEST_LABELS = (
    'status_code',
    'frontend_name',
    'backend_name',
    'server_name',
    'http_request_path',
    'http_request_method',
    'client_ip',
    'client_port',
)

# These are the default buckets for the Prometheus python client, adjusted to
# be in milliseconds
DEFAULT_TIMER_BUCKETS = (
    5, 10, 25,
    50, 75, 100, 250,
    500, 750, 1000, 2500,
    5000, 7500, 10000, float('inf'),
)


DEFAULT_QUEUE_LENGTH_BUCKETS = tuple(itertools.chain(
    range(1, 10),
    (20, 30, 40, 60, 100, float('inf')),
))


def _label_values(line, labelnames):
    """Return a dict mapping each label name to that attribute of line."""
    return {label: getattr(line, label) for label in labelnames}


def _bytes_read(line):
    """Return the bytes read for line as a float.

    NOTE(review): assumes the parsed line exposes ``bytes_read`` - confirm
    against the HaproxyLogLine version in use.  A leading '+' is dropped,
    mirroring how the session duration value is handled in ``timer``.
    """
    return float(str(line.bytes_read).lstrip('+'))


def requests_total(labelnames):
    """Create the requests counter and return its updater.

    Args:
        labelnames: Names of the line attributes to use as labels.

    Returns:
        A callable taking a parsed line, which counts one request.
    """
    counter = Counter(
        'requests_total',
        "Total processed requests",
        namespace=NAMESPACE,
        labelnames=labelnames,
    )

    if len(labelnames) == 0:
        def observe(line):
            counter.inc()
    else:
        def observe(line):
            counter.labels(_label_values(line, labelnames)).inc()

    return observe


def timer(timer_name, labelnames, buckets):
    """Create the histogram for a timer and return its updater.

    For every timer except the session duration an abort counter (see
    ``TIMER_ABORT_COUNTERS``) is created as well; it is incremented
    instead of the histogram when the timer value is -1.  The session
    duration histogram carries an additional ``logasap`` label, set when
    the raw value starts with '+'.

    Args:
        timer_name: A key of ``TIMERS``.
        labelnames: Names of the line attributes to use as labels.
        buckets: Histogram buckets.

    Returns:
        A callable taking a parsed line, which records the timer value.
    """
    attribute, documentation = TIMERS[timer_name]
    is_session_timer = timer_name == 'session_duration_milliseconds'

    # Work on a list so that any sequence of label names (for example a
    # tuple) can be extended with the extra label below.
    labelnames = list(labelnames)

    if is_session_timer:
        all_labelnames = labelnames + ['logasap']
    else:
        all_labelnames = labelnames

    histogram = Histogram(
        timer_name,
        documentation=documentation,
        namespace=NAMESPACE,
        labelnames=tuple(all_labelnames),
        buckets=buckets,
    )

    if is_session_timer:
        def observe(line):
            # str() makes the prefix check work whether the parser hands
            # over the raw text or an already converted number.
            raw_value = str(getattr(line, attribute))

            label_values = _label_values(line, labelnames)

            if raw_value.startswith('+'):
                label_values['logasap'] = True
                value = float(raw_value[1:])
            else:
                label_values['logasap'] = False
                value = float(raw_value)

            histogram.labels(label_values).observe(value)

        return observe

    abort_counter_name, abort_counter_documentation = TIMER_ABORT_COUNTERS[timer_name]

    abort_counter = Counter(
        abort_counter_name,
        abort_counter_documentation,
        namespace=NAMESPACE,
        labelnames=labelnames,
    )

    if len(labelnames) == 0:
        def observe(line):
            value = float(getattr(line, attribute))

            if value == -1:
                abort_counter.inc()
            else:
                histogram.observe(value)
    else:
        def observe(line):
            value = float(getattr(line, attribute))

            label_values = _label_values(line, labelnames)

            if value == -1:
                abort_counter.labels(label_values).inc()
            else:
                histogram.labels(label_values).observe(value)

    return observe


def bytes_read_total(labelnames):
    """Create the bytes read counter and return its updater.

    Args:
        labelnames: Names of the line attributes to use as labels.

    Returns:
        A callable taking a parsed line, which adds the bytes read for
        that line to the counter.
    """
    counter = Counter(
        'bytes_read_total',
        "Bytes read total",
        namespace=NAMESPACE,
        labelnames=labelnames,
    )

    # The counter is a byte total, so it grows by the bytes read for each
    # line rather than by one per line.
    if len(labelnames) == 0:
        def observe(line):
            counter.inc(_bytes_read(line))
    else:
        def observe(line):
            counter.labels(
                _label_values(line, labelnames),
            ).inc(_bytes_read(line))

    return observe


def backend_queue_length(labelnames, buckets):
    """Create the backend queue length histogram and return its updater.

    Args:
        labelnames: Names of the line attributes to use as labels.
        buckets: Histogram buckets.

    Returns:
        A callable taking a parsed line, which observes its
        ``queue_backend`` value.
    """
    histogram = Histogram(
        'backend_queue_length',
        "Requests processed before this one in the backend queue",
        namespace=NAMESPACE,
        labelnames=tuple(labelnames),
        buckets=buckets,
    )

    if len(labelnames) == 0:
        def observe(line):
            histogram.observe(line.queue_backend)
    else:
        def observe(line):
            histogram.labels(
                _label_values(line, labelnames),
            ).observe(line.queue_backend)

    return observe


def server_queue_length(labelnames, buckets):
    """Create the server queue length histogram and return its updater.

    Args:
        labelnames: Names of the line attributes to use as labels.
        buckets: Histogram buckets.

    Returns:
        A callable taking a parsed line, which observes its
        ``queue_server`` value.
    """
    histogram = Histogram(
        'server_queue_length',
        "Length of the server queue when the request was received",
        namespace=NAMESPACE,
        labelnames=tuple(labelnames),
        buckets=buckets,
    )

    if len(labelnames) == 0:
        def observe(line):
            histogram.observe(line.queue_server)
    else:
        def observe(line):
            histogram.labels(
                _label_values(line, labelnames),
            ).observe(line.queue_server)

    return observe
class StdinProcessor(AbstractLogProcessor):
    """Log processor which reads the log lines from standard input."""

    def run(self):
        """Pass every line read from stdin to ``update_metrics()``."""
        stream = sys.stdin

        for raw_line in stream:
            self.update_metrics(raw_line)