AllenInstitute / npc_sessions

Tools for accessing and packaging data from behavior and epyhys sessions from the Mindscope Neuropixels team, in the cloud.
1 stars 1 forks source link

Fix fetching unit info for spike interface "analyzer" organization #135

Open bjhardcastle opened 1 day ago

bjhardcastle commented 1 day ago

Organization of data in sorted assets has changed yet again and broken our NWB packaging.

We currently only have one session that needs to be re-sorted for the paper 1 datacube, but we'll likely find others with missing probes after QC review.

The easiest fix would be to use an older version of the pipeline, which isn't something we can customize in the production pipeline, but should be possible by duplicating and editing the nextflow files.

For now, any sorted assets created after September 24th will now fail in npc_ephys, similar to this:

FileNotFoundError                         Traceback (most recent call last)
File /opt/conda/lib/python3.9/site-packages/npc_ephys/units.py:269, in make_units_table_from_spike_interface_ks25(session_or_spikeinterface_data_or_path, device_timing_on_sync, include_waveform_arrays)
    268 try:
--> 269     _ = future.result()
    270 except npc_ephys.spikeinterface.ProbeNotFoundError:

File /opt/conda/lib/python3.9/concurrent/futures/_base.py:439, in Future.result(self, timeout)
    438 elif self._state == FINISHED:
--> 439     return self.__get_result()
    441 self._condition.wait(timeout)

File /opt/conda/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    390 try:
--> 391     raise self._exception
    392 finally:
    393     # Break a reference cycle with the exception in self._exception

File /opt/conda/lib/python3.9/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:

File /opt/conda/lib/python3.9/site-packages/npc_ephys/units.py:163, in _device_helper(device_timing_on_sync, spike_interface_data, include_waveform_arrays)
    160 electrode_group_name = npc_session.ProbeRecord(
    161     device_timing_on_sync.device.name
    162 ).name
--> 163 spike_interface_data.electrode_locations_xy(electrode_group_name)
    165 df_device_metrics = spike_interface_data.quality_metrics_df(
    166     electrode_group_name
    167 ).merge(
   (...)
    170     right_index=True,
    171 )

File /opt/conda/lib/python3.9/site-packages/npc_ephys/spikeinterface.py:480, in SpikeInterfaceKS25Data.electrode_locations_xy(self, probe)
    477 @functools.cache
    478 def electrode_locations_xy(self, probe: str) -> npt.NDArray[np.floating]:
    479     return np.array(
--> 480         self.sorting_json(probe)["annotations"]["__sorting_info__"]["recording"][
    481             "properties"
    482         ]["location"]
    483     )

File /opt/conda/lib/python3.9/site-packages/npc_ephys/spikeinterface.py:441, in SpikeInterfaceKS25Data.sorting_json(self, probe)
    438 @functools.cache
    439 def sorting_json(self, probe: str) -> dict:
    440     return self.read_json(
--> 441         self.get_correct_path(self.postprocessed(probe), "sorting.json")
    442     )

File /opt/conda/lib/python3.9/site-packages/npc_ephys/spikeinterface.py:153, in SpikeInterfaceKS25Data.get_correct_path(*path_components)
    152     raise ValueError("Must provide at least one path component")
--> 153 path = npc_io.from_pathlike("/".join(str(path) for path in path_components))
    154 if not path.exists():

File /opt/conda/lib/python3.9/site-packages/npc_io/file_io.py:67, in from_pathlike(pathlike, **fsspec_storage_options)
     66 if result is None:
---> 67     raise FileNotFoundError(
     68         f"In attempting to handle a path containing '#', we couldn't find {path}"
     69     )
     70 new = result

FileNotFoundError: In attempting to handle a path containing '#', we couldn't find s3://codeocean-s3datasetsbucket-1u41qdg42ur9/b8e34ab4-10da-4c06-97d8-3b9d26820d0f/postprocessed/experiment1_Record Node 103#Neuropix-PXI-100.ProbeC-AP_recording1.zarr/sorting.json

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
Cell In[8], line 1
----> 1 s.units

File /opt/conda/lib/python3.9/site-packages/npc_io/cached_property.py:79, in cached_property.__get__(self, instance, owner)
     73 ## these lines are removed from the original cached_property ------
     74 # with self.lock:
     75 #     # check if another thread filled cache while we awaited lock
     76 #     val = cache.get(self.attrname, _NOT_FOUND)
     77 # -----------------------------------------------------------------
     78 if val is _NOT_FOUND:
---> 79     val = self.func(instance)
     80     try:
     81         cache[self.attrname] = val

File /opt/conda/lib/python3.9/site-packages/npc_sessions/sessions.py:1528, in DynamicRoutingSession.units(self)
   1520     raise AttributeError(f"{self.id} is not an ephys session")
   1521 units = pynwb.misc.Units(
   1522     name="units",
   1523     description="spike-sorted units from Kilosort 2.5",
   (...)
   1526     electrode_table=self.electrodes,
   1527 )
-> 1528 for column in self._units.columns:
   1529     if column in (
   1530         "spike_times",
   1531         "waveform_mean",
   1532         "waveform_sd",
   1533         "electrode_group",
   1534     ):
   1535         continue

File /opt/conda/lib/python3.9/site-packages/npc_io/cached_property.py:79, in cached_property.__get__(self, instance, owner)
     73 ## these lines are removed from the original cached_property ------
     74 # with self.lock:
     75 #     # check if another thread filled cache while we awaited lock
     76 #     val = cache.get(self.attrname, _NOT_FOUND)
     77 # -----------------------------------------------------------------
     78 if val is _NOT_FOUND:
---> 79     val = self.func(instance)
     80     try:
     81         cache[self.attrname] = val

File /opt/conda/lib/python3.9/site-packages/npc_sessions/sessions.py:1465, in DynamicRoutingSession._units(self)
   1462 if not self.is_sorted:
   1463     raise AttributeError(f"{self.id} hasn't been spike-sorted")
   1464 units = npc_ephys.add_global_unit_ids(
-> 1465     units=npc_ephys.make_units_table_from_spike_interface_ks25(
   1466         self.sorted_data,
   1467         self.ephys_timing_data,
   1468         include_waveform_arrays=self.is_waveforms,
   1469     ),
   1470     session=self.id,
   1471 )
   1472 # remove units from probes that weren't inserted
   1473 units = units[units["electrode_group_name"].isin(self.probes_inserted)]

File /opt/conda/lib/python3.9/site-packages/npc_ephys/units.py:280, in make_units_table_from_spike_interface_ks25(session_or_spikeinterface_data_or_path, device_timing_on_sync, include_waveform_arrays)
    278         except Exception as e:
    279             logger.error(f"{session}{' ' if session else ''}{device}")
--> 280             raise RuntimeError(
    281                 f"Error fetching units for {session} - see original exception above/below"
    282             ) from e
    284 return pd.concat(
    285     device_to_future[device].result() for device in sorted(device_to_future.keys())
    286 )

RuntimeError: Error fetching units for 644864_2023-02-02 - see original exception above/below
bjhardcastle commented 1 day ago

The smarter thing in the long run is to stop trying to interpret the output from spikeinterface in the sorted data asset.

To do that we'd need to implement timestamp adjustment prior to sorting. which can already be done with np_codoecean. To make everything consistent, we'd also need to update existing raw data assets, then re-run sorting.

A good time to do this would be when (if?) switching to ks4.

pros:

cons:

bjhardcastle commented 1 day ago

@arjunsridhar12345 @alejoe91 In the past we've had occasional duplication of probes (e.g. probe A recorded on both Record Node 101 and Record Node 102) - when we looked at the sorting pipeline's NWB file in the past, the units from both copies were pooled together, because they shared the same device attributes. Did that ever get fixed?

alejoe91 commented 1 day ago

Was it intended to record the same probe on multiple Record Nodes?

I think I could add a safeguard mechanism in case this happens. If we detect that two device names are the same, then we can prepend the record node, so that the group name will become something: RecordNode101#ProbeA

What do yout hink?

bjhardcastle commented 16 hours ago

Was it intended to record the same probe on multiple Record Nodes?

No, it only happens if we overlook something in the Open Ephys configuration.

If we detect that two device names are the same, then we can prepend the record node

I don't love the idea of introducing different naming conventions right at the end of the pipeline.. we'll inevitably look up ProbeA in the units table, find nothing, and infer that it wasn't sorted.

Ideally, IMO, there would be a check during dispatch or pre-processing to see if the same device was recorded on multiple record nodes with the same configuration, then only sort one of the copies. We have this as a filtering step before upload now, so it shouldn't happen again for our sessions - but we do have some existing sessions on S3 with this problem, and it can happen fairly easily to others.

alejoe91 commented 15 hours ago

That's a good idea. I'll update the job dispatch capsule to skip such duplicate cases.