robertmrk / aiosfstream

Salesforce Streaming API client for asyncio
MIT License
47 stars 31 forks source link

ReplayMarkerStorage with PgSQL not working with a saved replay id #19

Open welcomemat-services opened 2 years ago

welcomemat-services commented 2 years ago

The code below works when the PostgreSQL table is empty. However, when it loads a previously saved replay ID from the table, it throws the exception shown in the error log below.

Thanks for any help provided here.

Code:

class MyReplayMarkerStorage(ReplayMarkerStorage):
    """Replay marker storage backed by the ``platform_event_setting`` table.

    Markers are keyed by the last path segment of the subscription
    (``/event/Notification__e`` -> ``Notification__e``).

    NOTE: the database calls below are synchronous and run directly inside
    the coroutines, so they block the event loop while they execute.
    """

    def __init__(self, connection, cursor):
        """
        :param connection: Open database connection, used for committing
        :param cursor: Cursor of *connection*; rows must be accessible by \
        column name (e.g. ``RealDictCursor``)
        """
        super().__init__()
        self.connection = connection
        self.cursor = cursor

    @staticmethod
    def _channel_name(subscription: str) -> str:
        """Return the part of *subscription* after the last ``/``"""
        return subscription[subscription.rfind('/')+1:]

    async def set_replay_marker(self, subscription: str, replay_marker: ReplayMarker):
        """Insert or update the stored *replay_marker* of *subscription*

        :param subscription: Name of the subscribed channel
        :param replay_marker: A replay marker
        """
        event = self._channel_name(subscription)
        # Values are passed as query parameters instead of being formatted
        # into the statement, so quoting is handled by the driver
        self.cursor.execute(
            """
            INSERT INTO platform_event_setting (channel, replay_id, date_str)
            VALUES (%s, %s, %s)
            ON CONFLICT ON CONSTRAINT platform_event_setting_pkey
            DO UPDATE SET (replay_id, date_str) = (EXCLUDED.replay_id, EXCLUDED.date_str)
            """,
            (event, replay_marker.replay_id, replay_marker.date),
        )

        print(f"Upserting replay id {replay_marker.replay_id} for {subscription}")
        self.connection.commit()

    async def get_replay_marker(self, subscription: str):
        """Retrieve the stored replay marker of *subscription*

        :param subscription: Name of the subscribed channel
        :return: A replay marker or ``None`` if there is nothing stored for \
        the given *subscription*
        """
        event = self._channel_name(subscription)
        self.cursor.execute(
            """
            SELECT replay_id, date_str
            FROM platform_event_setting
            WHERE channel = %s
            LIMIT 1
            """,
            (event,),
        )

        channel = self.cursor.fetchone()

        if channel is None:
            print(f"Platform Event Setting does not exist: {subscription}")
            return None

        print(f"{subscription} - {channel['replay_id']} - {channel['date_str']}")
        # The replay id must be an integer: when the column is a text type
        # the driver returns a str, and subscribing with a string replay id
        # is rejected by the server
        return ReplayMarker(date=channel['date_str'],
                            replay_id=int(channel['replay_id']))
# MyReplayMarkerStorage

async def process_events():
    """Subscribe to ``/event/Notification__e`` and print every payload.

    Replay markers are persisted through :class:`MyReplayMarkerStorage`
    with the manual storage policy, i.e. a marker is stored only after the
    body of ``client.replay_storage(message)`` finished. Any error is
    printed, and the database cursor and connection are always closed.
    """
    # Initialised up front so the ``finally`` clause can run safely even if
    # connecting to the database fails
    connection = None
    cursor = None
    try:
        connection = psycopg2.connect(os.getenv('DATABASE_URL'), sslmode='require')
        cursor = connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

        myReplay = MyReplayMarkerStorage(connection, cursor)

        async with SalesforceStreamingClient(
            consumer_key=os.getenv("SF_APP_KEY"),
            consumer_secret=os.getenv("SF_APP_SECRET"),
            username=os.getenv('SF_USERNAME'),
            password=os.getenv('SF_PASSWORD'),
            replay=myReplay,
            replay_fallback=ReplayOption.ALL_EVENTS,
            replay_storage_policy=ReplayMarkerStoragePolicy.MANUAL) as client:

            await client.subscribe("/event/Notification__e")

            async for message in client:
                async with client.replay_storage(message):
                    print(message['data']['payload'])

    except Exception as e:
        # Top-level boundary: report the error instead of propagating it
        print(f"Exception - {e}")
    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()

# process_events

Error log:

