Skip to content

Add get many in streams as dependency injection #309

@marcosschroh

Description

@marcosschroh

Is your feature request related to a problem? Please describe.

Currently there is not a way to get many records at once when we are using the dependency injection approach.

With the current state we consume events once by one, this is wonderful for most applications, but there are use cases that we need to perform certain actions inside the consumption handler that can take too long, for example saving the events to a database or maybe doing a HTTP request, for example:

from kstreams import stream, ConsumerRecord

@stream(topic)
async def my_stream(cr: ConsumerRecord):
    # this takes 1 second, better to do it in bulk
    save_to_db(cr)

Describe the solution you'd like

Get a List[ConsumerRecord] every time that the consumption handler is called. The number of elements in the list can be specified, also a timeout.

from kstreams import stream, ConsumerRecord, GetMany

@stream(topic)
async def my_stream(crs: List[ConsumerRecord] = GetMany(records=10, timeout=10)):
    # this takes 1 second, better to do it in bulk
    for cr in crs:
        print(cr)
    
    save_bulk_to_db(crs)

Describe alternatives you've considered

Keep using the get_many from the Stream:

@stream_engine.stream(topic, ...)
async def stream(stream: Stream):
    while True:
        data = await stream.getmany(max_records=5)
        print(data)

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions