Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
fc6eaee
Improve WAL
PimSanders Nov 19, 2025
9e4dc6a
Move WAL classes to separate file
PimSanders Nov 19, 2025
a25688f
Make linter happy
PimSanders Nov 19, 2025
47b7611
Apply suggestions from code review
PimSanders Nov 26, 2025
fc76bfc
Apply suggestions from code review
PimSanders Nov 26, 2025
aa34269
Apply suggestions from code review
PimSanders Nov 26, 2025
36ec9da
Apply suggestions from code review
PimSanders Nov 26, 2025
c727841
Apply suggestions from code review
PimSanders Nov 26, 2025
f1dafd8
Remove open_wal() function
PimSanders Nov 26, 2025
e16fde6
Update dissect/database/sqlite3/sqlite3.py
PimSanders Nov 26, 2025
5c1bdaf
Apply suggestions from code review
PimSanders Nov 26, 2025
b134330
Make linter happy again
PimSanders Nov 26, 2025
95e9ec8
Apply suggestions from code review
PimSanders Dec 1, 2025
527c8ec
Move files
PimSanders Dec 1, 2025
323116c
Update dissect/database/sqlite3/wal.py
PimSanders Dec 1, 2025
307404d
Invert checkpoint order to follow order of commits and frames
PimSanders Dec 1, 2025
ae43fb7
Improve WAL file detection
PimSanders Dec 3, 2025
2dfb115
Add tests for WAL file detection
PimSanders Dec 3, 2025
f38c19d
Apply suggestions from code review
PimSanders Dec 4, 2025
c29c278
Apply suggestions from code review
PimSanders Dec 4, 2025
f79d439
Improve tests
PimSanders Dec 4, 2025
3383f75
Fix missing latest transaction when reading from WAL
PimSanders Dec 4, 2025
438b69d
Move page 1 seek
PimSanders Dec 9, 2025
5d709a0
Revert db seek to first version
PimSanders Dec 9, 2025
07bde24
Apply suggestions from code review
PimSanders Dec 9, 2025
43d1909
Apply suggestions from code review
PimSanders Dec 9, 2025
fe1de49
Write test data to correct directory
PimSanders Dec 9, 2025
e12b54b
Make linter happy once more
PimSanders Dec 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 86 additions & 131 deletions dissect/database/sqlite3/sqlite3.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import itertools
import re
import struct
from functools import lru_cache
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO

from dissect.database.sqlite3.c_sqlite3 import c_sqlite3
Expand All @@ -15,6 +15,7 @@
NoCellData,
)
from dissect.database.sqlite3.util import parse_table_columns_constraints
from dissect.database.sqlite3.wal import WAL, Checkpoint

if TYPE_CHECKING:
from collections.abc import Iterator
Expand Down Expand Up @@ -47,19 +48,50 @@
9: lambda fh: 1,
}

# Magic string expected at the very start of a SQLite3 database file; the database
# header's ``magic`` field is compared against this value.
# See https://sqlite.org/fileformat2.html#magic_header_string
SQLITE3_HEADER_MAGIC = b"SQLite format 3\x00"

# Accepted magic values of a WAL file header. The two values differ only in the lowest
# bit; the WAL reader maps the LE value to "<" and any other accepted value to ">" when
# choosing the byte order for checksum computation.
WAL_HEADER_MAGIC_LE = 0x377F0682
WAL_HEADER_MAGIC_BE = 0x377F0683
# Set of all accepted WAL magic values, used for a membership check on the WAL header.
WAL_HEADER_MAGIC = {WAL_HEADER_MAGIC_LE, WAL_HEADER_MAGIC_BE}


class SQLite3:
def __init__(self, fh: BinaryIO, wal_fh: BinaryIO | None = None):
"""SQLite3 database class.

Loads a SQLite3 database from the given file-like object or path. If a path is provided (or can be deduced
from the file-like object), a WAL file will be automatically looked for with a few common suffixes.
Optionally a WAL file-like object or path can be directly provided to read changes from the WAL (this takes
priority over the aforementioned WAL lookup). Additionally, a specific checkpoint from the WAL can be applied.

Args:
fh: The path or file-like object to open a SQLite3 database on.
wal: The path or file-like object to open a SQLite3 WAL file on.
checkpoint: The checkpoint to apply from the WAL file. Can be a :class:`Checkpoint` object or an integer index.

Raises:
InvalidDatabase: If the file-like object does not look like a SQLite3 database based on the header magic.

References:
- https://sqlite.org/fileformat2.html
"""

def __init__(
self,
fh: Path | BinaryIO,
wal: WAL | Path | BinaryIO | None = None,
checkpoint: Checkpoint | int | None = None,
):
# Use the provided file handle or try to open the file path.
if hasattr(fh, "read"):
name = getattr(fh, "name", None)
path = Path(name) if name else None
else:
path = fh
fh = path.open("rb")

self.fh = fh
self.wal = WAL(wal_fh) if wal_fh else None
self.path = path
self.wal = None
self.checkpoint = None

self.header = c_sqlite3.header(fh)
self.header = c_sqlite3.header(self.fh)
if self.header.magic != SQLITE3_HEADER_MAGIC:
raise InvalidDatabase("Invalid header magic")

Expand All @@ -72,10 +104,31 @@ def __init__(self, fh: BinaryIO, wal_fh: BinaryIO | None = None):
if self.usable_page_size < 480:
raise InvalidDatabase("Usable page size is too small")

if wal:
self.wal = WAL(wal) if not isinstance(wal, WAL) else wal
elif path:
# Check for WAL sidecar next to the DB.
wal_path = path.with_name(f"{path.name}-wal")
if wal_path.exists():
self.wal = WAL(wal_path)

# If a checkpoint index was provided, resolve it to a Checkpoint object.
if self.wal and isinstance(checkpoint, int):
if checkpoint < 0 or checkpoint >= len(self.wal.checkpoints):
raise IndexError("WAL checkpoint index out of range")
self.checkpoint = self.wal.checkpoints[checkpoint]
else:
self.checkpoint = checkpoint

self.page = lru_cache(256)(self.page)

def open_wal(self, fh: BinaryIO) -> None:
self.wal = WAL(fh)
def checkpoints(self) -> Iterator[SQLite3]:
"""Yield instances of the database at all available checkpoints in the WAL file, if applicable."""
if not self.wal:
return

for checkpoint in self.wal.checkpoints:
yield SQLite3(self.fh, self.wal, checkpoint)

def table(self, name: str) -> Table | None:
name = name.lower()
Expand Down Expand Up @@ -108,10 +161,33 @@ def indices(self) -> Iterator[Index]:
yield Index(self, *cell.values)

def raw_page(self, num: int) -> bytes:
"""Retrieve the raw frame data for the given page number.

Reads the page from a checkpoint, if this class was initialized with a WAL checkpoint.

If a WAL is available, will first check if the WAL contains a more recent version of the page,
otherwise it will read the page from the database file.

References:
- https://sqlite.org/fileformat2.html#reader_algorithm
"""
# Only throw an out of bounds exception if the header contains a page_count.
# Some old versions of SQLite3 do not set/update the page_count correctly.
if (num < 1 or num > self.header.page_count) and self.header.page_count > 0:
raise InvalidPageNumber("Page number exceeds boundaries")

# If a specific WAL checkpoint was provided, use it instead of the on-disk page.
if self.checkpoint is not None and (frame := self.checkpoint.get(num)):
return frame.data

# Check if the latest valid instance of the page is committed (either the frame itself
# is the commit frame or it is included in a commit's frames). If so, return that frame's data.
if self.wal:
for commit in reversed(self.wal.commits):
if (frame := commit.get(num)) and frame.valid:
return frame.data

# Else we read the page from the database file.
if num == 1: # Page 1 is root
self.fh.seek(len(c_sqlite3.header))
else:
Expand Down Expand Up @@ -465,127 +541,6 @@ def values(self) -> list[int | float | str | bytes | None]:
return self._values


class WAL:
    """Reader for a SQLite3 write-ahead log (WAL) file.

    Parses the WAL header on construction and exposes the frames that follow it,
    as well as the groups of frames ("checkpoints") that end in a commit frame.

    Args:
        fh: The file-like object to read the WAL header and frames from.

    Raises:
        InvalidDatabase: If the header magic is not one of the accepted WAL magic values.
    """

    def __init__(self, fh: BinaryIO):
        self.fh = fh
        self.header = c_sqlite3.wal_header(fh)

        if self.header.magic not in WAL_HEADER_MAGIC:
            raise InvalidDatabase("Invalid header magic")

        # The magic value selects the byte order used when computing checksums.
        self.checksum_endian = "<" if self.header.magic == WAL_HEADER_MAGIC_LE else ">"
        # ``None`` means "not computed yet"; an empty list is a valid cached result.
        self._checkpoints = None

        # Per-instance cache so that repeated lookups of the same frame index reuse the parsed frame.
        self.frame = lru_cache(1024)(self.frame)

    def frame(self, frame_idx: int) -> WALFrame:
        """Return the frame at the given zero-based index.

        The offset is derived from the WAL header size plus ``frame_idx`` times the
        size of one frame (frame header followed by one page of data).
        """
        frame_size = len(c_sqlite3.wal_frame) + self.header.page_size
        offset = len(c_sqlite3.wal_header) + frame_idx * frame_size
        return WALFrame(self, offset)

    def frames(self) -> Iterator[WALFrame]:
        """Yield all frames in file order, stopping when reading a frame header raises ``EOFError``."""
        frame_idx = 0
        while True:
            try:
                yield self.frame(frame_idx)
                frame_idx += 1
            except EOFError:  # noqa: PERF203
                break

    def checkpoints(self) -> list[WALCheckpoint]:
        """Return the frames grouped into checkpoints, computing the grouping only once.

        A group is closed by every frame whose ``page_count`` is non-zero. Trailing
        frames that are not followed by such a frame are not part of any checkpoint.
        """
        # Compare against None explicitly: a WAL without any commit frame yields an empty
        # list, which must be cached too instead of re-walking every frame on each call.
        if self._checkpoints is None:
            checkpoints = []
            frames = []

            for frame in self.frames():
                frames.append(frame)

                if frame.page_count != 0:
                    checkpoints.append(WALCheckpoint(self, frames))
                    frames = []

            self._checkpoints = checkpoints

        return self._checkpoints


