diff --git a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py index 4aee06a..63e1a30 100644 --- a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py +++ b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py @@ -150,7 +150,7 @@ def __repr__(self) -> str: f"fh={self.cipher_path or self.cipher_fh} " f"wal={self.wal} " f"checkpoint={bool(self.checkpoint)} " - f"pages={self.header.page_count}>" + f"pages={self.page_count}>" ) def close(self) -> None: diff --git a/dissect/database/sqlite3/sqlite3.py b/dissect/database/sqlite3/sqlite3.py index 6ded7a2..23bfad3 100644 --- a/dissect/database/sqlite3/sqlite3.py +++ b/dissect/database/sqlite3/sqlite3.py @@ -129,10 +129,13 @@ def __init__( else: self.checkpoint = checkpoint + # Determine the highest page count we have encountered while parsing the SQLite3 header and optionally WAL. + self.page_count = max(self.header.page_count, self.wal.highest_page_num) if self.wal else self.header.page_count + self.page = lru_cache(256)(self.page) def __repr__(self) -> str: - return f"" # noqa: E501 + return f"" # noqa: E501 def __enter__(self) -> Self: """Return ``self`` upon entering the runtime context.""" @@ -202,7 +205,7 @@ def raw_page(self, num: int) -> bytes: """ # Only throw an out of bounds exception if the header contains a page_count. # Some old versions of SQLite3 do not set/update the page_count correctly. - if (num < 1 or num > self.header.page_count) and self.header.page_count > 0: + if (num < 1 or num > self.page_count) and self.page_count > 0: raise InvalidPageNumber("Page number exceeds boundaries") data = None @@ -235,7 +238,7 @@ def page(self, num: int) -> Page: return Page(self, num) def pages(self) -> Iterator[Page]: - for i in range(self.header.page_count): + for i in range(self.page_count): yield self.page(i + 1) def cells(self) -> Iterator[Cell]: diff --git a/dissect/database/sqlite3/wal.py b/dissect/database/sqlite3/wal.py index 7d4ec76..ae01f34 100644 --- a/dissect/database/sqlite3/wal.py +++ b/dissect/database/sqlite3/wal.py @@ -39,6 +39,7 @@ def __init__(self, fh: Path | BinaryIO): raise InvalidDatabase("Invalid WAL header magic") self.checksum_endian = "<" if self.header.magic == WAL_HEADER_MAGIC_LE else ">" + self.highest_page_num = max(fr.page_number for commit in self.commits for fr in commit.frames if fr.valid) self.frame = lru_cache(1024)(self.frame) diff --git a/tests/_data/sqlite3/page_count.db b/tests/_data/sqlite3/page_count.db new file mode 100644 index 0000000..b67b9d3 --- /dev/null +++ b/tests/_data/sqlite3/page_count.db @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:ef1f7ec4df0e2e8a0bbe1bfeb89f8c04fe881cd5f2e5139c8cb94ec88bf53c5e +size 8192 diff --git a/tests/_data/sqlite3/page_count.db-wal b/tests/_data/sqlite3/page_count.db-wal new file mode 100644 index 0000000..a4781d6 --- /dev/null +++ b/tests/_data/sqlite3/page_count.db-wal @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:ea21010a729e817d32f70f93024f96b03136c95047c8d76a8aa342f3e9391266 +size 16512 diff --git a/tests/sqlite3/test_wal.py b/tests/sqlite3/test_wal.py index 6d477fe..ee27e6b 100644 --- a/tests/sqlite3/test_wal.py +++ b/tests/sqlite3/test_wal.py @@ -5,6 +5,7 @@ import pytest from dissect.database.sqlite3 import sqlite3 +from tests._util import absolute_path if TYPE_CHECKING: from pathlib import Path @@ -162,3 +163,39 @@ def _assert_checkpoint_3(s: sqlite3.SQLite3) -> None: assert rows[9].id == 11 assert rows[9].name == "second checkpoint" assert rows[9].value == 101 + + +def test_wal_page_count() -> None: + """Test if we count the page numbers in the SQLite3 and WAL correctly. + + Test data generated using: + + $ sqlite3 tests/_data/sqlite3/page_count.db + SQLite version 3.45.1 2024-01-30 16:01:20 + Enter ".help" for usage hints. + sqlite> PRAGMA journal_mode = WAL; + wal + sqlite> CREATE TABLE t1 (a, b); + sqlite> .quit # commits wal + + $ python + >>> import sqlite3 + >>> con = sqlite3.connect("tests/_data/sqlite3/page_count.db") + ... cur = con.cursor() + >>> cur.execute("INSERT INTO t1 VALUES (1, ?)", ("A" * 8192,)) + >>> con.commit() + # Copy page_count.db* files before closing + """ + + db = sqlite3.SQLite3(absolute_path("_data/sqlite3/page_count.db")) + table = db.table("t1") + assert table.sql == "CREATE TABLE t1 (a, b)" + + row = next(table.rows()) + assert row.a == 1 + assert row.b == "A" * 8192 + + assert db.wal + assert db.wal.highest_page_num == 4 + assert db.header.page_count == 2 + assert db.page_count == 4