diff --git a/.gitignore b/.gitignore index 205f2f1..83551d7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ *.py[cod] +*~ # C extensions *.so diff --git a/test/test_billion_laughs.py b/test/test_billion_laughs.py new file mode 100644 index 0000000..730743e --- /dev/null +++ b/test/test_billion_laughs.py @@ -0,0 +1,96 @@ +from __future__ import unicode_literals +import unittest +import re +import xml_parser +import tutil +import webvulnscan.attacks.billion_laughs +import socket +import time + + +try: + from urllib.request import Request, URLError, HTTPError, urlopen +except ImportError: + from urllib2 import Request, URLError, HTTPError, urlopen + + +def get_XML(url): + if "http://" not in url: + url = "http://" + url + headers = {'Accept': 'application/xrds+xml'} + req = Request(url, None, headers) + + try: + response = urlopen(req) + except (HTTPError, URLError): + print( + 'There was a problem with your request!\n') + return None + + html = response.read() + xrds_loc = re.search( + r'[^"]*)"\s*', html.decode(), re.IGNORECASE) + + if not xrds_loc: + return None + + xrds_url = xrds_loc.group('value') + + try: + xrds_doc = urlopen(xrds_url) + except (HTTPError, URLError): + print( + 'There was a problem with your request!\n') + return None + + return xrds_doc.read() + + +def form_client(vulnerable=False): + form = u'''
+ + +
''' + + def result(request): + if 'openid' not in request.parameters: + return u'There is no input!' + + xml = get_XML(request.parameters['openid']) + + if xml is None: + return u'XML document could not be found!' + + res = xml_parser.get_xml_length(xml) + + timeout = getattr(request, 'timeout', None) + if vulnerable: + if not timeout: + time.sleep(res / 100000) + elif timeout < res: + raise socket.timeout() + + return u'%s' % res + + return { + '/': form, + '/send': result, + } + + +class billion_laughs(unittest.TestCase): + attack = webvulnscan.attacks.billion_laughs + + @tutil.webtest(False) + def test_billion_laughs_static_site(): + return { + '/': u'''''', + } + + @tutil.webtest(False) + def test_billion_laughs_form(): + return form_client() + + @tutil.webtest(True) + def test_billion_laughs_dangerous(): + return form_client(True) diff --git a/test/test_utf7_check.py b/test/test_utf7_check.py new file mode 100644 index 0000000..9d90d18 --- /dev/null +++ b/test/test_utf7_check.py @@ -0,0 +1,20 @@ +import unittest +import tutil +import webvulnscan.attacks.utf7_check + + +class UTF7_checkTest(unittest.TestCase): + attack = webvulnscan.attacks.utf7_check + + @tutil.webtest(False) + def test_utf7_check_static_site(): + return{ + '/': u'''''', + } + + @tutil.webtest(True) + def test_utf7_check_dangerous(): + return{'/': ( + 200, b'''''', + {'Content-Type': 'text/html; charset=UTF-7'}), + } diff --git a/test/xml_parser.py b/test/xml_parser.py new file mode 100644 index 0000000..09dc2ca --- /dev/null +++ b/test/xml_parser.py @@ -0,0 +1,192 @@ +import unittest +import re + + +def get_objects(xml_data, allow_entity_def=True): + idx = 0 + while idx < len(xml_data): + c = xml_data[idx] + if c == '<' and allow_entity_def: + m = re.match(r'''(?x) + !ENTITY\s+ + (?P[a-zA-Z0-9_-]+)\s+ + "(?P[^"]*)"\s* + > + ''', xml_data[idx + 1:].decode()) + if m: + value = get_objects(m.group('value'), allow_entity_def=False) + yield ('entity', (m.group('name'), value)) + idx += len(m.group(0)) + 1 + continue + + if c == '&': + endpos = xml_data.find(';', idx + 1) + yield ('entity_reference', xml_data[idx + 1:endpos]) + + idx = endpos + + else: + yield ('text', c) + + idx += 1 + + +def get_xml_length(xml_data): + object_stream = get_objects(xml_data) + return _calc_xml_length(object_stream, {}) + + +def _calc_xml_length(object_stream, entities): + res = 0 + for otype, ovalue in object_stream: + if otype == 'entity': + ename, evalue = ovalue + entities[ename] = _calc_xml_length(evalue, entities) + elif otype == 'entity_reference': + res += entities[ovalue] + else: # text + res += len(ovalue) + return res + + +def _make_list(gen): + gen = list(gen) + return [ + (key, (val[0], _make_list(val[1]))) + if key == 'entity' + else (key, val) + for key, val in gen] + +if __name__ == '__main__': + unittest.main() + + +class XMLParser_test(unittest.TestCase): + + def test_entity_parsing(self): + self.assertEqual( + _make_list(get_objects( + '''''')), + [('entity', ('x', [('text', 'y')]))] + ) + + self.assertEqual( + _make_list(get_objects( + '''&y;x''')), + [('entity_reference', 'y'), ('text', 'x')] + ) + + self.assertEqual( + _make_list(get_objects( + '''&ya;x''')), + [('entity_reference', 'ya'), ('text', 'x')] + ) + + def test_empty(self): + xml_str = '''''' + res = get_xml_length(xml_str) + self.assertEqual(res, 0) + + def test_invalid_reference(self): + xml_str = '&a;' + self.assertRaises(BaseException, get_xml_length, xml_str) + + def test_lol1(self): + xml_str = ''' + + +&lol1; +''' + res = get_xml_length(xml_str) + self.assertTrue(res >= 30) + + def test_no_reference(self): + xml_str = ''' + + + +''' + res = get_xml_length(xml_str) + self.assertTrue(res < 30) + + def test_lol1_multiple(self): + xml_str = ''' + + + +]> +&lol1;&lol1;&lol1; +''' + res = get_xml_length(xml_str) + self.assertTrue(res >= 90) + + def test_lol1_missing_semicolon(self): + xml_str = ''' + + +&lol1; +''' + self.assertRaises(BaseException, get_xml_length, xml_str) + + def test_quadratic_blowup(self): + xml_str = ''' + + +]> +&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A;&A; +''' + res = get_xml_length(xml_str) + self.assertTrue(res >= 2000) + + def test_missing_bracket1(self): + xml_str = ''' + + +&A; +''' + self.assertRaises(BaseException, get_xml_length, xml_str) + + def test_missing_bracket2(self): + xml_str = ''' + + +]> +&A; +''' + self.assertRaises(BaseException, get_xml_length, xml_str) + + def test_missing_semicolon(self): + xml_str = ''' + + +]> +&A +''' + self.assertRaises(BaseException, get_xml_length, xml_str) + + def test_missing_slash(self): + xml_str = ''' + + +]> +&A; +''' + res = get_xml_length(xml_str) + self.assertTrue(res >= 23) + + def test_missing_quotes(self): + xml_str = ''' + + +]> +&A; +''' + self.assertRaises(BaseException, get_xml_length, xml_str) diff --git a/webvulnscan/__init__.py b/webvulnscan/__init__.py index c04e58a..d04cfb3 100644 --- a/webvulnscan/__init__.py +++ b/webvulnscan/__init__.py @@ -37,7 +37,7 @@ def run(options, targets): log = Log(verbosity=u'vuln') else: log = Log() - client = Client(log=log) + client = Client(log=log, config=options.bl_config) if options.import_cookies: client.cookie_jar = MozillaCookieJar(options.import_cookies) diff --git a/webvulnscan/attacks/__init__.py b/webvulnscan/attacks/__init__.py index 4af52c1..01add74 100644 --- a/webvulnscan/attacks/__init__.py +++ b/webvulnscan/attacks/__init__.py @@ -6,7 +6,9 @@ from .clickjack import clickjack from .cookiescan import cookiescan from .exotic_characters import exotic_characters +from .utf7_check import utf7_check +from .billion_laughs import billion_laughs def all_attacks(): - return [xss, csrf, crlf, breach, clickjack, cookiescan, exotic_characters] + return [xss, csrf, crlf, breach, clickjack, cookiescan, exotic_characters, utf7_check, billion_laughs] diff --git a/webvulnscan/attacks/billion_laughs.py b/webvulnscan/attacks/billion_laughs.py new file mode 100644 index 0000000..a8f7c22 --- /dev/null +++ b/webvulnscan/attacks/billion_laughs.py @@ -0,0 +1,39 @@ +from ..utils import attack +from ..openID_test_server import Create_server +import socket + + +def search(page): + for form in page.get_forms(): + yield (form,) + + +@attack(search) +def billion_laughs(client, log, form): + def attack(attack_url): + parameters = dict(form.get_parameters(True)) + for key in parameters: + if 'openid' in key.lower(): + parameters[key] = attack_url + + try: + # Necessary because submit buttons usually do not have names which + # can cause bugs. So the submit buttons are deleted from the + # parameter list. + if '' in parameters: + del parameters[''] + form.send(client, parameters, timeout=1) + except socket.timeout: + return True + return False + + with Create_server(client.config) as openid_server: + + if attack(openid_server.benign_url): + log('warn', openid_server.benign_url, "Billion Laughs", + "Test did not work!") + return + + for evil_url in openid_server.evil_urls: + if attack(evil_url): + log('vuln', evil_url, "Billion Laughs/Quadratic Blowup") diff --git a/webvulnscan/attacks/utf7_check.py b/webvulnscan/attacks/utf7_check.py new file mode 100644 index 0000000..2c1c51d --- /dev/null +++ b/webvulnscan/attacks/utf7_check.py @@ -0,0 +1,22 @@ +from ..utils import attack + + +@attack() +def utf7_check(client, log, page): + if 'Content-Type' not in page.headers: + return + + content_type = page.headers['Content-Type'] + + if ';' not in content_type: + return + + # Use Content-Type to get the charset + charset = content_type.split(';')[1].split('=')[1] + # If charset is not UTF-7 there is nothing to do + if charset != 'utf-7' and charset != 'UTF-7': + return + else: + # If charset is UTF-7 give warning + log('vuln', page.url, 'UTF-7', + 'Website uses UTF-7') diff --git a/webvulnscan/client.py b/webvulnscan/client.py index bea4984..7717438 100644 --- a/webvulnscan/client.py +++ b/webvulnscan/client.py @@ -8,19 +8,23 @@ import gzip import zlib import webvulnscan.log +import socket from .page import Page from .request import Request class NotAPage(Exception): + """ The content at the URL in question is not a webpage, but something static (image, text, etc.) """ class Client(object): + """ Client provides a easy interface for accessing web content. """ - def __init__(self, log=webvulnscan.log): + def __init__(self, log=webvulnscan.log, config=[]): + self.config = config self.cookie_jar = CookieJar() self.opener = self.setup_opener() self.additional_headers = {} @@ -60,7 +64,7 @@ def _download(self, request): return (request, status_code, response_data, headers) - def download(self, url_or_request, parameters=None, headers=None): + def download(self, url_or_request, parameters=None, headers=None, timeout=None): """ Downloads a URL, returns (request, status_code, response_data, headers) """ @@ -70,7 +74,7 @@ def download(self, url_or_request, parameters=None, headers=None): assert headers is None request = url_or_request.copy() else: - request = Request(url_or_request, parameters, headers) + request = Request(url_or_request, parameters, headers, timeout) for header, value in self.additional_headers.items(): request.add_header(header, value) @@ -82,13 +86,18 @@ def download(self, url_or_request, parameters=None, headers=None): return self._download(request) - def download_page(self, url_or_request, parameters=None, req_headers=None): + def download_page(self, url_or_request, parameters=None, req_headers=None, **kwargs): """ Downloads the content of a site, returns it as page. Throws NotAPage if the content is not a webpage. """ + timeout = None + for key in kwargs: + if key == "timeout": + timeout = kwargs[key] + socket.setdefaulttimeout(timeout) request, status_code, html_bytes, headers = self.download( - url_or_request, parameters, req_headers) + url_or_request, parameters, req_headers, timeout) content_type, charset = parse_content_type( headers.get('Content-Type'), diff --git a/webvulnscan/form.py b/webvulnscan/form.py index 9ce81e9..827e0b3 100644 --- a/webvulnscan/form.py +++ b/webvulnscan/form.py @@ -6,6 +6,7 @@ class Form(object): + def __init__(self, url, document): self.document = document self.action = urljoin(url, document.attrib.get('action')) @@ -22,9 +23,13 @@ def get_inputs(self): for textarea in self.get_textarea_elements(): yield TextArea(textarea) - def get_parameters(self): + def get_parameters(self, no_guessing=False): for item in self.get_inputs(): - yield (item.get_name, item.guess_value()) + if item.get_type == 'hidden' or not no_guessing: + value = item.guess_value() + else: + value = '' + yield (item.get_name, value) def get_input_elements(self): for form_input in self.document.findall('.//input'): @@ -34,9 +39,9 @@ def get_textarea_elements(self): for textarea in self.document.findall('.//textarea'): yield textarea - def send(self, client, parameters): + def send(self, client, parameters, **kwargs): if self.method == "get": url = add_get_params(self.action, parameters) - return client.download_page(url) + return client.download_page(url, **kwargs) else: - return client.download_page(self.action, parameters) + return client.download_page(self.action, parameters, **kwargs) diff --git a/webvulnscan/openID_test_server.py b/webvulnscan/openID_test_server.py new file mode 100644 index 0000000..4b7ccb6 --- /dev/null +++ b/webvulnscan/openID_test_server.py @@ -0,0 +1,185 @@ +from __future__ import unicode_literals +import multiprocessing +import time + +try: + from urllib.request import URLError, HTTPError, urlopen +except ImportError: + from urllib2 import URLError, HTTPError, urlopen + +try: + from urllib.parse import urlparse +except ImportError: + from urlparse import urlparse + +try: + from http.server import BaseHTTPRequestHandler, HTTPServer +except ImportError: + from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer + + +class Handler(BaseHTTPRequestHandler): + + def _default_page(self, pageState="serverxml"): + self.send_response(200) + self.send_header('Content-type', 'text/html') + self.end_headers() + self.wfile.write(''' + + + + OpenID Test Server + + + +

OpenID Test Server

+ + '''.encode("utf-8") % (self.server.domain_name, self.server.server_port, pageState,)) + + def do_GET(self): + parsed_path = urlparse(self.path) + if parsed_path.path == "/": + self._default_page("serverxml") + elif parsed_path.path == "/db": + self._default_page("serverxml_dangerous_billion") + elif parsed_path.path == "/dq": + self._default_page("serverxml_dangerous_quadratic") + elif parsed_path.path == "/serverxml": + self.showServerXML(None) + elif parsed_path.path == "/serverxml_dangerous_billion": + self.showServerXML("dangerous_billion") + elif parsed_path.path == "/serverxml_dangerous_quadratic": + self.showServerXML("dangerous_quadratic") + else: + self.send_error(404, "File not Found!") + + def showServerXML(self, xmlState=None): + self.send_response(200) + self.send_header('Content-type', 'application/xrds+xml') + self.end_headers() + + if xmlState is None: + self.wfile.write('''\ + + + + + + + + + + + + +'''.encode("utf-8")) + + elif xmlState == "dangerous_billion": + self.wfile.write('''\ + + + + + + + + + + + + +]> + + + + + + &lol9; + + + + + + + '''.encode("utf-8")) + else: # quadratic blowup + ent_a = "AAA" * 99999 + ref_a = "&A;" * 99999 + self.wfile.write('''\ + + +]> + + + + + %s + %s + + + + + +'''.encode("utf-8") % (ent_a, ref_a, ref_a)) + + +def main(queue, ip, port): + httpd = HTTPServer(("", port), Handler) + httpd.domain_name = ip + port = httpd.server_port + if queue: + queue.put(port) + httpd.serve_forever() + + +class Create_server(object): + + def __init__(self, config): + self.config = config + self.ip = "localhost" + self.port = 0 + if self.config: + self.ip = self.config[0] + self.port = int(self.config[1]) + self.server = None + self.benign_url = None + self.evil_urls = None + + def __enter__(self): + queue = multiprocessing.Queue() + self.server = multiprocessing.Process( + target=main, args=(queue, self.ip, self.port)) + self.server.daemon = True + self.server.start() + if queue: + self.port = queue.get() + + self.benign_url = "http://%s:%i".encode('UTF-8') % (self.ip, self.port) + self.evil_urls = ("http://%s:%i/db".encode('UTF-8') % + (self.ip, self.port), "http://%s:%i/dq".encode('UTF-8') % (self.ip, self.port)) + + wait_time = 0.001 + for i in range(3): + try: + urlopen(self.benign_url + "?test") + return self + except (HTTPError, URLError): + time.sleep(wait_time) + wait_time *= 10 ** (i + 1) + self.server.terminate() + raise Exception("OpenID Server could not be reached !") + + def __exit__(self, type, value, traceback): + self.server.terminate() + + +if __name__ == '__main__': + main(None, "localhost", 12345) diff --git a/webvulnscan/options.py b/webvulnscan/options.py index 55c2d2a..b43cd83 100644 --- a/webvulnscan/options.py +++ b/webvulnscan/options.py @@ -84,6 +84,23 @@ def parse_options(): 'for standard output).') parser.add_option_group(configuration_options) + billion_laughs_configuration_options = OptionGroup( + parser, "Billion Laughs configuration", + "You are able to specify" + "some options for the" + "Billion Laughs test.") + + billion_laughs_configuration_options.add_option( + '--net', dest='bl_config', + action='store', default=None, + nargs=2, + help='This option will run' + 'the Billion Laughs test' + 'on a network application.' + 'You need to pass your IP and an unused port.' + 'If this option is not used' + 'you can just test local applications.') + # Options for scanning for specific vulnerabilities. attack_options = OptionGroup(parser, "Attacks", "If you specify own or several of the " diff --git a/webvulnscan/request.py b/webvulnscan/request.py index 61a99ae..bc7c0ca 100644 --- a/webvulnscan/request.py +++ b/webvulnscan/request.py @@ -6,7 +6,11 @@ class Request(compat.Request): - def __init__(self, url, parameters=None, headers=None): + + def __init__(self, url, parameters=None, headers=None, timeout=None): + + self.timeout = timeout + self.parameters = parameters if parameters is None: data = None