Refactor the document batching functionality. The same logic is currently duplicated across the executors listed below; it could be implemented once in jina_commons and imported from there instead.
It roughly looks like this:
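from typing import Any, Dict, List
from jina import DocumentArray

def _batch_generator(data: List[Any], batch_size: int):
    # Yield consecutive slices of at most batch_size items.
    for i in range(0, len(data), batch_size):
        yield data[i: i + batch_size]

# Defined as a method on each executor class (hence `self`).
def _get_docs_batch_generator(self, docs: DocumentArray, parameters: Dict):
    traversal_path = parameters.get('traversal_path', self.default_traversal_path)
    batch_size = parameters.get('batch_size', self.default_batch_size)
    # Flatten along the traversal path, then drop missing docs and docs without a blob.
    flat_docs = docs.traverse_flat(traversal_path)
    filtered_docs = [doc for doc in flat_docs if doc is not None and doc.blob is not None]
    return _batch_generator(filtered_docs, batch_size)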
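One possible shape for the shared helper, as a minimal sketch only: it assumes the method is lifted into a module-level function that takes the traversal path and batch size as explicit arguments, so it no longer depends on `self`. The public names `batch_generator` and `get_docs_batch_generator` and the `batch_size` check are assumptions for illustration, not existing jina_commons API.

```python
from typing import Any, Iterator, List


def batch_generator(data: List[Any], batch_size: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of `data`, each at most `batch_size` long."""
    # Assumption: reject non-positive sizes, which range() would otherwise
    # turn into an error (0) or an empty iteration (negative).
    if batch_size <= 0:
        raise ValueError('batch_size must be a positive integer')
    for i in range(0, len(data), batch_size):
        yield data[i: i + batch_size]


def get_docs_batch_generator(docs, traversal_path, batch_size: int) -> Iterator[List[Any]]:
    """Flatten `docs`, keep documents that have a blob, and yield them in batches.

    `docs` is expected to behave like a DocumentArray, i.e. to provide
    `traverse_flat(traversal_path)` as used in the snippet above.
    """
    flat_docs = docs.traverse_flat(traversal_path)
    filtered_docs = [doc for doc in flat_docs if doc is not None and doc.blob is not None]
    return batch_generator(filtered_docs, batch_size)
```

With a helper along these lines, each executor would keep only the lookup of its own defaults, passing `parameters.get('traversal_path', self.default_traversal_path)` and `parameters.get('batch_size', self.default_batch_size)` into the shared function.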
Name | Repo URL | PR