aboutsummaryrefslogtreecommitdiff
path: root/prometheus_haproxy_log_exporter
diff options
context:
space:
mode:
authorChristopher Baines <chris@lucida.cbaines.net>2015-12-24 10:59:02 +0000
committerChristopher Baines <mail@cbaines.net>2016-02-23 23:00:55 +0000
commit5bcf65dcff75a01a90b688626d97735372b594e2 (patch)
tree617016a92505090cc830ae8844169933a06ff56e /prometheus_haproxy_log_exporter
downloadprometheus-haproxy-log-exporter-5bcf65dcff75a01a90b688626d97735372b594e2.tar
prometheus-haproxy-log-exporter-5bcf65dcff75a01a90b688626d97735372b594e2.tar.gz
Initial commit
Diffstat (limited to 'prometheus_haproxy_log_exporter')
-rw-r--r--prometheus_haproxy_log_exporter/__init__.py3
-rw-r--r--prometheus_haproxy_log_exporter/__main__.py3
-rw-r--r--prometheus_haproxy_log_exporter/cli.py258
-rw-r--r--prometheus_haproxy_log_exporter/exposition.py99
-rw-r--r--prometheus_haproxy_log_exporter/file/__init__.py1
-rw-r--r--prometheus_haproxy_log_exporter/file/log_file_processor.py22
-rw-r--r--prometheus_haproxy_log_exporter/journal/__init__.py1
-rw-r--r--prometheus_haproxy_log_exporter/journal/journal_processor.py35
-rw-r--r--prometheus_haproxy_log_exporter/log_processing.py64
-rw-r--r--prometheus_haproxy_log_exporter/metrics.py242
-rw-r--r--prometheus_haproxy_log_exporter/stdin/__init__.py1
-rw-r--r--prometheus_haproxy_log_exporter/stdin/stdin_processor.py22
12 files changed, 751 insertions, 0 deletions
diff --git a/prometheus_haproxy_log_exporter/__init__.py b/prometheus_haproxy_log_exporter/__init__.py
new file mode 100644
index 0000000..069bf5d
--- /dev/null
+++ b/prometheus_haproxy_log_exporter/__init__.py
@@ -0,0 +1,3 @@
+__ver_major__ = 0
+__ver_minor__ = 1
+__version__ = "%d.%d" % (__ver_major__, __ver_minor__)
diff --git a/prometheus_haproxy_log_exporter/__main__.py b/prometheus_haproxy_log_exporter/__main__.py
new file mode 100644
index 0000000..4e28416
--- /dev/null
+++ b/prometheus_haproxy_log_exporter/__main__.py
@@ -0,0 +1,3 @@
# Entry point for ``python -m prometheus_haproxy_log_exporter``.
from .cli import main

# Guard so that importing this module has no side effects; ``python -m``
# executes it with __name__ == "__main__", preserving the old behaviour.
if __name__ == "__main__":
    main()
