Open dazza-codes opened 4 years ago
The job_queues
request works OK when the function is modified as follows, but I have no idea whether this change breaks asyncio behavior because it removes a couple of await
calls. The patched function below was applied directly to site-packages to see what happens and now the following test passes:
# Dummy AWS credentials -- guarantees no test ever reaches a real account.
AWS_REGION = "us-west-2"
AWS_ACCESS_KEY_ID = "dummy_AWS_ACCESS_KEY_ID"
AWS_SECRET_ACCESS_KEY = "dummy_AWS_SECRET_ACCESS_KEY"


@pytest.fixture
def aws_credentials(monkeypatch):
    """Export fake AWS credentials into the test process environment."""
    fake_env = {
        "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
        "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
        "AWS_SECURITY_TOKEN": "testing",
        "AWS_SESSION_TOKEN": "testing",
    }
    for var_name, var_value in fake_env.items():
        monkeypatch.setenv(var_name, var_value)
@pytest.fixture(scope="session")
def aws_region():
    """Session-wide AWS region used by all mocked clients in this suite."""
    return AWS_REGION
@pytest.fixture
def aio_aws_session(aws_credentials, aws_region, event_loop):
    """Yield an aiobotocore session with fake credentials and a one-connection pool."""
    session = aiobotocore.get_session(loop=event_loop)
    session.user_agent_name = "aiobotocore-pytest"
    # A fresh session must not carry any default client config yet.
    assert session.get_default_client_config() is None
    config = aiobotocore.config.AioConfig(
        max_pool_connections=1, region_name=aws_region
    )
    session.set_default_client_config(config)
    assert session.get_default_client_config() == config
    # Install fake credentials so no real AWS endpoint is ever authorized.
    session.set_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    # Debug logging may be overkill, but it helps diagnose mock interception.
    session.set_debug_logger(logger_name="aiobotocore-pytest")
    # NOTE: a custom response parser could be installed here via
    # session.register_component("response_parser_factory", AioResponseParserFactory()).
    yield session
@pytest.fixture
async def aio_aws_batch_client(aio_aws_session):
    """Yield an aiobotocore Batch client wrapped in the moto batch mock.

    NOTE(review): the original stacked ``@pytest.mark.asyncio`` under
    ``@pytest.fixture``; pytest marks have no effect when applied to
    fixtures (newer pytest versions warn/error on this), and pytest-asyncio
    discovers async generator fixtures on its own, so the mark was removed.
    """
    with mock_batch():
        async with aio_aws_session.create_client("batch") as client:
            yield client
@pytest.mark.asyncio
async def test_aio_aws_batch_client(aio_aws_batch_client):
    """A freshly mocked Batch account must report an empty list of job queues."""
    assert isinstance(aio_aws_batch_client, BaseClient)
    response = await aio_aws_batch_client.describe_job_queues()
    expected = {
        "ResponseMetadata": {
            "HTTPStatusCode": 200,
            "HTTPHeaders": {"server": "amazon.com"},
            "RetryAttempts": 0,
        },
        "jobQueues": [],
    }
    assert response == expected
aiobotocore.endpoint
# aiobotocore/endpoint.py
from botocore.utils import lowercase_dict  # this is a new import
async def convert_to_response_dict(http_response, operation_model):
    """Convert an HTTP response object to a response dict.

    This converts the requests library's HTTP response object to
    a dictionary.

    :type http_response: botocore.vendored.requests.model.Response
    :param http_response: The HTTP response from an AWS service request.

    :rtype: dict
    :return: A response dictionary which will contain the following keys:
        * headers (dict)
        * status_code (int)
        * body (string or file-like object)
    """
    response_dict = {
        # lowercase_dict normalizes header names so the case-sensitive
        # 'content-length' lookup below works regardless of server casing.
        'headers': HTTPHeaderDict(lowercase_dict(http_response.headers)),
        'status_code': http_response.status_code,
        'context': {
            'operation_name': operation_model.name,
        }
    }
    if response_dict['status_code'] >= 300:
        # NOTE(review): `await` was deliberately removed here so moto's
        # synchronous AWSResponse works; confirm this does not break real
        # aiohttp responses, whose `.content` must normally be awaited.
        response_dict['body'] = http_response.content  # modified but removed `await`
    elif operation_model.has_event_stream_output:
        # Event streams pass the raw response object through untouched.
        response_dict['body'] = http_response.raw
    elif operation_model.has_streaming_output:
        length = response_dict['headers'].get('content-length')
        response_dict['body'] = StreamingBody(http_response.raw, length)
    else:
        # NOTE(review): `await` removed here as well -- same caveat as the
        # >= 300 branch above.
        response_dict['body'] = http_response.content  # modified but removed `await`
    return response_dict
