snowplow / stream-collector

Collector for cloud-native web, mobile and event analytics, running on AWS and GCP
http://snowplowanalytics.com

Scala Stream Collector: implement back-pressure #32

Open alexanderdean opened 8 years ago

alexanderdean commented 8 years ago

With the current architecture, if we have a malfunctioning Kinesis stream, then we will:

  1. Continue to accept events from a tracker cache and 200 them (so the tracker deletes them from its cache)
  2. Add all these events to an in-memory buffer in the hope that Kinesis will start functioning again

In other words - we don't implement back-pressure, and we take a significant risk that we will end up with at-most-once processing rather than at-least-once processing.

A better approach would be to set a failure threshold (e.g. Kinesis out of action for X minutes, or N events in the in-memory buffer), and if that threshold is reached, start returning 503s (or similar) so the trackers keep the events on their side...
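
As a rough illustration of that threshold idea (the limits, names and wiring below are invented for the sketch, not taken from the collector's code):

```scala
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

// Track how long Kinesis writes have been failing and how much is buffered in
// memory, and flip responses to 503 once either threshold is breached so the
// trackers keep the events and retry later. Thresholds are illustrative.
object BackPressure {
  private val maxBufferedEvents  = 10000            // "N events in the buffer"
  private val maxOutageMillis    = 5 * 60 * 1000L   // "Kinesis out for X minutes"
  private val bufferedEvents     = new AtomicInteger(0)
  private val lastKinesisWriteMs = new AtomicLong(System.currentTimeMillis())

  def eventBuffered(): Unit = bufferedEvents.incrementAndGet()

  def kinesisWriteSucceeded(flushed: Int): Unit = {
    bufferedEvents.addAndGet(-flushed)
    lastKinesisWriteMs.set(System.currentTimeMillis())
  }

  /** 200 while healthy; 503 once the failure threshold is reached. */
  def statusForNextRequest: Int = {
    val outageMillis = System.currentTimeMillis() - lastKinesisWriteMs.get
    if (bufferedEvents.get >= maxBufferedEvents || outageMillis >= maxOutageMillis) 503
    else 200
  }
}
```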

alexanderdean commented 8 years ago

/cc @fblundun @ninjabear

ninjabear commented 8 years ago

The other thing we discussed is a switch that makes the collector write to a file or an S3 bucket instead - meaning that we can rerun those events later if required.

The number of events that haven't made it into Kinesis and the timestamp of the last successful Kinesis write would be good information to expose on any healthcheck the collector provides.
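
Something like the following (the field names are invented here) could be what the healthcheck reports:

```scala
// Hypothetical healthcheck payload exposing the two numbers suggested above.
final case class CollectorHealth(eventsNotYetInKinesis: Long, lastKinesisWriteMs: Long) {
  def toJson: String =
    s"""{"eventsNotYetInKinesis":$eventsNotYetInKinesis,"lastKinesisWriteMs":$lastKinesisWriteMs}"""
}

// e.g. CollectorHealth(1423, 1456789012345L).toJson
```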

In my experience the JVM can be pretty unhelpful when trying to store large amounts of information in memory. I'd find it interesting to know exactly what happens when the collector tries to cache a lot of events (do we already have a JVM-imposed limit on cache size?).
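
For reference, the JVM itself imposes nothing beyond the heap limit (`-Xmx`), so an unbounded buffer eventually shows up as GC pressure and then `OutOfMemoryError`. One way to impose an explicit cap, sketched here with made-up names and sizes:

```scala
import java.util.concurrent.LinkedBlockingQueue

// A bounded in-memory buffer: offer() returns false instead of growing the heap
// without limit, so the caller can start refusing requests. Capacity is illustrative.
class BoundedEventBuffer(capacity: Int = 10000) {
  private val queue = new LinkedBlockingQueue[Array[Byte]](capacity)

  /** Returns false when full, so the caller can return a 503 instead of buffering. */
  def tryAdd(event: Array[Byte]): Boolean = queue.offer(event)

  /** Take up to `max` events for the next write attempt against Kinesis. */
  def drain(max: Int): Vector[Array[Byte]] =
    Iterator.continually(queue.poll()).takeWhile(_ != null).take(max).toVector

  def size: Int = queue.size
}
```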

alexanderdean commented 8 years ago

Thanks @ninjabear - good points. Rather than complect the fallback switch into the Scala Stream Collector, I think it would be better to do this out-of-band using the healthcheck. In other words:

On each collector instance:

Another thing to consider is whether we want to enable the fallback on a box-by-box basis, or globally for the whole collector cluster. It might be more coherent to do the latter.

Have also added: snowplow/snowplow#2466
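
One way the out-of-band version could look (the healthcheck URL, polling cadence and fallback hook are all assumptions for the sketch):

```scala
import scala.io.Source
import scala.util.Try

// A watchdog that runs outside the collector, polls its healthcheck and trips
// the fallback once the healthcheck has failed for long enough. The endpoint
// and the way the fallback gets flipped are hypothetical.
object FallbackWatchdog {
  val healthUrl   = "http://localhost:8080/health" // hypothetical endpoint
  val pollMillis  = 30000L
  val maxFailures = 10 // ~5 minutes of consecutive failures at this cadence

  def main(args: Array[String]): Unit = {
    var failures = 0
    while (true) {
      val healthy = Try {
        val src = Source.fromURL(healthUrl)
        try src.mkString finally src.close()
      }.isSuccess
      failures = if (healthy) 0 else failures + 1
      if (failures >= maxFailures) {
        // here we would enable the fallback, e.g. by calling an admin endpoint
        // on the collectors or flipping a shared flag (Consul, as suggested below)
        println("failure threshold reached: enabling fallback sink")
      }
      Thread.sleep(pollMillis)
    }
  }
}
```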

fblundun commented 8 years ago

I agree that it would be better to do the fallback globally - I can't imagine why one collector instance would need to fall back but others wouldn't.

Also - if we have an external process restarting the collector, we will need to make sure that any records buffered by the collector are flushed before the restart.

jbeemster commented 8 years ago

We could add Consul to the Collectors to allow them to communicate with each other?

fblundun commented 8 years ago

I think communication rather than hard killing is a good idea. If Kinesis ceases to exist, the collectors need to do something with their in-memory data before they are killed and restarted. So we might as well just build in a way to switch between Kinesis and S3 (or another "safe" target) without the collector restarting.
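
A sketch of what that switchable target could look like (the trait and class names are invented; this is not the collector's existing sink code):

```scala
import java.util.concurrent.atomic.AtomicReference

trait EventSink {
  def store(events: Seq[Array[Byte]]): Unit
}

class KinesisSink extends EventSink {
  // a real implementation would call PutRecords against the Kinesis stream
  def store(events: Seq[Array[Byte]]): Unit = ()
}

class S3Sink extends EventSink {
  // a real implementation would write a batch file to the fallback S3 bucket
  def store(events: Seq[Array[Byte]]): Unit = ()
}

// Both sinks live behind one reference, so the active target can be swapped
// at runtime without restarting the collector.
class SwitchableSink(primary: EventSink, fallback: EventSink) extends EventSink {
  private val current = new AtomicReference[EventSink](primary)

  def useFallback(): Unit = current.set(fallback) // e.g. when Kinesis is down
  def usePrimary(): Unit  = current.set(primary)  // once Kinesis recovers

  def store(events: Seq[Array[Byte]]): Unit = current.get.store(events)
}
```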

ninjabear commented 8 years ago

I was just writing the same thing @fblundun - all of this can be done by treating the collector as a microservice with an API.
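
For example, a small admin endpoint along these lines (akka-http is used purely for illustration, and the route is invented rather than something the collector exposes):

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._

object AdminApi {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("collector-admin")

    // PUT /admin/sink?target=kinesis|s3 flips the active sink at runtime,
    // e.g. by calling something like the SwitchableSink sketched above.
    val route =
      path("admin" / "sink") {
        put {
          parameter("target") {
            case "kinesis" => complete("switched to Kinesis")
            case "s3"      => complete("switched to S3 fallback")
            case other     => complete(StatusCodes.BadRequest -> s"unknown sink: $other")
          }
        }
      }

    Http().newServerAt("127.0.0.1", 8081).bind(route)
  }
}
```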

alexanderdean commented 8 years ago

This is cool...

jbeemster commented 8 years ago

It might still not be a bad idea to implement something like RocksDB on the box, just in case the instance does get restarted, so we are not depending on in-memory storage?
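
A rough sketch of that, using the `org.rocksdb` JNI bindings (the key scheme and class name are invented for illustration):

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.atomic.AtomicLong
import org.rocksdb.{Options, RocksDB}

// Append events to a local RocksDB instance before acking them, so a restarted
// instance can replay whatever never made it to Kinesis.
class LocalEventJournal(path: String) {
  RocksDB.loadLibrary()
  private val db  = RocksDB.open(new Options().setCreateIfMissing(true), path)
  private val seq = new AtomicLong(0)

  /** Persist an event before it is acknowledged to the tracker. */
  def append(event: Array[Byte]): Unit =
    db.put(f"evt-${seq.incrementAndGet()}%020d".getBytes(UTF_8), event)

  /** Remove an event once Kinesis has confirmed the write. */
  def delete(key: Array[Byte]): Unit = db.delete(key)

  /** Replay everything still on disk, e.g. after a restart. */
  def replay(f: (Array[Byte], Array[Byte]) => Unit): Unit = {
    val it = db.newIterator()
    it.seekToFirst()
    while (it.isValid) {
      f(it.key(), it.value())
      it.next()
    }
    it.close()
  }

  def close(): Unit = db.close()
}
```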

alexanderdean commented 8 years ago

Yes, I think that's orthogonal and we should be doing that anyway - we don't want to be in a situation where we can't bounce a box without losing data...