balmasi closed this issue 5 years ago.
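I'm currently solving this by subclassing:

from aiosfstream import SalesforceStreamingClient


class MyClient(SalesforceStreamingClient):
    async def receive(self):
        # zero-argument super() avoids naming the wrong class (the original used Client)
        response = await super().receive()
        return response

    async def __aiter__(self):
        async for message in super().__aiter__():
            yield message
            # runs only when the consumer requests the next message
            await self.replay_storage.extract_replay_id(message)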
I tried doing this in receive, but it got rejected. Do you see any problems with what I'm trying to do?
I'm saving every event to a file, then I read the event from the file and process it. If it's processed successfully, I delete its line from the file; if not, the event will be processed again along with the next incoming event.
Hi @balmasi
This is not possible at the moment, at least not without altering the code. Your solution might work, but it has its own potential problems. In your subclass, the replay marker gets stored only when the next message is requested from the asynchronous generator function. If your application crashes after successfully processing a message, but before requesting the next one, then the replay marker of that already processed message will not be stored. So on the next run of the application, it would be retrieved and processed again.
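A minimal, self-contained sketch of that window, using a plain async generator in place of the subclassed client (the messages and the stored_markers list are hypothetical stand-ins for Salesforce events and the replay storage). The line after yield only runs when the consumer asks for the next message, so stopping right after processing message 2 leaves its marker unstored:

```python
import asyncio

stored_markers = []  # hypothetical stand-in for the replay storage


async def messages():
    """Async generator mimicking the subclassed __aiter__ (hypothetical messages)."""
    for replay_id in (1, 2, 3):
        message = {"channel": "/topic/demo", "data": {"event": {"replayId": replay_id}}}
        yield message
        # Only executed when the consumer requests the *next* message.
        stored_markers.append(replay_id)


async def main():
    processed = []
    async for message in messages():
        processed.append(message["data"]["event"]["replayId"])
        if len(processed) == 2:
            # Simulate the application stopping right after processing message 2.
            break
    return processed


processed = asyncio.run(main())
print(processed)       # [1, 2]
print(stored_markers)  # [1]
```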
What would be your strategy for dealing with message processing errors? If all you want is to retry processing a message after a failed attempt, then simply storing the last message in a local variable or in a persistent location might be enough.
Regarding saving every event to a file and deleting it once processed: you still have the same problem, because essentially what you have is process() == writeToFile().
@robertmrk
You're absolutely correct about the app crash case; however, this problem is fundamentally unsolvable.
As long as set_replay_marker() and process() are two separate atomic operations, the only way to make it resilient to failure is for process to invoke set_replay_marker itself, and for process to be an idempotent operation. I think that's a given for consumer-driven subscriptions like this.
Alternatively, you can allow the user to override the library so that set_replay_marker is built into the process function, and the user manages transactions to ensure both things happen.
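A minimal sketch of that idea, assuming the application keeps both its processing results and the replay marker in the same SQLite database (the table names, the process() signature and the message layout are all hypothetical). Because the result and the marker are committed in one transaction, a failure leaves either both or neither stored, and INSERT OR REPLACE keyed on the replay id keeps reprocessing idempotent:

```python
import sqlite3


def open_db(path=":memory:"):
    """Open (or create) a database holding both results and replay markers."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS results (replay_id INTEGER PRIMARY KEY, payload TEXT)")
    conn.execute("CREATE TABLE IF NOT EXISTS markers (channel TEXT PRIMARY KEY, replay_id INTEGER)")
    return conn


def process(conn, message, fail=False):
    """Hypothetical process(): write the result and the replay marker in one transaction."""
    channel = message["channel"]
    replay_id = message["data"]["event"]["replayId"]
    with conn:  # commits if the block succeeds, rolls back if it raises
        conn.execute(
            "INSERT OR REPLACE INTO results (replay_id, payload) VALUES (?, ?)",
            (replay_id, repr(message["data"]["payload"])),
        )
        if fail:
            raise RuntimeError("simulated failure after writing the result")
        conn.execute(
            "INSERT OR REPLACE INTO markers (channel, replay_id) VALUES (?, ?)",
            (channel, replay_id),
        )


def last_marker(conn, channel):
    row = conn.execute("SELECT replay_id FROM markers WHERE channel = ?", (channel,)).fetchone()
    return row[0] if row else None


conn = open_db()
msg1 = {"channel": "/topic/demo", "data": {"event": {"replayId": 1}, "payload": {"x": 1}}}
msg2 = {"channel": "/topic/demo", "data": {"event": {"replayId": 2}, "payload": {"x": 2}}}

process(conn, msg1)
try:
    process(conn, msg2, fail=True)
except RuntimeError:
    pass  # the half-written result for message 2 was rolled back

marker = last_marker(conn, "/topic/demo")
result_count = conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]
print(marker, result_count)  # 1 1
```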
As for "simply storing the last message in a local variable or in a persistent location might be enough": with the current setup, this works if you're expecting a soft failure, but not for unexpected hard failures. It essentially forces the user of the library to write the message to a database or similar when that could be avoided.
Here are some variations on how I thought this could be handled. Feel free to remix them, since I'm just brainstorming here:
async for msg in client:
    async with client.get_marker_context(msg):
        await process(msg)
    # where set_replay_marker is called if no exception was raised in the context

or

msg = await client.receive()
async with msg:
    await process(msg)
# where set_replay_marker is called if no exception was raised in the context

or even

msg = await client.receive()
await process(msg)
await client.mark_processed(msg)  # or something to that effect
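The first variation can be sketched with contextlib.asynccontextmanager. Everything here is hypothetical: ManualAckClient, get_marker_context and InMemoryReplayStorage are illustration names, not aiosfstream's API, and the message layout (data.event.replayId) is an assumption. The marker is stored only when the body of the async with block finishes without raising:

```python
import asyncio
from contextlib import asynccontextmanager


class InMemoryReplayStorage:
    """Hypothetical stand-in for a replay marker storage."""

    def __init__(self):
        self.markers = {}

    async def set_replay_marker(self, channel, replay_id):
        self.markers[channel] = replay_id


class ManualAckClient:
    """Hypothetical client that stores a marker only after successful processing."""

    def __init__(self, messages, storage):
        self._messages = list(messages)
        self.storage = storage

    async def __aiter__(self):
        for message in self._messages:
            yield message

    @asynccontextmanager
    async def get_marker_context(self, message):
        # An exception raised in the body propagates out of this yield,
        # so the marker update below is skipped.
        yield message
        await self.storage.set_replay_marker(
            message["channel"], message["data"]["event"]["replayId"]
        )


async def process(message):
    if message["data"]["event"]["replayId"] == 2:
        raise ValueError("cannot process message 2")


async def main():
    storage = InMemoryReplayStorage()
    msgs = [{"channel": "/topic/demo", "data": {"event": {"replayId": i}}} for i in (1, 2)]
    client = ManualAckClient(msgs, storage)
    async for msg in client:
        try:
            async with client.get_marker_context(msg):
                await process(msg)
        except ValueError:
            break  # message 2 would be replayed on the next run
    return storage.markers


markers = asyncio.run(main())
print(markers)  # {'/topic/demo': 1}
```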
On a different note, I'm a little confused by your implementation of async iterators (based on my reading on the subject). Isn't __aiter__ supposed to return an async iterator, and __anext__ supposed to get the single next value? It seems you're doing the entire iteration in the __aiter__ method. Am I missing something?
Here's my attempt at making a simplified client that does what I want:
https://repl.it/@balmasi/aiosfstream-replay
You can also imagine converting that into a context manager that provides a function to wrap your process with. It would look something like this:

async for m in client:
    async with Actor(my_replay_storage, m) as run_and_commit_marker:
        await run_and_commit_marker(process)
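A minimal sketch of such an Actor, assuming the replay storage is a plain dict keyed by channel and that messages carry their replay id under data.event.replayId (both assumptions, as is the Actor class itself). The marker is committed inside the wrapper, right after process() returns, so a raised exception skips the commit:

```python
import asyncio


class Actor:
    """Hypothetical async context manager that yields a run-and-commit wrapper."""

    def __init__(self, replay_storage, message):
        self.replay_storage = replay_storage  # dict-like: channel -> replay id
        self.message = message

    async def _run_and_commit_marker(self, process):
        result = await process(self.message)
        # Only reached if process() did not raise.
        channel = self.message["channel"]
        self.replay_storage[channel] = self.message["data"]["event"]["replayId"]
        return result

    async def __aenter__(self):
        return self._run_and_commit_marker

    async def __aexit__(self, exc_type, exc, tb):
        return False  # never swallow exceptions from the body


async def process(message):
    print("processing", message["data"]["event"]["replayId"])


async def main():
    my_replay_storage = {}
    m = {"channel": "/topic/demo", "data": {"event": {"replayId": 7}}}
    async with Actor(my_replay_storage, m) as run_and_commit_marker:
        await run_and_commit_marker(process)
    return my_replay_storage


storage = asyncio.run(main())
print(storage)  # {'/topic/demo': 7}
```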
@balmasi This suggestion sounds reasonable. I will soon implement the manual acknowledgement of messages as an option.
As for your question about __aiter__ and __anext__:
I think you should look up the difference between an asynchronous iterable and an asynchronous iterator. :wink:
An instance of the Client or the SalesforceStreamingClient class is not an iterator, it's an iterable, or more precisely an asynchronous iterable; all it needs to implement is __aiter__.
When __aiter__ is called, it returns an object that is an asynchronous iterator, which implements both __aiter__ and __anext__.
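To illustrate the distinction with plain Python (Ticker, TickerIterator and GeneratorTicker are made-up names, unrelated to the library): an iterable's __aiter__ hands back a separate iterator object, and writing __aiter__ as an async generator function is a shorthand, because calling it returns an async generator, which is itself an asynchronous iterator:

```python
import asyncio


class Ticker:
    """An asynchronous iterable: it only implements __aiter__."""

    def __init__(self, count):
        self.count = count

    def __aiter__(self):
        return TickerIterator(self.count)


class TickerIterator:
    """An asynchronous iterator: implements both __aiter__ and __anext__."""

    def __init__(self, count):
        self.current = 0
        self.count = count

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.count:
            raise StopAsyncIteration
        self.current += 1
        await asyncio.sleep(0)  # stands in for awaiting the next message
        return self.current


class GeneratorTicker:
    """Same iterable, with __aiter__ written as an async generator function."""

    def __init__(self, count):
        self.count = count

    async def __aiter__(self):
        for value in range(1, self.count + 1):
            await asyncio.sleep(0)
            yield value


async def collect(iterable):
    return [value async for value in iterable]


async def main():
    return await collect(Ticker(3)), await collect(GeneratorTicker(3))


explicit, generated = asyncio.run(main())
print(explicit, generated)  # [1, 2, 3] [1, 2, 3]
```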
Hi @balmasi
Sorry, I didn't have time to implement this enhancement until now. The documentation can be found here.
@robertmrk, thanks again for implementing this.
I have a couple of questions for you. About shelve.open: does it create the file on its own and store the events' replayId? We are using it at the moment, and we are struggling to keep this Python daemon process running continuously without it crashing, even though we added some error handling. What do you suggest for running this streaming client continuously?
Hi @sureshbabumandava
Yes, shelve.open will create the underlying database file if it doesn't exist. It's actually part of the standard library; you can find more information on it here.
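A minimal standard-library sketch showing that shelve.open creates its file on first use and persists values across opens. The path and the channel-to-replay-id layout are hypothetical and not necessarily what aiosfstream itself stores:

```python
import os
import shelve
import tempfile

# Hypothetical location and key layout; shelve keys must be strings.
path = os.path.join(tempfile.mkdtemp(), "replay")

# The default flag "c" opens the database for reading and writing,
# creating it if it does not exist yet.
with shelve.open(path) as db:
    db["/topic/demo"] = 42

# Reopening the same path reads back what was stored earlier.
with shelve.open(path) as db:
    stored = db.get("/topic/demo")

print(stored)  # 42
```

Depending on the dbm backend, the file on disk may carry an extra extension, so check for it by opening it with shelve rather than by exact filename.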
It's not entirely clear from your description what exactly causes your problems. If you're experiencing problems with the library, or if it behaves unexpectedly, then please open a new issue with a minimal code example that reproduces the problem.
Hi @robertmrk ,
I've been using your package with great success so thank you.
One question on the consumption of messages though:
Let's say I have some

async def process(msg): ...

that does some processing of a received message. It seems like set_replay_marker is being called before I can call my process function. How can I replay the message that I just received if the process(msg) call fails? Ideally, I would like to increase the replay marker AFTER I've processed the message successfully.
Any advice?