Open CBroz1 opened 6 months ago
Related issue: many of our `IntervalList` inserts also use `replace=True`. This makes sense for updating a conceptually linked but non-fk-referenced item, but it relies on each pipeline devising its own naming convention for the interval list. These conventions are obscured by being hidden in `populate` commands that may change over time, and they carry no strong enforcement, placing data at risk of being replaced by an unrelated processing step elsewhere.
Proposal: Centralize naming conventions in `common_interval.py` for easy comparison across pipelines.
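As a sketch of what centralizing could look like (the helper names and separator choice here are hypothetical, not existing Spyglass API), a single pair of functions in `common_interval.py` could build and parse `interval_list_name`s so every pipeline shares one convention:

```python
# Hypothetical sketch of a centralized naming helper for common_interval.py.
# None of these names exist in Spyglass today; this only illustrates the idea.

SEP = "_"  # single, agreed-upon separator across pipelines


def build_interval_name(pipeline: str, source: str, step: str) -> str:
    """Build an interval_list_name from structured parts.

    Rejects parts containing the separator so names stay parseable.
    """
    parts = (pipeline, source, step)
    if any(SEP in p for p in parts):
        raise ValueError(f"Name parts may not contain {SEP!r}: {parts}")
    return SEP.join(parts)


def parse_interval_name(name: str) -> dict:
    """Invert build_interval_name, recovering the structured parts."""
    pipeline, source, step = name.split(SEP)
    return dict(pipeline=pipeline, source=source, step=step)
```

With one shared builder, the pipeline part is always present in the name, so restrictions like `IntervalList & "interval_list_name LIKE 'lfp_%'"` become reliable rather than convention-dependent.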
Considerations:

- The `pipeline` field could mitigate the risk presented above, but it is rarely used and, as a secondary key, unenforced.
- The team discussed a partial solve in which the respective pipelines would check related `interval_list_name`s for overlapping times to reduce redundancy.
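The overlap check discussed above might look like the following minimal, stdlib-only sketch (the function name is hypothetical), treating each interval list as a sequence of `(start, stop)` pairs:

```python
def intervals_overlap(a, b) -> bool:
    """Return True if any interval in list a overlaps any interval in b.

    Assumes each list is a sequence of (start, stop) pairs with
    start < stop, and that intervals *within* each list are disjoint
    (as valid_times should be). Sort-and-sweep: O(n log n) rather than
    comparing all pairs across the two lists.
    """
    events = sorted(list(a) + list(b))
    for (s1, e1), (s2, e2) in zip(events, events[1:]):
        if s2 < e1:  # next interval starts before the previous one ends
            return True
    return False
```

Note that intervals merely touching at a boundary (`stop == start`) are not counted as overlapping.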
To fully address this issue in the distant future, it would be helpful to add an `IntervalListObject` class, separate from the table itself. This class would be responsible for validating an interval list and would permit operations on itself. There is currently some redundancy in the functional approach, with multiple functions handling interval list intersection; a single class could absorb these responsibilities. With such a class, future pipelines could replace foreign key references to `IntervalList` with a secondary attribute that preserves the valid times within the table itself, removing problems related to orphaning (#948).
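A minimal, stdlib-only sketch of such a class (the name `IntervalListObject` comes from the proposal above; the API is hypothetical) that validates on construction and absorbs set operations like intersection:

```python
class IntervalListObject:
    """Hypothetical sketch: a validated interval list with set operations.

    Not Spyglass API. Assumes an interval list is a sorted sequence of
    disjoint (start, stop) pairs with start < stop.
    """

    def __init__(self, times):
        self.times = [tuple(t) for t in times]
        self._validate()

    def _validate(self):
        for start, stop in self.times:
            if not start < stop:
                raise ValueError(f"Empty or inverted interval: {(start, stop)}")
        for (_, stop), (start, _) in zip(self.times, self.times[1:]):
            if start < stop:
                raise ValueError("Intervals must be sorted and disjoint")

    def intersect(self, other: "IntervalListObject") -> "IntervalListObject":
        """One home for the intersection logic now spread across functions."""
        result, i, j = [], 0, 0
        while i < len(self.times) and j < len(other.times):
            start = max(self.times[i][0], other.times[j][0])
            stop = min(self.times[i][1], other.times[j][1])
            if start < stop:
                result.append((start, stop))
            # Advance whichever interval ends first
            if self.times[i][1] < other.times[j][1]:
                i += 1
            else:
                j += 1
        return IntervalListObject(result)
```

Because validation runs in the constructor, every operation returns an object that is already known to be well-formed, which is the enforcement the bare `longblob` column cannot provide.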
Under the status quo, inserts to `IntervalList` use `replace=True`. This assumes the primary key being used is unique to the inserting table. Because naming conventions are decentralized, it is unlikely, but possible, that data integrity would be compromised if one table overwrote a row referenced by another. For this reason, I would like to remove all uses of `replace` in the codebase, limiting it to user-managed tables.
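A toy, dict-based stand-in for the table (no datajoint involved) makes the hazard concrete: two pipelines that happen to compose the same primary key silently clobber each other under replace semantics, while plain insert semantics would surface the collision:

```python
# Toy model of replace=True semantics; a dict stands in for IntervalList.
table = {}


def insert1(key, valid_times, replace=False):
    """Mimic insert semantics: error on duplicates unless replace=True."""
    if key in table and not replace:
        raise ValueError(f"Duplicate entry for {key!r}")
    table[key] = valid_times  # replace=True silently overwrites


# Pipeline A registers its interval list.
insert1("sess1_valid", [(0.0, 10.0)])

# Pipeline B, following its own naming convention, composes the same name
# and overwrites A's row -- no error is raised, and anything downstream of
# A now refers to times A never produced.
insert1("sess1_valid", [(3.0, 4.0)], replace=True)
```

Without `replace=True`, the second insert raises instead of clobbering, which is why limiting `replace` to user-managed tables contains the risk.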
In #943, I introduce the possibility of reusing `IntervalList` entries to reduce redundancy. This could lead to a situation in which ...

- An interval list `A1` is inserted, associated with its entry `A1_Key`
- A later process reuses `A1` rather than generating a new entry as part of `B2_Key`
- `A1_Key` is deleted, leaving the `A1` interval behind

Given this collision, we can either ...

- replace `A1` to reflect its identity as B2 - impractical given the fk refs involved

Hoping to collect input to influence how we move forward with #943. @samuelbray32, @edeno, @khl02007
Description
`IntervalList` is a central table shared across many pipelines. It is sufficiently generic to suit the needs of any pipeline, but I would argue that the lack of pipeline-specific fields has overburdened the name as a place to store data. A lack of uniqueness in the stored numpy array has led to redundant contents and slow fetch times. In our production database, this is the only core schema with a backup >1GB, and it has been increasing in size. This may slow down day-to-day operations, and it makes restoring a backup more cumbersome. Full backups of the database now exceed file size limits for common data transfer methods. Isolating this table as a candidate for data size reduction:
- Qualitatively, most redundancy comes from registering a new interval list for each electrode for each session, when, even after processing steps, `valid_times` is unlikely to change within a probe.
- A name like `SubjectDate_InitialInterval_Electrode_ProcessingStep_valid` would be more searchable as a datajoint `QueryExpression` like `Session * IntervalList & uuid_based_key`.
- With no uniqueness check on `valid_times`, new times are added rather than reused.

Script used to examine redundancy:
```python
from hashlib import sha256

import datajoint as dj
import numpy as np

from spyglass.common import IntervalList
from spyglass.utils.dj_helper_fn import get_child_tables
from spyglass.utils.logging import logger

schema = dj.schema("cbroz_temp")


@schema
class IntervalListHashes(dj.Computed):
    definition = """
    -> IntervalList
    valid_times_hash: char(64)  # unique value for each set of valid_times
    """

    @classmethod
    def make_hash(cls, valid_times):
        # low collision hash function
        return sha256(
            b"".join(arr.tobytes() for arr in valid_times)
        ).hexdigest()

    def make(self, key):
        valid_times = (IntervalList() & key).fetch1("valid_times")

        # Normalize different valid_times formats.
        if not isinstance(valid_times, list):
            valid_times = [valid_times]
        first_element = valid_times[0] if valid_times else None
        if isinstance(first_element, tuple):
            valid_times = [np.array(arr) for arr in valid_times]

        self.insert1(dict(key, valid_times_hash=self.make_hash(valid_times)))


@schema
class IntervalListHashCounts(dj.Computed):
    definition = """
    valid_times_hash: char(64)
    ---
    count: int
    valid_times: longblob
    description = "" : varchar(64)
    """

    class IntervalListEntry(dj.Part):
        definition = """
        -> master
        -> IntervalList
        ---
        is_orphaned: bool
        """

    # Lazy load properties. These are cached after first call.
    _orphans = None
    _non_orphans = None
    _orphan_status = None
    _key_source = None

    @property
    def key_source(self):
        """Determine populate candidates without foreign key."""
        if self._key_source:
            return self._key_source

        # Populate functionally upstream table
        if len(IntervalList - IntervalListHashes) > 10:
            IntervalListHashes.populate(display_progress=True)
        if len(IntervalList - IntervalListHashes) > 10:
            raise ValueError(
                "IntervalListHashes table is not fully populated."
            )

        # In-memory set difference. Only return hash bc primary key above.
        logger.debug("Fetching HashCount key source")
        self._key_source = (
            dj.U("valid_times_hash").aggr(
                IntervalListHashes - self, count="COUNT(*)"
            )
            & "count > 1"  # only populate repeated valid_times
        )
        return self._key_source

    @property
    def part_format(self):
        """Empty in-memory table used to assemble orphan status."""
        return dj.U(
            "nwb_file_name",
            "interval_list_name",
            "valid_times_hash",
            "is_orphaned",
        )

    @property
    def orphans(self):
        if self._orphans:
            return self._orphans
        logger.debug("Fetching orphans")
        # Similar to `cleanup` logic. Ignores tables here under `cbroz`
        interval_list_valid_children = [
            t
            for t in get_child_tables(IntervalList)
            if "cbroz" not in t.full_table_name
        ]
        self._orphans = self.part_format & (
            IntervalListHashes - interval_list_valid_children
        ).proj(is_orphaned="1")
        return self._orphans

    @property
    def non_orphans(self):
        if self._non_orphans:
            return self._non_orphans
        logger.debug("Fetching non-orphans")
        self._non_orphans = self.part_format & (
            IntervalListHashes - self.orphans  # All others are non-orphans
        ).proj(is_orphaned="0")
        return self._non_orphans

    @property
    def orphan_status(self):
        if not self._orphan_status:  # Lazy load set union
            self._orphan_status = self.orphans + self.non_orphans
        return self._orphan_status

    def is_orphaned(self, key) -> bool:
        """Check orphan status of a key. Used in list comprehension method."""
        return True if key in self.orphans else False

    def get_time_example(self, full_key):
        """Fetch a single example of valid_times for a given hash.

        Times will be the same across all entries with the same hash.
        """
        logger.debug("Fetching time example")
        return (IntervalList * full_key).fetch("valid_times", limit=1)[0]

    def make(self, key):
        # Caching key_source is faster, but possible collisions if
        # IntervalList is getting new entries. Counts may be out of date.
        if key in self:
            logger.debug("Key already exists")
            return

        full_key = IntervalListHashes & key
        full_key_len = len(full_key)

        logger.debug("Assembling master key")
        master_key = dict(
            valid_times_hash=key["valid_times_hash"],
            count=full_key_len,
            valid_times=self.get_time_example(full_key),
        )

        # Join method is faster for large numbers;
        # comprehension is faster for small numbers. Arbitrary cutoff at 20.
        if full_key_len > 20:
            logger.debug("Assembling part keys, join method")
            part_keys = (self.orphan_status * full_key).fetch(as_dict=True)
        else:
            logger.debug("Assembling part keys, comprehension method")
            part_keys = [
                {**entry, "is_orphaned": self.is_orphaned(entry)}
                for entry in full_key
            ]

        logger.debug(f"Inserting {len(part_keys)} keys")
        self.insert1(master_key)
        self.IntervalListEntry.insert(part_keys)


def summary():
    hash_counts = dj.U("valid_times_hash").aggr(
        IntervalListHashes, count="COUNT(*)"
    )
    n_entries = len(IntervalList())
    n_unique = len(hash_counts & "count=1")
    unique_percent = n_unique / n_entries * 100
    n_repeated = len(hash_counts & "count>1")
    repeated_percent = n_repeated / n_entries * 100
    n_repeat_10 = len(hash_counts & "count>10")
    repeat_10_percent = n_repeat_10 / n_entries * 100
    n_orphans = len(IntervalListHashCounts().orphans)
    orphan_percent = n_orphans / n_entries * 100
    possible_reduction = 100 - len(hash_counts) / n_entries * 100

    print(f"Interval lists: {n_entries}")
    print(f"... with unique valid_times: {n_unique}, {unique_percent:.4f}%")
    print(
        f"... sets of repeated valid_times: {n_repeated}, "
        + f"{repeated_percent:.4f}%"
    )
    print(
        f"... sets with >10 repeated valid_times: {n_repeat_10}, "
        + f"{repeat_10_percent:.4f}%"
    )
    print(f"... with no downstream: {n_orphans}, " + f"{orphan_percent:.4f}%")
    print(
        f"... size reduction if no repeated valid_times: "
        + f"{possible_reduction:.4f}%"
    )
    return hash_counts
```

Alternatives
New methods, future breaking changes: `IntervalList` could become a table pattern, like params tables, that outlines a solution and provides a template to be adapted for use in individual pipelines, each of which would use its own set of primary keys to avoid redundant entries.

Adjusting the current model is trickier; any departure is going to make interval list names less informative.