diff --git a/prometheus_haproxy_log_exporter/cli.py b/prometheus_haproxy_log_exporter/cli.py
new file mode 100644
index 0000000..079b7fc
--- /dev/null
+++ b/prometheus_haproxy_log_exporter/cli.py
@@ -0,0 +1,258 @@
+#!/usr/bin/python3
+
+# Copyright (C) 2016 Christopher Baines <mail@cbaines.net>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+import configargparse
+
+from os.path import join, dirname, normpath
+from http.server import HTTPServer
+
+from . import __version__
+from . import metrics
+from .exposition import create_request_handler
+
+
def get_argument_parser():
    """Build the configargparse parser for the exporter CLI.

    Options can be supplied on the command line, in a config file
    (``--config`` or the default locations) or via per-option environment
    variables.
    """
    p = configargparse.ArgParser(
        prog="prometheus-haproxy-log-exporter",
        default_config_files=[
            '/etc/prometheus-haproxy-log-exporter/config',
        ],
    )

    p.add(
        '--version',
        action='version',
        version=__version__,
        help="Show the version",
    )

    p.add(
        '-c',
        '--config',
        is_config_file=True,
        help="config file path",
    )

    p.add(
        '--licence-location',
        default=join(dirname(dirname(normpath(__file__))), 'LICENSE'),
        help="The location of the licence, linked to through the web interface",
        env_var='LICENCE_LOCATION',
    )

    # Exactly one log source (file, journal or stdin) must be selected.
    processor = p.add_mutually_exclusive_group(required=True)
    processor.add_argument(
        '-f',
        '--file',
        help="read logs from a log file",
        type=configargparse.FileType('r'),
        action='store',
        dest='file',
        env_var='LOG_FILE',
    )
    processor.add_argument(
        '-j',
        '--journal',
        help="read logs from systemd journal",
        dest='journal',
        const="haproxy.service",
        nargs='?',
        action='store',
        env_var='JOURNAL_UNIT',
    )
    processor.add_argument(
        '-s',
        '--stdin',
        help="read logs from stdin",
        dest='stdin',
        action='store_true',
        env_var='STDIN',
    )

    # Every metric that can be enabled: the two counters, the two queue
    # length histograms, and all of the per-request timers.
    available_metrics = (
        [
            'requests_total',
            'bytes_read_total',
            'backend_queue_length',
            'server_queue_length',
        ] +
        list(metrics.TIMERS.keys())
    )

    p.add(
        '--enabled-metrics',
        nargs='+',
        default=available_metrics,
        choices=available_metrics,
        # Fixed help text: nargs='+' takes a space separated list, and the
        # option covers all metrics, not only timers.
        help="Space separated list of metrics to export",
        # NOTE(review): env var name predates the timers -> metrics rename;
        # kept as-is for backwards compatibility.
        env_var='ENABLED_TIMERS',
    )

    # Per-counter label selection.
    for counter in (
        metrics.bytes_read_total,
        metrics.requests_total,
    ):
        name_with_hyphens = counter.__name__.replace('_', '-')

        p.add(
            '--%s-labels' % name_with_hyphens,
            nargs='+',
            default=['status_code', 'backend_name', 'server_name'],
            choices=metrics.REQUEST_LABELS,
            help="Labels to use for %s" % counter.__name__,
            env_var='%s_LABELS' % counter.__name__.upper(),
        )

    # Per-timer label and bucket selection.
    for timer_name, (_, documentation) in metrics.TIMERS.items():
        p.add_argument(
            '--%s-labels' % timer_name.replace('_', '-'),
            nargs='+',
            default=[],
            choices=metrics.REQUEST_LABELS,
            help="Labels for the %s timer" % timer_name,
            env_var='%s_LABELS' % timer_name.upper(),
        )

        p.add_argument(
            '--%s-buckets' % timer_name.replace('_', '-'),
            nargs='+',
            default=metrics.DEFAULT_TIMER_BUCKETS,
            # Fixed help text: this option sets buckets, not labels.
            help="Buckets for the %s timer" % timer_name,
            env_var='%s_BUCKETS' % timer_name.upper(),
        )

    # Per-queue-histogram label and bucket selection.
    for queue_histogram in (
        metrics.backend_queue_length,
        metrics.server_queue_length,
    ):
        name_with_hyphens = queue_histogram.__name__.replace('_', '-')

        p.add_argument(
            '--%s-labels' % name_with_hyphens,
            nargs='+',
            default=[],
            choices=metrics.REQUEST_LABELS,
            help="Labels for the %s metric" % queue_histogram.__name__,
            env_var='%s_LABELS' % queue_histogram.__name__.upper(),
        )

        p.add_argument(
            '--%s-buckets' % name_with_hyphens,
            nargs='+',
            default=metrics.DEFAULT_QUEUE_LENGTH_BUCKETS,
            # Fixed help text: this option sets buckets, not labels.
            help="Buckets for the %s metric" % queue_histogram.__name__,
            env_var='%s_BUCKETS' % queue_histogram.__name__.upper(),
        )

    return p
