superduper-io / superduper

Superduper: Transform your database into your AI platform! Integrate AI models and machine learning workflows with your database to implement custom AI applications without moving your data, including streaming inference, scalable model hosting, training, and vector search.
https://superduper.io
Apache License 2.0
4.62k stars 449 forks source link

[BUG]: can't pickle module object in deepcopy() #1910

Open makkarss929 opened 5 months ago

makkarss929 commented 5 months ago

Contact Details [Optional]

makkarss929@gmail.com

System Information

System: Mac M1 Pro (14-inch); database: Postgres

What happened?

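The error occurs while creating a vector index: the `db.add(...)` call below raises `TypeError: cannot pickle 'module' object`.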

# Import the VectorIndex class from the superduperdb module

# Add a VectorIndex to the SuperDuperDB database with the specified identifier and indexing listener
_ = db.add(
    VectorIndex(
        identifier='my-index',        # Unique identifier for the VectorIndex
        indexing_listener=listener,    # Listener to be used for indexing documents
        measure='cosine'
    )
)

Steps to reproduce

 2024-Mar-28 13:45:20.87| INFO     | Taruns-Laptop.local| superduperdb.components.component:377  | Initializing DataType : dill
 2024-Mar-28 13:45:20.87| INFO     | Taruns-Laptop.local| superduperdb.components.component:380  | Initialized  DataType : dill successfully
 2024-Mar-28 13:45:26.53| INFO     | Taruns-Laptop.local| superduperdb.components.component:377  | Initializing DataType : dill
 2024-Mar-28 13:45:26.53| INFO     | Taruns-Laptop.local| superduperdb.components.component:380  | Initialized  DataType : dill successfully
/Users/tarun/Desktop/superduperDB/superduperdb/superduperdb/backends/ibis/data_backend.py:83: UserWarning: Table already exists, skipping...
  warn("Table already exists, skipping...")
{'_input_id': FieldType(identifier='String'), 'output': DataType(identifier='vector[1024]', encoder=None, decoder=None, info=None, shape=(1024,), directory=None, encodable='native', bytes_encoding=<BytesEncoding.BYTES: 'Bytes'>)}
/Users/tarun/Desktop/superduperDB/superduperdb/superduperdb/backends/ibis/data_backend.py:83: UserWarning: Table already exists, skipping...
  warn("Table already exists, skipping...")
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[22], line 4
      1 # Import the VectorIndex class from the superduperdb module
      2 
      3 # Add a VectorIndex to the SuperDuperDB database with the specified identifier and indexing listener
----> 4 _ = db.add(
      5     VectorIndex(
      6         identifier='my-index',        # Unique identifier for the VectorIndex
      7         indexing_listener=listener,    # Listener to be used for indexing documents
      8         measure='cosine'
      9     )
     10 )

File ~/Desktop/superduperDB/superduperdb/superduperdb/base/datalayer.py:481, in Datalayer.add(self, object, dependencies)
    473     return type(object)(
    474         self._add(
    475             object=component,
   (...)
    478         for component in object
    479     )
    480 elif isinstance(object, Component):
--> 481     return self._add(object=object, dependencies=dependencies), object
    482 else:
    483     return self._add(superduper(object)), object

File ~/Desktop/superduperDB/superduperdb/superduperdb/base/datalayer.py:849, in Datalayer._add(self, object, dependencies, parent)
    847 artifacts = [leaf for leaf in leaves if isinstance(leaf, _BaseEncodable)]
    848 children = [leaf for leaf in leaves if isinstance(leaf, Component)]
--> 849 jobs.extend(self._add_child_components(children, parent=object))
    851 # need to do this again to get the versions of the children
    852 object.set_variables(self)

File ~/Desktop/superduperDB/superduperdb/superduperdb/base/datalayer.py:809, in Datalayer._add_child_components(self, components, parent)
    805     component = lookup[n]
    806     dependencies = sum(
    807         [jobs.get(d[:2], []) for d in component.dependencies], []
    808     )
--> 809     tmp = self._add(
    810         component, parent=parent.unique_id, dependencies=dependencies
    811     )
    812     jobs[n] = tmp
    814 return sum(list(jobs.values()), [])

File ~/Desktop/superduperDB/superduperdb/superduperdb/base/datalayer.py:864, in Datalayer._add(self, object, dependencies, parent)
    862 object.post_create(self)
    863 self._add_component_to_cache(object)
--> 864 these_jobs = object.schedule_jobs(self, dependencies=dependencies)
    865 jobs.extend(these_jobs)
    866 return jobs

File ~/Desktop/superduperDB/superduperdb/superduperdb/components/listener.py:181, in Listener.schedule_jobs(self, db, dependencies, overwrite)
    173     return []
    174 assert not isinstance(self.model, str)
    176 out = [
    177     self.model.predict_in_db_job(
    178         X=self.key,
    179         db=db,
    180         predict_id=f'{self.identifier}::{self.version}',
--> 181         select=self.select.copy(),
    182         dependencies=dependencies,
    183         overwrite=overwrite,
    184     )
    185 ]
    186 return out

File ~/Desktop/superduperDB/superduperdb/superduperdb/base/serializable.py:156, in Serializable.copy(self)
    155 def copy(self):
--> 156     return deepcopy(self)

File ~/miniconda3/lib/python3.11/copy.py:172, in deepcopy(x, memo, _nil)
    170                 y = x
    171             else:
--> 172                 y = _reconstruct(x, memo, *rv)
    174 # If is its own copy, don't memoize.
    175 if y is not x:

File ~/miniconda3/lib/python3.11/copy.py:271, in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    269 if state is not None:
    270     if deep:
--> 271         state = deepcopy(state, memo)
    272     if hasattr(y, '__setstate__'):
    273         y.__setstate__(state)

File ~/miniconda3/lib/python3.11/copy.py:146, in deepcopy(x, memo, _nil)
    144 copier = _deepcopy_dispatch.get(cls)
    145 if copier is not None:
--> 146     y = copier(x, memo)
    147 else:
    148     if issubclass(cls, type):

File ~/miniconda3/lib/python3.11/copy.py:231, in _deepcopy_dict(x, memo, deepcopy)
    229 memo[id(x)] = y
    230 for key, value in x.items():
--> 231     y[deepcopy(key, memo)] = deepcopy(value, memo)
    232 return y

File ~/miniconda3/lib/python3.11/copy.py:172, in deepcopy(x, memo, _nil)
    170                 y = x
    171             else:
--> 172                 y = _reconstruct(x, memo, *rv)
    174 # If is its own copy, don't memoize.
    175 if y is not x:

File ~/miniconda3/lib/python3.11/copy.py:271, in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    269 if state is not None:
    270     if deep:
--> 271         state = deepcopy(state, memo)
    272     if hasattr(y, '__setstate__'):
    273         y.__setstate__(state)

File ~/miniconda3/lib/python3.11/copy.py:146, in deepcopy(x, memo, _nil)
    144 copier = _deepcopy_dispatch.get(cls)
    145 if copier is not None:
--> 146     y = copier(x, memo)
    147 else:
    148     if issubclass(cls, type):

File ~/miniconda3/lib/python3.11/copy.py:231, in _deepcopy_dict(x, memo, deepcopy)
    229 memo[id(x)] = y
    230 for key, value in x.items():
--> 231     y[deepcopy(key, memo)] = deepcopy(value, memo)
    232 return y

    [... skipping similar frames: _deepcopy_dict at line 231 (4 times), _reconstruct at line 271 (4 times), deepcopy at line 172 (4 times), deepcopy at line 146 (4 times)]

File ~/miniconda3/lib/python3.11/copy.py:172, in deepcopy(x, memo, _nil)
    170                 y = x
    171             else:
--> 172                 y = _reconstruct(x, memo, *rv)
    174 # If is its own copy, don't memoize.
    175 if y is not x:

File ~/miniconda3/lib/python3.11/copy.py:271, in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    269 if state is not None:
    270     if deep:
--> 271         state = deepcopy(state, memo)
    272     if hasattr(y, '__setstate__'):
    273         y.__setstate__(state)

File ~/miniconda3/lib/python3.11/copy.py:146, in deepcopy(x, memo, _nil)
    144 copier = _deepcopy_dispatch.get(cls)
    145 if copier is not None:
--> 146     y = copier(x, memo)
    147 else:
    148     if issubclass(cls, type):

File ~/miniconda3/lib/python3.11/copy.py:231, in _deepcopy_dict(x, memo, deepcopy)
    229 memo[id(x)] = y
    230 for key, value in x.items():
--> 231     y[deepcopy(key, memo)] = deepcopy(value, memo)
    232 return y

File ~/miniconda3/lib/python3.11/copy.py:161, in deepcopy(x, memo, _nil)
    159 reductor = getattr(x, "__reduce_ex__", None)
    160 if reductor is not None:
--> 161     rv = reductor(4)
    162 else:
    163     reductor = getattr(x, "__reduce__", None)

TypeError: cannot pickle 'module' object

Relevant log output

No response

blythed commented 5 months ago

This isn't enough for us to reproduce the issue. How is `listener` defined?

makkarss929 commented 5 months ago

@blythed, this is how the model and the listener are defined:

import sentence_transformers
from superduperdb import Model, vector

model = Model(
    identifier='embedding', 
    object=sentence_transformers.SentenceTransformer('BAAI/bge-large-en-v1.5'),
    encoder=vector(shape=(1024,)),
    predict_method='encode', # Specify the prediction method
    postprocess=lambda x: x.tolist(),  # Define postprocessing function
    batch_predict=True, # Generate predictions for a set of observations all at once 
    datatype=vector(shape=(1024,))
)

# Import the Listener class from the superduperdb module
from superduperdb import Listener

# Create a Listener instance with the specified model, key, and selection criteria
listener = Listener(
    model=model,          # The model to be used for listening
    key='txt',            # The key field in the documents to be processed by the model
    select=table  # The selection criteria for the documents
)