Closed maulikagarwal800 closed 5 years ago
import asyncio
import json
import shelve

import mysql.connector
from aiosfstream import ReplayMarker
from aiosfstream import SalesforceStreamingClient
from azure.servicebus.control_client import ServiceBusService

# Module-level connection and cursor shared by every storage call below.
mydb = mysql.connector.connect(
    host="localhost",
    user="root",
    passwd="",
    database="salesforce",
)
mycursor = mydb.cursor()


class MyReplayMarkerStorage:
    """Store and load replay ids and event dates in the ``replay`` table."""

    def set_replay_marker(self, subscription, replay_marker, date):
        """Store *replay_marker* for the given *subscription*.

        :param subscription: Name of the subscribed channel
        :param replay_marker: Replay id of the received event
        :param date: Creation date of the received event
        """
        sql = (
            "INSERT INTO replay (subscription, replaymarker, date) "
            "VALUES (%s, %s, %s)"
        )
        mycursor.execute(sql, (subscription, replay_marker, date))
        mydb.commit()

    def get_replay_marker(self, subscription):
        """Retrieve the replay marker for the given *subscription*.

        :param subscription: Name of the subscribed channel
        :return: A ``(replay_id, date)`` tuple holding the largest stored \
        values for the *subscription*
        """
        # NOTE(review): the original query selected MAX(date1) although the
        # INSERT above writes the `date` column; the SELECT now uses `date`
        # so both statements agree -- confirm against the real table schema.
        sql = (
            "SELECT MAX(replaymarker), MAX(date) "
            "FROM replay WHERE subscription = %s"
        )
        mycursor.execute(sql, (subscription,))
        rows = mycursor.fetchall()
        # NOTE(review): when nothing is stored yet both values are presumably
        # None (MAX over zero rows) -- the caller should handle that case.
        replay_marker, date = rows[0][0], rows[0][1]
        return replay_marker, date


# Renamed from `object` so that the builtin is no longer shadowed.
replay_storage = MyReplayMarkerStorage()


async def stream_events():
    """Resume streaming from the stored marker and record each new event."""
    topic_name = "/topic/SalesOrderss"
    replay, date = replay_storage.get_replay_marker(topic_name)
    marker = ReplayMarker(str(date), replay)
    print(marker)
    async with SalesforceStreamingClient(consumer_key="",
                                         replay=marker) as client:
        await client.subscribe(topic_name)
        async for message in client:
            topic = message["channel"]
            data = message["data"]
            event = data["event"]
            date = event["createdDate"]
            print(date)
            print(message)
            replay = event["replayId"]
            print(type(replay))
            # remember the position of the last processed event
            replay_storage.set_replay_marker(topic, replay, date)
            # renamed from `json` so the imported module is not shadowed
            sobject = data['sobject']
            service_name = 'aligntechpoc'
            key_name = 'rpolicy'  # SharedAccessKeyName from Azure portal
            # SharedAccessKey from Azure portal; never commit the real value
            key_value = '<shared_access_key>'
            # sbs.send_event("aligntechrec", sobject)
            print(message)


if __name__ == "__main__":
    asyncio.run(stream_events())
Hi @maulikagarwal800
To be honest, your code example looks really messy, even after applying correct indentation. So instead of editing your example, I created a basic `ReplayMarkerStorage` implementation for you, which stores `ReplayMarker` objects in a MySQL database.
You can use it as-is, but I would recommend extending it with some local caching and flushing the `ReplayMarker` objects to the database periodically or on shutdown. Alternatively, use a simpler key-value store instead.
BTW, I'm not sure, but it seems like your example contains some authentication credentials. Please remove them if possible.
import asyncio
from typing import Optional
import aiomysql # type: ignore
from aiosfstream import (
SalesforceStreamingClient,
ReplayMarker,
ReplayMarkerStorage,
)
# MySQL connection settings used when creating the connection pool;
# replace the placeholders with real values
MYSQL_HOST = "<mysql_host>"
MYSQL_PORT = 3306
MYSQL_USER = "<mysql_user>"
MYSQL_PASSWORD = "<mysql_password>"
MYSQL_DATABASE = "<mysql_database>"
# Salesforce authentication settings passed to SalesforceStreamingClient;
# replace the placeholders with real values
SF_CONSUMER_KEY = "<salesforce_consumer_key>"
SF_CONSUMER_SECRET = "<salesforce_consumer_secret>"
SF_USERNAME = "<salesforce_username>"
SF_PASSWORD = "<salesforce_password>"
class MySQLReplayMarkerStorage(ReplayMarkerStorage):
    """Replay marker storage implementation for storing replay markers in
    a MySQL table

    A single row is kept per subscription (the subscription name is the
    primary key), so storing a marker overwrites the previous one.
    """
    #: SQL table creation statement template
    TABLE_CREATION_TEMPLATE = """
        CREATE TABLE IF NOT EXISTS `{table_name}` (
            `subscription` VARCHAR(255) NOT NULL,
            `date` VARCHAR(32) NOT NULL,
            `replay_id` INT NOT NULL,
            CONSTRAINT `pk_replay` PRIMARY KEY (`subscription`)
        )
    """

    #: SQL statement template for setting the value of a replay marker
    SET_REPLAY_MARKER_TEMPLATE = """
        REPLACE INTO `{table_name}` (`subscription`, `date`, `replay_id`)
        VALUES (%s, %s, %s)
    """

    #: SQL statement template for getting the value of a replay marker
    GET_REPLAY_MARKER_TEMPLATE = """
        SELECT `date`, `replay_id`
        FROM `{table_name}`
        WHERE `subscription`=%s
    """

    def __init__(self, connection_pool: aiomysql.Pool,
                 table_name: str = "replay") -> None:
        """
        :param connection_pool: MySQL connection pool
        :param table_name: Name of the table for storing replay markers
        """
        super().__init__()
        self.connection_pool = connection_pool
        self.table_name = table_name

    def render_sql(self, template: str) -> str:
        """Create an SQL statement from the given *template* by inserting the
        correct table name

        :param template: SQL statement template
        :return: The rendered SQL statement
        """
        return template.format(table_name=self.table_name)

    async def ensure_table_exists(self) -> None:
        """Create the table for storing the replay markers if it doesn't
        already exist
        """
        async with self.connection_pool.acquire() as connection:
            async with connection.cursor() as cursor:
                sql = self.render_sql(self.TABLE_CREATION_TEMPLATE)
                await cursor.execute(sql)
            await connection.commit()

    async def set_replay_marker(self, subscription: str,
                                replay_marker: ReplayMarker) -> None:
        """Store the *replay_marker* for the given *subscription*

        :param subscription: Name of the subscribed channel
        :param replay_marker: A replay marker
        """
        async with self.connection_pool.acquire() as connection:
            async with connection.cursor() as cursor:
                sql = self.render_sql(self.SET_REPLAY_MARKER_TEMPLATE)
                await cursor.execute(sql, (
                    subscription,
                    replay_marker.date,
                    replay_marker.replay_id
                ))
            await connection.commit()

    async def get_replay_marker(self, subscription: str) \
            -> Optional[ReplayMarker]:
        """Retrieve a stored replay marker for the given *subscription*

        :param subscription: Name of the subscribed channel
        :return: A replay marker or ``None`` if there is nothing stored for \
        the given *subscription*
        """
        async with self.connection_pool.acquire() as connection:
            # Request dict rows explicitly: the row is unpacked by column
            # name below, which would fail with the default tuple cursor if
            # the pool was not created with ``cursorclass=DictCursor``.
            async with connection.cursor(aiomysql.DictCursor) as cursor:
                sql = self.render_sql(self.GET_REPLAY_MARKER_TEMPLATE)
                await cursor.execute(sql, (subscription,))
                result = await cursor.fetchone()

        if result is None:
            return None
        # the selected column names match the ReplayMarker field names
        return ReplayMarker(**result)
async def stream_events() -> None:
    """Print every message received from the Streaming API, keeping the
    replay markers in MySQL
    """
    # connection pool settings; rows are returned as dictionaries
    mysql_settings = {
        "host": MYSQL_HOST,
        "port": MYSQL_PORT,
        "user": MYSQL_USER,
        "password": MYSQL_PASSWORD,
        "db": MYSQL_DATABASE,
        "cursorclass": aiomysql.DictCursor,
    }
    async with aiomysql.create_pool(**mysql_settings) as connection_pool:
        # the storage needs its table before the client starts using it
        replay_storage = MySQLReplayMarkerStorage(connection_pool)
        await replay_storage.ensure_table_exists()

        # Streaming API client settings, replay markers go to the storage
        salesforce_settings = {
            "consumer_key": SF_CONSUMER_KEY,
            "consumer_secret": SF_CONSUMER_SECRET,
            "username": SF_USERNAME,
            "password": SF_PASSWORD,
            "replay": replay_storage,
        }
        async with SalesforceStreamingClient(**salesforce_settings) as client:
            # subscribe to PushTopic or CDC or ...
            await client.subscribe("/data/ChangeEvents")

            # consume messages for as long as the client stays open
            async for message in client:
                print(message)
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs the coroutine until it
    # completes and closes the loop afterwards; asyncio.get_event_loop() is
    # deprecated for this purpose when no loop is running.
    asyncio.run(stream_events())
Thanks a lot. The credentials are invalid, hence no harm there.
@robertmrk I have modelled my code on the example above, but it closes the connection abruptly if it receives an existing replay id. Please see my issue here; I am not sure what I am doing wrong there. Any input would help me greatly.
https://github.com/robertmrk/aiosfstream/issues/19#issue-1126620196
Hi, I can't find out the type of the replay marker that needs to be passed to the Streaming client class. I have made a custom class to store the replay id in SQL. When I retrieve it from SQL and try to pass that replay id to the Streaming client, I get an error saying it's not a valid type for the parameter.