akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka/current/

OutOfMemoryError with AWS SQS #1588

Closed avdv closed 5 years ago

avdv commented 5 years ago

Versions used

Alpakka version: 1.0-M3

Akka version: 2.5.20

Amazon SQS library: 2.5.8

Expected Behavior

When starting an SqsSource, I would expect the flow to handle a slow consumer, i.e. to pull new messages only when there is downstream demand.

val settings = SqsWatcherSettings(
      queueUrl,
      SqsSourceSettings()
        .withMaxBatchSize(cfg.get[Int]("max-number-of-messages"))
        .withWaitTimeSeconds(cfg.get[Int]("wait-time-seconds"))
        .withMessageAttributes(cfg.get[Seq[String]]("message-attributes").map(MessageAttributeName.apply).toList)
    )

implicit val sqsClient: SqsAsyncClient = SqsAsyncClient
    .builder()
    .httpClient(new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory().build())
    .build()

val source = SqsSource(queueUrl, settings)
        .withAttributes(supervisionStrategy(resumingDecider))

val sink = SqsAckSink(queueUrl)

source
  .map(...)
  .throttle(100, 1.minute)
  .mapAsync(parallelism = 1)(...)
  .map {
      case (msg, Right(_)) ⇒
        MessageAction.Delete(msg)
      case (msg, _) ⇒
        MessageAction.Ignore(msg)
    }
  .toMat(sink)(Keep.both)
  .run()

Actual Behavior

When the flow starts up, it seems to eagerly pull down all messages in the queue (probably multiple times, once the visibility timeout for in-flight messages is reached).


Note, we were using https://github.com/s12v/akka-stream-sqs before, but tried to switch to alpakka-sqs.

Relevant logs

flo

Reproducible Test Case

n/a

ennru commented 5 years ago

Thank you for trying it out, that is definitely not how it's meant to be.

avdv commented 5 years ago

Hi,

I tried to pinpoint the problem with localstack and the throttling seems to be the culprit...

I have created a queue with 500 messages.

implicit val system: ActorSystem = ActorSystem()
implicit val mat: Materializer = ActorMaterializer()
implicit val awsSqsClient = SqsAsyncClient
   .builder()
   .endpointOverride(URI.create("http://localhost:4576"))
   .region(REGION)
   .build()

system.registerOnTermination(awsSqsClient.close())

val localstackUrl = "http://localhost:4576/queue/foobar"

SqsSource(
  localstackUrl,
  SqsSourceSettings()
    .withMaxBufferSize(10)
    .withCloseOnEmptyReceive(true)
)
  // !!!
  // .throttle(10, 1.minute)
  .map { msg ⇒
     println(s"received message: ${msg.messageId()}")
     Thread.sleep(1000)
     MessageAction.Ignore(msg)
  }
  .runWith(SqsAckSink(localstackUrl))
  .onComplete {
    case _ ⇒
      println("terminating...")
      system.terminate()
  }(scala.concurrent.ExecutionContext.Implicits.global)

This may be a contrived example, of course, but the stream runs without problems with exactly 30 messages in flight (30 seconds visibility timeout, one message processed per second).

As soon as I uncomment the throttle operation, it seems to immediately demand most/all input from the source:

$ awslocal sqs get-queue-attributes  --queue-url "http://localhost:4576/queue/foobar" --attribute-names 'All'
{
    "Attributes": {
        "VisibilityTimeout": "30",
        "DelaySeconds": "0",
        "ReceiveMessageWaitTimeSeconds": "0",
        "ApproximateNumberOfMessages": "0",
        "ApproximateNumberOfMessagesNotVisible": "501",
        "ApproximateNumberOfMessagesDelayed": "0",
        "CreatedTimestamp": "1552998772",
        "LastModifiedTimestamp": "1552998772",
        "QueueArn": "arn:aws:sqs:elasticmq:000000000000:foobar"
    }
}

martyphee commented 5 years ago

I'm having a similar issue with the number of in-flight messages, and messages are not being deleted. I tried removing the throttling, but it didn't make a difference.

SQS_MessagesInFlight

This consumer works fine on M1 but has issues with M3 and RC1.

Here is a gist of the consumer code. https://gist.github.com/martyphee/df632696796e3484205e486422c08450

nmeln commented 5 years ago

@ennru Having a similar problem as @martyphee after updating to RC1, everything was OK with M1.

The consumer gets stuck after some time (~10-15 hours) and stops consuming available messages. There are no errors in the log.

Edit:

I can see from the metrics that the consumer stops receiving Empty Receives from SQS when this happens.

adnanfaizan commented 5 years ago

I am getting the same error, and it seems the Message class is being leaked.

Top 5 entries of the object creation histogram from jhat:

Class                                                     Instance Count   Total Size
class [C                                                  1376284          2068640582
class software.amazon.awssdk.services.sqs.model.Message   332718           18632208
class java.lang.String                                    1375675          16508100
class [Lakka.dispatch.forkjoin.ForkJoinTask;              227              14880304
class scala.collection.immutable.$colon$colon             334396           5350336

In my case, I am doing something similar to what @avdv is doing, using both MessageAction.ignore and MessageAction.delete. I also tried MessageAction.changeVisibility together with MessageAction.delete.

It seems the Message objects are never released. My stream looks like this:


SqsSource(queueUrl,
      //parallelism = maxBufferSize / maxBatchSize 20 10
      SqsSourceSettings().withWaitTime(10 seconds)
        .withMaxBatchSize(10).withMaxBufferSize(20)
    ).map {
      msg => {
        val out = Source.single(msg)
          .via(messageToLambdaRequest)
          .via(lambdaRequestToLambdaResp)
          .via(lambdaRespToAggregationKeySet)
          .via(workFlow)
          .log("error while consuming events internally.")
          .withAttributes(ActorAttributes.supervisionStrategy(decider))
          .runWith(Sink.seq)

        val reducedResponse = out.map(response => {
          response.foldLeft[Response](OK)((a, b) =>
            if (a == OK && b == OK) OK else NotOK)
        })

        val messageAction = reducedResponse
          .map(res =>
            if (res == OK) {
              //log.info("finally response is OK hence message action delete is prepared. {}.", msg.messageId())
              delete(msg)
            } else
              ignore(msg)
          )
        messageAction
      }
    }
      .mapAsync(1)(identity)
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      // For the SQS Sinks and Sources the sum of all parallelism (Source) and maxInFlight (Sink)
      // must be less than or equal to the thread pool size.
      .log("error log")
      .runWith(SqsAckSink(queueUrl, SqsAckSettings(1)))

  }

I tried this with both 1.0-M3 and 1.0-RC1. Is there a workaround for this?

RustedBones commented 5 years ago

Since the migration to the AWS SDK (#1450), the SqsSource has been eagerly polling and filling its internal buffer.

def inHandler: InHandler = new InHandler {
  override def onPush(): Unit = {
    val messages = grab(in)

    if (messages.nonEmpty) buffer.offer(messages)
    if (settings.closeOnEmptyReceive && buffer.isEmpty) completeStage()
    if (!hasBeenPulled(in)) pull(in)
    if (!buffer.isEmpty && isAvailable(out)) push(out, buffer.poll())
  }
}

No maximum size is set on the buffer, so the stage always pulls again after buffering, regardless of downstream demand. The guideline for rate-decoupled graph stages must be followed: pull upstream only while the buffer has room.

Moreover, I don't get why this custom flow is required, since it is directly followed by:
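To make the difference concrete, here is a minimal, Akka-free sketch of the pull logic. It is only a model under stated assumptions, not the Alpakka implementation: `peakBufferSize`, its parameters, and the tick-based consumer are all hypothetical. Each step delivers one batch if the stage has pulled, and the consumer takes one element every `consumeEvery` steps. Passing `None` reproduces the unconditional `pull(in)`; passing `Some(max)` pulls only while a whole batch still fits.

```scala
import scala.collection.mutable

// Hypothetical model of the buffering stage, not the real GraphStage.
object BoundedPullSketch {
  def peakBufferSize(steps: Int,
                     batchSize: Int,
                     consumeEvery: Int,
                     maxBufferSize: Option[Int]): Int = {
    val buffer = mutable.Queue.empty[Int]
    var pulled = true // the stage pulls once when it starts
    var peak = 0
    for (step <- 1 to steps) {
      if (pulled) { // models onPush: a batch arrives from upstream
        buffer ++= (1 to batchSize)
        pulled = false
      }
      peak = math.max(peak, buffer.size)
      // Slow consumer: takes one element every `consumeEvery` steps.
      if (step % consumeEvery == 0 && buffer.nonEmpty) buffer.dequeue()
      // Pull again only while a full batch still fits.
      // `None` means "no limit", i.e. the unconditional pull from the issue.
      if (maxBufferSize.forall(max => buffer.size + batchSize <= max)) pulled = true
    }
    peak
  }

  def main(args: Array[String]): Unit = {
    println(s"unbounded peak: ${peakBufferSize(100, 10, 5, None)}")
    println(s"bounded peak:   ${peakBufferSize(100, 10, 5, Some(20))}")
  }
}
```

With the limit in place the peak never exceeds the configured maximum, while the unconditional variant grows for as long as upstream keeps delivering batches.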

.mapConcat(identity)
.buffer(settings.maxBufferSize, OverflowStrategy.backpressure)

gabfssilva commented 5 years ago

To be honest, I don't know why I added that stage to the Source in the first place. I remember trying to use takeWhile but I don't know why it didn't work out, so, I dropped it. I think I thought that by adding this buffer stage:

.buffer(settings.maxBufferSize, OverflowStrategy.backpressure) 

(...) it'd not try to pull again and the backpressure would work as expected, but, I think the pull(in) not controlling the max buffer size messed it all up. Sorry. =P

gabfssilva commented 5 years ago

And now I see that there's kind of a problem with the approach I took by using pre-defined stages, since mapAsync is executed eagerly and more messages are getting fetched than the configured buffer size. (Even with your fix, @RustedBones, tho it fixes the non-stop polling.)

Also, since mapAsync is stateless, it cannot know how many messages it may poll from SQS based on the current buffer size; it always fetches according to the maxBatchSize setting. So, even if the buffer is set to 20 and the stream has 19 elements, it'll try to fetch 10 more messages, which is not what should happen.

I think the problem occurs because of mapAsync + mapConcat(identity) + buffer(settings.maxBufferSize, OverflowStrategy.backpressure) all together. Even using mapAsync(1) it sends more requests than expected. What I think is: while the buffer is not full, mapAsync keeps being called. This way more messages are being grabbed than the maxBufferSize setting.

Should I create an issue to talk specifically about it?

RustedBones commented 5 years ago

I am not sure mapAsync is eagerly executed. mapAsync will reach its parallelism only if downstream asks for that many elements. It will back-pressure after this point.

For instance, if the parallelism of mapAsync is set to 1000 and downstream asks for 1 element, only 1 async call will be executed. Otherwise the mapAsync stage would not respect the reactive principle.

Edit: I've just checked the MapAsync implementation. There is no eager pull from upstream on the stage preStart.

In our case, we decoupled the back-pressure using a buffer. This means that at initialization, the buffer stage will ask upstream for maxBufferSize elements. mapAsync will limit the number of parallel calls to settings.maxBufferSize / settings.maxBatchSize, which makes sense: there is no need to fetch more elements than the buffer can store.

In some cases though, we may execute a ReceiveMessageRequest for 10 elements even if we only have 1 slot left in the buffer. This is not really a problem: the 9 remaining elements will be back-pressured in the mapConcat stage. So yes, maxBufferSize is not perfectly accurate, because some additional elements can be held internally in mapConcat, but this does not seem to be an issue to me.
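The arithmetic above can be sketched as follows. This is only an illustration of the single-request case described in this comment; the object and function names are hypothetical, the `math.max(1, ...)` floor is my assumption for the case maxBufferSize < maxBatchSize, and anything mapAsync itself may hold while requests are in flight is ignored.

```scala
// Hypothetical helper, not part of Alpakka.
object BufferOvershootSketch {
  // Number of concurrent ReceiveMessageRequests: maxBufferSize / maxBatchSize.
  // Assumption: at least one request is always allowed.
  def parallelism(maxBufferSize: Int, maxBatchSize: Int): Int =
    math.max(1, maxBufferSize / maxBatchSize)

  // One free slot in the buffer triggers one more full batch:
  // (maxBufferSize - 1) elements already buffered plus maxBatchSize new ones.
  def heldWithOneFreeSlot(maxBufferSize: Int, maxBatchSize: Int): Int =
    (maxBufferSize - 1) + maxBatchSize

  def main(args: Array[String]): Unit = {
    println(parallelism(20, 10))         // 2
    println(heldWithOneFreeSlot(20, 10)) // 29
  }
}
```

For maxBufferSize = 20 and maxBatchSize = 10 this gives 2 parallel requests and 29 held messages, which is the "19 elements plus 10 more" situation mentioned earlier in the thread.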

Users must pay attention to maxBufferSize though. If their SQS stream is slow and the queue is configured with a short VisibilityTimeout, messages may sit in the buffer long enough for their receipt handles to become invalid before they are processed.
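A rough way to check a configuration against that risk, as a sketch only: it assumes a consumer that handles one message at a time with a roughly constant processing time, and all names here are hypothetical.

```scala
import scala.concurrent.duration._

// Hypothetical helper, not part of Alpakka.
object VisibilityBudgetSketch {
  // Time until the last of `held` messages has been processed,
  // assuming strictly sequential processing at a constant rate.
  def worstCaseWait(held: Int, perMessage: FiniteDuration): FiniteDuration =
    perMessage * held.toLong

  // True if every held message is processed before its visibility timeout expires.
  def fitsVisibilityTimeout(held: Int,
                            perMessage: FiniteDuration,
                            visibilityTimeout: FiniteDuration): Boolean =
    worstCaseWait(held, perMessage) < visibilityTimeout

  def main(args: Array[String]): Unit = {
    println(fitsVisibilityTimeout(20, 1.second, 30.seconds))  // true
    println(fitsVisibilityTimeout(500, 1.second, 30.seconds)) // false
  }
}
```

With one second per message and a 30-second VisibilityTimeout, 20 buffered messages still fit, whereas the 500 messages pulled in the localstack reproduction above do not.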