Skip to content

Conversation

@amokan
Copy link
Contributor

@amokan amokan commented Jan 13, 2026

Split from #3084

See #3090 before merging this

  • Adds supervision tree and worker for consolidated pipelines
  • Adds consolidated mode to the BufferProducer
  • Additional typespecs and tests (temporarily has to mock a consolidated backend adaptor until the next PR where the ClickHouse adapter is converted)

Note that this is based off the branch in #3083 - so it looks a bit larger than it really is.

end

@impl GenStage
def format_discarded(discarded, %{consolidated: true} = state) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can consider removing this callback fully since we are not inserting directly into the Producer process anymore.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for a future PR.


case IngestEventQueue.pop_pending(key, n) do
{:error, :not_initialized} ->
Logger.warning(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Logger.warning(
Logger.error(

alias Logflare.Backends.Adaptor
alias Logflare.Backends.ConsolidatedSup

@default_interval 30_000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can consider moving the reconciliation logic to a stateless periodic job (hourly?) using quantum, since this 30s is high for the moduledoc goals (bullet point 2), as pt2 should happen very infrequently.
pt1 should already be handled on ingestion, a 30s wait would be too slow anyway.

def start_pipeline(%Backend{} = backend) do
adaptor_module = Adaptor.get_adaptor(backend)
child_spec = adaptor_module.child_spec(backend)
DynamicSupervisor.start_child(@dynamic_sup_name, child_spec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: will probably need to partition this in future to prevent blocking, since DynamicSupervisor can be a bottleneck.

)
end

@spec build_telemetry_metadata(state :: DynamicPipeline.state()) :: %{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine for now, we can make this module more agnostic (and also easier to open source in the future) by hoisting the telemetry metadata building and/or callback definition to the caller.

Comment on lines +841 to +844
Logger.warning("Failed to start consolidated pipeline",
backend_id: backend.id,
reason: inspect(reason)
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Logger.warning("Failed to start consolidated pipeline",
backend_id: backend.id,
reason: inspect(reason)
)
Logger.warning("Failed to start consolidated pipeline: #{inspect(reason)}",
backend_id: backend.id
)

Comment on lines +869 to +877
if Adaptor.consolidated_ingest?(backend) do
case ConsolidatedSup.stop_pipeline(backend) do
:ok ->
Logger.info("Stopped consolidated pipeline", backend_id: backend.id)

{:error, :not_found} ->
:ok
end
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be done with a with/do

@Ziinc Ziinc merged commit 082d26f into main Jan 15, 2026
7 checks passed
@Ziinc Ziinc deleted the adammokan/anl-1285-supervision-tree-split branch January 15, 2026 12:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants