zilliztech / spark-milvus

Apache License 2.0

Missing insertion data exists during bulk write #11

Open Uijeong97 opened 7 months ago

Uijeong97 commented 7 months ago

Hi, I'm using the spark-milvus connector to do a bulk insert, but some of the data is missing afterwards.

I'm getting an error from the proxy: a describe collection error. I think missing data is a critical problem. Please check.

milvus_writer = df.write \
    .format("milvus") \
    .mode("append") \
    .option("milvus.host", host) \
    .option("milvus.port", port) \
    .option("milvus.database.name", db_name) \
    .option("milvus.collection.name", collection_name) \
    .option("milvus.collection.vectorField", "embedding") \
    .option("milvus.collection.vectorDim", "768") \
    .option("milvus.collection.primaryKeyField", "poi_id")

I made sure all segments were flushed, and ran the count check only after enough time had passed.

[Screenshot, 2024-03-27 10:06:20 PM] [Screenshot, 2024-03-27 10:06:27 PM]
2024-03-27 21:21:40,046 ERROR client.AbstractMilvusGrpcClient: DescribeCollectionRequest failed:can't find collection collection not found[collection=448192185218736664]
2024-03-27 21:21:40,062 ERROR client.AbstractMilvusGrpcClient: Failed to describe collection: cp_poi_embedding
[Screenshot, 2024-03-27 10:07:07 PM]

and another log

2024-03-27 21:21:43,242 ERROR internal.ManagedChannelOrphanWrapper: *~*~*~ Channel ManagedChannelImpl{logId=135, target=gateway-pai-milvus.pai-staging-milvus.svc.pr1.io.navercorp.com:10001} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
    at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
    at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:630)
    at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:297)
    at io.milvus.client.MilvusServiceClient.<init>(MilvusServiceClient.java:143)
    at zilliztech.spark.milvus.MilvusConnection$.acquire(MilvusConnection.scala:30)
    at zilliztech.spark.milvus.writer.MilvusDataWriter.<init>(MilvusDataWriter.scala:18)
    at zilliztech.spark.milvus.writer.MilvusDataWriterFactory.createWriter(MilvusDataWriterFactory.scala:11)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:459)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

xiaofan-luan commented 7 months ago

/assign @wayblink

xiaofan-luan commented 7 months ago

Is there an easy way to compare the data imported into Milvus with the data you intended to import?

xiaofan-luan commented 7 months ago

I think we could build a Merkle tree over all the data. The easiest way would be to iterate over all the data, or query it in batches, to find the missing rows.
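The batch-query idea could be sketched roughly as follows. This is only an illustration, not part of the connector: `find_missing_ids`, `build_in_expr`, and the `fetch_existing` callback are hypothetical names, and the filter string assumes Milvus accepts a double-quoted `in [...]` list for a VARCHAR primary key.

```python
import json
from typing import Callable, Iterable, List, Set


def build_in_expr(field: str, ids: List[str]) -> str:
    """Build a filter such as: poi_id in ["a", "b"] (assumed Milvus expr syntax)."""
    return f"{field} in {json.dumps(ids, ensure_ascii=False)}"


def find_missing_ids(
    expected_ids: Iterable[str],
    fetch_existing: Callable[[str], Set[str]],
    field: str = "poi_id",
    batch_size: int = 1000,
) -> List[str]:
    """Return the expected primary keys that the collection does not contain.

    fetch_existing is a hypothetical callback: it receives a filter
    expression and returns the set of primary keys found for it.
    """
    missing: List[str] = []
    batch: List[str] = []

    def check(ids: List[str]) -> None:
        found = fetch_existing(build_in_expr(field, ids))
        missing.extend(pk for pk in ids if pk not in found)

    for pk in expected_ids:
        batch.append(pk)
        if len(batch) == batch_size:
            check(batch)
            batch = []
    if batch:  # check the final, partially filled batch
        check(batch)
    return missing


# With pymilvus the callback might look like this (assumption, not run here):
#   fetch = lambda expr: {
#       row["poi_id"]
#       for row in collection.query(expr=expr, output_fields=["poi_id"])
#   }
#   missing = find_missing_ids(source_ids, fetch)
```

Passing the query in as a callback keeps the comparison logic independent of the client, so it can be tried against a small in-memory set before pointing it at a real collection.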

Uijeong97 commented 7 months ago

Is there an easy way to compare the data imported into Milvus with the data you intended to import?

It's hard to compare all of it, so I extracted only part of the data.

part-00000-4c722b82-de61-4204-9946-c089f22da462-c000.snappy.parquet.zip

When I run "upsert" and "delete" operations with the pymilvus client on the data extracted above, the rows are added or deleted normally.

However, when I bulk write the same data through the Spark connector, it is not saved.

The collection schema is below.

[Screenshot, 2024-03-29 12:38:34 PM]

wayblink commented 7 months ago

@Uijeong97 Hi, sorry to hear that. How much data did you lose? By the way, the spark-milvus connector only supports insert. The error message above does not seem to be related to the data loss. Did you get any more relevant error messages?

wayblink commented 7 months ago

@Uijeong97 Or could you provide your code and data? Then I can try to reproduce your issue.

Uijeong97 commented 7 months ago

@wayblink

Hello.

My data is in Parquet format. I have a collection with a string field called 'poi_id' and an 'embedding' field with 768 dimensions.

This is the collection schema and index information.

from pymilvus import CollectionSchema, DataType, FieldSchema

poi_id = FieldSchema(
    name="poi_id",
    dtype=DataType.VARCHAR,
    max_length=64,
    is_primary=True,
)
embedding = FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768)

schema = CollectionSchema(
    fields=[poi_id, embedding],
    description="CP Matching Vector DB",
    enable_dynamic_field=True,
)

from pymilvus import Collection, utility
collection = Collection(
    name=collection_name,
    schema=schema,
    # shards_num=2
)

index_settings = [
    {
        "field_name": "embedding",
        "index_name": "vector_index",
        "index_params": {
            "metric_type": "IP",
            "index_type": "FLAT",
        },
    },
    {
        "field_name": "poi_id",
        "index_name": "scalar_index",
    },
]

# Create each index, then query its build progress.
for setting in index_settings:
    collection.create_index(**setting)
    utility.index_building_progress(collection_name, index_name=setting["index_name"])

Then I tried to save the data into Milvus using the spark-milvus connector in the following way.

# https://milvus.io/docs/integrate_with_spark.md

milvus_writer = poi_vector_db_df.write \
    .format("milvus") \
    .mode("append") \
    .option("milvus.host", host) \
    .option("milvus.port", port) \
    .option("milvus.database.name", db_name) \
    .option("milvus.collection.name", collection_name) \
    .option("milvus.collection.vectorField", "embedding") \
    .option("milvus.collection.vectorDim", "768") \
    .option("milvus.collection.primaryKeyField", "poi_id")

I've extracted only the rows that went missing.

part-00000-4c722b82-de61-4204-9946-c089f22da462-c000.snappy.parquet.zip

Uijeong97 commented 7 months ago

@wayblink

I think the problem is that data went missing even though the Spark write operation finished successfully.

Did you get any more relevant error messages?

Um.. That seems to be the only thing that looks like an error in the logs.

wayblink commented 7 months ago

@Uijeong97 All right, I will try to reproduce it. Can it be reproduced in your environment? What is your Milvus version?

Uijeong97 commented 7 months ago

@wayblink

Thanks. In my environment, it reproduces when I load the data extracted above.

wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar

sayanbiswas59 commented 2 months ago

Hi @Uijeong97, I wanted to quickly check whether you have verified that the embeddings are inserted and stored correctly in Milvus. I am doing something similar, but my embeddings change when they are inserted through the spark-milvus connector, as shown below:

Raw values of the embeddings that we want to ingest: [0.04970105364918709, 0.03401502966880798, 0.003363868221640587,..]

The embedding values that appear in Milvus are: [-1.0842022e-19, -1.2552958, -0.0, ..]

Could you please help me resolve this and please let me know if I am missing something? TIA!

wayblink commented 2 months ago

Hi, this seems like a serious problem and I'd like to look into it. Can you provide more information, such as your code? I will try to reproduce it.

sayanbiswas59 commented 2 months ago

Hi @wayblink It seems like the issue was due to the datatype mismatch of the embedding field between Milvus and Spark DataFrame. In Milvus, we are using the DataType.FLOAT_VECTOR, and in the Spark DataFrame, the embedding field is of type array of double. The issue gets resolved when we convert the embedding field to array of float in the Spark DataFrame before ingesting it to Milvus.
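A minimal sketch of that conversion in PySpark, assuming the vector column is named `embedding` as earlier in this thread. The helper names `columns_needing_float_cast` and `cast_vectors_to_float` are hypothetical, not part of the connector:

```python
from typing import Iterable, List, Tuple


def columns_needing_float_cast(
    dtypes: Iterable[Tuple[str, str]],
    vector_fields: Iterable[str],
) -> List[str]:
    """Pick the vector columns that Spark reports as array<double>.

    dtypes has the shape of DataFrame.dtypes: (column name, type string) pairs.
    """
    wanted = set(vector_fields)
    return [
        name for name, dtype in dtypes
        if name in wanted and dtype == "array<double>"
    ]


def cast_vectors_to_float(df, vector_fields=("embedding",)):
    """Cast array<double> vector columns to array<float> before writing."""
    # Imported here so the helper above can be used without Spark installed.
    from pyspark.sql.functions import col

    for name in columns_needing_float_cast(df.dtypes, vector_fields):
        df = df.withColumn(name, col(name).cast("array<float>"))
    return df
```

Calling `cast_vectors_to_float(df)` right before `df.write.format("milvus")` would make the Spark column type match a `FLOAT_VECTOR` field. The cast narrows 64-bit values to 32-bit floats, so a small loss of precision is expected.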

wayblink commented 2 months ago

Cool! Thanks. Please let me know if you have any issues or suggestions for the spark-milvus connector.

sayanbiswas59 commented 2 months ago

Hi @wayblink, We are currently planning to ingest approximately 500 million image embeddings to Milvus, leveraging the spark-milvus connector and Pyspark. We have noticed that two data formats - 'Milvus' and 'mjson', are recommended for data ingestion. Additionally, we are aware that MilvusUtils is currently only supported for Scala. Given these factors, we would greatly appreciate if you could guide us on how to effectively perform a bulk insert to Milvus using Pyspark. Thank you in advance for your assistance.

sayanbiswas59 commented 1 month ago

Hi @wayblink, could you please guide us on how to effectively perform a bulk insert to Milvus using PySpark? Thanks!

wayblink commented 1 month ago

@sayanbiswas59 OK, I'd like to help you with this. I can provide a demo for you. What is your original data format?

wayblink commented 1 month ago

@sayanbiswas59 By the way, what is your Milvus version? Since Milvus already supports bulkinsert of the Parquet format, there is no need to use 'mjson'. One approach might be to do the processing in PySpark, store the data in Parquet format, and then bulkinsert it into Milvus. Maybe you can try it with a small piece of data first?

sayanbiswas59 commented 1 month ago

Hi @wayblink, thank you for your response. We are using Milvus 2.4.4. The id, embeddings, and url are stored in Parquet format. We want to bulk ingest them into Milvus using PySpark (Spark 3.3.2).

wayblink commented 1 month ago

Yes, I think you don't need the spark connector. What you need to do:

  1. Move the data to the Milvus storage (MinIO or S3) so that bulkinsert can read the files.
  2. Create a collection based on your schema.
  3. Write a script that lists the Parquet directory and calls bulkinsert for each file.

Does this meet your needs?
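Step 3 could be sketched roughly as below. This is a sketch under assumptions rather than a tested script: `select_parquet_files` and `bulk_insert_by_file` are hypothetical helpers, the object keys are assumed to come from whatever listing call your MinIO or S3 client provides, and the `submit` callback stands in for pymilvus's `utility.do_bulk_insert`.

```python
from typing import Callable, Dict, Iterable, List


def select_parquet_files(object_keys: Iterable[str], prefix: str) -> List[str]:
    """Keep the .parquet objects under prefix, in a stable sorted order."""
    return sorted(
        key for key in object_keys
        if key.startswith(prefix) and key.endswith(".parquet")
    )


def bulk_insert_by_file(
    collection_name: str,
    files: Iterable[str],
    submit: Callable[[str, List[str]], int],
) -> Dict[str, int]:
    """Submit one import task per file and return {file: task_id}."""
    return {path: submit(collection_name, [path]) for path in files}


# With pymilvus the callback might be (assumption, not run here):
#   from pymilvus import connections, utility
#   connections.connect(host=host, port=port)
#   submit = lambda name, files: utility.do_bulk_insert(
#       collection_name=name, files=files
#   )
#   tasks = bulk_insert_by_file("my_collection", parquet_keys, submit)
# Each task id can then be polled with utility.get_bulk_insert_state(task_id).
```

Submitting one task per file keeps failures easy to attribute: a failed task maps back to exactly one Parquet file, which can be re-submitted on its own.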