It's possible that moto
registers something with the before-send
event hook and the pytest function never hits the actual aiobotocore methods to send an aiohttp request.
Although this documentation is on boto3, the event system is also in botocore:
By editing site-packages as follows and then running pytest --pdb
, it drops into the test call stack:
async def _do_get_response(self, request, operation_model):
try:
logger.debug("Sending http request: %s", request)
history_recorder.record('HTTP_REQUEST', {
'method': request.method,
'headers': request.headers,
'streaming': operation_model.has_streaming_input,
'url': request.url,
'body': request.body
})
service_id = operation_model.service_model.service_id.hyphenize()
event_name = 'before-send.%s.%s' % (service_id, operation_model.name)
responses = self._event_emitter.emit(event_name, request=request)
http_response = first_non_none_response(responses)
assert False
if http_response is None:
http_response = await self._send(request)
(Pdb) http_response = first_non_none_response(responses)
(Pdb) http_response
<botocore.awsrequest.AWSResponse object at 0x7fafc23bcf98>
So this test never hits the await self._send(...)
call. It never uses the aio_session
of the AioEndpoint
and so it does not use the ClientResponseProxy
.
It's not clear whether the event emitter has any details about the registered callable that returns the http_response
.
(Pdb) dir(self._event_emitter)
['__class__', '__copy__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_alias_event_name', '_emitter', '_event_aliases', '_replace_subsection', '_verify_accept_kwargs', '_verify_and_register', '_verify_is_callable', 'emit', 'emit_until_response', 'register', 'register_first', 'register_last', 'unregister']
If the hack to add the assert False
is replaced with http_response = None
, then it calls aiobotocore code (which starts to call live-aws services, with no proxies defined).
To add a proxy requires an AioSession
test fixture, with something like
session = aiobotocore.get_session(loop=event_loop)
aioconfig = aiobotocore.config.AioConfig(max_pool_connections=1, region_name=aws_region)
# TODO: test passing proxies into the aiobotocore.endpoint; the proxy must replace
# 'https://{service}.{region_name}.amazonaws.com/{url_path}'
proxies = {
'http': os.getenv("HTTP_PROXY", "http://127.0.0.1:5000/"),
'https': os.getenv("HTTPS_PROXY", "http://127.0.0.1:5000/"),
}
aioconfig.proxies = proxies
session.set_default_client_config(aioconfig)
To get that working requires using moto-server or something. Somehow there must be an easy way to:
before_send
In the moto
git repo:
$ git grep 'before-send'
CHANGELOG.md: * Switch from mocking requests to using before-send for AWS calls
moto/core/models.py:BUILTIN_HANDLERS.append(("before-send", botocore_stubber))
moto uses:
from botocore.handlers import BUILTIN_HANDLERS
botocore_stubber = BotocoreStubber()
BUILTIN_HANDLERS.append(("before-send", botocore_stubber))
where BotocoreStubber
does the work of registering and calling callbacks for moto:
For example, an AWS batch
client, with moto mock_batch
applied, has the following event callbacks after the client has issued a client.describe_job_queues()
method call:
>>> for evt, cb in client.meta.events._emitter._lookup_cache.items():
... print(evt, cb)
...
provide-client-params.batch.DescribeJobQueues deque([])
before-parameter-build.batch.DescribeJobQueues deque([<function generate_idempotent_uuid at 0x7facc3585048>])
before-call.batch.DescribeJobQueues deque([<function inject_api_version_header_if_needed at 0x7facc35869d8>])
request-created.batch.DescribeJobQueues deque([<bound method RequestSigner.handler of <botocore.signers.RequestSigner object at 0x7facb826cda0>>])
choose-signer.batch.DescribeJobQueues deque([<function set_operation_specific_signer at 0x7facc3581ea0>])
before-sign.batch.DescribeJobQueues deque([])
before-send.batch.DescribeJobQueues deque([<moto.core.models.BotocoreStubber object at 0x7facc3267b00>])
response-received.batch.DescribeJobQueues deque([])
needs-retry.batch.DescribeJobQueues deque([<botocore.retryhandler.RetryHandler object at 0x7facba787470>])
after-call.batch.DescribeJobQueues deque([])
getattr.batch.get_credentials deque([])
getattr.batch.credentials deque([])
Note esp. the moto callback in:
before-send.batch.DescribeJobQueues deque([<moto.core.models.BotocoreStubber object at 0x7facc3267b00>])
Note that the moto response is a botocore.awsrequest.AWSResponse and not a
:type http_response: botocore.vendored.requests.model.Response
My understanding is that botocore is using the former (what Moto uses) going forward and deprecating the use of requests.
It's possible to detect when moto mocks are active, e.g.
def has_moto_mocks(client, event_name):
    """Return True when moto's BotocoreStubber is intercepting *client* requests.

    moto registers mock callbacks with the `before-send` event-name, using
    specific callbacks for the methods that are generated dynamically.  By
    checking that the first callback is an enabled BotocoreStubber, this
    verifies that moto mocks are intercepting client requests.

    NOTE: this peeks at the private ``_emitter._lookup_cache`` because
    botocore exposes no public API to iterate event callbacks; treat the
    data strictly as read-only.

    :param client: a botocore/aiobotocore client instance
    :param event_name: full event name, e.g. "before-send.s3.ListBuckets"
    :raises TypeError: if the first callback is not a BotocoreStubber
    """
    callbacks = client.meta.events._emitter._lookup_cache[event_name]
    if not callbacks:
        return False
    stub = callbacks[0]
    # Explicit check instead of `assert`: asserts are stripped under -O.
    if not isinstance(stub, BotocoreStubber):
        raise TypeError(f"unexpected before-send callback: {stub!r}")
    return stub.enabled
