From 8ddb4bb1b70a17ec71c98176e0a71dab9a641744 Mon Sep 17 00:00:00 2001 From: Tudor Plugaru Date: Fri, 12 Dec 2025 22:15:50 +0200 Subject: [PATCH 1/3] feat(v2): full AMQP 1.0 implementation Signed-off-by: Tudor Plugaru --- src/cloudevents/core/bindings/amqp.py | 318 ++++++++++ tests/test_core/test_bindings/test_amqp.py | 666 +++++++++++++++++++++ 2 files changed, 984 insertions(+) create mode 100644 src/cloudevents/core/bindings/amqp.py create mode 100644 tests/test_core/test_bindings/test_amqp.py diff --git a/src/cloudevents/core/bindings/amqp.py b/src/cloudevents/core/bindings/amqp.py new file mode 100644 index 00000000..5e13d615 --- /dev/null +++ b/src/cloudevents/core/bindings/amqp.py @@ -0,0 +1,318 @@ +# Copyright 2018-Present The CloudEvents Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, Callable, Final + +from dateutil.parser import isoparse + +from cloudevents.core.base import BaseCloudEvent +from cloudevents.core.formats.base import Format + +CE_PREFIX: Final[str] = "cloudEvents_" +CONTENT_TYPE_PROPERTY: Final[str] = "content-type" + + +@dataclass(frozen=True) +class AMQPMessage: + """ + Represents an AMQP 1.0 message containing CloudEvent data. + + This dataclass encapsulates AMQP message properties, application properties, + and application data for transmitting CloudEvents over AMQP. It is immutable + to prevent accidental modifications and works with any AMQP 1.0 library + (e.g., Pika, aio-pika, qpid-proton, azure-servicebus). + + Attributes: + properties: AMQP message properties as a dictionary + application_properties: AMQP application properties as a dictionary + application_data: AMQP application data section as bytes + """ + + properties: dict[str, Any] + application_properties: dict[str, Any] + application_data: bytes + + +def _encode_amqp_value(value: Any) -> Any: + """ + Encode a CloudEvent attribute value for AMQP application properties. + + Handles special encoding for datetime objects to AMQP timestamp type + (milliseconds since Unix epoch as int). Per AMQP 1.0 CloudEvents spec, + senders SHOULD use native AMQP types when efficient. + + :param value: The attribute value to encode + :return: Encoded value (int for datetime timestamp, original type otherwise) + """ + if isinstance(value, datetime): + # AMQP 1.0 timestamp: milliseconds since Unix epoch (UTC) + timestamp_ms = int(value.timestamp() * 1000) + return timestamp_ms + + return value + + +def _decode_amqp_value(attr_name: str, value: Any) -> Any: + """ + Decode a CloudEvent attribute value from AMQP application properties. + + Handles special parsing for the 'time' attribute. Per AMQP 1.0 CloudEvents spec, + receivers MUST accept both native AMQP timestamp (int milliseconds since epoch) + and canonical string form (ISO 8601). + + :param attr_name: The name of the CloudEvent attribute + :param value: The AMQP property value + :return: Decoded value (datetime for 'time' attribute, original type otherwise) + """ + if attr_name == "time": + if isinstance(value, int): + # AMQP timestamp: milliseconds since Unix epoch + return datetime.fromtimestamp(value / 1000.0, tz=timezone.utc) + if isinstance(value, str): + # ISO 8601 string (canonical form, also accepted per spec) + return isoparse(value) + + return value + + +def to_binary(event: BaseCloudEvent, event_format: Format) -> AMQPMessage: + """ + Convert a CloudEvent to AMQP binary content mode. + + In binary mode, CloudEvent attributes are mapped to AMQP application properties + with the 'cloudEvents_' prefix, except for 'datacontenttype' which maps to the + AMQP 'content-type' property. The event data is placed directly in the AMQP + application-data section. Datetime values are encoded as AMQP timestamp type + (milliseconds since Unix epoch), while boolean and integer values are preserved + as native types. + + Example: + >>> from cloudevents.core.v1.event import CloudEvent + >>> from cloudevents.core.formats.json import JSONFormat + >>> + >>> event = CloudEvent( + ... attributes={"type": "com.example.test", "source": "/test"}, + ... data={"message": "Hello"} + ... ) + >>> message = to_binary(event, JSONFormat()) + >>> # message.application_properties = {"cloudEvents_type": "com.example.test", ...} + >>> # message.properties = {"content-type": "application/json"} + >>> # message.application_data = b'{"message": "Hello"}' + + :param event: The CloudEvent to convert + :param event_format: Format implementation for data serialization + :return: AMQPMessage with CloudEvent attributes as application properties + """ + properties: dict[str, Any] = {} + application_properties: dict[str, Any] = {} + attributes = event.get_attributes() + + for attr_name, attr_value in attributes.items(): + if attr_name == "datacontenttype": + properties[CONTENT_TYPE_PROPERTY] = str(attr_value) + else: + property_name = f"{CE_PREFIX}{attr_name}" + # Encode datetime to AMQP timestamp (milliseconds since epoch) + # Other types (bool, int, str, bytes) use native AMQP types + application_properties[property_name] = _encode_amqp_value(attr_value) + + data = event.get_data() + datacontenttype = attributes.get("datacontenttype") + application_data = event_format.write_data(data, datacontenttype) + + return AMQPMessage( + properties=properties, + application_properties=application_properties, + application_data=application_data, + ) + + +def from_binary( + message: AMQPMessage, + event_format: Format, + event_factory: Callable[ + [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent + ], +) -> BaseCloudEvent: + """ + Parse an AMQP binary content mode message to a CloudEvent. + + Extracts CloudEvent attributes from cloudEvents_-prefixed AMQP application + properties and treats the AMQP 'content-type' property as the 'datacontenttype' + attribute. The application-data section is parsed as event data according to + the content type. The 'time' attribute accepts both AMQP timestamp (int milliseconds) + and ISO 8601 string, while other native AMQP types (boolean, integer) are preserved. + + Example: + >>> from cloudevents.core.v1.event import CloudEvent + >>> from cloudevents.core.formats.json import JSONFormat + >>> + >>> message = AMQPMessage( + ... properties={"content-type": "application/json"}, + ... application_properties={ + ... "cloudEvents_type": "com.example.test", + ... "cloudEvents_source": "/test", + ... "cloudEvents_id": "123", + ... "cloudEvents_specversion": "1.0" + ... }, + ... application_data=b'{"message": "Hello"}' + ... ) + >>> event = from_binary(message, JSONFormat(), CloudEvent) + + :param message: AMQPMessage to parse + :param event_format: Format implementation for data deserialization + :param event_factory: Factory function to create CloudEvent instances + :return: CloudEvent instance + """ + attributes: dict[str, Any] = {} + + for prop_name, prop_value in message.application_properties.items(): + if prop_name.startswith(CE_PREFIX): + attr_name = prop_name[len(CE_PREFIX) :] + # Decode timestamp (int or ISO 8601 string) to datetime, preserve other native types + attributes[attr_name] = _decode_amqp_value(attr_name, prop_value) + + if CONTENT_TYPE_PROPERTY in message.properties: + attributes["datacontenttype"] = message.properties[CONTENT_TYPE_PROPERTY] + + datacontenttype = attributes.get("datacontenttype") + data = event_format.read_data(message.application_data, datacontenttype) + + return event_factory(attributes, data) + + +def to_structured(event: BaseCloudEvent, event_format: Format) -> AMQPMessage: + """ + Convert a CloudEvent to AMQP structured content mode. + + In structured mode, the entire CloudEvent (attributes and data) is serialized + into the AMQP application-data section using the specified format. The + content-type property is set to the format's media type (e.g., + "application/cloudevents+json"). + + Example: + >>> from cloudevents.core.v1.event import CloudEvent + >>> from cloudevents.core.formats.json import JSONFormat + >>> + >>> event = CloudEvent( + ... attributes={"type": "com.example.test", "source": "/test"}, + ... data={"message": "Hello"} + ... ) + >>> message = to_structured(event, JSONFormat()) + >>> # message.properties = {"content-type": "application/cloudevents+json"} + >>> # message.application_data = b'{"type": "com.example.test", ...}' + + :param event: The CloudEvent to convert + :param event_format: Format implementation for serialization + :return: AMQPMessage with structured content in application-data + """ + content_type = event_format.get_content_type() + + properties = {CONTENT_TYPE_PROPERTY: content_type} + application_properties: dict[str, Any] = {} + + application_data = event_format.write(event) + + return AMQPMessage( + properties=properties, + application_properties=application_properties, + application_data=application_data, + ) + + +def from_structured( + message: AMQPMessage, + event_format: Format, + event_factory: Callable[ + [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent + ], +) -> BaseCloudEvent: + """ + Parse an AMQP structured content mode message to a CloudEvent. + + Deserializes the CloudEvent from the AMQP application-data section using the + specified format. Any cloudEvents_-prefixed application properties are ignored + as the application-data contains all event metadata. + + Example: + >>> from cloudevents.core.v1.event import CloudEvent + >>> from cloudevents.core.formats.json import JSONFormat + >>> + >>> message = AMQPMessage( + ... properties={"content-type": "application/cloudevents+json"}, + ... application_properties={}, + ... application_data=b'{"type": "com.example.test", "source": "/test", ...}' + ... ) + >>> event = from_structured(message, JSONFormat(), CloudEvent) + + :param message: AMQPMessage to parse + :param event_format: Format implementation for deserialization + :param event_factory: Factory function to create CloudEvent instances + :return: CloudEvent instance + """ + return event_format.read(event_factory, message.application_data) + + +def from_amqp( + message: AMQPMessage, + event_format: Format, + event_factory: Callable[ + [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent + ], +) -> BaseCloudEvent: + """ + Parse an AMQP message to a CloudEvent with automatic mode detection. + + Automatically detects whether the message uses binary or structured content mode: + - If content-type starts with "application/cloudevents" β†’ structured mode + - Otherwise β†’ binary mode + + This function provides a convenient way to handle both content modes without + requiring the caller to determine the mode beforehand. + + Example: + >>> from cloudevents.core.v1.event import CloudEvent + >>> from cloudevents.core.formats.json import JSONFormat + >>> + >>> # Works with binary mode + >>> binary_msg = AMQPMessage( + ... properties={"content-type": "application/json"}, + ... application_properties={"cloudEvents_type": "com.example.test", ...}, + ... application_data=b'...' + ... ) + >>> event1 = from_amqp(binary_msg, JSONFormat(), CloudEvent) + >>> + >>> # Also works with structured mode + >>> structured_msg = AMQPMessage( + ... properties={"content-type": "application/cloudevents+json"}, + ... application_properties={}, + ... application_data=b'{"type": "com.example.test", ...}' + ... ) + >>> event2 = from_amqp(structured_msg, JSONFormat(), CloudEvent) + + :param message: AMQPMessage to parse + :param event_format: Format implementation for deserialization + :param event_factory: Factory function to create CloudEvent instances + :return: CloudEvent instance + """ + content_type = message.properties.get(CONTENT_TYPE_PROPERTY, "") + + if isinstance(content_type, str) and content_type.lower().startswith( + "application/cloudevents" + ): + return from_structured(message, event_format, event_factory) + + return from_binary(message, event_format, event_factory) diff --git a/tests/test_core/test_bindings/test_amqp.py b/tests/test_core/test_bindings/test_amqp.py new file mode 100644 index 00000000..76d702cf --- /dev/null +++ b/tests/test_core/test_bindings/test_amqp.py @@ -0,0 +1,666 @@ +# Copyright 2018-Present The CloudEvents Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from datetime import datetime, timezone +from typing import Any + +import pytest + +from cloudevents.core.bindings.amqp import ( + AMQPMessage, + from_amqp, + from_binary, + from_structured, + to_binary, + to_structured, +) +from cloudevents.core.formats.json import JSONFormat +from cloudevents.core.v1.event import CloudEvent + + +@pytest.fixture +def minimal_attributes() -> dict[str, str]: + """Minimal valid CloudEvent attributes""" + return { + "type": "com.example.test", + "source": "/test", + "id": "test-id-123", + "specversion": "1.0", + } + + +def create_event( + extra_attrs: dict[str, Any] | None = None, + data: dict[str, Any] | str | bytes | None = None, +) -> CloudEvent: + """Helper to create CloudEvent with valid required attributes""" + attrs: dict[str, Any] = { + "type": "com.example.test", + "source": "/test", + "id": "test-id-123", + "specversion": "1.0", + } + if extra_attrs: + attrs.update(extra_attrs) + return CloudEvent(attributes=attrs, data=data) + + +def test_amqp_message_creation() -> None: + """Test basic AMQPMessage creation""" + message = AMQPMessage( + properties={"content-type": "application/json"}, + application_properties={"cloudEvents_type": "test"}, + application_data=b"test", + ) + assert message.properties == {"content-type": "application/json"} + assert message.application_properties == {"cloudEvents_type": "test"} + assert message.application_data == b"test" + + +def test_amqp_message_immutable() -> None: + """Test that AMQPMessage is immutable (frozen dataclass)""" + message = AMQPMessage( + properties={"test": "value"}, + application_properties={}, + application_data=b"data", + ) + + with pytest.raises(Exception): # FrozenInstanceError + message.properties = {"new": "dict"} + + with pytest.raises(Exception): # FrozenInstanceError + message.application_properties = {"new": "dict"} + + with pytest.raises(Exception): # FrozenInstanceError + message.application_data = b"new data" + + +def test_amqp_message_with_empty_properties() -> None: + """Test AMQPMessage with empty properties""" + message = AMQPMessage( + properties={}, application_properties={}, application_data=b"test" + ) + assert message.properties == {} + assert message.application_properties == {} + assert message.application_data == b"test" + + +def test_amqp_message_with_empty_application_data() -> None: + """Test AMQPMessage with empty application data""" + message = AMQPMessage( + properties={"test": "value"}, application_properties={}, application_data=b"" + ) + assert message.properties == {"test": "value"} + assert message.application_data == b"" + + +def test_to_binary_required_attributes() -> None: + """Test to_binary with only required attributes""" + event = create_event() + message = to_binary(event, JSONFormat()) + + assert "cloudEvents_type" in message.application_properties + assert message.application_properties["cloudEvents_type"] == "com.example.test" + assert message.application_properties["cloudEvents_source"] == "/test" + assert message.application_properties["cloudEvents_id"] == "test-id-123" + assert message.application_properties["cloudEvents_specversion"] == "1.0" + + +def test_to_binary_with_optional_attributes() -> None: + """Test to_binary with optional attributes""" + event = create_event( + extra_attrs={ + "subject": "test-subject", + "dataschema": "https://example.com/schema", + } + ) + message = to_binary(event, JSONFormat()) + + assert message.application_properties["cloudEvents_subject"] == "test-subject" + assert ( + message.application_properties["cloudEvents_dataschema"] + == "https://example.com/schema" + ) + + +def test_to_binary_with_extensions() -> None: + """Test to_binary with custom extension attributes""" + event = create_event(extra_attrs={"customext": "custom-value"}) + message = to_binary(event, JSONFormat()) + + assert message.application_properties["cloudEvents_customext"] == "custom-value" + + +def test_to_binary_datetime_as_timestamp() -> None: + """Test to_binary converts datetime to AMQP timestamp (milliseconds since epoch)""" + dt = datetime(2023, 1, 15, 10, 30, 45, tzinfo=timezone.utc) + event = create_event(extra_attrs={"time": dt}) + message = to_binary(event, JSONFormat()) + + # Should be serialized as AMQP timestamp (milliseconds since epoch) + expected_timestamp = int(dt.timestamp() * 1000) # 1673781045000 + assert message.application_properties["cloudEvents_time"] == expected_timestamp + assert isinstance(message.application_properties["cloudEvents_time"], int) + + +def test_to_binary_boolean_as_boolean() -> None: + """Test to_binary preserves boolean type (not converted to string)""" + event = create_event(extra_attrs={"boolext": True}) + message = to_binary(event, JSONFormat()) + + # Should be native boolean, not string "true" or "True" + assert message.application_properties["cloudEvents_boolext"] is True + assert isinstance(message.application_properties["cloudEvents_boolext"], bool) + + +def test_to_binary_integer_as_long() -> None: + """Test to_binary preserves integer type (not converted to string)""" + event = create_event(extra_attrs={"intext": 42}) + message = to_binary(event, JSONFormat()) + + # Should be native int/long, not string "42" + assert message.application_properties["cloudEvents_intext"] == 42 + assert isinstance(message.application_properties["cloudEvents_intext"], int) + + +def test_to_binary_datacontenttype_mapping() -> None: + """Test datacontenttype maps to AMQP content-type property""" + event = create_event( + extra_attrs={"datacontenttype": "application/json"}, data={"key": "value"} + ) + message = to_binary(event, JSONFormat()) + + # datacontenttype should go to properties, not application_properties + assert message.properties["content-type"] == "application/json" + assert "cloudEvents_datacontenttype" not in message.application_properties + + +def test_to_binary_with_json_data() -> None: + """Test to_binary with JSON dict data""" + event = create_event( + extra_attrs={"datacontenttype": "application/json"}, + data={"message": "Hello", "count": 42}, + ) + message = to_binary(event, JSONFormat()) + + # JSON serialization may vary in formatting, so check it can be parsed back + import json + + parsed = json.loads(message.application_data) + assert parsed == {"message": "Hello", "count": 42} + + +def test_to_binary_with_string_data() -> None: + """Test to_binary with string data""" + event = create_event(data="Hello World") + message = to_binary(event, JSONFormat()) + + # String data should be serialized + assert b"Hello World" in message.application_data + + +def test_to_binary_with_bytes_data() -> None: + """Test to_binary with bytes data""" + binary_data = b"\x00\x01\x02\x03" + event = create_event(data=binary_data) + message = to_binary(event, JSONFormat()) + + # Bytes should be preserved in application_data + assert len(message.application_data) > 0 + + +def test_to_binary_with_none_data() -> None: + """Test to_binary with None data""" + event = create_event(data=None) + message = to_binary(event, JSONFormat()) + + # None data should result in empty or null serialization + assert message.application_data is not None # Should be bytes + + +def test_from_binary_required_attributes() -> None: + """Test from_binary extracts required attributes""" + message = AMQPMessage( + properties={}, + application_properties={ + "cloudEvents_type": "com.example.test", + "cloudEvents_source": "/test", + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + }, + application_data=b"{}", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + assert event.get_id() == "123" + assert event.get_specversion() == "1.0" + + +def test_from_binary_with_timestamp_property() -> None: + """Test from_binary parses AMQP timestamp (int milliseconds) to datetime""" + dt = datetime(2023, 1, 15, 10, 30, 45, tzinfo=timezone.utc) + timestamp_ms = int(dt.timestamp() * 1000) # 1673781045000 + + message = AMQPMessage( + properties={}, + application_properties={ + "cloudEvents_type": "test", + "cloudEvents_source": "/test", + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + "cloudEvents_time": timestamp_ms, # AMQP timestamp as int + }, + application_data=b"{}", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_time() == dt + assert isinstance(event.get_time(), datetime) + + +def test_from_binary_with_timestamp_string() -> None: + """Test from_binary also accepts ISO 8601 string (canonical form per spec)""" + dt = datetime(2023, 1, 15, 10, 30, 45, tzinfo=timezone.utc) + + message = AMQPMessage( + properties={}, + application_properties={ + "cloudEvents_type": "test", + "cloudEvents_source": "/test", + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + "cloudEvents_time": "2023-01-15T10:30:45Z", # ISO 8601 string (also valid) + }, + application_data=b"{}", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_time() == dt + assert isinstance(event.get_time(), datetime) + + +def test_from_binary_with_boolean_property() -> None: + """Test from_binary preserves boolean type""" + message = AMQPMessage( + properties={}, + application_properties={ + "cloudEvents_type": "test", + "cloudEvents_source": "/test", + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + "cloudEvents_boolext": True, + }, + application_data=b"{}", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_extension("boolext") is True + assert isinstance(event.get_extension("boolext"), bool) + + +def test_from_binary_with_long_property() -> None: + """Test from_binary preserves integer/long type""" + message = AMQPMessage( + properties={}, + application_properties={ + "cloudEvents_type": "test", + "cloudEvents_source": "/test", + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + "cloudEvents_intext": 42, + }, + application_data=b"{}", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_extension("intext") == 42 + assert isinstance(event.get_extension("intext"), int) + + +def test_from_binary_with_json_data() -> None: + """Test from_binary with JSON data""" + message = AMQPMessage( + properties={"content-type": "application/json"}, + application_properties={ + "cloudEvents_type": "test", + "cloudEvents_source": "/test", + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + }, + application_data=b'{"message": "Hello"}', + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_data() == {"message": "Hello"} + assert event.get_datacontenttype() == "application/json" + + +def test_from_binary_with_text_data() -> None: + """Test from_binary with text data""" + message = AMQPMessage( + properties={"content-type": "text/plain"}, + application_properties={ + "cloudEvents_type": "test", + "cloudEvents_source": "/test", + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + }, + application_data=b"Hello World", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + # JSONFormat will decode as UTF-8 string for non-JSON content types + assert event.get_data() == "Hello World" + + +def test_from_binary_with_bytes_data() -> None: + """Test from_binary with binary data""" + binary_data = b"\x00\x01\x02\x03" + message = AMQPMessage( + properties={"content-type": "application/octet-stream"}, + application_properties={ + "cloudEvents_type": "test", + "cloudEvents_source": "/test", + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + }, + application_data=binary_data, + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + # Binary data should be preserved + assert isinstance(event.get_data(), (bytes, str)) + + +def test_binary_round_trip() -> None: + """Test binary mode round-trip preserves event data""" + original = create_event( + extra_attrs={"subject": "test-subject", "datacontenttype": "application/json"}, + data={"message": "Hello", "count": 42}, + ) + + message = to_binary(original, JSONFormat()) + recovered = from_binary(message, JSONFormat(), CloudEvent) + + assert recovered.get_type() == original.get_type() + assert recovered.get_source() == original.get_source() + assert recovered.get_id() == original.get_id() + assert recovered.get_specversion() == original.get_specversion() + assert recovered.get_subject() == original.get_subject() + assert recovered.get_data() == original.get_data() + + +def test_binary_preserves_types() -> None: + """Test binary mode preserves native types (bool, int, datetime)""" + dt = datetime(2023, 1, 15, 10, 30, 45, tzinfo=timezone.utc) + original = create_event( + extra_attrs={"time": dt, "boolext": True, "intext": 42, "strext": "value"} + ) + + message = to_binary(original, JSONFormat()) + recovered = from_binary(message, JSONFormat(), CloudEvent) + + # Types should be preserved + assert recovered.get_time() == dt + assert isinstance(recovered.get_time(), datetime) + assert recovered.get_extension("boolext") is True + assert isinstance(recovered.get_extension("boolext"), bool) + assert recovered.get_extension("intext") == 42 + assert isinstance(recovered.get_extension("intext"), int) + assert recovered.get_extension("strext") == "value" + + +def test_structured_round_trip() -> None: + """Test structured mode round-trip preserves event data""" + original = create_event( + extra_attrs={"subject": "test-subject", "datacontenttype": "application/json"}, + data={"message": "Hello", "count": 42}, + ) + + message = to_structured(original, JSONFormat()) + recovered = from_structured(message, JSONFormat(), CloudEvent) + + assert recovered.get_type() == original.get_type() + assert recovered.get_source() == original.get_source() + assert recovered.get_id() == original.get_id() + assert recovered.get_specversion() == original.get_specversion() + assert recovered.get_subject() == original.get_subject() + assert recovered.get_data() == original.get_data() + + +def test_to_structured_basic_event() -> None: + """Test to_structured with basic event""" + event = create_event(data={"message": "Hello"}) + message = to_structured(event, JSONFormat()) + + # Should have content-type in properties + assert message.properties["content-type"] == "application/cloudevents+json" + + # application_data should contain the complete event + assert b"com.example.test" in message.application_data + assert b"message" in message.application_data + + +def test_to_structured_content_type_header() -> None: + """Test to_structured sets correct content-type""" + event = create_event() + message = to_structured(event, JSONFormat()) + + assert "content-type" in message.properties + assert message.properties["content-type"] == "application/cloudevents+json" + + +def test_to_structured_with_all_attributes() -> None: + """Test to_structured includes all attributes in serialized form""" + dt = datetime(2023, 1, 15, 10, 30, 45, tzinfo=timezone.utc) + event = create_event( + extra_attrs={ + "time": dt, + "subject": "test-subject", + "dataschema": "https://example.com/schema", + "customext": "custom-value", + }, + data={"message": "Hello"}, + ) + message = to_structured(event, JSONFormat()) + + # All attributes should be in the serialized data + assert b"test-subject" in message.application_data + assert b"customext" in message.application_data + + +def test_from_structured_basic_event() -> None: + """Test from_structured parses complete event""" + message = AMQPMessage( + properties={"content-type": "application/cloudevents+json"}, + application_properties={}, + application_data=b'{"type": "com.example.test", "source": "/test", ' + b'"id": "123", "specversion": "1.0", "data": {"message": "Hello"}}', + ) + event = from_structured(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + assert event.get_id() == "123" + assert event.get_data() == {"message": "Hello"} + + +def test_from_amqp_detects_binary_mode() -> None: + """Test from_amqp detects binary mode""" + message = AMQPMessage( + properties={"content-type": "application/json"}, + application_properties={ + "cloudEvents_type": "test", + "cloudEvents_source": "/test", + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + }, + application_data=b'{"message": "Hello"}', + ) + event = from_amqp(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "test" + assert event.get_data() == {"message": "Hello"} + + +def test_from_amqp_detects_structured_mode() -> None: + """Test from_amqp detects structured mode""" + message = AMQPMessage( + properties={"content-type": "application/cloudevents+json"}, + application_properties={}, + application_data=b'{"type": "com.example.test", "source": "/test", ' + b'"id": "123", "specversion": "1.0"}', + ) + event = from_amqp(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + + +def test_from_amqp_case_insensitive_detection() -> None: + """Test from_amqp detection is case-insensitive""" + # Uppercase CLOUDEVENTS + message = AMQPMessage( + properties={"content-type": "application/CLOUDEVENTS+json"}, + application_properties={}, + application_data=b'{"type": "com.example.test", "source": "/test", ' + b'"id": "123", "specversion": "1.0"}', + ) + event = from_amqp(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "com.example.test" + + +def test_from_amqp_defaults_to_binary_when_no_content_type() -> None: + """Test from_amqp defaults to binary mode when content-type is missing""" + message = AMQPMessage( + properties={}, # No content-type + application_properties={ + "cloudEvents_type": "test", + "cloudEvents_source": "/test", + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + }, + application_data=b"{}", + ) + event = from_amqp(message, JSONFormat(), CloudEvent) + + # Should successfully parse as binary mode + assert event.get_type() == "test" + + +def test_unicode_in_attributes() -> None: + """Test handling of unicode characters in attributes""" + event = create_event(extra_attrs={"subject": "ζ΅‹θ―•-subject-🌍"}) + message = to_binary(event, JSONFormat()) + recovered = from_binary(message, JSONFormat(), CloudEvent) + + assert recovered.get_subject() == "ζ΅‹θ―•-subject-🌍" + + +def test_unicode_in_data() -> None: + """Test handling of unicode characters in data""" + event = create_event(data={"message": "Hello δΈ–η•Œ 🌍"}) + message = to_binary(event, JSONFormat()) + recovered = from_binary(message, JSONFormat(), CloudEvent) + + # Data should be preserved, whether as dict or string representation + data = recovered.get_data() + if isinstance(data, dict): + assert data == {"message": "Hello δΈ–η•Œ 🌍"} + else: + assert "Hello δΈ–η•Œ 🌍" in str(data) + + +def test_datetime_utc_handling() -> None: + """Test datetime with UTC timezone""" + dt_utc = datetime(2023, 1, 15, 10, 30, 45, tzinfo=timezone.utc) + event = create_event(extra_attrs={"time": dt_utc}) + message = to_binary(event, JSONFormat()) + recovered = from_binary(message, JSONFormat(), CloudEvent) + + assert recovered.get_time() == dt_utc + + +def test_datetime_non_utc_handling() -> None: + """Test datetime with non-UTC timezone""" + from datetime import timedelta + + # Create a custom timezone (UTC+5) + custom_tz = timezone(timedelta(hours=5)) + dt_custom = datetime(2023, 1, 15, 10, 30, 45, tzinfo=custom_tz) + + event = create_event(extra_attrs={"time": dt_custom}) + message = to_binary(event, JSONFormat()) + recovered = from_binary(message, JSONFormat(), CloudEvent) + + # Datetime should be preserved + assert recovered.get_time() == dt_custom + + +def test_empty_application_properties() -> None: + """Test message with no application properties (structured mode)""" + message = AMQPMessage( + properties={"content-type": "application/cloudevents+json"}, + application_properties={}, + application_data=b'{"type": "test", "source": "/test", "id": "123", ' + b'"specversion": "1.0"}', + ) + event = from_structured(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "test" + + +def test_to_binary_with_multiple_extensions() -> None: + """Test to_binary with multiple custom extensions""" + event = create_event( + extra_attrs={ + "ext1": "value1", + "ext2": "value2", + "ext3": 123, + "ext4": True, + } + ) + message = to_binary(event, JSONFormat()) + + assert message.application_properties["cloudEvents_ext1"] == "value1" + assert message.application_properties["cloudEvents_ext2"] == "value2" + assert message.application_properties["cloudEvents_ext3"] == 123 + assert message.application_properties["cloudEvents_ext4"] is True + + +def test_from_binary_ignores_non_cloudevents_properties() -> None: + """Test from_binary only extracts cloudEvents_ prefixed properties""" + message = AMQPMessage( + properties={}, + application_properties={ + "cloudEvents_type": "test", + "cloudEvents_source": "/test", + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + "custom_property": "should-be-ignored", # No cloudEvents_ prefix + "another_prop": "also-ignored", + }, + application_data=b"{}", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + # Only cloudEvents_ prefixed properties should be extracted + assert event.get_type() == "test" + # Non-prefixed properties should not become extensions + # get_extension returns None for missing extensions + assert event.get_extension("custom_property") is None + assert event.get_extension("another_prop") is None From a25029ee654ee1178cae33915529f18aa2e1f564 Mon Sep 17 00:00:00 2001 From: Tudor Plugaru Date: Tue, 16 Dec 2025 14:52:58 +0200 Subject: [PATCH 2/3] feat(v2): Support both `_` and `:` when reading AMQP messages Signed-off-by: Tudor Plugaru --- src/cloudevents/core/bindings/amqp.py | 32 +++- tests/test_core/test_bindings/test_amqp.py | 210 +++++++++++++++++++++ 2 files changed, 233 insertions(+), 9 deletions(-) diff --git a/src/cloudevents/core/bindings/amqp.py b/src/cloudevents/core/bindings/amqp.py index 5e13d615..1c0cc5c1 100644 --- a/src/cloudevents/core/bindings/amqp.py +++ b/src/cloudevents/core/bindings/amqp.py @@ -21,7 +21,10 @@ from cloudevents.core.base import BaseCloudEvent from cloudevents.core.formats.base import Format -CE_PREFIX: Final[str] = "cloudEvents_" +# AMQP CloudEvents spec allows both cloudEvents_ and cloudEvents: prefixes +# The underscore variant is preferred for JMS 2.0 compatibility +CE_PREFIX_UNDERSCORE: Final[str] = "cloudEvents_" +CE_PREFIX_COLON: Final[str] = "cloudEvents:" CONTENT_TYPE_PROPERTY: Final[str] = "content-type" @@ -99,6 +102,9 @@ def to_binary(event: BaseCloudEvent, event_format: Format) -> AMQPMessage: (milliseconds since Unix epoch), while boolean and integer values are preserved as native types. + Note: Per AMQP CloudEvents spec, attributes may use 'cloudEvents_' or 'cloudEvents:' + prefix. This implementation uses 'cloudEvents_' for JMS 2.0 compatibility. + Example: >>> from cloudevents.core.v1.event import CloudEvent >>> from cloudevents.core.formats.json import JSONFormat @@ -124,7 +130,7 @@ def to_binary(event: BaseCloudEvent, event_format: Format) -> AMQPMessage: if attr_name == "datacontenttype": properties[CONTENT_TYPE_PROPERTY] = str(attr_value) else: - property_name = f"{CE_PREFIX}{attr_name}" + property_name = f"{CE_PREFIX_UNDERSCORE}{attr_name}" # Encode datetime to AMQP timestamp (milliseconds since epoch) # Other types (bool, int, str, bytes) use native AMQP types application_properties[property_name] = _encode_amqp_value(attr_value) @@ -150,11 +156,12 @@ def from_binary( """ Parse an AMQP binary content mode message to a CloudEvent. - Extracts CloudEvent attributes from cloudEvents_-prefixed AMQP application - properties and treats the AMQP 'content-type' property as the 'datacontenttype' - attribute. The application-data section is parsed as event data according to - the content type. The 'time' attribute accepts both AMQP timestamp (int milliseconds) - and ISO 8601 string, while other native AMQP types (boolean, integer) are preserved. + Extracts CloudEvent attributes from AMQP application properties with either + 'cloudEvents_' or 'cloudEvents:' prefix (per AMQP CloudEvents spec), and treats + the AMQP 'content-type' property as the 'datacontenttype' attribute. The + application-data section is parsed as event data according to the content type. + The 'time' attribute accepts both AMQP timestamp (int milliseconds) and ISO 8601 + string, while other native AMQP types (boolean, integer) are preserved. Example: >>> from cloudevents.core.v1.event import CloudEvent @@ -180,8 +187,15 @@ def from_binary( attributes: dict[str, Any] = {} for prop_name, prop_value in message.application_properties.items(): - if prop_name.startswith(CE_PREFIX): - attr_name = prop_name[len(CE_PREFIX) :] + # Check for both cloudEvents_ and cloudEvents: prefixes + attr_name = None + + if prop_name.startswith(CE_PREFIX_UNDERSCORE): + attr_name = prop_name[len(CE_PREFIX_UNDERSCORE) :] + elif prop_name.startswith(CE_PREFIX_COLON): + attr_name = prop_name[len(CE_PREFIX_COLON) :] + + if attr_name: # Decode timestamp (int or ISO 8601 string) to datetime, preserve other native types attributes[attr_name] = _decode_amqp_value(attr_name, prop_value) diff --git a/tests/test_core/test_bindings/test_amqp.py b/tests/test_core/test_bindings/test_amqp.py index 76d702cf..d3a704b5 100644 --- a/tests/test_core/test_bindings/test_amqp.py +++ b/tests/test_core/test_bindings/test_amqp.py @@ -664,3 +664,213 @@ def test_from_binary_ignores_non_cloudevents_properties() -> None: # get_extension returns None for missing extensions assert event.get_extension("custom_property") is None assert event.get_extension("another_prop") is None + + +def test_from_binary_with_colon_prefix() -> None: + """Test from_binary accepts cloudEvents: prefix per AMQP spec""" + message = AMQPMessage( + properties={"content-type": "application/json"}, + application_properties={ + "cloudEvents:type": "com.example.test", + "cloudEvents:source": "/test", + "cloudEvents:id": "test-123", + "cloudEvents:specversion": "1.0", + }, + application_data=b'{"message": "Hello"}', + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + assert event.get_id() == "test-123" + assert event.get_specversion() == "1.0" + assert event.get_data() == {"message": "Hello"} + + +def test_from_binary_colon_prefix_with_extensions() -> None: + """Test from_binary with cloudEvents: prefix handles extensions""" + message = AMQPMessage( + properties={}, + application_properties={ + "cloudEvents:type": "test", + "cloudEvents:source": "/test", + "cloudEvents:id": "123", + "cloudEvents:specversion": "1.0", + "cloudEvents:customext": "custom-value", + "cloudEvents:boolext": True, + "cloudEvents:intext": 42, + }, + application_data=b"{}", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_extension("customext") == "custom-value" + assert event.get_extension("boolext") is True + assert event.get_extension("intext") == 42 + + +def test_from_binary_colon_prefix_with_datetime() -> None: + """Test from_binary with cloudEvents: prefix handles datetime""" + dt = datetime(2023, 1, 15, 10, 30, 45, tzinfo=timezone.utc) + timestamp_ms = int(dt.timestamp() * 1000) + + message = AMQPMessage( + properties={}, + application_properties={ + "cloudEvents:type": "test", + "cloudEvents:source": "/test", + "cloudEvents:id": "123", + "cloudEvents:specversion": "1.0", + "cloudEvents:time": timestamp_ms, # AMQP timestamp + }, + application_data=b"{}", + ) + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_time() == dt + + +def test_from_binary_colon_prefix_round_trip() -> None: + """Test round-trip with cloudEvents: prefix (manual construction)""" + # Create event with underscore prefix + original_event = create_event( + extra_attrs={"customext": "value", "datacontenttype": "application/json"}, + data={"message": "test"}, + ) + message_underscore = to_binary(original_event, JSONFormat()) + + # Manually construct message with colon prefix (simulate receiving from another system) + message_colon = AMQPMessage( + properties=message_underscore.properties, + application_properties={ + # Convert underscore to colon prefix + key.replace("cloudEvents_", "cloudEvents:"): value + for key, value in message_underscore.application_properties.items() + }, + application_data=message_underscore.application_data, + ) + + # Should parse correctly + recovered = from_binary(message_colon, JSONFormat(), CloudEvent) + + assert recovered.get_type() == original_event.get_type() + assert recovered.get_source() == original_event.get_source() + assert recovered.get_extension("customext") == "value" + assert recovered.get_data() == {"message": "test"} + + +def test_from_binary_mixed_prefixes_accepted() -> None: + """Test from_binary accepts mixed cloudEvents_ and cloudEvents: prefixes""" + message = AMQPMessage( + properties={}, + application_properties={ + "cloudEvents_type": "test", # Underscore + "cloudEvents:source": "/test", # Colon - mixed is OK + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + }, + application_data=b"{}", + ) + + event = from_binary(message, JSONFormat(), CloudEvent) + + # Should extract all attributes regardless of prefix + assert event.get_type() == "test" + assert event.get_source() == "/test" + assert event.get_id() == "123" + assert event.get_specversion() == "1.0" + + +def test_from_amqp_with_colon_prefix_binary_mode() -> None: + """Test from_amqp detects binary mode with cloudEvents: prefix""" + message = AMQPMessage( + properties={"content-type": "application/json"}, + application_properties={ + "cloudEvents:type": "test", + "cloudEvents:source": "/test", + "cloudEvents:id": "123", + "cloudEvents:specversion": "1.0", + }, + application_data=b'{"data": "value"}', + ) + + event = from_amqp(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "test" + assert event.get_source() == "/test" + assert event.get_data() == {"data": "value"} + + +def test_from_amqp_mixed_prefixes_accepted() -> None: + """Test from_amqp accepts mixed prefixes""" + message = AMQPMessage( + properties={"content-type": "application/json"}, + application_properties={ + "cloudEvents_type": "test", + "cloudEvents:source": "/test", # Mixed is OK + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + }, + application_data=b"{}", + ) + + event = from_amqp(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "test" + assert event.get_source() == "/test" + + +def test_from_binary_all_underscore_prefix_valid() -> None: + """Test from_binary accepts all cloudEvents_ prefix (baseline)""" + message = AMQPMessage( + properties={}, + application_properties={ + "cloudEvents_type": "test", + "cloudEvents_source": "/test", + "cloudEvents_id": "123", + "cloudEvents_specversion": "1.0", + }, + application_data=b"{}", + ) + + event = from_binary(message, JSONFormat(), CloudEvent) + assert event.get_type() == "test" + + +def test_from_binary_all_colon_prefix_valid() -> None: + """Test from_binary accepts all cloudEvents: prefix""" + message = AMQPMessage( + properties={}, + application_properties={ + "cloudEvents:type": "test", + "cloudEvents:source": "/test", + "cloudEvents:id": "123", + "cloudEvents:specversion": "1.0", + }, + application_data=b"{}", + ) + + event = from_binary(message, JSONFormat(), CloudEvent) + assert event.get_type() == "test" + + +def test_from_binary_colon_prefix_ignores_non_ce_properties() -> None: + """Test from_binary with colon prefix ignores non-CloudEvents properties""" + message = AMQPMessage( + properties={}, + application_properties={ + "cloudEvents:type": "test", + "cloudEvents:source": "/test", + "cloudEvents:id": "123", + "cloudEvents:specversion": "1.0", + "customProperty": "ignored", # No prefix + "anotherProp": 123, + }, + application_data=b"{}", + ) + + event = from_binary(message, JSONFormat(), CloudEvent) + + assert event.get_type() == "test" + assert event.get_extension("customProperty") is None + assert event.get_extension("anotherProp") is None From b8ac198739459bdeb4a1bae5097123bef668b04f Mon Sep 17 00:00:00 2001 From: Tudor Plugaru Date: Wed, 17 Dec 2025 18:19:30 +0200 Subject: [PATCH 3/3] chore: define type alias for EventFactory and use it everywhere Signed-off-by: Tudor Plugaru --- src/cloudevents/core/base.py | 16 +++++++++++++++- src/cloudevents/core/bindings/amqp.py | 16 +++++----------- src/cloudevents/core/bindings/http.py | 16 +++++----------- src/cloudevents/core/bindings/kafka.py | 14 ++++---------- src/cloudevents/core/formats/base.py | 9 +++------ src/cloudevents/core/formats/json.py | 9 +++------ 6 files changed, 35 insertions(+), 45 deletions(-) diff --git a/src/cloudevents/core/base.py b/src/cloudevents/core/base.py index 1c13183d..747f5a6a 100644 --- a/src/cloudevents/core/base.py +++ b/src/cloudevents/core/base.py @@ -13,7 +13,21 @@ # under the License. from datetime import datetime -from typing import Any, Protocol +from typing import Any, Callable, Protocol + +EventFactory = Callable[ + [dict[str, Any], dict[str, Any] | str | bytes | None], "BaseCloudEvent" +] +""" +Type alias for a callable that creates a BaseCloudEvent from attributes and data. + +Args: + attributes: The CloudEvent attributes (required fields like id, source, type, etc.) + data: The CloudEvent data payload (optional) + +Returns: + A BaseCloudEvent instance +""" class BaseCloudEvent(Protocol): diff --git a/src/cloudevents/core/bindings/amqp.py b/src/cloudevents/core/bindings/amqp.py index 1c0cc5c1..bafd8f48 100644 --- a/src/cloudevents/core/bindings/amqp.py +++ b/src/cloudevents/core/bindings/amqp.py @@ -14,11 +14,11 @@ from dataclasses import dataclass from datetime import datetime, timezone -from typing import Any, Callable, Final +from typing import Any, Final from dateutil.parser import isoparse -from cloudevents.core.base import BaseCloudEvent +from cloudevents.core.base import BaseCloudEvent, EventFactory from cloudevents.core.formats.base import Format # AMQP CloudEvents spec allows both cloudEvents_ and cloudEvents: prefixes @@ -149,9 +149,7 @@ def to_binary(event: BaseCloudEvent, event_format: Format) -> AMQPMessage: def from_binary( message: AMQPMessage, event_format: Format, - event_factory: Callable[ - [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent - ], + event_factory: EventFactory, ) -> BaseCloudEvent: """ Parse an AMQP binary content mode message to a CloudEvent. @@ -250,9 +248,7 @@ def to_structured(event: BaseCloudEvent, event_format: Format) -> AMQPMessage: def from_structured( message: AMQPMessage, event_format: Format, - event_factory: Callable[ - [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent - ], + event_factory: EventFactory, ) -> BaseCloudEvent: """ Parse an AMQP structured content mode message to a CloudEvent. @@ -283,9 +279,7 @@ def from_structured( def from_amqp( message: AMQPMessage, event_format: Format, - event_factory: Callable[ - [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent - ], + event_factory: EventFactory, ) -> BaseCloudEvent: """ Parse an AMQP message to a CloudEvent with automatic mode detection. diff --git a/src/cloudevents/core/bindings/http.py b/src/cloudevents/core/bindings/http.py index cded9a9f..15396704 100644 --- a/src/cloudevents/core/bindings/http.py +++ b/src/cloudevents/core/bindings/http.py @@ -13,9 +13,9 @@ # under the License. from dataclasses import dataclass -from typing import Any, Callable, Final +from typing import Any, Final -from cloudevents.core.base import BaseCloudEvent +from cloudevents.core.base import BaseCloudEvent, EventFactory from cloudevents.core.bindings.common import ( CONTENT_TYPE_HEADER, DATACONTENTTYPE_ATTR, @@ -92,9 +92,7 @@ def to_binary(event: BaseCloudEvent, event_format: Format) -> HTTPMessage: def from_binary( message: HTTPMessage, event_format: Format, - event_factory: Callable[ - [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent - ], + event_factory: EventFactory, ) -> BaseCloudEvent: """ Parse an HTTP binary content mode message to a CloudEvent. @@ -172,9 +170,7 @@ def to_structured(event: BaseCloudEvent, event_format: Format) -> HTTPMessage: def from_structured( message: HTTPMessage, event_format: Format, - event_factory: Callable[ - [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent - ], + event_factory: EventFactory, ) -> BaseCloudEvent: """ Parse an HTTP structured content mode message to a CloudEvent. @@ -203,9 +199,7 @@ def from_structured( def from_http( message: HTTPMessage, event_format: Format, - event_factory: Callable[ - [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent - ], + event_factory: EventFactory, ) -> BaseCloudEvent: """ Parse an HTTP message to a CloudEvent with automatic mode detection. diff --git a/src/cloudevents/core/bindings/kafka.py b/src/cloudevents/core/bindings/kafka.py index 9c2e16b7..02bd5837 100644 --- a/src/cloudevents/core/bindings/kafka.py +++ b/src/cloudevents/core/bindings/kafka.py @@ -15,7 +15,7 @@ from dataclasses import dataclass from typing import Any, Callable, Final -from cloudevents.core.base import BaseCloudEvent +from cloudevents.core.base import BaseCloudEvent, EventFactory from cloudevents.core.bindings.common import ( CONTENT_TYPE_HEADER, DATACONTENTTYPE_ATTR, @@ -125,9 +125,7 @@ def to_binary( def from_binary( message: KafkaMessage, event_format: Format, - event_factory: Callable[ - [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent - ], + event_factory: EventFactory, ) -> BaseCloudEvent: """ Parse a Kafka binary content mode message to a CloudEvent. @@ -228,9 +226,7 @@ def to_structured( def from_structured( message: KafkaMessage, event_format: Format, - event_factory: Callable[ - [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent - ], + event_factory: EventFactory, ) -> BaseCloudEvent: """ Parse a Kafka structured content mode message to a CloudEvent. @@ -276,9 +272,7 @@ def from_structured( def from_kafka( message: KafkaMessage, event_format: Format, - event_factory: Callable[ - [dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent - ], + event_factory: EventFactory, ) -> BaseCloudEvent: """ Parse a Kafka message to a CloudEvent with automatic mode detection. diff --git a/src/cloudevents/core/formats/base.py b/src/cloudevents/core/formats/base.py index 7adb2802..9cb3523c 100644 --- a/src/cloudevents/core/formats/base.py +++ b/src/cloudevents/core/formats/base.py @@ -12,9 +12,9 @@ # License for the specific language governing permissions and limitations # under the License. -from typing import Any, Callable, Protocol +from typing import Any, Protocol -from cloudevents.core.base import BaseCloudEvent +from cloudevents.core.base import BaseCloudEvent, EventFactory class Format(Protocol): @@ -29,10 +29,7 @@ class Format(Protocol): def read( self, - event_factory: Callable[ - [dict[str, Any], dict[str, Any] | str | bytes | None], - BaseCloudEvent, - ], + event_factory: EventFactory, data: str | bytes, ) -> BaseCloudEvent: """ diff --git a/src/cloudevents/core/formats/json.py b/src/cloudevents/core/formats/json.py index 91e6ce04..8823e1e4 100644 --- a/src/cloudevents/core/formats/json.py +++ b/src/cloudevents/core/formats/json.py @@ -16,11 +16,11 @@ import re from datetime import datetime from json import JSONEncoder, dumps, loads -from typing import Any, Callable, Final, Pattern +from typing import Any, Final, Pattern from dateutil.parser import isoparse -from cloudevents.core.base import BaseCloudEvent +from cloudevents.core.base import BaseCloudEvent, EventFactory from cloudevents.core.formats.base import Format @@ -49,10 +49,7 @@ class JSONFormat(Format): def read( self, - event_factory: Callable[ - [dict[str, Any], dict[str, Any] | str | bytes | None], - BaseCloudEvent, - ], + event_factory: EventFactory, data: str | bytes, ) -> BaseCloudEvent: """