Skip to content

Processor is confused #903

@dachengx

Description

@dachengx

Describe the bug

The processors are confused when some data_types are saved but required to be processed again.

To Reproduce

The processors are confused by the following code:

import shutil
import straxen
from straxen.test_utils import nt_test_run_id


shutil.rmtree("./strax_test_data", ignore_errors=True)
st = straxen.test_utils.nt_test_context()
st.set_context_config({
    "allow_multiprocess": True,
    "timeout": 120,
})

st.make(
    nt_test_run_id,
    "peaklets",
    processor=processor,
)
st.make(
    nt_test_run_id,
    ("peaklets", "pulse_counts", "veto_regions"),
    processor=processor,
    allow_multiple=True,
)

assert st.is_stored(nt_test_run_id, "veto_regions")

The ThreadedMailboxProcessor and SingleThreadProcessor are both confused, so there are two problems.

If processor = "threaded_mailbox", you will see error:

2024-10-13 12:34:07,179 - utilix - DEBUG - Token exists at /home/xudc/.dbtoken
2024-10-13 12:34:07,180 - utilix - DEBUG - Token is valid.
/home/xudc/straxen/straxen/config/preprocessors.py:16: UserWarning: From straxen version 2.1.0 onward, URLConfig parameters will be sorted alphabetically before being passed to the plugins, this will change the lineage hash for non-sorted URLs. To load data processed with non-sorted URLs, you will need to use an older version.
  warnings.warn(
/home/xudc/straxen/straxen/plugins/records/records.py:467: RuntimeWarning: invalid value encountered in divide
  means = baseline_buffer / count
/home/xudc/straxen/straxen/plugins/records/records.py:470: RuntimeWarning: invalid value encountered in divide
  res["baseline_rms_mean"][:] = (baseline_rms_buffer / count)[:]
/home/xudc/strax/strax/processing/general.py:396: UserWarning: endtime of things is not sorted! touching_windows will return the indices of the first and last things which are touching the container.
  warnings.warn(
Multiple targets detected! This is only suitable for mass producing dataypes since only ['veto_regions'] will be subscribed in the mailbox system!
/home/xudc/straxen/straxen/plugins/records/records.py:467: RuntimeWarning: invalid value encountered in divide
  means = baseline_buffer / count
/home/xudc/straxen/straxen/plugins/records/records.py:470: RuntimeWarning: invalid value encountered in divide
  res["baseline_rms_mean"][:] = (baseline_rms_buffer / count)[:]
Exception in thread read_0:PulseProcessing_divide_outputs_mailbox:
Exception in thread discard_records:
Traceback (most recent call last):
Target Mailbox (veto_regions) killed, exception <class 'strax.mailbox.MailboxKilled'>, message (<class 'strax.mailbox.MailBoxAlreadyClosed'>, MailBoxAlreadyClosed("Can't send to closed pulse_counts_mailbox"), <traceback object at 0x7f9941ccd740>)
Exception in thread divide_outputs:veto_regions:
Traceback (most recent call last):
  File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 980, in _bootstrap_inner
  File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 980, in _bootstrap_inner
Traceback (most recent call last):
  File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 980, in _bootstrap_inner
    self.run()
    self.run()
  File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 917, in run
  File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 917, in run
    self.run()
    self._target(*self._args, **self._kwargs)
  File "/home/xudc/strax/strax/processors/threaded_mailbox.py", line 220, in discarder
  File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "/home/xudc/strax/strax/mailbox.py", line 519, in divide_outputs
    for _ in source:
  File "/home/xudc/strax/strax/mailbox.py", line 389, in _read
    source.throw(e)
  File "/home/xudc/strax/strax/mailbox.py", line 441, in _read
    self._target(*self._args, **self._kwargs)
    raise MailboxKilled(self.killed_because)
  File "/home/xudc/strax/strax/mailbox.py", line 293, in _send_from
strax.mailbox.MailboxKilled: (<class 'strax.mailbox.MailBoxAlreadyClosed'>, MailBoxAlreadyClosed("Can't send to closed pulse_counts_mailbox"), <traceback object at 0x7f9941ccd740>)
    self.kill_from_exception(e)
  File "/home/xudc/strax/strax/mailbox.py", line 203, in kill_from_exception
    self.close()
  File "/home/xudc/strax/strax/mailbox.py", line 357, in close
    raise e
  File "/home/xudc/strax/strax/mailbox.py", line 438, in _read
    self.send(StopIteration)
  File "/home/xudc/strax/strax/mailbox.py", line 310, in send
    yield res
  File "/home/xudc/strax/strax/mailbox.py", line 516, in divide_outputs
    raise MailboxKilled(self.killed_because)
strax.mailbox.MailboxKilled: (<class 'strax.mailbox.MailBoxAlreadyClosed'>, MailBoxAlreadyClosed("Can't send to closed pulse_counts_mailbox"), <traceback object at 0x7f9941ccd800>)
    mailboxes[d].send(x)
  File "/home/xudc/strax/strax/mailbox.py", line 307, in send
    raise MailBoxAlreadyClosed(f"Can't send to closed {self.name}")
strax.mailbox.MailBoxAlreadyClosed: Can't send to closed pulse_counts_mailbox
You specified _auto_append_rucio_local=True and you are not on dali compute nodes, so we will add the following rucio local path: /project/lgrandi/rucio/
convert_channel::	changed channel
convert_channel_like::	update area_per_channel
convert_channel_like::	update saturated_channel
Traceback (most recent call last):
  File "/home/xudc/t.py", line 21, in <module>
    st.make(
  File "/home/xudc/strax/strax/context.py", line 1755, in make
    for _ in self.get_iter(
  File "/home/xudc/strax/strax/context.py", line 1646, in get_iter
    generator.throw(e)
  File "/home/xudc/strax/strax/context.py", line 1613, in get_iter
    for n_chunks, result in enumerate(strax.continuity_check(generator), 1):
  File "/home/xudc/strax/strax/chunk.py", line 363, in continuity_check
    for chunk in chunk_iter:
  File "/home/xudc/strax/strax/processors/threaded_mailbox.py", line 304, in iter
    raise exc.with_traceback(traceback)
  File "/home/xudc/strax/strax/mailbox.py", line 519, in divide_outputs
    source.throw(e)
  File "/home/xudc/strax/strax/mailbox.py", line 441, in _read
    self.kill_from_exception(e)
  File "/home/xudc/strax/strax/mailbox.py", line 203, in kill_from_exception
    raise e
  File "/home/xudc/strax/strax/mailbox.py", line 438, in _read
    yield res
  File "/home/xudc/strax/strax/mailbox.py", line 516, in divide_outputs
    mailboxes[d].send(x)
  File "/home/xudc/strax/strax/mailbox.py", line 307, in send
    raise MailBoxAlreadyClosed(f"Can't send to closed {self.name}")
strax.mailbox.MailBoxAlreadyClosed: Can't send to closed pulse_counts_mailbox

If processor = "single_thread", you will see error:

2024-10-13 12:47:32,900 - utilix - DEBUG - Token exists at /home/xudc/.dbtoken
2024-10-13 12:47:32,901 - utilix - DEBUG - Token is valid.
/home/xudc/straxen/straxen/config/preprocessors.py:16: UserWarning: From straxen version 2.1.0 onward, URLConfig parameters will be sorted alphabetically before being passed to the plugins, this will change the lineage hash for non-sorted URLs. To load data processed with non-sorted URLs, you will need to use an older version.
  warnings.warn(
/home/xudc/straxen/straxen/plugins/records/records.py:467: RuntimeWarning: invalid value encountered in divide
  means = baseline_buffer / count
/home/xudc/straxen/straxen/plugins/records/records.py:470: RuntimeWarning: invalid value encountered in divide
  res["baseline_rms_mean"][:] = (baseline_rms_buffer / count)[:]
/home/xudc/strax/strax/processing/general.py:396: UserWarning: endtime of things is not sorted! touching_windows will return the indices of the first and last things which are touching the container.
  warnings.warn(
Multiple targets detected! This is only suitable for mass producing dataypes since only ['pulse_counts'] will be subscribed in the mailbox system!
You specified _auto_append_rucio_local=True and you are not on dali compute nodes, so we will add the following rucio local path: /project/lgrandi/rucio/
convert_channel::	changed channel
convert_channel_like::	update area_per_channel
convert_channel_like::	update saturated_channel
Traceback (most recent call last):
  File "/home/xudc/t.py", line 21, in <module>
    st.make(
  File "/home/xudc/strax/strax/context.py", line 1755, in make
    for _ in self.get_iter(
  File "/home/xudc/strax/strax/context.py", line 1594, in get_iter
    generator = processor(
  File "/home/xudc/strax/strax/processors/single_thread.py", line 38, in __init__
    self.post_office.register_producer(
  File "/home/xudc/strax/strax/processors/post_office.py", line 131, in register_producer
    self.register_producer(iterator, sub_topic)
  File "/home/xudc/strax/strax/processors/post_office.py", line 135, in register_producer
    raise RuntimeError(f"{topic} already has a producer")
RuntimeError: pulse_counts already has a producer

Expected behavior

No error happens and veto_regions is saved.

Screenshots
If applicable, add screenshots to help explain your problem.

Versions

strax dca3545
straxen 9d2a6b6111b0e43051f53d19fd394c6861465fdb

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't workinghelp wantedExtra attention is needed

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions