Conversation
Force-pushed from 9c6b0c4 to 8c36e95
mhoff left a comment:
Hey @kaya-david I did a first review pass. Happy to discuss
Force-pushed from 9db4ab9 to 04234fd
Force-pushed from af8deb4 to 5dbb963
mhoff left a comment:
Hello @kaya-david, I have added some comments
      async def shut_down(self):
          """Raises Uvicorn HTTP Server internal stop flag and waits to join"""
          if self.http_server:
              self.http_server.shut_down()
  -       return super()._shut_down()
  +       await super().shut_down()
As explained above, we no longer introduce separate "clean-up" paths, but instead rely on a single, well-defined tear-down path that implies the instance must not be used afterwards.
logprep/ng/util/worker/worker.py (outdated)
      tasks_but_current = [t for t in self._worker_tasks if t is not current_task]

  -   logger.debug("waiting for termination of %d tasks", len(tasks_but_current))
  +   logger.debug(f"waiting for termination of {len(tasks_but_current)} tasks")
I am a bit puzzled by this change. I thought using %d would be the proper way to avoid string interpolation when the log level is not enabled.
Good catch, thanks! I’ve reverted this change.
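For context, a minimal sketch of why lazy %-style arguments are preferred in logging calls: with `logger.debug("%s", value)` the string is only interpolated if the record is actually emitted, while an f-string is always evaluated eagerly, regardless of log level. The `Expensive` class below is purely illustrative.

```python
import logging

class Expensive:
    """Tracks whether it was ever formatted into a string."""
    def __init__(self):
        self.formatted = False
    def __str__(self):
        self.formatted = True
        return "expensive"

logging.basicConfig(level=logging.WARNING)  # DEBUG records are suppressed
logger = logging.getLogger("demo")

lazy = Expensive()
logger.debug("value: %s", lazy)       # lazy: __str__ is never called
assert lazy.formatted is False

eager = Expensive()
logger.debug(f"value: {eager}")       # eager: f-string formats before the call
assert eager.formatted is True
```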
Pablu23 left a comment:
Thanks for your work! I left a few comments regarding the inputs and outputs.
      consumer = await self.get_consumer()

      if consumer is not None:
          await consumer.unsubscribe()
I believe unsubscribe is unnecessary here, but it's probably also not doing any harm.
Yes, you are right, unsubscribe is only needed for dynamic topic switching during runtime.
In our case, shut_down is not designed for that. It follows RAII-like semantics: calling it implies a full teardown of the instance and all associated resources. After that, a fresh instance is expected to be created via setup (as the counterpart to shut_down).
Continuing to operate on the same instance after a partial cleanup (e.g. via unsubscribe) is explicitly not part of the intended lifecycle.
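The RAII-like lifecycle described above can be sketched as follows. This is a hypothetical illustration, not the actual logprep API: the class name `Connector` and the `_active` flag are assumptions, but the contract matches the discussion, in that `shut_down` fully tears the instance down and further use requires a fresh `setup`.

```python
import asyncio

class Connector:
    """Illustrative component with paired setup/shut_down lifecycle."""
    def __init__(self):
        self._active = False

    async def setup(self):
        # acquire resources (consumers, producers, connections) here
        self._active = True

    async def shut_down(self):
        # full teardown: release all resources; the instance must not be reused
        self._active = False

    async def process(self):
        if not self._active:
            raise RuntimeError("shut down; create a fresh instance via setup()")
        return "ok"

async def demo():
    c = Connector()
    await c.setup()
    result = await c.process()
    await c.shut_down()
    try:
        await c.process()
        reused = True
    except RuntimeError:
        reused = False  # reuse after shut_down is rejected by design
    return result, reused

result, reused = asyncio.run(demo())
assert result == "ok" and reused is False
```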
      except BufferError:
          # block program until buffer is empty or timeout is reached
          self._producer.flush(timeout=self.config.flush_timeout)
          logger.debug("Buffer full, flushing")

          try:
              self._producer.produce(
                  topic=target,
                  value=self._encoder.encode(document),
                  on_delivery=partial(self.on_delivery, event),
              )
          except BufferError as err:
              event.state.current_state = EventStateType.FAILED
              event.errors.append(err)
              logger.error("Message delivery failed after retry: %s", err)
              self.metrics.number_of_errors += 1
              return
This is new logic: we no longer try to flush every time, only when we get a BufferError. That is fine if we only want to flush on a full buffer, but I don't think we do. Also, I don't like nesting try/except like this. I don't have a better solution for now; maybe recursively call the same function with an optional depth argument, and once it reaches the configured retry count, error out.
This applied to the old sync producer where we had to manually coordinate produce/poll/flush. I’ve now migrated the producer to the async AIOProducer, where delivery is handled via awaitable futures and internal batching, so this control flow (and related concerns) no longer applies.
Could you please cherry-pick the relevant parts into your output ticket and take another look there? If needed, feel free to implement your own async variant of store_custom / producer handling.
For now I’d keep the current implementation as is and suggest we clean this up together in a separate PR to properly align on the async approach.
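The bounded-retry idea from the review comment could look roughly like the sketch below. All names here (`store`, `MAX_RETRIES`, `FakeProducer`) are illustrative assumptions standing in for the real producer; the point is replacing nested try/except with a depth counter that re-raises once the retry budget is exhausted.

```python
MAX_RETRIES = 3

class BufferFullError(Exception):
    """Stand-in for the client library's buffer-full error."""

class FakeProducer:
    """Fake producer that fails until it has been flushed twice."""
    def __init__(self):
        self.flushes = 0
    def produce(self, msg):
        if self.flushes < 2:
            raise BufferFullError()
    def flush(self):
        self.flushes += 1

def store(producer, msg, depth=0):
    try:
        producer.produce(msg)
    except BufferFullError:
        if depth >= MAX_RETRIES:
            raise                      # retry budget exhausted: error out
        producer.flush()               # drain the buffer, then retry once more
        store(producer, msg, depth + 1)

p = FakeProducer()
store(p, "event")                      # succeeds after two flush/retry rounds
assert p.flushes == 2
```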
      logger.error("Message delivery failed: %s", err)
      self.metrics.number_of_errors += 1
      return
This part is also odd: why do we handle the case that Kafka ran into an exception twice, first in the try/except of the store_custom function and again here? This callback runs from the context of the store_custom function.
see comment above
      if "_producer" in self.__dict__:
          await self.flush()
Why do we do this? Shouldn't we just always flush? I mean, shouldn't flush be agnostic to whether there is a producer or not? Also, I don't like this if; isn't there another way to check whether we have a producer?
_producer is a cached property and is only initialized on first access. A shut_down could technically occur before it was ever used (i.e. before the producer exists), which would cause a crash during flush. This check is therefore a precaution.
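The guard works because `functools.cached_property` stores its value in the instance `__dict__` on first access, so a membership test tells us whether the producer was ever created without triggering its creation. A minimal sketch (the `Output` class and `created` flag are illustrative, not the real code):

```python
from functools import cached_property

class Output:
    created = False  # tracks whether the factory ever ran

    @cached_property
    def _producer(self):
        Output.created = True
        return object()  # stands in for the real Kafka producer

out = Output()
assert "_producer" not in out.__dict__   # never accessed: no producer exists yet
assert Output.created is False           # membership test did not create one

out._producer                            # first access creates and caches it
assert "_producer" in out.__dict__
```

This is why `shut_down` can safely skip `flush()` when the producer was never used.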
      search_context = self.__dict__.get("_search_context")
      if search_context is not None:
          await search_context.close()
Suggested change:
  -   search_context = self.__dict__.get("_search_context")
  -   if search_context is not None:
  -       await search_context.close()
  +   await self._search_context.close()
Same as above, I added a guard here as well.
PS: I removed the override decorator - as long as mypy does not complain (which is currently the case), overrides don’t add much value. Also, since we have many overloads and don’t consistently use override elsewhere, I prefer to omit it here for consistency.
… for Worker and some logs
…cle across components

- unify component lifecycle by introducing async setup/shut_down across NG components
- remove legacy _shut_down pattern and simplify base Component shutdown logic
- align Connector/Input/Output/Processor lifecycle interfaces
- fix kafka output delivery semantics by setting DELIVERED only via on_delivery callback
- improve kafka error handling (BufferError retry, KafkaException -> FAILED)
- ensure proper resource cleanup (consumer unsubscribe/close, producer flush, opensearch context close)
- improve worker shutdown by cancelling only unfinished tasks

# Conflicts:
#	logprep/ng/connector/opensearch/output.py
- remove docker compose teardown from SIGINT handler to avoid interfering with active OpenSearch requests
- introduce coordinated shutdown via _shutdown_requested flag
- add shutdown checkpoints to abort benchmark flow safely
- ensure compose teardown happens only in controlled finally blocks
- fix intermittent 503 errors during OpenSearch _count caused by concurrent shutdown
…le shutdown semantics
Force-pushed from b037231 to 99cd7ec
… (unsubscribe only needed for dynamic topic switching during runtime)
…with awaitable delivery futures
Force-pushed from 5e28118 to 6d8bb81
… and align with project naming conventions
…OverRide decorator for consistency
The rendered docs for this PR can be found here.