DEBUG:asyncio:Using proactor: IocpProactor Connected to DB DEBUG:aiosfstream.client:Client created with replay storage: <main.MyReplayMarkerStorage object at 0x0000024E0287CA30>, replay fallback: <ReplayOption.ALL_EVENTS: -2> DEBUG:aiosfstream.client:Authenticating using PasswordAuthenticator(consumer_key='xxxxx',consumer_secret='yyyyy', username='test.test.com', password='abcde'). INFO:aiosfstream.client:Successful authentication. Instance URL: 'https://test.my.salesforce.com'. INFO:aiocometd.client:Opening client with connection types ['websocket', 'long-polling'] ... INFO:aiocometd.client:Connection types supported by the server: ['long-polling'] DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'kvl2mi3sd9hs6ei1m3vnsilh4nb9', 'advice': {'interval': 0, 'timeout': 110000, 'reconnect': 'retry'}, 'channel': '/meta/connect', 'id': '1', 'successful': True} INFO:aiocometd.client:Client opened with connection_type 'long-polling' /event/Notification__e - 7785481 - 2022-02-10T23:23:49.572Z INFO:aiocometd.client:Closing client... 
ERROR:asyncio:Exception in callback TransportBase._connect_done(<Task cancell...\utils.py:22>>) handle: <Handle TransportBase._connect_done(<Task cancell...\utils.py:22>>)> Traceback (most recent call last): File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiocometd\utils.py", line 27, in wrapper return await coro_func(*args, **kwargs) File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiocometd\transports\base.py", line 524, in _connect result = await self._send_payload_with_auth(payload) File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiocometd\transports\base.py", line 323, in _send_payload_with_auth await self._auth.authenticate() File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiosfstream\auth.py", line 96, in authenticate status_code, response_data = await self._authenticate() File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiosfstream\auth.py", line 173, in _authenticate response = await session.post(self._token_url, data=data) File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiohttp\client.py", line 535, in _request conn = await self._connector.connect( File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiohttp\connector.py", line 542, in connect proto = await self._create_connection(req, traces, timeout) File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiohttp\connector.py", line 907, in _createconnection , proto = await self._create_direct_connection(req, traces, timeout) File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiohttp\connector.py", line 1154, in _create_direct_connection hosts = await asyncio.shield(host_resolved) asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

self._raise_server_error(response)

File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiocometd\client.py", line 385, in _raise_server_error raise ServerError(message, response) aiocometd.exceptions.ServerError: ('Subscribe request failed.', {'clientId': 'kvl2mi3sd9hs6ei1m3vnsilh4nb9', 'channel': '/meta/subscribe', 'id': '2', 'subscription': '/event/Notification__e', 'error': 'Failed to create an internal subscription!', 'successful': False})

The above exception was the direct cause of the following exception: Traceback (most recent call last): File "c:\code\Python\scheduled_processes\processor.py", line 70, in process_events await client.subscribe("/event/Notification__e") File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiosfstream\exceptions.py", line 143, in async_wrapper return await func(*args, **kwargs) File "C:\Users\test\AppData\Local\Programs\Python\Python39\lib\contextlib.py", line 137, in __exit__ self.gen.throw(typ, value, traceback) File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiosfstream\exceptions.py", line 123, in translate_errors_context raise error_cls(*cometd_error.args) from cometd_error aiosfstream.exceptions.ServerError: ('Subscribe request failed.', {'clientId': 'kvl2mi3sd9hs6ei1m3vnsilh4nb9', 'channel': '/meta/subscribe', 'id': '2', 'subscription': '/event/Notification__e', 'error': 'Failed to create an internal subscription!', 'successful': False})

efh365 commented 2 years ago

This is the code I am using and it works for me

from aiosfstream import ReplayMarker, ReplayMarkerStorage
from asgiref.sync import sync_to_async
from django.db import connection, transaction
from psycopg2 import sql

class LocalReplayMarkerStorage(ReplayMarkerStorage):
    """Replay marker storage implementation for storing replay markers in
    a PostgreSQL table, accessed through Django's default database connection

    Every database operation is a synchronous method wrapped with
    ``sync_to_async`` and exposed through a thin coroutine.
    """

    #: SQL table creation statement template
    TABLE_CREATION_TEMPLATE = """
        CREATE TABLE IF NOT EXISTS {table_name} (
            subscription VARCHAR(255) NOT NULL PRIMARY KEY,
            date VARCHAR(32) NOT NULL,
            replay_id INT NOT NULL
        )
    """

    #: SQL statement template for setting the value of a replay marker
    SET_REPLAY_MARKER_TEMPLATE = """
        INSERT INTO {table_name} (subscription, date, replay_id)
        VALUES (%s, %s, %s)
        ON CONFLICT (subscription) DO UPDATE 
        SET date = excluded.date, replay_id = excluded.replay_id
    """

    #: SQL statement template for getting the value of a replay marker
    GET_REPLAY_MARKER_TEMPLATE = """
        SELECT date, replay_id
        FROM {table_name}
        WHERE subscription=%s
    """

    def __init__(self, table_name: str = "replay") -> None:
        """
        :param table_name: Name of the table for storing replay markers
        """
        super().__init__()
        self.table_name = table_name

    def render_sql(self, template: str) -> sql.Composed:
        """Create an SQL statement from the given *template* by inserting the
        correct table name

        The table name is inserted as an SQL identifier, so it's quoted by
        the driver instead of being formatted into the text directly.

        :param template: SQL statement template
        :return: The rendered, composable SQL statement; call \
        ``as_string()`` on it to get the statement's text
        """
        query = sql.SQL(template)
        return query.format(table_name=sql.Identifier(self.table_name))

    async def ensure_table_exists(self) -> None:
        """Create the table for storing the replay markers if it doesn't
        already exist
        """
        await self._ensure_table_exists()

    @sync_to_async
    def _ensure_table_exists(self) -> None:
        """Synchronous implementation of :meth:`ensure_table_exists`"""
        with transaction.atomic(), connection.cursor() as cursor:
            statement = self.render_sql(self.TABLE_CREATION_TEMPLATE)
            # as_string() needs the underlying driver connection, which the
            # code reaches through the cursor's ``cursor`` attribute
            cursor.execute(
                statement.as_string(cursor.cursor.connection)
            )

    async def set_replay_marker(
        self, subscription: str, replay_marker: ReplayMarker
    ) -> None:
        """Store the *replay_marker* for the given *subscription*

        :param subscription: Name of the subscribed channel
        :param replay_marker: A replay marker
        """
        await self._set_replay_marker(subscription, replay_marker)

    @sync_to_async
    def _set_replay_marker(
        self, subscription: str, replay_marker: ReplayMarker
    ) -> None:
        """Synchronous implementation of :meth:`set_replay_marker`"""
        with transaction.atomic(), connection.cursor() as cursor:
            statement = self.render_sql(self.SET_REPLAY_MARKER_TEMPLATE)
            cursor.execute(
                statement.as_string(cursor.cursor.connection),
                [subscription, replay_marker.date, replay_marker.replay_id]
            )

    async def get_replay_marker(self, subscription: str) -> Optional[ReplayMarker]:
        """Retrieve a stored replay marker for the given *subscription*

        :param subscription: Name of the subscribed channel
        :return: A replay marker or ``None`` if there is nothing stored for \
        the given *subscription*
        """
        return await self._get_replay_marker(subscription)

    @sync_to_async
    def _get_replay_marker(self, subscription: str) -> Optional[ReplayMarker]:
        """Synchronous implementation of :meth:`get_replay_marker`"""
        with transaction.atomic(), connection.cursor() as cursor:
            statement = self.render_sql(self.GET_REPLAY_MARKER_TEMPLATE)
            cursor.execute(
                statement.as_string(cursor.cursor.connection),
                [subscription]
            )
            result = cursor.fetchone()
            logger.info(f"RESULT: {result}")
            # Rows have the form (date, replay_id), for example
            # ('2022-03-09T15:29:37.188Z', 20090)
            if result is None:
                return None
            marker = ReplayMarker(date=result[0], replay_id=result[1])
            logger.info(f"MARKER.REPLAY: {marker.replay_id}")
            return marker

 class Command(BaseCommand):
    """Management command that streams Salesforce events and hands each one
    to a ``MessageProcessor``
    """

    def handle(self, *args, **options):
        """Command entry point; runs the event stream"""
        self.stream_events()

    @async_to_sync
    async def stream_events(self):
        """Subscribe to ``ACCOUNT_TOPIC`` and process the incoming messages

        Replay markers are kept in a :class:`LocalReplayMarkerStorage`,
        whose table is created first if needed. A failure while processing
        a message is logged and the stream continues with the next one.
        """
        marker_storage = LocalReplayMarkerStorage()
        # the storage table has to exist before the client starts using it
        await marker_storage.ensure_table_exists()

        # every mode other than "Production" uses the sandbox
        use_sandbox = settings.DJANGO_MODE != "Production"

        streaming_client = SalesforceStreamingClient(
            consumer_key=settings.SALESFORCE_CONSUMER_KEY,
            consumer_secret=settings.SALESFORCE_CONSUMER_SECRET,
            username=settings.SALESFORCE_USERNAME,
            password=settings.SALESFORCE_PASSWORD,
            sandbox=use_sandbox,
            replay=marker_storage,  # type: ignore
        )
        async with streaming_client as client:

            await client.subscribe(ACCOUNT_TOPIC)

            async for message in client:
                topic, data = message["channel"], message["data"]
                logger.info(f"{topic} - {data}")
                processor = MessageProcessor(topic, data)
                try:
                    await processor.process_message()
                except Exception as e:
                    logger.exception(f"Unable to process message due to: {e}")
trankos commented 2 years ago

Same issue. The problem was the data type of replay_id: it must be an int, not a string!

jdelStrother commented 1 year ago

@trankos thanks, that saved me some hair-pulling