dedupeio / dedupe

A Python library for accurate and scalable fuzzy matching, record deduplication and entity resolution.
https://docs.dedupe.io
MIT License

Consider holding data in sqlite table #964

Open fgregg opened 2 years ago

fgregg commented 2 years ago

The major barrier to efficient parallelization of blocking is the inter process communication of the records. Similarly, much of the potential benefit of parallelization of scoring is lost because of IPC too.

Within Python's multiprocessing paradigms (and our need to support the spawn method of parallelization), the only way I know of to reduce IPC is to store the data on disk and give each process access to that data store.

We are already using sqlite quite a bit in the library, so I'm contemplating loading the data dict passed to dedupe into sqlite.

Pros

Cons

Typing

In order to not radically change the API, we would need to be able to load the records from a data dictionary into sqlite tables. The problem is that right now the fields of a record can be of any type, and sqlite does not have a very rich set of native types.

We have comparators for string, integer, and float fields. sqlite has native support for those.

We also have comparators for tuples and sets and allow people to create arbitrary comparators that could take arbitrary types.

You can register arbitrary types for serialization and deserialization with Python's sqlite3 library, but this is likely to slow things down a lot.
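As a rough illustration of what registering a type involves, here is a minimal sketch using the standard library's `sqlite3.register_adapter` and `sqlite3.register_converter`. The table layout, the `json_tuple` declared type, and the choice of JSON as the storage format are assumptions made for this example, not anything dedupe does today. Note that every read of a `json_tuple` column goes through a Python callable, which is where the slowdown would come from.

```python
import json
import sqlite3


def adapt_tuple(value: tuple) -> str:
    # Assumption: tuple-valued fields are stored as JSON text.
    return json.dumps(list(value))


def convert_tuple(blob: bytes) -> tuple:
    # Converters always receive the stored value as bytes.
    return tuple(json.loads(blob))


sqlite3.register_adapter(tuple, adapt_tuple)
sqlite3.register_converter("json_tuple", convert_tuple)


def roundtrip(records: dict) -> dict:
    """Write a {record_id: record} dict to sqlite and read it back."""
    # PARSE_DECLTYPES makes sqlite3 pick the converter from the declared column type.
    con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
    con.execute(
        "CREATE TABLE records (record_id INTEGER PRIMARY KEY, name TEXT, tags json_tuple)"
    )
    con.executemany(
        "INSERT INTO records VALUES (?, ?, ?)",
        [(rid, rec["name"], rec["tags"]) for rid, rec in records.items()],
    )
    rows = con.execute("SELECT record_id, name, tags FROM records").fetchall()
    con.close()
    return {rid: {"name": name, "tags": tags} for rid, name, tags in rows}
```

A flat tuple of strings survives the round trip; nested tuples would come back as lists with this particular encoding.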

API changes.

The higher level methods could remain unchanged, but a lot of the lower level methods would not really make sense if we had the data in a database. For example, you would not want to generate pairs to feed to a score method.

Those lower level methods are basic to how we now leverage databases in our database examples.

fgregg commented 2 years ago

Perhaps the path is not to try to adapt the core dedupe classes to such a big change but to create a new set of classes like DatabaseDedupe, that take a connection string and some other data about an existing database connection.

fgregg commented 2 years ago

thoughts @fjsj, @NickCrews ?

fjsj commented 2 years ago

If I understood correctly what's being planned, here's my two cents:

potential benefit of parallelization of scoring is lost because of IPC too.

I've experimented a lot with scoring stage optimizations. The best performance I got was using a huge EC2 machine: a single VM with several CPUs. What I did was: all records and blocks in the DB. An orchestrator process would read the blocks from the DB and yield the pairs (only the IDs) to workers (1 per CPU). Each worker would fetch the records from the DB, score them, and save the partial results in files. Each worker got batches of pairs, not single pairs. That's not very different from the current or old code, but it was the best balance between IPC overhead and DB usage. The DB was Postgres on a separate machine in this case.

The orchestrator is needed because there's a lot of skewness on block sizes, so you cannot simply "map" blocks to workers. There are papers that suggest "balancing" algorithms to deal with that, but I think the "speed" they reported was lower than what I was getting (on the order of 100k scored pairs/sec, of course that depends a lot on what's being scored), so I never had the time to try those.

I'm not 100% sure how much it helps performance, but there are tricks that can speed up pair generation and cut the IPC cost of passing pairs between processes. Using numpy to represent the block as an array of record IDs and generating pairs with numpy.triu can be faster than itertools.combinations. Or maybe moving that to Cython. Also, passing numpy objects between processes seems faster than passing Python objects. But none of that will work for index predicates (which are a huge bottleneck).
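A minimal sketch of the numpy pair-generation trick, for comparison with `itertools.combinations`. It uses `numpy.triu_indices` (the index-generating relative of `numpy.triu`), which is my choice for the illustration; the function names are hypothetical. Whether it is actually faster depends on block sizes and would need measuring.

```python
from itertools import combinations

import numpy as np


def pairs_numpy(block_ids: np.ndarray) -> np.ndarray:
    """Return all unordered pairs of record IDs in a block as an (n_pairs, 2) array."""
    # Indices of the strict upper triangle: every (i, j) with i < j.
    i, j = np.triu_indices(len(block_ids), k=1)
    return np.column_stack((block_ids[i], block_ids[j]))


def pairs_itertools(block_ids) -> list:
    """Same pairs, generated one Python tuple at a time."""
    return list(combinations(block_ids, 2))
```

The result is a single contiguous integer array, so a batch of pairs can be handed to a worker as one object instead of many small tuples.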

fgregg commented 2 years ago

interesting! @fjsj, this would confirm that just reducing IPC would significantly improve things (passing ids instead of full record pairs)

the design i was thinking about was something like this.

class ScoreDupes(object):
    def __init__(self,
                 data_model,
                 classifier,
                 connection_string,
                 data_table,
                 block_table,
                 score_file_path: str,
                 dtype: numpy.dtype,
                 offset,
                 sql_offset,
                 n_processes):
        self.data_model = data_model
        self.classifier = classifier
        self.score_file_path = score_file_path
        self.dtype = dtype
        self.offset = offset
        self.connection_string = connection_string
        self.data_table = data_table
        self.block_table = block_table
        self.sql_offset = sql_offset
        self.n_processes = n_processes

    def __call__(self) -> None:

        connection = db.connect(self.connection_string)
        cursor = connection.serverside_cursor()

        cursor.execute("""
               select a.donor_id,
                      row_to_json((select d from (select a.city,
                                                         a.name,
                                                         a.zip,
                                                         a.state,
                                                         a.address) d)),
                      b.donor_id,
                      row_to_json((select d from (select b.city,
                                                         b.name,
                                                         b.zip,
                                                         b.state,
                                                         b.address) d))
               from (select DISTINCT l.donor_id as east, r.donor_id as west
                     from blocking_map as l
                     INNER JOIN blocking_map as r
                     using (block_key)
                     where l.donor_id < r.donor_id) ids
               INNER JOIN processed_donors a on ids.east=a.donor_id
               INNER JOIN processed_donors b on ids.west=b.donor_id
               OFFSET = ?
               WHERE row_number() % ?""", (self.sql_offset, self.n_processes))

        for record_pair in cursor:
            self.score(record_pair)

    def score(self, record_pairs: RecordPairs) -> None:
        # compare distances and write results to memmap array
        ...

that way each scoring process is responsible for generating their own record pairs to work on. i think this would likely be much faster than your design (and would also eliminate dupe comparisons that your approach would have)

the percent is a modulo in case that wasn't clear. the idea is that if there are four processes, the first process would do the 1st, 5th, and 9th records, the second process would do the 2nd, 6th, and 10th, etc.
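To make the modulo assignment concrete outside of SQL, here is a small pure-Python sketch. `pairs_for_worker` is a hypothetical helper, and it assumes every worker enumerates the candidate pairs in the same deterministic order, which is the property the SQL version would also need.

```python
from itertools import combinations, islice


def pairs_for_worker(pairs, offset: int, n_processes: int):
    """Yield every n_processes-th pair, starting at position `offset`."""
    return islice(pairs, offset, None, n_processes)


# Ten candidate pairs, split across four workers.
all_pairs = list(combinations(range(5), 2))
shares = [list(pairs_for_worker(all_pairs, k, 4)) for k in range(4)]
```

Worker 0 gets the 1st, 5th, and 9th pairs, worker 1 the 2nd, 6th, and 10th, and so on; together the shares cover every pair exactly once.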

fjsj commented 2 years ago

The offset/modulo idea is nice. If you have a block in memory, you can pass the same object to multiple worker processes, but each worker scores only a subset of pairs from that block based on the offset they got. It's similar to what you're doing here, I guess. If we had parallel threads in Python, this would greatly reduce memory usage and IPC costs.

However, the bottleneck I see in your approach here is the OFFSET in SQL. At least in Postgres, that self-join on blocking_map requires a materialization of the whole LxR results (use EXPLAIN SELECT ... to see that). Only after that materialization, the OFFSET will be applied, which means the whole self-join calculation and temp storage will happen in each process. If you make N processes do that at the same time, I think you're just moving the bottleneck to the DB and it will do a lot of repeated work.

Therefore, I think the scalable solutions are:

fgregg commented 2 years ago

as long as the temp storage is on disk, it still might be favorable.

but, if that offset/modulo strategy does not work, i can think of at least one other per-process SQL idea that might work.*

in any regard, we probably need a shoot out at this point to see what's best.

* just to record the idea for the future:

there are N records and K = N * (N - 1) / 2 record pairs. if the record_ids are randomly assigned, then, in expectation, there should be the same number of blocked pairs among record pairs 0 to K/2 - 1 as among record pairs K/2 to K - 1.

assume our record_ids are integers from 0 to N - 1 and are randomly assigned. then we could partition like so.

        cursor.execute("""
               select a.donor_id,
                      row_to_json((select d from (select a.city,
                                                         a.name,
                                                         a.zip,
                                                         a.state,
                                                         a.address) d)),
                      b.donor_id,
                      row_to_json((select d from (select b.city,
                                                         b.name,
                                                         b.zip,
                                                         b.state,
                                                         b.address) d))
               from (select DISTINCT l.donor_id as east, r.donor_id as west
                     from blocking_map as l
                     INNER JOIN blocking_map as r
                     using (block_key)
                     where l.donor_id < r.donor_id and l.donor_id >= ? and l.donor_id < ?) ids
               INNER JOIN processed_donors a on ids.east=a.donor_id
               INNER JOIN processed_donors b on ids.west=b.donor_id
               """, (lower, upper))

such that, in expectation, the number of blocked pairs should be about equal in each segment defined by lower and upper
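A runnable sketch of the range-partition idea against an in-memory sqlite database. The toy `blocking_map` rows are made up for the example, and the query is cut down to the pair-generating subquery. It uses a half-open range (`lower <= donor_id < upper`) so that adjacent segments neither overlap nor skip a boundary id; that detail is my assumption about how the bounds would be chosen.

```python
import sqlite3

PAIR_QUERY = """
    SELECT DISTINCT l.donor_id, r.donor_id
    FROM blocking_map AS l
    INNER JOIN blocking_map AS r USING (block_key)
    WHERE l.donor_id < r.donor_id
      AND l.donor_id >= ? AND l.donor_id < ?
    ORDER BY 1, 2
"""


def build_demo_db() -> sqlite3.Connection:
    """Create a toy blocking_map with three blocks (hypothetical data)."""
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE blocking_map (block_key TEXT, donor_id INTEGER)")
    rows = [("a", 0), ("a", 1), ("a", 2),
            ("b", 1), ("b", 2), ("b", 3),
            ("c", 4), ("c", 5)]
    con.executemany("INSERT INTO blocking_map VALUES (?, ?)", rows)
    return con


def pairs_in_range(con: sqlite3.Connection, lower: int, upper: int) -> list:
    """Blocked pairs whose smaller id falls in [lower, upper)."""
    return con.execute(PAIR_QUERY, (lower, upper)).fetchall()
```

Each process would call `pairs_in_range` with its own bounds. In this toy data the split is uneven because the ids are not randomly assigned, which is exactly the assumption the idea relies on.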

NickCrews commented 2 years ago

In general, I agree that trying to get things out of memory and moving to a disk-based model seems like the next logical step. Multiple ways to do this of course. Everything you both have said makes sense. I must admit I know hardly anything about SQL and databases, so the details that you talked about were mostly lost on me.

Couple thoughts:

  1. Consider how many times a record must be (de)serialized:

    • Initial write to store the records
    • [if doing parallel blocking or indexing] Multiple reads by different processes
    • IDK how that row_to_json call in the SQL query works, but if that occurs after the INNER JOIN then it would happen up to 30k times for a super block.
    • Other times too I'm not thinking of? I don't know, should be clear from some simple tests, but I have the feeling that this is going to be potentially pretty slow. If the records contain generic python objects, that unpickling is gonna be a real doozy. I think this might be a good first step: time how long it takes to read and write maybe 1k, 100k, 1M rows. Once with records just being primitives, again with some types like lists that require pickle.
  2. Can/should we move away from the current, row-based model and go towards a column-based model? e.g. using something like a pandas DataFrame, where individual series are contiguous in memory? This would have several benefits:

    • Reduced memory
    • MUCH faster scoring, since instead of having N^2 score_field(scalar, scalar) calls, you would have just N score_field(scalar, vector) calls, many of which could rely on numpy ufuncs and other vectorised computations. For instance, the new price comparator could be
      def comparator(price_1: float, price_2: np.ndarray) -> np.ndarray:
          # Start with NaN everywhere; only valid (positive) prices get a distance.
          result = np.full(price_2.shape, np.nan)
          if price_1 <= 0:
              return result
          legit = price_2 > 0
          result[legit] = np.abs(np.log10(price_1) - np.log10(price_2[legit]))
          return result

      In my experience, using these vectorized functions has allowed many algorithms that would otherwise be too slow. I think we could also allow a back door for people to write their own non-vectorized versions if they want to, and then they could have the old behavior. Sometimes this could result in a more complex API, such as when doing cosine similarity, but it should be possible. Also having this coexist with the current scalar-based API could be tricky.

  3. If we liked point 2, then this could work well with disk-based storage, using memmapped arrays. We could use numpy arrays, but I think a better option is this format called Apache Arrow. I'm not super familiar with it, but it allows for zero-copy and zero-serialization costs in many cases, and it supports basic nested types such as lists and maps. I think it would be possible to completely get rid of any databases, and just rely on using memmapped arrays. These support concurrent reads, so it should be even easier to parallelize than a database approach I think. Do the inner join once to generate all record_ids to compare, write that to an arrow array. Then spawn a bunch of processes that grab the record_ids and then self-serve the record_data from a shared array. They write to their own result files, which are then merged together.
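A minimal sketch of the "write the pairs once, let each process read its share" flow from point 3. It uses plain `numpy.memmap` rather than Arrow, since that is the simpler of the two options mentioned; the file layout, function names, and the strided assignment of pairs to workers are assumptions for illustration.

```python
import os
import tempfile

import numpy as np


def write_pairs(path: str, pairs: np.ndarray) -> None:
    """Write an (n_pairs, 2) int64 array of record IDs to a memory-mapped file."""
    out = np.memmap(path, dtype=np.int64, mode="w+", shape=pairs.shape)
    out[:] = pairs
    out.flush()
    del out  # release the mapping once the data is on disk


def read_share(path: str, n_pairs: int, offset: int, n_workers: int) -> np.ndarray:
    """Open the file read-only and copy out every n_workers-th pair."""
    pairs = np.memmap(path, dtype=np.int64, mode="r", shape=(n_pairs, 2))
    return np.array(pairs[offset::n_workers])
```

Only the file path and a few integers need to cross the process boundary; each worker maps the same file read-only and pulls its own rows.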

Realize that a lot of these suggestions are coming from someone who doesn't understand databases well so I'm trying to apply the tools that I'm more familiar with 😉 Hopefully this makes sense, I must admit I still don't totally understand the internals of dedupe so I might be missing something.

fjsj commented 2 years ago

I missed vectorized comparators as one of the scalable solution proposals. I definitely agree that changing all comparators to be vector-based will improve performance a lot and may trivialize parallelization. I'm not sure how to vectorize edit-distance operations, though.

Apache Arrow's value proposition looks awesome, I've checked it. But at least ~1 year ago I found it wasn't trivial to use the Arrow API for zero-copy handling of regular Python dicts/arrays (instead of numpy or pandas objects).

fgregg commented 2 years ago

i don't see any strong benefits from vectorization. the complexity still remains O(n^2) because, as in your price example, len(price_1) == len(price_2) ~ O(n^2).

if the vectorization allows for much more efficient calculation on chip (which is often the case for pure numeric operations), that would be great. typically, it also reduces loop overhead as well.

if vectorization helped a lot, that would be fantastic, but it would only change the overall cost by a constant factor, though potentially a very attractive factor.

that said, i don't know of ways to turn edit distances into operations that would really benefit from vectorization. would love to see examples of this!

fgregg commented 2 years ago

if there is a project that offers a zero-copy dict, that would be amazing.

NickCrews commented 2 years ago

Yes, oops, you're totally right it doesn't change the complexity. I was treating each "call into c code" as my atomic operation that I was counting, which sometimes for wall time seems to be an OK approximation :)

Just tried an experiment with the price comparator:

import numpy as np
from dedupe.variables.price import PriceType

N = 10000
N_ZEROS = N // 10
# float64 in [0, N)
prices = np.random.rand(N) * N
# Set 10% of prices to 0
to_zero = np.random.randint(0, N, size=(N_ZEROS,))
prices[to_zero] = 0
prices_list = list(prices)

def comparator(price_1: float, price_2: np.array) -> np.array:
    result = np.empty(shape=price_2.shape, dtype=np.float64)
    result[:] = np.nan
    if price_1 <= 0:
        return result
    legit = price_2 > 0
    result[legit] = np.abs(np.log10(price_1) - np.log10(price_2[legit]))
    return result

def comp_iter():
    result = []
    for p1 in prices_list:
        for p2 in prices_list:
            result.append(PriceType.comparator(p1, p2))
    return result

def comp_vec():
    result = []
    for p1 in prices_list:
        result.extend(comparator(p1, prices))
    return result

np.testing.assert_almost_equal(comp_iter(), comp_vec())

%timeit comp_iter()
%timeit comp_vec()

yields

3.07 s ± 80.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
93.5 ms ± 2.9 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

so that's roughly a 33x improvement

The other big room for improvement when using vectorized operations is that you can throw out duplicate operations. In each block, last_name might be smith 1000 times, smiht twice, and the empty string 100 times. Assuming that all comparators are pure functions, you could throw out the duplicates and only have to perform 3x3 = 9 operations, then rebroadcast the result back to the original input.

You're right, string edit distance would be tricky to parallelize. Although some algorithms like SymSpell might actually be possible to modify. Probably though, the low hanging fruit would be 1. only operate on unique values as above and 2. you could move one of the loops into Cython, probably would help some. Similarly, you could apply these two optimizations to all the comparators, since they wouldn't require different algorithms. Affine Gap would probably be the next obvious thing to benchmark.

I should have added that caveat, the nested types in arrow really do look like a PITA to use. But I think some operations, such as cosine similarity, should be possible using the pyarrow primitive operations? But you're totally right, code using python dicts gets us right back to where we are. But this is going to be a problem if we ever are trying to serialize dicts, lists, etc.

Some other test code to compare pyarrow serialization vs pickle

import pickle
import numpy as np
import pandas as pd
import pyarrow as pa

udt = pa.map_(pa.string(), pa.int32())

names = pd.read_csv("https://raw.githubusercontent.com/dominictarr/random-name/master/first-names.txt").squeeze().tolist()
nums = np.random.randint(0, 100, size=len(names)).tolist()
items = list(zip(names, nums))

# Pyarrow takes dicts as lists of tuples, not as dicts
dicts = []
chunk_size = 5
for i in range(0, len(items), chunk_size):
    dicts.append(items[i:i + chunk_size])
print(dicts[:3])
print(len(dicts))

assert pickle.loads(pickle.dumps(dicts)) == dicts
assert pa.array(dicts, type=udt).to_pylist() == dicts

%timeit pickle.loads(pickle.dumps(dicts))
%timeit pa.array(dicts, type=udt).to_pylist()

results in

[[('Aarika', 71), ('Abagael', 33), ('Abagail', 37), ('Abbe', 97), ('Abbey', 11)], [('Abbi', 42), ('Abbie', 60), ('Abby', 95), ('Abbye', 62), ('Abigael', 75)], [('Abigail', 44), ('Abigale', 40), ('Abra', 79), ('Ada', 42), ('Adah', 17)]]
989
1.68 ms ± 27.6 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
14.2 ms ± 102 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

So pyarrow looks way worse than just pickle. Maybe I'm not doing that correctly though.

Of course I think one of the biggest drawbacks to vectorizing things is that it would require overhauling a LOT of things to fit the new data model, whereas the database approach just changes how we pipe the data (same shape as before) around.

fgregg commented 2 years ago

i don't follow how you can throw out duplicate operations?

fgregg commented 2 years ago

Of course I think one of the biggest drawbacks to vectorizing things is that it would require overhauling a LOT of things to fit the new data model, whereas the database approach just changes how we pipe the data (same shape as before) around.

leaning more into databases would also open up to pushing the distance functions into the database.

for example using something like sqlite's create_function facilities, we could imagine something like.

               select a.donor_id,
                      b.donor_id, 
                      levenshtein(a.name, b.name), 
                      levenshtein(a.zip, b.zip)
               from (select DISTINCT l.donor_id as east, r.donor_id as west
                     from blocking_map as l
                     INNER JOIN blocking_map as r
                     using (block_key)
                     where l.donor_id < r.donor_id) ids
               INNER JOIN processed_donors a on ids.east=a.donor_id
               INNER JOIN processed_donors b on ids.west=b.donor_id
               OFFSET = ?
               WHERE row_number() % ?

and from there it would be a small step to doing the scoring too.
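A small runnable sketch of the `create_function` idea with the standard library's `sqlite3` module. The `levenshtein` here is a plain dynamic-programming implementation written for the example (not the comparator dedupe ships), and the three-row `processed_donors` table is made up. As far as I know, a function registered this way is a Python callable that sqlite invokes once per row, so each call still passes through the interpreter.

```python
import sqlite3


def levenshtein(a, b):
    """Plain dynamic-programming edit distance; NULL in, NULL out."""
    if a is None or b is None:
        return None
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            current.append(min(
                previous[j] + 1,                       # deletion
                current[j - 1] + 1,                    # insertion
                previous[j - 1] + (char_a != char_b),  # substitution or match
            ))
        previous = current
    return previous[-1]


con = sqlite3.connect(":memory:")
# Register the Python callable as a two-argument SQL function.
con.create_function("levenshtein", 2, levenshtein)

con.execute("CREATE TABLE processed_donors (donor_id INTEGER PRIMARY KEY, name TEXT)")
con.executemany(
    "INSERT INTO processed_donors VALUES (?, ?)",
    [(1, "smith"), (2, "smiht"), (3, "jones")],
)
distances = con.execute("""
    SELECT a.donor_id, b.donor_id, levenshtein(a.name, b.name)
    FROM processed_donors a
    JOIN processed_donors b ON a.donor_id < b.donor_id
    ORDER BY a.donor_id, b.donor_id
""").fetchall()
```

Here `distances` holds one `(id, id, distance)` row per pair, computed inside the query rather than after fetching the records.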

NickCrews commented 2 years ago

i don't follow how you can throw out duplicate operations?

for example, with the price comparator:

price1 = 0
other_prices = [0,0,2,3,3,2,0,2]

you only need to do the comparison for the unique inputs to the function, compare(0, [0,2,3]), and then you do a JOIN (pandas (and other dataframe libraries?) have joins) to map each input to the output. semantically:

m = {inp: f(inp) for inp in unique(inp)}
result = inp.replace(m)

I actually have a function decorator that does this for pandas dataframes and series that I use extensively for doing expensive calculations on data that has a lot of duplicates.
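A minimal numpy sketch of the "compute on unique values, then rebroadcast" idea. `apply_on_unique` is a hypothetical helper, not the decorator mentioned above, and it assumes the function is pure and the inputs are a 1-D array of sortable values.

```python
import numpy as np


def apply_on_unique(func, values: np.ndarray) -> np.ndarray:
    """Call func once per distinct value, then map results back to every input."""
    # inverse[i] is the position of values[i] within unique_values.
    unique_values, inverse = np.unique(values, return_inverse=True)
    unique_results = np.array([func(v) for v in unique_values])
    return unique_results[inverse]
```

For `other_prices = [0,0,2,3,3,2,0,2]` the function runs three times (for 0, 2, and 3) instead of eight, and the indexing step plays the role of the JOIN.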

for example using something like sqlite's create_function facilities, we could imagine something like.

That's super cool! Does this require SQL to go out to python and then drop into the function, or can you pass in a cython function and have SQL go straight to the C implementation?

fjsj commented 2 years ago

I would like to mention Awkward Array as an alternative for holding records in memory or DB and perhaps faster serialization/deserialization/IPC: https://github.com/scikit-hep/awkward-1.0

The basic example shows a 10x gain in RAM usage. It seems to be compatible with Apache Arrow and to support zero-copy conversions to it.

NickCrews commented 2 years ago

Another option that seems very intriguing: vaex

NickCrews commented 2 years ago

Pointing out here for inspiration that splink does pretty much everything in SQL. It is even backend-agnostic, so it can run locally with DuckDB, or distributed on Spark