Closed valebedu closed 10 months ago
Thanks for opening this issue @valebedu - Am I understanding correctly that the OpenAI vectorizer within Weaviate is erroring which is in turn breaking the sync?
If yes, it looks like something Weaviate should ideally handle internally.
Is it possible the specific text from your records is causing the error on OpenAI side?
That being said, we can add a retry mechanism on the Airbyte side as well to retry loading objects that failed in the current batch. I'm going to work on this.
Hi @flash1293 ,
Yes, that's right: in my configuration Airbyte doesn't do the vectorization, it simply syncs data from source to destination. It's Weaviate's job to vectorize the content with OpenAI.
It's unlikely that a specific text causes the error, because in that run the job failed after 175,000 records, while a previous run went up to 300,000 records without any issue. It's just a random server-unavailable error.
Retry logic on the current batch of 128 elements would be really nice!
Weaviate already does retries in case of timeouts and similar errors, but not in case of a 500 error. I added a retry on the Airbyte side for all objects in a batch that didn't succeed (it will retry twice): https://github.com/airbytehq/airbyte/pull/32038
If the error on the OpenAI side is transient, this should catch the problem.
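For illustration, the retry idea can be sketched like this (hypothetical helper names, not the actual PR code): index a batch, then re-submit only the objects that failed, up to two extra attempts.

```python
# Sketch of a per-batch retry: `index_batch` is a hypothetical callable that
# indexes a list of objects and returns the ones that failed.
from typing import Callable, List


def index_with_retries(
    objects: List[dict],
    index_batch: Callable[[List[dict]], List[dict]],
    max_retries: int = 2,
) -> List[dict]:
    """Index a batch, retrying only the failed objects up to max_retries times.

    Returns the objects that still failed after all retries."""
    failed = index_batch(objects)
    for _ in range(max_retries):
        if not failed:
            break
        # Re-submit only the failed subset, not the whole batch.
        failed = index_batch(failed)
    return failed
```

A caller would then raise (or surface the error to Airbyte) only if the returned list is non-empty, so a transient 500 from the vectorizer doesn't fail the whole sync.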
Awesome! I'll ask the Weaviate team why this case is not handled on the Weaviate side.
@valebedu I just realized I might have misunderstood the Weaviate code and simply configured the client incorrectly for automatic retries. Let me check again.
No problem. Also note that this is not a critical issue, as it works most of the time; I just need luck for the sync to finish successfully without a 5XX error.
Additionally, Weaviate released 1.22 yesterday with async vector indexing. I hope that will allow non-blocking and faster syncs; I'll try syncing with this version.
That sounds great, thanks. It's not enabled by default, but the Weaviate client already has retries on all errors built in. I switched the PR linked above to enable that; together with async vector indexing, this should help a lot.
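For reference, in the v3 Python client this kind of in-batch retry is opted into via `WeaviateErrorRetryConf` when configuring the batch (a config sketch, assuming weaviate-client v3.x and a local instance; the URL and batch size are placeholders):

```python
import weaviate
from weaviate import WeaviateErrorRetryConf

# Assumed local Weaviate instance; adjust the URL for your deployment.
client = weaviate.Client("http://localhost:8080")

# Retry objects that failed within a batch response (any error message),
# up to 3 times, instead of silently dropping them.
client.batch.configure(
    batch_size=128,
    weaviate_error_retries=WeaviateErrorRetryConf(number_retries=3),
)
```

This matches the `_retry_on_error` / `add_failed_objects_from_response` path visible in the traceback below, which only runs when error retries are configured.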
@valebedu - Thanks very much for logging this. Even if it's not a critical bug, the feedback helps us improve these connectors and is very much appreciated.
@valebedu Were you able to run your sync successfully?
Hi @flash1293 ,
Unfortunately no. I'm facing a new error, but I think this one is on the Weaviate client side. I asked on the Weaviate #support Slack channel to get more info on the error.
FYI the error is:
...
# same setup as other logs
...
2023-11-03 02:38:01 destination > /usr/local/lib/python3.9/site-packages/weaviate/warnings.py:80: DeprecationWarning: Dep002: You are batching manually. This means you are NOT using the client's built-in
2023-11-03 02:38:01 destination > multi-threading. Setting `batch_size` in `client.batch.configure()` to an int value will enabled automatic
2023-11-03 02:38:01 destination > batching. See:
2023-11-03 02:38:01 destination > https://weaviate.io/developers/weaviate/current/restful-api-references/batch.html#example-request-1
2023-11-03 02:38:01 destination > warnings.warn(
2023-11-03 02:38:01 source > 2023-11-03 02:38:01 INFO i.a.c.d.j.s.AdaptiveStreamingQueryConfig(accept):40 - Set new fetch size: 39624 rows
2023-11-03 02:44:28 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 5000 (20 MB)
2023-11-03 02:46:24 destination > string indices must be integers
Traceback (most recent call last):
File "/airbyte/integration_code/main.py", line 11, in <module>
DestinationWeaviate().run(sys.argv[1:])
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/destination.py", line 119, in run
for message in output_messages:
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/destination.py", line 113, in run_cmd
yield from self._run_write(config=config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin)
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/destination.py", line 49, in _run_write
yield from self.write(config=config, configured_catalog=catalog, input_messages=input_messages)
File "/airbyte/integration_code/destination_weaviate/destination.py", line 38, in write
yield from writer.write(configured_catalog, input_messages)
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/vector_db_based/writer.py", line 68, in write
self._process_batch()
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/vector_db_based/writer.py", line 48, in _process_batch
self.indexer.index(documents, namespace, stream)
File "/airbyte/integration_code/destination_weaviate/indexer.py", line 126, in index
self._flush()
File "/airbyte/integration_code/destination_weaviate/indexer.py", line 155, in _flush
results = self.client.batch.create_objects()
File "/usr/local/lib/python3.9/site-packages/weaviate/batch/crud_batch.py", line 921, in create_objects
response = self._create_data(
File "/usr/local/lib/python3.9/site-packages/weaviate/batch/crud_batch.py", line 696, in _create_data
batch_to_retry, response_json_successful = self._retry_on_error(
File "/usr/local/lib/python3.9/site-packages/weaviate/batch/crud_batch.py", line 1716, in _retry_on_error
successful_responses = new_batch.add_failed_objects_from_response(
File "/usr/local/lib/python3.9/site-packages/weaviate/batch/requests.py", line 324, in add_failed_objects_from_response
if self._skip_objects_retry(obj, errors_to_exclude, errors_to_include):
File "/usr/local/lib/python3.9/site-packages/weaviate/batch/requests.py", line 104, in _skip_objects_retry
len(entry["result"]) == 0
TypeError: string indices must be integers
...
# same failure as other logs: readFromDestination: exception caught
...
I also looked at the diff between Weaviate client v3.23.2 and v3.25.2 (https://github.com/weaviate/weaviate-python-client/compare/v3.23.2...v3.25.2) but didn't find anything relevant; the logic is the same.
@valebedu Are you using async vector indexing and version 1.22? If yes, it might return some new result shape the older client library can't deal with. Judging from the error, it tries to read entry as a dictionary but it's actually a string. In any case, I will create a PR to upgrade the client version; that can't hurt.
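The TypeError itself is easy to reproduce in isolation: the client code at requests.py line 104 does `entry["result"]`, assuming `entry` is a dict, but if the newer server returns that entry as a plain string, subscripting a string with a string key raises exactly this error. (The actual response shape from Weaviate 1.22 is a guess from the traceback.)

```python
# Minimal reproduction of the TypeError in the traceback: the client expects
# a parsed dict, but gets a plain string instead.
entry = '{"result": {}}'   # a raw JSON string, not a parsed dict

try:
    entry["result"]        # what _skip_objects_retry effectively does
except TypeError as e:
    # On Python 3.9 the message is "string indices must be integers"
    print(e)
```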
Yes I do, I'll try again without async indexing
Hi @flash1293, FYI I downgraded Weaviate to 1.21.9 and the job finally ran without any issue. Thanks a lot for your help :)
That's great to hear @valebedu , keep me updated in case any other problems / feature gaps come up
Connector Name
destination-weaviate
Connector Version
0.2.5
What step the error happened?
During the sync
Relevant information
Context
I'm syncing a table of 308k articles from Postgres to Weaviate, and the sync failed because OpenAI returned a 500 error for a single element.
update vector: connection to: OpenAI API failed with status: 500 error
The current batch of 128 elements failed and should be retried, or perhaps it's better to stop and retry it on the next job attempt.
But all the records previously added to Weaviate and vectorized successfully should not be re-added and re-vectorized; that incurs time and cost that can be avoided.
Obtained Result
Records that were added correctly are re-vectorized
Expected Result
Records that were added correctly are marked as added and are not re-vectorized
Step To Reproduce
Unfortunately, to reproduce it you need to sync a large number of (really small) records to Weaviate in order to generate a lot of vectors, and be unlucky enough to get a 500 error.
Or you can force the error with a proxy.
Config
JOB_MAIN_CONTAINER_MEMORY_LIMIT=4096Mi
GOMEMLIMIT=2048MiB
Chunk size: 7372
Text fields to embed: publisher, publication_date, language_code, title, body (to build a custom field with secured token length)
Text splitter: By separator, with Keep separator enabled
Embedding: No external embedding, to let Weaviate do the vectorization job
Batch size: 128 (default)
Relevant log output
Contribute