airbnb / streamalert

StreamAlert is a serverless, realtime data analysis framework which empowers you to ingest, analyze, and alert on data from any environment, using datasources and alerting logic you define.
https://streamalert.io
Apache License 2.0

GSuiteReports App processes same event in multiple scheduled invocations #800

Closed stoggi closed 6 years ago

stoggi commented 6 years ago

Background

The startTime that is passed to the activities API is set to the timestamp of the last event.

https://github.com/airbnb/streamalert/blob/a7eaa30516856e17303244fc99cb386be697e69f/stream_alert/apps/_apps/gsuite.py#L127-L132

But the response from the next call to the activities API includes that last event again: startTime is evidently inclusive.

https://developers.google.com/admin-sdk/reports/v1/reference/activities/list

startTime | string Sets the beginning of the range of time shown in the report. The date is in the RFC 3339 format, for example 2010-10-28T10:26:35.000Z. The report returns all activities from startTime until endTime. The startTime must be before the endTime (if specified) and the current time when the request is made, or the API returns an error.

Description

The startTime passed to the Google Reports API includes the last event already processed in the previous lambda execution.
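The overlap can be sketched with a minimal in-memory stand-in for the Reports API (the event data and `list_activities` helper here are hypothetical, for illustration only): because `startTime` is inclusive, querying with the last event's own timestamp returns that event a second time.

```python
# Hypothetical stand-in for activities.list: startTime is inclusive,
# so a query at the last processed event's timestamp re-returns it.
EVENTS = [
    {"id": {"time": "2018-08-03T16:53:00.927Z", "uniqueQualifier": "evt-1"}},
]

def list_activities(start_time):
    """Return all events with id.time >= start_time (inclusive)."""
    # ISO 8601 timestamps compare correctly as strings.
    return [e for e in EVENTS if e["id"]["time"] >= start_time]

# The previous poll processed evt-1 and saved its timestamp as startTime.
last_timestamp = "2018-08-03T16:53:00.927Z"

# The next scheduled invocation fetches the same event again.
duplicates = list_activities(last_timestamp)
print([e["id"]["uniqueQualifier"] for e in duplicates])
```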

Steps to Reproduce

Observed in the CloudWatch event logs for the GSuiteReports app:

/var/task/app_integrations/main.py:16: RuntimeWarning: Parent module 'app_integrations' not found while handling absolute import
from app_integrations.apps.app_base import StreamAlertApp
START RequestId: ... Version: 1
[INFO]  2018-08-05T01:39:20.841Z    ... Starting last timestamp set to: 2018-08-03T16:53:00.927Z
[INFO]  2018-08-05T01:39:21.119Z    ... App starting for service 'gsuite_admin'.
[INFO]  2018-08-05T01:39:21.119Z    ... App executing as a successive invocation: False
[INFO]  2018-08-05T01:39:22.873Z    ... Starting batch send of 1 logs to the rule processor
[INFO]  2018-08-05T01:39:23.191Z    ... Sent 1 logs to 'PREFIX_CLUSTER_streamalert_rule_processor' with Lambda request ID '...'
[INFO]  2018-08-05T01:39:23.193Z    ... Gather process for 'gsuite_admin' executed in 1.884724 seconds.
[INFO]  2018-08-05T01:39:23.197Z    ... Lambda remaining seconds: 114.62
[INFO]  2018-08-05T01:39:23.197Z    ... Lambda remaining seconds: 114.62
[ERROR] 2018-08-05T01:39:23.197Z    ... Ending last timestamp is the same as the beginning last timestamp. This could occur if there were no logs collected for this execution.
[INFO]  2018-08-05T01:39:23.197Z    ... App complete for service 'gsuite_admin'. Gathered 1 logs in 1 polls.
END RequestId: ...
REPORT RequestId: ...   Duration: 5505.05 ms    Billed Duration: 5600 ms Memory Size: 128 MB    Max Memory Used: 66 MB

Notice the last timestamp is from the 3rd of August, even though the log is from the 5th of August, and 1 log was still sent to the rule processor lambda. I observed this same behavior at every scheduled invocation where there were no new events.

Desired Change

We could increment _last_event_timestamp by one millisecond, or exclude the last seen report ID. But what would happen if multiple events occurred in the same millisecond?

stoggi commented 6 years ago

After thinking about it, incrementing _last_event_timestamp is not a good solution. For example, if there are 15 events on the same millisecond, but in the first batch we only received the first 5. Then in the second batch we could expect the next 10 events with the same timestamp.
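The failure mode above can be shown concretely (event data here is synthetic): if 15 events share one millisecond and the first page only returns 5, bumping the saved timestamp past that millisecond excludes the 10 unseen events from the next poll.

```python
# 15 synthetic events, all stamped with the same millisecond.
TS = "2018-08-03T16:53:00.927Z"
events = [{"id": {"time": TS, "uniqueQualifier": "evt-%d" % i}}
          for i in range(15)]

first_page = events[:5]                    # what the first poll received
bumped_start = "2018-08-03T16:53:00.928Z"  # last timestamp + 1 ms

# A next poll using the bumped startTime skips the whole millisecond,
# so the 10 events never delivered in the first page are silently lost.
second_page = [e for e in events if e["id"]["time"] >= bumped_start]
print(len(second_page))
```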

ryandeivert commented 6 years ago

@stoggi so what's the desired outcome here do you think? from what I've seen, it's very rare to have events at the exact same ms, but I could be wrong

stoggi commented 6 years ago

@ryandeivert I'm not sure exactly. We could store the last n event ids, and de-duplicate them.

It's not really clear what their API does with the timestamps. According to the docs, the precision is only specified in seconds:

https://developers.google.com/admin-sdk/reports/v1/reference/activities/list

items[].id.time | datetime Time of occurrence of the activity. This is in UNIX epoch time in seconds.

It must be fractional seconds, but to what precision? RFC3339 also doesn't specify fractional precision https://www.ietf.org/rfc/rfc3339.txt for the format 2010-10-28T10:26:35.000Z

I had a go at incrementing the timestamp, and ended up with something like this:

from datetime import datetime, timedelta  # needed at the top of the module

if not self._next_page_token:
    # Increment the last event time by one millisecond to exclude it from the next poll
    last_event_time = datetime.strptime(activities[0]['id']['time'], self.date_formatter())
    last_event_time += timedelta(milliseconds=1)
    self._last_timestamp = last_event_time.strftime(self.date_formatter())
    LOGGER.debug('Caching last timestamp: %s', self._last_timestamp)

I also had to modify date_formatter() to append .%f:

@classmethod
def date_formatter(cls):
    """Return a format string for a date, ie: 2010-10-28T10:26:35.000Z"""
    return '%Y-%m-%dT%H:%M:%S.%fZ'

Python actually uses microseconds for %f so you end up with 2010-10-28T10:26:35.000000Z https://docs.python.org/2/library/datetime.html#strftime-and-strptime-behavior
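This asymmetry is easy to demonstrate: strptime parses the API's 3-digit milliseconds into microseconds, but strftime always emits all 6 microsecond digits.

```python
from datetime import datetime

FMT = '%Y-%m-%dT%H:%M:%S.%fZ'

# strptime accepts the API's 3-digit fractional seconds...
dt = datetime.strptime('2010-10-28T10:26:35.000Z', FMT)

# ...but strftime re-emits %f as 6 digits.
print(dt.strftime(FMT))  # 2010-10-28T10:26:35.000000Z
```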

ryandeivert commented 6 years ago

@stoggi are you having success with the incrementing approach you've listed? I have been able to verify the issue but not test the fix you've outlined. I have this 'in progress' now but haven't really moved with it - would you like to contribute the fix? if so, I can reassign

stoggi commented 6 years ago

Yep sure. I'm happy to contribute a PR for this.

The fix does exclude the last event from the next API query, although I don't know if I am missing any events.

Perhaps the simplest way is to record the unique ids items[].id.uniqueQualifier for all events with the same timestamp as the last event. Then remove them in the next run.
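A rough sketch of that de-dup idea (the `dedupe` helper and its data are illustrative, not the actual app code): keep the uniqueQualifier of every event sharing the last timestamp, then filter them out of the next batch.

```python
def dedupe(activities, last_timestamp, seen_ids):
    """Drop events already processed in the previous poll.

    Only events stamped exactly at last_timestamp can be duplicates,
    so anything with a later timestamp passes through untouched.
    """
    return [
        a for a in activities
        if not (a['id']['time'] == last_timestamp
                and a['id']['uniqueQualifier'] in seen_ids)
    ]

activities = [
    {'id': {'time': '2018-08-03T16:53:00.927Z', 'uniqueQualifier': '111'}},
    {'id': {'time': '2018-08-03T16:53:00.927Z', 'uniqueQualifier': '222'}},
]
# '111' was already sent during the previous invocation.
fresh = dedupe(activities, '2018-08-03T16:53:00.927Z', {'111'})
print([a['id']['uniqueQualifier'] for a in fresh])
```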

ryandeivert commented 6 years ago

Awesome, thank you!! I like the idea of sorta de-duping, but the current implementation doesn't easily allow for storing arbitrary state info. I'm open to ideas to support this as well, if you're up for it! The current state object will always be:

{
    "current_state": "...",
    "last_timestamp": "..."
}

and is saved as json, performed here:

https://github.com/airbnb/streamalert/blob/0dfca60195eedaf93bd79bbc9a82fc6f51fba4fa/stream_alert/apps/config.py#L275-L278

Theoretically we could add a third top level key of custom_state or something similar to dump a custom state object to... thoughts?
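Since the state is already serialized with json.dumps, a third top-level key would round-trip without touching the save path. A sketch of what that might look like (the custom_state key and seen_ids field are the proposal here, not existing code):

```python
import json

# Hypothetical state object with the proposed custom_state key.
state = {
    'current_state': 'succeeded',
    'last_timestamp': '2018-08-03T16:53:00.927Z',
    'custom_state': {'seen_ids': ['111', '222']},
}

# The existing code json.dumps the whole dict, so the extra key
# survives a save/load cycle with no schema changes elsewhere.
restored = json.loads(json.dumps(state))
print(restored['custom_state']['seen_ids'])
```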