awspring / spring-cloud-aws

The New Home for Spring Cloud AWS
http://awspring.io
Apache License 2.0

Implement FIFO Support #444

Closed tomazfernandes closed 2 years ago

tomazfernandes commented 2 years ago

Implement basic support for FIFO queues.

Goals

Stretch Goals

If those stretch goals cannot be addressed in this issue, open respective issues for them.

Non goals

Community feedback on possible features and known issues regarding FIFO is welcome ☺️

mgusiew-guide commented 2 years ago

Hi @tomazfernandes , I will reply to your comments in #436 early next week. We are still testing some corner cases and I want to make sure I fully understand the SQS FIFO behaviour before providing the feedback. Generally it looks like ordering and retries may be difficult to get right when poll batch size is bigger than one, more to come next week :)

tomazfernandes commented 2 years ago

Sounds good, thanks again @mgusiew-guide!

I'd like to hear more about your particular use-case, if you're willing to share - as far as I understand, all scenarios you propose will eventually break ordering (unless we want to retry a message indefinitely), since they consider having a DLQ.

By setting the message batch size to 1, but having a DLQ, you're not really assuring ordering, since after the message is redriven you'll skip it and process the next one. You're actually assuring that poison pills won't spoil the whole batch, which is of course fair if ordering is not paramount.

The reason I know for going for a FIFO queue when strict ordering is not required is exactly once semantics - perhaps we don't care that much about ordering, but we don't want to process the same message twice. For that use case, I can definitely see the benefits of being able to skip a message, or redrive it to a DLQ, without affecting the whole batch.

For strict ordering, AFAIK we can either retry indefinitely (FIFO queue w/o DLQ), or perhaps send the poison pill to a DLQ and stop consumption for that message group. (Stopping consumption for a message group would of course be tricky - we could easily stop the entire queue, or perhaps keep extending the visibility of the poison pill so that the message group is not serviced again, but that doesn't look like a clean solution).

Regarding the solutions you brought, it seems the key to solving them would be (configurably) extending visibility for all messages in a message group batch before each message from that group is processed, be it a first attempt or a retry. (The tricky part for retries would then be how to send the message to the DLQ after attempts are exhausted - we'd probably need to do that in the framework, since as far as AWS is concerned we've only increased such message's timeout).

Please let me know your thoughts on this if and when possible.

Thanks!

mgusiew-guide commented 2 years ago

Hi again @tomazfernandes ,

I did some additional research on this topic and I believe I am ready to discuss.

I use FIFO queues when I want to process messages in order. Exactly-once semantics is not that important for me, because the consumer may fail with a retryable exception, and in that case I want to retry the message before I process another one.

I see several use cases where FIFO semantics is useful, e.g. bank transactions for given account, stock quotes, etc.

The use case I am working on right now is infrastructure. I am sending infra commands grouped by infra component, and each message contains the desired state of the infra component. When processing, I want to create/update/remove the infra component, register the metadata in the control plane, and save an audit record. The metadata and audit are stored in a database, and I want to maximize the chances that both the infra component change and the metadata/audit save complete successfully (hence retries). I also want the messages for each component to be processed in the order they were sent, because out-of-order processing could revert the component to an older state.

Generally I would like to retry the message until the limit is reached (at which point the message lands in the DLQ) and only then start processing the next one in the group. This lets me choose between two strategies when message processing fails with a retryable exception or times out:

1) I can decide to process the next message. For the infra scenario described above, I can usually process the next message because I know what the desired state of the infra component is. I may miss some updates in between, but no manual intervention is needed, and the failed messages go to the DLQ, where I can store them as rejected so somebody may analyze them later and process them retrospectively if needed (in this case retro processing only updates the audit, because the component and metadata are up to date).

2) I can skip the next message by checking the command headers/body, e.g. a version check or something else. This is useful for CQRS messages that contain only a delta, where it is necessary to process each change (the message does not contain the whole state, so it is necessary to aggregate deltas and apply the new ones on top of the aggregate).

Hope this makes sense. I can achieve the above with batch size set to 1 and frequent polling, because this setup:

1) Allows messages from different groups to be processed in parallel

2) Ensures that at most one message from each group is processed at a given time - SQS returns only the first message until the retry limit is reached

I can do that today with Spring SQS; the downside is that frequent polling generates extra traffic and cost in AWS.

It would be nice to get the above semantics from SQS with batch size > 1. For example, if I have one group with 2 messages and the other with one message and batch size 3, I would like to get only 2 messages, first one from first group and first one from second group. Unfortunately SQS returns multiple messages from the same group (in this case it would return all 3 messages) and so do the others (e.g. ActiveMQ https://stackoverflow.com/questions/60047370/activemq-jmsxgroupid-does-not-work-as-expected).
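The polling semantics wished for above can be illustrated as a filter over a received batch. This is only a sketch in Python: `Message` and `first_per_group` are hypothetical names, not part of SQS or Spring Cloud AWS, and SQS offers no such option according to this comment.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Message:
    group_id: str  # stands in for the SQS MessageGroupId attribute
    body: str


def first_per_group(batch):
    """Return only the first message of each group, preserving batch order."""
    seen = set()
    heads = []
    for msg in batch:
        if msg.group_id not in seen:
            seen.add(msg.group_id)
            heads.append(msg)
    return heads


# One group with two messages, another group with one message, batch size 3.
batch = [Message("A", "a1"), Message("A", "a2"), Message("B", "b1")]
heads = first_per_group(batch)
print([m.body for m in heads])  # ['a1', 'b1']
```

A client-side filter like this does not change what SQS delivers: `a2` has still been received and is in flight, which is why the behaviour would have to come from SQS itself.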

I understand it would be difficult to implement the above on top of SQS. That would require keeping the dependent messages in memory (in case when first message needs to be rolled back, we want to extend visibility timeout for the second so that we "save" the retry) plus persisting the dependencies and communication with other consumer instances (in case of retry other consumer may get the first message). This would be tricky to implement. I may be speaking Chinese here but I can go into more details if needed ;)

WRT the goal "Implement basic error / ack handling (when a message fails, subsequent messages are discarded)", I wonder how you plan to discard the subsequent messages. It looks like some persistence may be needed because those messages:

1) may arrive long after the message processing failed

2) may be processed by a different consumer

3) may be processed after the consumer is restarted

As I mentioned in other comment, extending visibility timeout on dependent messages may be nice in case when the message is processed successfully (we don't know how many dependent messages are given by SQS so setting visibility timeout defensively for all messages may not be ideal). However when the message is processed with retryable error, things get more tricky. There is also a timeout issue which can result in same message processed by two consumers (in case if it is not possible to cancel the first one).

I do believe that the polling strategy that I described could be useful. If SQS provided it then Spring SQS could process FIFO in the same way that it processes other queues (at least for this strategy). Maybe it would make sense to ask AWS about such enhancement ? Let me know what you think.

Hope that clarifies a bit, don't hesitate to comment :)

tomazfernandes commented 2 years ago

> Hi again @tomazfernandes ,

Hi and thanks again for your input @mgusiew-guide!

> I see several use cases where FIFO semantics is useful, e.g. bank transactions for given account, stock quotes, etc.

I'm not sure I understand exactly what you mean by FIFO semantics - care to elaborate? For me that would be processing messages exactly once (no duplicates in the queue) and in order (no DLQ to allow skipping messages).

The use-case you brought is solid and good design, thanks!

I think the fundamental part we're not in sync for most of what you described as issues / solutions is message dependency. When we receive a batch from a FIFO queue, the only dependency there is is within each message group. If we receive messages from more than one group, we can treat each group independently - if we delete all messages from the first group, we can fetch new ones from that group even if we haven't processed the second group yet.

So basically, as far as I understand, there's no need to persist messages on our side or anything like that - we'll just group messages by messageGroupId and process messages from each group sequentially, while processing groups in parallel - that's the current version's design and it looks optimal to me.
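A minimal sketch of that design, in Python for brevity rather than the project's Java: messages are grouped by group id, each group is processed sequentially, and groups run in parallel. The function names and the `(group_id, body)` tuple shape are assumptions made for illustration, not the project's API.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def group_by_group_id(batch):
    """Split a received batch into per-group lists, keeping arrival order."""
    groups = defaultdict(list)
    for group_id, body in batch:
        groups[group_id].append(body)
    return dict(groups)


def process_group(group_id, bodies, handler):
    """Process one group's messages strictly one after another."""
    return [handler(group_id, body) for body in bodies]


def process_batch(batch, handler, max_workers=4):
    """Run each group on its own worker; order is kept inside each group."""
    groups = group_by_group_id(batch)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            gid: pool.submit(process_group, gid, bodies, handler)
            for gid, bodies in groups.items()
        }
        return {gid: future.result() for gid, future in futures.items()}


batch = [("A", "a1"), ("B", "b1"), ("A", "a2")]
results = process_batch(batch, lambda gid, body: body.upper())
print(results)  # {'A': ['A1', 'A2'], 'B': ['B1']}
```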

With that in mind, we only need to worry about extending visibility within a given messageGroupId - even with retries, if a message fails, we extend the visibility for that message and for all other messages from the same group. Of course, if a message is not processed inside the visibility window for one message, it's a configuration issue that's up to the user to fix.

The same goes for my proposed goal, which is perhaps not that accurate - if a message fails, we can discard all messages of the same group including the failed one, and either set visibility timeout of them all to zero so that the messages are serviced again right away, or simply do nothing and wait for the visibility window to expire.

Please let me know if that makes sense or perhaps I'm missing something.

Thanks!

mgusiew-guide commented 2 years ago

Hi again @tomazfernandes ,

Thanks for the quick reply. Below my responses

> I'm not sure I understand exactly what you mean by FIFO semantics - care to elaborate? For me that would be processing messages exactly once (no duplicates in the queue) and in order (no DLQ to allow skipping messages).

For me FIFO means first in, first out. To me the exactly-once concept is orthogonal (I understand that it is currently offered for FIFO only, but I can imagine standard queues with exactly-once semantics). In the real world hiccups happen, so IMHO FIFO needs to take retries into account. So I would like the message to be retried before the next message from the same message group is processed. If the retries are exhausted and the message is not processed correctly (lands in the DLQ), I see two ways to proceed:

1) Start sending all subsequent messages in this message group to the DLQ - this is needed when the processing of the next message depends on the previous one, e.g. CQRS based on deltas

2) Allow subsequent messages in the group to be processed, with retries - this is useful in my scenario, where I can skip some messages (e.g. some infra resource modifications, because the message has the desired state) but I don't want to process out of order (I want the infra resource to have the latest state)

FTR, in FIFO streaming products like Kafka the message will be retried indefinitely until the client commits the offset, so in Kafka the DLQ pattern must be implemented in the client (this is called a poison message, and at some point you need to give up). In AWS SQS the DLQ is configurable, which solves the poison message problem, but once you start retrying with batch size greater than one, messages can be delivered out of order, which I consider a problem.

> I think the fundamental part we're not in sync for most of what you described as issues / solutions is message dependency. When we receive a batch from a FIFO queue, the only dependency there is is within each message group. If we receive messages from more than one group, we can treat each group independently - if we delete all messages from the first group, we can fetch new ones from that group even if we haven't processed the second group yet.

By dependency I meant the relationship between messages in the same message group id. So I think we have a common understanding here.

Let me illustrate my concerns on concrete example:

In the beginning the first consumer gets two messages and the second gets nothing, so far so good. The consumer may order the messages (as you suggested) and process the first one with the following results:

1) The message is processed successfully. In this case the consumer may process the second message; the only thing is that the visibility timeout on the second message may need to be extended, because it was sitting idle on the consumer while the timer was ticking (we discussed that)

2) The message is processed with a retryable error

3) The message processing times out

For now let's focus on 2). In this case we have the following options:

2.1) Extend the visibility timeout and retry the first message on the consumer. The problem here is that if we don't expire the message, it won't return to the queue and the retry count at queue level will not be decremented

2.2) Let the message expire so that it goes back to the queue and is available for reprocessing. What about the second message? If we process it, that will mean out of order. If we let it expire, it may land in the DLQ without any attempt to process it (this may or may not be what we want).

2.2.1) Let's say we want to start dropping the messages in this group (this is what I described as "Start sending all subsequent messages in this message group to DLQ"). Given that new messages may come at any time we need to:

2.2.2) Let's say we want to process next message once the retries for the first one are exhausted (this is what I described as "Allow to process subsequent messages in the group with retries"). If we extend visibility timeout on the second message on first consumer and wait for the first message to be polled again, this may not happen cause the second consumer may poll in the meantime and get the first message (in this case the first consumer will be keeping the second message and the second consumer will have the first message). So we would need to persist the dependency information somehow so that consumers can communicate which messages from which group they currently polled. This can be tricky

To sum up, IMHO ordering and retries don't play well together when the batch size is greater than one.

Maybe FIFO with exactly-once is easier to implement (the retries would have to be handled in client code), and maybe out of order is not that bad for many scenarios, but I think it would be useful to get a polling strategy that returns messages that do not have any dependency between them. Right now we can achieve that with batch size = 1 and frequent polling, but it would be nice to get an option to specify batch size > 1 and reduce the frequency of polling. Let me know if that makes sense (I understand that the best place to implement it is SQS).

tomazfernandes commented 2 years ago

Thanks for the clarification, looks like we're exactly on the same page. Also, thanks for bringing the CQRS use case, I hadn't thought yet of an objective use case that required strict message ordering and this fits perfectly.

I think the simplest solution overall would be what we currently have in Spring Cloud AWS SQS - leave retrying / DLQ complexity to AWS and user configuration, and we just process the messages and discard the remaining ones in case of an error. This leaves the problem of messages going to the DLQ without processing. In this version we're also offering the possibility of receiving a List<Message> in the listener method, so users can retrieve the entire batch for a given message group and decide how to handle it in the listener. So it's already a fair step up from what we currently offer, I guess, since it leaves a door open for receiving batches > 1.

Then the enhancements might be:

  1. Extending the MessageVisibility for the batch each time a message is sent for processing - this would solve the happy path and at least allow messages to have the proper processing window. Although we could also mitigate this by instructing users to configure message visibility to 10x message processing time, and we would set message visibility to 0 on error so that messages could be serviced again immediately. Not sure what's the best option - eventually we should probably offer both.

  2. Handling retries internally. As far as I understand, we'd need to extend message visibility for the batch accordingly, and manually send poison pills to DLQ after retries are exhausted. This would not be exactly the same as letting AWS handle this, since the message in the DLQ would actually be a new message with the same information, but it shouldn't be much of a problem as long as we provide all necessary information from the original message and properly document this behavior.

  3. Handling strict ordering use cases, such as in CQRS. This is the tricky one. Persisting messages is not something I'd like to deal with at this point. I also thought we could perhaps stop consumption for that message group by perpetually extending the poison pill's visibility after sending it to the DLQ, which would cause that message group not to be serviced to any consumer. The tricky part would then be knowing when the poison pill has been handled so we can stop doing that, but we could provide a way for users to do that programmatically. Also not something I'd like to deal with at this point.

I think instead of providing a complex OOTB feature, a simpler approach for that last use case might be offering hooks users can implement so they can handle it any way they prefer. For example, we already offer a MessageInterceptor interface where users could check the message group and filter the message out if that's the same group as the poison pill. We also offer an ErrorHandler interface that would allow users to persist any information on the failed message. We could also offer a proper MessageFilter interface to explicitly allow users to filter out messages based on a given criteria. After we've gotten some users' feedback on this, we might look into implementing such an OOTB feature.
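As a rough illustration of the hook idea, the sketch below pairs an error-handler role (record the failed group) with an interceptor role (filter out later messages of that group). It is written in Python with hypothetical names and does not reflect the actual signatures of the MessageInterceptor or ErrorHandler interfaces mentioned above.

```python
class PoisonedGroups:
    """In-memory stand-in for wherever a user would persist failed groups."""

    def __init__(self):
        self._groups = set()

    def handle_error(self, group_id):
        # Plays the role of the error-handler hook: remember the failed group.
        self._groups.add(group_id)

    def intercept(self, group_id, body):
        # Plays the role of the interceptor hook: None means "filter out".
        if group_id in self._groups:
            return None
        return (group_id, body)

    def release(self, group_id):
        # Called by the user once the poison pill has been dealt with.
        self._groups.discard(group_id)


registry = PoisonedGroups()
registry.handle_error("A")  # a message from group A failed earlier
incoming = [("A", "a2"), ("B", "b1")]
kept = [m for m in incoming if registry.intercept(*m) is not None]
print(kept)  # [('B', 'b1')]
```

An in-memory set like this would not survive a restart and is not shared between consumer instances, so a real implementation would need the kind of persistence discussed earlier in the thread.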

Please let me know your thoughts on this, and if I'm missing something.

Thanks!

tomazfernandes commented 2 years ago

One minor consideration:

> 2.2.2) Let's say we want to process next message once the retries for the first one are exhausted (...)If we extend visibility timeout on the second message on first consumer and wait for the first message to be polled again, this may not happen cause the second consumer may poll in the meantime and get the first message (...)

I don't think that's what would happen - if we extend the second message's visibility, we'd still have an inflight message for that message group, and SQS wouldn't service the first message again until we either delete or let the second message expire.

Also, I ran some preliminary tests, and SQS has some weird behavior if we delete e.g. the third message in a batch and let the first two expire, so it's important to delete all messages either at the same time or in the right order. Of course, I might be missing something.

Thanks again.

tomazfernandes commented 2 years ago

@maciejwalkowiak, just a thought, do you think it might be a good idea to merge the current code as is to a separate branch in the project, and then I can open new PRs against it? Afterwards we'd still be able to open a PR from that branch to main for a review of the integration code as a whole.

This way I could open a separate PR with the FIFO solution without having to wait for the current code to be reviewed and merged to main.

Just to offer some context, I've been prioritizing features that I think might require adjustments to the feature's design / interfaces so I can solve that sooner rather than later - async, blocking, batch, etc. FIFO is perhaps the most complex one, so I really prefer dealing with it soon so I can anticipate any issues and make any necessary adjustments.

Of course, that's at your discretion, and please let me know if you prefer doing it in any other way. We can leave it as is too if that's better.

Thanks!

mgusiew-guide commented 2 years ago

Hi @tomazfernandes ,

Thanks for sharing your thoughts.

Some comments from my side:

> I don't think that's what would happen - if we extend the second message's visibility, we'd still have an inflight message for that message group, and SQS wouldn't service the first message again until we either delete or let the second message expire.

We tested this scenario and this is exactly what is happening. IMHO the way that SQS works is that in situation where you process second message and the first message and third message are in the queue, SQS will allow anyone to poll first message but will keep the third message hidden (SQS blocks downstream messages but not the upstream messages).

> Also, I ran some preliminary tests, and SQS has some weird behavior if we delete e.g. the third message in a batch and let the first two expire, so it's important to delete all messages either at the same time or in the right order. Of course, I might be missing something.

Not sure what you mean by weird behaviour but based on my understanding SQS will retry the first and second message.

Based on the above I no longer recommend to extend the visibility timeout on messages. While this makes sense for happy path, it can affect the ordering in case of retries.

So here is my recommendation:

1) For perfect ordering with retries, recommend batch size 1

2) For batch size > 1:

The first strategy is simple: if the message processing fails or times out, let this message and the rest of the messages in the batch expire so that they are redelivered in the same order. In the worst-case scenario, the client may lose a few messages from the batch, but this is a tradeoff in order to get better performance. In this case, I would document that the messages that come later will still be processed and the listener is responsible for deciding whether to ignore them or not (the reason for this is to avoid persistence and IPC in Spring SQS).

The second strategy would be more sophisticated. In case of error Spring SQS would still let listener process subsequent messages but also give the listener the info if previous messages from the group that were included in batch succeeded or not. This would allow clients to decide if they want to process message or not (version checks, retry checks, etc.)
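One way to picture this second strategy is a dispatcher that keeps delivering the group's messages after a failure but tells the listener that an earlier message failed. The sketch below is in Python; `deliver_group` and the `previous_failed` flag are hypothetical and only illustrate the idea.

```python
def deliver_group(bodies, listener):
    """Call listener(body, previous_failed) for each message, in order.

    previous_failed becomes True once any earlier message of the same group
    in this batch has raised; the listener decides whether to proceed.
    """
    previous_failed = False
    outcomes = []
    for body in bodies:
        try:
            outcomes.append((body, listener(body, previous_failed)))
        except Exception:
            previous_failed = True
            outcomes.append((body, "failed"))
    return outcomes


def listener(body, previous_failed):
    if previous_failed:
        return "skipped"  # e.g. a delta that depends on the failed message
    if body == "bad":
        raise ValueError("retryable error")
    return "processed"


outcomes = deliver_group(["ok", "bad", "later"], listener)
print(outcomes)  # [('ok', 'processed'), ('bad', 'failed'), ('later', 'skipped')]
```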

Not sure which strategy should be the default ("batch size = 1" or "fail rest of the messages"). I discussed this with my colleague and for us "batch size = 1" seems more intuitive for the end user. Anyway, those 3 strategies may coexist and may be implemented incrementally (actually batch size = 1 already works and the other two do not conflict with each other).

I think this is more or less consistent with the basic handling that you specified.

WRT extensions:

> Extending the MessageVisibility for the batch each time a message is sent for processing - this would solve the happy path and at least allow messages to have the proper processing window. Although we could also mitigate this by instructing users to configure message visibility to 10x message processing time, and we would set message visibility to 0 on error so that messages could be serviced again immediately. Not sure what's the best option - eventually we should probably offer both.

Now that I understand the current SQS retry policies, I no longer recommend extending visibility timeouts. I agree that instructing users is better.

> Handling retries internally. As far as I understand, we'd need to extend message visibility for the batch accordingly, and manually send poison pills to DLQ after retries are exhausted. This would not be exactly the same as letting AWS handle this, since the message in the DLQ would actually be a new message with the same information, but it shouldn't be much of a problem as long as we provide all necessary information from the original message and properly document this behavior.

I don't recommend going this direction cause it may be difficult to provide expected quality of service. What if the app crashes during the retry, what if the framework is not able to write to DLQ at given moment ? There are dragons here ;)

> Handling strict ordering use cases, such as in CQRS. This is the tricky one. Persisting messages is not something I'd like to deal with at this point. I also thought we could perhaps stop consumption for that message group by perpetually extending the poison pill's visibility after sending it to the DLQ, which would cause that message group not to be serviced to any consumer. The tricky part would be then knowing when the poison pill has been handled to stop doing that, but we could provide a way for users to do that programmatically. Also not something I'd like to deal with at this point.

I would document that this can be achieved with batch size = 1 but that may impact performance. In case when performance is important some tradeoff needs to be made ("fail the rest of the messages" strategy or "process subsequent messages"). The reason for this is the current SQS polling strategy, if at some point SQS implements enhanced polling strategy (give many messages but all from different message groups), Spring SQS will support it.

> I think instead of providing a complex OOTB feature, a simpler approach for that last use case might be offering hooks users can implement so they can handle it any way they prefer. For example, we already offer a MessageInterceptor interface where users could check the message group and filter the message out if that's the same group as the poison pill. We also offer an ErrorHandler interface that would allow users to persist any information on the failed message. We could also offer a proper MessageFilter interface to explicitly allow users to filter out messages based on a given criteria. After we've gotten some users' feedback on this, we might look into implementing such an OOTB feature.

Not sure if this is needed at this point. I recommend to keep things simple cause once you release all those bells & whistles you will need to support it at least for some time...

Just my 2c.

tomazfernandes commented 2 years ago

Sorry for the delay @mgusiew-guide, and thanks for your response.

> We tested this scenario and this is exactly what is happening. IMHO the way that SQS works is that in situation where you process second message and the first message and third message are in the queue, SQS will allow anyone to poll first message but will keep the third message hidden (SQS blocks downstream messages but not the upstream messages).

> Not sure what you mean by weird behaviour but based on my understanding SQS will retry the first and second message.

You're right. But I do consider this behavior to be weird, since they claim: "When you receive a message with a message group ID, no more messages for the same message group ID are returned unless you delete the message or it becomes visible." In my interpretation of this rule, the first message should not be redelivered as long as any messages from the group ID are in-flight.

> Based on the above I no longer recommend to extend the visibility timeout on messages. While this makes sense for happy path, it can affect the ordering in case of retries.

> I don't recommend going this direction cause it may be difficult to provide expected quality of service. What if the app crashes during the retry, what if the framework is not able to write to DLQ at given moment ? There are dragons here ;)

Not sure why that is. Independently of this behavior, given we have ordered messages, we must take care to process them in order and not process the next message until we have deleted the previous, either because it processed successfully or because we successfully sent it to the DLQ after retries.

If we're unable to do either, we discard all messages and let SQS service them again eventually - distributed systems should be prepared for network connections to fail and there's nothing really we can do about it. Adding retries would increase resiliency since it would enable passing through hiccups.

As long as we extend the visibility of all non-deleted messages together, and delete each message before processing the next one, I don't really see the problem. What am I missing?

Thanks again for your input!

mgusiew-guide commented 2 years ago

Thanks for reply @tomazfernandes

Below my comments:

> You're right. But I do consider this behavior to be weird, since they claim "When you receive a message with a message group ID, no more messages for the same message group ID are returned unless you delete the message or it becomes visible." In my interpretation of this rule the first message should not be redelivered as long as any messages from the group Id are in-flight.

I agree that the docs are a bit vague, but one could argue that "or it becomes visible" is what is happening here (message one becomes visible because of the visibility timeout).

> Not sure why that is. Independently of this behavior, given we have ordered messages, we must take care to process them in order and not process the next message until we have deleted the previous, either because it processed successfully or because we successfully sent it to the DLQ after retries. If we're unable to do either, we discard all messages and let SQS service them again eventually - distributed systems should be prepared for network connections to fail and there's nothing really we can do about it. Adding retries would increase resiliency since it would enable passing through hiccups. As long as we extend the visibility of all non-deleted messages together, and delete each message before processing the next one, I don't really see the problem. What am I missing?

If you want to implement this strategy only, then indeed you can increase the timeout. Note however that this strategy has some limitations. Let's say you get two messages from the same group. If the processing of the first one times out or finishes with a retryable exception, then the second one is not processed at all. If that happens for all retries, then the second message will not get a single chance to be processed and will land in the DLQ together with the first one. In some scenarios where no message can be lost, this is the right solution. However, in some cases the client may want to skip the failing message and still try the second one. In such a case things get trickier, but I agree with you that it is still possible to extend the visibility timeout. Let me know if you want to go into details. If so, I can present what I mean using some examples in order to make sure we are on the same page.
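The limitation can be reproduced with a small in-memory simulation. It is a deliberately simplified model in Python, not real SQS: it assumes a redrive policy with a maximum receive count of 3, assumes both messages are always received together, and moves a message to the DLQ on the poll after its receives are used up. All names are hypothetical.

```python
MAX_RECEIVE_COUNT = 3  # assumed redrive policy value


def simulate(handler):
    """Poll until the queue is empty; return the messages that reached the DLQ."""
    queue = [
        {"body": "m1", "receives": 0, "attempts": 0},
        {"body": "m2", "receives": 0, "attempts": 0},
    ]
    dlq = []
    while queue:
        # Messages whose receives are exhausted are redriven instead of delivered.
        dlq.extend(m for m in queue if m["receives"] >= MAX_RECEIVE_COUNT)
        queue = [m for m in queue if m["receives"] < MAX_RECEIVE_COUNT]
        for m in queue:
            m["receives"] += 1  # the whole batch is received together
        for m in list(queue):
            m["attempts"] += 1
            if handler(m["body"]):
                queue.remove(m)  # success: delete the message
            else:
                break  # failure: let the rest of the batch expire
    return dlq


# The first message always fails, so the second is never attempted.
dlq = simulate(lambda body: body != "m1")
print([(m["body"], m["attempts"]) for m in dlq])  # [('m1', 3), ('m2', 0)]
```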

tomazfernandes commented 2 years ago

> I agree that the docs are a bit vague but one could argue that "or it becomes visible" is what is happening here (message one becomes visible because of visibility timeout).

I guess you're right again, thanks. Perhaps if the example in the docs illustrated a scenario with more than one message being served it might be clearer.

> If so I can present what I mean using some examples in order to make sure we are on the same page.

I think we might still not be exactly on the same page. This time perhaps let me show you a more detailed example of what's on my mind, and you can let me know if you see any inconsistencies.

To make it simpler, I'll start with the basic example involving visibility extension. We want to:

We would:

  1. Receive a batch of messages, potentially from multiple group ids
  2. Aggregate them in batches from each group id - from now on batch will refer to messages from the same group
  3. Change visibility for all messages in the batch to 60 seconds
  4. Process the first message in the batch
  5. If processing succeeds: delete the message, remove from the batch and go back to 3
  6. If processing fails, discard messages so they will be served again by SQS, and we go back to 1
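The steps above can be read as the following minimal Java sketch. It is only an illustration under stated assumptions: `FifoMessage`, `FifoQueueOps` and `BasicFifoFlow` are hypothetical names made up for this example, not Spring Cloud AWS or AWS SDK types, and the listener is modelled as a predicate that returns `false` on failure.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical message type for illustration; not an SDK class.
final class FifoMessage {
    final String groupId;
    final String body;

    FifoMessage(String groupId, String body) {
        this.groupId = groupId;
        this.body = body;
    }
}

// Hypothetical stand-in for the SQS calls the steps rely on.
interface FifoQueueOps {
    void changeVisibility(List<FifoMessage> messages, int seconds);
    void delete(FifoMessage message);
}

final class BasicFifoFlow {

    static final int VISIBILITY_SECONDS = 60;

    // Step 2: split one poll result into per-group batches, keeping arrival order.
    static Map<String, Deque<FifoMessage>> groupById(List<FifoMessage> polled) {
        Map<String, Deque<FifoMessage>> groups = new LinkedHashMap<>();
        for (FifoMessage message : polled) {
            groups.computeIfAbsent(message.groupId, id -> new ArrayDeque<>()).add(message);
        }
        return groups;
    }

    // Steps 3 to 6 for one group. Returns the messages left for SQS to serve again.
    static List<FifoMessage> processGroup(Deque<FifoMessage> batch, FifoQueueOps ops,
            Predicate<FifoMessage> listener) {
        while (!batch.isEmpty()) {
            ops.changeVisibility(new ArrayList<>(batch), VISIBILITY_SECONDS); // step 3
            FifoMessage head = batch.peekFirst();                             // step 4
            if (listener.test(head)) {
                ops.delete(head);                                             // step 5
                batch.removeFirst();
            } else {
                List<FifoMessage> discarded = new ArrayList<>(batch);         // step 6
                batch.clear();
                return discarded;
            }
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        FifoQueueOps ops = new FifoQueueOps() {
            @Override
            public void changeVisibility(List<FifoMessage> messages, int seconds) {
                System.out.println("extend " + messages.size() + " message(s) to " + seconds + "s");
            }

            @Override
            public void delete(FifoMessage message) {
                System.out.println("delete " + message.body);
            }
        };
        List<FifoMessage> polled = new ArrayList<>();
        polled.add(new FifoMessage("A", "a1"));
        polled.add(new FifoMessage("B", "b1"));
        polled.add(new FifoMessage("A", "a2"));
        for (Deque<FifoMessage> batch : groupById(polled).values()) {
            // The listener fails on "a2" to exercise step 6.
            List<FifoMessage> left = processGroup(batch, ops, m -> !m.body.equals("a2"));
            System.out.println(left.size() + " message(s) left for redelivery");
        }
    }
}
```

In this sketch a failure in one group only discards the rest of that group's batch; the other groups from the same poll are still processed.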

If some processing takes more than 60 seconds, SQS will serve the message again, perhaps to another consumer; that's a configuration problem rather than something we should try to address.

Am I missing anything so far?

Thanks

mgusiew-guide commented 2 years ago

Hi @tomazfernandes ,

Thanks for going into details, from what I see we are on the same page with the basic scenario. There is only one thing that I would like to clarify. Let's say we polled two messages in the same group and the first one kept on failing, which caused the second one to not be processed. Do I understand correctly that if some time later the poll receives a third message in the same group, the third message will be processed?

tomazfernandes commented 2 years ago

Hi @mgusiew-guide,

Do I understand correctly that if some time later the poll receives a third message in the same group, the third message will be processed?

Yes, you're correct. As per AWS docs, "Don't use a dead-letter queue with a FIFO queue if you don't want to break the exact order of messages or operations. For example, don't use a dead-letter queue with instructions in an Edit Decision List (EDL) for a video editing suite, where changing the order of edits changes the context of subsequent edits."

This solution would be close to what we currently have on Spring Cloud AWS, and perhaps you'd need to keep batch size set to 1 for simplicity. Two noteworthy upgrades though would be:

tomazfernandes commented 2 years ago

The second solution would be adding retries. We'd want to:

So we would:

  1. Receive a batch of messages, potentially from multiple group ids
  2. Aggregate them in batches from each group id - from now on batch will refer to messages from the same group
  3. Change visibility for all messages in the batch to 60 seconds
  4. Process the first message in the batch
  5. If processing succeeds: delete the message, remove from the batch and go back to 3
  6. If processing fails and attempts < maxAttempts, keep message in batch and go back to 3
  7. If processing fails and attempts are exhausted, send the message to a DLQ, delete the message, remove from the batch, and return to 3
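A minimal Java sketch of steps 3 to 7 for a single message group follows. The names `RetryingFifoFlow` and `Ops` are hypothetical and exist only for this illustration, messages are reduced to plain strings, and the attempt counter is kept in memory for the message at the head of the batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

final class RetryingFifoFlow {

    // Hypothetical stand-in for the SQS calls; not an SDK interface.
    interface Ops {
        void changeVisibility(List<String> batch, int seconds);
        void delete(String message);
        void sendToDlq(String message);
    }

    // Processes one group's batch in order, retrying the head in memory.
    static void processGroup(Deque<String> batch, Ops ops, Predicate<String> listener,
            int maxAttempts) {
        int attempts = 0; // attempts made so far for the current head
        while (!batch.isEmpty()) {
            ops.changeVisibility(new ArrayList<>(batch), 60); // step 3
            String head = batch.peekFirst();                  // step 4
            attempts++;
            if (listener.test(head)) {
                ops.delete(head);                             // step 5
                batch.removeFirst();
                attempts = 0;
            } else if (attempts >= maxAttempts) {
                ops.sendToDlq(head);                          // step 7: DLQ first,
                ops.delete(head);                             // then delete
                batch.removeFirst();
                attempts = 0;
            }
            // step 6: otherwise the head stays in the batch and the loop retries it
        }
    }

    public static void main(String[] args) {
        Ops ops = new Ops() {
            @Override
            public void changeVisibility(List<String> batch, int seconds) { }

            @Override
            public void delete(String message) {
                System.out.println("delete " + message);
            }

            @Override
            public void sendToDlq(String message) {
                System.out.println("send to DLQ " + message);
            }
        };
        Deque<String> batch = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3"));
        // "m2" always fails, so it is redriven after three attempts.
        processGroup(batch, ops, m -> !m.equals("m2"), 3);
    }
}
```

If `sendToDlq` or `delete` throws, the exception leaves `processGroup`, the rest of the batch is not processed, and SQS would serve the messages again once their visibility expires.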

How does this look?

Thanks.

mgusiew-guide commented 2 years ago

Hi @tomazfernandes ,

Thanks for explanation.

I think that for a simple strategy with batch size > 1, it should be acceptable to lose some messages given the SQS poll limitations. The batch processing that you mentioned could be used in case someone wants to implement more sophisticated strategies. Do I understand correctly that for batch processing I would have to ACK from the listener (SqsMessageDeletionPolicy.NEVER)?

For my use case I plan to continue with batch size = 1, at least as long as the performance and cost are acceptable. This provides me with the retries and ordering guarantees that I need on SQS level.

WRT the second solution (client retries), I recommend being careful. As I understand it, you assume that the framework will be able to delete the message and place it on the DLQ (two API calls). One or both of those calls may sometimes fail, e.g. due to a temporary network problem, so there will be corner cases to handle or known limitations to document.

With all that being said, I am still considering asking AWS to expose a new poll strategy that would guarantee that a single batch contains at most one message from each message group. If SQS provided this, all that would be needed at the framework level is to place each message on an ExecutorService (as is the case for standard queues). IMHO it is not easy to achieve that behaviour on the client side with the current state of things. Feel free to disagree :)

tomazfernandes commented 2 years ago

Hi @mgusiew-guide, I'm sorry for the delay.

I've implemented the first scenario in these 3 commits: https://github.com/awspring/spring-cloud-aws/pull/374/commits/70bc930de6599043a056c8dd0981c7978f0a7404, https://github.com/awspring/spring-cloud-aws/pull/374/commits/5a4d7ac2dac41de5d6f664f023dfe73ed370e960, https://github.com/awspring/spring-cloud-aws/pull/374/commits/316a8c3721b20b83f260558161833502880c52eb

I don't see a problem with the second scenario - if we fail to send the message to the DLQ or to delete it afterwards, we'll stop processing the batch and the message will be served again - the same problem we face if we can't delete a successfully processed message today.

However, it does seem too complex for this first milestone which is already packed, so something for us to think of in a future release.

Please let me know if you have any further suggestions or concerns.

Thanks again for your input, it did help a lot in understanding these real-world FIFO use-cases!

tomazfernandes commented 2 years ago

Closed by https://github.com/awspring/spring-cloud-aws/commit/70bc930de6599043a056c8dd0981c7978f0a7404, https://github.com/awspring/spring-cloud-aws/commit/5a4d7ac2dac41de5d6f664f023dfe73ed370e960, https://github.com/awspring/spring-cloud-aws/commit/316a8c3721b20b83f260558161833502880c52eb

mgusiew-guide commented 2 years ago

Hi @tomazfernandes ,

Thanks for sharing the updates. I will have a look at it, when I find some time.

WRT the second scenario, consider the following flow:

  1. The client receives a message from SQS and tries to process it without success. The in-memory retries are exhausted but the retries on SQS are still fine
  2. The framework successfully sends the message to the DLQ
  3. The framework tries to delete the message but fails (SQS not available)
  4. The message goes back to the queue and is retried
  5. The message is processed successfully by the framework and deleted from the queue

In this case the framework successfully processed the message, but it also landed in the DLQ and may be reported as an error (if there is an error handler on the DLQ).
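To make the flow concrete, here is a small in-memory Java simulation. It is a sketch only: `DlqFalseNegativeDemo` and its fields are hypothetical stand-ins for the source queue, the DLQ and the listener outcome, and the failed delete is modelled with a simple flag rather than a real SQS error.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-ins for the source queue and the DLQ; hypothetical, not SQS APIs.
final class DlqFalseNegativeDemo {
    final List<String> queue = new ArrayList<>();
    final List<String> dlq = new ArrayList<>();
    final List<String> processed = new ArrayList<>();
    boolean deleteAvailable = true;

    // A failed delete leaves the message in the queue, so it is served again.
    boolean delete(String message) {
        return deleteAvailable && queue.remove(message);
    }

    // One delivery, after the in-memory retries have already been applied.
    void handle(String message, boolean listenerSucceeds) {
        if (listenerSucceeds) {
            processed.add(message);
        } else {
            dlq.add(message); // framework-side redrive to the DLQ
        }
        delete(message);
    }

    public static void main(String[] args) {
        DlqFalseNegativeDemo demo = new DlqFalseNegativeDemo();
        demo.queue.add("m1");

        demo.deleteAvailable = false; // the delete call fails after the DLQ send
        demo.handle("m1", false);     // retries exhausted, sent to DLQ, delete fails

        demo.deleteAvailable = true;  // the message is served again
        demo.handle("m1", true);      // processed successfully and deleted

        System.out.println("processed=" + demo.processed
                + " dlq=" + demo.dlq + " queue=" + demo.queue);
    }
}
```

At the end of the run the same message is both in `processed` and in `dlq`, which is the situation described above.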

tomazfernandes commented 2 years ago

Hi @mgusiew-guide, thanks for your reply. Sure, that can happen. But let's consider the scenarios you brought.

In case it's a CQRS delta-based architecture, the DLQ pattern shouldn't really be used unless there's a way to guarantee all messages for that MessageGroupId are forwarded there. So that's not what this second scenario is for.

OTOH, for your particular use-case, if I understand correctly, messages that end up in the DLQ are not reprocessed, since a message with the next state may already have been processed. So it doesn't seem like a message getting to the DLQ even though it was properly processed should be a deal breaker. I don't think this should happen very often and in a distributed system I believe we should embrace and prepare for failure, rather than trying to avoid it at all costs. Since this would be an opt-in feature, I think stating it clearly in the documentation might be enough.

For your usage, would you rather keep using batch size set to 1 than use a feature like this on these terms?

As a note, I've contributed a Non-blocking Delayed Retries feature to Spring Kafka that does the exact same thing more than one year ago - even though the feature has been widely used for more than one year now I haven't seen any complaints about this scenario yet. Perhaps since SQS has DLQ built-in users would expect a more rigorous approach?

Thanks.

mgusiew-guide commented 2 years ago

Thanks for quick feedback @tomazfernandes .

In case it's a CQRS delta-based architecture, the DLQ pattern shouldn't really be used unless there's a way to guarantee all messages for that MessageGroupId are forwarded there. So that's not what this second scenario is for.

M.G. From my experience it is often the case that in CQRS the application tries to process the next messages (if possible) and then there is a case-by-case decision whether to process the rejected ones, often in a special "out of order" mode.

OTOH, for your particular use-case, if I understand correctly, messages that end up in the DLQ are not reprocessed, since a message with the next state may already have been processed. So it doesn't seem like a message getting to the DLQ even though it was properly processed should be a deal breaker.

M.G. In my case I will reprocess and check whether the next message was processed or not. If it was not processed, I can safely try to reprocess (I know that it is not retried anymore). If the next message was processed, I can decide whether to skip or process in "out of order" mode.

I don't think this should happen very often and in a distributed system I believe we should embrace and prepare for failure, rather than trying to avoid it at all costs. Since this would be an opt-in feature, I think stating it clearly in the documentation might be enough.

M.G. I do agree that there is a tradeoff here and I agree that for some users this tradeoff may be acceptable. That being said, I recommend documenting the tradeoff, because if an outage happens you may end up with dissatisfied users.

For your usage, would you rather keep using batch size set to 1 than use a feature like this on these terms?

M.G. We are going to stick with batch size = 1 for now. This guarantees that if the message lands in DLQ, it is not going to be retried anymore, so when I consume from DLQ I know that this message is not concurrently being processed. In my use case I don't have huge traffic so correctness is more important than performance.

As a note, I've contributed a Non-blocking Delayed Retries feature to Spring Kafka that does the exact same thing more than one year ago - even though the feature has been widely used for more than one year now I haven't seen any complaints about this scenario yet. Perhaps since SQS has DLQ built-in users would expect a more rigorous approach?

M.G. Well, in this case users can choose between retry implementations (framework vs DLQ) and expect the corresponding quality of service and tradeoffs.

FTR I haven't mentioned the other possible problems with retries in memory, e.g. an excessive number of retries. Imagine that you are calling an expensive service and you pay a lot of money for each call. If you retry too many times (which may happen) then your costs will increase. The same applies if you call a system with quotas: you may exceed the limits.

Last but not least, my intention is not to criticise the options but to mention the cons based on my experience with messaging/streaming systems. I see pros & cons in all options and I don't think we have a golden hammer here. In case you are interested in this topic, I recommend this blog post, in particular "Error handling strategies" and "Best Practices for a Dead Letter Queue in Apache Kafka". I know that Kafka is a bit different from SQS but it is FIFO by design, so IMHO many best practices apply to FIFO queues as well. The author lists many strategies that may be applicable in different types of use cases.

Hope that helps

mgusiew-guide commented 2 years ago

@tomazfernandes just wanted to recall the last answer:

As a note, I've contributed a Non-blocking Delayed Retries feature to Spring Kafka that does the exact same thing more than one year ago - even though the feature has been widely used for more than one year now I haven't seen any complaints about this scenario yet. Perhaps since SQS has DLQ built-in users would expect a more rigorous approach?

M.G. As you mentioned in the docs, when using this feature the FIFO guarantee is gone. It may be acceptable in scenarios in which Kafka is used as a regular message queue. If the end user wants to have FIFO with retries and DLQ, she can use DeadLetterPublishingRecoverer, right? I understand that this does not provide exponential retries but the FIFO semantics and the retries would be guaranteed, please correct me if I am wrong. Sorry for not digging into that earlier but I thought we were focused on FIFO. Maybe DeadLetterPublishingRecoverer also has some issues (coordination between committing the offset and sending the message to the dead-letter topic) but that would be a topic for another conversation, plus Kafka does not offer a built-in DLQ so it may be reasonable to not keep expectations too high...

So to summarise, a client-side DLQ could lead to false negatives and excessive retries, and possibly something else (again, I haven't gone deep into this). The reason why I did not recommend it is because the end users may ask to eliminate those limitations and you may end up implementing a message broker in Spring SQS, which is not an easy task. That being said, if correctness is not the highest priority, this may be a viable option.

Please let me know if this makes sense.

tomazfernandes commented 2 years ago

Hi @mgusiew-guide,

M.G. As you mentioned in the docs, when using this feature the FIFO guarantee is gone. It may be acceptable in scenarios in which Kafka is used as a regular message queue. If the end user wants to have FIFO with retries and DLQ, she can use DeadLetterPublishingRecoverer, right? I understand that this does not provide exponential retries but the FIFO semantics and the retries would be guaranteed, please correct me if I am wrong. Sorry for not digging into that earlier but I thought we were focused on FIFO. Maybe DeadLetterPublishingRecoverer also has some issues (coordination between committing the offset and sending the message to the dead-letter topic) but that would be a topic for another conversation, plus Kafka does not offer a built-in DLQ so it may be reasonable to not keep expectations too high...

You're right, when using this feature FIFO guarantees are lost, so it's not the same scenario we're discussing. Without that feature, DeadLetterPublishingRecoverer can be used in conjunction with DefaultErrorHandler, which provides blocking retries by seeking back to the failed record after a backoff period, which can be exponential. That would more closely resemble our scenario.

Since commits are handled after the record is published to the DLT, it can happen that the message is sent to the DLT and reprocessed afterwards. Never seen any complaints about this though, but as you mention it could be due to Kafka not offering this OOTB and thus expectations can be lower.

The reason why I did not recommend it is because the end users may ask to eliminate those limitations and you may end up implementing a message broker in Spring SQS, which is not an easy task.

Yeah, I think you brought valid points and implementing this would probably not be worth it. It was good discussing it though, so thanks for bringing it up. Furthermore, the only real benefit I see of retrying in memory vs setting batch size to 1 would be reducing the number of polls, but given we'd still need to keep extending visibility, we'd end up making a request per message anyway. Also performance-wise the SQS calls are really fast, so not much to gain there considering the effort to implement this.

Maybe in a scenario with lots of different MessageGroupIds it might make more sense, but we can make many simultaneous polls for 1 message and get messages from different group ids concurrently.

One thing that came to mind is that we might change the visibility for failed messages so we can use it as a BackOff - let's say a message has a visibility of 30s, if it fails fast, we could set its visibility to e.g. 4 seconds so it'll be served again quicker. Something for another issue though.
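As a rough illustration of that idea, the following Java sketch computes a short, growing visibility timeout from the number of times a message has been received. Everything here is an assumption made for the example: `VisibilityBackoff` and `nextVisibilitySeconds` are hypothetical names, the doubling policy is just one possible choice, and the result would still have to be passed to an SQS ChangeMessageVisibility call, with the receive count taken from something like the `ApproximateReceiveCount` message attribute.

```java
final class VisibilityBackoff {

    // SQS allows a visibility timeout of at most 12 hours.
    static final int SQS_MAX_VISIBILITY_SECONDS = 43_200;

    // Doubles the initial timeout for each previous receive, capped at maxSeconds.
    static int nextVisibilitySeconds(int receiveCount, int initialSeconds, int maxSeconds) {
        if (receiveCount < 1 || initialSeconds < 1 || maxSeconds < initialSeconds
                || maxSeconds > SQS_MAX_VISIBILITY_SECONDS) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        long seconds = initialSeconds;
        for (int attempt = 1; attempt < receiveCount && seconds < maxSeconds; attempt++) {
            seconds *= 2;
        }
        return (int) Math.min(seconds, maxSeconds);
    }

    public static void main(String[] args) {
        // Queue visibility of 30s, first retry after 4s, as in the example above.
        for (int receiveCount = 1; receiveCount <= 5; receiveCount++) {
            System.out.println("receive " + receiveCount + " -> "
                    + nextVisibilitySeconds(receiveCount, 4, 30) + "s");
        }
    }
}
```

Capping at the queue's regular visibility (30 seconds here) keeps a repeatedly failing message from being delayed longer than it would be without the backoff.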