fabiogouw / spark-aws-messaging

A custom sink provider for Apache Spark that sends the content of a dataframe to an AWS SQS
MIT License

Is the sink at-least-once? #10

Closed raphaelauv closed 1 year ago

raphaelauv commented 1 year ago

Hi, thank you for the lib :+1:

I would like to know if the sink is at-least-once (i.e., no matter what, the Spark job fails if a message is not acknowledged by AWS SQS)?

thank you

fabiogouw commented 1 year ago

Hello!

I haven't run deeper tests, but my understanding is that if something goes wrong when writing some of the messages to the SQS queue, the task will fail and Spark will pick another executor node to retry it.

In this scenario, the same message might be written many times, so it's important for your consumer app to be idempotent.
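To illustrate the idempotency requirement, here is a minimal sketch of a deduplicating consumer (a hypothetical helper, not part of this lib; in production the "seen" set would live in a durable store such as DynamoDB or Redis, keyed by a business identifier carried in the message):

```python
def process_once(message_id: str, body: str, seen: set, handler) -> bool:
    """Handle a message only if its id hasn't been processed before.

    Returns True if the handler ran, False if the message was a duplicate.
    """
    if message_id in seen:
        return False  # duplicate delivery from a retried Spark task: skip it
    handler(body)
    seen.add(message_id)
    return True
```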

raphaelauv commented 1 year ago

I don't know if it's possible to detect it from the client side:

when I produce 5 events where one event is bigger than the size limit configured on the SQS queue, the job does not fail and only 4 events end up in the queue.

Example: set a limit of 2 KB on an SQS queue and produce 1 event bigger than 2 KB.

fabiogouw commented 1 year ago

I've run some tests but I wasn't able to reproduce this behavior. This PR #11 has a new test that tries to write a dataframe with 5 rows with the 3rd row having 262300 bytes. The job fails with no messages being added to the queue. I ran them against a localstack container, not against a real SQS queue. I'm going to try that.

BTW, I found that another test started to fail after upgrading the localstack version, I'll look at this too.

raphaelauv commented 1 year ago

The problem is not with the maximum SQS message size but with a custom maximum size:

set a 2 KB max size on the AWS SQS queue, then try to produce messages that are 4 KB.

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html#API_SetQueueAttributes_RequestSyntax

MaximumMessageSize

fabiogouw commented 1 year ago

I get it now. This seems to be a behavior of the AWS SDK that this lib uses to send messages to the queue, when we call the sendMessageBatch method. I'm not sure whether it's a bug or intended behavior, but I agree it can lead people to think their data was sent to the queue when it has actually been silently dropped.

What I can do is retrieve this configuration before writing to the queue, when the lib reads the data from the dataframe. Then, if one of the rows exceeds the limit, the lib can throw an exception. It can't prevent some of the data from being written while the rest isn't, given the distributed way Spark works, but at least people will know something went wrong so they can fix the data and retry the job.
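Roughly, the check I have in mind could look like this (a sketch only; it assumes the queue's MaximumMessageSize has already been fetched, and measures the UTF-8 byte size, since SQS limits are byte-based):

```python
class MessageTooLargeError(Exception):
    """Raised when a row exceeds the queue's MaximumMessageSize."""


def validate_batch(messages: list, maximum_message_size: int) -> None:
    """Raise if any message body exceeds the limit (in bytes)."""
    for i, body in enumerate(messages):
        size = len(body.encode("utf-8"))  # SQS counts bytes, not characters
        if size > maximum_message_size:
            raise MessageTooLargeError(
                f"message {i} is {size} bytes, limit is {maximum_message_size}"
            )
```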

WDYT?

raphaelauv commented 1 year ago
> retrieve this configuration before writing to the queue

There is no guarantee for a long-running application: it could face a queue whose setting is changed at runtime.

And even if you fetch this configuration every X seconds, there is still a risk of lost messages.

I think the best option would be to add a big disclaimer about this in the README:

the sink is at-least-once only if you ensure that your code does not produce events bigger than the MaximumMessageSize setting of the SQS queue

and maybe an example of how to assert that (here in PySpark):


```python
from pyspark.sql.functions import length

def assert_size_column_value(df, max_size: int) -> None:
    # Reserve some bytes for message metadata overhead
    SQS_HEADER_SIZE = 100

    # Note: length() counts characters, not bytes; for multi-byte
    # UTF-8 payloads the actual byte size can be larger.
    tmp_df = df.withColumn("size", length("value"))
    assert tmp_df.filter(tmp_df["size"] > max_size - SQS_HEADER_SIZE).count() == 0
```

fabiogouw commented 1 year ago

Yes, indeed documentation is a must, I'll do that!

However, I found a bug in the code: sendMessageBatch does return the messages that couldn't be sent, so I can use this information to throw an exception stating that something went wrong, without the risk of losing a message (I don't need to fetch the configuration and validate the message size as I initially thought).
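For illustration, the same failed-entry check expressed in Python over a response shaped like boto3's `send_message_batch` result (which contains "Successful" and "Failed" lists; the function and exception names here are hypothetical):

```python
class SqsBatchSendError(Exception):
    """Raised when a SendMessageBatch call reports failed entries."""


def check_batch_response(response: dict) -> None:
    """Raise if the SendMessageBatch response contains any failed entries."""
    failed = response.get("Failed", [])
    if failed:
        details = "; ".join(
            f'{e.get("Id")}: {e.get("Code")} ({e.get("Message")})' for e in failed
        )
        raise SqsBatchSendError(f"{len(failed)} message(s) were not sent: {details}")
```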

Thanks for raising this issue, @raphaelauv, with this I was able to find this flaw!