+
+
def create_log_processor(options, error):
    """Construct the log processor thread selected by *options*.

    :param options: parsed options from :func:`get_argument_parser`
    :param error: callable used to report a fatal configuration problem
        (normally ``ArgumentParser.error``)
    """
    metric_updaters = []

    # Timers: histograms with configurable labels and buckets.
    for timer_name in metrics.TIMERS.keys():
        if timer_name not in options.enabled_metrics:
            continue

        labelnames = getattr(options, '%s_labels' % timer_name)
        buckets = getattr(options, '%s_buckets' % timer_name)

        metric_updaters.append(
            metrics.timer(timer_name, labelnames, buckets),
        )

    # Counters: labels only.
    for counter in (
        metrics.bytes_read_total,
        metrics.requests_total,
    ):
        if counter.__name__ not in options.enabled_metrics:
            continue

        labelnames = getattr(options, '%s_labels' % counter.__name__)

        metric_updaters.append(counter(labelnames))

    # Queue length histograms: labels and buckets.
    for queue_histogram in (
        metrics.backend_queue_length,
        metrics.server_queue_length,
    ):
        if queue_histogram.__name__ not in options.enabled_metrics:
            continue

        labelnames = getattr(options, '%s_labels' % queue_histogram.__name__)
        buckets = getattr(options, '%s_buckets' % queue_histogram.__name__)

        metric_updaters.append(queue_histogram(labelnames, buckets))

    if options.stdin:
        from .stdin import StdinProcessor

        log_processor = StdinProcessor(metric_updaters)
    elif options.journal:
        from .journal import JournalProcessor

        log_processor = JournalProcessor(
            metric_updaters=metric_updaters,
            unit=options.journal,
        )
    elif options.file:
        from .file import LogFileProcessor

        # --file is parsed with FileType('r'), so options.file is an open
        # file object; Pygtail expects a filename, which .name provides.
        log_processor = LogFileProcessor(
            metric_updaters=metric_updaters,
            path=options.file.name,
        )
    else:
        # The mutually exclusive group is required, so this should be
        # unreachable; fail loudly rather than with a NameError below.
        error("no log processor selected")

    return log_processor
+
+
def main():
    """Parse options, start the log processor and serve metrics over HTTP."""
    logging.basicConfig(level=logging.DEBUG)

    p = get_argument_parser()
    options = p.parse_args()

    logging.info(p.format_values())

    # The processor is a daemonless Thread which feeds parsed log lines
    # into the configured metric updaters.
    log_processor = create_log_processor(options, p.error)
    log_processor.start()

    host = '0.0.0.0'
    port = 9129

    httpd = HTTPServer(
        (host, port),
        create_request_handler(options.licence_location),
    )

    # Fixed typo ("Listing on port ...") and dropped the misleading "port"
    # before the host:port pair.
    logging.info("Listening on %s:%d" % (host, port))

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
diff --git a/prometheus_haproxy_log_exporter/exposition.py b/prometheus_haproxy_log_exporter/exposition.py
new file mode 100644
index 0000000..aec3521
--- /dev/null
+++ b/prometheus_haproxy_log_exporter/exposition.py
@@ -0,0 +1,99 @@
+# Copyright (C) 2016 Christopher Baines <mail@cbaines.net>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from prometheus_client.exposition import MetricsHandler
+
# Static HTML served at every path other than /metrics and /licence,
# describing the exporter and linking to the metrics and licence pages.
index_page = """
<!doctype html>

<html lang="en">
<head>
  <meta charset="utf-8">

  <title>Prometheus HAProxy Log Exporter</title>
</head>

<body>
  <h1>HAProxy Log exporter for Prometheus</h1>

  <p>
    This is a highly configurable exporter for HAProxy.
  </p>

  <a href="/metrics">View metrics</a>

  <h2>Example Configuration</h2>
  <p>
    You must configure Prometheus to scrape the metrics exported here. The port
    is 9129, and the configuration should look something like the example
    below.
  </p>
  <pre><code>
  scrape_configs:
    - job_name: haproxy_log
      target_groups:
        - targets:
          - MACHINE_ADDRESS:9129
  </code></pre>

  <h2>Information</h2>
  <p>
    Copyright (C) 2016 Christopher Baines <mail@cbaines.net><br>
    <a href="/licence">View Licence</a>
  </p>

  <p>
    The source may be obtained from
    <a href="http://git.cbaines.net/prometheus-haproxy-log-exporter/">
    this Git repository</a>
  </p>

  <p>
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.
  </p>

  <p>
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.
  </p>
</body>
</html>
"""
+
+
def create_request_handler(licence_location):
    """Return a MetricsHandler subclass bound to *licence_location*.

    The handler serves /metrics via prometheus_client, /licence as the
    raw licence file, and the index page for every other path.
    """
    class RequestHandler(MetricsHandler):
        def do_GET(self):
            path = self.path

            # Metrics are delegated entirely to prometheus_client.
            if path == "/metrics":
                return super().do_GET()

            self.send_response(200)
            self.end_headers()

            if path != "/licence":
                self.wfile.write(index_page.encode('UTF-8'))
                return

            with open(licence_location, 'rb') as licence:
                self.wfile.write(licence.read())

    return RequestHandler
