I'd like to be able to use httpio with httpx on trio. I'd be able to do this if all of this were done with a pluggable session strategy:
@attr.s(frozen=True)
class AIOHttpFactory:
    """Session strategy that performs HTTP requests through an aiohttp session.

    Instances are immutable and hold the session together with the extra
    keyword arguments that are forwarded to every request.
    """

    # The client session used to issue the HEAD/GET requests.
    _session = attr.ib()
    # Extra keyword arguments passed through to each request call.
    _kwargs = attr.ib()

    async def length(self, url) -> int:
        """Return the size of the resource at *url* in bytes.

        Issues a HEAD request and reads the ``content-length`` header.

        :param url: The URL to query.
        :raises TypeError: if the response carries no ``content-length``
            header (``int(None)``).
        """
        async with self._session.head(url, **self._kwargs) as response:
            response.raise_for_status()
            return int(response.headers.get("content-length", None))

    async def get(self, url, start, end) -> bytes:
        """Fetch the bytes ``[start, end)`` of the resource at *url*.

        :param url: The URL to read from.
        :param start: Offset of the first byte to fetch.
        :param end: Offset one past the last byte to fetch.
        """
        # HTTP ranges are inclusive on both ends, hence ``end - 1``.
        # Caller-supplied headers are merged in afterwards, so a
        # caller-supplied "Range" key takes precedence.
        headers = {
            "Range": "bytes=%d-%d" % (start, end - 1),
            **self._kwargs.get("headers", {}),
        }
        kwargs = {**self._kwargs, "headers": headers}
        async with self._session.get(url, **kwargs) as response:
            response.raise_for_status()
            return await response.read()

    @classmethod
    @contextlib.asynccontextmanager
    async def session(cls, kwargs):
        """Open a client session and yield a factory bound to it.

        The session is closed when the ``async with`` block using this
        context manager exits.

        :param kwargs: Extra keyword arguments forwarded to every request.
        """
        # Local import keeps the aiohttp dependency confined to this strategy.
        import aiohttp

        async with aiohttp.ClientSession() as s:
            yield cls(s, kwargs)
def __init__(self, url, block_size=-1, session_factory=None, **kwargs):
    """
    :param url: The URL of the file to open
    :param block_size: The cache block size, or `-1` to disable caching.
    :param session_factory: The session strategy to use; when `None`,
        `aiohttp_strategy.AIOHttpFactory` is used.
    :param kwargs: Additional arguments to pass to `session.get`
    """
    super(AsyncHTTPIOFile, self).__init__()
    self.url = url
    self.block_size = block_size
    if session_factory is None:
        # Imported only when no strategy is supplied, so the default
        # strategy module is not loaded for callers that bring their own.
        from . import aiohttp_strategy

        session_factory = aiohttp_strategy.AIOHttpFactory
    self._session_factory = session_factory
    ...
I'd like to be able to use httpio with httpx on trio. I'd be able to do this if all of this were done with a pluggable session strategy: