milvus-io / pymilvus

Python SDK for Milvus.

[Bug]: The bulk writer cannot convert nullable data to a CSV file. #2313

Open zhuwenxing opened 1 month ago

zhuwenxing commented 1 month ago

Is there an existing issue for this?

Describe the bug

[2024-10-25 19:42:28 - ERROR - local_bulk_writer]: Failed to fulsh, error: int() argument must be a string, a bytes-like object or a number, not 'NoneType' (local_bulk_writer.py:142)
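The failure reduces to Python's int() rejecting None: when a nullable INT64 column holds a null value, the CSV flush path apparently tries to cast it with int(). A one-line illustration of the underlying error (not the writer's actual code):

    int(None)
    # TypeError: int() argument must be a string, a bytes-like object or a number, not 'NoneType'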

[2024-10-25 19:42:27 - INFO - ci_test]: ################################################################################ (conftest.py:232)
[2024-10-25 19:42:27 - INFO - ci_test]: [initialize_milvus] Log cleaned up, start testing... (conftest.py:233)
[2024-10-25 19:42:27 - INFO - ci_test]: [setup_class] Start setup class... (client_base.py:40)
[2024-10-25 19:42:27 - INFO - ci_test]: *********************************** setup *********************************** (client_base.py:46)
[2024-10-25 19:42:27 - INFO - ci_test]: [setup_method] Start setup test case test_with_all_field_csv_with_bulk_writer. (client_base.py:47)
-------------------------------- live log call ---------------------------------
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Data path created: /Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/bulk_writer (local_bulk_writer.py:77)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Data path created: /Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/bulk_writer/6c82cb06-dff3-4dcf-ba6e-e2a48c89f262 (local_bulk_writer.py:82)
[2024-10-25 19:42:28 - INFO - remote_bulk_writer]: Minio/S3 blob storage client successfully initialized (remote_bulk_writer.py:157)
[2024-10-25 19:42:28 - INFO - remote_bulk_writer]: Remote buffer writer initialized, target path: /bulk_data/6c82cb06-dff3-4dcf-ba6e-e2a48c89f262 (remote_bulk_writer.py:123)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Prepare to flush buffer, row_count: 1000, size: 1360576 (local_bulk_writer.py:109)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Flush thread begin, name: Thread-10 (local_bulk_writer.py:116)
[2024-10-25 19:42:28 - ERROR - local_bulk_writer]: Failed to fulsh, error: int() argument must be a string, a bytes-like object or a number, not 'NoneType' (local_bulk_writer.py:142)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Wait flush to finish (local_bulk_writer.py:120)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Flush thread finished, name: Thread-10 (local_bulk_writer.py:146)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Commit done with async=False (local_bulk_writer.py:124)
[2024-10-25 19:42:28 - INFO - ci_test]: bulk insert files: [] (test_bulk_insert.py:1874)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Delete local directory '/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/bulk_writer/6c82cb06-dff3-4dcf-ba6e-e2a48c89f262' (local_bulk_writer.py:88)
[2024-10-25 19:42:29 - INFO - ci_test]:  collection entities: 0 (test_bulk_insert.py:1889)
FAILED
testcases/test_bulk_insert.py:1806 (TestBulkInsert.test_with_all_field_csv_with_bulk_writer[doc-True-True-1000-128-True])
0 != 1000

Expected :1000
Actual   :0

self = <test_bulk_insert.TestBulkInsert object at 0x130eae7c0>, auto_id = True
dim = 128, entities = 1000, enable_dynamic_field = True, sparse_format = 'doc'
nullable = True

    @pytest.mark.tags(CaseLabel.L3)
    @pytest.mark.parametrize("auto_id", [True, False])
    @pytest.mark.parametrize("dim", [128])  # 128
    @pytest.mark.parametrize("entities", [1000])  # 1000
    @pytest.mark.parametrize("enable_dynamic_field", [True, False])
    @pytest.mark.parametrize("nullable", [True, False])
    @pytest.mark.parametrize("sparse_format", ["doc", "coo"])
    def test_with_all_field_csv_with_bulk_writer(self, auto_id, dim, entities, enable_dynamic_field, sparse_format, nullable):
        """
        """
        self._connect()
        fields = [
            cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
            cf.gen_int64_field(name=df.int_field, nullable=nullable),
            cf.gen_float_field(name=df.float_field, nullable=nullable),
            cf.gen_string_field(name=df.string_field, nullable=nullable),
            cf.gen_json_field(name=df.json_field, nullable=nullable),
            cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64, nullable=nullable),
            cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT, nullable=nullable),
            cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100, nullable=nullable),
            cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL, nullable=nullable),
            cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
            cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim),
            cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=dim),
            cf.gen_sparse_vec_field(name=df.sparse_vec_field),
        ]
        c_name = cf.gen_unique_str("bulk_insert")
        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
        self.collection_wrap.init_collection(c_name, schema=schema)
        with RemoteBulkWriter(
            schema=schema,
            remote_path="bulk_data",
            connect_param=RemoteBulkWriter.ConnectParam(
                bucket_name=self.bucket_name,
                endpoint=self.minio_endpoint,
                access_key="minioadmin",
                secret_key="minioadmin",
            ),
            file_type=BulkFileType.CSV,
        ) as remote_writer:
            json_value = [
                {"key": "value"},
            ]
            for i in range(entities):
                row = {
                    df.pk_field: i,
                    df.int_field: 1 if not (nullable and random.random() < 0.5) else None,
                    df.float_field: 1.0 if not (nullable and random.random() < 0.5) else None,
                    df.string_field: "string" if not (nullable and random.random() < 0.5) else None,
                    df.json_field: json_value[i%len(json_value)] if not (nullable and random.random() < 0.5) else None,
                    df.array_int_field: [1, 2] if not (nullable and random.random() < 0.5) else None,
                    df.array_float_field: [1.0, 2.0] if not (nullable and random.random() < 0.5) else None,
                    df.array_string_field: ["string1", "string2"] if not (nullable and random.random() < 0.5) else None,
                    df.array_bool_field: [True, False] if not (nullable and random.random() < 0.5) else None,
                    df.float_vec_field: cf.gen_vectors(1, dim)[0],
                    df.fp16_vec_field: cf.gen_vectors(1, dim, vector_data_type="FLOAT16_VECTOR")[0],
                    df.bf16_vec_field: cf.gen_vectors(1, dim, vector_data_type="BFLOAT16_VECTOR")[0],
                    df.sparse_vec_field: cf.gen_sparse_vectors(1, dim, sparse_format=sparse_format)[0]
                }
                if auto_id:
                    row.pop(df.pk_field)
                if enable_dynamic_field:
                    row["name"] = fake.name()
                    row["address"] = fake.address()
                remote_writer.append_row(row)
            remote_writer.commit()
            files = remote_writer.batch_files
            log.info(f"bulk insert files: {files}")
        # import data
        for f in files:
            t0 = time.time()
            task_id, _ = self.utility_wrap.do_bulk_insert(
                collection_name=c_name, files=f
            )
            logging.info(f"bulk insert task ids:{task_id}")
            success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
                task_ids=[task_id], timeout=300
            )
            tt = time.time() - t0
            log.info(f"bulk insert state:{success} in {tt} with states:{states}")
            assert success
        num_entities = self.collection_wrap.num_entities
        log.info(f" collection entities: {num_entities}")
>       assert num_entities == entities
E       assert 0 == 1000

test_bulk_insert.py:1890: AssertionError
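For reference, a stripped-down sketch that should hit the same flush error without the CI harness. Assumptions: LocalBulkWriter and BulkFileType are importable from pymilvus.bulk_writer, FieldSchema accepts nullable=True, and /tmp/bulk_writer is writable; the field names, dim=8, and row count are illustrative only.

    import random

    from pymilvus import CollectionSchema, DataType, FieldSchema
    from pymilvus.bulk_writer import BulkFileType, LocalBulkWriter

    # Schema with one nullable scalar field, mirroring the failing test in miniature.
    schema = CollectionSchema(fields=[
        FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True),
        FieldSchema(name="int_field", dtype=DataType.INT64, nullable=True),
        FieldSchema(name="vec", dtype=DataType.FLOAT_VECTOR, dim=8),
    ])

    writer = LocalBulkWriter(schema=schema, local_path="/tmp/bulk_writer", file_type=BulkFileType.CSV)
    for i in range(10):
        writer.append_row({
            "pk": i,
            "int_field": None if i % 2 == 0 else 1,  # null rows for the nullable field
            "vec": [random.random() for _ in range(8)],
        })
    writer.commit()  # flush fails: int() argument must be ... not 'NoneType'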

Expected Behavior

No response

Steps/Code To Reproduce behavior

No response

Environment details

- Hardware/Software conditions (OS, CPU, GPU, Memory):
- Method of installation (Docker, or from source):
- Milvus version (v0.3.1, or v0.4.0):
- Milvus configuration (Settings you made in `server_config.yaml`):

Anything else?

No response

binbinlv commented 1 month ago

Currently, the bulk writer does not support nullable fields. @smellthemoon, could you please confirm? Thanks.
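Until nullable support lands in the bulk writer, one possible test-side workaround is to substitute per-field defaults instead of passing None. This is only a hypothetical sketch; the field names and defaults are illustrative:

    # Replace None with a per-field default before append_row; fields without a
    # listed default pass through unchanged.
    defaults = {"int_field": 0, "float_field": 0.0, "string_field": ""}

    def fill_nulls(row: dict) -> dict:
        return {k: defaults.get(k, v) if v is None else v for k, v in row.items()}

    # remote_writer.append_row(fill_nulls(row))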

binbinlv commented 1 month ago

/assign @smellthemoon

smellthemoon commented 1 month ago

Seems related to #2281.

smellthemoon commented 1 month ago

Could you help verify it? @zhuwenxing

smellthemoon commented 1 month ago

/assign @zhuwenxing