diff --git a/prometheus_haproxy_log_exporter/file/__init__.py b/prometheus_haproxy_log_exporter/file/__init__.py
new file mode 100644
index 0000000..42c08b1
--- /dev/null
+++ b/prometheus_haproxy_log_exporter/file/__init__.py
@@ -0,0 +1 @@
+from .log_file_processor import LogFileProcessor
diff --git a/prometheus_haproxy_log_exporter/file/log_file_processor.py b/prometheus_haproxy_log_exporter/file/log_file_processor.py
new file mode 100644
index 0000000..22b6d42
--- /dev/null
+++ b/prometheus_haproxy_log_exporter/file/log_file_processor.py
@@ -0,0 +1,22 @@
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from pygtail import Pygtail
+
+from ..log_processing import AbstractLogProcessor
+
+
class LogFileProcessor(AbstractLogProcessor):
    """Tail a HAProxy log file and update metrics for each new line."""

    def __init__(self, path, *args, **kwargs):
        super(LogFileProcessor, self).__init__(*args, **kwargs)

        # Path of the log file to tail; Pygtail keeps an offset file so
        # only lines appended since the last read are processed.
        self.path = path

    def run(self):
        # Bug fix: run() previously took *path* as a parameter, but
        # Thread.run() is invoked with no arguments (and the path= kwarg
        # from cli.py was forwarded to Thread.__init__, raising a
        # TypeError).  The path now lives on the instance, matching
        # JournalProcessor.
        for line in Pygtail(self.path):
            self.update_metrics(line)
diff --git a/prometheus_haproxy_log_exporter/journal/__init__.py b/prometheus_haproxy_log_exporter/journal/__init__.py
new file mode 100644
index 0000000..3ac2398
--- /dev/null
+++ b/prometheus_haproxy_log_exporter/journal/__init__.py
@@ -0,0 +1 @@
+from .journal_processor import JournalProcessor
diff --git a/prometheus_haproxy_log_exporter/journal/journal_processor.py b/prometheus_haproxy_log_exporter/journal/journal_processor.py
new file mode 100644
index 0000000..1d022d4
--- /dev/null
+++ b/prometheus_haproxy_log_exporter/journal/journal_processor.py
@@ -0,0 +1,35 @@
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from systemd import journal
+
+from ..log_processing import AbstractLogProcessor
+
+
class JournalProcessor(AbstractLogProcessor):
    """Read HAProxy log lines from the systemd journal.

    Follows the journal for the configured systemd unit, feeding the
    MESSAGE field of each new entry through the metric updaters.
    """

    def __init__(self, unit, *args, **kwargs):
        super(JournalProcessor, self).__init__(*args, **kwargs)

        # Name of the systemd unit to filter entries by
        # (e.g. "haproxy.service").
        self.unit = unit

    def run(self):
        with journal.Reader() as j:
            j.add_match(_SYSTEMD_UNIT=self.unit)

            # Jump to the end of the journal so only entries logged after
            # startup are processed.
            j.seek_tail()
            j.get_previous()

            while True:
                for entry in j:
                    self.update_metrics(entry['MESSAGE'])
                # Block until the journal has new entries to read.
                j.wait()
