diff --git a/src/lean_spec/snappy/__init__.py b/src/lean_spec/snappy/__init__.py index ccfea13f..d7a29d51 100644 --- a/src/lean_spec/snappy/__init__.py +++ b/src/lean_spec/snappy/__init__.py @@ -27,11 +27,15 @@ get_uncompressed_length, is_valid_compressed_data, ) +from .framing import frame_compress, frame_decompress __all__ = [ - # Core API + # Core API (raw block format) "compress", "decompress", + # Framing API (streaming format used by Ethereum) + "frame_compress", + "frame_decompress", # Utilities "max_compressed_length", "get_uncompressed_length", diff --git a/src/lean_spec/snappy/framing.py b/src/lean_spec/snappy/framing.py new file mode 100644 index 00000000..0970ae86 --- /dev/null +++ b/src/lean_spec/snappy/framing.py @@ -0,0 +1,471 @@ +""" +Snappy framing format for streaming compression. + +This module implements the Snappy framing format, a wrapper around raw Snappy +that enables streaming and adds CRC32C checksums for error detection. + + +WHY FRAMING? +------------ +Raw Snappy compresses a single block of data. For network protocols, we need: + + 1. STREAMING: Process data in chunks without buffering everything. + 2. ERROR DETECTION: Detect corruption during transmission. + 3. CONCATENATION: Combine multiple compressed streams. + +The framing format solves all three by wrapping raw Snappy in checksummed chunks. + + +STREAM STRUCTURE +---------------- +A framed stream consists of chunks laid back-to-back:: + + [stream_identifier][chunk_1][chunk_2]...[chunk_n] + +The stream MUST start with a stream identifier. There is no end-of-stream +marker; the stream ends when the data ends. + + +STREAM IDENTIFIER (10 bytes) +---------------------------- +Every stream starts with:: + + 0xff 0x06 0x00 0x00 's' 'N' 'a' 'P' 'p' 'Y' + +Breakdown: + - 0xff: Chunk type (stream identifier) + - 0x06 0x00 0x00: Chunk length = 6 (little-endian) + - "sNaPpY": Magic bytes + +This identifier can appear multiple times (e.g., concatenated streams). + + +CHUNK FORMAT +------------ +Each chunk has a 4-byte header followed by data:: + + [type: 1 byte][length: 3 bytes LE][data: length bytes] + +The length field does NOT include the 4-byte header itself. + + +CHUNK TYPES +----------- + 0x00: Compressed data + [crc32c: 4 bytes][snappy_compressed_data] + CRC covers the UNCOMPRESSED data. + + 0x01: Uncompressed data + [crc32c: 4 bytes][raw_data] + Used when compression would expand the data. + + 0xff: Stream identifier (see above) + + 0x02-0x7f: Reserved unskippable (must error if encountered) + + 0x80-0xfe: Reserved skippable (must skip silently) + + +CRC32C MASKING +-------------- +CRCs are stored "masked" to detect common corruptions:: + + masked = rotate_right(crc, 15) + 0xA282EAD8 + +This detects patterns like all-zeros that might not affect an unmasked CRC. + + +SIZE LIMITS +----------- +Each chunk's uncompressed data must be at most 65536 bytes (64 KiB). +This allows decompressors to use fixed-size buffers. + + +Reference: + https://github.com/google/snappy/blob/master/framing_format.txt +""" + +from __future__ import annotations + +from .compress import compress as raw_compress +from .decompress import SnappyDecompressionError +from .decompress import decompress as raw_decompress + +STREAM_IDENTIFIER: bytes = b"\xff\x06\x00\x00sNaPpY" +"""Stream identifier marking the start of a Snappy framed stream. + +Format: [type=0xff][length=6 as 3-byte LE][magic="sNaPpY"] + +This 10-byte sequence MUST appear at the start of every framed stream. +It may also appear later (e.g., when streams are concatenated). 
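+
+An illustrative check of the exact bytes::
+
+    >>> STREAM_IDENTIFIER.hex()
+    'ff060000734e61507059'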
+""" + +CHUNK_TYPE_COMPRESSED: int = 0x00 +"""Chunk type for Snappy-compressed data. + +Chunk data format: [masked_crc32c: 4 bytes LE][compressed_payload] +The CRC covers the UNCOMPRESSED data, not the compressed payload. +""" + +CHUNK_TYPE_UNCOMPRESSED: int = 0x01 +"""Chunk type for uncompressed (raw) data. + +Chunk data format: [masked_crc32c: 4 bytes LE][raw_payload] +Used when compression would expand the data (e.g., random bytes). +""" + +MAX_UNCOMPRESSED_CHUNK_SIZE: int = 65536 +"""Maximum uncompressed data per chunk (64 KiB). + +This limit enables fixed-size decompression buffers. +Chunks exceeding this limit are rejected. +""" + +CRC32C_MASK_DELTA: int = 0xA282EAD8 +"""Constant added during CRC masking. + +From the spec: "Rotate right by 15 bits, then add 0xa282ead8." +This value is from Apache Hadoop's CRC masking scheme. +""" + + +# ============================================================================= +# CRC32C Implementation +# ============================================================================= +# +# CRC32C uses the Castagnoli polynomial (0x1EDC6F41), which has better error +# detection properties than the standard CRC32 polynomial. +# +# We use a lookup table for efficiency. Each byte lookup replaces 8 XOR/shift +# operations with a single table access. + + +def _crc32c_table() -> list[int]: + """ + Generate the CRC32C lookup table. + + Uses the Castagnoli polynomial 0x82F63B78 (bit-reversed form of 0x1EDC6F41). + + Returns: + 256-entry lookup table for byte-at-a-time CRC computation. + + Algorithm: + For each possible byte value (0-255): + 1. Start with the byte value as the CRC. + 2. For each of the 8 bits: + - If LSB is 1: shift right and XOR with polynomial. + - If LSB is 0: just shift right. + 3. Store final value in table. + """ + table = [] + for i in range(256): + crc = i + for _ in range(8): + # Process one bit: if LSB is set, XOR with polynomial. + if crc & 1: + crc = (crc >> 1) ^ 0x82F63B78 + else: + crc >>= 1 + table.append(crc) + return table + + +# Pre-compute the table at module load time. +_CRC32C_TABLE: list[int] = _crc32c_table() + + +def _crc32c(data: bytes) -> int: + r""" + Compute CRC32C checksum of data. + + Args: + data: Input bytes. + + Returns: + 32-bit CRC32C checksum. + + Algorithm: + 1. Initialize CRC to all 1s (0xFFFFFFFF). + 2. For each byte: XOR with CRC, look up in table, XOR result with CRC>>8. + 3. Invert final CRC (XOR with 0xFFFFFFFF). + """ + # Step 1: Initialize to all 1s. + crc = 0xFFFFFFFF + + # Step 2: Process each byte using the lookup table. + # + # The table lookup combines 8 bit operations into one: + # index = (crc ^ byte) & 0xFF <- low byte determines table entry + # crc = table[index] ^ (crc >> 8) <- combine with shifted CRC + for byte in data: + crc = _CRC32C_TABLE[(crc ^ byte) & 0xFF] ^ (crc >> 8) + + # Step 3: Invert the final value. + return crc ^ 0xFFFFFFFF + + +def _mask_crc(crc: int) -> int: + """ + Mask a CRC32C for storage in Snappy frames. + + Args: + crc: Raw CRC32C checksum. + + Returns: + Masked CRC value. + + Why mask? + Certain corruption patterns (all zeros, all ones) might not change + an unmasked CRC. Masking transforms the CRC to detect these patterns. + + Formula (from spec): + masked = rotate_right(crc, 15) + 0xa282ead8 + + In bit operations: + rotate_right(x, 15) = (x >> 15) | (x << 17) + """ + # - Rotate right by 15 bits = shift right 15, OR with shift left 17. + # - Add the mask delta constant. + # - Mask to 32 bits (Python integers are arbitrary precision). 
+ return (((crc >> 15) | (crc << 17)) + CRC32C_MASK_DELTA) & 0xFFFFFFFF + + +def frame_compress(data: bytes) -> bytes: + """ + Compress data using Snappy framing format. + + This is the compression format required by Ethereum's req/resp protocol. + Data is split into chunks of at most 64 KiB, each independently compressed. + + Args: + data: Uncompressed input bytes. + + Returns: + Snappy framed stream ready for transmission. + + Output format:: + + [stream_identifier: 10 bytes] + [chunk_1: type + length + crc + payload] + [chunk_2: type + length + crc + payload] + ... + + Each chunk is either: + - Compressed (0x00): When compression reduces size. + - Uncompressed (0x01): When compression would expand data. + """ + # Step 1: Start with stream identifier. + # + # Every framed stream MUST begin with this 10-byte magic sequence. + # It identifies the format and allows stream concatenation. + output = bytearray(STREAM_IDENTIFIER) + + # Step 2: Process input in chunks. + # + # We split input into chunks of at most 64 KiB. Each chunk is compressed + # independently, allowing the decompressor to use fixed-size buffers. + offset = 0 + while offset < len(data): + # Extract next chunk (up to 64 KiB). + chunk_end = min(offset + MAX_UNCOMPRESSED_CHUNK_SIZE, len(data)) + chunk = data[offset:chunk_end] + offset = chunk_end + + # Compress the chunk using raw Snappy. + compressed = raw_compress(chunk) + + # Compute CRC of UNCOMPRESSED data. + # + # Important: The CRC covers the original data, not the compressed form. + # This allows verification after decompression. + crc = _mask_crc(_crc32c(chunk)) + + # Choose chunk type based on compression effectiveness. + # + # If compression expanded the data (e.g., random bytes), store raw. + # This ensures the framed output is never larger than necessary. + if len(compressed) < len(chunk): + chunk_type = CHUNK_TYPE_COMPRESSED + payload = compressed + else: + chunk_type = CHUNK_TYPE_UNCOMPRESSED + payload = chunk + + # Build chunk: [type: 1][length: 3 LE][crc: 4 LE][payload]. + # + # The length field includes CRC (4 bytes) + payload. + # It does NOT include the 4-byte header (type + length). + chunk_length = 4 + len(payload) + output.append(chunk_type) + output.extend(chunk_length.to_bytes(3, "little")) + output.extend(crc.to_bytes(4, "little")) + output.extend(payload) + + return bytes(output) + + +def frame_decompress(data: bytes) -> bytes: + """ + Decompress Snappy framed data. + + Validates the stream structure and CRC32C checksums for each chunk. + Corrupted or malformed streams raise SnappyDecompressionError. + + Args: + data: Snappy framed stream. + + Returns: + Original uncompressed data. + + Raises: + SnappyDecompressionError: If the stream is malformed or corrupted. + + Handled chunk types: + - 0x00 (compressed): Decompress and verify CRC. + - 0x01 (uncompressed): Verify CRC, copy directly. + - 0xff (stream identifier): Validate and skip. + - 0x02-0x7f (reserved unskippable): Raise error. + - 0x80-0xfe (reserved skippable): Skip silently. + """ + # Step 1: Validate minimum length. + # + # A valid stream must have at least the stream identifier (10 bytes). + if len(data) < len(STREAM_IDENTIFIER): + raise SnappyDecompressionError("Input too short for framed snappy") + + # Step 2: Validate stream identifier. + # + # The first 10 bytes MUST be the magic sequence. + if not data.startswith(STREAM_IDENTIFIER): + raise SnappyDecompressionError("Invalid stream identifier") + + # Step 3: Process chunks. 
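+    #
+    # Concretely, an uncompressed chunk carrying b"hi" occupies the bytes
+    # 01 06 00 00 <crc: 4 LE> 68 69 (type 0x01, length 6 = 4-byte CRC +
+    # 2-byte payload); the loop below walks such records until input ends.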
+ # + # We iterate through the stream, processing each chunk according to its type. + # Output is accumulated in a bytearray for efficiency. + output = bytearray() + pos = len(STREAM_IDENTIFIER) + + while pos < len(data): + # Read chunk header: [type: 1][length: 3 LE] + + # Ensure we have a complete header (4 bytes). + if pos + 4 > len(data): + raise SnappyDecompressionError("Truncated chunk header") + + # Extract type and length. + chunk_type = data[pos] + chunk_length = int.from_bytes(data[pos + 1 : pos + 4], "little") + pos += 4 + + # Validate chunk data is present. + if pos + chunk_length > len(data): + raise SnappyDecompressionError( + f"Chunk extends past end: need {chunk_length} bytes at {pos}, " + f"have {len(data) - pos}" + ) + + # Extract chunk data. + chunk_data = data[pos : pos + chunk_length] + pos += chunk_length + + # Process based on chunk type + + if chunk_type == CHUNK_TYPE_COMPRESSED: + # COMPRESSED CHUNK (0x00) + # + # Format: [masked_crc: 4 bytes LE][snappy_compressed_payload] + # + # The CRC covers the UNCOMPRESSED data. + + # Ensure we have at least the CRC. + if len(chunk_data) < 4: + raise SnappyDecompressionError("Compressed chunk too short for CRC") + + # Extract stored CRC and compressed payload. + stored_crc = int.from_bytes(chunk_data[:4], "little") + compressed_payload = chunk_data[4:] + + # Decompress using raw Snappy. + uncompressed = raw_decompress(compressed_payload) + + # Enforce maximum chunk size (spec section 4.2). + # + # A malicious encoder could claim a huge decompressed size. + # We reject chunks exceeding 64 KiB to bound memory usage. + if len(uncompressed) > MAX_UNCOMPRESSED_CHUNK_SIZE: + raise SnappyDecompressionError( + f"Decompressed chunk exceeds {MAX_UNCOMPRESSED_CHUNK_SIZE} bytes" + ) + + # Verify CRC matches uncompressed data. + computed_crc = _mask_crc(_crc32c(uncompressed)) + if stored_crc != computed_crc: + raise SnappyDecompressionError( + f"CRC mismatch: stored {stored_crc:#x}, computed {computed_crc:#x}" + ) + + # Append to output. + output.extend(uncompressed) + + elif chunk_type == CHUNK_TYPE_UNCOMPRESSED: + # UNCOMPRESSED CHUNK (0x01) + # + # Format: [masked_crc: 4 bytes LE][raw_payload] + # + # Used when compression would expand the data. + + # Ensure we have at least the CRC. + if len(chunk_data) < 4: + raise SnappyDecompressionError("Uncompressed chunk too short for CRC") + + # Extract stored CRC and raw payload. + stored_crc = int.from_bytes(chunk_data[:4], "little") + raw_payload = chunk_data[4:] + + # Enforce maximum chunk size (spec section 4.3). + if len(raw_payload) > MAX_UNCOMPRESSED_CHUNK_SIZE: + raise SnappyDecompressionError( + f"Uncompressed chunk exceeds {MAX_UNCOMPRESSED_CHUNK_SIZE} bytes" + ) + + # Verify CRC matches the raw payload. + computed_crc = _mask_crc(_crc32c(raw_payload)) + if stored_crc != computed_crc: + raise SnappyDecompressionError( + f"CRC mismatch: stored {stored_crc:#x}, computed {computed_crc:#x}" + ) + + # Append to output. + output.extend(raw_payload) + + elif chunk_type == 0xFF: + # STREAM IDENTIFIER CHUNK (0xff) + # + # Can appear multiple times (e.g., concatenated streams). + # Spec section 4.1: Verify length is 6 and content is "sNaPpY". + + if chunk_length != 6: + raise SnappyDecompressionError( + f"Stream identifier chunk must be 6 bytes, got {chunk_length}" + ) + if chunk_data != b"sNaPpY": + raise SnappyDecompressionError("Invalid stream identifier content") + + # Valid identifier - nothing to output, just continue. 
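+            #
+            # For example, frame_compress(b"a") + frame_compress(b"b")
+            # decompresses to b"ab"; the second stream's identifier is
+            # validated here and contributes nothing to the output.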
+
+        elif 0x02 <= chunk_type <= 0x7F:
+            # RESERVED UNSKIPPABLE CHUNK (0x02-0x7f)
+            #
+            # Spec section 4.5: These are reserved for future use.
+            # A decoder MUST NOT silently skip them; it must error.
+            raise SnappyDecompressionError(f"Unknown unskippable chunk type: {chunk_type:#x}")
+
+        else:
+            # RESERVED SKIPPABLE CHUNK (0x80-0xfd) or PADDING (0xfe)
+            #
+            # Spec sections 4.4, 4.6: These are reserved for future use.
+            # A decoder MUST skip them silently and continue processing.
+            pass
+
+    return bytes(output)
diff --git a/src/lean_spec/subspecs/networking/__init__.py b/src/lean_spec/subspecs/networking/__init__.py
index 33ed0b00..d3a2e9db 100644
--- a/src/lean_spec/subspecs/networking/__init__.py
+++ b/src/lean_spec/subspecs/networking/__init__.py
@@ -1,9 +1,12 @@
 """Exports the networking subspec components."""
 
 from .config import (
+    MAX_PAYLOAD_SIZE,
     MAX_REQUEST_BLOCKS,
     MESSAGE_DOMAIN_INVALID_SNAPPY,
     MESSAGE_DOMAIN_VALID_SNAPPY,
+    RESP_TIMEOUT,
+    TTFB_TIMEOUT,
 )
 from .gossipsub.message import GossipsubMessage
 from .gossipsub.parameters import GossipsubParameters
@@ -13,22 +16,39 @@
     STATUS_PROTOCOL_V1,
     BlocksByRootRequest,
     BlocksByRootResponse,
+    CodecError,
+    ResponseCode,
     Status,
+    decode_request,
+    encode_request,
 )
 from .types import DomainType, ForkDigest, ProtocolId
 
 __all__ = [
+    # Config
     "MAX_REQUEST_BLOCKS",
+    "MAX_PAYLOAD_SIZE",
+    "TTFB_TIMEOUT",
+    "RESP_TIMEOUT",
     "MESSAGE_DOMAIN_INVALID_SNAPPY",
     "MESSAGE_DOMAIN_VALID_SNAPPY",
+    # Gossipsub
     "GossipsubParameters",
     "GossipTopic",
     "GossipsubMessage",
+    # ReqResp - Protocol IDs
     "BLOCKS_BY_ROOT_PROTOCOL_V1",
     "STATUS_PROTOCOL_V1",
+    # ReqResp - Message types
     "BlocksByRootRequest",
     "BlocksByRootResponse",
     "Status",
+    # ReqResp - Codec
+    "CodecError",
+    "ResponseCode",
+    "encode_request",
+    "decode_request",
+    # Types
     "DomainType",
     "ProtocolId",
     "ForkDigest",
diff --git a/src/lean_spec/subspecs/networking/config.py b/src/lean_spec/subspecs/networking/config.py
index 93b1740d..85c55129 100644
--- a/src/lean_spec/subspecs/networking/config.py
+++ b/src/lean_spec/subspecs/networking/config.py
@@ -6,8 +6,31 @@
 
 from .types import DomainType
 
-MAX_REQUEST_BLOCKS: Final = 2**10
-"""Maximum number of blocks in a single request."""
+# --- Request/Response Limits ---
+
+MAX_REQUEST_BLOCKS: Final[int] = 2**10
+"""Maximum number of blocks in a single request (1024)."""
+
+MAX_PAYLOAD_SIZE: Final[int] = 10 * 1024 * 1024
+"""Maximum uncompressed payload size in bytes (10 MiB)."""
+
+# --- Timeouts (in seconds) ---
+
+TTFB_TIMEOUT: Final[float] = 5.0
+"""Time-to-first-byte timeout.
+
+Maximum time to wait for the first byte of a response after sending a request.
+If no data arrives within this window, the request is considered failed.
+"""
+
+RESP_TIMEOUT: Final[float] = 10.0
+"""Response timeout.
+
+Maximum total time to receive a complete response. This covers the entire
+response, including all chunks for multi-part responses like BlocksByRoot.
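+
+Paired with TTFB_TIMEOUT: the first response byte must arrive within 5
+seconds, the complete response within 10.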
+""" + +# --- Gossip Message Domains --- MESSAGE_DOMAIN_INVALID_SNAPPY: Final[DomainType] = Bytes4(b"\x00\x00\x00\x00") """4-byte domain for gossip message-id isolation of invalid snappy messages.""" diff --git a/src/lean_spec/subspecs/networking/reqresp/__init__.py b/src/lean_spec/subspecs/networking/reqresp/__init__.py index b8585f46..d2b1d377 100644 --- a/src/lean_spec/subspecs/networking/reqresp/__init__.py +++ b/src/lean_spec/subspecs/networking/reqresp/__init__.py @@ -1,5 +1,13 @@ """ReqResp specs for the Lean Ethereum consensus specification.""" +from .codec import ( + CodecError, + ResponseCode, + decode_request, + decode_varint, + encode_request, + encode_varint, +) from .message import ( BLOCKS_BY_ROOT_PROTOCOL_V1, STATUS_PROTOCOL_V1, @@ -9,9 +17,18 @@ ) __all__ = [ + # Protocol IDs "BLOCKS_BY_ROOT_PROTOCOL_V1", "STATUS_PROTOCOL_V1", + # Message types "BlocksByRootRequest", "BlocksByRootResponse", "Status", + # Codec + "CodecError", + "ResponseCode", + "encode_request", + "decode_request", + "encode_varint", + "decode_varint", ] diff --git a/src/lean_spec/subspecs/networking/reqresp/codec.py b/src/lean_spec/subspecs/networking/reqresp/codec.py new file mode 100644 index 00000000..029286de --- /dev/null +++ b/src/lean_spec/subspecs/networking/reqresp/codec.py @@ -0,0 +1,445 @@ +""" +Req/Resp wire format codec for Ethereum consensus networking. + +This module implements the wire format for Ethereum's request/response protocol, +used for peer-to-peer communication between consensus clients. + + +WHY THIS FORMAT? +---------------- +Ethereum's req/resp protocol runs over libp2p streams. The wire format must: + + 1. STREAM-FRIENDLY: Work with libp2p's stream-based I/O (no message framing). + 2. SIZE-EFFICIENT: Minimize bandwidth for large payloads (blocks, attestations). + 3. INTEROPERABLE: Match other clients. + +The format achieves this by: + - Using varints for compact length encoding (1 byte for values < 128). + - Applying Snappy framing for compression with error detection. + - Prefixing with uncompressed length for buffer allocation. + + +WIRE FORMATS +------------ +Request format:: + + [varint: uncompressed_length][snappy_framed_ssz_payload] + +Response format:: + + [response_code: 1 byte][varint: uncompressed_length][snappy_framed_ssz_payload] + +The response code indicates success (0) or various error conditions (1-3). + + +VARINT ENCODING (LEB128) +------------------------ +Varints use unsigned LEB128 (Little-Endian Base 128), the same encoding as +Protocol Buffers. Each byte encodes 7 bits of data with bit 7 as continuation:: + + Value 0-127: 1 byte [0xxxxxxx] + Value 128-16383: 2 bytes [1xxxxxxx] [0xxxxxxx] + Value 16384+: 3+ bytes [1xxxxxxx] [1xxxxxxx] [0xxxxxxx] ... + +The MSB (bit 7) indicates whether more bytes follow: + - 0: This is the last byte. + - 1: More bytes follow. + +Example: 300 = 0b100101100 + - Split into 7-bit groups: 0b10 (high), 0b0101100 (low) + - Encode low group with continuation: 0b10101100 = 0xAC + - Encode high group (final): 0b00000010 = 0x02 + - Result: [0xAC, 0x02] + + +LENGTH PREFIX +------------- +The varint length prefix serves two purposes: + + 1. BUFFER ALLOCATION: Receiver knows the uncompressed size upfront. + 2. VALIDATION: After decompression, verify the size matches. + +This prevents decompression bombs (small compressed → huge uncompressed). 
+ + +References: + Ethereum P2P spec: + https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md + LEB128 encoding: + https://en.wikipedia.org/wiki/LEB128 + Snappy framing: + https://github.com/google/snappy/blob/master/framing_format.txt +""" + +from __future__ import annotations + +from enum import IntEnum + +from lean_spec.snappy import SnappyDecompressionError, frame_compress, frame_decompress + +from ..config import MAX_PAYLOAD_SIZE + + +class CodecError(Exception): + """Raised when encoding or decoding fails. + + This covers all wire format errors: + - Truncated or malformed varints + - Payload size limit violations + - Snappy decompression failures + - Length mismatches after decompression + """ + + pass + + +def encode_varint(value: int) -> bytes: + """ + Encode an unsigned integer as a varint (LEB128). + + LEB128 is a variable-length encoding for integers. + + Small values use fewer bytes, making it efficient for typical message sizes. + + Args: + value: Non-negative integer to encode. + + Returns: + Varint-encoded bytes (1-10 bytes for 64-bit values). + + Raises: + ValueError: If value is negative. + + Algorithm: + 1. Extract low 7 bits of value. + 2. If more bits remain, set continuation bit (0x80) and repeat. + 3. When no more bits, write final byte without continuation. + """ + # Varints only encode non-negative values. + if value < 0: + raise ValueError("Varint value must be non-negative") + + result = bytearray() + + # Process 7 bits at a time until value fits in 7 bits. + while value >= 0x80: + # Extract low 7 bits and set continuation flag (bit 7). + # + # value & 0x7F: Get low 7 bits. + # | 0x80: Set bit 7 to indicate more bytes follow. + result.append((value & 0x7F) | 0x80) + + # Shift right to process next 7 bits. + value >>= 7 + + # Write final byte (no continuation flag). + result.append(value) + + return bytes(result) + + +def decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]: + r""" + Decode a varint from bytes. + + Args: + data: Input bytes containing the varint. + offset: Starting position in data. + + Returns: + Tuple of (decoded_value, bytes_consumed). + + Raises: + CodecError: If the varint is truncated or exceeds 10 bytes. + + Algorithm: + 1. Read bytes until one has bit 7 clear (no continuation). + 2. Accumulate 7-bit groups into result, shifted appropriately. + 3. Reject varints longer than 10 bytes (would overflow 64 bits). + """ + result = 0 + shift = 0 + pos = offset + + while True: + # Check for truncated input. + if pos >= len(data): + raise CodecError("Truncated varint") + + # Read next byte. + byte = data[pos] + pos += 1 + + # Accumulate low 7 bits at current position. + # + # byte & 0x7F: Extract low 7 bits (data bits). + # << shift: Position them correctly in result. + result |= (byte & 0x7F) << shift + shift += 7 + + # Check continuation flag (bit 7). + # + # If bit 7 is clear, this is the last byte. + if not (byte & 0x80): + break + + # Reject overly long varints. + # + # 10 bytes × 7 bits = 70 bits, but we only support 64-bit values. + # After 10 bytes (shift >= 70), reject as malformed. + if shift >= 70: + raise CodecError("Varint too long") + + return result, pos - offset + + +def encode_request(ssz_data: bytes) -> bytes: + """ + Encode an SSZ-serialized request for transmission. + + Args: + ssz_data: SSZ-encoded request message. + + Returns: + Wire-format bytes ready for transmission. + + Raises: + CodecError: If the payload exceeds MAX_PAYLOAD_SIZE (10 MiB). 
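+
+    Example (illustrative; byte 0 is the varint length prefix)::
+
+        >>> encode_request(b"data")[0]
+        4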
+ + Wire format:: + + [varint: uncompressed_length][snappy_framed_payload] + + Why this order? + The length comes first so receivers can: + 1. Reject oversized requests before decompressing. + 2. Allocate the correct buffer size upfront. + """ + # Step 1: Validate payload size. + # + # Reject requests that exceed the protocol limit. + # This prevents encoding payloads that peers will reject. + if len(ssz_data) > MAX_PAYLOAD_SIZE: + raise CodecError(f"Payload too large: {len(ssz_data)} > {MAX_PAYLOAD_SIZE}") + + # Step 2: Compress with Snappy framing. + # + # Snappy framing adds: + # - Stream identifier (10 bytes) + # - CRC32C checksums for error detection + # - Chunking for large payloads + compressed = frame_compress(ssz_data) + + # Step 3: Prepend uncompressed length as varint. + # + # The length is of the ORIGINAL data, not the compressed data. + # This lets receivers validate after decompression. + length_prefix = encode_varint(len(ssz_data)) + + return length_prefix + compressed + + +def decode_request(data: bytes) -> bytes: + """ + Decode a wire-format request to SSZ bytes. + + Args: + data: Wire-format request bytes. + + Returns: + SSZ-encoded request message. + + Raises: + CodecError: If the request is malformed, corrupted, or oversized. + + Validation steps: + 1. Decode varint length prefix. + 2. Reject if declared length exceeds MAX_PAYLOAD_SIZE. + 3. Decompress Snappy framed payload. + 4. Verify decompressed size matches declared length. + """ + # Step 1: Reject empty input. + if not data: + raise CodecError("Empty request") + + # Step 2: Decode the varint length prefix. + # + # This tells us the expected uncompressed size. + try: + declared_length, varint_size = decode_varint(data) + except CodecError as e: + raise CodecError(f"Invalid request length: {e}") from e + + # Step 3: Validate declared length. + # + # Reject before decompressing to prevent resource exhaustion. + if declared_length > MAX_PAYLOAD_SIZE: + raise CodecError(f"Declared length too large: {declared_length} > {MAX_PAYLOAD_SIZE}") + + # Step 4: Decompress Snappy framed payload. + # + # The payload starts after the varint prefix. + compressed_data = data[varint_size:] + try: + decompressed = frame_decompress(compressed_data) + except SnappyDecompressionError as e: + raise CodecError(f"Decompression failed: {e}") from e + + # Step 5: Validate length matches. + # + # This catches corrupted data or malicious length claims. + if len(decompressed) != declared_length: + raise CodecError(f"Length mismatch: declared {declared_length}, got {len(decompressed)}") + + return decompressed + + +class ResponseCode(IntEnum): + """ + Response codes for req/resp protocol messages. + + The first byte of every response indicates success or failure: + - On success (code 0), the payload contains the requested data. + - On failure (codes 1-3), the payload contains an error message. + + Wire format:: + + [response_code: 1 byte][varint_length][snappy_framed_payload] + + Unknown codes are handled gracefully: + - Codes 4-127: Treated as SERVER_ERROR (reserved for future use). + - Codes 128-255: Treated as INVALID_REQUEST (invalid range). + + Reference: + https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md + """ + + SUCCESS = 0 + """Request completed successfully. 
Payload contains the response data.""" + + INVALID_REQUEST = 1 + """Request was malformed or violated protocol rules.""" + + SERVER_ERROR = 2 + """Server encountered an internal error processing the request.""" + + RESOURCE_UNAVAILABLE = 3 + """Requested resource (block, blob, etc.) is not available.""" + + def encode(self, ssz_data: bytes) -> bytes: + """ + Encode an SSZ-serialized response for transmission. + + Args: + ssz_data: SSZ-encoded response message, or UTF-8 error for failures. + + Returns: + Wire-format bytes ready for transmission. + + Raises: + CodecError: If the payload exceeds MAX_PAYLOAD_SIZE (10 MiB). + + Wire format:: + + [response_code: 1 byte][varint: uncompressed_length][snappy_framed_payload] + + Example:: + + >>> # Success response with data + >>> wire = ResponseCode.SUCCESS.encode(block.to_ssz()) + >>> + >>> # Error response with message + >>> wire = ResponseCode.RESOURCE_UNAVAILABLE.encode(b"Block not found") + """ + # Step 1: Validate payload size. + if len(ssz_data) > MAX_PAYLOAD_SIZE: + raise CodecError(f"Payload too large: {len(ssz_data)} > {MAX_PAYLOAD_SIZE}") + + # Step 2: Compress with Snappy framing. + compressed = frame_compress(ssz_data) + + # Step 3: Build response: [code][length][payload]. + # + # The code byte comes first so receivers can quickly determine + # whether to expect data or an error message. + output = bytearray() + + # Response code (1 byte). + output.append(self) + + # Uncompressed length as varint. + output.extend(encode_varint(len(ssz_data))) + + # Snappy framed payload. + output.extend(compressed) + + return bytes(output) + + @classmethod + def decode(cls, data: bytes) -> tuple[ResponseCode, bytes]: + """ + Decode a wire-format response. + + Args: + data: Wire-format response bytes. + + Returns: + Tuple of (response_code, ssz_data). + + Raises: + CodecError: If the response is malformed, corrupted, or oversized. + """ + # Step 1: Reject empty input. + if not data: + raise CodecError("Empty response") + + # Step 2: Ensure minimum length. + # + # Need at least: 1 byte (code) + 1 byte (minimum varint). + if len(data) < 2: + raise CodecError("Response too short") + + # Step 3: Extract and interpret response code. + # + # The first byte is the response code. + raw_code = data[0] + try: + code = cls(raw_code) + except ValueError: + # Handle unknown codes gracefully. + # + # - Codes 4-127: Reserved, treat as server error. + # - Codes 128-255: Invalid range, treat as invalid request. + if raw_code <= 127: + code = cls.SERVER_ERROR + else: + code = cls.INVALID_REQUEST + + # Step 4: Decode the varint length prefix. + # + # Starts at offset 1 (after the code byte). + try: + declared_length, varint_size = decode_varint(data, offset=1) + except CodecError as e: + raise CodecError(f"Invalid response length: {e}") from e + + # Step 5: Validate declared length. + if declared_length > MAX_PAYLOAD_SIZE: + raise CodecError(f"Declared length too large: {declared_length} > {MAX_PAYLOAD_SIZE}") + + # Step 6: Decompress Snappy framed payload. + # + # Payload starts after code (1 byte) + varint. + compressed_data = data[1 + varint_size :] + try: + decompressed = frame_decompress(compressed_data) + except SnappyDecompressionError as e: + raise CodecError(f"Decompression failed: {e}") from e + + # Step 7: Validate length matches. 
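+        #
+        # For example, a response that declared 4 bytes but whose frame
+        # decompresses to 16 is rejected here rather than handed to the caller.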
+ if len(decompressed) != declared_length: + raise CodecError( + f"Length mismatch: declared {declared_length}, got {len(decompressed)}" + ) + + return code, decompressed diff --git a/tests/lean_spec/snappy/test_framing.py b/tests/lean_spec/snappy/test_framing.py new file mode 100644 index 00000000..5f7e87de --- /dev/null +++ b/tests/lean_spec/snappy/test_framing.py @@ -0,0 +1,380 @@ +""" +Comprehensive tests for Snappy framing format. + +Tests verify compliance with the official specification: +https://github.com/google/snappy/blob/master/framing_format.txt +""" + +from __future__ import annotations + +import pytest + +from lean_spec.snappy import SnappyDecompressionError +from lean_spec.snappy.framing import ( + CHUNK_TYPE_COMPRESSED, + CHUNK_TYPE_UNCOMPRESSED, + CRC32C_MASK_DELTA, + MAX_UNCOMPRESSED_CHUNK_SIZE, + STREAM_IDENTIFIER, + _crc32c, + _mask_crc, + frame_compress, + frame_decompress, +) + + +class TestStreamIdentifier: + """Tests for stream identifier (spec section 4.1).""" + + def test_stream_identifier_format(self) -> None: + """Stream identifier matches spec: 0xff 0x06 0x00 0x00 sNaPpY.""" + expected = b"\xff\x06\x00\x00sNaPpY" + assert STREAM_IDENTIFIER == expected + assert len(STREAM_IDENTIFIER) == 10 + + def test_compressed_starts_with_identifier(self) -> None: + """All compressed output starts with stream identifier.""" + data = b"test data" + compressed = frame_compress(data) + assert compressed.startswith(STREAM_IDENTIFIER) + + def test_empty_data_has_identifier(self) -> None: + """Even empty data produces stream identifier.""" + compressed = frame_compress(b"") + assert compressed == STREAM_IDENTIFIER + + def test_repeated_identifier_accepted(self) -> None: + """Repeated stream identifiers in concatenated streams are accepted.""" + data = b"test" + stream1 = frame_compress(data) + stream2 = frame_compress(data) + # Concatenate two valid streams + combined = stream1 + stream2 + # Should decompress to concatenated data + result = frame_decompress(combined) + assert result == data + data + + def test_invalid_identifier_rejected(self) -> None: + """Invalid stream identifier content is rejected.""" + # Valid header structure but wrong content + bad_stream = b"\xff\x06\x00\x00BADDAT" + with pytest.raises(SnappyDecompressionError, match="Invalid stream identifier"): + frame_decompress(bad_stream) + + def test_wrong_identifier_length_rejected(self) -> None: + """Stream identifier with wrong length is rejected.""" + # Build a stream with identifier chunk of wrong length + # First valid identifier, then a bad 0xFF chunk with length 5 + valid_part = STREAM_IDENTIFIER + bad_identifier_chunk = b"\xff\x05\x00\x00sNaPp" # Only 5 bytes, not 6 + bad_stream = valid_part + bad_identifier_chunk + with pytest.raises(SnappyDecompressionError, match="must be 6 bytes"): + frame_decompress(bad_stream) + + +class TestChunkFormat: + """Tests for chunk format (spec section 1).""" + + def test_chunk_header_format(self) -> None: + """Chunk header is [type: 1][length: 3 LE].""" + data = b"Hello" + compressed = frame_compress(data) + + # Skip stream identifier, read first chunk header + pos = len(STREAM_IDENTIFIER) + chunk_type = compressed[pos] + chunk_length = int.from_bytes(compressed[pos + 1 : pos + 4], "little") + + # Chunk type should be 0x00 (compressed) or 0x01 (uncompressed) + assert chunk_type in (CHUNK_TYPE_COMPRESSED, CHUNK_TYPE_UNCOMPRESSED) + # Length should be CRC (4) + payload + assert chunk_length >= 4 + + def test_chunk_length_little_endian(self) -> None: + """Chunk length is 
stored in little-endian format."""
+        # Compress data that will create a known-size chunk
+        data = b"A" * 100
+        compressed = frame_compress(data)
+
+        pos = len(STREAM_IDENTIFIER)
+        # Read 3-byte little-endian length
+        length_bytes = compressed[pos + 1 : pos + 4]
+        chunk_length = int.from_bytes(length_bytes, "little")
+
+        # Verify by reading that many bytes
+        chunk_data = compressed[pos + 4 : pos + 4 + chunk_length]
+        assert len(chunk_data) == chunk_length
+
+
+class TestCRC32C:
+    """Tests for CRC32C checksum (spec section 3)."""
+
+    def test_crc32c_known_values(self) -> None:
+        """CRC32C matches known test vectors."""
+        # Test vectors from RFC 3720 section B.4
+        assert _crc32c(b"") == 0x00000000
+        # Additional test vectors
+        assert _crc32c(b"\x00" * 32) == 0x8A9136AA
+        assert _crc32c(b"\xff" * 32) == 0x62A8AB43
+
+    def test_crc32c_castagnoli_polynomial(self) -> None:
+        """CRC32C uses Castagnoli polynomial (0x82F63B78)."""
+        # Single byte test - verifies polynomial
+        crc = _crc32c(b"\x01")
+        # This value is specific to the Castagnoli polynomial
+        assert crc == 0xA016D052
+
+    def test_mask_formula(self) -> None:
+        """Masking follows spec: ((x >> 15) | (x << 17)) + 0xa282ead8."""
+        crc = 0x12345678
+        expected = (((crc >> 15) | (crc << 17)) + CRC32C_MASK_DELTA) & 0xFFFFFFFF
+        assert _mask_crc(crc) == expected
+
+    def test_crc_stored_little_endian(self) -> None:
+        """CRC is stored as 4 bytes little-endian in chunks."""
+        data = b"test"
+        compressed = frame_compress(data)
+
+        # Find chunk after stream identifier
+        pos = len(STREAM_IDENTIFIER) + 4  # Skip header
+        stored_crc = int.from_bytes(compressed[pos : pos + 4], "little")
+
+        # Compute expected CRC.
+        #
+        # The CRC always covers the UNCOMPRESSED data, regardless of whether
+        # the chunk is stored compressed or uncompressed, so equality must hold.
+        expected_crc = _mask_crc(_crc32c(data))
+        assert stored_crc == expected_crc
+
+    def test_crc_corruption_detected(self) -> None:
+        """Corrupted CRC causes decompression to fail."""
+        data = b"test data for CRC validation"
+        compressed = bytearray(frame_compress(data))
+
+        # Corrupt the CRC (bytes 14-17: right after stream identifier + chunk header)
+        crc_pos = len(STREAM_IDENTIFIER) + 4
+        compressed[crc_pos] ^= 0xFF
+
+        with pytest.raises(SnappyDecompressionError, match="CRC mismatch"):
+            frame_decompress(bytes(compressed))
+
+
+class TestCompressedChunk:
+    """Tests for compressed data chunks (spec section 4.2)."""
+
+    def test_compressed_chunk_type(self) -> None:
+        """Compressed chunks use type 0x00."""
+        assert CHUNK_TYPE_COMPRESSED == 0x00
+
+    def test_compressible_data_uses_compressed_chunk(self) -> None:
+        """Highly compressible data uses compressed chunks."""
+        data = b"A" * 1000  # Very compressible
+        compressed = frame_compress(data)
+
+        # Check chunk type after stream identifier
+        chunk_type = compressed[len(STREAM_IDENTIFIER)]
+        assert chunk_type == CHUNK_TYPE_COMPRESSED
+
+    def test_compressed_chunk_format(self) -> None:
+        """Compressed chunk is [crc: 4][compressed_data]."""
+        data = b"A" * 1000
+        compressed = frame_compress(data)
+
+        pos = len(STREAM_IDENTIFIER)
+        chunk_type = compressed[pos]
+        chunk_length = int.from_bytes(compressed[pos + 1 : pos + 4], "little")
+
+        assert chunk_type == CHUNK_TYPE_COMPRESSED
+        # Chunk data should be CRC (4 bytes) + compressed payload
+        assert chunk_length >= 5  # At least CRC + 1 byte compressed
+
+
+class TestUncompressedChunk:
+    """Tests for uncompressed data chunks (spec section 4.3)."""
+
+    def test_uncompressed_chunk_type(self) -> None:
+        """Uncompressed chunks use type 0x01."""
+        
assert CHUNK_TYPE_UNCOMPRESSED == 0x01 + + def test_incompressible_data_uses_uncompressed_chunk(self) -> None: + """Incompressible data uses uncompressed chunks.""" + # Random-looking data that doesn't compress + data = bytes([(i * 17 + 31) % 256 for i in range(100)]) + compressed = frame_compress(data) + + # Check chunk type + chunk_type = compressed[len(STREAM_IDENTIFIER)] + # May be compressed or uncompressed depending on algorithm + assert chunk_type in (CHUNK_TYPE_COMPRESSED, CHUNK_TYPE_UNCOMPRESSED) + + def test_uncompressed_roundtrip(self) -> None: + """Data stored uncompressed roundtrips correctly.""" + # Small incompressible data + data = bytes(range(256)) + compressed = frame_compress(data) + decompressed = frame_decompress(compressed) + assert decompressed == data + + +class TestChunkSizeLimits: + """Tests for chunk size limits (spec sections 4.2, 4.3).""" + + def test_max_uncompressed_chunk_size(self) -> None: + """Maximum uncompressed chunk size is 65536 bytes.""" + assert MAX_UNCOMPRESSED_CHUNK_SIZE == 65536 + + def test_large_data_split_into_chunks(self) -> None: + """Data larger than 64KB is split into multiple chunks.""" + data = b"X" * 100_000 # ~100KB + compressed = frame_compress(data) + + # Count chunks + chunk_count = 0 + pos = len(STREAM_IDENTIFIER) + while pos < len(compressed): + chunk_count += 1 + chunk_length = int.from_bytes(compressed[pos + 1 : pos + 4], "little") + pos += 4 + chunk_length + + # Should have at least 2 chunks for 100KB + assert chunk_count >= 2 + + def test_exact_chunk_boundary(self) -> None: + """Data exactly at chunk boundary handled correctly.""" + data = b"Y" * MAX_UNCOMPRESSED_CHUNK_SIZE + compressed = frame_compress(data) + decompressed = frame_decompress(compressed) + assert decompressed == data + + def test_oversized_uncompressed_chunk_rejected(self) -> None: + """Chunks larger than 65536 bytes are rejected.""" + # Manually craft a malicious stream with oversized chunk + # Stream identifier + uncompressed chunk header claiming 70000 bytes + chunk_length = 70000 + 4 # payload + CRC + malicious = bytearray(STREAM_IDENTIFIER) + malicious.append(CHUNK_TYPE_UNCOMPRESSED) + malicious.extend(chunk_length.to_bytes(3, "little")) + # Add fake CRC and oversized payload + malicious.extend(b"\x00" * 4) # CRC + malicious.extend(b"\x00" * 70000) # Oversized payload + + with pytest.raises(SnappyDecompressionError, match="exceeds"): + frame_decompress(bytes(malicious)) + + +class TestReservedChunks: + """Tests for reserved chunk types (spec sections 4.5, 4.6).""" + + def test_unskippable_chunk_raises(self) -> None: + """Reserved unskippable chunks (0x02-0x7F) cause error.""" + for chunk_type in [0x02, 0x10, 0x50, 0x7F]: + # Build stream with reserved chunk + malicious = bytearray(STREAM_IDENTIFIER) + malicious.append(chunk_type) + malicious.extend(b"\x00\x00\x00") # Zero length + + with pytest.raises(SnappyDecompressionError, match="unskippable"): + frame_decompress(bytes(malicious)) + + def test_skippable_chunk_ignored(self) -> None: + """Reserved skippable chunks (0x80-0xFD) are silently skipped.""" + data = b"test" + compressed = bytearray(frame_compress(data)) + + # Insert a skippable chunk (type 0x80) with some padding + padding_data = b"PADDING" + skippable_chunk = bytes([0x80]) + len(padding_data).to_bytes(3, "little") + padding_data + + # Insert after stream identifier, before data chunk + insert_pos = len(STREAM_IDENTIFIER) + modified = compressed[:insert_pos] + skippable_chunk + compressed[insert_pos:] + + # Should still decompress correctly + 
result = frame_decompress(bytes(modified)) + assert result == data + + def test_padding_chunk_ignored(self) -> None: + """Padding chunks (0xFE) are silently skipped.""" + data = b"test" + compressed = bytearray(frame_compress(data)) + + # Insert padding chunk + padding_chunk = b"\xfe\x10\x00\x00" + (b"\x00" * 16) + + insert_pos = len(STREAM_IDENTIFIER) + modified = compressed[:insert_pos] + padding_chunk + compressed[insert_pos:] + + result = frame_decompress(bytes(modified)) + assert result == data + + +class TestEdgeCases: + """Tests for edge cases and error handling.""" + + def test_empty_input(self) -> None: + """Empty input raises appropriate error.""" + with pytest.raises(SnappyDecompressionError, match="too short"): + frame_decompress(b"") + + def test_truncated_stream_identifier(self) -> None: + """Truncated stream identifier raises error.""" + with pytest.raises(SnappyDecompressionError, match="too short"): + frame_decompress(b"\xff\x06\x00") + + def test_truncated_chunk_header(self) -> None: + """Truncated chunk header raises error.""" + truncated = STREAM_IDENTIFIER + b"\x00\x10" # Incomplete header + with pytest.raises(SnappyDecompressionError, match="Truncated"): + frame_decompress(truncated) + + def test_truncated_chunk_data(self) -> None: + """Truncated chunk data raises error.""" + data = b"test" + compressed = frame_compress(data) + # Truncate some bytes from the end + truncated = compressed[:-5] + with pytest.raises(SnappyDecompressionError, match="extends past end"): + frame_decompress(truncated) + + def test_roundtrip_various_sizes(self) -> None: + """Roundtrip works for various data sizes.""" + for size in [0, 1, 100, 1000, 65535, 65536, 65537, 100_000]: + data = bytes([i % 256 for i in range(size)]) + compressed = frame_compress(data) + decompressed = frame_decompress(compressed) + assert decompressed == data, f"Roundtrip failed for size {size}" + + +class TestInteroperability: + """Tests ensuring compatibility with other implementations.""" + + def test_wire_format_structure(self) -> None: + """Wire format matches expected structure for interop.""" + data = b"Hello, Ethereum!" 
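+        # 16 bytes of input: expect the 10-byte identifier plus a single chunk.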
+ compressed = frame_compress(data) + + # Verify structure + assert compressed[:10] == STREAM_IDENTIFIER + + # First chunk header + pos = 10 + chunk_type = compressed[pos] + assert chunk_type in (0x00, 0x01) # Compressed or uncompressed + + chunk_len = int.from_bytes(compressed[pos + 1 : pos + 4], "little") + assert chunk_len >= 4 # At least CRC + + # CRC is first 4 bytes of chunk data + crc_bytes = compressed[pos + 4 : pos + 8] + assert len(crc_bytes) == 4 + + def test_concatenated_streams(self) -> None: + """Multiple concatenated streams decompress correctly.""" + # This is important for streaming protocols + stream1 = frame_compress(b"first") + stream2 = frame_compress(b"second") + stream3 = frame_compress(b"third") + + combined = stream1 + stream2 + stream3 + result = frame_decompress(combined) + + assert result == b"firstsecondthird" diff --git a/tests/lean_spec/subspecs/networking/test_reqresp.py b/tests/lean_spec/subspecs/networking/test_reqresp.py new file mode 100644 index 00000000..6d1a35d4 --- /dev/null +++ b/tests/lean_spec/subspecs/networking/test_reqresp.py @@ -0,0 +1,205 @@ +"""Tests for the req/resp codec.""" + +from __future__ import annotations + +import pytest + +from lean_spec.subspecs.networking import ResponseCode +from lean_spec.subspecs.networking.reqresp import ( + CodecError, + decode_request, + decode_varint, + encode_request, + encode_varint, +) + + +class TestVarintEncoding: + """Tests for varint (LEB128) encoding/decoding.""" + + def test_encode_zero(self) -> None: + """Zero encodes to a single null byte.""" + assert encode_varint(0) == b"\x00" + + def test_encode_small_values(self) -> None: + """Values 0-127 encode to a single byte.""" + assert encode_varint(1) == b"\x01" + assert encode_varint(127) == b"\x7f" + + def test_encode_two_byte_values(self) -> None: + """Values 128-16383 encode to two bytes.""" + assert encode_varint(128) == b"\x80\x01" + assert encode_varint(300) == b"\xac\x02" + + def test_encode_large_values(self) -> None: + """Large values encode and decode correctly.""" + test_values = [65536, 2**20, 2**24, 2**32 - 1, 2**63] + for value in test_values: + encoded = encode_varint(value) + decoded, consumed = decode_varint(encoded) + assert decoded == value + assert consumed == len(encoded) + + def test_decode_with_offset(self) -> None: + """Decoding at an offset works correctly.""" + data = b"prefix\xac\x02suffix" + value, consumed = decode_varint(data, offset=6) + assert value == 300 + assert consumed == 2 + + def test_encode_negative_raises(self) -> None: + """Negative values raise ValueError.""" + with pytest.raises(ValueError, match="non-negative"): + encode_varint(-1) + + def test_decode_truncated_raises(self) -> None: + """Truncated varints raise CodecError.""" + with pytest.raises(CodecError, match="Truncated"): + decode_varint(b"\x80") # Missing continuation byte + + def test_roundtrip(self) -> None: + """Encoding then decoding returns the original value.""" + for value in [0, 1, 127, 128, 255, 16383, 16384, 65535, 2**20]: + encoded = encode_varint(value) + decoded, _ = decode_varint(encoded) + assert decoded == value + + +class TestRequestCodec: + """Tests for request encoding/decoding.""" + + def test_simple_request(self) -> None: + """Simple SSZ data encodes and decodes correctly.""" + ssz_data = b"\x01\x02\x03\x04" + encoded = encode_request(ssz_data) + decoded = decode_request(encoded) + assert decoded == ssz_data + + def test_empty_request(self) -> None: + """Empty SSZ data roundtrips correctly.""" + ssz_data = b"" + encoded = 
encode_request(ssz_data) + decoded = decode_request(encoded) + assert decoded == ssz_data + + def test_large_request(self) -> None: + """Large request data roundtrips correctly.""" + ssz_data = b"X" * 50_000 + encoded = encode_request(ssz_data) + decoded = decode_request(encoded) + assert decoded == ssz_data + + def test_decode_empty_raises(self) -> None: + """Empty input raises CodecError.""" + with pytest.raises(CodecError, match="Empty request"): + decode_request(b"") + + def test_decode_invalid_varint_raises(self) -> None: + """Invalid varint raises CodecError.""" + with pytest.raises(CodecError, match="length"): + decode_request(b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff") + + def test_length_mismatch_raises(self) -> None: + """Mismatched declared length raises CodecError.""" + # Encode valid request, then modify the length prefix + encoded = bytearray(encode_request(b"test")) + encoded[0] = 0x10 # Change declared length to 16 + with pytest.raises(CodecError, match="mismatch"): + decode_request(bytes(encoded)) + + +class TestResponseCodec: + """Tests for response encoding/decoding.""" + + def test_success_response(self) -> None: + """Success response encodes and decodes correctly.""" + ssz_data = b"\x01\x02\x03\x04" + encoded = ResponseCode.SUCCESS.encode(ssz_data) + code, decoded = ResponseCode.decode(encoded) + assert code == ResponseCode.SUCCESS + assert decoded == ssz_data + + def test_error_response(self) -> None: + """Error response encodes and decodes correctly.""" + error_msg = b"Block not found" + encoded = ResponseCode.RESOURCE_UNAVAILABLE.encode(error_msg) + code, decoded = ResponseCode.decode(encoded) + assert code == ResponseCode.RESOURCE_UNAVAILABLE + assert decoded == error_msg + + def test_all_response_codes(self) -> None: + """All standard response codes work correctly.""" + for response_code in ResponseCode: + ssz_data = b"test payload" + encoded = response_code.encode(ssz_data) + code, decoded = ResponseCode.decode(encoded) + assert code == response_code + assert decoded == ssz_data + + def test_response_starts_with_code(self) -> None: + """Response wire format starts with code byte.""" + encoded = ResponseCode.SERVER_ERROR.encode(b"error") + assert encoded[0] == ResponseCode.SERVER_ERROR + + def test_decode_empty_raises(self) -> None: + """Empty input raises CodecError.""" + with pytest.raises(CodecError, match="Empty response"): + ResponseCode.decode(b"") + + def test_decode_too_short_raises(self) -> None: + """Too-short input raises CodecError.""" + with pytest.raises(CodecError, match="too short"): + ResponseCode.decode(b"\x00") + + def test_unknown_code_handled(self) -> None: + """Unknown response codes are handled gracefully.""" + # Build response with unknown code 50 + ssz_data = b"test" + encoded = bytearray(ResponseCode.SUCCESS.encode(ssz_data)) + encoded[0] = 50 # Unknown code + code, decoded = ResponseCode.decode(bytes(encoded)) + # Unknown codes 4-127 map to SERVER_ERROR + assert code == ResponseCode.SERVER_ERROR + assert decoded == ssz_data + + +class TestInteroperability: + """Tests ensuring compatibility with ream/zeam wire format.""" + + def test_varint_format_matches_protobuf(self) -> None: + """Varint encoding matches protobuf/LEB128 spec.""" + # These are known protobuf varint encodings + assert encode_varint(0) == b"\x00" + assert encode_varint(1) == b"\x01" + assert encode_varint(127) == b"\x7f" + assert encode_varint(128) == b"\x80\x01" + assert encode_varint(16384) == b"\x80\x80\x01" + + def test_request_wire_format(self) -> None: + 
"""Request wire format matches spec: [varint_len][snappy_payload].""" + ssz_data = b"test" + encoded = encode_request(ssz_data) + + # First bytes should be varint of uncompressed length + length, varint_size = decode_varint(encoded) + assert length == len(ssz_data) + + # Rest should be valid snappy framed data + snappy_data = encoded[varint_size:] + assert snappy_data.startswith(b"\xff\x06\x00\x00sNaPpY") + + def test_response_wire_format(self) -> None: + """Response wire format matches spec: [code][varint_len][snappy_payload].""" + ssz_data = b"test" + encoded = ResponseCode.SUCCESS.encode(ssz_data) + + # First byte is response code + assert encoded[0] == 0 + + # Next bytes are varint of uncompressed length + length, varint_size = decode_varint(encoded, offset=1) + assert length == len(ssz_data) + + # Rest should be valid snappy framed data + snappy_data = encoded[1 + varint_size :] + assert snappy_data.startswith(b"\xff\x06\x00\x00sNaPpY")