rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[QST] Any magic fixes for str.edit_distance_matrix with dask_cudf across partitions? #15532

Closed: dangraysf closed this issue 6 months ago

dangraysf commented 6 months ago

I can generate random amino-acid sequences as strings:

import random
import time

import cudf
import dask_cudf

AMINO_ACIDS = 'ACDEFGHIKLMNPQRSTVWY'
LENGTH = 12
MATRIX_SIZE = 1_000
N_PART = 1

def generate_sequence(length):
    # Random string of amino-acid letters with the given length
    return ''.join(random.choices(AMINO_ACIDS, k=length))

start = time.time()

# Generate sequences
sequences = [generate_sequence(LENGTH) for _ in range(MATRIX_SIZE)]

sequences[:10]

then convert to a cudf DataFrame and then to a dask_cudf DataFrame:

df = cudf.DataFrame({'sequence': sequences})
ddf = dask_cudf.from_cudf(df, npartitions=N_PART) 

and then use libcudf methods to rapidly calculate the Levenshtein distance matrix on the GPU:

final = ddf.map_partitions(lambda x: x['sequence'].str.edit_distance_matrix(), meta=('', cudf.Series))
final = final.compute()

I get the expected [n x n] matrix (although technically it's a cudf.Series of lists). Amazing and fast!

However, at ~100,000 rows I hit the expected maximum-string-size CUDA error. When I bump n_partitions above 1, the post-compute() cudf.Series has the correct length along dim0 (n_rows), but the matrix width, i.e. len(final.iloc[0]), is truncated to a single partition, so the effective output shape is [MATRIX_SIZE, MATRIX_SIZE // N_PART]. Thus it looks like the edit_distance_matrix calculation does not propagate 'for free' across both dims in a ddf context using dask_cudf. Shucks!

Any tricks to get this to work?

Thx!

rjzamora commented 6 months ago

Hi @dangraysf - Thanks for filing an issue!

The edit_distance_matrix method in cudf currently operates on the entire string column at once. When you convert to a dask.dataframe.DataFrame object (using dask_cudf), you are partitioning your data into a collection of distinct cudf.DataFrame objects. Using ddf.map_partitions(func, ...) tells dask/dask_cudf that func can be independently mapped across the partitions in an "embarrassingly parallel" fashion. Unfortunately, the logic needed to perform an edit_distance_matrix calculation across the global collection is not embarrassingly parallel at all (In fact, it requires that each partition be compared to every other partition).
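For illustration, here is a minimal sketch (with toy sequences and variable names of my own choosing) of the difference between calling edit_distance_matrix on the whole column versus on each partition independently:

import cudf

seqs = cudf.Series(['AAA', 'AAC', 'ACC', 'CCC', 'CCG', 'CGG'], name='sequence')

# Whole-column call: a full 6 x 6 matrix (a list of 6 distances per row).
full = seqs.str.edit_distance_matrix()
print(len(full), len(full.iloc[0]))  # 6 6

# Per-partition behaviour (what map_partitions does): each half only sees its
# own 3 rows, so each row's list holds just 3 distances.
halves = [seqs[:3].str.edit_distance_matrix(), seqs[3:].str.edit_distance_matrix()]
print([len(h.iloc[0]) for h in halves])  # [3, 3]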

I don't personally have much expertise in string processing, but my sense is that the all-to-all nature of edit_distance_matrix will make it tricky to scale out. If you only want to compare every row to a smaller number of target strings (using edit_distance(target=...)), then the problem becomes much easier.
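For example, a minimal sketch of that easier pattern (the targets list here is hypothetical, and ddf is the dask_cudf DataFrame from the snippet above):

# Hypothetical reference sequences to compare every row against.
targets = ['ACDEFGHIKLMN', 'YWVTSRQPNMLK']

for t in targets:
    # Comparing each row to a single string is embarrassingly parallel,
    # so map_partitions scales across partitions as expected.
    # (meta dtype assumed to be int32; adjust if your cudf version differs)
    dist = ddf['sequence'].map_partitions(
        lambda s, t=t: s.str.edit_distance(t), meta=('dist', 'int32')
    )
    print(dist.compute().head())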

dangraysf commented 6 months ago

Great insight! My workaround is the following:

import cudf
import dask_cudf
import cupy as cp
import pandas as pd
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from tqdm import tqdm

# AMINO_ACIDS and generate_sequence() are the same as in the snippet above.
AMINO_ACIDS = 'ACDEFGHIKLMNPQRSTVWY'
LENGTH = 12
MATRIX_SIZE = 250_000

# Initialize Dask client
cluster = LocalCUDACluster()
client = Client(cluster)
print(client.dashboard_link)

# Generate sequences
sequences = [generate_sequence(LENGTH) for _ in range(MATRIX_SIZE)]

df = cudf.DataFrame({'sequence': sequences})
ddf = dask_cudf.from_cudf(df, npartitions=4)

outputs = []

# Compare the whole column against one target sequence at a time.
for idx, s in tqdm(enumerate(sequences)):
    output = ddf['sequence'].map_partitions(lambda x: x.str.edit_distance(s), meta=(idx, 'int')).compute()
    outputs.append(pd.DataFrame({idx: output.to_dict()}))

    # Flush CUDA memory
    mempool = cp.get_default_memory_pool()
    mempool.free_all_blocks()

Exactly as you suggest -- edit_distance down the rows is all good, and it works across partitions.

This is still much more performant than a vanilla edit_distance matrix using the standard C implementations (rapidfuzz), so RAPIDS cuDF + nvedit remains very good for this simple biologist without resorting to CPU-based HPC -- thank you for this package!!

rjzamora commented 6 months ago

Sounds good! Thanks again for the discussion @dangraysf

I'll close this issue for now, but feel free to follow-up if you have other questions/concerns.