diff --git a/prometheus_haproxy_log_exporter/log_processing.py b/prometheus_haproxy_log_exporter/log_processing.py
new file mode 100644
index 0000000..ec87a0a
--- /dev/null
+++ b/prometheus_haproxy_log_exporter/log_processing.py
@@ -0,0 +1,64 @@
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+import logging
+import threading
+
+from haproxy.haproxy_logline import HaproxyLogLine
+from prometheus_client import Counter
+
+from .metrics import NAMESPACE
+
+JOURNAL_REGEX = re.compile(
+ # Dec 9
+ r'\A\w+\s+\d+\s+'
+ # 13:01:26
+ r'\d+:\d+:\d+\s+'
+ # localhost.localdomain haproxy[28029]:
+ r'([\.a-zA-Z0-9_-]+)\s+\w+\[\d+\]:\s+',
+)
+
+
class AbstractLogProcessor(threading.Thread):
    """Base class for log processor threads.

    Subclasses implement ``run`` to obtain raw log lines from some source
    (file, journal, stdin) and pass each one to ``update_metrics``.
    """

    def __init__(self, metric_updaters, *args, **kwargs):
        super(AbstractLogProcessor, self).__init__(*args, **kwargs)

        # Callables taking a parsed log line and updating one metric each.
        self.metric_updaters = metric_updaters

        # Counts lines that could not be parsed or whose metric update
        # raised; incremented, never reset.
        self.processing_errors = Counter(
            'processing_errors_total',
            "Total log lines which could not be processed",
            namespace=NAMESPACE,
        )

    def update_metrics(self, raw_line):
        """Parse one raw log line and update every configured metric.

        Unparseable lines are counted in ``processing_errors_total`` and
        otherwise ignored.
        """
        try:
            # Strip any syslog/journal prefix before the HAProxy payload.
            # The line is stripped once here; the second .strip() the
            # original applied before HaproxyLogLine was redundant.
            raw_line = JOURNAL_REGEX.sub('', raw_line.strip())
            line = HaproxyLogLine(raw_line)
        except Exception as e:
            self.processing_errors.inc()
            logging.exception("%s (line parsing error): %s" % (e, raw_line))
            return

        if not line.valid:
            self.processing_errors.inc()
            logging.warning("Failed to parse line: %s" % raw_line)
            return

        try:
            for metric_updater in self.metric_updaters:
                metric_updater(line)
        except Exception as e:
            self.processing_errors.inc()
            logging.exception("%s (error updating metrics): %s" % (e, raw_line))
diff --git a/prometheus_haproxy_log_exporter/metrics.py b/prometheus_haproxy_log_exporter/metrics.py
new file mode 100644
index 0000000..1ee8c11
--- /dev/null
+++ b/prometheus_haproxy_log_exporter/metrics.py
@@ -0,0 +1,242 @@
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import itertools
+
+from prometheus_client import Counter, Histogram
+
# Prefix for every metric exported by this package.
NAMESPACE = 'haproxy_log'

# Timer metrics: maps the exported metric name to the HaproxyLogLine
# attribute holding the raw value and the metric's documentation string.
TIMERS = {
    'request_wait_milliseconds': (
        'time_wait_request',
        "Time spent waiting for the client to send the full HTTP request (Tq in HAProxy)",
    ),
    'server_tcp_connection_establish_milliseconds': (
        'time_connect_server',
        "Time in milliseconds to connect to the final server (Tc in HAProxy)",
    ),
    'request_queued_milliseconds': (
        'time_wait_queues',
        "Time that the request spend on HAProxy queues (Tw in HAProxy)",
    ),
    'response_processing_milliseconds': (
        'time_wait_response',
        "Time waiting the downstream server to send the full HTTP response (Tr in HAProxy)",
    ),
    'session_duration_milliseconds': (
        'total_time',
        "Time between accepting the HTTP request and sending back the HTTP response (Tt in HAProxy)",
    ),
}

