NickCrews / mismo

The SQL/Ibis powered sklearn of record linkage
https://nickcrews.github.io/mismo/
GNU Lesser General Public License v3.0
14 stars 3 forks source link

Unable to use 'cluster.connected_components' on a pyspark dataframe #53

Closed jstammers closed 3 months ago

jstammers commented 3 months ago

I'm not able to cluster a pyspark dataframe due to an unsupported data type.

import pandas as pd
from mismo.cluster import connected_components
import ibis

spark = ...
links = pd.DataFrame({"record_id_l":[1,2,3], "record_id_r":[4,5,6]})
records = pd.DataFrame({"record_id":[1,2,3,4,5,6], "name":["A","B","C","D","E","F"]})

con = ibis.get_backend()
con_sql = ibis.pyspark.connect(spark)

links_duck = con.create_table('links', links, overwrite=True)
records_duck = con.create_table('records', records, overwrite=True)

components = connected_components(links=links_duck, records=records_duck) # works as expected

links_spark = con_sql.create_table('links', links, overwrite=True, format="delta",)
records_spark = con_sql.create_table('records', records, overwrite=True, format="delta")
components = connected_components(links=links_spark, records=records_spark) # raises Error

The error is

[[UNSUPPORTED_DATATYPE](https://docs.microsoft.com/azure/databricks/error-messages/error-classes#unsupported_datatype)] Unsupported data type "UBIGINT". SQLSTATE: 0A000

which makes me think that the conversion from an ibis datatype to a pyspark one is not correct.

The full traceback is

File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/mismo/cluster/_connected_components.py:176, in connected_components(links, records, max_iter)
     37 def connected_components(
     38     *,
     39     links: ir.Table,
     40     records: ir.Column | ir.Table | Iterable[ir.Table] | Mapping[str, ir.Table] = None,
     41     max_iter: int | None = None,
     42 ) -> ir.Table | Datasets:
     43     """Label records using connected components, based on the given links.
     44 
     45     This uses [an iterative algorithm]([https://www.drmaciver.com/2008/11/computing-connected-graph-components-via-sql/](https://adb-4926940868882418.18.azuredatabricks.net/%3Ca%20class=)" target="_blank" rel="noopener noreferrer">[https://www.drmaciver.com/2008/11/computing-connected-graph-components-via-sql/</a>)</span>](https://www.drmaciver.com/2008/11/computing-connected-graph-components-via-sql/%3C/a%3E)%3C/span%3E)
   (...)
    174     └───────────┴───────┴───────────┘
    175     """  # noqa: E501
--> 176     int_edges, restore = _intify_edges(links)
    177     int_labels = _connected_components_ints(int_edges, max_iter=max_iter)
    178     labels = restore(int_labels)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/mismo/cluster/_connected_components.py:267, in _intify_edges(raw_edges)
    264 all_node_ids = raw_edges.union(swapped).select("record_id_l")
    266 f = Factorizer(all_node_ids, "record_id_l")
--> 267 edges = f.encode(raw_edges, "record_id_l")
    268 edges = f.encode(edges, "record_id_r")
    270 def restore(int_labels: ir.Table) -> ir.Table:
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/mismo/_factorizer.py:80, in Factorizer.encode(self, t, src, dst, verify)
     74     return self._augmented.mutate(**{dst: _[self._int_column]}).drop(
     75         self._int_column
     76     )
     78 if verify:
     79     if (
---> 80         self._mapping_count == 0
     81         and t.count().execute() > 0
     82         and t[src].notin(self._mapping.original).any().execute()
     83     ):
     84         raise ValueError(
     85             f"Column {src} contains values that are not in the original column"
     86         )
     88 assert self._int_column not in t.columns
File /usr/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
    979 val = cache.get(self.attrname, _NOT_FOUND)
    980 if val is _NOT_FOUND:
--> 981     val = self.func(instance)
    982     try:
    983         cache[self.attrname] = val
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/mismo/_factorizer.py:150, in Factorizer._mapping_count(self)
    148 @cached_property
    149 def _mapping_count(self) -> int:
--> 150     return self._mapping.count().execute()
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/expr/types/core.py:404, in Expr.execute(self, limit, params, **kwargs)
    386 def execute(
    387     self,
    388     limit: int | str | None = "default",
    389     params: Mapping[ir.Value, Any] | None = None,
    390     **kwargs: Any,
    391 ):
    392     """Execute an expression against its backend if one exists.
    393 
    394     Parameters
   (...)
    402         Keyword arguments
    403     """
--> 404     return self._find_backend(use_default=True).execute(
    405         self, limit=limit, params=params, **kwargs
    406     )
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:430, in Backend.execute(self, expr, params, limit, **kwargs)
    426 sql = self.compile(table, params=params, limit=limit, **kwargs)
    428 schema = table.schema()
--> 430 with self._safe_raw_sql(sql) as query:
    431     df = query.toPandas()  # blocks until finished
    432     result = PySparkPandasData.convert_table(df, schema)
File /usr/lib/python3.10/contextlib.py:135, in _GeneratorContextManager.__enter__(self)
    133 del self.args, self.kwds, self.func
    134 try:
--> 135     return next(self.gen)
    136 except StopIteration:
    137     raise RuntimeError("generator didn't yield") from None
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:408, in Backend._safe_raw_sql(self, query)
    406 @contextlib.contextmanager
    407 def _safe_raw_sql(self, query: str) -> Any:
--> 408     yield self.raw_sql(query)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/backends/pyspark/__init__.py:413, in Backend.raw_sql(self, query, **kwargs)
    411 with contextlib.suppress(AttributeError):
    412     query = query.sql(dialect=self.dialect)
--> 413 return self._session.sql(query, **kwargs)
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48     logger.log_success(
     49         module_name, class_name, function_name, time.perf_counter() - start, signature
     50     )
     51     return res
File /databricks/spark/python/pyspark/sql/session.py:1748, in SparkSession.sql(self, sqlQuery, args, **kwargs)
   1744         assert self._jvm is not None
   1745         litArgs = self._jvm.PythonUtils.toArray(
   1746             [_to_java_column(lit(v)) for v in (args or [])]
   1747         )
-> 1748     return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
   1749 finally:
   1750     if len(kwargs) > 0:
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:230, in capture_sql_exception.<locals>.deco(*a, **kw)
    226 converted = convert_exception(e.java_exception)
    227 if not isinstance(converted, UnknownException):
    228     # Hide where the exception came from that shows a non-Pythonic
    229     # JVM exception message.
--> 230     raise converted from None
    231 else:
    232     raise

From looking into this, I think it's because pyspark doesn't supported unsigned integers. Whether we'd need the full-width is a different question

NickCrews commented 3 months ago

Yup, looks like spark doesnt support uints of any kind.

I just switched from uint64s to int64s. Let me know if you still run into this problem on main. I imagine you might in other places throughout mismo.

Also curious, are you just trying out pyspark out of curiosity, or is duckdb not cutting it? What is the size of your task? If you wanted to add test infrastructure to test everything on spark I would really appreciate it, and it would really improve the odds I won't break you in the future (as it is I'm only worrying about duckdb)

jstammers commented 3 months ago

Thanks for resolving this, I'll test it out and let you know if I have any issues.

For the most part, I'm using duckdb as that can fairly comfortably handle 10Ms of blocked pairs in my experience. I hae occasionally swapped to pyspark when trying to look for dupes in datasets of ~5M records and blocking on a fairly coarse feature (e.g. zipcode). This is mainly due to a lack of domain-knowledge on a good set of blocking rules, which could probably be mitigated with some EDA.

I'd be happy to add some test infrastructure to support testing using a spark backend. I have some future ER projects in mind that could benefit from a distributed backend due to the number of records we have, so it would be worth ensuring that spark is fully supported by mismo