Closed: jstammers closed this issue 5 months ago.
Hi @jstammers, great to meet you!
Yes, sampling from a cartesian join is a tricky thing to do without materializing the entire cartesian join (and ending up with 7.9B rows and running out of memory). I looked into this some, but honestly I don't remember what I concluded. The implementation of that function you found probably needs to get optimized.
I'm curious what backend you are using? I imagine that would have an impact, though ideally we can come up with an implementation that is efficient on all backends.
I would gladly review any PRs, research, or other effort you put into this, but I must say that right now I'm focused on other aspects and probably won't take the lead on this for at least a few weeks or months.
More generally, I'd love to hear more about what parts of mismo you have found most valuable, and what parts are missing.
Hi @NickCrews, at the moment I am using the DuckDB backend. I have access to a Spark cluster, so I could try using that, but I've noticed that some blocking operations are currently only supported for DuckDB, and given the size of my data it makes sense to stick with a local dataframe if possible.
I might have some time over the next few weeks to test out some alternative methods for generating samples of pairs of records. If we can't avoid materializing the entire cartesian product, then I think for my example I may be able to avoid memory bottlenecks if I only sample the ids.
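To make that concrete, a rough, untested sketch of sampling only the ids could look like this (the helper name and the sqrt-of-fraction thinning are my own assumptions, not mismo API):

```python
import ibis
from ibis.expr.types import Table


def sample_id_pairs(left: Table, right: Table, max_pairs: int) -> Table:
    """Return roughly max_pairs (record_id, record_id) pairs without a full cross join."""
    n_left = left.count().execute()
    n_right = right.count().execute()
    # thin each side by sqrt(frac) so the cross join of the samples has ~max_pairs rows
    frac = min(1.0, (max_pairs / (n_left * n_right)) ** 0.5)
    left_ids = left.select("record_id").filter(ibis.random() < frac)
    right_ids = right.select("record_id").filter(ibis.random() < frac)
    # ibis suffixes the colliding record_id column on the right side of the join
    return left_ids.cross_join(right_ids)
```

The full records can then be joined back onto the sampled ids before running comparisons.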
In terms of my experience so far, I've only been using mismo for a few days, but I've found the documentation very helpful to work through an example. In particular, I think being able to specify the blocking rules has helped me to incorporate more domain-specific logic about where I expect to find duplicates, unlike other frameworks which seem to do this more opaquely.
I've also found the existing text_utils very useful for engineering features that could help identify duplicates using similarity measures. I think it could be worth adding the ability to generate word/sentence embeddings using a pre-trained model (e.g. FastText/GloVe), but I can see how that might be infeasible for some backends.
In my project, I've implemented a method to compare two strings using the Levenshtein distance, which could be useful more generally, and I'd be happy to submit it as a PR if so.
Finally, I'd be interested to know if you have any thoughts/plans on methods for deploying a trained clustering model. For my use-cases, I would be looking to deploy something that could detect duplicates within a single dataset, or many-to-one links from one dataset to another. I'm not quite at the stage to do this, but having the ability to encapsulate the blocking, comparison and clustering functionality would be a convenient way to deploy a pre-trained model.
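To make that ask a bit more concrete, something shaped roughly like this hypothetical wrapper (none of these names are mismo API) is the kind of object I'd want to be able to serialize and deploy:

```python
from dataclasses import dataclass
from typing import Callable

from ibis.expr.types import Table


@dataclass
class LinkageModel:
    blocker: Callable[[Table, Table], Table]   # records -> candidate pairs
    comparer: Callable[[Table], Table]         # pairs -> pairs with comparison features
    clusterer: Callable[[Table], Table]        # scored pairs -> cluster assignments

    def predict(self, left: Table, right: Table) -> Table:
        pairs = self.blocker(left, right)
        scored = self.comparer(pairs)
        return self.clusterer(scored)
```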
If you think any of these would be better suited as separate feature requests, I'd be happy to open them and contribute where possible.
I've been able to resolve this for my dataset using the following implementation for all_possible_pairs
def all_possible_pairs(
    left: Table,
    right: Table,
    *,
    max_pairs: int | None = None,
    seed: int | None = None,
) -> Table:
    """Blocks together all possible pairs of records."""
    n_left = left.count().execute()
    n_right = right.count().execute()
    if max_pairs is not None and n_left * n_right > max_pairs:
        # downsample each input before taking the cross join
        frac = max_pairs / (n_left * n_right)
        left = sample_table(left, n_approx=int(n_left * frac), seed=seed)
        right = sample_table(right, n_approx=int(n_right * frac), seed=seed)
    n_pairs = _min_ignore_None(n_left * n_right, max_pairs)
    pairs = block_one(left, right, True, on_slow="ignore")
    return pairs
That said, I can see this giving a biased representation of the possible pairs when the total number of pairs is much larger than max_pairs.
I've been able to resolve this for my dataset
Thanks @jstammers! It looks like if right and left are the same table, the sample_table(left/right, n_approx=int(n_left * frac), seed=seed) call will give you the same sample for both sides. Maybe you want to take left.view() beforehand if they are the same?? IDK, you will have to test yourself. And yeah, that will give you a biased sample, but maybe that's OK.
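In code, the suggestion is roughly this, inside all_possible_pairs (untested; whether the two samples actually end up independent, especially with the same seed, is exactly what needs checking):

```python
# give one side its own identity so the two samples aren't the same expression
if right is left:
    right = right.view()
left = sample_table(left, n_approx=int(n_left * frac), seed=seed)
right = sample_table(right, n_approx=int(n_right * frac), seed=seed)
```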
I don't really want to merge in this change until I'm convinced it's a well-founded way to do it. Are you currently blocked, or is it possible for you to inject your workaround all_possible_pairs() into the workflow (ideally, if I did a good job, it should be easy 😉) without too much trouble?
some blocking operations are currently only supported for DuckDB
Out of curiosity, which ones? Ideally I'd love to make the most common algorithms work with at least duckdb and spark. I've been meaning to add spark tests. I agree you should stay on duckdb if you only have 70k records. I use duckdb into the low millions no problem.
adding the ability to generate Word/Sentence embeddings
I've thought about this. Using pyarrow UDFs shouldn't be too bad? They work with pyspark, I assume?? Since these would be applied to the O(N)-sized tables, we could probably handle some non-optimized implementations here.
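Roughly what I'd imagine, as an untested sketch (sentence-transformers is just a stand-in for whichever pre-trained model; pyspark support for Python UDFs and the list[float] return annotation are assumptions to verify):

```python
import functools

import ibis


@functools.cache
def _model():
    # hypothetical optional dependency; any pre-trained embedding model would do
    from sentence_transformers import SentenceTransformer

    return SentenceTransformer("all-MiniLM-L6-v2")


@ibis.udf.scalar.python
def sentence_embedding(text: str) -> list[float]:
    return _model().encode(text).tolist()


# applied to the O(N) record table, not the blocked pairs:
# t = t.mutate(name_embedding=sentence_embedding(t.name))
```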
Levenshtein distance
please submit a PR if you are so inclined. Any external deps should be optional, e.g. imported inside a with optional_import(): block.
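For reference, a comparer along those lines could look roughly like this (a sketch, not the actual implementation; it assumes your ibis version exposes StringValue.levenshtein, which DuckDB supports natively):

```python
import ibis


def levenshtein_similarity(a, b):
    """Return 1.0 for identical strings, approaching 0.0 for very different ones."""
    dist = a.levenshtein(b)
    max_len = ibis.greatest(a.length(), b.length())
    # coalesce guards the case where both strings are empty
    return ibis.coalesce(1 - dist / max_len, 0)


# e.g. on a blocked pairs table with name_l / name_r columns:
# pairs = pairs.mutate(name_sim=levenshtein_similarity(pairs.name_l, pairs.name_r))
```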
deploying a trained clustering model
Yes I want to get there eventually. If you come up with something I would love to see it. So far I've been trying to stabilize the lower level building blocks, so far they've felt too unstable for me to build on top of. I feel like once I/we have like 5+ examples doing various tasks using the lower level APIs, it might start to crystalize what could get simplified into a higher-level API.
@NickCrews I've been able to modify all_possible_pairs() within my script. I think for the time being I will work with it from a fork; I agree it probably shouldn't be modified until it's been tested further.
some blocking operations are currently only supported for DuckDB
Here's a snippet of code that replicates the issue I'm seeing.
import ibis
import mismo
from mismo.block import BlockingRule, block_many
from mismo.lib.geo import CoordinateBlocker
from pyspark.sql import SparkSession

patents = mismo.datasets.load_patents()
spark = SparkSession.builder.getOrCreate()
con = ibis.pyspark.connect(spark)
table = con.create_table('patents', patents.to_pandas())
rules = [
    BlockingRule("name", name="Name Exact"),
    CoordinateBlocker(distance_km=0.05, name="Coordinates Close", lat="latitude", lon="longitude"),
]
blocked = block_many(table, table, rules, labels=True)
with the following traceback
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-27ba30fd-ba49-4baf-9f1b-0042324eccbc/lib/python3.10/site-packages/mismo/block/_block.py:251, in block_many(left, right, conditions, on_slow, task, labels, **kwargs)
248 ids = ids.mutate(blocking_rule=ibis.literal(_util.get_name(rule)))
249 return ids
--> 251 sub_joined = [blk(rule) for rule in conds]
252 if labels:
253 result = ibis.union(*sub_joined, distinct=False)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-27ba30fd-ba49-4baf-9f1b-0042324eccbc/lib/python3.10/site-packages/mismo/block/_block.py:251, in <listcomp>(.0)
248 ids = ids.mutate(blocking_rule=ibis.literal(_util.get_name(rule)))
249 return ids
--> 251 sub_joined = [blk(rule) for rule in conds]
252 if labels:
253 result = ibis.union(*sub_joined, distinct=False)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-27ba30fd-ba49-4baf-9f1b-0042324eccbc/lib/python3.10/site-packages/mismo/block/_block.py:245, in block_many.<locals>.blk(rule)
244 def blk(rule):
--> 245 j = join(left, right, rule, on_slow=on_slow, task=task, **kwargs)
246 ids = _distinct_record_ids(j)
247 if labels:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-27ba30fd-ba49-4baf-9f1b-0042324eccbc/lib/python3.10/site-packages/mismo/block/_block.py:299, in join(left, right, condition, on_slow, task, **kwargs)
292 if (
293 task == "dedupe"
294 and "record_id" in left.columns
295 and "record_id" in right.columns
296 ):
297 pred = pred & (left.record_id < right.record_id)
--> 299 _sql_analyze.check_join_algorithm(left, right, pred, on_slow=on_slow)
300 j = ibis.join(left, right, pred, lname="{name}_l", rname="{name}_r")
301 j = _ensure_suffixed(left.columns, right.columns, j)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-27ba30fd-ba49-4baf-9f1b-0042324eccbc/lib/python3.10/site-packages/mismo/block/_sql_analyze.py:106, in check_join_algorithm(left, right, condition, on_slow)
104 if on_slow == "ignore":
105 return
--> 106 alg = get_join_algorithm(left, right, condition)
107 if alg in SLOW_JOIN_ALGORITHMS:
108 if on_slow == "error":
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-27ba30fd-ba49-4baf-9f1b-0042324eccbc/lib/python3.10/site-packages/mismo/block/_sql_analyze.py:68, in get_join_algorithm(left, right, condition)
61 """Return one of the JOIN_ALGORITHMS for the outermost join in the expression.
62
63 If there are multiple joins in the query, this will return the top (outermost) one.
64 This only works with expressions bound to a DuckDB backend.
65 Other kinds of expressions will raise NotImplementedError.
66 """
67 j = ibis.join(left, right, condition)
---> 68 ex = _explain_str(j)
69 return _extract_top_join_alg(ex)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-27ba30fd-ba49-4baf-9f1b-0042324eccbc/lib/python3.10/site-packages/mismo/block/_sql_analyze.py:117, in _explain_str(duckdb_expr, analyze)
114 def _explain_str(duckdb_expr: ir.Expr | str, *, analyze: bool = False) -> str:
115 # we can't use a separate backend eg from ibis.duckdb.connect()
116 # or it might not be able to find the tables/data referenced
--> 117 sql, con = _to_sql_and_backend(duckdb_expr)
118 if analyze:
119 sql = "EXPLAIN ANALYZE " + sql
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-27ba30fd-ba49-4baf-9f1b-0042324eccbc/lib/python3.10/site-packages/mismo/block/_sql_analyze.py:136, in _to_sql_and_backend(duckdb_expr)
134 raise NotImplementedError("The given expression must have a backend.")
135 if not isinstance(con, DuckDBBackend):
--> 136 raise NotImplementedError(
137 "The given expression must be a DuckDB expression."
138 )
139 sql = ibis.to_sql(duckdb_expr, dialect="duckdb")
140 return sql, con
adding the ability to generate Word/Sentence embeddings
I haven't tried this yet, but have used other scalar python UDFs to create comparison functions. I expect this would work in much the same way
lol oops, that is an explicit dis-allow that I have in mismo/block/_sql_analyze.py during the safety check for the blocking algorithm, used with the "on_slow" argument to .block_one() and .block_many(), because that safety check only works for duckdb. I definitely should make the error message more actionable; what you want to do is pass on_slow="ignore". But I'm curious about your opinion there: perhaps we should change what happens if a user is on a non-duckdb backend. Currently what happens is the hard error you hit.
OK, I just pushed https://github.com/NickCrews/mismo/commit/4892899badc20b9f1450549c5f4e946c9bd343c3 which changes this to a warning. How does that look?
That looks great, thanks for resolving this so quickly!
@jstammers can you take a look at and test out the implementation in https://github.com/NickCrews/mismo/commit/7da78bd5430d4a34056825e6483fde8a2a5c4c92#diff-a799560a3e4e8d29563606114a31b44fd6c99b7d74a7b980b486a4121787904fR19 and see if it resolves your original issue?
Also related: how important is it to you to have reproducible runs, e.g. support for a random seed? I had to remove that in my new implementation; I'm not totally convinced how useful it really is, and it has been pretty difficult to support.
@NickCrews this implementation worked pretty well for me. I didn't notice any significant performance issues when sampling 1M pairs out of roughly 9B possible pairs, either.
For my use case, I don't think having a fully reproducible run is essential. I agree that it's not clear how useful a reproducible run is - if the trained weights are sensitive to the sample size, I would rather observe that than falsely believe the model is stable simply because I used the same seed
hmm that's a good point, I hadn't even thought of that!
Closing this out since the original issue is solved then. Please open more though and keep the conversation going!
Hi @NickCrews, I'm not sure if this warrants re-opening a closed issue, but I've discovered that this method fails when sampling from two different tables. When doing this, I encounter a RecursionError here:
https://github.com/NickCrews/mismo/blob/976db6d05006aece1ee8d96c017392603dce78e2/mismo/block/_util.py#L71
which I suspect is related to how ibis handles the execution.
I've been able to get this working using
n_possible_pairs = (left.count().execute() * right.count().execute())
but without knowing too much about how the queries are structured, I would expect the result to be the same in either case.
Do you have a concrete script that can repro? I haven't tested yet but I'm pretty sure if left and right are simple .read_csv()s then this won't error, I bet it is erroring because left and right are somehow related with some other complicated preprocessing you're doing. If you .cache() left and right just before you call this function does it still error?
Here's an example that reproduces my error
import ibis
from mismo.fs._train import sample_all_pairs
from mismo.datasets import load_febrl1, load_febrl2

ibis.options.interactive = True
left = load_febrl1()[0]
right = load_febrl2()[0]
n_possible_pairs1 = (left.count().execute() * right.count().execute())  # succeeds
n_possible_pairs = (left.count() * right.count()).execute()  # raises RecursionError
Caching each table still causes the error, as does saving those tables as CSVs and then using ibis.read_csv to load them.
Thank you! I bet this is an upstream issue with ibis; I'll file it there if so.
Yeah I already filed this as an upstream issue with ibis at https://github.com/ibis-project/ibis/issues/7616.
Changing to your suggested workaround now.
Hi, I've recently come across this package and after testing out a few other ER packages (dedupe, splink, zingg etc.) I think mismo has a very intuitive, flexible API that should work very well for my needs.
When estimating weights from a set of known dupes, I've run into some memory issues when using mismo.fs.train_using_labels. This occurred even when setting the max_pairs argument to something very low, e.g. 100. After debugging this further, I found that this was caused by the following function:
https://github.com/NickCrews/mismo/blob/a5874fab5c20505055e77545a83a60fe323eccd2/mismo/fs/_train.py#L14-L24
In my case, my left and right datasets have 70k records, so that block would produce around 7.9B pairs. I should be able to work around this by reducing the number of known dupes for the time being, but it seems to me that generating all these pairs might not be required if they are then sampled. Perhaps it would be better to sample the required number of pairs from a product of the record_ids if max_pairs is less than the total number of pairs?
Alternatively, it might be helpful to raise an exception if the expected size of the result of block_one exceeds the available memory, similar to how SciPy raises a MemoryError when trying to allocate memory for a very large array.
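For illustration, the "raise early" alternative could be as simple as a guard like the following (the function name, the default limit, and the choice of MemoryError are all hypothetical):

```python
from ibis.expr.types import Table


def check_pair_count(left: Table, right: Table, max_pairs: int = 100_000_000) -> None:
    """Raise before blocking if the cartesian product would be unreasonably large."""
    n_pairs = left.count().execute() * right.count().execute()
    if n_pairs > max_pairs:
        raise MemoryError(
            f"Blocking would generate ~{n_pairs:,} pairs (limit {max_pairs:,}); "
            "consider sampling record_ids or adding blocking rules first."
        )
```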