OlivierBinette opened this issue 1 year ago
This seems like a useful idea. Just FYI I am in the process of writing some documentation that serves as a dev guide for adding new comparisons and comparison-levels to hopefully make this process a little bit more accessible - I'll try to finish that off sooner rather than later.
This is a really interesting idea and I'd love to see how it pans out - would be amazing if Splink could leverage embeddings effectively.
There's a rough and ready discussion of how you could use custom SQL in splink here.
In particular, the kind of syntax you'd be looking for would be something like this (but note the cosine_similarity function DOES NOT EXIST!) - note in particular the sql_condition part:
comparison_product_name = {
    "output_column_name": "product_name",
    "comparison_description": "product name comparison",
    "comparison_levels": [
        {
            "sql_condition": "product_name_l IS NULL OR product_name_r IS NULL",
            "label_for_charts": "Null",
            "is_null_level": True,
        },
        {
            "sql_condition": "product_name_l = product_name_r",
            "label_for_charts": "Exact match",
            "tf_adjustment_column": "product_name",
        },
        {
            "sql_condition": "cosine_similarity(product_name_embeddings_array_l, product_name_embeddings_array_r) > 0.9",
            "label_for_charts": "cosine_similarity > 0.9",
        },
        {"sql_condition": "ELSE", "label_for_charts": "All other comparisons"},
    ],
}
This approach only works if you have additional comparison columns which are 'independent' of product name, e.g. price, SKU or something, because in general a splink model with a single comparison doesn't work very well (you could apply it if you knew the match weights, but you couldn't easily train it using EM). Also, I guess if you only had that column, you might as well just use cosine similarity of the embeddings directly and not use splink at all.
If you only have the product name, I suppose you could consider trying to get the embeddings down to a small array and splitting them out into columns (embedding_1, embedding_2, etc.), comparing them separately numerically - along the lines of the sketch below. (Though I must admit I don't understand embeddings well, so I don't know if this is a viable strategy.)
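A minimal sketch of that splitting step, assuming a DuckDB table with a fixed-length embedding array column (all names here are illustrative; DuckDB lists are 1-indexed):

import duckdb

conn = duckdb.connect()
# toy table with a length-3 embedding array
conn.execute(
    "CREATE TABLE products AS "
    "SELECT 1 AS unique_id, [0.1, 0.2, 0.3] AS embedding"
)
# split the array out into one numeric column per component
conn.sql(
    "SELECT unique_id, "
    "embedding[1] AS embedding_1, "
    "embedding[2] AS embedding_2, "
    "embedding[3] AS embedding_3 "
    "FROM products"
).show()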
In terms of implementing a similarity function:
So it seems like the fundamental challenge is that the database needs to support a similarity function that takes two vectors of embeddings as input and outputs a similarity score.
The functions that work on nested data are listed here. Sadly, I don't immediately see one that looks useful for your purpose, but there may be a creative use that I haven't thought of.
It looks like there are some planned functions that may be of help here - in particular arrayZip when combined with arrayMap feels like it could be useful. You could perhaps comment on that discussion to see if there's been any progress.
I also found this: https://blog.eto.ai/vector-similarity-search-with-duckdb-44dec043532a
Thanks for the links @RobinL ! I missed the part of the documentation with the custom comparison as SQL.
With AWS Athena (Trino SQL), I think zip_with and reduce would work well for implementing a dot product.
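For example, a hedged sketch (not run against Athena - the helper functions are mine, and the title_embeddings column name is illustrative) assembling Trino's zip_with and reduce into a cosine-similarity expression that could be used as a Splink sql_condition:

def trino_dot(a: str, b: str) -> str:
    # Trino signature: reduce(array, initial_state, input_function, output_function)
    return (
        f"reduce(zip_with({a}, {b}, (x, y) -> x * y), "
        f"CAST(0 AS DOUBLE), (s, x) -> s + x, s -> s)"
    )

def trino_cosine_similarity(a: str, b: str) -> str:
    return (
        f"{trino_dot(a, b)} / "
        f"(sqrt({trino_dot(a, a)}) * sqrt({trino_dot(b, b)}))"
    )

# illustrative comparison level using the generated SQL
embedding_level = {
    "sql_condition": trino_cosine_similarity(
        "title_embeddings_l", "title_embeddings_r"
    ) + " > 0.9",
    "label_for_charts": "cosine similarity > 0.9",
}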
Nice - yeah, that def looks possible.
This is pretty nasty, but it occurred to me that in dialects that don't support zip_with, you could probably get the same functionality with an array_map (list_transform) function, so long as you stored your embeddings as arrays of tuples in the form [(embedding_value, counter), ...]:
array_l = [(embedding_0, 0), (embedding_1, 1), ... ]
Then something like this would iterate across the elements: list_transform(array_l, x -> array_r[x[1]]), where x[1] retrieves the counter value, i.e. the corresponding index from array_r.
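A runnable variant of the same idea - note this swaps the (value, counter) tuples for a generated index list, which avoids storing the counter at all. I believe this works in recent DuckDB versions, where lists are 1-indexed:

import duckdb

conn = duckdb.connect()
conn.execute(
    "CREATE TABLE pairs AS "
    "SELECT [1.0, 2.0, 3.0] AS arr_l, [4.0, 5.0, 6.0] AS arr_r"
)
# generate indices 1..len(arr_l), multiply element-wise, then sum
dot = (
    "list_sum(list_transform(range(1, len(arr_l) + 1), "
    "i -> arr_l[i] * arr_r[i]))"
)
conn.sql(f"SELECT {dot} AS dot_product FROM pairs").show()
# dot_product = 1*4 + 2*5 + 3*6 = 32.0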
Also worth noting that the duckdb team have already included jaro winkler in direct response to a request from splink users, so it's possible they would consider adding cosine similarity from two arrays natively https://github.com/duckdb/duckdb/issues/4217
@RobinL Thanks, that is good to know. I'm sure there is interest in providing more functionality for machine learning data preprocessing and NLP as well. Trino has some good stuff for ngram tokenization and min-hashing: https://trino.io/docs/current/functions/setdigest.html
This is also worth keeping an eye on: https://twitter.com/__AlexMonahan__/status/1621141511000961026 Might make it easier to write a udf (e.g. for cosine distance)
> If you only have the product name, I suppose you could consider trying to get the embeddings down to a small array and splitting them out into columns (embedding_1, embedding_2, etc.) and comparing them separately numerically. (Though I must admit I don't understand embeddings well, so I don't know if this is a viable strategy.)
I think you are correct: the individual components of the embedding are going to be correlated, so I think you need to treat all the components together as one Comparison.
@mamonu has now created a 'first try' at a cosine similarity function for the Spark backend, which can be found here: https://github.com/moj-analytical-services/splink_scalaudfs/blob/embeddings/jars/scala-udf-similarity-0.1.1-EMBEDDINGSDEV.jar
Here's a working Splink model using the above jar, with OpenAI text-embedding-ada-002 embeddings:
@OlivierBinette you may be interested ☝️
What is preventing us from implementing the cosine similarity in SQL, so it works on all backends? Unless I'm missing something it wouldn't be that hard to generate some SQL based on column names that the user provides. Are we just worried about performance?
import duckdb


def _dot(lefts, rights):
    def _clause(left, right):
        # for testing do it literally
        return f"{left} * {right}"
        # would use this for real
        # return f"l_{left} * r_{right}"

    clauses = (_clause(*pair) for pair in zip(lefts, rights, strict=True))
    return "(" + " + ".join(clauses) + ")"


def _norm(cols):
    return f"SQRT({_dot(cols, cols)})"


def make_cosine_similarity_sql(
    column_names: list[str], right_column_names: list[str] | None = None
) -> str:
    if right_column_names is None:
        lefts = column_names
        rights = column_names
    else:
        lefts = column_names
        rights = right_column_names
    return f"{_dot(lefts, rights)} / ( {_norm(lefts)} * {_norm(rights)})"


conn = duckdb.connect()
conn.execute(
    "CREATE TABLE 'data' AS SELECT * FROM READ_CSV_AUTO('https://gist.githubusercontent.com/curran/a08a1080b88344b0c8a7/raw/0e7a9b0a5d22642a06d3d5b9bcbad9890c8ee534/iris.csv')"
)
left_cols = ["petal_length", "petal_width"]
right_cols = ["sepal_length", "sepal_width"]
cs = make_cosine_similarity_sql(left_cols, right_cols)
print(cs)
conn.sql(f"SELECT *, {cs} AS similarity from data")
results in
(petal_length * sepal_length + petal_width * sepal_width) / ( SQRT((petal_length * petal_length + petal_width * petal_width)) * SQRT((sepal_length * sepal_length + sepal_width * sepal_width)))
┌──────────────┬─────────────┬──────────────┬─────────────┬───────────┬────────────────────┐
│ sepal_length │ sepal_width │ petal_length │ petal_width │ species │ similarity │
│ double │ double │ double │ double │ varchar │ double │
├──────────────┼─────────────┼──────────────┼─────────────┼───────────┼────────────────────┤
│ 5.1 │ 3.5 │ 1.4 │ 0.2 │ setosa │ 0.896248789296663 │
│ 4.9 │ 3.0 │ 1.4 │ 0.2 │ setosa │ 0.9181234236331912 │
│ 4.7 │ 3.2 │ 1.3 │ 0.2 │ setosa │ 0.9025637129150959 │
│ 4.6 │ 3.1 │ 1.5 │ 0.2 │ setosa │ 0.8958521726609922 │
│ 5.0 │ 3.6 │ 1.4 │ 0.2 │ setosa │ 0.8860111771934351 │
│ 5.4 │ 3.9 │ 1.7 │ 0.4 │ setosa │ 0.923229445290974 │
│ 4.6 │ 3.4 │ 1.4 │ 0.3 │ setosa │ 0.9108675898478961 │
│ 5.0 │ 3.4 │ 1.5 │ 0.2 │ setosa │ 0.8939896049388586 │
│ 4.4 │ 2.9 │ 1.4 │ 0.2 │ setosa │ 0.904392513917151 │
│ 4.9 │ 3.1 │ 1.5 │ 0.1 │ setosa │ 0.8787709575388238 │
│ · │ · │ · │ · │ · │ · │
│ · │ · │ · │ · │ · │ · │
│ · │ · │ · │ · │ · │ · │
│ 6.7 │ 3.1 │ 5.6 │ 2.4 │ virginica │ 0.9995949828334421 │
│ 6.9 │ 3.1 │ 5.1 │ 2.3 │ virginica │ 0.9999989949665131 │
│ 5.8 │ 2.7 │ 5.1 │ 1.9 │ virginica │ 0.9968761695450432 │
│ 6.8 │ 3.2 │ 5.9 │ 2.3 │ virginica │ 0.9976798018232167 │
│ 6.7 │ 3.3 │ 5.7 │ 2.5 │ virginica │ 0.9990176195924825 │
│ 6.7 │ 3.0 │ 5.2 │ 2.3 │ virginica │ 0.9999896398613478 │
│ 6.3 │ 2.5 │ 5.0 │ 1.9 │ virginica │ 0.9998931348496853 │
│ 6.5 │ 3.0 │ 5.2 │ 2.0 │ virginica │ 0.9978730208564014 │
│ 6.2 │ 3.4 │ 5.4 │ 2.3 │ virginica │ 0.9951085010602806 │
│ 5.9 │ 3.0 │ 5.1 │ 1.8 │ virginica │ 0.9914171734791748 │
├──────────────┴─────────────┴──────────────┴─────────────┴───────────┴────────────────────┤
│ 150 rows (20 shown) 6 columns │
└──────────────────────────────────────────────────────────────────────────────────────────┘
@NickCrews I think that would work well for generic SQL backends. For SQL engines that support array data types (like Trino/AWS Athena, or lists with duckdb), it would be more convenient to have the embedding arrays contained in a single column. This way, you'd put the result of an embedding model (e.g. sent2vec) in a single column, and then be able to define the similarity function just by referencing that single column's name.
That's true. For that case, each backend might have slightly different syntax, so we should make it possible for each backend to override with its own implementation. But it can still happen at the SQL level; I don't think we need to write custom UDFs for most cases.
I think we could keep the same API as the function I wrote above, but if you supply a single string (instead of Iterable[str] like I have there), then it is interpreted as a single column holding array values
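A hypothetical sketch of that overload - the element-reference expansion is mine and assumes 1-indexed array access (DuckDB/Trino style) and a known embedding length:

def _element_refs(col_or_cols, dimension: int) -> list[str]:
    # a single string is treated as an array column and expanded into
    # indexed element references; a list of names passes through unchanged
    if isinstance(col_or_cols, str):
        return [f"{col_or_cols}[{i}]" for i in range(1, dimension + 1)]
    return list(col_or_cols)

# _element_refs("title_embeddings_l", 3)
# -> ['title_embeddings_l[1]', 'title_embeddings_l[2]', 'title_embeddings_l[3]']

The resulting strings could then be fed straight into _dot and _norm from the function above.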
Interesting. It hadn't occurred to me that rather than using an 'array_transform'/'array_reduce' type function, you could spread over multiple columns, and more importantly, I hadn't thought about the possibility that you can treat an array column as if it were just a group of columns (e.g. replace petal_length and sepal_length etc. with arr_col[0] and arr_col[1] etc. in your formulation).
I agree that approach seems more likely to work in a backend-agnostic way. Not sure what performance would look like - one of the problems with embeddings is that they're often long (e.g. OpenAI's are of length 1536). I guess for relatively small linking problems performance would probably be acceptable, given that cosine distance is a relatively simple calculation, so it feels like it's of value.
@OlivierBinette in your work with embeddings, are they long, or are you finding a way of compressing them? I haven't tried it, but I wondered if there was any mileage in using dimensionality reduction techniques. Though that's probably only relevant for very large input datasets; otherwise you might as well leave them 'raw'.
Native implementations (e.g. the Scala one above) are going to scale as O(length of vector), just like the above SQL - you always need the same number of basic operations, so I think it's more a question of overhead. We could run that SQL in Spark and compare it to the native Scala implementation as an easy first test.
@RobinL The embeddings can be quite long. PatentsView uses the sent2vec package to embed and compare patent titles. These are of length 100 by default if I remember correctly. These embeddings can get larger with bigger models (as with OpenAI's models as you mention), so scalability in the embedding dimension is very important.
Worth noting for people reading this thread that Python UDFs are coming to duckdb very soon: https://twitter.com/holanda_pe/status/1655602267083489282 Might make it easier to write a udf (e.g. for cosine distance).
@RobinL, @samnlindsay, duckdb has added list_cosine_similarity as a function and Trino has added cosine_similarity.
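With those native functions, the comparison level becomes a one-liner. A sketch assuming a title_embeddings array column (the column name is illustrative; on Trino/Athena substitute cosine_similarity):

# quick sanity check of DuckDB's native function
import duckdb

duckdb.sql(
    "SELECT list_cosine_similarity([1.0, 2.0], [2.0, 4.0]) AS sim"
).show()  # parallel vectors -> sim = 1.0

# sketch of a Splink comparison level using it
embedding_level = {
    "sql_condition": "list_cosine_similarity(title_embeddings_l, title_embeddings_r) > 0.9",
    "label_for_charts": "cosine similarity > 0.9",
}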
I'd be interested in working with your team to showcase this functionality, using deep embedding models (via OpenAI's API or HuggingFace) with Splink for complex comparisons. I'm quite busy finishing my PhD, but I could find the time if we plan for something useful.
@OlivierBinette nice, I hadn't seen that in duckdb. Yes, a showcase would be great!
We're actually thinking about refactoring some of the comparisons because it's currently a bit tricky to implement new ones, although there's always the option of writing SQL directly into the settings dictionary, which will be fine for a showcase.
I'll contact you directly so we can set up a chat.
Hi @RobinL,
Apologies for re-opening this conversation and for the long message, but I'm curious if you have any advice regarding the use of the jar file and your code from above.
More specifically, when I use the vector_cosine_sim udf in my comparisons, the Linker doesn't find any instances of the given thresholds in my data, even though I'm quite confident that they exist.
I don't think it's an issue with registration of the jar/udf because I'm able to run the following:
import tempfile

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T


def configure_spark_session(driver_memory="16g", executor_memory="8g") -> SparkSession:
    """Configure a Spark session with specific settings.

    Args:
        driver_memory (str): Memory allocated to driver.
        executor_memory (str): Memory allocated to executor.

    Returns:
        spark (SparkSession): The configured Spark session.
    """
    conf = SparkConf()
    conf.set("spark.driver.memory", driver_memory)
    conf.set("spark.executor.memory", executor_memory)
    conf.set("spark.sql.codegen.wholeStage", "true")
    conf.set("spark.sql.warehouse.dir", tempfile.mkdtemp())
    conf.set("spark.driver.maxResultSize", "2g")
    conf.set("spark.jars", "./scala-udf-similarity-0.1.1-EMBEDDINGSDEV.jar")
    conf.set("spark.driver.extraClassPath", "./scala-udf-similarity-0.1.1-EMBEDDINGSDEV.jar")
    conf.set("spark.default.parallelism", "800")
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("ERROR")
    spark = SparkSession(sc)
    spark.sparkContext.setCheckpointDir(tempfile.mkdtemp())
    return spark


spark = configure_spark_session()
df = spark.read.parquet('./full.parquet').select('unique_id', 'name', 'title_embeddings')
df = df.withColumn('title_embeddings', df['title_embeddings'].cast(T.ArrayType(T.DoubleType())))
df.printSchema()
spark = configure_spark_session()
df = spark.read.parquet('./full.parquet').select('unique_id', 'name', 'title_embeddings')
df = df.withColumn('title_embeddings', df['title_embeddings'].cast(T.ArrayType(T.DoubleType())))
df.printSchema()
root
|-- unique_id: string (nullable = true)
|-- name: string (nullable = true)
|-- title_embeddings: array (nullable = true)
| |-- element: double (containsNull = true)
from pyspark.sql.types import DoubleType

spark.udf.registerJavaFunction(
    "vector_cosine_sim",
    "uk.gov.moj.dash.linkage.VectorCosineSimilarity",
    DoubleType()
)
excerpt = df.sample(False, 0.01)
excerpt_l = excerpt.select(F.col('title_embeddings').alias('title_embeddings_l'), F.col('unique_id').alias('unique_id_l'))
excerpt_r = excerpt.select(F.col('title_embeddings').alias('title_embeddings_r'), F.col('unique_id').alias('unique_id_r'))
cross_joined = excerpt_l.crossJoin(excerpt_r)
cross_joined.show(5)
+--------------------+--------------------+--------------------+--------------------+
| title_embeddings_l| unique_id_l| title_embeddings_r| unique_id_r|
+--------------------+--------------------+--------------------+--------------------+
|[-0.1078699529170...|2c7cd150b8ca89dc0...|[-0.1078699529170...|2c7cd150b8ca89dc0...|
|[-0.1078699529170...|2c7cd150b8ca89dc0...|[0.47958111763000...|40fb2575b7f4787f2...|
|[-0.1078699529170...|2c7cd150b8ca89dc0...|[0.19567984342575...|43ba4ec689260af41...|
|[-0.1078699529170...|2c7cd150b8ca89dc0...|[0.59064656496047...|5125345379d0d15e9...|
|[-0.1078699529170...|2c7cd150b8ca89dc0...|[0.23891282081604...|7ca7ebb81a398d15a...|
+--------------------+--------------------+--------------------+--------------------+
df_with_similarity = cross_joined.withColumn(
    "cosine_similarity",
    F.expr("vector_cosine_sim(title_embeddings_l, title_embeddings_r)")
)
df_with_similarity.show(5)
+--------------------+--------------------+--------------------+--------------------+------------------+
| title_embeddings_l| unique_id_l| title_embeddings_r| unique_id_r| cosine_similarity|
+--------------------+--------------------+--------------------+--------------------+------------------+
|[-0.1078699529170...|2c7cd150b8ca89dc0...|[-0.1078699529170...|2c7cd150b8ca89dc0...|1.0000000000000002|
|[-0.1078699529170...|2c7cd150b8ca89dc0...|[0.47958111763000...|40fb2575b7f4787f2...|0.8944097413898618|
|[-0.1078699529170...|2c7cd150b8ca89dc0...|[0.19567984342575...|43ba4ec689260af41...|0.8861883455703176|
|[-0.1078699529170...|2c7cd150b8ca89dc0...|[0.59064656496047...|5125345379d0d15e9...|0.8801735873211239|
|[-0.1078699529170...|2c7cd150b8ca89dc0...|[0.23891282081604...|7ca7ebb81a398d15a...|0.8433293128231946|
+--------------------+--------------------+--------------------+--------------------+------------------+
As you can see, the udf seems to be registered and is running correctly.
However, when I then set up my Linker object and run parameter estimation, I get the following results:
EMBEDDING_COMPARISON = cl.distance_function_at_thresholds(
    "title_embeddings",
    "vector_cosine_sim",
    [0.96, 0.92, 0.88, 0.84, 0.80, 0.76, 0.72, 0.68, 0.5, 0.0001],
    include_exact_match_level=False,
)

settings = {
    "link_type": "dedupe_only",
    "comparisons": [EMBEDDING_COMPARISON],
    "retain_matching_columns": True,
    "retain_intermediate_calculation_columns": True,
    "max_iterations": 100,
    "probability_two_random_records_match": 1e-5,
}

linker = SparkLinker(df, settings)
linker.estimate_u_using_random_sampling(1e6)
----- Estimating u probabilities using random sampling -----
u probability not trained for title_embeddings - Vector_cosine_sim <= 0.92 (comparison vector value: 9). This usually means the comparison level was never observed in the training data.
u probability not trained for title_embeddings - Vector_cosine_sim <= 0.88 (comparison vector value: 8). This usually means the comparison level was never observed in the training data.
u probability not trained for title_embeddings - Vector_cosine_sim <= 0.84 (comparison vector value: 7). This usually means the comparison level was never observed in the training data.
u probability not trained for title_embeddings - Vector_cosine_sim <= 0.8 (comparison vector value: 6). This usually means the comparison level was never observed in the training data.
u probability not trained for title_embeddings - Vector_cosine_sim <= 0.76 (comparison vector value: 5). This usually means the comparison level was never observed in the training data.
u probability not trained for title_embeddings - Vector_cosine_sim <= 0.72 (comparison vector value: 4). This usually means the comparison level was never observed in the training data.
u probability not trained for title_embeddings - Vector_cosine_sim <= 0.68 (comparison vector value: 3). This usually means the comparison level was never observed in the training data.
u probability not trained for title_embeddings - Vector_cosine_sim <= 0.5 (comparison vector value: 2). This usually means the comparison level was never observed in the training data.
u probability not trained for title_embeddings - Vector_cosine_sim <= 0.0001 (comparison vector value: 1). This usually means the comparison level was never observed in the training data.
By any chance is this something you observed in your experiments, or is there anything glaringly incorrect in what I've shared above?
I'm happy to help troubleshoot this and work on a PR if there's a bug or something missing in the codebase!
Many thanks in advance, Jackson
On my phone, but the place to start is by looking at the SQL generated by the EMBEDDING_COMPARISON. There should be a method on that object to get the SQL from it. Does it look odd?
Here's what's produced when I run print(EMBEDDING_COMPARISON.human_readable_description):
Comparison 'Title_Embeddings within vector_cosine_sim thresholds 0.96, 0.92, 0.88, 0.84, 0.8, 0.76, 0.72, 0.68, 0.5, 0.0001 vs. anything else' of `title_embeddings`.
Similarity is assessed using the following ComparisonLevels:
- 'Null' with SQL rule: `title_embeddings_l` IS NULL OR `title_embeddings_r` IS NULL
- 'Vector_cosine_sim <= 0.96' with SQL rule: vector_cosine_sim(`title_embeddings_l`, `title_embeddings_r`) <= 0.96
- 'Vector_cosine_sim <= 0.92' with SQL rule: vector_cosine_sim(`title_embeddings_l`, `title_embeddings_r`) <= 0.92
- 'Vector_cosine_sim <= 0.88' with SQL rule: vector_cosine_sim(`title_embeddings_l`, `title_embeddings_r`) <= 0.88
- 'Vector_cosine_sim <= 0.84' with SQL rule: vector_cosine_sim(`title_embeddings_l`, `title_embeddings_r`) <= 0.84
- 'Vector_cosine_sim <= 0.8' with SQL rule: vector_cosine_sim(`title_embeddings_l`, `title_embeddings_r`) <= 0.8
- 'Vector_cosine_sim <= 0.76' with SQL rule: vector_cosine_sim(`title_embeddings_l`, `title_embeddings_r`) <= 0.76
- 'Vector_cosine_sim <= 0.72' with SQL rule: vector_cosine_sim(`title_embeddings_l`, `title_embeddings_r`) <= 0.72
- 'Vector_cosine_sim <= 0.68' with SQL rule: vector_cosine_sim(`title_embeddings_l`, `title_embeddings_r`) <= 0.68
- 'Vector_cosine_sim <= 0.5' with SQL rule: vector_cosine_sim(`title_embeddings_l`, `title_embeddings_r`) <= 0.5
- 'Vector_cosine_sim <= 0.0001' with SQL rule: vector_cosine_sim(`title_embeddings_l`, `title_embeddings_r`) <= 0.0001
- 'All other comparisons' with SQL rule: ELSE
I tested again:
df_with_similarity = cross_joined.withColumn(
    "cosine_similarity",
    F.expr("vector_cosine_sim(title_embeddings_l, title_embeddings_r) <= .96")
)
to see if the Boolean could be output, and it did indeed work:
+--------------------+--------------------+--------------------+--------------------+-----------------+
| title_embeddings_l| unique_id_l| title_embeddings_r| unique_id_r|cosine_similarity|
+--------------------+--------------------+--------------------+--------------------+-----------------+
|[-0.2301929146051...|08c2cb77c857104eb...|[-0.2301929146051...|08c2cb77c857104eb...| false|
|[-0.2301929146051...|08c2cb77c857104eb...|[0.42279487848281...|3484bbf51a31aba09...| true|
|[-0.2301929146051...|08c2cb77c857104eb...|[0.99581563472747...|37c8ee1fdd49c8ceb...| true|
|[-0.2301929146051...|08c2cb77c857104eb...|[0.91193675994873...|6036e32bb4a9bb492...| true|
|[-0.2301929146051...|08c2cb77c857104eb...|[0.56825220584869...|61de65185976fa16f...| true|
+--------------------+--------------------+--------------------+--------------------+-----------------+
Is it possible that the vector_cosine_sim function somehow doesn't get registered with the SQL engine in Splink and just works on the PySpark session (even though I think they're basically the same thing)?
Thanks very much for the help!
I think it's the <= that's the problem - it's the wrong way around and should be >=. If you look at the function signature, there's an argument like higher_is_better or something. Maybe there's a bug, but try changing that option.
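A sketch of the suggested fix, assuming the argument is indeed named higher_is_better - with a similarity (rather than distance) function, this should flip the generated <= comparisons to >=:

EMBEDDING_COMPARISON = cl.distance_function_at_thresholds(
    "title_embeddings",
    "vector_cosine_sim",
    [0.96, 0.92, 0.88, 0.84, 0.80, 0.76, 0.72, 0.68, 0.5, 0.0001],
    include_exact_match_level=False,
    higher_is_better=True,  # treat larger values as closer matches
)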
You're absolutely right. I do believe that's the issue.
I'm trying to change that parameter now; however, it doesn't seem to be flipping the operator as would be expected. I'll keep investigating and will get back to you as to whether it's a user error or a small bug.
Thanks again!
PR above should address the issue I was mentioning! All working on my side now. Thanks a lot for the help!
Is your proposal related to a problem?
I would like to be able to use Splink with embedding-based similarity functions, specifically with duckdb and Athena backends.
For example, to evaluate the similarity between two product description titles, I can use a deep embedding model (e.g. sent2vec) to embed the titles as numeric arrays. Then, I can compute similarity between the titles as the normalized dot product (cosine similarity) of the two embeddings.
The embedding does not have to be managed by Splink. I can compute the embeddings and append them as an array feature column to my data.
Alternatives considered
I can use custom SQL with Splink when defining my comparisons, but it would be more convenient and user-friendly to have this in a comparison library.
Describe the solution you'd like
I'd like the Athena and duckdb comparison libraries to contain a dot product or cosine similarity function that can be applied to array or list columns.
Additional context
It seems very simple to implement this functionality in SQL. However, Splink's architecture is a bit obscure to me and I'm not sure how I would incorporate this into the comparison libraries myself. I would be able to do it with some guidance.