TileDB-Inc / TileDB-Spark

Spark interface to the TileDB storage manager
MIT License

no duplicates allowed? #154

Closed: bruno-ariano closed this issue 6 days ago

bruno-ariano commented 3 weeks ago

When writing a DataFrame in TileDB format, I got the following error:

Error: Duplicate coordinates (17437310, T2, CD1) are not allowed

I thought TileDB-Spark supported sparse arrays?

leipzig commented 3 weeks ago

Hi Bruno, can you check that you allowed duplicates when creating the schema? https://docs.tiledb.com/main/how-to/arrays/creating-arrays/creating-the-array-schema#allowing-duplicates
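By default, a TileDB sparse array rejects two cells with identical coordinates, which is what the error above reports. Here is a minimal sketch in plain Python of how one might find the offending coordinate tuples before writing. The function name `find_duplicate_coordinates` and the sample rows are hypothetical. The dimension names come from the write call later in this thread.

```python
from collections import Counter
from typing import Dict, Iterable, List, Sequence, Tuple


def find_duplicate_coordinates(
    rows: Iterable[Dict[str, object]], dims: Sequence[str]
) -> List[Tuple[object, ...]]:
    """Return each coordinate tuple (one value per dimension) that occurs more than once."""
    counts = Counter(tuple(row[d] for d in dims) for row in rows)
    # Counter keeps first-seen order, so duplicates come back in input order.
    return [coords for coords, n in counts.items() if n > 1]


# Hypothetical sample rows: the first two share the same (position, g1, c2).
rows = [
    {"position": 17437310, "g1": "T2", "c2": "CD1", "beta": 0.11},
    {"position": 17437310, "g1": "T2", "c2": "CD1", "beta": 0.21},
    {"position": 17437311, "g1": "T2", "c2": "CD1", "beta": 0.05},
]

print(find_duplicate_coordinates(rows, ["position", "g1", "c2"]))
# prints [(17437310, 'T2', 'CD1')]
```

If the list is non-empty, either deduplicate the data or create the array with duplicates allowed.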

DimitrisStaratzis commented 3 weeks ago

Hi @bruno-ariano, thank you for pointing this out! To add to @leipzig's response: it looks like our README was missing the explanation for the schema.set_allows_dups write option. For more information, please refer to #155.

Update: our main branch has been updated to include the option description.
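The write options used in this thread follow a simple pattern: one schema.dim.<i>.name entry per dimension (counting from 0), plus schema.set_allows_dups. The sketch below collects them into a dict. It assumes only those option names, and the helper `tiledb_write_options` is hypothetical, not part of TileDB-Spark. The resulting dict could be passed to PySpark's `DataFrameWriter.options(**opts)`.

```python
from typing import Dict, Sequence


def tiledb_write_options(dims: Sequence[str], allow_dups: bool) -> Dict[str, str]:
    """Build write options: one schema.dim.<i>.name per dimension, plus the dups flag."""
    opts = {f"schema.dim.{i}.name": name for i, name in enumerate(dims)}
    # Spark option values are strings; PySpark turns a Python True into "true".
    opts["schema.set_allows_dups"] = "true" if allow_dups else "false"
    return opts


opts = tiledb_write_options(["position", "g1", "c2"], allow_dups=True)
print(opts)
# With a live SparkSession this could be used as (not run here):
# df.write.format("io.tiledb.spark").options(**opts).save("test_spark_tiledb")
```

Keeping the dimension order in one list makes it harder to number the dimensions inconsistently across calls.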

bruno-ariano commented 3 weeks ago

Thank you @leipzig and @DimitrisStaratzis for the quick answers. Using the schema.set_allows_dups option as shown below solved the issue.

>>> (test_write.write.format("io.tiledb.spark")
...     .option("schema.dim.0.name", "position")
...     .option("schema.dim.1.name", "g1")
...     .option("schema.dim.2.name", "c2")
...     .option("schema.set_allows_dups", True)
...     .save("test_spark_tiledb"))

However, I now seem to have another problem. When I load the saved data, filters on the position column return rows that do not satisfy the condition, as you can see below.

>>> df = spark.read.format("io.tiledb.spark").load("test_spark_tiledb")
>>> sql_df = spark.sql("SELECT * FROM tiledbArray WHERE position < 16849573")
>>> sql_df.show()
|position| g1|c2|      S|beta|          p-value|
|23538587|E30|C1|C_23538|0.11|0.186792280441147|
|23538587|G20|C1|C_23538|0.21|0.644820366777995|

At first I thought that, when saving the data in sparse mode with duplicates allowed, the positions were somehow mapped to other indexes or stored as strings. However, as you can see, the schema shows position being loaded as an integer, so I am not sure why this is happening.


StructType([StructField('position', IntegerType(), False), StructField('g1', StringType(), False), StructField('c2', StringType(), False), StructField('S', StringType(), True), StructField('beta', DoubleType(), True), StructField('p-value', DoubleType(), True)])

Thank you

DimitrisStaratzis commented 3 weeks ago

If possible, could you please provide the code you used to create the DataFrame, along with any data files (e.g. CSV) you used? I could then try to reproduce your issue.

Thank you!

bruno-ariano commented 2 weeks ago

Hi @DimitrisStaratzis, unfortunately I cannot share any part of my data, but I was able to reproduce the issue with a randomly generated DataFrame, as follows:

First, create a random DataFrame and write it to a CSV file:

import pandas as pd
import numpy as np

# Number of rows
num_rows = 10000

# Create the 'position' column as an integer ranging from 0 to 9999
positions = np.arange(num_rows)

# Create the 'g1' column with a different string every 10 lines
g1_values = ['Group_' + str(i // 10) for i in range(num_rows)]

# Create the 'type' column with a different string every 100 lines
type_values = ['Type_' + str(i // 100) for i in range(num_rows)]

# Create the 'beta' and 't-stat' columns as random floats
beta_values = np.random.randn(num_rows)
t_stat_values = np.random.randn(num_rows)

# Combine all columns into a DataFrame
df = pd.DataFrame({
    'position': positions,
    'g1': g1_values,
    'type': type_values,
    'beta': beta_values,
    't-stat': t_stat_values
})

df.to_csv('dataframe_output_test_tiledb_spark_error.csv', index=False)

Then read the CSV back with Spark, write it to a TileDB array, read the array back, and query it with SQL:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

schema = StructType([
    StructField("position", IntegerType(), True),
    StructField("g1", StringType(), True),
    StructField("type", StringType(), True),
    StructField("beta", DoubleType(), True),
    StructField("t-stat", DoubleType(), True)
])

# .csv() selects Spark's built-in CSV reader, so format("io.tiledb.spark") is not needed here
test_spark = spark.read.option("delimiter", ",").csv("dataframe_output_test_tiledb_spark_error.csv", header=True, schema=schema)

(test_spark.write.format("io.tiledb.spark")
    .mode("overwrite")
    .option("schema.dim.0.name", "position")
    .option("schema.dim.1.name", "g1")
    .option("schema.dim.2.name", "type")
    .option("schema.set_allows_dups", True)
    .save("test_spark_tiledb_error"))

df = spark.read.format("io.tiledb.spark").load("test_spark_tiledb_error")
sql_df = spark.sql("SELECT * FROM tiledbArray WHERE position < 1345")

In this case, instead of wrong results, I get the following error:

io.tiledb.java.api.TileDBError: TileDB internal: Lower range bound 10864 cannot be larger than the higher bound 1344

However, when I filter for positions greater than 1345, I get results:

sql_df = spark.sql("SELECT * FROM tiledbArray WHERE position > 1345")
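The error reports an inclusive [lower, higher] range that TileDB was asked to read. On an integer dimension, the strict predicate position < 1345 has the inclusive upper bound 1344, which is exactly the higher bound in the message. By contrast, position > 1345 gives a range starting at 1346. The sketch below illustrates that arithmetic and the validity check named in the error (the lower bound must not exceed the higher one). It makes two assumptions: that the filter is turned into an inclusive range, and that the range is paired with some lower or upper bound taken from elsewhere. The function names are hypothetical. Where the value 10864 came from is not shown in this thread.

```python
from typing import Tuple


def range_for_less_than(value: int, lower: int) -> Tuple[int, int]:
    """Inclusive range for `dim < value` on an integer dimension, starting at `lower`."""
    return (lower, value - 1)


def range_for_greater_than(value: int, upper: int) -> Tuple[int, int]:
    """Inclusive range for `dim > value` on an integer dimension, ending at `upper`."""
    return (value + 1, upper)


def check_range(rng: Tuple[int, int]) -> None:
    """Mimic the check from the error message: lower must not exceed higher."""
    low, high = rng
    if low > high:
        raise ValueError(
            f"Lower range bound {low} cannot be larger than the higher bound {high}"
        )


# Both ranges are valid when the bounds come from the generated data (0..9999).
check_range(range_for_less_than(1345, lower=0))       # (0, 1344)
check_range(range_for_greater_than(1345, upper=9999))  # (1346, 9999)

# A lower bound of 10864, as in the reported error, makes the "< 1345" range invalid.
try:
    check_range(range_for_less_than(1345, lower=10864))
except ValueError as err:
    print(err)
```

This only reproduces the shape of the message. It shows why a "<" filter can fail while a ">" filter on the same value succeeds whenever the lower bound is larger than the filter value.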


Thank you

DimitrisStaratzis commented 1 week ago

Thank you for providing the example; it will be very helpful. We'll investigate and get back to you with updates as soon as possible.

DimitrisStaratzis commented 1 week ago

Hi @bruno-ariano,

Here is how to use the CSV file to create the TileDB array correctly.

  1. Create the CSV file, no changes here:
    import pandas as pd
    import numpy as np

    # Number of rows
    num_rows = 10000

    # Create the 'position' column as an integer ranging from 0 to 9999
    positions = np.arange(num_rows)

    # Create the 'g1' column with a different string every 10 lines
    g1_values = ['Group_' + str(i // 10) for i in range(num_rows)]

    # Create the 'type' column with a different string every 100 lines
    type_values = ['Type_' + str(i // 100) for i in range(num_rows)]

    # Create the 'beta' and 't-stat' columns as random floats
    beta_values = np.random.randn(num_rows)
    t_stat_values = np.random.randn(num_rows)

    # Combine all columns into a DataFrame
    df = pd.DataFrame({
        'position': positions,
        'g1': g1_values,
        'type': type_values,
        'beta': beta_values,
        't-stat': t_stat_values
    })

    df.to_csv('dataframe_output_test_tiledb_spark_error.csv', index=False)

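  2. Read the CSV file into a DataFrame, write the DataFrame to a TileDB array, and read it back with a filter:
    import java.util.List;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // session() and pathFinder() are assumed to be helpers from the test class (not shown)
    // Read the CSV into a Spark DataFrame
    Dataset<Row> ds = session().read().option("header", true).csv(pathFinder("test.csv"));
    String URI = "test_spark_tiledb_error";

    // Write the DataFrame to a TileDB array
    ds.write()
        .format("io.tiledb.spark")
        .option("schema.dim.0.name", "position")
        .option("schema.dim.1.name", "g1")
        .option("schema.dim.2.name", "type")
        .option("schema.set_allows_dups", true)
        .mode("overwrite")
        .save(URI);

    // Query the TileDB array through a temporary view named "array"
    Dataset<Row> dfRead = session().read().format("io.tiledb.spark").load(URI);
    dfRead.createOrReplaceTempView("array");
    List<Row> rows = session().sql("SELECT * FROM array WHERE position < 200").collectAsList();

    for (Row row : rows) {
      System.out.println(row);
    }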

This is what I got. Filtering works correctly.

bruno-ariano commented 6 days ago

Thanks @DimitrisStaratzis