I don't know if it's possible to simply disable it with stub.enabled = False
. The botocore client does not expose any public API to iterate on the event callbacks, so this ^^ has to resort to sneaking around in the private API. Since that ^^ function treats the data as read-only, it's nearly OK, but if something were to start modifications of the callbacks, that could get very tricky.
When I find some time to craft a full PR on this, there are better ways to work around this using the MotoService
in the test suite of aiobotocore. For example, these snippets are a clue to what seems to be working well, thanks to MotoService
:
# assumes python >= py3.6 (async generators are OK)
@pytest.fixture
async def aio_aws_s3_server():
    # Spin up a standalone moto S3 server; the yielded value is just the
    # endpoint URL string that client fixtures pass as `endpoint_url`.
    # The server is torn down when the fixture finalizes.
    async with MotoService("s3") as svc:
        yield svc.endpoint_url
@pytest.fixture
def aio_aws_session(aws_credentials, aws_region, event_loop):
    """Yield an aiobotocore session configured for the moto.server fixtures."""
    # pytest-asyncio provides and manages the `event_loop`
    session = aiobotocore.get_session(loop=event_loop)
    session.user_agent_name = "aiomoto"
    # A fresh session must not carry a default client config yet.
    assert session.get_default_client_config() is None
    client_config = aiobotocore.config.AioConfig(
        max_pool_connections=1, region_name=aws_region
    )
    # forget about adding any proxies for moto.server, that doesn't work
    session.set_default_client_config(client_config)
    assert session.get_default_client_config() == client_config
    session.set_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    session.set_debug_logger(logger_name="aiomoto")
    yield session
