zinggAI / zingg

Scalable identity resolution, entity resolution, data mastering and deduplication using ML
GNU Affero General Public License v3.0

When only Date is used as the matching/blocking criterion, Zingg's findTrainingData phase fails with an error. #906

Open magadummanoj opened 1 week ago

magadummanoj commented 1 week ago

Describe the bug I wanted to fuzzy match dates across 2 different data sets but encountered a null pointer exception: when date is the only matching/blocking criterion, Zingg runs into an error.

To Reproduce Steps to reproduce the behavior:

  1. Create 2 source files with only 2 columns, START_DATE and END_DATE. The match type is FUZZY for the dates because I wanted to treat overlapping date ranges as a match.
  2. Run the findTrainingData phase; it fails with a null pointer error. The stack trace is shared below.
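For context, a minimal sketch of what the field definitions in such a config might look like (the actual examples/febrl/configReconDBDatesOnly.json is not shown in this issue, so the exact shape and `dataType` values here are assumptions, not the reporter's file):

```json
{
  "fieldDefinition": [
    {
      "fieldName": "START_DATE",
      "fields": "START_DATE",
      "matchType": "FUZZY",
      "dataType": "date"
    },
    {
      "fieldName": "END_DATE",
      "fields": "END_DATE",
      "matchType": "FUZZY",
      "dataType": "date"
    }
  ]
}
```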

Expected behavior Zingg should match records whose date ranges overlap; please refer to the table below for matched and unmatched scenarios.

| Source 1: EBL START_DATE | Source 1: EBL END_DATE | Source 2: LID START_DATE | Source 2: LID END_DATE | Result |
| --- | --- | --- | --- | --- |
| 01-09-2024 | 08-09-2024 | 01-Sep | 08-Sep | Matched |
| 01-09-2024 | 08-09-2024 | 01-Sep | 06-Sep | Matched |
| 01-09-2024 | 08-09-2024 | 01-Sep | 10-Sep | Matched |
| 01-09-2024 | 08-09-2024 | 03-Sep | 08-Sep | Matched |
| 01-09-2024 | 08-09-2024 | 29-Aug | 08-Sep | Matched |
| 01-09-2024 | 08-09-2024 | 03-Sep | 05-Sep | Matched |
| 01-09-2024 | 08-09-2024 | 29-Aug | 10-Sep | Matched |
| 01-09-2024 | 08-09-2024 | 01-Aug | 10-Aug | Unmatched |
| 01-09-2024 | 08-09-2024 | 12-Sep | 20-Sep | Unmatched |
| 01-09-2024 | 08-09-2024 | 30-Aug | 31-Aug | Unmatched |
| 01-09-2024 | 08-09-2024 | 09-Sep | 10-Sep | Unmatched |
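The matched/unmatched rule behind this table is plain interval overlap, which can be sketched in Python. This is only an illustration of the desired FUZZY date semantics, not Zingg's actual implementation; `ranges_overlap` is a hypothetical helper, and Source 2 dates are assumed to fall in 2024:

```python
from datetime import date

def ranges_overlap(s1, e1, s2, e2):
    # Two inclusive date ranges overlap when each one starts
    # on or before the other one ends.
    return s1 <= e2 and s2 <= e1

# Source 1 (EBL) range from the table: 01-09-2024 .. 08-09-2024
ebl_start, ebl_end = date(2024, 9, 1), date(2024, 9, 8)

# "29-Aug .. 10-Sep" overlaps the EBL range; "09-Sep .. 10-Sep" starts after it ends.
assert ranges_overlap(ebl_start, ebl_end, date(2024, 8, 29), date(2024, 9, 10))
assert not ranges_overlap(ebl_start, ebl_end, date(2024, 9, 9), date(2024, 9, 10))
```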

Additional context Below is the log and stack trace of the issue (the identical NullPointerException trace is repeated several times in the log; repeats are marked as such):

    test@FCE11561:~/zingg$ ./scripts/zingg.sh --phase findTrainingData --conf examples/febrl/configReconDBDatesOnly.json
    24/09/24 18:35:33 WARN Utils: Your hostname, FCE11561 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
    24/09/24 18:35:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    24/09/24 18:35:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    24/09/24 18:35:35 INFO Client: ****
    24/09/24 18:35:35 INFO Client: Zingg AI
    24/09/24 18:35:35 INFO Client: (C) 2021 Zingg.AI
    24/09/24 18:35:35 INFO Client: ****
    24/09/24 18:35:35 INFO Client: using: Zingg v0.4.0
    24/09/24 18:35:35 INFO ClientOptions: --phase
    24/09/24 18:35:35 INFO ClientOptions: findTrainingData
    24/09/24 18:35:35 INFO ClientOptions: --conf
    24/09/24 18:35:35 INFO ClientOptions: examples/febrl/configReconDBDatesOnly.json
    24/09/24 18:35:35 INFO ClientOptions: --email
    24/09/24 18:35:35 INFO ClientOptions: zingg@zingg.ai
    24/09/24 18:35:35 INFO ClientOptions: --license
    24/09/24 18:35:35 INFO ClientOptions: zinggLicense.txt
    24/09/24 18:35:35 WARN ArgumentsUtil: Config Argument is examples/febrl/configReconDBDatesOnly.json
    24/09/24 18:35:35 WARN ArgumentsUtil: phase is findTrainingData
    24/09/24 18:35:35 INFO Client: Zingg is not collecting any analytics data and will only log a blank event with the name of the phase
    24/09/24 18:35:38 WARN PipeUtil: Reading Pipe [name=source1, format=jdbc, preprocessors=null, props={password=****(redacted), driver=oracle.jdbc.driver.OracleDriver, user=ccb_owner, url=jdbc:oracle:thin:@10.21.3.199:1549:revekdev, query=select START_DATE, END_DATE from EBL_DATA}]
    24/09/24 18:35:42 WARN PipeUtil: Reading Pipe [name=source2, format=jdbc, preprocessors=null, props={password=****(redacted), driver=oracle.jdbc.driver.OracleDriver, user=ccb_owner, url=jdbc:oracle:thin:@10.21.3.199:1549:revekdev, query=select START_DATE, END_DATE from LID_DATA}]
    24/09/24 18:35:44 WARN TrainingDataFinder: Read input data 47
    24/09/24 18:35:44 WARN PipeUtil: Reading Pipe [name=null, format=parquet, preprocessors=null, props={location=models/127/trainingData//marked/}]
    24/09/24 18:35:45 WARN PipeUtil: [PATH_NOT_FOUND] Path does not exist: file:/home/test/zingg/models/127/trainingData/marked.
    24/09/24 18:35:45 WARN DSUtil: No marked data found
    24/09/24 18:35:45 WARN DSUtil: No configured training samples
    24/09/24 18:35:45 WARN DSUtil: No training data found
    24/09/24 18:35:45 INFO TrainingDataFinder: Created positive sample pairs
    24/09/24 18:35:45 INFO TrainingDataFinder: Preprocessing DS for stopWords
    24/09/24 18:35:45 WARN SimpleFunctionRegistry: The function removestopwordsudf replaced a previously registered function.
    24/09/24 18:35:45 WARN SimpleFunctionRegistry: The function round replaced a previously registered function.
    24/09/24 18:35:45 INFO Heuristics: Block size 8 and total count was 21
    24/09/24 18:35:45 INFO Heuristics: Heuristics suggest 8
    24/09/24 18:35:45 INFO BlockingTreeUtil: Learning indexing rules for block size 8
    24/09/24 18:35:46 INFO TrainingDataFinder: Writing uncertain pairs when either positive or negative samples not provided
    24/09/24 18:35:46 ERROR Executor: Exception in task 0.0 in stage 22.0 (TID 22)
    java.lang.NullPointerException
        at zingg.common.core.block.BlockFunction.call(BlockFunction.java:21)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.mapelements_doConsume_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.deserializetoobject_doConsume_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
    24/09/24 18:35:46 WARN TaskSetManager: Lost task 0.0 in stage 22.0 (TID 22) (10.255.255.254 executor driver): java.lang.NullPointerException
        [same NullPointerException stack trace as above]
    24/09/24 18:35:46 ERROR TaskSetManager: Task 0 in stage 22.0 failed 1 times; aborting job
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 22) (10.255.255.254 executor driver): java.lang.NullPointerException
        [same NullPointerException stack trace as above]
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    Caused by: java.lang.NullPointerException
        at zingg.common.core.block.BlockFunction.call(BlockFunction.java:21)
        [same NullPointerException stack trace as above]
    24/09/24 18:35:48 WARN Email: Unable to send email Can't send command to SMTP host
    24/09/24 18:35:48 WARN Client: Apologies for this message. Zingg has encountered an error.
    Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 22) (10.255.255.254 executor driver): java.lang.NullPointerException
        [same NullPointerException stack trace as above]
    Driver stacktrace: zingg.common.client.ZinggClientException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 22) (10.255.255.254 executor driver): java.lang.NullPointerException
        [same NullPointerException stack trace as above]
    Driver stacktrace:
        at zingg.common.core.executor.TrainingDataFinder.execute(TrainingDataFinder.java:139)
        at zingg.common.client.Client.execute(Client.java:251)
        at zingg.common.client.Client.mainMethod(Client.java:187)
        at zingg.spark.client.SparkClient.main(SparkClient.java:76)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1029)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

sania-16 commented 4 days ago

What is the data type used for the date fields in the config, @magadummanoj?

sonalgoyal commented 4 days ago

it is date