robertmrk / aiosfstream

Salesforce Streaming API client for asyncio
MIT License
47 stars 31 forks

Support for Python 3.10 #18

Open ghost opened 2 years ago

ghost commented 2 years ago

Related to https://github.com/robertmrk/aiocometd/issues/13 in the aiocometd dependency.

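Python 3.10 removed the loop parameter from most of asyncio's high-level API, so aiocometd's asyncio.Semaphore(2, loop=self._loop) call now raises a TypeError. See the last bullet of https://docs.python.org/3/whatsnew/3.10.html#removed.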

Traceback (most recent call last):
  ...
  File "/external/aiosfstream/aiosfstream/client.py", line 149, in open
    await super().open()
  File "/external/aiosfstream/venv/lib/python3.10/site-packages/aiocometd-0.4.5-py3.10.egg/aiocometd/client.py", line 273, in open
    self._transport = await self._negotiate_transport()
  File "/external/aiosfstream/venv/lib/python3.10/site-packages/aiocometd-0.4.5-py3.10.egg/aiocometd/client.py", line 198, in _negotiate_transport
    transport = create_transport(DEFAULT_CONNECTION_TYPE,
  File "/external/aiosfstream/venv/lib/python3.10/site-packages/aiocometd-0.4.5-py3.10.egg/aiocometd/transports/registry.py", line 47, in create_transport
    return TRANSPORT_CLASSES[connection_type](*args, **kwargs)  # type: ignore
  File "/external/aiosfstream/venv/lib/python3.10/site-packages/aiocometd-0.4.5-py3.10.egg/aiocometd/transports/long_polling.py", line 25, in __init__
    self._http_semaphore = asyncio.Semaphore(2, loop=self._loop)
  File "/usr/local/Cellar/python@3.10/3.10.1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/locks.py", line 348, in __init__
    super().__init__(loop=loop)
  File "/usr/local/Cellar/python@3.10/3.10.1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/mixins.py", line 17, in __init__
    raise TypeError(
TypeError: As of 3.10, the *loop* parameter was removed from Semaphore() since it is no longer necessary
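
The failure in the traceback can be reproduced without aiocometd. The following is a minimal sketch, not code from either library: `loop_kwarg_rejected` and `make_semaphore` are hypothetical helper names used only for illustration. It checks whether the running interpreter rejects the legacy `loop` argument, and shows one way a caller could build a semaphore that works on both sides of the 3.10 boundary.

```python
import asyncio
import sys


def loop_kwarg_rejected() -> bool:
    """Return True if asyncio.Semaphore refuses the legacy `loop` argument."""
    loop = asyncio.new_event_loop()
    try:
        # Same call shape as the failing line in aiocometd's long_polling.py
        asyncio.Semaphore(2, loop=loop)
    except TypeError:
        return True
    finally:
        loop.close()
    return False


def make_semaphore(value: int, loop=None) -> asyncio.Semaphore:
    """Hypothetical helper: pass `loop` only where asyncio still accepts it."""
    if sys.version_info >= (3, 10) or loop is None:
        return asyncio.Semaphore(value)
    return asyncio.Semaphore(value, loop=loop)


if __name__ == "__main__":
    print("loop kwarg rejected:", loop_kwarg_rejected())
    print("semaphore created:", make_semaphore(2))
```

On Python 3.10 and later the semaphore binds to the running loop when it is first awaited, so simply dropping the argument is enough, which is what the workaround further down this thread does.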

MauriceBenink commented 1 year ago

I ran into the same issue and managed to resolve it with the quick fix below. Note: this simply overrides the existing long-polling transport class with a copy that no longer passes the loop kwarg to asyncio.Semaphore.

import asyncio
import logging
from typing import Any

import aiohttp

from aiocometd.constants import ConnectionType
from aiocometd.exceptions import TransportError
from aiocometd.typing import JsonObject
from aiocometd.transports.registry import register_transport
from aiocometd.transports.base import TransportBase, Payload, Headers

LOGGER = logging.getLogger(__name__)

class UpdatedLongPollingTransport(TransportBase):
    """Long-polling type transport"""

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        #: semaphore to limit the number of concurrent HTTP connections to 2
        self._http_semaphore = asyncio.Semaphore(2)

    async def _send_final_payload(self, payload: Payload, *,
                                  headers: Headers) -> JsonObject:
        try:
            session = await self._get_http_session()
            async with self._http_semaphore:
                response = await session.post(self._url, json=payload,
                                              ssl=self.ssl, headers=headers,
                                              timeout=self.request_timeout)
            response_payload = await response.json(loads=self._json_loads)
            headers = response.headers
        except aiohttp.client_exceptions.ClientError as error:
            LOGGER.warning("Failed to send payload, %s", error)
            raise TransportError(str(error)) from error
        response_message = await self._consume_payload(
            response_payload,
            headers=headers,
            find_response_for=payload[0]
        )

        if response_message is None:
            error_message = "No response message received for the " \
                            "first message in the payload"
            LOGGER.warning(error_message)
            raise TransportError(error_message)
        return response_message

# In the startup of your engine
import asyncio
from aiocometd.constants import ConnectionType
from aiocometd.transports.registry import register_transport

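def startup(self):
    # Method of your own application class, which defines salesforce_stream().
    # Register the patched transport before the streaming client is opened,
    # so it replaces the stock long-polling transport.
    register_transport(ConnectionType.LONG_POLLING)(UpdatedLongPollingTransport)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(self.salesforce_stream())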
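
Why a second registration replaces the stock transport can be illustrated with a simplified stand-in for a decorator-based registry. This is a sketch under assumptions and not aiocometd's actual implementation: `TRANSPORTS`, `register`, `OriginalTransport` and `PatchedTransport` are hypothetical names, and the only behaviour assumed is that the registry is a dictionary keyed by connection type.

```python
from typing import Callable, Dict, Type

# Stand-in registry: maps a connection-type name to the class that handles it.
TRANSPORTS: Dict[str, Type] = {}


def register(connection_type: str) -> Callable[[Type], Type]:
    """Return a class decorator that stores the class under connection_type."""
    def decorator(cls: Type) -> Type:
        # A later registration under the same key replaces the earlier one.
        TRANSPORTS[connection_type] = cls
        return cls
    return decorator


@register("long-polling")
class OriginalTransport:
    """Plays the role of the library's built-in transport."""


class PatchedTransport:
    """Plays the role of UpdatedLongPollingTransport."""


# Call the decorator factory by hand, the same call shape startup() uses.
register("long-polling")(PatchedTransport)

if __name__ == "__main__":
    print(TRANSPORTS["long-polling"].__name__)
```

With a registry of this shape, the override only takes effect for transports created after the second registration, which is why the workaround registers the patched class before opening the client.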

mohitmathew commented 10 months ago

Hi, what is the plan for making the library compatible with Python 3.10?

ghost commented 10 months ago

@mohitmathew, I doubt there is any plan at this point. We locally forked ours to allow us to get past the version limit.