diff --git a/src/cloudevents/core/bindings/http.py b/src/cloudevents/core/bindings/http.py index cded9a9..c90aedb 100644 --- a/src/cloudevents/core/bindings/http.py +++ b/src/cloudevents/core/bindings/http.py @@ -23,6 +23,8 @@ encode_header_value, ) from cloudevents.core.formats.base import Format +from cloudevents.core.formats.json import JSONFormat +from cloudevents.core.v1.event import CloudEvent CE_PREFIX: Final[str] = "ce-" @@ -244,3 +246,116 @@ def from_http( return from_binary(message, event_format, event_factory) return from_structured(message, event_format, event_factory) + + +def to_binary_event( + event: BaseCloudEvent, + event_format: Format | None = None, +) -> HTTPMessage: + """ + Convenience wrapper for to_binary with JSON format as default. + + Example: + >>> from cloudevents.core.v1.event import CloudEvent + >>> from cloudevents.core.bindings import http + >>> + >>> event = CloudEvent( + ... attributes={"type": "com.example.test", "source": "/test"}, + ... data={"message": "Hello"} + ... ) + >>> message = http.to_binary_event(event) + + :param event: The CloudEvent to convert + :param event_format: Format implementation (defaults to JSONFormat) + :return: HTTPMessage with ce-prefixed headers + """ + if event_format is None: + event_format = JSONFormat() + return to_binary(event, event_format) + + +def from_binary_event( + message: HTTPMessage, + event_format: Format | None = None, +) -> CloudEvent: + """ + Convenience wrapper for from_binary with JSON format and CloudEvent as defaults. + + Example: + >>> from cloudevents.core.bindings import http + >>> event = http.from_binary_event(message) + + :param message: HTTPMessage to parse + :param event_format: Format implementation (defaults to JSONFormat) + :return: CloudEvent instance + """ + if event_format is None: + event_format = JSONFormat() + return from_binary(message, event_format, CloudEvent) + + +def to_structured_event( + event: BaseCloudEvent, + event_format: Format | None = None, +) -> HTTPMessage: + """ + Convenience wrapper for to_structured with JSON format as default. + + Example: + >>> from cloudevents.core.v1.event import CloudEvent + >>> from cloudevents.core.bindings import http + >>> + >>> event = CloudEvent( + ... attributes={"type": "com.example.test", "source": "/test"}, + ... data={"message": "Hello"} + ... ) + >>> message = http.to_structured_event(event) + + :param event: The CloudEvent to convert + :param event_format: Format implementation (defaults to JSONFormat) + :return: HTTPMessage with structured content + """ + if event_format is None: + event_format = JSONFormat() + return to_structured(event, event_format) + + +def from_structured_event( + message: HTTPMessage, + event_format: Format | None = None, +) -> CloudEvent: + """ + Convenience wrapper for from_structured with JSON format and CloudEvent as defaults. + + Example: + >>> from cloudevents.core.bindings import http + >>> event = http.from_structured_event(message) + + :param message: HTTPMessage to parse + :param event_format: Format implementation (defaults to JSONFormat) + :return: CloudEvent instance + """ + if event_format is None: + event_format = JSONFormat() + return from_structured(message, event_format, CloudEvent) + + +def from_http_event( + message: HTTPMessage, + event_format: Format | None = None, +) -> CloudEvent: + """ + Convenience wrapper for from_http with JSON format and CloudEvent as defaults. + Auto-detects binary or structured mode. + + Example: + >>> from cloudevents.core.bindings import http + >>> event = http.from_http_event(message) + + :param message: HTTPMessage to parse + :param event_format: Format implementation (defaults to JSONFormat) + :return: CloudEvent instance + """ + if event_format is None: + event_format = JSONFormat() + return from_http(message, event_format, CloudEvent) diff --git a/src/cloudevents/core/bindings/kafka.py b/src/cloudevents/core/bindings/kafka.py index 9c2e16b..94ef3b3 100644 --- a/src/cloudevents/core/bindings/kafka.py +++ b/src/cloudevents/core/bindings/kafka.py @@ -23,6 +23,8 @@ encode_header_value, ) from cloudevents.core.formats.base import Format +from cloudevents.core.formats.json import JSONFormat +from cloudevents.core.v1.event import CloudEvent CE_PREFIX: Final[str] = "ce_" PARTITIONKEY_ATTR: Final[str] = "partitionkey" @@ -320,3 +322,120 @@ def from_kafka( return from_binary(message, event_format, event_factory) return from_structured(message, event_format, event_factory) + + +def to_binary_event( + event: BaseCloudEvent, + event_format: Format | None = None, + key_mapper: KeyMapper | None = None, +) -> KafkaMessage: + """ + Convenience wrapper for to_binary with JSON format and CloudEvent as defaults. + + Example: + >>> from cloudevents.core.v1.event import CloudEvent + >>> from cloudevents.core.bindings import kafka + >>> + >>> event = CloudEvent( + ... attributes={"type": "com.example.test", "source": "/test"}, + ... data={"message": "Hello"} + ... ) + >>> message = kafka.to_binary_event(event) + + :param event: The CloudEvent to convert + :param event_format: Format implementation (defaults to JSONFormat) + :param key_mapper: Optional function to extract message key from event + :return: KafkaMessage with ce_-prefixed headers + """ + if event_format is None: + event_format = JSONFormat() + return to_binary(event, event_format, key_mapper) + + +def from_binary_event( + message: KafkaMessage, + event_format: Format | None = None, +) -> CloudEvent: + """ + Convenience wrapper for from_binary with JSON format and CloudEvent as defaults. + + Example: + >>> from cloudevents.core.bindings import kafka + >>> event = kafka.from_binary_event(message) + + :param message: KafkaMessage to parse + :param event_format: Format implementation (defaults to JSONFormat) + :return: CloudEvent instance + """ + if event_format is None: + event_format = JSONFormat() + return from_binary(message, event_format, CloudEvent) + + +def to_structured_event( + event: BaseCloudEvent, + event_format: Format | None = None, + key_mapper: KeyMapper | None = None, +) -> KafkaMessage: + """ + Convenience wrapper for to_structured with JSON format as default. + + Example: + >>> from cloudevents.core.v1.event import CloudEvent + >>> from cloudevents.core.bindings import kafka + >>> + >>> event = CloudEvent( + ... attributes={"type": "com.example.test", "source": "/test"}, + ... data={"message": "Hello"} + ... ) + >>> message = kafka.to_structured_event(event) + + :param event: The CloudEvent to convert + :param event_format: Format implementation (defaults to JSONFormat) + :param key_mapper: Optional function to extract message key from event + :return: KafkaMessage with structured content + """ + if event_format is None: + event_format = JSONFormat() + return to_structured(event, event_format, key_mapper) + + +def from_structured_event( + message: KafkaMessage, + event_format: Format | None = None, +) -> CloudEvent: + """ + Convenience wrapper for from_structured with JSON format and CloudEvent as defaults. + + Example: + >>> from cloudevents.core.bindings import kafka + >>> event = kafka.from_structured_event(message) + + :param message: KafkaMessage to parse + :param event_format: Format implementation (defaults to JSONFormat) + :return: CloudEvent instance + """ + if event_format is None: + event_format = JSONFormat() + return from_structured(message, event_format, CloudEvent) + + +def from_kafka_event( + message: KafkaMessage, + event_format: Format | None = None, +) -> CloudEvent: + """ + Convenience wrapper for from_kafka with JSON format and CloudEvent as defaults. + Auto-detects binary or structured mode. + + Example: + >>> from cloudevents.core.bindings import kafka + >>> event = kafka.from_kafka_event(message) + + :param message: KafkaMessage to parse + :param event_format: Format implementation (defaults to JSONFormat) + :return: CloudEvent instance + """ + if event_format is None: + event_format = JSONFormat() + return from_kafka(message, event_format, CloudEvent) diff --git a/tests/test_core/test_bindings/test_http.py b/tests/test_core/test_bindings/test_http.py index 165fae8..cb5b560 100644 --- a/tests/test_core/test_bindings/test_http.py +++ b/tests/test_core/test_bindings/test_http.py @@ -20,10 +20,15 @@ from cloudevents.core.bindings.http import ( HTTPMessage, from_binary, + from_binary_event, from_http, + from_http_event, from_structured, + from_structured_event, to_binary, + to_binary_event, to_structured, + to_structured_event, ) from cloudevents.core.formats.json import JSONFormat from cloudevents.core.v1.event import CloudEvent @@ -966,3 +971,155 @@ def test_real_world_scenario() -> None: assert data["ref"] == "refs/heads/main" assert len(data["commits"]) == 2 assert data["commits"][0]["message"] == "Fix bug" + + +def test_to_binary_with_defaults() -> None: + """Test to_binary_event convenience wrapper using default JSONFormat""" + event = create_event( + extra_attrs={"datacontenttype": "application/json"}, + data={"message": "Hello"}, + ) + + message = to_binary_event(event) + + assert "ce-type" in message.headers + assert message.headers["ce-type"] == "com.example.test" + assert b'"message"' in message.body + assert b'"Hello"' in message.body + + +def test_to_structured_with_defaults() -> None: + """Test to_structured_event convenience wrapper using default JSONFormat""" + event = create_event(data={"message": "Hello"}) + + message = to_structured_event(event) + + assert "content-type" in message.headers + assert message.headers["content-type"] == "application/cloudevents+json" + assert b'"type"' in message.body + assert b'"com.example.test"' in message.body + assert b'"data"' in message.body + + +def test_from_binary_with_defaults() -> None: + """Test from_binary_event convenience wrapper using default JSONFormat and CloudEvent factory""" + message = HTTPMessage( + headers={ + "ce-type": "com.example.test", + "ce-source": "/test", + "ce-id": "123", + "ce-specversion": "1.0", + "content-type": "application/json", + }, + body=b'{"message": "Hello"}', + ) + + event = from_binary_event(message) + + assert isinstance(event, CloudEvent) + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + assert event.get_id() == "123" + assert event.get_data() == {"message": "Hello"} + + +def test_from_structured_with_defaults() -> None: + """Test from_structured_event convenience wrapper using default JSONFormat and CloudEvent factory""" + message = HTTPMessage( + headers={"content-type": "application/cloudevents+json"}, + body=b'{"type": "com.example.test", "source": "/test", "id": "123", "specversion": "1.0", "data": {"message": "Hello"}}', + ) + + event = from_structured_event(message) + + assert isinstance(event, CloudEvent) + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + assert event.get_id() == "123" + assert event.get_data() == {"message": "Hello"} + + +def test_from_http_with_defaults_binary() -> None: + """Test from_http_event convenience wrapper with auto-detection (binary mode)""" + message = HTTPMessage( + headers={ + "ce-type": "com.example.test", + "ce-source": "/test", + "ce-id": "123", + "ce-specversion": "1.0", + }, + body=b'{"message": "Hello"}', + ) + + event = from_http_event(message) + + assert isinstance(event, CloudEvent) + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + + +def test_from_http_with_defaults_structured() -> None: + """Test from_http_event convenience wrapper with auto-detection (structured mode)""" + message = HTTPMessage( + headers={"content-type": "application/cloudevents+json"}, + body=b'{"type": "com.example.test", "source": "/test", "id": "123", "specversion": "1.0"}', + ) + + # Call wrapper function (should use defaults and detect structured mode) + event = from_http_event(message) + + assert isinstance(event, CloudEvent) + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + + +def test_convenience_roundtrip_binary() -> None: + """Test complete roundtrip using convenience wrapper functions with binary mode""" + original_event = create_event( + extra_attrs={"datacontenttype": "application/json"}, + data={"message": "Roundtrip test"}, + ) + + # Convert to message using wrapper + message = to_binary_event(original_event) + + # Convert back using wrapper + recovered_event = from_binary_event(message) + + assert recovered_event.get_type() == original_event.get_type() + assert recovered_event.get_source() == original_event.get_source() + assert recovered_event.get_id() == original_event.get_id() + assert recovered_event.get_data() == original_event.get_data() + + +def test_convenience_roundtrip_structured() -> None: + """Test complete roundtrip using convenience wrapper functions with structured mode""" + original_event = create_event( + extra_attrs={"datacontenttype": "application/json"}, + data={"message": "Roundtrip test"}, + ) + + # Convert to message using wrapper + message = to_structured_event(original_event) + + # Convert back using wrapper + recovered_event = from_structured_event(message) + + assert recovered_event.get_type() == original_event.get_type() + assert recovered_event.get_source() == original_event.get_source() + assert recovered_event.get_id() == original_event.get_id() + assert recovered_event.get_data() == original_event.get_data() + + +def test_convenience_with_explicit_format_override() -> None: + """Test that wrapper functions can override format (still flexible)""" + event = create_event( + extra_attrs={"datacontenttype": "application/json"}, + data={"message": "Hello"}, + ) + + message = to_binary_event(event, JSONFormat()) + recovered = from_binary_event(message, JSONFormat()) + + assert recovered.get_type() == event.get_type() + assert recovered.get_data() == event.get_data() diff --git a/tests/test_core/test_bindings/test_kafka.py b/tests/test_core/test_bindings/test_kafka.py index e7d0d45..8b319c3 100644 --- a/tests/test_core/test_bindings/test_kafka.py +++ b/tests/test_core/test_bindings/test_kafka.py @@ -21,10 +21,15 @@ from cloudevents.core.bindings.kafka import ( KafkaMessage, from_binary, + from_binary_event, from_kafka, + from_kafka_event, from_structured, + from_structured_event, to_binary, + to_binary_event, to_structured, + to_structured_event, ) from cloudevents.core.formats.json import JSONFormat from cloudevents.core.v1.event import CloudEvent @@ -618,3 +623,163 @@ def bytes_mapper(event: BaseCloudEvent) -> bytes: event2 = create_event() msg2 = to_binary(event2, JSONFormat(), key_mapper=bytes_mapper) assert msg2.key == b"bytes-key" + + +def test_to_binary_with_defaults() -> None: + """Test to_binary_event convenience wrapper using default JSONFormat""" + event = create_event( + extra_attrs={"datacontenttype": "application/json"}, + data={"message": "Hello"}, + ) + + message = to_binary_event(event) + + assert "ce_type" in message.headers + assert message.headers["ce_type"] == b"com.example.test" + assert b'"message"' in message.value + assert b'"Hello"' in message.value + + +def test_to_structured_with_defaults() -> None: + """Test to_structured_event convenience wrapper using default JSONFormat""" + event = create_event(data={"message": "Hello"}) + + message = to_structured_event(event) + + assert "content-type" in message.headers + assert message.headers["content-type"] == b"application/cloudevents+json" + assert b'"type"' in message.value + assert b'"com.example.test"' in message.value + assert b'"data"' in message.value + + +def test_from_binary_with_defaults() -> None: + """Test from_binary_event convenience wrapper using default JSONFormat and CloudEvent factory""" + message = KafkaMessage( + headers={ + "ce_type": b"com.example.test", + "ce_source": b"%2Ftest", + "ce_id": b"123", + "ce_specversion": b"1.0", + "content-type": b"application/json", + }, + key=None, + value=b'{"message": "Hello"}', + ) + + # Call wrapper function (should use defaults) + event = from_binary_event(message) + + assert isinstance(event, CloudEvent) + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + assert event.get_id() == "123" + assert event.get_data() == {"message": "Hello"} + + +def test_from_structured_with_defaults() -> None: + """Test from_structured_event convenience wrapper using default JSONFormat and CloudEvent factory""" + message = KafkaMessage( + headers={"content-type": b"application/cloudevents+json"}, + key=None, + value=b'{"type": "com.example.test", "source": "/test", "id": "123", "specversion": "1.0", "data": {"message": "Hello"}}', + ) + + # Call wrapper function (should use defaults) + event = from_structured_event(message) + + assert isinstance(event, CloudEvent) + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + assert event.get_id() == "123" + assert event.get_data() == {"message": "Hello"} + + +def test_from_kafka_with_defaults_binary() -> None: + """Test from_kafka_event convenience wrapper with auto-detection (binary mode)""" + message = KafkaMessage( + headers={ + "ce_type": b"com.example.test", + "ce_source": b"%2Ftest", + "ce_id": b"123", + "ce_specversion": b"1.0", + }, + key=None, + value=b'{"message": "Hello"}', + ) + + # Call wrapper function (should use defaults and detect binary mode) + event = from_kafka_event(message) + + assert isinstance(event, CloudEvent) + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + + +def test_from_kafka_with_defaults_structured() -> None: + """Test from_kafka_event convenience wrapper with auto-detection (structured mode)""" + message = KafkaMessage( + headers={"content-type": b"application/cloudevents+json"}, + key=None, + value=b'{"type": "com.example.test", "source": "/test", "id": "123", "specversion": "1.0"}', + ) + + # Call wrapper function (should use defaults and detect structured mode) + event = from_kafka_event(message) + + assert isinstance(event, CloudEvent) + assert event.get_type() == "com.example.test" + assert event.get_source() == "/test" + + +def test_convenience_roundtrip_binary() -> None: + """Test complete roundtrip using convenience wrapper functions with binary mode""" + original_event = create_event( + extra_attrs={"datacontenttype": "application/json"}, + data={"message": "Roundtrip test"}, + ) + + # Convert to message using wrapper + message = to_binary_event(original_event) + + # Convert back using wrapper + recovered_event = from_binary_event(message) + + assert recovered_event.get_type() == original_event.get_type() + assert recovered_event.get_source() == original_event.get_source() + assert recovered_event.get_id() == original_event.get_id() + assert recovered_event.get_data() == original_event.get_data() + + +def test_convenience_roundtrip_structured() -> None: + """Test complete roundtrip using convenience wrapper functions with structured mode""" + original_event = create_event( + extra_attrs={"datacontenttype": "application/json"}, + data={"message": "Roundtrip test"}, + ) + + # Convert to message using wrapper + message = to_structured_event(original_event) + + # Convert back using wrapper + recovered_event = from_structured_event(message) + + assert recovered_event.get_type() == original_event.get_type() + assert recovered_event.get_source() == original_event.get_source() + assert recovered_event.get_id() == original_event.get_id() + assert recovered_event.get_data() == original_event.get_data() + + +def test_convenience_with_explicit_format_override() -> None: + """Test that wrapper functions can override format (still flexible)""" + event = create_event( + extra_attrs={"datacontenttype": "application/json"}, + data={"message": "Hello"}, + ) + + # Explicitly pass JSONFormat to wrapper function + message = to_binary_event(event, JSONFormat()) + recovered = from_binary_event(message, JSONFormat()) + + assert recovered.get_type() == event.get_type() + assert recovered.get_data() == event.get_data()