1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
import unittest
import threading
try:
import SocketServer
except ImportError:
import socketserver as SocketServer
from prometheus_client import Counter, CollectorRegistry
from prometheus_client.bridge.graphite import GraphiteBridge
class FakeTime(object):
def time(self):
return 1434898897.5
class TestGraphiteBridge(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
self.data = ''
class TCPHandler(SocketServer.BaseRequestHandler):
def handle(s):
self.data = s.request.recv(1024)
server = SocketServer.TCPServer(('', 0), TCPHandler)
class ServingThread(threading.Thread):
def run(self):
server.handle_request()
server.socket.close()
self.t = ServingThread()
self.t.start()
# Explicitly use localhost as the target host, since connecting to 0.0.0.0 fails on Windows
address = ('localhost', server.server_address[1])
self.gb = GraphiteBridge(address, self.registry, _time=FakeTime())
def test_nolabels(self):
counter = Counter('c', 'help', registry=self.registry)
counter.inc()
self.gb.push()
self.t.join()
self.assertEqual(b'c 1.0 1434898897\n', self.data)
def test_labels(self):
labels = Counter('labels', 'help', ['a', 'b'], registry=self.registry)
labels.labels('c', 'd').inc()
self.gb.push()
self.t.join()
self.assertEqual(b'labels.a.c.b.d 1.0 1434898897\n', self.data)
def test_prefix(self):
labels = Counter('labels', 'help', ['a', 'b'], registry=self.registry)
labels.labels('c', 'd').inc()
self.gb.push(prefix = 'pre.fix')
self.t.join()
self.assertEqual(b'pre.fix.labels.a.c.b.d 1.0 1434898897\n', self.data)
def test_sanitizing(self):
labels = Counter('labels', 'help', ['a'], registry=self.registry)
labels.labels('c.:8').inc()
self.gb.push()
self.t.join()
self.assertEqual(b'labels.a.c__8 1.0 1434898897\n', self.data)
|