# Companion counters for the timers above (except session duration): a
# timer value of -1 means the connection was aborted at that stage, and
# timer() increments the matching counter instead of observing the value.
TIMER_ABORT_COUNTERS = {
    'request_wait_milliseconds': (  # Tq
        'request_abort_total',
        "Count of connections aborted before a complete request was received",
    ),
    'server_tcp_connection_establish_milliseconds': (  # Tc
        'request_pre_server_connection_abort',
        "Count of connections aborted before a connection to a server was established",
    ),
    'request_queued_milliseconds': (  # Tw
        'request_pre_queue_abort_total',
        "Count of connections aborted before reaching the queue",
    ),
    'response_processing_milliseconds': (  # Tr
        'request_response_abort_total',
        "Count of connections for which the last response header from the server was never received",
    ),
}

# Convenience view over the timer metric names.
TIMER_NAMES = TIMERS.keys()

# These are attributes associated with each line processed, which can be used
# as labels on metrics
REQUEST_LABELS = (
    'status_code',
    'frontend_name',
    'backend_name',
    'server_name',
    'http_request_path',
    'http_request_method',
    'client_ip',
    'client_port',
)

# These are the default buckets for the Prometheus python client, adjusted to
# be in milliseconds
DEFAULT_TIMER_BUCKETS = (
    5, 10, 25,
    50, 75, 100, 250,
    500, 750, 1000, 2500,
    5000, 7500, 10000, float('inf'),
)


# Default buckets for the queue length histograms: 1..9 individually, then
# progressively wider buckets up to infinity.
DEFAULT_QUEUE_LENGTH_BUCKETS = tuple(itertools.chain(
    range(1, 10),
    (20, 30, 40, 60, 100, float('inf')),
))
+
+
def requests_total(labelnames):
    """Create the total-requests counter.

    Returns a callable taking a parsed log line which increments the
    counter, labelled with the values of *labelnames* read from the line.
    """
    counter = Counter(
        'requests_total',
        "Total processed requests",
        namespace=NAMESPACE,
        labelnames=labelnames,
    )

    if labelnames:
        def observe(line):
            label_values = {
                name: getattr(line, name)
                for name in labelnames
            }
            counter.labels(label_values).inc()
    else:
        def observe(line):
            counter.inc()

    return observe
+
+
def timer(timer_name, labelnames, buckets):
    """Create the histogram (and, for most timers, abort counter) for
    *timer_name*, returning a callable which observes one log line.

    For ``session_duration_milliseconds`` an extra ``logasap`` label is
    added, true when HAProxy logged before the session finished (the raw
    value is prefixed with '+').  For every other timer a raw value of -1
    means the connection was aborted at that stage, and the matching
    TIMER_ABORT_COUNTERS counter is incremented instead.
    """
    # The HaproxyLogLine attribute holding the raw timer value.
    attribute, documentation = TIMERS[timer_name]

    all_labelnames = labelnames

    if timer_name == 'session_duration_milliseconds':
        all_labelnames = labelnames + ['logasap']

    histogram = Histogram(
        timer_name,
        documentation=documentation,
        namespace=NAMESPACE,
        labelnames=tuple(all_labelnames),
        buckets=buckets,
    )

    if timer_name == 'session_duration_milliseconds':
        def observe(line):
            raw_value = getattr(line, attribute)

            label_values = {
                label: getattr(line, label)
                for label in labelnames
            }

            # A leading '+' marks a "logasap" line: HAProxy logged before
            # the session completed, so the value is a lower bound.
            if raw_value.startswith('+'):
                label_values['logasap'] = True
                value = float(raw_value[1:])
            else:
                label_values['logasap'] = False
                value = float(raw_value)

            histogram.labels(label_values).observe(value)
    else:
        abort_counter_name, abort_counter_documentation = TIMER_ABORT_COUNTERS[timer_name]

        abort_counter = Counter(
            abort_counter_name,
            abort_counter_documentation,
            namespace=NAMESPACE,
            labelnames=labelnames,
        )

        if len(labelnames) == 0:
            def observe(line):
                value = float(getattr(line, attribute))

                # -1 means the connection aborted before this stage.
                if value == -1:
                    abort_counter.inc()
                else:
                    histogram.observe(value)
        else:
            def observe(line):
                value = float(getattr(line, attribute))

                label_values = {
                    label: getattr(line, label)
                    for label in labelnames
                }

                # -1 means the connection aborted before this stage.
                if value == -1:
                    abort_counter.labels(label_values).inc()
                else:
                    histogram.labels(label_values).observe(value)

    return observe
