zinggAI / zingg

Scalable identity resolution, entity resolution, data mastering and deduplication using ML
GNU Affero General Public License v3.0

The Zingg Match phase often gets stuck on a spark shuffle write stage, making it run much slower #688

Closed · KilianLissSMRTR closed this 1 year ago

KilianLissSMRTR commented 1 year ago

Describe the bug
According to the hardware sizing recommendations in the documentation:

120k records of examples/febrl120k/test.csv take 5 minutes to run on a 4 core, 10 GB RAM local Spark cluster

On smaller data sets (e.g. 55,878 records), my match phase runtime is under 5 minutes, as expected. Whenever I increase the number of records (e.g. to 78,305), Zingg's match phase often gets stuck on various Spark shuffle write operations, causing it to take 1-2 hours to run, even though I have assigned far more hardware resources to the task than those quoted in the documentation.

Eventually, we would like to run Zingg on 40 million records, which would get prohibitively expensive unless this issue gets resolved.

To Reproduce
I am running Zingg in a Databricks environment with the following cluster configuration:

[Screenshot: ClusterConfig]

In general, I have tried switching the compute between single-node and multi-node, using larger instance types, and changing Zingg's "numPartitions" parameter.

After already having created sufficient training data, my python code looks as follows:

# Imports taken from example code notebook
import time
import uuid
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from ipywidgets import widgets, interact
from zingg.pipes import Pipe, FieldDefinition, MatchType
from zingg.client import Arguments, ClientOptions, ZinggWithSpark

# Main directory to store zingg project files in
mountPath = '/tmp/Zingg_Investigating_Optimizations'
config = {
    'database name': 'Zingg_Investigating_Optimizations', 
    'model name': 'Zingg_Investigating_Optimizations', 
    'dir': {
        'downloads': f'{mountPath}/downloads', # original unzipped data files that you will upload to the environment
        'input': f'{mountPath}/inputs',        # folder where downloaded files will be seperated into initial and incremental data assets
        'tables': f'{mountPath}/tables',       # location where external tables based on the data files will house data 
        'zingg': f'{mountPath}/zingg',         # zingg models and temp data
        'output': f'{mountPath}/output'
    }
}

# Make sure directories exist
for dir in config['dir'].values():
    dbutils.fs.mkdirs(dir)
args = Arguments()
args.setZinggDir(config['dir']['zingg'] )
args.setModelId(config['model name'])

# configure incoming Zingg input pipe
inputTable = 'optimize_FirstB_LastA'
incoming_input_dir = spark.sql(f"DESCRIBE DETAIL {inputTable}").select('location').collect()[0]['location']
incoming_inputPipe = Pipe(name=inputTable, format='delta')
incoming_inputPipe.addProperty('path', incoming_input_dir )
args.setData(incoming_inputPipe)

# configure Zingg output pipe
outputTable = 'output_optimize_FirstB_LastA3'
matched_output_dir = config['dir']['output'] + f'/incremental/{outputTable}'
outputPipe = Pipe(name=outputTable, format='delta')
outputPipe.addProperty('path', matched_output_dir)
args.setOutput(outputPipe)

# Specify how Zingg should match each variable in incoming data
fieldDefs = [
      FieldDefinition('SourceTableName', 'string', MatchType.DONT_USE)
    , FieldDefinition('SourceTableID', 'integer', MatchType.DONT_USE)
    , FieldDefinition('FirstName', 'string', MatchType.FUZZY)
    , FieldDefinition('LastName', 'string', MatchType.EXACT)
    , FieldDefinition('Email', 'string', MatchType.EMAIL)
    ]
args.setFieldDefinition(fieldDefs)
args.setLabelDataSampleSize(0.2)
args.setNumPartitions(sc.defaultParallelism * 25)

# Define trainMatch Task - Will train the Zingg model and generates potential matches
train_options = ClientOptions([ClientOptions.PHASE, 'train'])
train = ZinggWithSpark(args, train_options)
train.init()

# define match task
match_options = ClientOptions([ClientOptions.PHASE, 'match'])
match = ZinggWithSpark(args, match_options)
match.init()

# this is where Zingg stores candidate pairs produced by the findTrainingData phase;
# the original example notebook read these paths off a findTrainingData task object,
# but the shared Arguments instance exposes the same directories
UNMARKED_DIR = args.getZinggTrainingDataUnmarkedDir()
MARKED_DIR = args.getZinggTrainingDataMarkedDir()

print(args.getArgs())
train.execute()
match.execute()

My resulting arguments look like:

{
    "output": [
        {
            "name": "output_optimize_FirstB_LastA3",
            "format": "delta",
            "props": {
                "path": "/tmp/Zingg_Investigating_Optimizations/output/incremental/output_optimize_FirstB_LastA3"
            },
            "id": 0
        }
    ],
    "data": [
        {
            "name": "optimize_FirstB_LastA",
            "format": "delta",
            "props": {
                "path": "dbfs:/tmp/Zingg_Investigating_Optimizations/tables/optimize_FirstB_LastA"
            },
            "id": 0
        }
    ],
    "zinggDir": "/tmp/Zingg_Investigating_Optimizations/zingg",
    "fieldDefinition": [
        {
            "matchType": "DONT_USE",
            "fieldName": "SourceTableName",
            "fields": "SourceTableName",
            "stopWords": null,
            "abbreviations": null,
            "dataType": "\"string\""
        },
        {
            "matchType": "DONT_USE",
            "fieldName": "SourceTableID",
            "fields": "SourceTableID",
            "stopWords": null,
            "abbreviations": null,
            "dataType": "\"integer\""
        },
        {
            "matchType": "FUZZY",
            "fieldName": "FirstName",
            "fields": "FirstName",
            "stopWords": null,
            "abbreviations": null,
            "dataType": "\"string\""
        },
        {
            "matchType": "EXACT",
            "fieldName": "LastName",
            "fields": "LastName",
            "stopWords": null,
            "abbreviations": null,
            "dataType": "\"string\""
        },
        {
            "matchType": "EMAIL",
            "fieldName": "Email",
            "fields": "Email",
            "stopWords": null,
            "abbreviations": null,
            "dataType": "\"string\""
        }
    ],
    "numPartitions": 800,
    "labelDataSampleSize": 0.2,
    "modelId": "Zingg_Investigating_Optimizations",
    "jobId": 1,
    "collectMetrics": true,
    "showConcise": false,
    "stopWordsCutoff": 0.1,
    "blockSize": 100
}

As we can see from the Spark job run logs, just one of the multiple shuffle write stages took about 8 minutes (longer than the entire match phase should take):

[Screenshot: SparkRunLog]

Expected behavior
I don't think Zingg should be getting stuck on Spark shuffle write stages. Please let me know if you need more information, and thanks in advance for helping out!

sonalgoyal commented 1 year ago

Thanks for using Zingg and reporting this issue @KilianLissSMRTR! It is likely that the blocking model is not optimal in this case. How much training data do you have, and how many of the pairs are matches?

KilianLissSMRTR commented 1 year ago

Closed the issue.

KilianLissSMRTR commented 1 year ago

Hi @sonalgoyal , thanks for responding.

On average, it looks like we have around 2-3 matches per customer in our data. Our total data set consists of 46 million rows and should contain 16 million customers according to our old process (which we are looking to replace).

For our training data, we have:

- 396 matches
- 2,411 no matches
- 165 uncertain matches
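For reference, counts like these can be tallied from the pairs Zingg writes under its marked-data directory. A rough sketch of that tally, using pandas in place of the Spark read and fabricating labelled pairs with the counts above (the 0/1/2 coding of the `z_isMatch` label is an assumption for illustration, not taken from the Zingg docs):

```python
import pandas as pd

# Stand-in for spark.read.parquet(MARKED_DIR): fabricate labelled pairs
# with the same counts reported above. The 0/1/2 label coding is assumed.
marked = pd.DataFrame({'z_isMatch': [1] * 396 + [0] * 2411 + [2] * 165})

label_names = {0: 'No Match', 1: 'Match', 2: 'Uncertain'}
counts = marked['z_isMatch'].map(label_names).value_counts()
print(counts.to_dict())  # {'No Match': 2411, 'Match': 396, 'Uncertain': 165}
```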

The full list of columns we would like to use Zingg on is:

fieldDefs = [
      FieldDefinition('SourceTableName', 'string', MatchType.DONT_USE)
    , FieldDefinition('SourceTableID', 'integer', MatchType.DONT_USE)
    , FieldDefinition('FirstName', 'string', MatchType.FUZZY)
    , FieldDefinition('LastName', 'string', MatchType.EXACT)
    , FieldDefinition('dateOfBirth', 'string', MatchType.FUZZY)
    , FieldDefinition('Address', 'string', MatchType.FUZZY)
    , FieldDefinition('Suburb', 'string', MatchType.FUZZY)
    , FieldDefinition('State', 'string', MatchType.FUZZY)
    , FieldDefinition('Postcode', 'string', MatchType.NUMERIC)
    , FieldDefinition('Email', 'string', MatchType.EMAIL)
    , FieldDefinition('Landline', 'string', MatchType.NUMERIC)
    , FieldDefinition('Mobile', 'string', MatchType.NUMERIC)
    ]
sonalgoyal commented 1 year ago

The dataset and match numbers you reported should be doable with Zingg.

Are you seeing a similar issue with other data sizes and other models as well? Can you share a reproducible test case for us to debug this? I will need the config, the data, and the training data.

KilianLissSMRTR commented 1 year ago

Since I am not able to share our data, I tried replicating the issue with the NCVoters example dataset, but the issue didn't appear there.

I did manage to partially solve it by using a single-node cluster and setting Zingg's "numPartitions" parameter equal to the number of cores the cluster runs (32 partitions in my case), instead of the 20-30 times the core count mentioned in the documentation.
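In code, that change amounts to dropping the documented multiplier down to 1. A minimal sketch (the `choose_num_partitions` helper is hypothetical, not part of the Zingg API; the result would be passed to `args.setNumPartitions()`):

```python
def choose_num_partitions(total_cores: int, multiplier: int = 1) -> int:
    """Partition count to pass to args.setNumPartitions().

    The Zingg docs suggest 20-30x the core count; on this workload a
    multiplier of 1 (partitions == cores) avoided the slow shuffle writes.
    """
    return total_cores * multiplier

# 32-core single-node cluster from this comment
print(choose_num_partitions(32))      # 32: the setting that worked here
print(choose_num_partitions(32, 25))  # 800: the documented-style setting that stalled
```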

Occasionally, I did get some memory spill, but I'm still investigating that. Restarting my cluster seemed to solve it, but I need to check that this works consistently.

sonalgoyal commented 1 year ago

Thanks for sharing this information. Please keep us posted on how this goes and whether we can help in any way.

sonalgoyal commented 1 year ago

@KilianLissSMRTR - are you still seeing this?

sonalgoyal commented 1 year ago

@KilianLissSMRTR - please let us know if this is still an issue.

KilianLissSMRTR commented 1 year ago

Hi @sonalgoyal, this is no longer an issue. Tweaking the number of partitions and adjusting our clusters sped things up.

sonalgoyal commented 1 year ago

Thanks @KilianLissSMRTR