1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
|
# Patchwork - automated patch tracking system
# Copyright (C) 2018 Stephen Finucane <stephen@that.guru>
#
# SPDX-License-Identifier: GPL-2.0-or-later
import os
import re
from django.urls import resolve
from django.urls.resolvers import get_resolver
import openapi_core
from openapi_core.schema.schemas.models import Format
from openapi_core.wrappers.base import BaseOpenAPIResponse
from openapi_core.wrappers.base import BaseOpenAPIRequest
from openapi_core.validation.request.validators import RequestValidator
from openapi_core.validation.response.validators import ResponseValidator
from openapi_core.schema.parameters.exceptions import OpenAPIParameterError
from openapi_core.schema.media_types.exceptions import OpenAPIMediaTypeError
from rest_framework import status
import yaml
# Location of the OpenAPI schema documents: the ``docs/api/schemas``
# directory found three levels above the directory containing this module.
SCHEMAS_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    os.path.join(os.pardir, os.pardir, os.pardir),
    os.path.join('docs', 'api', 'schemas'),
)

# Patterns identifying the ``request.META`` keys that are treated as HTTP
# headers.
HEADER_REGEXES = tuple(
    re.compile(pattern)
    for pattern in (
        r'^HTTP_.+$',
        r'^CONTENT_TYPE$',
        r'^CONTENT_LENGTH$',
    )
)

# Cache of already-parsed specs, keyed by API version.
_LOADED_SPECS = dict()
class RegexValidator(object):
    """Callable that checks whether a string matches a regular expression.

    The pattern is compiled case-insensitively and applied with
    ``re.match``, so it is anchored at the start of the value only; the
    pattern itself must end in ``$`` if the whole value has to match.
    """

    def __init__(self, regex):
        """Compile ``regex`` (a pattern string) with ``re.IGNORECASE``."""
        self.regex = re.compile(regex, re.IGNORECASE)

    def __call__(self, value):
        """Return ``True`` if ``value`` is acceptable, ``False`` otherwise.

        * Values that are not ``str`` instances are rejected.
        * Empty strings are accepted without consulting the pattern.
        * Any other string is accepted only if the pattern matches it.
        """
        if not isinstance(value, str):
            return False

        if not value:
            return True

        # Convert the match object (or None) into a real boolean so that
        # every code path returns the same type.
        return self.regex.match(value) is not None
# Additional string formats handed to the openapi_core validators, keyed by
# the format name. Each entry pairs ``str`` with a ``RegexValidator``
# (presumably the unmarshal callable and the validation callable expected by
# ``Format`` - confirm against the installed openapi_core version).
CUSTOM_FORMATTERS = {
    'uri': Format(
        str,
        RegexValidator(
            # scheme: http, https, ftp or ftps
            r'^(?:http|ftp)s?://'
            # host: a dotted domain name ...
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # noqa
            # ... or the literal "localhost" ...
            r'localhost|'
            # ... or a dotted quad of 1-3 digit numbers
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
            # optional port
            r'(?::\d+)?'
            # optional path and/or query
            r'(?:/?|[/?]\S+)$'
        ),
    ),
    # Timestamp with exactly six fractional-second digits and no timezone.
    'iso8601': Format(
        str,
        RegexValidator(r'^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d{6}$'),
    ),
    # Loose "something@something.something" check; the pattern has no
    # trailing anchor, so only the start of the value is constrained.
    'email': Format(
        str,
        RegexValidator(r'[^@]+@[^@]+\.[^@]+'),
    ),
}
def _extract_headers(request):
    """Collect the header-like entries of ``request.META``.

    :param request: An object exposing a ``META`` mapping.
    :returns: A dict holding every ``request.META`` item whose key matches
        at least one of ``HEADER_REGEXES``. Keys are returned unmodified.
    """
    meta = request.META
    return {
        name: meta[name]
        for name in meta
        if any(pattern.match(name) for pattern in HEADER_REGEXES)
    }
