zilliztech / spark-milvus

Apache License 2.0

Cannot insert spark dataframe in batches into a collection #14

Open timtimich35 opened 1 month ago

timtimich35 commented 1 month ago

Dependencies: python 3.8.12, pyspark 3.5.0, pymilvus 2.4.1, grpcio-tools 1.60.0, protobuf 4.25.3. The Milvus cluster was deployed in k8s using milvus operator 0.9.13.

SparkSession setup:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .master("local[*]") \
                    .appName("similarity_search_mech") \
                    .config("spark.driver.extraClassPath", '/data/notebook_files/clickhouse-native-jdbc-shaded-2.6.5.jar') \
                    .config("spark.jars", '/data/notebook_files/spark-milvus-1.0.0-SNAPSHOT.jar') \
                    .config("spark.driver.memory", "4g") \
                    .getOrCreate()
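One thing worth checking at this point is whether the running session actually reports the `spark.jars` setting. In a notebook, `getOrCreate()` can return a session that already exists, in which case jar settings passed to the builder may not take effect. A minimal sketch, assuming `spark` is the session built above; the helper name `effective_jars` is hypothetical and not part of pyspark:

```python
def effective_jars(spark):
    """Return the jar paths the running session reports for spark.jars."""
    # SparkConf.get(key, default) returns the default when the key is unset.
    raw = spark.sparkContext.getConf().get("spark.jars", "")
    # spark.jars is a comma-separated list; drop empty entries.
    return [path for path in raw.split(",") if path]
```

Calling `effective_jars(spark)` right after the session is created should list the spark-milvus jar; an empty list would suggest the builder options were not applied to this session.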

Milvus setup:

from pymilvus import MilvusClient, DataType

client = MilvusClient(uri="http://cluster_address:19530")

# create schema
schema = MilvusClient.create_schema(auto_id=False, enable_dynamic_field=True)

# add fields to schema
schema.add_field(field_name="id", 
                 datatype=DataType.INT64, 
                 is_primary=True)

schema.add_field(field_name="vec", 
                 datatype=DataType.FLOAT_VECTOR, 
                 dim=3000)

# prepare index parameters
index_params = client.prepare_index_params()

# add indexes
index_params.add_index(field_name="id", 
                       index_type="STL_SORT")

index_params.add_index(field_name="vec", 
                       index_type="IVF_FLAT", 
                       metric_type="IP", 
                       params={"nlist": 128})

# create a collection
client.create_collection(collection_name="similarity_search_mech",
                         schema=schema,
                         index_params=index_params)

Given: I work in the Datalore IDE, deployed in k8s along with Milvus and Spark. I have a Spark dataframe with 2.5 million rows and 2 columns: id and a vector of 3000 floats. I try to load it in batches of 100,000 records each, so there should be 25 iterations in total. None of the batches gets inserted.
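The batch arithmetic described above can be checked with a few lines of plain Python. This is only a sketch: the helper names are hypothetical, and the size estimate assumes 4-byte floats (a Milvus FLOAT_VECTOR holds 32-bit floats) and ignores any serialization overhead.

```python
def batch_ranges(total_rows, batch_size):
    """Yield (start, stop) row offsets, stop exclusive, covering total_rows."""
    for start in range(0, total_rows, batch_size):
        yield start, min(start + batch_size, total_rows)

def batch_payload_bytes(rows, dim, bytes_per_float=4):
    """Raw size of the vector column for one batch, ignoring overhead."""
    return rows * dim * bytes_per_float

ranges = list(batch_ranges(2_500_000, 100_000))
print(len(ranges))                         # 25
print(batch_payload_bytes(100_000, 3000))  # 1200000000
```

Each batch therefore carries roughly 1.2 GB of raw vector data. That is unrelated to whether the data source is found, but it is worth keeping in mind next to the 4g of driver memory configured for the session.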

Insert operation:

    batch.write \
    .mode("append") \
    .option("milvus.host", "cluster_address") \
    .option("milvus.port", "19530") \
    .option("milvus.collection.name", "similarity_search_mech") \
    .option("milvus.collection.vectorField", "vec") \
    .option("milvus.collection.vectorDim", "3000") \
    .option("milvus.collection.primaryKeyField", "id") \
    .format("milvus") \
    .save()

Can you please help me understand what I am doing wrong?

timtimich35 commented 1 month ago

The error I get says "Py4JJavaError: An error occurred while calling o100353.save. : org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: milvus." However, the jar file is exactly where I specified. The SparkSession doesn't report any error for the ClickHouse driver that I also specify, and it uses that driver just fine.
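Spark resolves a short format name such as `milvus` through classes listed in `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister` on the classpath. One way to narrow the problem down is therefore to confirm that the jar on disk is a readable archive and contains such an entry. A minimal sketch using only the standard library; whether this particular jar registers the name this way is an assumption, and `registered_data_sources` is a hypothetical helper:

```python
import zipfile

SERVICE_FILE = "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister"

def registered_data_sources(jar_path):
    """List the data source classes a jar declares for Spark's service lookup."""
    with zipfile.ZipFile(jar_path) as jar:
        if SERVICE_FILE not in jar.namelist():
            return []
        text = jar.read(SERVICE_FILE).decode("utf-8")
    # Service files hold one class name per line; '#' starts a comment.
    lines = (line.split("#", 1)[0].strip() for line in text.splitlines())
    return [line for line in lines if line]
```

Calling `registered_data_sources("/data/notebook_files/spark-milvus-1.0.0-SNAPSHOT.jar")` raises an exception if the file is missing or is not a valid archive. An empty list would suggest the short name is not registered in that jar; a non-empty list points back at how the jar is being put on the classpath.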

wayblink commented 1 month ago

@timtimich35 Obviously the spark-milvus jar is not loaded correctly. I'm not very familiar with pyspark. Have you tried

.config("spark.driver.extraClassPath", '/data/notebook_files/clickhouse-native-jdbc-shaded-2.6.5.jar,/data/notebook_files/spark-milvus-1.0.0-SNAPSHOT.jar') \
.config("spark.executor.extraClassPath", '/data/notebook_files/clickhouse-native-jdbc-shaded-2.6.5.jar,/data/notebook_files/spark-milvus-1.0.0-SNAPSHOT.jar')

or

pyspark --jars /data/notebook_files/clickhouse-native-jdbc-shaded-2.6.5.jar,/data/notebook_files/spark-milvus-1.0.0-SNAPSHOT.jar
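Note that `spark.jars` and `--jars` take a comma-separated list, while the `extraClassPath` settings are JVM classpath strings, whose entries are joined with the platform path separator (`:` on Linux and macOS, `;` on Windows). A small sketch of building both forms from the jar paths in this thread; the helper names are hypothetical:

```python
import os

JARS = [
    "/data/notebook_files/clickhouse-native-jdbc-shaded-2.6.5.jar",
    "/data/notebook_files/spark-milvus-1.0.0-SNAPSHOT.jar",
]

def as_classpath(paths, sep=os.pathsep):
    """Join paths the way a JVM classpath expects (platform separator)."""
    return sep.join(paths)

def as_jars_list(paths):
    """Join paths the way spark.jars and --jars expect (commas)."""
    return ",".join(paths)
```

The result of `as_classpath(JARS)` would go into `spark.driver.extraClassPath` and `spark.executor.extraClassPath`, and `as_jars_list(JARS)` into `spark.jars`. `os.pathsep` reflects the machine running Python, which matches the JVM's machine in `local[*]` mode; pass `sep` explicitly if the two differ.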

timtimich35 commented 1 month ago

@wayblink Nope, I haven't tried this approach yet. I will, and I'll get back to you.

timtimich35 commented 1 month ago

How can I get this part (marked yellow) done if I'm on Windows? [image]