Unstructured-IO / unstructured

Open source libraries and APIs to build custom preprocessing pipelines for labeling, training, or production machine learning pipelines.
https://www.unstructured.io/
Apache License 2.0

bug/'General' object has no attribute 'partition_async' #3785

Closed alexander-zuev closed 3 days ago

alexander-zuev commented 3 days ago

Describe the bug

Trying to run a batch ingest pipeline with `partition_by_api=True` fails during the partition step with `AttributeError: 'General' object has no attribute 'partition_async'`.

To Reproduce

```python
import asyncio
import tempfile
from uuid import UUID

from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.processes.chunker import ChunkerConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalConnectionConfig,
    LocalDownloaderConfig,
    LocalIndexerConfig,
    LocalUploaderConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

from src.infrastructure.config.settings import settings
from src.infrastructure.external.supabase_client import supabase_client
from src.infrastructure.storage.data_repository import DataRepository
from src.models.content_models import Document
from src.services.data_service import DataService


class UnstructuredChunker:
    """Class for processing documents using the Unstructured API."""

    def __init__(self, data_service: DataService | None = None):
        self.data_service = data_service

    async def process_documents(self, source_id: UUID) -> None:
        """Process documents in batches for a given source."""
        documents = await self.data_service.get_documents_by_source(source_id)
        self._process_batch(documents)

    def _process_batch(self, documents: list[Document]) -> None:
        """Process a batch of documents using the Unstructured API."""
        # Update status to processing
        # for doc in documents:
        #     await self.data_service.update_document_status(doc.document_id, DocumentStatus.PROCESSING)

        with tempfile.TemporaryDirectory() as temp_dir:
            # Save documents as files
            for doc in documents:
                file_path = f"{temp_dir}/{doc.document_id}.md"
                with open(file_path, "w") as f:
                    f.write(doc.content)

            # Configure and run unstructured pipeline
            pipeline = Pipeline.from_configs(
                context=ProcessorConfig(),
                indexer_config=LocalIndexerConfig(input_path=temp_dir),
                downloader_config=LocalDownloaderConfig(),
                source_connection_config=LocalConnectionConfig(),
                partitioner_config=PartitionerConfig(
                    partition_by_api=True,
                    api_key=settings.unstructured_api_key,
                    partition_endpoint=settings.unstructured_api_url,
                    strategy="auto",
                ),
                chunker_config=ChunkerConfig(
                    chunking_strategy="by_title",
                    chunk_max_characters=512,
                    chunk_overlap=20,
                    chunk_combine_text_under_n_chars=150,
                ),
                uploader_config=LocalUploaderConfig(output_dir=temp_dir + "/output"),
            )

            pipeline.run()

            # Process results
            # await self._store_chunks(documents, temp_dir + "/output")

            # Update document status
            # for doc in documents:
            #     await self.document_repository.update_document_status(doc.document_id, DocumentStatus.CHUNKED)

    # async def _store_chunks(self, documents: list[Document], output_dir: str) -> None:
    #     """Store chunks for a list of documents."""
    #     # Load chunks from output directory
    #     chunks = []
    #     for doc in documents:
    #         file_path = f"{output_dir}/{doc.document_id}.json"
    #         with open(file_path) as f:
    #             chunks.extend(json.load(f))


async def run_chunker(source_id: UUID) -> None:
    # Initialize Supabase client
    repository = DataRepository(supabase_client)

    # Initialize data service
    data_service = DataService(repository)

    # Initialize the chunker
    chunker = UnstructuredChunker(data_service)

    # Process documents
    await chunker.process_documents(source_id)


if __name__ == "__main__":
    # Define the source_id you want to test with
    source_id = UUID("4102cd35-af4d-40dd-b45a-453a4cff00a9")

    # Run the chunker
    asyncio.run(run_chunker(source_id))
```
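
Since the failing call goes through the `unstructured-client` SDK (`client.general`), it may help to record which versions of `unstructured-ingest` and `unstructured-client` are installed when reproducing. A minimal sketch, assuming both packages are pip-installed in the active environment:

```python
# Minimal sketch (assumption: both packages are installed in this environment).
# Prints the installed versions so they can be included with the repro details.
from importlib.metadata import PackageNotFoundError, version

for pkg in ("unstructured-ingest", "unstructured-client"):
    try:
        print(pkg, version(pkg))
    except PackageNotFoundError:
        print(pkg, "not installed")
```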

Expected behavior

The pipeline runs to completion and chunked output is written to the output directory.


Environment Info

```
kollektiv-py3.12az@AZ-MBP kollektiv % /Users/az/Library/Caches/pypoetry/virtualenvs/kollektiv-1CopE29f-py3.12/bin/python /Users/az/CursorProjects/kollektiv/src/core/content/unstructured_chunker.py
2024-11-18 18:27:04,238 MainProcess INFO created index with configs: {"input_path": "/var/folders/83/06q52hv166395skv171z0sdr0000gn/T/tmpxejerick", "recursive": false}, connection configs: {"access_config": "**"}
2024-11-18 18:27:04,238 MainProcess INFO Created download with configs: {"download_dir": null}, connection configs: {"access_config": "**"}
2024-11-18 18:27:04,238 MainProcess INFO created partition with configs: {"strategy": "auto", "ocr_languages": null, "encoding": null, "additional_partition_args": null, "skip_infer_table_types": null, "fields_include": ["element_id", "text", "type", "metadata", "embeddings"], "flatten_metadata": false, "metadata_exclude": [], "element_exclude": [], "metadata_include": [], "partition_endpoint": "https://api.unstructured.io/general/v0/general", "partition_by_api": true, "api_key": "*", "hi_res_model_name": null}
2024-11-18 18:27:04,238 MainProcess INFO created chunk with configs: {"chunking_strategy": "by_title", "chunking_endpoint": "https://api.unstructuredapp.io/general/v0/general", "chunk_by_api": false, "chunk_api_key": null, "chunk_combine_text_under_n_chars": 150, "chunk_include_orig_elements": null, "chunk_max_characters": 512, "chunk_multipage_sections": true, "chunk_new_after_n_chars": null, "chunk_overlap": 20, "chunk_overlap_all": null}
2024-11-18 18:27:04,238 MainProcess INFO Created upload with configs: {"output_dir": "/var/folders/83/06q52hv166395skv171z0sdr0000gn/T/tmpxejerick/output"}, connection configs: {"access_config": "**"}
2024-11-18 18:27:04,238 MainProcess INFO running local pipeline: index (LocalIndexer) -> download (LocalDownloader) -> partition (auto) -> chunk (by_title) -> upload (LocalUploader) with configs: {"reprocess": false, "verbose": false, "tqdm": false, "work_dir": "/Users/az/.cache/unstructured/ingest/pipeline", "num_processes": 2, "max_connections": null, "raise_on_error": false, "disable_parallelism": false, "preserve_downloads": false, "download_only": false, "re_download": false, "uncompress": false, "iter_delete": false, "delete_cache": false, "otel_endpoint": null, "status": {}}
2024-11-18 18:27:05,774 MainProcess INFO index finished in 2.7e-05s
2024-11-18 18:27:05,776 MainProcess INFO calling DownloadStep with 1 docs
2024-11-18 18:27:05,776 MainProcess INFO processing content async
2024-11-18 18:27:05,776 MainProcess WARNING async code being run in dedicated thread pool to not conflict with existing event loop: <_UnixSelectorEventLoop running=True closed=False debug=False>
2024-11-18 18:27:05,779 MainProcess INFO download finished in 0.002183s, attributes: file_id=31db2781fc0e
2024-11-18 18:27:05,779 MainProcess INFO download step finished in 0.00311s
2024-11-18 18:27:05,779 MainProcess INFO calling PartitionStep with 1 docs
2024-11-18 18:27:05,779 MainProcess INFO processing content async
2024-11-18 18:27:05,779 MainProcess WARNING async code being run in dedicated thread pool to not conflict with existing event loop: <_UnixSelectorEventLoop running=True closed=False debug=False>
2024-11-18 18:27:05,902 MainProcess INFO partition finished in 0.122278s, attributes: file_id=31db2781fc0e
2024-11-18 18:27:05,902 MainProcess ERROR Exception raised while running partition
Traceback (most recent call last):
  File "/Users/az/Library/Caches/pypoetry/virtualenvs/kollektiv-1CopE29f-py3.12/lib/python3.12/site-packages/unstructured_ingest/v2/pipeline/interfaces.py", line 171, in run_async
    return await self._run_async(fn=fn, **kwargs)
  File "/Users/az/Library/Caches/pypoetry/virtualenvs/kollektiv-1CopE29f-py3.12/lib/python3.12/site-packages/unstructured_ingest/v2/pipeline/steps/partition.py", line 66, in _run_async
    partitioned_content = await fn(**fn_kwargs)
  File "/Users/az/Library/Caches/pypoetry/virtualenvs/kollektiv-1CopE29f-py3.12/lib/python3.12/site-packages/unstructured_ingest/v2/processes/partitioner.py", line 188, in run_async
    return await self.partition_via_api(filename, metadata=metadata, **kwargs)
  File "/Users/az/Library/Caches/pypoetry/virtualenvs/kollektiv-1CopE29f-py3.12/lib/python3.12/site-packages/unstructured_ingest/utils/dep_check.py", line 62, in wrapper_async
    return await func(*args, **kwargs)
  File "/Users/az/Library/Caches/pypoetry/virtualenvs/kollektiv-1CopE29f-py3.12/lib/python3.12/site-packages/unstructured_ingest/v2/processes/partitioner.py", line 170, in partition_via_api
    elements = await call_api_async(
  File "/Users/az/Library/Caches/pypoetry/virtualenvs/kollektiv-1CopE29f-py3.12/lib/python3.12/site-packages/unstructured_ingest/v2/unstructured_api.py", line 74, in call_api_async
    res = await client.general.partition_async(request=partition_request)
AttributeError: 'General' object has no attribute 'partition_async'
2024-11-18 18:27:05,903 MainProcess INFO partition step finished in 0.124058s
2024-11-18 18:27:05,903 MainProcess INFO No files to process after partitioning, exiting
2024-11-18 18:27:05,903 MainProcess INFO ingest process finished in 1.664952s
2024-11-18 18:27:05,904 MainProcess ERROR 1 failed documents:
2024-11-18 18:27:05,904 MainProcess ERROR /Users/az/.cache/unstructured/ingest/pipeline/index/31db2781fc0e.json: [partition] 'General' object has no attribute 'partition_async'
Traceback (most recent call last):
  File "/Users/az/CursorProjects/kollektiv/src/core/content/unstructured_chunker.py", line 106, in <module>
    asyncio.run(run_chunker(source_id))
  File "/Users/az/.pyenv/versions/3.12.7/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
  File "/Users/az/.pyenv/versions/3.12.7/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
  File "/Users/az/.pyenv/versions/3.12.7/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
  File "/Users/az/CursorProjects/kollektiv/src/core/content/unstructured_chunker.py", line 98, in run_chunker
    await chunker.process_documents(source_id)
  File "/Users/az/CursorProjects/kollektiv/src/core/content/unstructured_chunker.py", line 32, in process_documents
    self._process_batch(documents)
  File "/Users/az/CursorProjects/kollektiv/src/core/content/unstructured_chunker.py", line 68, in _process_batch
    pipeline.run()
  File "/Users/az/Library/Caches/pypoetry/virtualenvs/kollektiv-1CopE29f-py3.12/lib/python3.12/site-packages/unstructured_ingest/v2/pipeline/pipeline.py", line 144, in run
    raise PipelineError("Pipeline did not run successfully")
unstructured_ingest.v2.pipeline.pipeline.PipelineError: Pipeline did not run successfully
kollektiv-py3.12az@AZ-MBP kollektiv %
```
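
The traceback shows the failure inside `call_api_async`, where `unstructured_ingest` calls `client.general.partition_async`. A quick diagnostic sketch (not a fix) to confirm whether the installed `unstructured-client` exposes that method; the API key value is a placeholder, not a working credential:

```python
# Diagnostic sketch: check whether the installed unstructured-client SDK exposes
# General.partition_async, the method unstructured_ingest/v2/unstructured_api.py calls.
from unstructured_client import UnstructuredClient

client = UnstructuredClient(api_key_auth="<UNSTRUCTURED_API_KEY>")  # placeholder key
print(hasattr(client.general, "partition_async"))  # False reproduces the AttributeError above
```

If this prints False, the installed `unstructured-client` likely predates the async partition method that this version of `unstructured-ingest` expects, which would point to a version mismatch between the two packages.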