@pytest.fixture
async def aio_aws_s3_client(aio_aws_session, aio_aws_s3_server):
    """Yield an aiobotocore S3 client pointed at the per-test moto.server."""
    # aio_aws_s3_server is just a string URI for the new `moto.server` for this client
    async with aio_aws_session.create_client("s3", endpoint_url=aio_aws_s3_server) as client:
        yield client
@pytest.mark.asyncio
async def test_aio_aws_s3_client(aio_aws_s3_client):
    """A fresh moto.server S3 client lists no buckets and bypasses moto mocks."""
    client = aio_aws_s3_client
    assert isinstance(client, AioBaseClient)
    assert client.meta.config.region_name == AWS_REGION
    assert client.meta.region_name == AWS_REGION
    resp = await client.list_buckets()
    assert response_success(resp)
    assert resp.get("Buckets") == []
    # the event-name mocks are dynamically generated after calling the method;
    # for aio-clients, they should be disabled for aiohttp to hit moto.server.
    assert not has_moto_mocks(client, "before-send.s3.ListBuckets")
The only minor drawback is that MotoService
is designed to spin-up and tear-down a new moto.server
for every test. It wraps it all nicely in a thread with some async entry/exit points. It might be useful to have a session-scope moto.server
with options to just reset
it for each test (haven't figured out what that looks like).
In case helpful, moto does have a reset API: http://docs.getmoto.org/en/latest/docs/moto_apis.html#reset-api
seems to work: https://github.com/aio-libs/aiobotocore/pull/773
given #773 works I see this now as supporting in-proc moto, which we never tackled. This would be a nice thing because it would allow for tests which coordinate between multiple services. This is going to be a big project because moto does not support aiobotocore....maybe not so big though because we don't test that many services :)
ugh, forgot again this is my in-proc version. So we basically don't support the moto wrapper based tests, but that's not a big deal in my opinion as we expose fixtures for each client (see my PR above). I'm going to close this given I don't see any benefits of supporting the moto wrappers. Feel free to re-open if I'm missing something.
Just hit the same issue. I am replacing something written using botocore and set of monkeypatches with aiobotocore. For testing my project I thought of using moto in mocking (default) configuration. However got hit by this.
I am looking into botocore itself and see that their http_session
(default one at least) wraps urllib response in AWSResponse
(as being noted above)
My thinking is that even if you guys decided to not support testing with moto, you should conform to return type of whatever stock botocore returns now in case there are some client code which registers custom handler for some reason which as result produces botocore.awsrequest.AWSResponse
object. That case will make aiobotocore incompatible as replacement for botocore.
Going to re-open for more investigation
Hi just wanted to check in if there's any progress on this ?
haven't had a chance yet to look into this, been swamped at work
@kkopachev totally agree
Adding my voice here. Working on making s3fs async, and would like to test via moto.
It is most definitely possible — I do it extensively at work — however it requires that you use the server model instead of the wrapper, which requires a bit more work
I honestly didn't know about that - works just fine!
@martindurant Could you share how do u make it work? I am using s3fs and encounter same issue.
here's my latest version btw that I use for testing: https://gist.github.com/thehesiod/2e4094a1db1190f7e122e7043f1973a0
@thehesiod Is there an example showing how to use this class? i am looking for something similar to moto_s3 as an pytest fixture
@noklam : s3fs now uses this fixture, which uses moto in "server" mode, so doesn't need the mock/monkey-patches.
great! so i should use this fixture instead of the moto_s3. Any idea what does the server mode mean?
I will try it out when I get access to my computer. Thanks for the pointer!
「Martin Durant notifications@github.com」在 2020年10月27日週二,下午8:45 寫道:
@noklam https://github.com/noklam : s3fs now uses this fixture https://github.com/dask/s3fs/blob/master/s3fs/tests/test_s3fs.py#L57, which uses moto in "server" mode, so doesn't need the mock/monkey-patches.
— You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub https://github.com/aio-libs/aiobotocore/issues/755#issuecomment-717218175, or unsubscribe https://github.com/notifications/unsubscribe-auth/AELAWL7E27523MB5XWKKCNDSM26GLANCNFSM4KUW3XFA .
Docs: http://docs.getmoto.org/en/latest/docs/server_mode.html moto runs as a real s3 service in a process, and you need to change your endpoint URL to the right address to talk with it.
@noklam this is how I use it:
from contextlib import AsyncExitStack

async def __aenter__(self):
    """Start an in-proc moto S3 service and point aioboto at it.

    NOTE(review): the original snippet declared this as plain ``def`` while
    using ``await`` in the body, which is a SyntaxError -- an ``__aenter__``
    that awaits must be ``async def``.
    """
    self._exit_stack = AsyncExitStack()
    self._moto_s3_svc = await self._exit_stack.enter_async_context(
        moto_svr.MotoService('s3')
    )
    self._exit_stack.enter_context(
        patch.dict(os.environ, {'s3_mock_endpoint_url': self._moto_s3_svc.endpoint_url})
    )
    moto_svr.patch_aioboto()
    # Async context managers conventionally return self from __aenter__.
    return self
can easily adapt to be a fixture, context manager, etc
btw my ver runs in-proc so you can inspect the various services like you normally would running moto, ex: moto.backends.get_backend('s3')['global'].buckets
Thanks both! @martindurant I think the s3fs example is promising, I seem to get it working now, thanks a lot!!
any updates on this? I still am having trouble using the moto context manager.
Hi,
I am currently running into this issue with mock_lambda. Will this be fixed / is there a more active issue to follow?
update for when someone gets time to work on this, the theory is that by ensuring aiobotocore returns a AWSResponse
like botocore it should resolve the issue. (also should switch to latest moto)
One monkey-patch solution which seems to work for my use-case:
from botocore.awsrequest import AWSResponse


class MonkeyPatchedAWSResponse(AWSResponse):
    """AWSResponse stub that satisfies aiobotocore's async expectations."""

    # aiobotocore reads `raw_headers`; moto's synchronous response has none.
    raw_headers = {}

    async def read(self):
        # Fix: return bytes, not str.  aiobotocore decodes the body, and
        # returning bare `self.text` raises
        # "'str' object has no attribute 'decode'" (reported later in this
        # same thread).
        return self.text.encode()


botocore.awsrequest.AWSResponse = MonkeyPatchedAWSResponse
Hi there! I run into the same problem. Thanks to @blackary I managed to solve my issue. I added a fixture to my tests where needed:
@pytest.fixture()
def mock_AWSResponse() -> None:
    """Patch botocore's and moto's AWSResponse so moto works with aiobotocore."""
    class MockedAWSResponse(botocore.awsrequest.AWSResponse):
        raw_headers = {}  # type: ignore

        async def read(self):  # type: ignore
            return self.text

    botocore.awsrequest.AWSResponse = MockedAWSResponse
    # Fix: the original assigned the misspelled name `MockedAWSRespons`,
    # which raises NameError at fixture time.  moto.core.models imports
    # AWSResponse directly, so it must be patched as well.
    moto.core.models.AWSResponse = MockedAWSResponse
The tricky part was to override the import of AWSResponse
done on moto.core.models
too.
I believe having a @pytest.fixture(autouse=True)
might also help but in my case it was good enough without it.
The monkey patch doesn't exactly work for all use cases. For example, I'm trying to load partitioned parquet files from S3 using PyArrow
with the S3FileSystem
from s3fs
. Unfortunately this calls convert_to_response_dict
in aiobotocore/endpoint.py
which expects raw_headers
to be present.
I'd have to patch a bunch of libraries just for tests to make this work.
Hi there! I run into the same problem. Thanks to @blackary I managed to solve my issue. I added a fixture to my tests where needed:
@pytest.fixture() def mock_AWSResponse() -> None: class MockedAWSResponse(botocore.awsrequest.AWSResponse): raw_headers = {} # type: ignore async def read(self): # type: ignore return self.text botocore.awsrequest.AWSResponse = MockedAWSResponse moto.core.models.AWSResponse = MockedAWSRespons
The tricky part was to override the import of
AWSResponse
done onmoto.core.models
too.I believe having a
@pytest.fixture(autouse=True)
might also help but in my case it was good enough without it.
Hi @zedfmario thanks for the suggested fix, I was getting an ERROR 'str' object has no attribute 'decode'
for that answer. A slight change in your answer worked for me.
@pytest.fixture()
def mock_AWSResponse() -> None:
    """Patch botocore/moto AWSResponse for aiobotocore, returning bytes from read()."""
    class MockedAWSResponse(botocore.awsrequest.AWSResponse):
        raw_headers = {}  # type: ignore

        async def read(self):  # type: ignore
            # .encode(): aiobotocore expects a bytes body it can decode.
            return self.text.encode()

    botocore.awsrequest.AWSResponse = MockedAWSResponse
    # Fix: the original assigned the misspelled name `MockedAWSRespons`,
    # which raises NameError at fixture time.
    moto.core.models.AWSResponse = MockedAWSResponse
I had the same issue, but I had to patch a bit further as moto.core.models.MockRawResponse
was giving me trouble:
@pytest.fixture()
def patch_AWSResponse() -> None:
    """Patch bug in botocore, see https://github.com/aio-libs/aiobotocore/issues/755"""

    # Fix: renamed the misspelled `PatcheddAWSResponse` local class.
    class PatchedAWSResponse(botocore.awsrequest.AWSResponse):
        # aiobotocore reads `raw_headers`; moto's response has none.
        raw_headers = {}  # type: ignore

        async def read(self):  # type: ignore
            # bytes, not str: aiobotocore decodes the body.
            return self.text.encode()

    class PatchedMockRawResponse(moto.core.models.MockRawResponse):
        async def read(self, size=None):
            return super().read()

        def stream(self, **kwargs):  # pylint: disable=unused-argument
            # Drain the underlying buffer until it is exhausted.
            contents = super().read()
            while contents:
                yield contents
                contents = super().read()

    botocore.awsrequest.AWSResponse = PatchedAWSResponse
    moto.core.models.AWSResponse = PatchedAWSResponse
    moto.core.models.MockRawResponse = PatchedMockRawResponse
@pytest.fixture() def patch_AWSResponse() -> None: """Patch bug in botocore, see https://github.com/aio-libs/aiobotocore/issues/755"""
class PatcheddAWSResponse(botocore.awsrequest.AWSResponse): raw_headers = {} # type: ignore async def read(self): # type: ignore return self.text.encode() class PatchedMockRawResponse(moto.core.models.MockRawResponse): async def read(self, size=None): return super().read() def stream(self, **kwargs): # pylint: disable=unused-argument contents = super().read() while contents: yield contents contents = super().read() botocore.awsrequest.AWSResponse = PatcheddAWSResponse moto.core.models.AWSResponse = PatcheddAWSResponse moto.core.models.MockRawResponse = PatchedMockRawResponse
@0x26res I had encountered that MockRawResponse
issue and your answer helped me tremendously! Thanks a lot!
@0x26res actually, I encountered a weird behavior where my test script runs into an infinite loop at
while contents:
yield contents
contents = super().read()
But it only happens when I have two unit tests using this fixture. If there's only one unit test that uses it, it can pass without a problem. Do you know what could be the issue here?
@bnsblue it's because you're applying the patch twice. It should only be applied once.
Either set the @pytest.fixture(scope="session")
or write some adhoc code to make sure you don't patch twice.
btw new moto has a threadedserver that can handle requests for all services making this much easier. I'll try to get to this this week
I was using one of the patches mentioned here until recently, but it broke when I upgraded to aiobotocore=2.3.2
.
If it helps anyone, here's the new patch that I'm now using, works for me with aiobotocore==2.3.2
and moto==3.1.9
:
# Patch `aiobotocore.endpoint.convert_to_response_dict` to work with moto.
class PatchedAWSResponse:
    """Wrap a synchronous moto AWSResponse so aiobotocore can consume it."""

    def __init__(self, response: botocore.awsrequest.AWSResponse):
        self._response = response
        self.status_code = response.status_code
        # aiobotocore reads `raw.raw_headers`, which moto's raw object lacks;
        # NOTE(review): this mutates the wrapped response's raw object.
        self.raw = response.raw
        self.raw.raw_headers = {}

    @property
    async def content(self):
        # Awaitable body accessor, mirroring aiohttp's async `.content`.
        return self._response.content
def factory(original):
    """Build a wrapper that patches each response before `original` sees it."""
    def patched_convert_to_response_dict(http_response, operation_model):
        wrapped = PatchedAWSResponse(http_response)
        return original(wrapped, operation_model)
    return patched_convert_to_response_dict


# Monkey-patch the conversion entry point once, at import time.
aiobotocore.endpoint.convert_to_response_dict = factory(
    aiobotocore.endpoint.convert_to_response_dict
)
@Apakottur probably because I made aiobotocore be more like botocore, sorry for disruption.
@Apakottur Thanks for the patch! In my case I made it work with a slight modification used self.raw_headers = {}
instead of self.raw.raw_headers = {}
and async def read(self):
instead of async def content(self):
, should be working for the same versions you mentioned.
I put together a file containing pytest
fixtures that run moto
in server mode to work around this bug.
The code is mostly taken from @martindurant's s3fs shared above.
https://gist.github.com/michcio1234/7d72edc97bd751931aaf1952e4cb479c
@Apakottur thanks for the patch! I'd like to share more implementations regarding your patch (for anyone looking up this issue like I did on 2 AM).
Mocking s3fs
required HTTP headers to be valid, so I switched self.raw.raw_headers = {}
to
self.raw.raw_headers = [(str(k).encode("utf-8"), str(response.headers[k]).encode("utf-8")) for k in response.headers]
.
Another issue I faced was StreamingBody
(aiobotocore.endpoint
) - I had to add an async read()
function to self.raw.content
.
self.raw.content = PatchedRawContentAWSResponse(response)
Then
class PatchedRawContentAWSResponse:
    """Expose a moto response body through an async `read()` for StreamingBody."""

    def __init__(self, response: botocore.awsrequest.AWSResponse) -> None:
        self._response = response

    async def read(self, amt: int = None) -> str:
        # `amt` is accepted for interface compatibility but ignored:
        # the whole body is returned in a single call.
        return self._response.content
Thanks for all the tips! I've adapted the above into a patch_aiobotocore fixture and tested it works with aiobotocore 2.3.4, moto 3.1.18, and s3fs 2022.7.1.
Slightly better version from tekumara's one that copies the headers: https://gist.github.com/giles-betteromics/12e68b88e261402fbe31c2e918ea4168
This resolves the error
File "python3.8/site-packages/s3fs/core.py", line 1150, in _info
"LastModified": out["LastModified"],
tested with moto 3.1.18 and 4.0.6, s3fs 2022.8.2, aiobotocore 2.4.0
Just a note, take care of disabling caching (e.g. on s3fs
) otherwise this won't work on parametrized tests with same file names (because it will cache the previous mocked response).
https://filesystem-spec.readthedocs.io/en/latest/features.html#instance-caching
Using the various snippets and gists from this thread I've got S3 Mocking working correctly in my project. Thank you for the help everyone!
This is currently working with s3fs[boto3]==2023.1.0
I've got the following dependencies pinned in my dev environment as of writing this: aiohttp==3.8.3
and moto[s3]==4.1.0
.
"""
conftest.py - Project Wide Fixtures
"""
from typing import Callable, Any
from unittest.mock import MagicMock
import aiobotocore.awsrequest
import aiobotocore.endpoint
import aiohttp
import aiohttp.client_reqrep
import aiohttp.typedefs
import botocore.awsrequest
import botocore.model
import pytest
class MockAWSResponse(aiobotocore.awsrequest.AioAWSResponse):
    """
    Mocked AWS Response.

    Wraps a synchronous moto/botocore AWSResponse so aiobotocore, which
    expects awaitable accessors, can consume it.

    https://github.com/aio-libs/aiobotocore/issues/755
    https://gist.github.com/giles-betteromics/12e68b88e261402fbe31c2e918ea4168
    """

    def __init__(self, response: botocore.awsrequest.AWSResponse):
        # NOTE(review): deliberately does not call super().__init__();
        # only the attributes aiobotocore actually touches are populated.
        self._moto_response = response
        self.status_code = response.status_code
        self.raw = MockHttpClientResponse(response)

    # adapt async methods to use moto's response
    async def _content_prop(self) -> bytes:
        return self._moto_response.content

    async def _text_prop(self) -> str:
        return self._moto_response.text
class MockHttpClientResponse(aiohttp.client_reqrep.ClientResponse):
    """
    Mocked HTTP Response.

    See <MockAWSResponse> Notes
    """

    def __init__(self, response: botocore.awsrequest.AWSResponse):
        """
        Mocked Response Init.

        NOTE(review): intentionally skips ClientResponse.__init__; only
        the attributes aiobotocore reads (`content`, `raw_headers`) are
        faked here.
        """

        async def read(self: MockHttpClientResponse, n: int = -1) -> bytes:
            # Return the whole moto body regardless of the requested size.
            return response.content

        self.content = MagicMock(aiohttp.StreamReader)
        self.content.read = read
        self.response = response

    @property
    def raw_headers(self) -> Any:
        """
        Return the headers encoded the way that aiobotocore expects them.
        """
        return {
            k.encode("utf-8"): str(v).encode("utf-8")
            for k, v in self.response.headers.items()
        }.items()
@pytest.fixture(scope="session", autouse=True)
def patch_aiobotocore() -> None:
    """
    Pytest Fixture Supporting S3FS Mocks.

    Session-scoped and autouse: the patch is applied exactly once, before
    any test runs.  See <MockAWSResponse> Notes.
    """

    def factory(original: Callable[[Any, Any], Any]) -> Callable[[Any, Any], Any]:
        """
        Response Conversion Factory.
        """

        def patched_convert_to_response_dict(
            http_response: botocore.awsrequest.AWSResponse,
            operation_model: botocore.model.OperationModel,
        ) -> Any:
            # Wrap moto's synchronous response before aiobotocore parses it.
            return original(MockAWSResponse(http_response), operation_model)

        return patched_convert_to_response_dict

    aiobotocore.endpoint.convert_to_response_dict = factory(
        aiobotocore.endpoint.convert_to_response_dict
    )
"""
Example Unit Test Mocking S3FS
"""
import boto3
from moto import mock_s3
from internal_module import upload_three_files_using_s3fs
@mock_s3
def test_mocked_s3() -> None:
    """
    Test S3 Mocking with S3FS
    """
    # The bucket has to exist in the mocked S3 environment first.
    s3 = boto3.resource("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="example-bucket")

    # Create the files using whatever your code does with S3FS.
    upload_three_files_using_s3fs(bucket="example-bucket")  # pseudo code

    # Verify the expected objects landed in the mocked bucket.
    bucket = s3.Bucket("example-bucket")
    uploaded = list(bucket.objects.all())
    assert len(uploaded) == 3
Since the working snippet was with s3fs, consider implementing it there as part of the test fixture (currently uses moto as separate process).
Is there a fix with little to no boilerplate that I can use instead of skipping tests of functions that use fsspec
? Does upgrading packages do anything?
Note that fsspec/s3fs does use moto for testing, so perhaps set it up as those packages' CIs do?
Interesting observation. On my CI pipeline the tests don't fail but when run locally I get the AttributeError about Moto response. I will sniff around a bit to see if there is anything I can find.
This bug arises in pytest with moto
1.3.14
and although requirements-dev.txt
has a dev-version, that fix is for something else, i.e. this is irrelevant. See also:
lowercase_dict
function already exists. Below is an exception detail, when testing the following pytest fixtures:
This raises a simple exception when trying to parse a moto response (below) and the source code for botocore seems to match (there is no
AWSResponse.raw_headers
attr). Maybe there are API version differences between aiobotocore, botocore and moto (at the time of posting this issue). In the project, the requirements pull in the aiobotocore deps for boto3/botocore and moto is the latest release. The simple test function is:
The moto job-queues should be an empty list (and it is, see pdb details below).
Note that the moto response is an
botocore.awsrequest.AWSResponse
and not a:type http_response: botocore.vendored.requests.model.Response