AxFoundation / strax

Stream analysis for xenon TPCs
BSD 3-Clause "New" or "Revised" License

Processor is confused #903

Open dachengx opened 1 week ago

dachengx commented 1 week ago

Describe the bug

The processors get confused when some of the requested data_types are already stored but still have to be processed again.

To Reproduce

The following code reproduces the problem:

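import shutil
import straxen
from straxen.test_utils import nt_test_run_id

# Pick the processor to reproduce either failure mode:
# "threaded_mailbox" or "single_thread"
processor = "threaded_mailbox"

# Start from empty local test storage
shutil.rmtree("./strax_test_data", ignore_errors=True)
st = straxen.test_utils.nt_test_context()
st.set_context_config({
    "allow_multiprocess": True,
    "timeout": 120,
})

# First pass: make only peaklets
st.make(
    nt_test_run_id,
    "peaklets",
    processor=processor,
)
# Second pass: request peaklets again, together with pulse_counts and veto_regions
st.make(
    nt_test_run_id,
    ("peaklets", "pulse_counts", "veto_regions"),
    processor=processor,
    allow_multiple=True,
)

assert st.is_stored(nt_test_run_id, "veto_regions")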

Both ThreadedMailboxProcessor and SingleThreadProcessor get confused, each failing in a different way, so there are two problems.

With processor = "threaded_mailbox", you will see this error:

2024-10-13 12:34:07,179 - utilix - DEBUG - Token exists at /home/xudc/.dbtoken
2024-10-13 12:34:07,180 - utilix - DEBUG - Token is valid.
/home/xudc/straxen/straxen/config/preprocessors.py:16: UserWarning: From straxen version 2.1.0 onward, URLConfig parameters will be sorted alphabetically before being passed to the plugins, this will change the lineage hash for non-sorted URLs. To load data processed with non-sorted URLs, you will need to use an older version.
  warnings.warn(
/home/xudc/straxen/straxen/plugins/records/records.py:467: RuntimeWarning: invalid value encountered in divide
  means = baseline_buffer / count
/home/xudc/straxen/straxen/plugins/records/records.py:470: RuntimeWarning: invalid value encountered in divide
  res["baseline_rms_mean"][:] = (baseline_rms_buffer / count)[:]
/home/xudc/strax/strax/processing/general.py:396: UserWarning: endtime of things is not sorted! touching_windows will return the indices of the first and last things which are touching the container.
  warnings.warn(
Multiple targets detected! This is only suitable for mass producing dataypes since only ['veto_regions'] will be subscribed in the mailbox system!
/home/xudc/straxen/straxen/plugins/records/records.py:467: RuntimeWarning: invalid value encountered in divide
  means = baseline_buffer / count
/home/xudc/straxen/straxen/plugins/records/records.py:470: RuntimeWarning: invalid value encountered in divide
  res["baseline_rms_mean"][:] = (baseline_rms_buffer / count)[:]
Exception in thread read_0:PulseProcessing_divide_outputs_mailbox:
Exception in thread discard_records:
Traceback (most recent call last):
Target Mailbox (veto_regions) killed, exception <class 'strax.mailbox.MailboxKilled'>, message (<class 'strax.mailbox.MailBoxAlreadyClosed'>, MailBoxAlreadyClosed("Can't send to closed pulse_counts_mailbox"), <traceback object at 0x7f9941ccd740>)
Exception in thread divide_outputs:veto_regions:
Traceback (most recent call last):
  File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 980, in _bootstrap_inner
  File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 980, in _bootstrap_inner
Traceback (most recent call last):
  File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 980, in _bootstrap_inner
    self.run()
    self.run()
  File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 917, in run
  File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 917, in run
    self.run()
    self._target(*self._args, **self._kwargs)
  File "/home/xudc/strax/strax/processors/threaded_mailbox.py", line 220, in discarder
  File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "/home/xudc/strax/strax/mailbox.py", line 519, in divide_outputs
    for _ in source:
  File "/home/xudc/strax/strax/mailbox.py", line 389, in _read
    source.throw(e)
  File "/home/xudc/strax/strax/mailbox.py", line 441, in _read
    self._target(*self._args, **self._kwargs)
    raise MailboxKilled(self.killed_because)
  File "/home/xudc/strax/strax/mailbox.py", line 293, in _send_from
strax.mailbox.MailboxKilled: (<class 'strax.mailbox.MailBoxAlreadyClosed'>, MailBoxAlreadyClosed("Can't send to closed pulse_counts_mailbox"), <traceback object at 0x7f9941ccd740>)
    self.kill_from_exception(e)
  File "/home/xudc/strax/strax/mailbox.py", line 203, in kill_from_exception
    self.close()
  File "/home/xudc/strax/strax/mailbox.py", line 357, in close
    raise e
  File "/home/xudc/strax/strax/mailbox.py", line 438, in _read
    self.send(StopIteration)
  File "/home/xudc/strax/strax/mailbox.py", line 310, in send
    yield res
  File "/home/xudc/strax/strax/mailbox.py", line 516, in divide_outputs
    raise MailboxKilled(self.killed_because)
strax.mailbox.MailboxKilled: (<class 'strax.mailbox.MailBoxAlreadyClosed'>, MailBoxAlreadyClosed("Can't send to closed pulse_counts_mailbox"), <traceback object at 0x7f9941ccd800>)
    mailboxes[d].send(x)
  File "/home/xudc/strax/strax/mailbox.py", line 307, in send
    raise MailBoxAlreadyClosed(f"Can't send to closed {self.name}")
strax.mailbox.MailBoxAlreadyClosed: Can't send to closed pulse_counts_mailbox
You specified _auto_append_rucio_local=True and you are not on dali compute nodes, so we will add the following rucio local path: /project/lgrandi/rucio/
convert_channel::   changed channel
convert_channel_like::  update area_per_channel
convert_channel_like::  update saturated_channel
Traceback (most recent call last):
  File "/home/xudc/t.py", line 21, in <module>
    st.make(
  File "/home/xudc/strax/strax/context.py", line 1755, in make
    for _ in self.get_iter(
  File "/home/xudc/strax/strax/context.py", line 1646, in get_iter
    generator.throw(e)
  File "/home/xudc/strax/strax/context.py", line 1613, in get_iter
    for n_chunks, result in enumerate(strax.continuity_check(generator), 1):
  File "/home/xudc/strax/strax/chunk.py", line 363, in continuity_check
    for chunk in chunk_iter:
  File "/home/xudc/strax/strax/processors/threaded_mailbox.py", line 304, in iter
    raise exc.with_traceback(traceback)
  File "/home/xudc/strax/strax/mailbox.py", line 519, in divide_outputs
    source.throw(e)
  File "/home/xudc/strax/strax/mailbox.py", line 441, in _read
    self.kill_from_exception(e)
  File "/home/xudc/strax/strax/mailbox.py", line 203, in kill_from_exception
    raise e
  File "/home/xudc/strax/strax/mailbox.py", line 438, in _read
    yield res
  File "/home/xudc/strax/strax/mailbox.py", line 516, in divide_outputs
    mailboxes[d].send(x)
  File "/home/xudc/strax/strax/mailbox.py", line 307, in send
    raise MailBoxAlreadyClosed(f"Can't send to closed {self.name}")
strax.mailbox.MailBoxAlreadyClosed: Can't send to closed pulse_counts_mailbox

With processor = "single_thread", you will see this error:

2024-10-13 12:47:32,900 - utilix - DEBUG - Token exists at /home/xudc/.dbtoken
2024-10-13 12:47:32,901 - utilix - DEBUG - Token is valid.
/home/xudc/straxen/straxen/config/preprocessors.py:16: UserWarning: From straxen version 2.1.0 onward, URLConfig parameters will be sorted alphabetically before being passed to the plugins, this will change the lineage hash for non-sorted URLs. To load data processed with non-sorted URLs, you will need to use an older version.
  warnings.warn(
/home/xudc/straxen/straxen/plugins/records/records.py:467: RuntimeWarning: invalid value encountered in divide
  means = baseline_buffer / count
/home/xudc/straxen/straxen/plugins/records/records.py:470: RuntimeWarning: invalid value encountered in divide
  res["baseline_rms_mean"][:] = (baseline_rms_buffer / count)[:]
/home/xudc/strax/strax/processing/general.py:396: UserWarning: endtime of things is not sorted! touching_windows will return the indices of the first and last things which are touching the container.
  warnings.warn(
Multiple targets detected! This is only suitable for mass producing dataypes since only ['pulse_counts'] will be subscribed in the mailbox system!
You specified _auto_append_rucio_local=True and you are not on dali compute nodes, so we will add the following rucio local path: /project/lgrandi/rucio/
convert_channel::   changed channel
convert_channel_like::  update area_per_channel
convert_channel_like::  update saturated_channel
Traceback (most recent call last):
  File "/home/xudc/t.py", line 21, in <module>
    st.make(
  File "/home/xudc/strax/strax/context.py", line 1755, in make
    for _ in self.get_iter(
  File "/home/xudc/strax/strax/context.py", line 1594, in get_iter
    generator = processor(
  File "/home/xudc/strax/strax/processors/single_thread.py", line 38, in __init__
    self.post_office.register_producer(
  File "/home/xudc/strax/strax/processors/post_office.py", line 131, in register_producer
    self.register_producer(iterator, sub_topic)
  File "/home/xudc/strax/strax/processors/post_office.py", line 135, in register_producer
    raise RuntimeError(f"{topic} already has a producer")
RuntimeError: pulse_counts already has a producer

Expected behavior

No error is raised and veto_regions is stored.


Versions

strax: dca35457bea07b278e331b07fefb98cea842e60a
straxen: 9d2a6b6111b0e43051f53d19fd394c6861465fdb

dachengx commented 1 week ago

https://github.com/AxFoundation/strax/pull/901, up to commit 560cf9bee2ac1030ff5463b23e746768ed1585fe, tries to fix the error with "single_thread". It does fix that error, but after that commit another error occurs randomly (not on every run):

2024-10-13 13:00:05,088 - utilix - DEBUG - Token exists at /home/xudc/.dbtoken
2024-10-13 13:00:05,088 - utilix - DEBUG - Token is valid.
/home/xudc/straxen/straxen/config/preprocessors.py:16: UserWarning: From straxen version 2.1.0 onward, URLConfig parameters will be sorted alphabetically before being passed to the plugins, this will change the lineage hash for non-sorted URLs. To load data processed with non-sorted URLs, you will need to use an older version.
  warnings.warn(
/home/xudc/straxen/straxen/plugins/records/records.py:467: RuntimeWarning: invalid value encountered in divide
  means = baseline_buffer / count
/home/xudc/straxen/straxen/plugins/records/records.py:470: RuntimeWarning: invalid value encountered in divide
  res["baseline_rms_mean"][:] = (baseline_rms_buffer / count)[:]
/home/xudc/strax/strax/processing/general.py:396: UserWarning: endtime of things is not sorted! touching_windows will return the indices of the first and last things which are touching the container.
  warnings.warn(
Multiple targets detected! This is only suitable for mass producing dataypes since only ['pulse_counts'] will be subscribed in the mailbox system!
You specified _auto_append_rucio_local=True and you are not on dali compute nodes, so we will add the following rucio local path: /project/lgrandi/rucio/
convert_channel::   changed channel
convert_channel_like::  update area_per_channel
convert_channel_like::  update saturated_channel
convert_channel_like::  update pulse_count
convert_channel_like::  update lone_pulse_count
convert_channel_like::  update pulse_area
convert_channel_like::  update lone_pulse_area
convert_channel_like::  update baseline_mean
convert_channel_like::  update baseline_rms_mean
Traceback (most recent call last):
  File "/home/xudc/t.py", line 28, in <module>
    assert st.is_stored(nt_test_run_id, "veto_regions")
AssertionError

This happens because the order of targets is not deterministic after https://github.com/AxFoundation/strax/blob/dca35457bea07b278e331b07fefb98cea842e60a/strax/context.py#L1552: targets goes through a set, and a set has no guaranteed order (for strings it can even change between runs, because string hashing is randomized per interpreter process). Sometimes targets will be ('veto_regions', 'pulse_counts', 'peaklets'), sometimes ('peaklets', 'pulse_counts', 'veto_regions'), and so on.
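A minimal, strax-independent sketch of the ordering problem: in CPython the hash of a str is salted per interpreter process (controlled by the PYTHONHASHSEED environment variable), so iterating over a set of data_type names can give a different order in each new process. The helper names below (`set_order_under_seed`, `deterministic_targets`) are hypothetical; they only illustrate two order-stable alternatives to a plain set: de-duplicating with `dict.fromkeys` (keeps the order the user passed in) or sorting. This is not necessarily how #901 resolves it.

```python
import os
import subprocess
import sys

TARGETS = ("peaklets", "pulse_counts", "veto_regions")


def set_order_under_seed(seed: int) -> tuple:
    """Return the iteration order of set(TARGETS) in a fresh interpreter
    started with the given PYTHONHASHSEED."""
    code = f"print(','.join(set({TARGETS!r})))"
    env = dict(os.environ, PYTHONHASHSEED=str(seed))
    out = subprocess.run(
        [sys.executable, "-c", code],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return tuple(out.stdout.strip().split(","))


def deterministic_targets(targets, keep_user_order=True) -> tuple:
    """Hypothetical helper: de-duplicate targets without relying on hash order."""
    if keep_user_order:
        # dicts preserve insertion order (Python 3.7+), so the user's order survives
        return tuple(dict.fromkeys(targets))
    # Otherwise use a canonical, sorted order
    return tuple(sorted(set(targets)))


if __name__ == "__main__":
    # The set order may differ from one seed (i.e. one process) to the next...
    for seed in range(5):
        print(seed, set_order_under_seed(seed))
    # ...while both alternatives return the same tuple on every run
    print(deterministic_targets(TARGETS + ("peaklets",)))
    print(deterministic_targets(TARGETS, keep_user_order=False))
```

Removing the randomness does not by itself fix the bug: as the next paragraphs note, some orderings still fail.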

When targets is ('peaklets', 'pulse_counts', 'veto_regions'), no error occurs, because in this case veto_regions becomes the final_plugin in https://github.com/AxFoundation/strax/blob/dca35457bea07b278e331b07fefb98cea842e60a/strax/context.py#L1313. Other orderings may also happen to work.

But ('pulse_counts', 'veto_regions', 'peaklets') always produces the error.

This is addressed in the second part of #901, after 560cf9bee2ac1030ff5463b23e746768ed1585fe.