diff --git a/utils/django/middleware.py b/utils/django/middleware.py index c50778f..8aa3a24 100644 --- a/utils/django/middleware.py +++ b/utils/django/middleware.py @@ -3,6 +3,7 @@ import urllib.parse from django.core.exceptions import RequestDataTooBig +from django.utils.encoding import escape_uri_path log = logging.getLogger(__name__) @@ -22,7 +23,7 @@ def middleware(request): class LogRequestMiddleware: _sanitized_value = '***' - _sanitizeable_keys = {'password', 'token', } + _sanitizeable_keys = {'password', 'token', 'username', } def __init__(self, get_response): self.get_response = get_response @@ -32,42 +33,57 @@ def __init__(self, get_response): } def __call__(self, request): - body = None + body = self.get_sanitized_body(request=request) + full_path = self.get_sanitized_full_path(request=request) + if body: + log.debug('%s %s [%s] %s', request.method, full_path, request.content_type, body) + else: + log.debug('%s %s', request.method, full_path) + + return self.get_response(request) + + def is_ctype_supported(self, ctype): + return ctype in self._sanitize_funcs_by_ctype_map or ctype.startswith('text/') + def get_sanitized_body(self, request): # noinspection PyBroadException try: if self.is_ctype_supported(request.content_type) and request.body: body = request.body.decode(request.encoding or 'utf-8') - body = self.sanitize(body=body, content_type=request.content_type) + body = self._sanitize_by_ctype(data=body, content_type=request.content_type) if len(body) > 1000: body = body[:1000] + ' (truncated)' + else: + body = None except RequestDataTooBig: body = '(too big)' except Exception: - log.exception('Failed to parse request body') - - if body: - log.debug('%s %s [%s] %s', request.method, request.get_full_path(), request.content_type, body) - else: - log.debug('%s %s', request.method, request.get_full_path()) - - return self.get_response(request) - - def is_ctype_supported(self, ctype): - return ctype in self._sanitize_funcs_by_ctype_map or ctype.startswith('text/') + msg = 'failed to parse request body' + log.exception(msg) + body = f'({msg})' + return body - def sanitize(self, body, content_type): + def get_sanitized_full_path(self, request): # noinspection PyBroadException try: - sanitize_func = self._sanitize_funcs_by_ctype_map.get(content_type) - if not sanitize_func: - return body - for sanitizeable_key in self._sanitizeable_keys: - if sanitizeable_key in body: - return sanitize_func(body) + query_params = urllib.parse.urlencode(self._sanitize_dict_deep(request.GET.copy())) except Exception: - log.exception('Failed to sanitize request body') - return body + msg = 'failed to sanitize query params' + log.exception(msg) + query_params = f'({msg})' + path = escape_uri_path(request.path) + full_path = f'{path}?{query_params}' if query_params else path + return full_path + + def _sanitize_by_ctype(self, data, content_type): + # noinspection PyBroadException + sanitize_func = self._sanitize_funcs_by_ctype_map.get(content_type) + if not sanitize_func: + return data + for sanitizeable_key in self._sanitizeable_keys: + if sanitizeable_key in data: + return sanitize_func(data) + return data def _sanitize_json(self, body): data = json.loads(body)