From bcc61110d5b76580a2da0d72d07de8efd7525292 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 22 Dec 2015 17:59:32 +0000 Subject: Import python-prometheus-client_0.0.13.orig.tar.gz --- tests/test_client.py | 373 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 373 insertions(+) create mode 100644 tests/test_client.py (limited to 'tests/test_client.py') diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..83b15e8 --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,373 @@ +from __future__ import unicode_literals +import os +import threading +import time +import unittest + +from prometheus_client.core import * + +class TestCounter(unittest.TestCase): + def setUp(self): + self.registry = CollectorRegistry() + self.counter = Counter('c', 'help', registry=self.registry) + + def test_increment(self): + self.assertEqual(0, self.registry.get_sample_value('c')) + self.counter.inc() + self.assertEqual(1, self.registry.get_sample_value('c')) + self.counter.inc(7) + self.assertEqual(8, self.registry.get_sample_value('c')) + + def test_negative_increment_raises(self): + self.assertRaises(ValueError, self.counter.inc, -1) + + def test_function_decorator(self): + @self.counter.count_exceptions(ValueError) + def f(r): + if r: + raise ValueError + else: + raise TypeError + + try: + f(False) + except TypeError: + pass + self.assertEqual(0, self.registry.get_sample_value('c')) + + try: + f(True) + except ValueError: + raised = True + self.assertEqual(1, self.registry.get_sample_value('c')) + + def test_block_decorator(self): + with self.counter.count_exceptions(): + pass + self.assertEqual(0, self.registry.get_sample_value('c')) + + raised = False + try: + with self.counter.count_exceptions(): + raise ValueError + except: + raised = True + self.assertTrue(raised) + self.assertEqual(1, self.registry.get_sample_value('c')) + + +class TestGauge(unittest.TestCase): + def setUp(self): + self.registry = CollectorRegistry() + self.gauge = Gauge('g', 'help', registry=self.registry) + + def test_gauge(self): + self.assertEqual(0, self.registry.get_sample_value('g')) + self.gauge.inc() + self.assertEqual(1, self.registry.get_sample_value('g')) + self.gauge.dec(3) + self.assertEqual(-2, self.registry.get_sample_value('g')) + self.gauge.set(9) + self.assertEqual(9, self.registry.get_sample_value('g')) + + def test_function_decorator(self): + self.assertEqual(0, self.registry.get_sample_value('g')) + + @self.gauge.track_inprogress() + def f(): + self.assertEqual(1, self.registry.get_sample_value('g')) + + f() + self.assertEqual(0, self.registry.get_sample_value('g')) + + def test_block_decorator(self): + self.assertEqual(0, self.registry.get_sample_value('g')) + with self.gauge.track_inprogress(): + self.assertEqual(1, self.registry.get_sample_value('g')) + self.assertEqual(0, self.registry.get_sample_value('g')) + + def test_gauge_function(self): + x = {} + self.gauge.set_function(lambda: len(x)) + self.assertEqual(0, self.registry.get_sample_value('g')) + self.gauge.inc() + self.assertEqual(0, self.registry.get_sample_value('g')) + x['a'] = None + self.assertEqual(1, self.registry.get_sample_value('g')) + + def test_function_decorator(self): + self.assertEqual(0, self.registry.get_sample_value('g')) + + @self.gauge.time() + def f(): + time.sleep(.001) + + f() + self.assertNotEqual(0, self.registry.get_sample_value('g')) + + def test_block_decorator(self): + self.assertEqual(0, self.registry.get_sample_value('g')) + with self.gauge.time(): + time.sleep(.001) + self.assertNotEqual(0, self.registry.get_sample_value('g')) + + +class TestSummary(unittest.TestCase): + def setUp(self): + self.registry = CollectorRegistry() + self.summary = Summary('s', 'help', registry=self.registry) + + def test_summary(self): + self.assertEqual(0, self.registry.get_sample_value('s_count')) + self.assertEqual(0, self.registry.get_sample_value('s_sum')) + self.summary.observe(10) + self.assertEqual(1, self.registry.get_sample_value('s_count')) + self.assertEqual(10, self.registry.get_sample_value('s_sum')) + + def test_function_decorator(self): + self.assertEqual(0, self.registry.get_sample_value('s_count')) + + @self.summary.time() + def f(): + pass + + f() + self.assertEqual(1, self.registry.get_sample_value('s_count')) + + def test_block_decorator(self): + self.assertEqual(0, self.registry.get_sample_value('s_count')) + with self.summary.time(): + pass + self.assertEqual(1, self.registry.get_sample_value('s_count')) + + +class TestHistogram(unittest.TestCase): + def setUp(self): + self.registry = CollectorRegistry() + self.histogram = Histogram('h', 'help', registry=self.registry) + self.labels = Histogram('hl', 'help', ['l'], registry=self.registry) + + def test_histogram(self): + self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '1.0'})) + self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '2.5'})) + self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '5.0'})) + self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) + self.assertEqual(0, self.registry.get_sample_value('h_count')) + self.assertEqual(0, self.registry.get_sample_value('h_sum')) + + self.histogram.observe(2) + self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '1.0'})) + self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '2.5'})) + self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '5.0'})) + self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) + self.assertEqual(1, self.registry.get_sample_value('h_count')) + self.assertEqual(2, self.registry.get_sample_value('h_sum')) + + self.histogram.observe(2.5) + self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '1.0'})) + self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '2.5'})) + self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '5.0'})) + self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) + self.assertEqual(2, self.registry.get_sample_value('h_count')) + self.assertEqual(4.5, self.registry.get_sample_value('h_sum')) + + self.histogram.observe(float("inf")) + self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '1.0'})) + self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '2.5'})) + self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '5.0'})) + self.assertEqual(3, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) + self.assertEqual(3, self.registry.get_sample_value('h_count')) + self.assertEqual(float("inf"), self.registry.get_sample_value('h_sum')) + + def test_setting_buckets(self): + h = Histogram('h', 'help', registry=None, buckets=[0, 1, 2]) + self.assertEqual([0.0, 1.0, 2.0, float("inf")], h._upper_bounds) + + h = Histogram('h', 'help', registry=None, buckets=[0, 1, 2, float("inf")]) + self.assertEqual([0.0, 1.0, 2.0, float("inf")], h._upper_bounds) + + self.assertRaises(ValueError, Histogram, 'h', 'help', registry=None, buckets=[]) + self.assertRaises(ValueError, Histogram, 'h', 'help', registry=None, buckets=[float("inf")]) + self.assertRaises(ValueError, Histogram, 'h', 'help', registry=None, buckets=[3, 1]) + + def test_labels(self): + self.labels.labels('a').observe(2) + self.assertEqual(0, self.registry.get_sample_value('hl_bucket', {'le': '1.0', 'l': 'a'})) + self.assertEqual(1, self.registry.get_sample_value('hl_bucket', {'le': '2.5', 'l': 'a'})) + self.assertEqual(1, self.registry.get_sample_value('hl_bucket', {'le': '5.0', 'l': 'a'})) + self.assertEqual(1, self.registry.get_sample_value('hl_bucket', {'le': '+Inf', 'l': 'a'})) + self.assertEqual(1, self.registry.get_sample_value('hl_count', {'l': 'a'})) + self.assertEqual(2, self.registry.get_sample_value('hl_sum', {'l': 'a'})) + + def test_function_decorator(self): + self.assertEqual(0, self.registry.get_sample_value('h_count')) + self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) + + @self.histogram.time() + def f(): + pass + + f() + self.assertEqual(1, self.registry.get_sample_value('h_count')) + self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) + + def test_block_decorator(self): + self.assertEqual(0, self.registry.get_sample_value('h_count')) + self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) + with self.histogram.time(): + pass + self.assertEqual(1, self.registry.get_sample_value('h_count')) + self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) + + +class TestMetricWrapper(unittest.TestCase): + def setUp(self): + self.registry = CollectorRegistry() + self.counter = Counter('c', 'help', labelnames=['l'], registry=self.registry) + self.two_labels = Counter('two', 'help', labelnames=['a', 'b'], registry=self.registry) + + def test_child(self): + self.counter.labels('x').inc() + self.assertEqual(1, self.registry.get_sample_value('c', {'l': 'x'})) + self.two_labels.labels('x', 'y').inc(2) + self.assertEqual(2, self.registry.get_sample_value('two', {'a': 'x', 'b': 'y'})) + + def test_remove(self): + self.counter.labels('x').inc() + self.counter.labels('y').inc(2) + self.assertEqual(1, self.registry.get_sample_value('c', {'l': 'x'})) + self.assertEqual(2, self.registry.get_sample_value('c', {'l': 'y'})) + self.counter.remove('x') + self.assertEqual(None, self.registry.get_sample_value('c', {'l': 'x'})) + self.assertEqual(2, self.registry.get_sample_value('c', {'l': 'y'})) + + def test_incorrect_label_count_raises(self): + self.assertRaises(ValueError, self.counter.labels) + self.assertRaises(ValueError, self.counter.labels, 'a', 'b') + self.assertRaises(ValueError, self.counter.remove) + self.assertRaises(ValueError, self.counter.remove, 'a', 'b') + + def test_labels_coerced_to_string(self): + self.counter.labels(None).inc() + self.counter.labels({'l': None}).inc() + self.assertEqual(2, self.registry.get_sample_value('c', {'l': 'None'})) + + self.counter.remove(None) + self.assertEqual(None, self.registry.get_sample_value('c', {'l': 'None'})) + + def test_non_string_labels_raises(self): + class Test(object): + __str__ = None + self.assertRaises(TypeError, self.counter.labels, Test()) + self.assertRaises(TypeError, self.counter.labels, {'l': Test()}) + + def test_namespace_subsystem_concatenated(self): + c = Counter('c', 'help', namespace='a', subsystem='b', registry=self.registry) + c.inc() + self.assertEqual(1, self.registry.get_sample_value('a_b_c')) + + def test_labels_by_dict(self): + self.counter.labels({'l': 'x'}).inc() + self.assertEqual(1, self.registry.get_sample_value('c', {'l': 'x'})) + self.assertRaises(ValueError, self.counter.labels, {'l': 'x', 'm': 'y'}) + self.assertRaises(ValueError, self.counter.labels, {'m': 'y'}) + self.assertRaises(ValueError, self.counter.labels, {}) + self.two_labels.labels({'a': 'x', 'b': 'y'}).inc() + self.assertEqual(1, self.registry.get_sample_value('two', {'a': 'x', 'b': 'y'})) + self.assertRaises(ValueError, self.two_labels.labels, {'a': 'x', 'b': 'y', 'c': 'z'}) + self.assertRaises(ValueError, self.two_labels.labels, {'a': 'x', 'c': 'z'}) + self.assertRaises(ValueError, self.two_labels.labels, {'b': 'y', 'c': 'z'}) + self.assertRaises(ValueError, self.two_labels.labels, {'c': 'z'}) + self.assertRaises(ValueError, self.two_labels.labels, {}) + + def test_invalid_names_raise(self): + self.assertRaises(ValueError, Counter, '', 'help') + self.assertRaises(ValueError, Counter, '^', 'help') + self.assertRaises(ValueError, Counter, '', 'help', namespace='&') + self.assertRaises(ValueError, Counter, '', 'help', subsystem='(') + self.assertRaises(ValueError, Counter, 'c', '', labelnames=['^']) + self.assertRaises(ValueError, Counter, 'c', '', labelnames=['__reserved']) + self.assertRaises(ValueError, Summary, 'c', '', labelnames=['quantile']) + + +class TestMetricFamilies(unittest.TestCase): + def setUp(self): + self.registry = CollectorRegistry() + + def custom_collector(self, metric_family): + class CustomCollector(object): + def collect(self): + return [metric_family] + self.registry.register(CustomCollector()) + + def test_counter(self): + self.custom_collector(CounterMetricFamily('c', 'help', value=1)) + self.assertEqual(1, self.registry.get_sample_value('c', {})) + + def test_counter_labels(self): + cmf = CounterMetricFamily('c', 'help', labels=['a', 'c']) + cmf.add_metric(['b', 'd'], 2) + self.custom_collector(cmf) + self.assertEqual(2, self.registry.get_sample_value('c', {'a': 'b', 'c': 'd'})) + + def test_gauge(self): + self.custom_collector(GaugeMetricFamily('g', 'help', value=1)) + self.assertEqual(1, self.registry.get_sample_value('g', {})) + + def test_gauge_labels(self): + cmf = GaugeMetricFamily('g', 'help', labels=['a']) + cmf.add_metric(['b'], 2) + self.custom_collector(cmf) + self.assertEqual(2, self.registry.get_sample_value('g', {'a':'b'})) + + def test_summary(self): + self.custom_collector(SummaryMetricFamily('s', 'help', count_value=1, sum_value=2)) + self.assertEqual(1, self.registry.get_sample_value('s_count', {})) + self.assertEqual(2, self.registry.get_sample_value('s_sum', {})) + + def test_summary_labels(self): + cmf = SummaryMetricFamily('s', 'help', labels=['a']) + cmf.add_metric(['b'], count_value=1, sum_value=2) + self.custom_collector(cmf) + self.assertEqual(1, self.registry.get_sample_value('s_count', {'a': 'b'})) + self.assertEqual(2, self.registry.get_sample_value('s_sum', {'a': 'b'})) + + def test_histogram(self): + self.custom_collector(HistogramMetricFamily('h', 'help', buckets=[('0', 1), ('+Inf', 2)], sum_value=3)) + self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '0'})) + self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) + self.assertEqual(2, self.registry.get_sample_value('h_count', {})) + self.assertEqual(3, self.registry.get_sample_value('h_sum', {})) + + def test_histogram_labels(self): + cmf = HistogramMetricFamily('h', 'help', labels=['a']) + cmf.add_metric(['b'], buckets=[('0', 1), ('+Inf', 2)], sum_value=3) + self.custom_collector(cmf) + self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'a': 'b', 'le': '0'})) + self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'a': 'b', 'le': '+Inf'})) + self.assertEqual(2, self.registry.get_sample_value('h_count', {'a': 'b'})) + self.assertEqual(3, self.registry.get_sample_value('h_sum', {'a': 'b'})) + + def test_bad_constructors(self): + self.assertRaises(ValueError, CounterMetricFamily, 'c', 'help', value=1, labels=[]) + self.assertRaises(ValueError, CounterMetricFamily, 'c', 'help', value=1, labels=['a']) + + self.assertRaises(ValueError, GaugeMetricFamily, 'g', 'help', value=1, labels=[]) + self.assertRaises(ValueError, GaugeMetricFamily, 'g', 'help', value=1, labels=['a']) + + self.assertRaises(ValueError, SummaryMetricFamily, 's', 'help', sum_value=1) + self.assertRaises(ValueError, SummaryMetricFamily, 's', 'help', count_value=1) + self.assertRaises(ValueError, SummaryMetricFamily, 's', 'help', count_value=1, labels=['a']) + self.assertRaises(ValueError, SummaryMetricFamily, 's', 'help', sum_value=1, labels=['a']) + self.assertRaises(ValueError, SummaryMetricFamily, 's', 'help', count_value=1, sum_value=1, labels=['a']) + + self.assertRaises(ValueError, HistogramMetricFamily, 'h', 'help', sum_value=1) + self.assertRaises(ValueError, HistogramMetricFamily, 'h', 'help', buckets={}) + self.assertRaises(ValueError, HistogramMetricFamily, 'h', 'help', sum_value=1, labels=['a']) + self.assertRaises(ValueError, HistogramMetricFamily, 'h', 'help', buckets={}, labels=['a']) + self.assertRaises(ValueError, HistogramMetricFamily, 'h', 'help', buckets={}, sum_value=1, labels=['a']) + self.assertRaises(KeyError, HistogramMetricFamily, 'h', 'help', buckets={}, sum_value=1) + + +if __name__ == '__main__': + unittest.main() -- cgit v1.2.3