From e619458aac38b38069d285539edc75ed3a356f02 Mon Sep 17 00:00:00 2001 From: chredeur <50155089+chredeur@users.noreply.github.com> Date: Thu, 18 Dec 2025 16:35:42 +0100 Subject: [PATCH] escape dict keys to prevent SQL injection (CVE-2025-65896), add test for it and update conftest to allow unit tests without MySQL --- asyncmy/converters.pyx | 2 +- conftest.py | 4 ++-- tests/test_converters.py | 30 +++++++++++++++++++++++++++++- 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/asyncmy/converters.pyx b/asyncmy/converters.pyx index 66ad3f8..8a7e127 100644 --- a/asyncmy/converters.pyx +++ b/asyncmy/converters.pyx @@ -30,7 +30,7 @@ cpdef dict escape_dict(dict val, str charset, mapping: dict = None): n = {} for k, v in val.items(): quoted = escape_item(v, charset, mapping) - n[k] = quoted + n[escape_string(str(k))] = quoted return n cpdef str escape_sequence(tuple val, str charset, mapping: dict = None): diff --git a/conftest.py b/conftest.py index 00e30e9..b7fcd04 100644 --- a/conftest.py +++ b/conftest.py @@ -36,7 +36,7 @@ async def connection(): await conn.ensure_closed() -@pytest_asyncio.fixture(scope="session", autouse=True) +@pytest_asyncio.fixture(scope="session", autouse=False) async def initialize_tests(connection): async with connection.cursor(cursor=DictCursor) as cursor: await cursor.execute("create database if not exists test") @@ -56,7 +56,7 @@ async def initialize_tests(connection): ) -@pytest_asyncio.fixture(scope="function", autouse=True) +@pytest_asyncio.fixture(scope="function", autouse=False) async def truncate_table(connection): async with connection.cursor(cursor=DictCursor) as cursor: await cursor.execute("truncate table test.asyncmy") diff --git a/tests/test_converters.py b/tests/test_converters.py index 1927457..5c6c07f 100644 --- a/tests/test_converters.py +++ b/tests/test_converters.py @@ -1,6 +1,6 @@ import datetime -from asyncmy.converters import convert_datetime, escape_item, escape_str +from asyncmy.converters import convert_datetime, escape_dict, escape_item, escape_str class CustomDate(datetime.date): @@ -28,3 +28,31 @@ def test_convert_datetime(): # invalid datetime should be returned as str assert convert_datetime("0000-00-00 00:00:00") == "0000-00-00 00:00:00" + + +def test_escape_dict_keys(): + """Test that dict keys are properly escaped (CVE-2025-65896). + + This test ensures that SQL injection via dict keys is prevented. + Previously, only dict values were escaped, allowing attackers to + inject arbitrary SQL via crafted dict keys. + """ + # Test that keys with SQL injection characters are escaped + malicious_key = "foo'; DROP TABLE users; --" + result = escape_dict({malicious_key: "bar"}, "utf-8") + # The key should be escaped, not contain raw SQL injection + assert malicious_key not in result + assert "foo\\'; DROP TABLE users; --" in result + + # Test escaping of various dangerous characters in keys + result = escape_dict({"key'with\"quotes": "value"}, "utf-8") + assert "key\\'with\\\"quotes" in result + + # Test backslash escaping in keys + result = escape_dict({"key\\with\\backslash": "value"}, "utf-8") + assert "key\\\\with\\\\backslash" in result + + # Test normal dict still works + result = escape_dict({"name": "test", "id": 123}, "utf-8") + assert result["name"] == "'test'" + assert result["id"] == "123"