Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions src/cloudevents/core/bindings/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
encode_header_value,
)
from cloudevents.core.formats.base import Format
from cloudevents.core.formats.json import JSONFormat
from cloudevents.core.v1.event import CloudEvent

CE_PREFIX: Final[str] = "ce-"

Expand Down Expand Up @@ -244,3 +246,116 @@ def from_http(
return from_binary(message, event_format, event_factory)

return from_structured(message, event_format, event_factory)


def to_binary_event(
event: BaseCloudEvent,
event_format: Format | None = None,
) -> HTTPMessage:
"""
Convenience wrapper for to_binary with JSON format as default.

Example:
>>> from cloudevents.core.v1.event import CloudEvent
>>> from cloudevents.core.bindings import http
>>>
>>> event = CloudEvent(
... attributes={"type": "com.example.test", "source": "/test"},
... data={"message": "Hello"}
... )
>>> message = http.to_binary_event(event)

:param event: The CloudEvent to convert
:param event_format: Format implementation (defaults to JSONFormat)
:return: HTTPMessage with ce-prefixed headers
"""
if event_format is None:
event_format = JSONFormat()
return to_binary(event, event_format)


def from_binary_event(
message: HTTPMessage,
event_format: Format | None = None,
) -> CloudEvent:
"""
Convenience wrapper for from_binary with JSON format and CloudEvent as defaults.

Example:
>>> from cloudevents.core.bindings import http
>>> event = http.from_binary_event(message)

:param message: HTTPMessage to parse
:param event_format: Format implementation (defaults to JSONFormat)
:return: CloudEvent instance
"""
if event_format is None:
event_format = JSONFormat()
return from_binary(message, event_format, CloudEvent)


def to_structured_event(
event: BaseCloudEvent,
event_format: Format | None = None,
) -> HTTPMessage:
"""
Convenience wrapper for to_structured with JSON format as default.

Example:
>>> from cloudevents.core.v1.event import CloudEvent
>>> from cloudevents.core.bindings import http
>>>
>>> event = CloudEvent(
... attributes={"type": "com.example.test", "source": "/test"},
... data={"message": "Hello"}
... )
>>> message = http.to_structured_event(event)

:param event: The CloudEvent to convert
:param event_format: Format implementation (defaults to JSONFormat)
:return: HTTPMessage with structured content
"""
if event_format is None:
event_format = JSONFormat()
return to_structured(event, event_format)


def from_structured_event(
message: HTTPMessage,
event_format: Format | None = None,
) -> CloudEvent:
"""
Convenience wrapper for from_structured with JSON format and CloudEvent as defaults.

Example:
>>> from cloudevents.core.bindings import http
>>> event = http.from_structured_event(message)

:param message: HTTPMessage to parse
:param event_format: Format implementation (defaults to JSONFormat)
:return: CloudEvent instance
"""
if event_format is None:
event_format = JSONFormat()
return from_structured(message, event_format, CloudEvent)


def from_http_event(
message: HTTPMessage,
event_format: Format | None = None,
) -> CloudEvent:
"""
Convenience wrapper for from_http with JSON format and CloudEvent as defaults.
Auto-detects binary or structured mode.

Example:
>>> from cloudevents.core.bindings import http
>>> event = http.from_http_event(message)

:param message: HTTPMessage to parse
:param event_format: Format implementation (defaults to JSONFormat)
:return: CloudEvent instance
"""
if event_format is None:
event_format = JSONFormat()
return from_http(message, event_format, CloudEvent)
119 changes: 119 additions & 0 deletions src/cloudevents/core/bindings/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
encode_header_value,
)
from cloudevents.core.formats.base import Format
from cloudevents.core.formats.json import JSONFormat
from cloudevents.core.v1.event import CloudEvent

CE_PREFIX: Final[str] = "ce_"
PARTITIONKEY_ATTR: Final[str] = "partitionkey"
Expand Down Expand Up @@ -320,3 +322,120 @@ def from_kafka(
return from_binary(message, event_format, event_factory)

return from_structured(message, event_format, event_factory)


def to_binary_event(
event: BaseCloudEvent,
event_format: Format | None = None,
key_mapper: KeyMapper | None = None,
) -> KafkaMessage:
"""
Convenience wrapper for to_binary with JSON format and CloudEvent as defaults.

Example:
>>> from cloudevents.core.v1.event import CloudEvent
>>> from cloudevents.core.bindings import kafka
>>>
>>> event = CloudEvent(
... attributes={"type": "com.example.test", "source": "/test"},
... data={"message": "Hello"}
... )
>>> message = kafka.to_binary_event(event)

:param event: The CloudEvent to convert
:param event_format: Format implementation (defaults to JSONFormat)
:param key_mapper: Optional function to extract message key from event
:return: KafkaMessage with ce_-prefixed headers
"""
if event_format is None:
event_format = JSONFormat()
return to_binary(event, event_format, key_mapper)


def from_binary_event(
message: KafkaMessage,
event_format: Format | None = None,
) -> CloudEvent:
"""
Convenience wrapper for from_binary with JSON format and CloudEvent as defaults.

Example:
>>> from cloudevents.core.bindings import kafka
>>> event = kafka.from_binary_event(message)

:param message: KafkaMessage to parse
:param event_format: Format implementation (defaults to JSONFormat)
:return: CloudEvent instance
"""
if event_format is None:
event_format = JSONFormat()
return from_binary(message, event_format, CloudEvent)


def to_structured_event(
event: BaseCloudEvent,
event_format: Format | None = None,
key_mapper: KeyMapper | None = None,
) -> KafkaMessage:
"""
Convenience wrapper for to_structured with JSON format as default.

Example:
>>> from cloudevents.core.v1.event import CloudEvent
>>> from cloudevents.core.bindings import kafka
>>>
>>> event = CloudEvent(
... attributes={"type": "com.example.test", "source": "/test"},
... data={"message": "Hello"}
... )
>>> message = kafka.to_structured_event(event)

:param event: The CloudEvent to convert
:param event_format: Format implementation (defaults to JSONFormat)
:param key_mapper: Optional function to extract message key from event
:return: KafkaMessage with structured content
"""
if event_format is None:
event_format = JSONFormat()
return to_structured(event, event_format, key_mapper)


def from_structured_event(
message: KafkaMessage,
event_format: Format | None = None,
) -> CloudEvent:
"""
Convenience wrapper for from_structured with JSON format and CloudEvent as defaults.

Example:
>>> from cloudevents.core.bindings import kafka
>>> event = kafka.from_structured_event(message)

:param message: KafkaMessage to parse
:param event_format: Format implementation (defaults to JSONFormat)
:return: CloudEvent instance
"""
if event_format is None:
event_format = JSONFormat()
return from_structured(message, event_format, CloudEvent)


def from_kafka_event(
message: KafkaMessage,
event_format: Format | None = None,
) -> CloudEvent:
"""
Convenience wrapper for from_kafka with JSON format and CloudEvent as defaults.
Auto-detects binary or structured mode.

Example:
>>> from cloudevents.core.bindings import kafka
>>> event = kafka.from_kafka_event(message)

:param message: KafkaMessage to parse
:param event_format: Format implementation (defaults to JSONFormat)
:return: CloudEvent instance
"""
if event_format is None:
event_format = JSONFormat()
return from_kafka(message, event_format, CloudEvent)
Loading
Loading