def _resolve(path, resolver=None):
    """Resolve a given path to the regex that matches it (Django 2.x).

    This is essentially a re-implementation of ``URLResolver.resolve`` that
    builds and returns the matched regex instead of the view itself. The
    regexes of the resolvers and patterns matched along the way are
    concatenated, with the leading ``^`` of each nested regex removed.

    For example, resolving ``'/api/1.0/patches/1/checks/'`` is expected to
    produce a regex along the lines of
    ``^api/(?:(?P<version>(1.0|1.1))/)patches/(?P<patch_id>[^/]+)/checks/$``.

    :param path: The (remaining) URL path to resolve.
    :param resolver: The resolver or URL pattern to match against. Defaults
        to the result of ``get_resolver()``.
    :returns: A ``(regex, args, kwargs)`` tuple, or ``None`` when nothing
        matched ``path``.
    """
    from django.urls.resolvers import URLResolver  # noqa
    from django.urls.resolvers import RegexPattern  # noqa

    if not resolver:
        resolver = get_resolver()

    pattern = resolver.pattern
    matched = pattern.match(path)

    # we dont handle any other type of pattern at the moment
    assert isinstance(pattern, RegexPattern)

    if not matched:
        return None

    if not isinstance(resolver, URLResolver):
        # Leaf pattern: its own regex is the whole answer.
        _, args, kwargs = matched
        return pattern._regex, args, kwargs

    # Nested resolver: hand the unmatched remainder to each child in turn and
    # stop at the first one that resolves it.
    remainder, args, kwargs = matched
    for candidate in resolver.url_patterns:
        resolved = _resolve(remainder, candidate)
        if resolved:
            sub_regex, sub_args, sub_kwargs = resolved
            kwargs.update(sub_kwargs)
            args += sub_args
            return pattern._regex + sub_regex.lstrip('^'), args, kwargs

    return None
def _resolve_path_to_kwargs(path):
    """Return the URL keyword arguments captured when resolving ``path``.

    The ``version`` keyword is dropped and ``pk`` is renamed to ``id``; every
    other captured keyword is passed through unchanged. For a path such as
    ``'/api/1.0/patches/1/checks/'`` the result is expected to contain just
    a ``patch_id`` entry.

    :param path: The URL path to resolve.
    :returns: A new dict of keyword name to captured value.
    """
    # TODO(stephenfin): Handle definition by args
    _regex, _args, url_kwargs = _resolve(path)

    return {
        ('id' if name == 'pk' else name): value
        for name, value in url_kwargs.items()
        if name != 'version'
    }
def _resolve_path_to_template(path):
    """Convert a path to a template string.

    The path is matched against the regex produced by ``_resolve`` and each
    non-empty captured group is substituted as follows:

    * the ``version`` group keeps its matched text verbatim;
    * the ``pk`` group becomes ``{id}``;
    * any other named group becomes ``{<group name>}``;
    * unnamed groups contribute nothing.

    So ``'/api/1.0/patches/1/checks/'`` is expected to become
    ``'/api/1.0/patches/{patch_id}/checks/'``.

    :param path: The URL path to convert.
    :returns: The templated path string.
    """
    regex, _, _ = _resolve(path)
    matched = re.match(regex, path)
    captured = matched.groups()

    # groupindex keys by name, not index. Switch that.
    names_by_number = {
        number: name for name, number in matched.re.groupindex.items()
    }

    pieces = []
    cursor = 0
    for number, text in enumerate(captured, 1):
        if not text:  # group didn't match anything
            continue

        # Literal text between the previous group and this one.
        pieces.append(path[cursor:matched.start(number)])
        cursor = matched.end(number)

        name = names_by_number.get(number)
        if name is None:
            continue

        if name == 'version':
            # special-case version group
            pieces.append(text)
        else:
            placeholder = 'id' if name == 'pk' else name
            pieces.append('{%s}' % placeholder)

    # Whatever follows the last group.
    pieces.append(path[cursor:])

    return ''.join(pieces)
def _load_spec(version):
    """Load the OpenAPI spec for ``version``, caching the parsed result.

    :param version: API version used to select the ``v<version>``
        directory below ``SCHEMAS_DIR``; a falsy value selects ``latest``.
    :returns: The object built by ``openapi_core.create_spec`` from that
        directory's ``patchwork.yaml``. Later calls for the same version
        return the entry stored in ``_LOADED_SPECS``.
    """
    cached = _LOADED_SPECS.get(version)
    if cached:
        return cached

    if version:
        schema_dir = 'v{}'.format(version)
    else:
        schema_dir = 'latest'
    spec_path = os.path.join(SCHEMAS_DIR, schema_dir, 'patchwork.yaml')

    with open(spec_path, 'r') as fh:
        data = yaml.load(fh, Loader=yaml.SafeLoader)

    spec = openapi_core.create_spec(data)
    _LOADED_SPECS[version] = spec
    return spec
class DRFOpenAPIRequest(BaseOpenAPIRequest):
    """Adapter exposing a request through the ``BaseOpenAPIRequest`` API.

    The wrapped object is presumably a Django/DRF request; it must provide
    ``get_host()``, ``path``, ``path_info``, ``method``, ``GET``, ``META``,
    ``COOKIES``, ``body`` and ``content_type``.
    """

    def __init__(self, request):
        """Store the request to adapt."""
        self.request = request

    @property
    def host_url(self):
        """Host as reported by the wrapped request's ``get_host()``."""
        wrapped = self.request
        return wrapped.get_host()

    @property
    def path(self):
        """Path of the wrapped request."""
        wrapped = self.request
        return wrapped.path

    @property
    def method(self):
        """Lower-cased HTTP method name."""
        verb = self.request.method
        return verb.lower()

    @property
    def path_pattern(self):
        """Templated form of the request's ``path_info``."""
        path_info = self.request.path_info
        return _resolve_path_to_template(path_info)

    @property
    def parameters(self):
        """Request parameters grouped by location.

        The returned dict has the keys ``path``, ``query``, ``header`` and
        ``cookie``.
        """
        wrapped = self.request

        grouped = {}
        grouped['path'] = _resolve_path_to_kwargs(wrapped.path_info)
        grouped['query'] = wrapped.GET
        grouped['header'] = _extract_headers(wrapped)
        grouped['cookie'] = wrapped.COOKIES
        return grouped

    @property
    def body(self):
        """Request body decoded as UTF-8 text."""
        raw = self.request.body
        return raw.decode('utf-8')

    @property
    def mimetype(self):
        """Content type of the wrapped request."""
        wrapped = self.request
        return wrapped.content_type
class DRFOpenAPIResponse(BaseOpenAPIResponse):
    """Adapter exposing a response through the ``BaseOpenAPIResponse`` API.

    The wrapped object must provide ``content`` (bytes) and ``status_code``.
    """

    def __init__(self, response):
        """Store the response to adapt."""
        self.response = response

    @property
    def data(self):
        """Response content decoded as UTF-8 text."""
        raw = self.response.content
        return raw.decode('utf-8')

    @property
    def status_code(self):
        """Status code of the wrapped response."""
        wrapped = self.response
        return wrapped.status_code

    @property
    def mimetype(self):
        """Always ``application/json``; the wrapped response is not read."""
        # TODO(stephenfin): Why isn't this populated?
        return 'application/json'
def validate_data(path, request, response, validate_request,
                  validate_response):
    """Validate a request/response pair against the OpenAPI spec.

    The spec is chosen from the ``version`` keyword obtained by resolving
    ``path``. Nothing is validated when the response status is 405.

    :param path: URL path that was requested.
    :param request: The request to validate.
    :param response: The response to validate.
    :param validate_request: If truthy, validate the request. Media type
        errors are tolerated only when the response status is 400; parameter
        errors are tolerated but the response status is then asserted to be
        200.
    :param validate_response: If truthy, validate the response; any
        validation error is raised.
    :returns: ``None``.
    """
    if response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED:
        return

    version = resolve(path).kwargs.get('version')
    spec = _load_spec(version)
    wrapped_request = DRFOpenAPIRequest(request)
    wrapped_response = DRFOpenAPIResponse(response)

    # request
    if validate_request:
        request_validator = RequestValidator(
            spec, custom_formatters=CUSTOM_FORMATTERS)
        request_result = request_validator.validate(wrapped_request)
        try:
            request_result.raise_for_errors()
        except OpenAPIMediaTypeError:
            # A bad media type is only acceptable if the API rejected it.
            rejected = (
                wrapped_response.status_code == status.HTTP_400_BAD_REQUEST)
            if not rejected:
                raise
        except OpenAPIParameterError:
            # TODO(stephenfin): In API v2.0, this should be an error. As things
            # stand, we silently ignore these issues.
            assert wrapped_response.status_code == status.HTTP_200_OK

    # response
    if not validate_response:
        return

    response_validator = ResponseValidator(
        spec, custom_formatters=CUSTOM_FORMATTERS)
    response_result = response_validator.validate(
        wrapped_request, wrapped_response)
    response_result.raise_for_errors()
|