Can you please share the error message?
Here is the stacktrace:
Driver stacktrace:
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:240)
... 100 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 16.0 failed 4 times, most recent failure: Lost task 14.3 in stage 16.0 (TID 167) (10.139.64.13 executor 5): java.lang.NullPointerException
at zingg.block.Block$BlockFunction.call(Block.java:403)
at zingg.block.Block$BlockFunction.call(Block.java:393)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.mapelements_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.deserializetoobject_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:761)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)
at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:126)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:96)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Thanks. Is the match type for the id field DONT_USE or DONT USE (with a space)?
Sorry, it is DONT_USE
Ok thanks. Changing the field type of phone from numerical to string seems to be causing this, as the unmarked and/or marked data from earlier rounds would be a number and is now a string. How much training data do you have? Is it possible to start from scratch on a new model? Or, if you want, you could change the training data under the model folder through pyspark.
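If you go the pyspark route, here is a minimal sketch of what I mean (the model folder path, file format, and column name are assumptions, so adjust them to whatever your model directory actually contains):

from pyspark.sql import functions as F

# Hypothetical location of the marked training pairs under the model folder;
# adjust the path and format to match your setup.
marked_path = "/zingg/models/<modelId>/trainingData/marked"

marked = spark.read.parquet(marked_path)

# Cast the phone column from its old numeric type to string so it matches
# the new field definition.
marked = marked.withColumn("phone", F.col("phone").cast("string"))

# Write to a side location first, then swap it in place of the original data.
marked.write.mode("overwrite").parquet(marked_path + "_fixed")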
Hope that helps
I have essentially started from scratch each time. We currently aren't setting this up for incremental runs. I have cleared the directory each time and am creating a new database on Databricks.
EDIT: Just to clarify, this has been done several times, so anytime I have made any changes to the model, I start over. Again, I started from a new directory and am still getting the same error.
Ok, then this may be a bug in the code that is triggered by certain values in the data. Is it possible for you to share a test case and your config so we can reproduce this issue at our end?
Certainly. Besides the config, what exactly do you need from me? Sample set of data? I am using the Databricks Solution Accelerator for this.
Yes, a sample dataset and config/python code should be good enough to get started on reproducing this.
@vikasgupta78 fyi
Sorry for the delay. Since this is personal data, I am having to generate mock data with the same fields and then running that through to make sure the error still exists.
Here is the mock dataset: MOCK_DATA.csv
Here is the field definition:
fieldDefinition = [
{'fieldName':'id', 'matchType':'DONT_USE', 'fields':'id', 'dataType':'"integer"'},
{'fieldName':'email', 'matchType':'FUZZY,NULL_OR_BLANK', 'fields':'email', 'dataType':'"string"'},
{'fieldName':'firstname', 'matchType':'FUZZY,NULL_OR_BLANK', 'fields':'firstname', 'dataType':'"string"'},
{'fieldName':'lastname', 'matchType':'FUZZY,NULL_OR_BLANK', 'fields':'lastname', 'dataType':'"string"'},
{'fieldName':'phone', 'matchType':'FUZZY,NULL_OR_BLANK', 'fields':'phone', 'dataType':'"string"'}
]
The code is the same as the Databricks Solution Accelerator, with the exception that I removed the loading of the incremental data in 00.1 and copied the attached dataset into both downloads and initial. If following that document, the failure is during 01_Initial at the step 'Get Data (Run Once Per Cycle)', or sometimes it will get through that and fail during the next step, 'Perform Labeling (Run Repeatedly Until All Candidate Pairs Labeled)'. With NULL_OR_BLANK, I have never gotten past two iterations of those two steps. Without it, I was able to run and label repeatedly until I had enough matches to proceed.
Thanks, Scott
Thanks a lot @TXAggie2000, will take a look today.
One question @TXAggie2000 - have you tried with zingg 0.4.0 ?
I have not. I had followed the Solution Accelerator, which uses 0.3.3, and had success de-duping a few different datasets, but I only started having issues when adding the extra match type.
I see. I cannot locate the null_or_blank type in 0.3.3. I would suggest trying 0.4.0 to see if this problem persists.
Understood. I will try it with 0.4.0 and I will let you know!
Thanks, Scott
Tried the same code with 0.4.0 and am now getting the error:
Error: Failed to load class zingg.client.Client.
Can you please share the steps you used to install 0.4.0 and also the spark/java version you are using?
@vikasgupta78 - I had modified the notebooks (config/setup) in the solution accelerator to download that version. I did notice that I was a minor spark version off for that, so I am re-testing with Databricks runtime version 14.3 LTS.
Cool. Please use dbr 14.2 and spark 3.5.0 with Zingg 0.4.0.
Okay, I ran it in 14.2, spark 3.5.0 with Zingg 0.4.0 and still have the same error:
Error: Failed to load class zingg.client.Client
Here is the code for the findTrainingData where it is failing:
def run_find_training_job():
    '''
    The purpose of this function is to run the Zingg findTrainingData job that generates
    candidate pairs from the initial set of data specified in the job's configuration
    '''
    # identify the find training job
    find_training_job = ZinggJob(config['job']['initial']['findTrainingData'], config['job']['databricks workspace url'], config['job']['api token'])
    # run the job and wait for its completion
    find_training_job.run_and_wait()
    return
config['job']['initial']['findTrainingData'] = zingg_initial_findTrainingData
Could this code have changed from 0.3.3 to 0.4.0?
Keep in mind that if I download 0.3.3 and remove the NULL_OR_BLANK everything runs as expected. I just update 8 lines of code to switch.
You seem to be using an older notebook, please try https://github.com/zinggAI/zingg-vikas/blob/0.4.0/examples/databricks/FebrlExample.ipynb
@vikasgupta78 - Thank you! I will review and test over the weekend and let you know by Monday!
@vikasgupta78 - I got through one round of training/labeling. On the second pass at training the data, I got the following exception:
Py4JJavaError: An error occurred while calling o497.execute.
: zingg.common.client.ZinggClientException: Exception thrown in awaitResult: Job aborted due to stage failure: Task 0 in stage 206.0 failed 4 times, most recent failure: Lost task 0.3 in stage 206.0 (TID 264) (10.139.64.10 executor driver): java.lang.IllegalArgumentException: requirement failed: Vector should have dimension larger than zero.
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.ml.stat.SummarizerBuffer.add(Summarizer.scala:476)
at org.apache.spark.ml.stat.SummarizerBuffer.add(Summarizer.scala:552)
at org.apache.spark.ml.stat.Summarizer$.$anonfun$getClassificationSummarizers$1(Summarizer.scala:235)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1316)
at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1317)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:896)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:896)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:936)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:103)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:939)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:831)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at zingg.common.core.executor.TrainingDataFinder.execute(TrainingDataFinder.java:139)
at zingg.common.client.Client.execute(Client.java:251)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
at java.lang.Thread.run(Thread.java:750)
These are my field definitions. I didn't see any examples in the docs for multiple match types, but it says it takes an array:
id = FieldDefinition("id", "integer", MatchType.DONT_USE)
email = FieldDefinition("email", "string", [MatchType.EMAIL,MatchType.NULL_OR_BLANK])
fname = FieldDefinition("firstname", "string", [MatchType.FUZZY,MatchType.NULL_OR_BLANK])
lname = FieldDefinition("lastname", "string", [MatchType.FUZZY,MatchType.NULL_OR_BLANK])
phone = FieldDefinition("phone", "string", [MatchType.NUMERIC,MatchType.NULL_OR_BLANK])
fieldDefs = [id, email, fname, lname, phone]
args.setFieldDefinition(fieldDefs)
Did you change the definition after a round of training / labelling?
Can you try with phone = FieldDefinition("phone", "string", MatchType.FUZZY, MatchType.NULL_OR_BLANK)?
I would be happy to try it out locally if there is test data you could share.
@vikasgupta78 https://github.com/zinggAI/zingg/issues/818#issuecomment-2062843720
We are trying this @TXAggie2000 and will let you know. Thanks for your patience.
@TXAggie2000 I tried your config and data. When giving multiple match types, please specify them like this:
id = FieldDefinition("id", "integer", MatchType.DONT_USE)
email = FieldDefinition("email", "string", MatchType.EMAIL,MatchType.NULL_OR_BLANK)
fname = FieldDefinition("firstname", "string", MatchType.FUZZY,MatchType.NULL_OR_BLANK)
lname = FieldDefinition("lastname", "string", MatchType.FUZZY,MatchType.NULL_OR_BLANK)
phone = FieldDefinition("phone", "string", MatchType.NUMERIC,MatchType.NULL_OR_BLANK)
fieldDefs = [id, email, fname, lname, phone]
args.setFieldDefinition(fieldDefs)
Giving an array is causing the issue
Please run again after correcting the config and let us know
Also, just a suggestion: you might want to keep fname and lname above email in the field definition order for better results.
Please start a fresh training/label cycle if you are changing the order of the field definitions or their match types.
Awesome, thank you both for tracking down the problem! It seems to be working without issues now!
I think I spoke too soon. I have been training and labeling for several iterations over several hours, since I have not found enough match pairs. Usually if I run trainMatch, it will tell me. After several hours I was at:
You have accumulated 5 pairs labeled as positive matches.
You have accumulated 146 pairs labeled as not matches.
When I ran the trainMatch, it ran for quite some time and then failed with:
zingg.common.client.ZinggClientException: Exception thrown in awaitResult: requirement failed: rawPredictionCol vectors must have length=2, but got 1
5 is way too few; at least 20 matches are required.
You can provide training samples and then run a couple of rounds of findTrainingData and label to help reduce the effort.
https://docs.zingg.ai/zingg/stepbystep/createtrainingdata/addowntrainingdata
I know in the Solution Accelerator it was pretty straightforward how to add the training samples, but, to be honest, the 0.4.0 docs are a bit confusing. Based on the example notebook: https://github.com/zinggAI/zingg/blob/0.4.0/examples/databricks/FebrlExample.ipynb, everything is added to the Arguments class and no JSON configs are used. How do I add the config JSON or the training data to the Arguments?
You will have to do something like this:
schema = "z_cluster string, z_ismatch integer, fname string, lname string, stNo string, add1 string, add2 string, city string, areacode string, state string, dob string, ssn string"
samplePipe = CsvPipe("sample", "examples/febrl/sample.csv", schema)
args.setTrainingSamples(samplePipe)
Basically just like you create the input pipe, create a sample pipe and call setTrainingSamples.
Please note that z_cluster (string) and z_ismatch (integer) are columns in addition to your input data schema.
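For illustration, here is a minimal, hypothetical sketch of what such a sample file could contain for your schema (the values, ids and path are made up):

# Hypothetical labelled pairs: rows sharing a z_cluster value form one pair,
# and z_ismatch records whether that pair is a match (1) or not (0).
rows = [
    ("c1", 1, 10, "5551234",  "jsmith@example.com",  "Smith", "John"),
    ("c1", 1, 11, "555-1234", "j.smith@example.com", "Smith", "Jon"),
    ("c2", 0, 12, "5559999",  "adoe@example.com",    "Doe",   "Anna"),
    ("c2", 0, 13, "5550000",  "bray@example.com",    "Ray",   "Bella"),
]
schema = "z_cluster string, z_ismatch integer, id integer, phone string, email string, lastname string, firstname string"

# Write the samples somewhere the sample CsvPipe can read them (the path is an assumption).
spark.createDataFrame(rows, schema).write.mode("overwrite").csv("/tmp/zingg/training_samples")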
Okay, I have started over and included the training data. After the first round of training and labeling, the output shows 0 matched data even though I supplied it ~40 records of matched data. I continued to train and finally got the outputs, and it doesn't appear that any of the training samples I used were taken into account. Output clusters were wildly off, matching any record if one field was the same across all of them. I will try ordering the columns differently and see if that helps, but it doesn't appear the training data was used.
Can you share the training sample file you provided?
training_matches_redacted.csv
Here are 20 of them. The other 20 records I used were actual contacts that I've removed.
Here is the relevant code:
#setting trainingCsv to the args
schema = "z_cluster string, z_ismatch integer, id int, phone string, email string, lastname string, firstname string"
trainingPipe = CsvPipe("training", zinggDir + "/" + modelId + "/training", schema)
args.setTrainingSamples(trainingPipe)
id = FieldDefinition("id", "integer", MatchType.DONT_USE)
phone = FieldDefinition("phone", "string", MatchType.NUMERIC,MatchType.NULL_OR_BLANK)
email = FieldDefinition("email", "string", MatchType.FUZZY,MatchType.NULL_OR_BLANK)
lname = FieldDefinition("lastname", "string", MatchType.FUZZY,MatchType.NULL_OR_BLANK)
fname = FieldDefinition("firstname", "string", MatchType.FUZZY,MatchType.NULL_OR_BLANK)
fieldDefs = [id,phone,email,lname,fname]
args.setFieldDefinition(fieldDefs)
I tried the training samples; on running a fresh findTrainingData I am seeing the following in the logs:
24/04/29 14:59:15 WARN TrainingDataFinder: Read training samples 33 neg 0
=> training samples is being used
If you want to book time and the issue is still not resolved, please use the link on the docs @TXAggie2000
Thanks everyone. I am struggling to get decent results with it, and it seems the results are either not using the training data or not ignoring null or blank values. I've redone it several times. I will continue to train, but I am seeing a lot of records in a single cluster that matched on just one column across all of them.
I tried to replicate/fix the issue:
Changed the match type for email from FUZZY to EMAIL: email = FieldDefinition("email", "string", MatchType.EMAIL,MatchType.NULL_OR_BLANK)
Changed the order to: fieldDefs = [id, fname, lname, phone, email]
Ran 10-15 rounds of findTrainingData and label
In all those rounds only 1 pair came up which could be called a match => MOCK_DATA.csv is by and large free of duplicates
On running match, all rows ended up in their own individual clusters => no dupes found => maybe the data we have is only partial, and that's why no matches are being found
I am attaching the python file I used (renamed to txt) and final output I got.
Let me know how we can help further.
Also worth mentioning that in the final few rounds the model started converging, i.e. Zingg's predictions were in line with the input I gave during label.
@TXAggie2000 did you get a chance to look into the results?
I am rerunning it again. I had switched the email and phone order because those are more important for determining a match. There could be two different people within a company with the same email domain and phone number that we would consider to be a match. That would make those two fields more significant in that case, correct?
@TXAggie2000 From what you are describing, does that mean you won't consider first name and last name in such cases? If in some cases you treat fields like first name and last name as indicating a match, but in other cases you don't, it will not work out. The model has to be consistent. Fields you don't want the model to consider should not be used for matching.
e.g. if you consider x@y.com vikas gupta a match with b@y.com sonal goyal, but don't consider x@y.com vikas gupta a match with b@z.com sonal goyal, it will just confuse the model and it will not work.
If you want to treat the domain as the company, it is better to split it out into a separate field, along the lines of the sketch below.
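A quick pyspark sketch of what I mean, assuming your input has an email column (the column names and path are placeholders):

from pyspark.sql import functions as F

# Hypothetical pre-processing step: derive the email domain into its own column
# so it can be given its own field definition, separate from the full email.
df = spark.read.option("header", True).csv("/tmp/zingg/MOCK_DATA.csv")
df = df.withColumn("email_domain", F.split(F.col("email"), "@").getItem(1))

You could then give email_domain its own FieldDefinition (for example EXACT or FUZZY) while email keeps its current match types.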
In summary, be consistent in your training, otherwise you will not get good matches.
I had some issues that I corrected, and I stuck with your suggestion. I went through 4 training cycles over the course of the day (findTrainingData took about an hour each cycle). When done, I had 3 matches and 83 labeled as not matching. With the ~40 records of matching manual training data, trainMatch failed with not enough training data. After an additional round, it increased to 103 not matching. trainMatch did not error out, but the results were still subpar. For example, one cluster had 41 matches for John Smith where the phone numbers and emails were all different. Another, where first name and last name were Mike, had 282 matches but none of the phone numbers or emails matched. I am not sure if I just need to spend several days training the model to get different results, or whether there is a way to weigh these fields differently, or at least equally.
Did any of the pairs you marked as a match in training have different phone numbers and emails but the same name?
If you did this for any of them it would result in Zingg also learning it the same way.
Please run --phase generateDocs and send me the files so that I can check if there is a problem with training data
I am using 0.3.3 to train and dedupe a very simple dataset. The initial results matched too many incorrect values due to null fields. I went back and added NULL_OR_BLANK to the field definition and now I can't even get through training without failure. Here is the current field definition:
fieldDefinition = [
{'fieldName':'id', 'matchType':'DONT USE', 'fields':'id', 'dataType':'"integer"'},
{'fieldName':'email', 'matchType':'FUZZY,NULL_OR_BLANK', 'fields':'email', 'dataType':'"string"'},
{'fieldName':'firstname', 'matchType':'FUZZY,NULL_OR_BLANK', 'fields':'firstname', 'dataType':'"string"'},
{'fieldName':'lastname', 'matchType':'FUZZY,NULL_OR_BLANK', 'fields':'lastname', 'dataType':'"string"'},
{'fieldName':'phone', 'matchType':'FUZZY,NULL_OR_BLANK', 'fields':'phone', 'dataType':'"string"'}
]
The 'phone' field was initially NUMERIC, but adding NULL_OR_BLANK to that caused failure. The above would sometimes get through a single training and labeling, but I was never able to train/label enough data before a failure would occur.
All we want to do is have null values not count as a match. How do I proceed?
Thanks, Scott