walmartlabs / bigben

BigBen - a generic, multi-tenant, time-based event scheduler and cron scheduling framework
Apache License 2.0
196 stars · 53 forks

Expected performance #12

Closed ajzwu8 closed 5 years ago

ajzwu8 commented 5 years ago

Hi,

I was wondering what the expected performance of one bigben machine should be? Right now, we are running a simple performance test using kafka (one tenant with a topic of replication factor 1 and 1 partition). The test is as follows:

  1. Tenant is registered with topic
  2. 3 batches of equal amounts of jobs are scheduled using one producer:
    • first batch immediately
    • second batch after 30 second delay
    • third batch after 60 second delay
  3. One consumer listens on the output topic and acknowledges the message (simple log statement)

When the batch size is 100, this works just fine. However, we've seen hiccups: if the batch size is 200+, sometimes the events aren't properly sent. The immediately scheduled events always send, but the delayed ones only partially arrive; for the 30 second delay batch, either 40% or 0% of the events come through. Are we configuring anything incorrectly?

FYI, we are using the default BigBen shard sizing.

sandeepmalik commented 5 years ago

Hi,

Just to understand more correctly, when you say a batch of 100, does this mean 100 events having different ids and payloads?

BigBen uses a bucketing approach: events are grouped into buckets, and the default bucket size is 1 minute. That means all events scheduled for, say, 10:20:00-10:21:00 should be received before the start of 10:20:00.

At the start of 10:20:00, BigBen scans how many events are present in that bucket and works off of that. Events that arrive later for this bucket may be missed for processing. I think that's what's happening in your tests.

If you ran the same test but with event times after a minute, you should not be missing any events.

It is possible to run BigBen with a smaller window size (say 10 seconds) but I have not tested that fully yet.
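The bucketing rule above can be sketched roughly as follows (illustrative only; `bucketOf` and the exact API are assumptions based on this description, not BigBen's actual code):

```kotlin
import java.time.Instant
import java.time.temporal.ChronoUnit

// Illustrative sketch: map an event time to its one-minute bucket.
// All events in [10:20:00, 10:21:00) land in the 10:20:00 bucket,
// which is scanned exactly once, at the start of that minute.
fun bucketOf(eventTime: Instant): Instant = eventTime.truncatedTo(ChronoUnit.MINUTES)

val a = bucketOf(Instant.parse("2019-05-01T10:20:15Z"))
val b = bucketOf(Instant.parse("2019-05-01T10:20:59Z"))
// a and b fall into the same 10:20:00 bucket, so both must be received
// before 10:20:00 to be picked up by that bucket's scan.
```

This is why events submitted after their bucket's scan has started can be missed.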

ajzwu8 commented 5 years ago

Yes, each job is its own event having its own ID + payload.

I think I understand your explanation. I'll adjust my tests so events are scheduled on rounded-to-the-minute timings. So to confirm: if I were to schedule something less than a minute away and within the same minute (e.g. schedule an event at 10:20:15 for 10:20:45), there may be a chance it would not get processed. Is that correct?

sandeepmalik commented 5 years ago

Correct. Say at the start of 10:20 BigBen finds 2300 events in the bucket. Given a shard size of 1000, it deduces there are 3 shards to process and schedules them. Now say 800 more events come in after 10 seconds. Of these, 700 go to shard 3 (since it only has 300 events) and the remaining 100 go to a 4th shard. The 4th shard is missed entirely, because the shard computation happened at the beginning of the minute.

For the 3rd shard, it's a bit more involved. Say the 2300th event was scheduled at 10:20:45. For every shard, BigBen paginates with around 400 events per page (the events are sorted by time). Once those 400 events are done, the next page for the same shard is fetched with time > 10:20:45. So if an event is scheduled at 10:20:10 for 10:20:15, it will be missed: at 10:20 the 300 events fetched from shard 3 had a last event time of 10:20:45, and any new events fetched from shard 3 will be after 10:20:45. So it's a hit or miss scenario.

In short, a delay of (event time - current time) > 60 seconds is safe.

Having said that, BigBen theoretically allows a configurable scan frequency, but currently the lowest granularity supported is 1 minute (schedule.scan.interval.minutes). We can look into supporting smaller granularities in the future.
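The shard arithmetic in that example can be sketched as follows (SHARD_SIZE and shardCount are illustrative names, not BigBen's code):

```kotlin
// Illustrative sketch of the shard math described above. Shards are
// computed once, at the start of the bucket, from the count seen then.
val SHARD_SIZE = 1000

// Ceiling division: how many shards a given event count needs.
fun shardCount(eventCount: Int): Int = (eventCount + SHARD_SIZE - 1) / SHARD_SIZE

val shardsScheduled = shardCount(2300)      // 3 shards scheduled at 10:20:00
val shardsNeeded = shardCount(2300 + 800)   // 800 late events push the total to 4
// shardsNeeded > shardsScheduled: the 4th shard is never scheduled, so the
// ~100 events that land in it are missed until the scheduling is redone.
```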

ajzwu8 commented 5 years ago

I've adjusted our tests so that all events are now scheduled for the next minute (e.g. events created at 10:20:15, 10:20:30, and 10:20:59 are all scheduled for 10:21:00). Like you said, it seems that at 10:21:00 up to 400 of those events successfully trigger, and the rest scheduled at 10:21:00 are ignored.

Is there a way to have BigBen fetch the next batch from the same shard starting at the same time? From your explanation, anything after 10:21:00 will be scheduled (so 10:21:01 onwards), but that creates a limitation: if >400 events are scheduled for the same time, only 400 of them will trigger, since the next batch is fetched from 10:21:01 onwards, not inclusive of the last successfully scheduled event's time.

Update on running tests: I scheduled a batch of 400 events for the next minute on the dot (say, 12:00:00), then scheduled another batch of 400 events for 30 seconds after that. It seems only the 400 events that occur exactly on the minute successfully trigger.

sandeepmalik commented 5 years ago

Interesting. No, that should not happen. All events should be fetched, even if there are more than 400 with exactly the same time. The pagination is actually on a tuple, with the condition (event time, event id) > (time, last seen id). So even if the time is the same, the IDs will differ (the IDs are also sorted, and they are internally generated, not client-provided event IDs). That seems like a miss in the query. Thanks for pointing it out. I'll try to fix it today and release the fix.
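The tuple comparison being described works like this (Cursor and isAfter are illustrative stand-ins, not BigBen's actual types):

```kotlin
// Illustrative sketch of tuple-based pagination. Comparing on
// (eventTime, id) rather than eventTime alone means events that share
// the exact same time are not skipped: the internally generated,
// sorted id breaks the tie.
data class Cursor(val eventTime: Long, val id: String)

fun isAfter(e: Cursor, last: Cursor): Boolean =
    e.eventTime > last.eventTime ||
        (e.eventTime == last.eventTime && e.id > last.id)

val lastSeen = Cursor(eventTime = 1000L, id = "0042")
// Same time, higher id: still included by the tuple comparison,
// whereas a time-only comparison (e.eventTime > last.eventTime)
// would silently drop it.
val included = isAfter(Cursor(1000L, "0043"), lastSeen)
```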

whomobile commented 5 years ago

@sandeepmalik The 400 seems to come from the check below. https://github.com/walmartlabs/bigben/blob/1c15bfb6a132bbc0d865c561846b98b485a783cc/lib/src/main/kotlin/com/walmartlabs/bigben/tasks/tasks.kt#L143

if (events.size >= fetchSize)

If events get processed, wouldn't events.size decrease?

I think the relevant unit test is cassandra/src/test/kotlin/com/walmartlabs/bigben/cassandra/tests/IntegrationTests.kt, in:

fun `test event loader`() {

sandeepmalik commented 5 years ago

@whomobile, that logic seems correct. events is an immutable list of events loaded from the db; its size doesn't decrease. The logic "if (events.size >= fetchSize)" basically says: if the first fetch returned 400 events, there may be more, so try to fetch them; otherwise consider the shard done and terminate the pagination.

The pagination query https://github.com/walmartlabs/bigben/blob/e9cca8bdf93f4775ac3f8ba2f6cc5a788e0b15a1/cassandra/src/main/kotlin/com/walmartlabs/bigben/providers/domain/cassandra/CassandraModule.kt#L65 seems to be correct as well.

There's something else going on here. I'll try to replicate what @ajzwu8 is saying and see what's going on.
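The termination rule under discussion can be sketched in isolation (drainShard and loadPage are stand-ins for ShardTask and EventLoader, not the real API):

```kotlin
// Illustrative sketch: if a page comes back full (size >= fetchSize)
// there may be more rows, so fetch again; a short page means the shard
// is exhausted and pagination terminates.
val FETCH_SIZE = 400

fun drainShard(loadPage: (offset: Int) -> List<Int>): List<Int> {
    val all = mutableListOf<Int>()
    var page = loadPage(0)
    all += page
    while (page.size >= FETCH_SIZE) {   // full page => possibly more rows
        page = loadPage(all.size)
        all += page
    }
    return all
}

// 950 fake events: pages of 400, 400, 150; the loop stops on the short page.
val events = (1..950).toList()
val drained = drainShard { offset -> events.drop(offset).take(FETCH_SIZE) }
```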

whomobile commented 5 years ago

@sandeepmalik If that list is immutable, then that logic is right. I don't think IntegrationTests.kt covers ShardTask, though.

But here is a side thought.

IntegrationTests.kt / `test event loader` paginates like this:

while (l.second.isNotEmpty()) {
    l.second.forEach {
        assertEquals(events["${it.eventTime}-${it.id}"], it)
        events.remove("${it.eventTime}-${it.id}")
    }
    l = module<EventLoader>().load(bucket, it, fetchSize, l.second.last().eventTime!!, l.second.last().id!!, l.first).get()
}

but in the ShardTask module:

if (events.size >= fetchSize)
    loader.load(p.first, p.second, fetchSize, events.last().eventTime!!, events.last().id!!, rp.first)
else
    immediateFuture(rp.first to events)

From the above logic, how does the result of loader.load(p.first, p.second, fetchSize, events.last().eventTime!!, events.last().id!!, rp.first) ever get processed? The processing seems to live only in the else branch: events.filter { it.status != PROCESSED }.map { e -> schedule(e) ... }

Maybe I missed some recursive logic somewhere?

sandeepmalik commented 5 years ago

You're right. The second loader() call is missing a listener for processing. Thanks for spotting it. I'll work on fixing it.

sandeepmalik commented 5 years ago

This is fixed. Thanks guys.

sandeepmalik commented 5 years ago

Let me know if you still face any issues, otherwise I'll close the issue later.

whomobile commented 5 years ago

@sandeepmalik

One question about pagination query.

The current pagination query is:

loaderQuery = mappingManager.session.prepare("SELECT * FROM ${session.loggedKeyspace}.events WHERE bucket_id = ? AND shard = ? AND (event_time, id) > (?,?) LIMIT ?;")

and each call is mapped as:

return mappingManager.session.executeAsync(loaderQuery.bind(bucketId, shard, eventTime, eventId, fetchSize)).transform { null to mappingManager.mapper(EventC::class.java).map(it!!).toList() }

which is invoked like:

loader.load(p.first, p.second, fetchSize, events.last().eventTime!!, events.last().id!!, rp.first)

For example, event_time is set to events.last().eventTime!!, but I am not sure whether there is any guarantee that events.last().eventTime > the second-to-last event's eventTime, or that events.last().id > the second-to-last event's id.

In the old days with SQL databases, pagination could be done with cursors, OFFSET, and/or ORDER BY; with a table keyed on a bigint primary key, that primary key could be used.

I searched for how this works with Cassandra (https://stackoverflow.com/questions/26757287/results-pagination-in-cassandra-cql), and it looks like setFetchSize or (pagingState, token function) are the possible options.

setFetchSize looks like the best fit for Cassandra.

sandeepmalik commented 5 years ago

@whomobile, the event time and event id are clustering keys in the events table and hence sorted by nature (both in ASC order). That guarantees "events.last().eventTime > events.last(-1).eventTime or events.last().id > events.last(-1).id" is always true.

As far as fetch size is concerned, the query

loaderQuery = mappingManager.session.prepare("SELECT * FROM ${session.loggedKeyspace}.events WHERE bucket_id = ? AND shard = ? AND (event_time, id) > (?,?) LIMIT ?;")

already uses a LIMIT clause, where we pass the value 400, so there's no need to set a fetch size again.
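For reference, a table shaped like the one implied by that query might look roughly like this (the column list is inferred from the quoted loaderQuery; the real schema may differ):

```kotlin
// Rough CQL shape implied by the pagination query above. The clustering
// columns (event_time, id), both ASC, are what make the tuple condition
// "(event_time, id) > (?, ?)" return rows in a stable, total order.
val eventsTableCql = """
    CREATE TABLE events (
        bucket_id  timestamp,
        shard      int,
        event_time timestamp,
        id         text,
        status     text,
        payload    text,
        PRIMARY KEY ((bucket_id, shard), event_time, id)
    ) WITH CLUSTERING ORDER BY (event_time ASC, id ASC);
""".trimIndent()
```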

whomobile commented 5 years ago

That should work. Thank you for the explanation. Yes, no need to set the fetch size again.

Anyway, it looks like setFetchSize could cause other issues: https://support.datastax.com/hc/en-us/articles/115003076346-FAQ-Why-does-Java-driver-setFetchSize-return-all-rows-

schiacci commented 5 years ago

Hello, I'm not sure if this is related, but I just pulled and deployed/built (docker_deploy.sh). When I try to run BigBen (docker_run.sh), I receive the following:

docker logs docker_bigben_run_8a26836a65a5
Using bigben config file: uri:///dist/bigben-config.yaml
Error: Main method not found in class com.walmartlabs.bigben.app.RunKt, please define the main method as:
   public static void main(String[] args)
or a JavaFX application class must extend javafx.application.Application

sandeepmalik commented 5 years ago

Let me rebuild and push the docker image again.

sandeepmalik commented 5 years ago

@schiacci, please try now, sorry about that. (do a fresh pull of the image)

ajzwu8 commented 5 years ago

Hey @sandeepmalik, thanks for the quick fix. We tested executing future events (scheduled for at least the next minute onwards) and it seems to be working now. However, I'm noticing that jobs scheduled "immediately" (with an eventTime in the payload as close as possible to the current time) are lost, possibly due to the logic mentioned above.

I can bypass this for now using Kafka directly (instead of sending to BigBen, which then sends to the application event receiver, just send immediately to the app receiver), but I wanted to let you know so you can confirm whether this is a design decision or also a bug.

sandeepmalik commented 5 years ago

Hi @ajzwu8, when you say 'lost', do you mean they are not triggered, or not stored in the db? By design, events scheduled for time < current time + lapse.offset are considered late arrivals / lapsed and are triggered immediately without being saved in the db. The default offset (in bigben.yaml) is 0, so effectively any event with time < current time (the time at which the server processes the event) is considered lapsed.

https://github.com/walmartlabs/bigben/blob/master/lib/src/main/kotlin/com/walmartlabs/bigben/api/EventReceiver.kt#L189
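The lapse rule just described amounts to something like this (isLapsed and the names here are illustrative, not the code behind the link):

```kotlin
import java.time.Duration
import java.time.Instant

// Illustrative sketch of the lapse check. With the default offset of 0,
// any event whose time is not strictly in the future is treated as lapsed
// and is triggered immediately, skipping the db entirely.
val lapseOffset: Duration = Duration.ZERO  // default per bigben.yaml

fun isLapsed(eventTime: Instant, now: Instant): Boolean =
    eventTime < now.plus(lapseOffset)

val now = Instant.parse("2019-05-01T10:20:15Z")
// Scheduled "immediately" (already in the past): lapsed, fired right away.
val immediate = isLapsed(Instant.parse("2019-05-01T10:20:10Z"), now)
// Scheduled well into the future: stored and bucketed normally.
val future = isLapsed(Instant.parse("2019-05-01T10:22:00Z"), now)
```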

The rationale was that this usually happens in two cases. Either the event is actually scheduled very close to the current time; in that case you're right, I just felt that the app / client may as well route the event directly to the destination (optionally waiting a few seconds for the scheduled time if required), avoiding the client -> kafka -> bigben -> kafka overhead.

The second case is when BigBen was down or lagging for an extended period: it may not be able to pull events from Kafka fast enough, and by the time it does, those events may already have lapsed. In this case BigBen leans towards processing those events as fast as possible, triggering them immediately to drain the lag asap. Storing those events would add latency and hence slow the rate at which the lag drains.

However, if auditing of lapsed events is required, it should be easy to create a custom event processor which stores them in the db and then publishes to kafka.

I might consider adding it later as well.

Did you measure the performance? Would you be able to share how many events you are able to process and the H/W profile? Thanks.

whomobile commented 5 years ago

@sandeepmalik I think the default value is too small.

Since 1 minute is the minimum interval allowed, it would be safer to set lapse.offset.minutes to 1 instead of 0.

sandeepmalik commented 5 years ago

I am closing this issue. Thanks everyone for your help!