DeepRec-AI / HybridBackend

A high-performance framework for training wide-and-deep recommender systems on heterogeneous cluster
Apache License 2.0
156 stars · 30 forks

to_sparse failed for Value with ragged_rank > 1 read from parquet file #69

Open SamJia opened 2 years ago

SamJia commented 2 years ago

Current behavior

When hb reads nested lists with ragged_rank > 1, the resulting Value cannot be transformed into a SparseTensor by hb.data.to_sparse.

For example: dense_feature is one of the features read by hb.data.ParquetDataset, and to_sparse does not work for it.

(screenshot: to_sparse failing on dense_feature)

Moreover, if I swap the order of the two nested_row_splits, the Value can be converted by to_sparse.

(screenshot: to_sparse succeeding after swapping the splits)

So perhaps the order of nested_row_splits produced when reading the Parquet file is incorrect?
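
For context, TensorFlow's RaggedTensor.from_nested_row_splits expects nested_row_splits ordered from the outermost level to the innermost. A minimal pure-Python decoder (my own illustration, not HybridBackend code) shows that only the outermost-first order reconstructs the original nesting:

```python
# Decode a flat values list into nested lists using row splits.
# nested_row_splits is assumed outermost-first (TensorFlow's RaggedTensor
# convention), so the innermost splits are applied first.
def decode(values, nested_row_splits):
    rows = list(values)
    for splits in reversed(nested_row_splits):
        rows = [rows[splits[i]:splits[i + 1]] for i in range(len(splits) - 1)]
    return rows

# Outermost-first order reconstructs the original nested list:
print(decode([1, 2, 3, 4, 5], ([0, 2, 4], [0, 1, 3, 4, 5])))
# -> [[[1], [2, 3]], [[4], [5]]]

# Innermost-first order (as apparently returned by the reader) does not;
# the last value is silently dropped and empty rows appear:
print(decode([1, 2, 3, 4, 5], ([0, 1, 3, 4, 5], [0, 2, 4])))
# -> [[[1, 2]], [[3, 4]], [], []]
```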

Expected behavior

The Value read from the Parquet file can be transformed into a SparseTensor.

System information

Code to reproduce

import tensorflow as tf
import hybridbackend.tensorflow as hb
dataset = hb.data.ParquetDataset("test2.zstd.parquet", batch_size=1)
dataset = dataset.apply(hb.data.to_sparse())
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()
sess = tf.Session()
vals = sess.run(next_element)

# One more simple demo:
import numpy as np
import tensorflow as tf
import hybridbackend.tensorflow as hb
val = hb.data.dataframe.DataFrame.Value(values=np.array([1, 2, 3, 4, 5]), nested_row_splits=(np.array([0, 1, 3, 4, 5]), np.array([0, 2, 4])))
sess = tf.Session()
sess.run(val.to_sparse())

Willing to contribute

Yes

2sin18 commented 2 years ago

Thanks for your report, I will look into it.

SamJia commented 2 years ago

An example to create a parquet dataset file and reproduce the error:

# Create parquet file
import pyarrow as pa
import pyarrow.parquet as pq

arr = pa.array([[[1], [2, 3]], [[4], [5]]], pa.list_(pa.list_(pa.int64())))
table = pa.Table.from_arrays([arr], ['test'])
pq.write_table(table, 'test.zstd.parquet', compression='ZSTD')

# Reading the parquet file
import tensorflow as tf
import hybridbackend.tensorflow as hb

dataset = hb.data.ParquetDataset("test.zstd.parquet", batch_size=2)
dataset = dataset.apply(hb.data.to_sparse())
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()  
sess = tf.Session()
vals = sess.run(next_element)
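
For reference, the SparseTensor that to_sparse should produce for this table can be worked out by hand. A small pure-Python sketch (my own illustration) enumerates the COO indices of the nested list written above:

```python
# The nested list written to test.zstd.parquet above.
data = [[[1], [2, 3]], [[4], [5]]]

# Enumerate (row, inner-row, element) positions to get COO indices.
indices, values = [], []
for i, row in enumerate(data):
    for j, inner in enumerate(row):
        for k, v in enumerate(inner):
            indices.append([i, j, k])
            values.append(v)

# dense_shape bounds every ragged dimension by its longest row.
dense_shape = [len(data),
               max(len(r) for r in data),
               max(len(x) for r in data for x in r)]

print(indices)      # -> [[0, 0, 0], [0, 1, 0], [0, 1, 1], [1, 0, 0], [1, 1, 0]]
print(values)       # -> [1, 2, 3, 4, 5]
print(dense_shape)  # -> [2, 2, 2]
```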
DelightRun commented 1 year ago

It seems this error still exists in 0.8.0

2sin18 commented 1 year ago

@DelightRun Could you try the latest commit ?

DelightRun commented 1 year ago

@DelightRun Could you try the latest commit ?

I use your pre-built v0.8.0 wheel package with TensorFlow 1.15.0. It's not very convenient for me to compile from source (I use this in our prod env, which has several restrictions).

However, it seems the problem is that nested_row_splits needs to be reversed:

WRONG CODE

# One more simple demo:
import numpy as np
import tensorflow as tf
import hybridbackend.tensorflow as hb
val = hb.data.dataframe.DataFrame.Value(values=np.array([1, 2, 3, 4, 5]), nested_row_splits=(np.array([0, 1, 3, 4, 5]), np.array([0, 2, 4])))
sess = tf.Session()
sess.run(val.to_sparse())

RIGHT CODE

# One more simple demo:
import numpy as np
import tensorflow as tf
import hybridbackend.tensorflow as hb
val = hb.data.dataframe.DataFrame.Value(values=np.array([1, 2, 3, 4, 5]), nested_row_splits=(np.array([0, 1, 3, 4, 5]), np.array([0, 2, 4]))[::-1])
sess = tf.Session()
sess.run(val.to_sparse())
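
The reversal makes sense if the reader emits the splits innermost-first while to_sparse expects outermost-first. A quick consistency check (my own sketch, not HybridBackend code) shows why only the reversed order is valid: walking from the innermost level outwards, the last offset of each splits array must cover the level below it.

```python
# Check that nested_row_splits (assumed outermost-first) is consistent
# with the flat values: each level's final split offset must equal the
# length of the next, finer level.
def splits_are_consistent(values, nested_row_splits):
    n = len(values)
    for splits in reversed(nested_row_splits):  # innermost level first
        if splits[0] != 0 or splits[-1] != n:
            return False
        n = len(splits) - 1  # number of rows produced at this level
    return True

values = [1, 2, 3, 4, 5]
splits = ([0, 1, 3, 4, 5], [0, 2, 4])  # order as read from the file

print(splits_are_consistent(values, splits))        # -> False (order as read)
print(splits_are_consistent(values, splits[::-1]))  # -> True  (reversed)
```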
2sin18 commented 1 year ago


You are right, and the issue has been fixed, but the fix might not be released for your platform yet. Which Python version, CUDA version (or CPU-only), and TensorFlow version do you use? I will release v1.0 in the coming days.

DelightRun commented 1 year ago

HybridBackend is installed via pip: wheel == https://files.pythonhosted.org/packages/11/de/1408b520b9e4eed382ee068aba170d6089d3731f0d51ce3e898b0cb2aef6/hybridbackend_tf115_cpu-0.8.0-cp36-cp36m-manylinux_2_24_x86_64.whl

DelightRun commented 1 year ago

@DelightRun Could you try the latest commit ?

Tried the latest commit (compiled via Docker), and it still has this error. RaggedTensor with rank >= 2 seems pretty buggy.

francktcheng commented 1 year ago

Hi @DelightRun, I tried your previous demo (with the API calls adjusted accordingly) against the latest commit (4486ba138515a1dbdb6f7d542d7ad23a27476524):

# Create parquet file
import pyarrow as pa
import pyarrow.parquet as pq

arr = pa.array([[[1], [2, 3]], [[4], [5]]], pa.list_(pa.list_(pa.int64())))
table = pa.Table.from_arrays([arr], ['test'])
pq.write_table(table, './test.zstd.parquet', compression='ZSTD')

# Reading the parquet file
import tensorflow as tf
import hybridbackend.tensorflow as hb

dataset = hb.data.ParquetDataset("./test.zstd.parquet", batch_size=2)
dataset = dataset.apply(hb.data.parse())
next_element = tf.data.make_one_shot_iterator(dataset).get_next()
sess = tf.Session()
vals = sess.run(next_element)
print(vals)

The output is

{'test': SparseTensorValue(indices=array([[0, 0, 0],
       [0, 1, 0],
       [0, 1, 1],
       [1, 0, 0],
       [1, 1, 0]]), values=array([1, 2, 3, 4, 5]), dense_shape=array([2, 2, 2]))}

It seems OK. Could you reproduce this result? My env is: python == 3.6, tensorflow == 1.15.5, hybridbackend == 1.0.0 (CPU-only).