zilliztech/spark-milvus

[Bug]: Add support for SPARSE_FLOAT_VECTOR #18

Open · juanandreas opened this issue 1 month ago

Describe the bug

Writing a DataFrame to a collection with a sparse vector field fails with an unrecognized data type error:

from pymilvus import DataType, FieldSchema

fields = [
    FieldSchema(name="id", is_primary=True, dtype=DataType.VARCHAR, max_length=100),
    FieldSchema(name="sparse_vector", dtype=DataType.SPARSE_FLOAT_VECTOR),
]

...

spark_df.write \
  .mode("append") \
  .option("milvus.host", MILVUS_HOST) \
  .option("milvus.port", MILVUS_PORT) \
  .option("milvus.collection.name", COLLECTION_NAME) \
  .option("milvus.collection.vectorField", "sparse_vector") \
  .option("milvus.collection.primaryKeyField", "id") \
  .option("milvus.database.name", "default") \
  .format("milvus") \
  .save()
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 72) (10.126.148.241 executor 0): scala.MatchError: UNRECOGNIZED (of class io.milvus.grpc.DataType)

Expected Behavior

The Spark-Milvus connector should recognize the SPARSE_FLOAT_VECTOR data type. The scala.MatchError above is raised in the connector's Scala writer (MilvusDataWriter), not in pymilvus.
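For reference, pymilvus itself already handles SPARSE_FLOAT_VECTOR: sparse vectors are passed as dicts mapping dimension index to value. Until the connector's type mapping is extended, the same rows can be inserted directly with the client. A minimal sketch, assuming the collection above already exists and a Milvus server at the default local address (the URI, collection name, and row contents here are illustrative):

from pymilvus import MilvusClient

# Hedged workaround: bypass the Spark connector and insert via pymilvus,
# which already recognizes sparse vectors. All values below are illustrative.
client = MilvusClient(uri="http://localhost:19530", db_name="default")

rows = [
    # A sparse vector is a dict of {dimension_index: float_value}.
    {"id": "doc-1", "sparse_vector": {0: 0.12, 42: 0.98}},
    {"id": "doc-2", "sparse_vector": {7: 0.55, 1024: 0.31}},
]
client.insert(collection_name="my_collection", data=rows)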

Full error stack trace:

Py4JJavaError: An error occurred while calling o1300.save.
: org.apache.spark.SparkException: [WRITING_JOB_ABORTED] Writing job aborted
    at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:996)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:419)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:363)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:249)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:342)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:341)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:249)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:247)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:250)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:400)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:195)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:995)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:149)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:350)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:247)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:232)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:245)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:238)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:99)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:298)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:294)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:238)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:354)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:238)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:183)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:274)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:965)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:347)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:259)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 72) (10.126.148.241 executor 0): scala.MatchError: UNRECOGNIZED (of class io.milvus.grpc.DataType)
    at zilliztech.spark.milvus.writer.MilvusDataWriter$.$anonfun$newInsertBuffer$2(MilvusDataWriter.scala:94)
    at scala.runtime.java8.JFunction1$mcZI$sp.apply(JFunction1$mcZI$sp.java:23)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
    at scala.collection.immutable.Range.foreach(Range.scala:158)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
    at zilliztech.spark.milvus.writer.MilvusDataWriter$.newInsertBuffer(MilvusDataWriter.scala:93)
    at zilliztech.spark.milvus.writer.MilvusDataWriter.<init>(MilvusDataWriter.scala:25)
    at zilliztech.spark.milvus.writer.MilvusDataWriterFactory.createWriter(MilvusDataWriterFactory.scala:11)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:440)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:391)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)
    at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:126)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3420)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3342)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3331)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3331)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1439)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1439)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1439)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3631)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3569)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3557)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1188)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1176)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2754)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2737)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:387)
    ... 49 more
Caused by: scala.MatchError: UNRECOGNIZED (of class io.milvus.grpc.DataType)
    at zilliztech.spark.milvus.writer.MilvusDataWriter$.$anonfun$newInsertBuffer$2(MilvusDataWriter.scala:94)
    at scala.runtime.java8.JFunction1$mcZI$sp.apply(JFunction1$mcZI$sp.java:23)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
    at scala.collection.immutable.Range.foreach(Range.scala:158)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
    at zilliztech.spark.milvus.writer.MilvusDataWriter$.newInsertBuffer(MilvusDataWriter.scala:93)
    at zilliztech.spark.milvus.writer.MilvusDataWriter.<init>(MilvusDataWriter.scala:25)
    at zilliztech.spark.milvus.writer.MilvusDataWriterFactory.createWriter(MilvusDataWriterFactory.scala:11)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:440)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:391)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)
    at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:126)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
xiaofan-luan commented 1 month ago

Does anyone want to take this issue?

juanandreas commented 1 month ago

Is there a timeline for when we'll have support for this feature?

xiaofan-luan commented 1 month ago

If no one takes this issue, it may need some time due to limited manpower, but ideally it could be fixed in the next few months.

juanandreas commented 1 month ago

Does pymilvus do_bulk_insert support sparse vectors at this time? I am hitting a bug where a dimension lookup is attempted on a sparse vector:

{'failed_reason': 'typeutil.GetDim should not invoke on sparse vector type', 'progress_percent': '0'},

I can't find this error message in the pymilvus GitHub repository, likely because it originates in the Milvus server itself rather than in the client.
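For context, the failure surfaces through the standard bulk-insert flow. A rough sketch of the pattern that produces it, assuming a row-based JSON import file (the connection details, collection name, and file name are illustrative):

from pymilvus import connections, utility

connections.connect(host="localhost", port="19530")

# Hedged sketch: kick off a bulk insert and poll its state; the failed_reason
# field is where the 'typeutil.GetDim' message shows up.
task_id = utility.do_bulk_insert(
    collection_name="my_collection",
    files=["sparse_rows.json"],  # illustrative row-based JSON with a sparse_vector field
)
state = utility.get_bulk_insert_state(task_id)
print(state.state_name, state.failed_reason)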

xiaofan-luan commented 1 month ago

This is going to be supported in 2.4.3: https://milvus.io/docs/release_notes.md#Release-Notes
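Once a 2.4.3 or newer deployment is running, the server version can be confirmed before retrying. A minimal check, assuming a default local connection:

from pymilvus import connections, utility

connections.connect(host="localhost", port="19530")
print(utility.get_server_version())  # expect v2.4.3 or later for sparse bulk-insert support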