class WALFrame:
    """A single frame of a WAL file: a frame header followed by one page of data.

    The frame header is parsed when the object is created; the page data is only
    read from the file the first time :attr:`data` is accessed.
    """

    def __init__(self, wal: WAL, offset: int):
        self.wal = wal
        self.offset = offset
        self.fh = wal.fh
        self._data = None

        # Position the shared file handle on this frame and parse its header.
        self.fh.seek(offset)
        self.header = c_sqlite3.wal_frame(self.fh)

    def __repr__(self) -> str:
        number, count = self.page_number, self.page_count
        return f"<WALFrame page_number={number} page_count={count}>"

    @property
    def valid(self) -> bool:
        """Whether both salt values of this frame equal the salt values of the WAL header."""
        wal_header = self.wal.header
        return self.header.salt1 == wal_header.salt1 and self.header.salt2 == wal_header.salt2

    @property
    def data(self) -> bytes:
        """The page data stored directly after the frame header."""
        if self._data:
            return self._data

        payload_offset = self.offset + len(c_sqlite3.wal_frame)
        self.fh.seek(payload_offset)
        self._data = self.fh.read(self.wal.header.page_size)
        return self._data

    @property
    def page_number(self) -> int:
        """The ``page_number`` field of the frame header."""
        return self.header.page_number

    @property
    def page_count(self) -> int:
        """The ``page_count`` field of the frame header."""
        return self.header.page_count


class WALCheckpoint:
    """A group of WAL frames, addressable by the page number each frame carries.

    Supports ``page in checkpoint``, ``checkpoint[page]`` and ``checkpoint.get(page)``.
    """

    def __init__(self, wal: WAL, frames: list[WALFrame]):
        self.wal = wal
        self.frames = frames
        self._page_map = None

    def __contains__(self, page: int) -> bool:
        return page in self.page_map

    def __getitem__(self, page: int) -> WALFrame:
        return self.page_map[page]

    def __repr__(self) -> str:
        frame_count = len(self.frames)
        return f"<WALCheckpoint frames={frame_count}>"

    @property
    def page_map(self) -> dict[int, WALFrame]:
        """Mapping of page number to frame, built on first use.

        When several frames carry the same page number, the one that appears last
        in ``frames`` is the one kept in the mapping.
        """
        if self._page_map:
            return self._page_map

        mapping = {}
        for frame in self.frames:
            mapping[frame.page_number] = frame

        self._page_map = mapping
        return self._page_map

    def get(self, page: int, default: Any = None) -> WALFrame:
        """Return the frame for ``page``, or ``default`` when this checkpoint has no such page."""
        return self.page_map.get(page, default)


def wal_checksum(buf: bytes, endian: str = ">", seed: tuple[int, int] = (0, 0)) -> tuple[int, int]:
    """Compute the two-word SQLite3 WAL checksum over ``buf``.

    The input is interpreted as a sequence of unsigned 32-bit integers that is consumed
    in pairs, with both running sums truncated to 32 bits after every step.

    Args:
        buf: The bytes to checksum. The length must be a multiple of 8 bytes.
        endian: The :mod:`struct` byte order character used to decode the 32-bit words.
        seed: Initial ``(s0, s1)`` values. Pass the result of a previous call to continue
            a running checksum over several buffers.

    Returns:
        The resulting ``(s0, s1)`` checksum pair.

    Raises:
        ValueError: If the length of ``buf`` is not a multiple of 8 bytes.

    References:
        - https://sqlite.org/fileformat2.html#checksum_algorithm
    """
    # The algorithm works on pairs of 32-bit words; reject anything else up front instead
    # of failing later with an unrelated struct or index error.
    if len(buf) % 8:
        raise ValueError("WAL checksum input length must be a multiple of 8 bytes")

    s0, s1 = seed
    words = struct.unpack(f"{endian}{len(buf) // 4}I", buf)

    for even, odd in zip(words[0::2], words[1::2]):
        s0 = (s0 + even + s1) & 0xFFFFFFFF
        s1 = (s1 + odd + s0) & 0xFFFFFFFF

    return s0, s1


def walk_tree(sqlite: SQLite3, page: Page) -> Iterator[Cell]:
if page.header.flags in (
c_sqlite3.PAGE_TYPE_LEAF_TABLE,
Expand Down
Loading