jkbhagatio opened 10 months ago
Additional info from #327
Contains duplicate keys, e.g. see `Z:\aeon\data\raw\AEON4\social0.2\2024-01-31T10-14-14\Environment\Environment_SubjectVisits_2024-01-31T10-00-00.csv`, and `load` throws the following error.
Interestingly, the error only occurs on data saved in AEON4, not in AEON3.
Also interestingly, this works:
```python
aeon.load(
    "/ceph/aeon/aeon/data/raw/AEON4/social0.2",
    social02.Environment.SubjectVisits,
    pd.Timestamp("2024-01-31 10:00:00"),
    pd.Timestamp("2024-02-05 13:00:00"),
)
```
but moving the end timestamp up by 1 hour:
```python
aeon.load(
    "/ceph/aeon/aeon/data/raw/AEON4/social0.2",
    social02.Environment.SubjectVisits,
    pd.Timestamp("2024-01-31 10:00:00"),
    pd.Timestamp("2024-02-05 14:00:00"),
)
```
throws this error:
```
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[14], line 3
      1 """Environment info."""
----> 3 aeon.load(block.root, social02.Environment.SubjectVisits, pd.Timestamp("2024-01-31 10:00:00"), exp_end)

File ~/ProjectAeon/aeon_mecha/aeon/io/api.py:151, in load(root, reader, start, end, time, tolerance, epoch)
    149     warnings.warn(f"data index for {reader.pattern} contains duplicate keys!")
    150     data = data[~data.index.duplicated(keep="first")]
--> 151     return data.loc[start:end]
    152 return data

File ~/mambaforge/envs/aeon/lib/python3.11/site-packages/pandas/core/indexing.py:1103, in _LocationIndexer.__getitem__(self, key)
   1100 axis = self.axis or 0
   1102 maybe_callable = com.apply_if_callable(key, self.obj)
-> 1103 return self._getitem_axis(maybe_callable, axis=axis)

File ~/mambaforge/envs/aeon/lib/python3.11/site-packages/pandas/core/indexing.py:1323, in _LocIndexer._getitem_axis(self, key, axis)
   1321 if isinstance(key, slice):
   1322     self._validate_key(key, axis)
-> 1323     return self._get_slice_axis(key, axis=axis)
   1324 elif com.is_bool_indexer(key):
   1325     return self._getbool_axis(key, axis=axis)

File ~/mambaforge/envs/aeon/lib/python3.11/site-packages/pandas/core/indexing.py:1355, in _LocIndexer._get_slice_axis(self, slice_obj, axis)
   1352     return obj.copy(deep=False)
   1354 labels = obj._get_axis(axis)
-> 1355 indexer = labels.slice_indexer(slice_obj.start, slice_obj.stop, slice_obj.step)
   1357 if isinstance(indexer, slice):
   1358     return self.obj._slice(indexer, axis=axis)

File ~/mambaforge/envs/aeon/lib/python3.11/site-packages/pandas/core/indexes/datetimes.py:636, in DatetimeIndex.slice_indexer(self, start, end, step)
    628 # GH#33146 if start and end are combinations of str and None and Index is not
    629 # monotonic, we can not use Index.slice_indexer because it does not honor the
    630 # actual elements, is only searching for start and end
    631 if (
    632     check_str_or_none(start)
    633     or check_str_or_none(end)
    634     or self.is_monotonic_increasing
    635 ):
--> 636     return Index.slice_indexer(self, start, end, step)
    638 mask = np.array(True)
    639 raise_mask = np.array(True)

File ~/mambaforge/envs/aeon/lib/python3.11/site-packages/pandas/core/indexes/base.py:6344, in Index.slice_indexer(self, start, end, step)
   6300 def slice_indexer(
   6301     self,
   6302     start: Hashable | None = None,
   6303     end: Hashable | None = None,
   6304     step: int | None = None,
   6305 ) -> slice:
   6306     """
   6307     Compute the slice indexer for input labels and step.
   (...)
   6342     slice(1, 3, None)
   6343     """
-> 6344     start_slice, end_slice = self.slice_locs(start, end, step=step)
   6346     # return a slice
   6347     if not is_scalar(start_slice):

File ~/mambaforge/envs/aeon/lib/python3.11/site-packages/pandas/core/indexes/base.py:6537, in Index.slice_locs(self, start, end, step)
   6535 start_slice = None
   6536 if start is not None:
-> 6537     start_slice = self.get_slice_bound(start, "left")
   6538 if start_slice is None:
   6539     start_slice = 0

File ~/mambaforge/envs/aeon/lib/python3.11/site-packages/pandas/core/indexes/base.py:6462, in Index.get_slice_bound(self, label, side)
   6459     return self._searchsorted_monotonic(label, side)
   6460 except ValueError:
   6461     # raise the original KeyError
-> 6462     raise err
   6464 if isinstance(slc, np.ndarray):
   6465     # get_loc may return a boolean array, which
   6466     # is OK as long as they are representable by a slice.
   6467     assert is_bool_dtype(slc.dtype)

File ~/mambaforge/envs/aeon/lib/python3.11/site-packages/pandas/core/indexes/base.py:6456, in Index.get_slice_bound(self, label, side)
   6454 # we need to look up the label
   6455 try:
-> 6456     slc = self.get_loc(label)
   6457 except KeyError as err:
   6458     try:

File ~/mambaforge/envs/aeon/lib/python3.11/site-packages/pandas/core/indexes/datetimes.py:586, in DatetimeIndex.get_loc(self, key)
    584     return Index.get_loc(self, key)
    585 except KeyError as err:
--> 586     raise KeyError(orig_key) from err

KeyError: Timestamp('2024-01-31 10:00:00')
```
Additional info noticed by @ttngu207: "We've also encountered this type of KeyError with other readers throughout the different parts of ingestion. Probably the same root cause", e.g. with RFID events.
Given the two calls above, this means the non-monotonic indices are between 2024-02-05 13:00 and 14:00. I think all of these issues stem from non-monotonicity and could perhaps be solved with:
```python
if start is not None or end is not None:
    try:
        return data.loc[start:end]
    except KeyError:
        if not data.index.is_monotonic_increasing:
            warnings.warn(f"data index for {reader.pattern} contains out-of-order timestamps!")
            data = data.sort_index()
        return data.loc[start:end]
```
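To make the failure mode concrete, here is a minimal sketch (mine, not from the thread) of the pandas behaviour the fix guards against: slicing a non-monotonic `DatetimeIndex` with a bound that is not an exact label raises `KeyError`, and sorting the index first avoids it:

```python
import pandas as pd

# Toy series with one out-of-order timestamp, mimicking the AEON4 data
idx = pd.to_datetime(["2024-02-05 13:10", "2024-02-05 13:05", "2024-02-05 13:20"])
s = pd.Series([1, 2, 3], index=idx)

# s.loc["2024-02-05 13:00":]  # KeyError: bound is absent and index is non-monotonic
print(s.sort_index().loc["2024-02-05 13:00":])  # works once the index is sorted
```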
@lochhh and I have noticed that the current fix (commit 8071a40) can cause `api.load` to drop the final row. For example, the table retrieved by `aeon_api.load(root, social02.Environment.EnvironmentState, pd.Timestamp("2024-01-31 11:28:45.543519974"), pd.Timestamp("2024-02-03 16:28:29.139999866"))`
should be this: *(expected table, shown as a screenshot in the original comment)*

But this is what is returned instead: *(screenshot of the returned table, with the final row dropped)*
A few more instances of this error:

```python
import datetime
import pathlib

# Path object so that .as_posix() in the load call below works
raw_data_dir = pathlib.Path("/ceph/aeon/aeon/data/raw/AEON3/social0.1")
chunk_start = datetime.datetime(2023, 12, 1, 14, 0)
chunk_end = datetime.datetime(2023, 12, 1, 15, 0)
stream_reader = aeon_schemas.social01.RfidNest2.RfidEvents
```
```python
raw_data_dir = pathlib.Path("/ceph/aeon/aeon/data/raw/AEON3/social0.1")
chunk_start = datetime.datetime(2023, 12, 2, 14, 0)
chunk_end = datetime.datetime(2023, 12, 2, 15, 0)
stream_reader = aeon_schemas.social01.RfidPatch2.RfidEvents
```
You can load with:

```python
stream_data = io_api.load(
    root=raw_data_dir.as_posix(),
    reader=stream_reader,
    start=pd.Timestamp(chunk_start),
    end=pd.Timestamp(chunk_end),
)
```
And you should see the `KeyError`.
As an update to this, @JaerongA has provided a CSV of chunks where this occurs on AEON3 (additional cases have occurred on AEON4). Unfortunately, this is not always limited to the first chunk in an epoch, though that is where the error most often occurs.
An update on this issue:
The issues in the SubjectVisits streams seem to be due to a logic bug in the Bonsai workflow: "basically it looks like somehow when the new subject entered, the exit arena state got triggered using the exact same timestamp as the previous test animal's exit... this also explains why it's mostly happening around the start of epochs, since in this case it seems to be connected with the entering and exiting of test animals".

The issues in the WheelEncoder streams seem to occur on the heartbeat, with the first timestamp after the heartbeat occasionally going back in time. Ideally this should be fixed on the Harp side, but for now we can sort the indices for this stream.

I haven't been able to reproduce the issues reported for the RFID and PatchState streams, and am still looking into them.
I've asked DataJoint to share logs of all instances where this `KeyError` in `load` occurs in both arenas for social0.2.
A fundamental issue here seems to be that we often have what is really a multi-index data frame: all rows with duplicate timestamps actually have a secondary (or tertiary) key which discriminates the rows, e.g. animal ID or body part ID.

A possible solution might be to make this explicit by returning a MultiIndex dataframe, and to determine how to properly index it, e.g. see https://stackoverflow.com/questions/66588217/how-to-slice-into-a-multiindex-pandas-dataframe
Related to #294
Example of how to create and manipulate a multi-index frame. A simple example data-frame with duplicate "timestamps":

```python
df = pd.DataFrame(
    [[0, 32, 24], [1, 33, 45], [0, 32, 25], [1, 42, 60]],
    index=[23, 23, 24, 24],
    columns=['id', 'x', 'y'],
)
```
This will return the following dataframe:

```
    id   x   y
23   0  32  24
23   1  33  45
24   0  32  25
24   1  42  60
```
The idea here is that the key is some timestamp in seconds, and all duplicate timestamps include a column acting as a secondary key, in this case `id`. Assuming no duplicate pairs, we can then assemble the multi-index easily like so:
```python
mi = pd.MultiIndex.from_tuples(zip(df.index, df.id))
```
Assigning this multi-index to the dataframe (and dropping the now-redundant `id` column) results in the dataframe below.
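A minimal sketch of that step, assuming the `df` and `mi` defined above:

```python
df.index = mi               # assign the (timestamp, id) multi-index
df = df.drop(columns='id')  # the id now lives in the index
```

which gives: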
```
       x   y
23 0  32  24
   1  33  45
24 0  32  25
   1  42  60
```
Given the above dataframe, the below should all be valid queries over the multi-index frame:
```python
df.loc[23]
df.loc[23:24]
df.reindex([(23, 0), (23, 1)], method='pad')
```
In this case we need to be explicit and, for each timestamp, create a tuple that reindexes that time for all secondary keys of the multi-index. This could potentially be automated with a similar `zip` strategy to the above (see the sketch below).
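For instance, a sketch of such automation (`query_times` is a hypothetical list of timestamps to look up):

```python
# Build one (time, id) tuple per secondary key for each queried timestamp
query_times = [23.1, 24.0]
ids = df.index.get_level_values(1).unique()
df.reindex([(t, i) for t in query_times for i in ids], method='pad')
```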
This is unfortunately where vanilla pandas first falls short:
```python
df.reindex([(23, 0), (23, 1)], method='pad', tolerance=1)
```

outputs:

```
NotImplementedError: tolerance not implemented yet for MultiIndex
```
Sadly the latest version of pandas still doesn't support this out of the box, so even though it looks quite doable to export everything to multi-index, it wouldn't solve the ultimate purpose of flexibly extracting data from streams close to events from another stream.

The tolerance limit is important so we don't pick up random far-away events simply because there is no data. As a glimmer of hope, though, the below works, and would probably work for all periodic streams:
```python
df.reindex([(23.1, 0), (23.1, 1)], method='pad')
```
For other streams we would need to be careful and keep in mind this limitation.
The multi-index solution will work provided the IDs are unique per timestamp. If, for instance, SLEAP outputs two instances of animal 0 at time 23, we once again end up with `ValueError: cannot handle a non-unique multi-index!`:
```
       x   y
23 0  32  24
   0  33  45
24 0  32  25
   1  42  60
```
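A minimal repro of that failure (a sketch constructing the frame above directly):

```python
import pandas as pd

dup = pd.DataFrame(
    [[32, 24], [33, 45], [32, 25], [42, 60]],
    index=pd.MultiIndex.from_tuples([(23, 0), (23, 0), (24, 0), (24, 1)]),
    columns=['x', 'y'],
)
dup.reindex([(23.1, 0)], method='pad')  # ValueError: cannot handle a non-unique multi-index!
```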
An example from social0.2 AEON3: *(attachment in the original comment)*
Another possible solution is to "collapse" these duplicate indices into a single row, to get unique indices for reindexing, and then undo the "collapse" action using `explode`:

```python
df.groupby(df.index).agg(list).reindex(rfid.index, method="pad").explode(list(df.columns))
```
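A runnable version of this collapse/explode approach, with a hypothetical `query_index` standing in for the `rfid.index` above (reusing the flat-indexed `df` from the start of the example):

```python
query_index = [23.5, 24.2]                        # hypothetical event timestamps
collapsed = df.groupby(df.index).agg(list)        # one row per unique timestamp
aligned = collapsed.reindex(query_index, method='pad')
result = aligned.explode(list(df.columns))        # undo the collapse
```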
This is a good point. If we are building the multi-index with `zip` anyway, we can also easily add an extra optional "sequence number" level for frames with duplicate entries to make it more efficient.
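A possible sketch of that idea, again assuming the flat-indexed `df` with an `id` column from above:

```python
# Running counter per (timestamp, id) pair disambiguates duplicate rows
seq = df.groupby([df.index, 'id']).cumcount()
mi = pd.MultiIndex.from_tuples(zip(df.index, df.id, seq))
```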
@glopesdev @lochhh do you remember the status of this?
https://github.com/SainsburyWellcomeCentre/aeon_mecha/blob/main/aeon/io/api.py#L140
Here, there may not be data corresponding to the `start` or `end` index, due to these not aligning with a given chunk. E.g. imagine you are calling `load` with `start=14:00:00` and `end=15:00:00` (as in the RFID examples above), but the acquisition epoch started after 14:00:00 and before 15:00:00; in this case there would be no index in the data corresponding to `start`.