+
+
def bytes_read_total(labelnames):
    """Create the bytes_read_total counter.

    Returns a callable taking a parsed log line which increments the
    counter, labelled with the values of *labelnames* read from the line.

    NOTE(review): despite the metric name, this increments by 1 per log
    line rather than by the number of bytes the line reports — confirm
    whether incrementing by the line's bytes-read field was intended.
    """
    bytes_read = Counter(
        'bytes_read_total',
        "Bytes read total",
        namespace=NAMESPACE,
        labelnames=labelnames,
    )

    if not labelnames:
        def observe(line):
            bytes_read.inc()

        return observe

    def observe(line):
        label_values = {
            label: getattr(line, label)
            for label in labelnames
        }
        bytes_read.labels(label_values).inc()

    return observe
+
+
def backend_queue_length(labelnames, buckets):
    """Create the backend queue length histogram.

    Returns a callable taking a parsed log line which observes the
    line's ``queue_backend`` value, labelled with *labelnames*.
    """
    histogram = Histogram(
        'backend_queue_length',
        "Requests processed before this one in the backend queue",
        namespace=NAMESPACE,
        labelnames=tuple(labelnames),
        # Bug fix: *buckets* was accepted but never passed on, so the
        # --backend-queue-length-buckets option silently had no effect
        # (contrast timer(), which does pass buckets through).
        buckets=buckets,
    )

    if len(labelnames) == 0:
        def observe(line):
            histogram.observe(line.queue_backend)
    else:
        def observe(line):
            histogram.labels({
                label: getattr(line, label)
                for label in labelnames
            }).observe(line.queue_backend)

    return observe
+
+
def server_queue_length(labelnames, buckets):
    """Create the server queue length histogram.

    Returns a callable taking a parsed log line which observes the
    line's ``queue_server`` value, labelled with *labelnames*.
    """
    histogram = Histogram(
        'server_queue_length',
        "Length of the server queue when the request was received",
        namespace=NAMESPACE,
        labelnames=tuple(labelnames),
        # Bug fix: *buckets* was accepted but never passed on, so the
        # --server-queue-length-buckets option silently had no effect
        # (contrast timer(), which does pass buckets through).
        buckets=buckets,
    )

    if len(labelnames) == 0:
        def observe(line):
            histogram.observe(line.queue_server)
    else:
        def observe(line):
            histogram.labels({
                label: getattr(line, label)
                for label in labelnames
            }).observe(line.queue_server)

    return observe
new file mode 100644
index 0000000..741986e
--- /dev/null
+++ b/prometheus_haproxy_log_exporter/stdin/__init__.py
@@ -0,0 +1 @@
+from .stdin_processor import StdinProcessor
diff --git a/prometheus_haproxy_log_exporter/stdin/stdin_processor.py b/prometheus_haproxy_log_exporter/stdin/stdin_processor.py
new file mode 100644
index 0000000..ab9cd7a
--- /dev/null
+++ b/prometheus_haproxy_log_exporter/stdin/stdin_processor.py
@@ -0,0 +1,22 @@
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import sys
+
+from ..log_processing import AbstractLogProcessor
+
+
class StdinProcessor(AbstractLogProcessor):
    """Read HAProxy log lines from standard input until EOF."""

    def run(self):
        stream = sys.stdin
        for raw_line in stream:
            self.update_metrics(raw_line)