duckdb / duckdb_aws

MIT License
41 stars 14 forks source link

`duckdb.duckdb.IOException: IO Error: Connection error for HTTP HEAD to ...` error while reading from `moto` mock s3 bucket #48

Open mayankanand007 opened 3 months ago

mayankanand007 commented 3 months ago

Thanks a lot for all the work with duckdb! I have found it to be extremely useful!

I'm running into an issue with querying files stored in a mock s3 bucket (the same code works against a real s3 bucket); here are the details:

Error:

FAILED error.py::test_duckdb_aws - duckdb.duckdb.IOException: IO Error: Connection error for HTTP HEAD to 'https://some-random-bucket.test/_url/random_output_dir/test.parquet'

Steps to reproduce:

duckdb version: 1.0.0 s3fs: 2023.6.0 aiohttp: 3.9.0 aiobotocore: 2.5.2 moto: 4.1.0

I'm using moto for mocking the s3 bucket. Specifically, I use the fixture defined in this comment: https://github.com/aio-libs/aiobotocore/issues/755#issuecomment-1424945194

from collections.abc import Callable
from typing import Any
from unittest.mock import MagicMock

import aiobotocore.awsrequest
import aiobotocore.endpoint
import aiohttp
import aiohttp.client_reqrep
import aiohttp.typedefs
import boto3
import botocore.awsrequest
import botocore.model
import duckdb
import pandas as pd
import pytest
from moto import mock_s3
from upath import UPath

"""
Source: https://github.com/aio-libs/aiobotocore/issues/755#issuecomment-1424945194
"""

class MockAWSResponse(aiobotocore.awsrequest.AioAWSResponse):
    """
    Adapter wrapping a synchronous moto/botocore ``AWSResponse`` so it
    satisfies the async response interface aiobotocore expects.

    References:
    https://github.com/aio-libs/aiobotocore/issues/755
    https://gist.github.com/giles-betteromics/12e68b88e261402fbe31c2e918ea4168
    """

    def __init__(self, response: botocore.awsrequest.AWSResponse):
        # Attributes aiobotocore reads directly off the response object.
        self.status_code = response.status_code
        self.raw = MockHttpClientResponse(response)
        # Keep the wrapped moto response for the async accessors below.
        self._moto_response = response

    async def _content_prop(self) -> bytes:
        # Async shim over moto's synchronous body bytes.
        return self._moto_response.content

    async def _text_prop(self) -> str:
        # Async shim over moto's synchronous text body.
        return self._moto_response.text

class MockHttpClientResponse(aiohttp.client_reqrep.ClientResponse):
    """
    Mocked HTTP Response standing in for an aiohttp ``ClientResponse``.

    Wraps a synchronous moto/botocore ``AWSResponse`` so aiobotocore can
    read its body and headers through the async aiohttp interface.
    See <MockAWSResponse> Notes
    """

    # Class-level default; `self.read_count += 1` in read() creates an
    # instance attribute on first use, so instances do not share state.
    read_count = 0

    # aiohttp's ClientResponse wants an event loop; None works here because
    # __init__ below deliberately skips the parent initializer.
    _loop = None  # type: ignore

    def __init__(self, response: botocore.awsrequest.AWSResponse):
        """
        Mocked Response Init.

        NOTE: intentionally does NOT call super().__init__ — only the
        attributes aiobotocore actually touches are set up.
        """

        # Serve the full moto body on the first read, then EOF (b"") on
        # subsequent reads, mimicking a drained stream.
        async def read(bytes_size: int = -1) -> bytes:
            if self.read_count == 0:
                self.read_count += 1
                return response.content
            else:
                return b""

        # Stand-in StreamReader whose read() is the coroutine above.
        self.content = MagicMock(aiohttp.StreamReader)
        self.content.read = read
        self.response = response

    @property
    def raw_headers(self) -> Any:
        """
        Return the headers encoded the way that aiobotocore expects them:
        an iterable of (bytes, bytes) pairs.
        """
        return {
            k.encode("utf-8"): str(v).encode("utf-8") for k, v in self.response.headers.items()
        }.items()

@pytest.fixture(scope="session", autouse=True)
def patch_aiobotocore() -> None:
    """
    Session-wide, auto-applied fixture supporting s3fs mocks.

    Replaces aiobotocore's ``convert_to_response_dict`` with a wrapper that
    first adapts moto's synchronous response via ``MockAWSResponse``.
    See <MockAWSResponse> Notes
    """

    # Capture the real converter once so the wrapper can delegate to it.
    original = aiobotocore.endpoint.convert_to_response_dict

    def patched_convert_to_response_dict(
        http_response: botocore.awsrequest.AWSResponse,
        operation_model: botocore.model.OperationModel,
    ) -> Any:
        # Wrap the sync moto response so the async pipeline can consume it.
        return original(MockAWSResponse(http_response), operation_model)

    aiobotocore.endpoint.convert_to_response_dict = patched_convert_to_response_dict

@pytest.fixture
def s3_fixture() -> boto3.resources.base.ServiceResource:
    """Yield a boto3 S3 resource backed by moto's in-memory S3 mock."""
    with mock_s3():
        yield boto3.resource("s3", region_name="us-east-1")

def test_duckdb_aws(s3_fixture: boto3.resources.base.ServiceResource):
    """
    Write a small DataFrame to a mocked S3 bucket as parquet, then read it
    back through DuckDB's httpfs extension.
    """
    # Two-row sample frame to round-trip through parquet.
    data = [{"a": 1, "b": 2, "c": 3}, {"a": 10, "b": 20, "c": 30}]
    df = pd.DataFrame(data)

    TEST_S3_BUCKET_NAME = "some-random-bucket"
    s3_fixture.create_bucket(Bucket=TEST_S3_BUCKET_NAME)

    output_dir = UPath(
        f"s3://{TEST_S3_BUCKET_NAME}/random_output_dir", s3_additional_kwargs={"ACL": "private"}
    )

    output_pth = output_dir / "test.parquet"

    df.to_parquet(output_pth, storage_options={"s3_additional_kwargs": {"ACL": "private"}})

    with duckdb.connect() as con:
        # FIX: the extension is named "httpfs"; the original loaded "https",
        # which is not the extension that was just installed.
        con.sql("INSTALL httpfs")
        con.sql("LOAD httpfs")
        con.sql("SET s3_access_key_id='test_access_key'")
        con.sql("SET s3_secret_access_key='test_secret_key'")
        con.sql("SET s3_endpoint='test_url'")
        # FIX: con.sql() returns a relation; con.fetchdf() only fetches the
        # result of con.execute(), so fetch from the relation instead.
        result = con.sql(f"select * from '{output_pth}';").fetchdf()
        print(result)

Any advice for this? Thanks in advance!

samansmink commented 3 months ago

hey @mayankanand007 thanks for reporting this! I think this has to do with the fact that DuckDB uses a custom S3 implementation instead of an official SDK.

You could try switching to fsspec instead (https://duckdb.org/docs/guides/python/filesystems.html), but that may kind of defeat the purpose of your tests.