xorbitsai / xorbits

Scalable Python DS & ML, in an API compatible & lightning fast way.
https://xorbits.readthedocs.io
Apache License 2.0

How to perform deduplication in a cluster environment? #759

Open Dzg0309 opened 10 months ago

Dzg0309 commented 10 months ago

In a cluster environment, I want to run a deduplication task over a large dataset (1 TB, stored locally), but how should I load the data? Should I put all the data on the supervisor node and load it from there? Or should I split the data evenly across the nodes and then run xorbits.init(address='http://supervisor_ip:web_port') on the supervisor node to load the data from every node for deduplication? Please advise, thank you~

ChengjieLi28 commented 10 months ago

Hi, @Dzg0309. You should upload your data to a filesystem like S3 and use xorbits interfaces such as read_csv or read_parquet (depending on your data format) to read it, then use the deduplication operation to complete what you want. Here's an example:

  1. First start your xorbits cluster and get the xorbits cluster endpoint.
  2. Connect to the cluster and read your data:

    import xorbits
    import xorbits.pandas as pd

    xorbits.init("<your xorbits cluster endpoint>")
    df = pd.read_csv("s3://xxx")  # according to your dataset. This step will process your data in a distributed way.

  3. Then use xorbits operators to complete your task.
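
For reference, here is a minimal end-to-end sketch of the whole flow. The endpoint, the S3 paths, and the content column are placeholders, and the dedup parameters are only illustrative:

import xorbits
import xorbits.pandas as pd
from xorbits.experimental import dedup

# Connect to the already-running cluster (placeholder endpoint).
xorbits.init("<your xorbits cluster endpoint>")

# Read the dataset in a distributed way; the S3 path is a placeholder.
df = pd.read_parquet("s3://your-bucket/your-dataset/*.parquet")

# MinHash-based near-duplicate removal on a text column (illustrative parameters).
res = dedup(df, col="content", method="minhash", threshold=0.7,
            num_perm=128, min_length=5, ngrams=5, seed=42)

# A "*" in the output path lets each chunk be written as its own file.
res.to_parquet("s3://your-bucket/dedup-output/out-*.parquet")

xorbits.shutdown()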

Dzg0309 commented 10 months ago

You should upload your data to a filesystem like S3 and use xorbits interfaces like read_csv, read_parquet (According to your data) to read it,

Is it necessary to use the S3 file system? Or can I use the Hadoop file system, or read the data files directly from local disk? I tried putting all the data on the supervisor node, but when starting the cluster the other workers reported file-not-found errors. When the data is split across all nodes (with the data directories kept consistent on each node), I still get a file-does-not-exist error during the operation, which makes me very confused.

Below is my code:

import xorbits
import xorbits.pandas as pd
from xorbits.experimental import dedup
import xorbits.datasets as xdatasets

xorbits.init(address='http://xx.xx.xxx.xx:xxxx',
              session_id = 'xorbits_dedup_test_09',
              )
ds = xdatasets.from_huggingface("/passdata/xorbits_data/mnbvc_test", split='train', cache_dir='/passdata/.cache')
df = ds.to_dataframe()

res = dedup(df, col="content", method="minhash", threshold=0.7, num_perm=128, min_length=5, ngrams=5, seed=42) # for 'minhash' method
res.to_parquet('/passdata/xorbits_data/output')
xorbits.shutdown()
codingl2k1 commented 10 months ago


If the data is in a local directory, then each worker should have the same copy of the data at the same local path. Alternatively, you can put the data in an S3 directory, and then each worker gets its partition from S3 directly.

If your data is in CSV or Parquet format, you can try the read_parquet API: https://doc.xorbits.io/en/stable/reference/pandas/generated/xorbits.pandas.read_parquet.html#xorbits.pandas.read_parquet. These APIs allow for more flexible slicing of the data, but local data still needs to be copied to each worker in advance.
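
For example, a rough sketch of the local-path approach, assuming the same Parquet files have already been copied to an identical directory on every worker (the endpoint and path below are placeholders):

import xorbits
import xorbits.pandas as pd

# Placeholder endpoint of the running cluster.
xorbits.init("http://supervisor_ip:web_port")

# This directory must exist with identical contents on every worker node;
# read_parquet then loads the files in a distributed way.
# Placeholder path; point it at your actual Parquet files.
df = pd.read_parquet("/passdata/xorbits_data/parquet/*.parquet")

print(df.shape)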

Dzg0309 commented 10 months ago

OK, I tried copying the data to each node and it worked, but two other problems came up:

  1. Can xorbits' read_json read a data folder path directly and load the JSON files in parallel?
  2. After the deduplication, I wanted to use to_parquet('/passdata/xorbits_data/output') to save the data. I found that it was very slow and only saved a single 0.parquet file to one of the nodes, which is a headache. I want the data to be saved to multiple nodes in parallel to increase the saving speed. What should I do?
codingl2k1 commented 10 months ago

  1. Currently, the read_json API is not implemented, so it falls back to pandas' read_json, which is not distributed. If your JSON data is in JSONL format (each line is a JSON string), then we can schedule a PR to implement a distributed read_json.

  2. to_parquet accepts a path containing * and writes each chunk's data on the node that generated it. If your to_parquet only saves the data to one node, you may want to check the following (see the sketch after this list):

    • Is there a * in your save path? If not, add one.
    • Is the data passed to to_parquet already tiled? If not, rechunk it or use a distributed data source, e.g. read_csv.
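
For illustration, a minimal sketch of the wildcard form, assuming res is the deduplicated DataFrame from the script earlier in this thread (the part-*.parquet file-name pattern is just an example):

# "res" is the deduplicated xorbits DataFrame from the earlier script.
# The "*" in the path is expanded per chunk, so each worker writes its own
# Parquet files in parallel instead of producing a single 0.parquet.
res.to_parquet('/passdata/xorbits_data/output/part-*.parquet')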