Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 39 additions & 23 deletions utils/django/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import urllib.parse

from django.core.exceptions import RequestDataTooBig
from django.utils.encoding import escape_uri_path

log = logging.getLogger(__name__)

Expand All @@ -22,7 +23,7 @@ def middleware(request):

class LogRequestMiddleware:
_sanitized_value = '***'
_sanitizeable_keys = {'password', 'token', }
_sanitizeable_keys = {'password', 'token', 'username', }

def __init__(self, get_response):
self.get_response = get_response
Expand All @@ -32,42 +33,57 @@ def __init__(self, get_response):
}

def __call__(self, request):
body = None
body = self.get_sanitized_body(request=request)
full_path = self.get_sanitized_full_path(request=request)
if body:
log.debug('%s %s [%s] %s', request.method, full_path, request.content_type, body)
else:
log.debug('%s %s', request.method, full_path)

return self.get_response(request)

def is_ctype_supported(self, ctype):
return ctype in self._sanitize_funcs_by_ctype_map or ctype.startswith('text/')

def get_sanitized_body(self, request):
# noinspection PyBroadException
try:
if self.is_ctype_supported(request.content_type) and request.body:
body = request.body.decode(request.encoding or 'utf-8')
body = self.sanitize(body=body, content_type=request.content_type)
body = self._sanitize_by_ctype(data=body, content_type=request.content_type)
if len(body) > 1000:
body = body[:1000] + ' (truncated)'
else:
body = None
except RequestDataTooBig:
body = '(too big)'
except Exception:
log.exception('Failed to parse request body')

if body:
log.debug('%s %s [%s] %s', request.method, request.get_full_path(), request.content_type, body)
else:
log.debug('%s %s', request.method, request.get_full_path())

return self.get_response(request)

def is_ctype_supported(self, ctype):
return ctype in self._sanitize_funcs_by_ctype_map or ctype.startswith('text/')
msg = 'failed to parse request body'
log.exception(msg)
body = f'({msg})'
return body

def sanitize(self, body, content_type):
def get_sanitized_full_path(self, request):
# noinspection PyBroadException
try:
sanitize_func = self._sanitize_funcs_by_ctype_map.get(content_type)
if not sanitize_func:
return body
for sanitizeable_key in self._sanitizeable_keys:
if sanitizeable_key in body:
return sanitize_func(body)
query_params = urllib.parse.urlencode(self._sanitize_dict_deep(request.GET.copy()))
except Exception:
log.exception('Failed to sanitize request body')
return body
msg = 'failed to sanitize query params'
log.exception(msg)
query_params = f'({msg})'
path = escape_uri_path(request.path)
full_path = f'{path}?{query_params}' if query_params else path
return full_path

def _sanitize_by_ctype(self, data, content_type):
# noinspection PyBroadException
sanitize_func = self._sanitize_funcs_by_ctype_map.get(content_type)
if not sanitize_func:
return data
for sanitizeable_key in self._sanitizeable_keys:
if sanitizeable_key in data:
return sanitize_func(data)
return data

def _sanitize_json(self, body):
data = json.loads(body)
Expand Down