awslabs / kinesis-aggregation

AWS libraries/modules for working with Kinesis aggregated record data
Apache License 2.0

Non-thread-safe code using threads #127

Status: Open. rrbarbosa opened this issue 3 years ago.

rrbarbosa commented 3 years ago

In the Python implementation, the RecordAggregator class is annotated as "not thread-safe"; however, the class uses threading to execute the callback by default.

Won't this snippet end up clearing the record before the callback thread has a chance to read it?

        # If we hit this point, aggregated record is full
        # Call all the callbacks (potentially on a separate thread)
        out_record = self.current_record
        for (callback, execute_on_new_thread) in self.callbacks:
            if execute_on_new_thread:
                threading.Thread(target=callback, args=(out_record,)).start()
            else:
                callback(out_record)

        # Current record is full so clear it out, make a new empty one and add the user record
        self.clear_record()
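
The hazard can be reproduced outside the library with a small sketch. It assumes that clearing empties the same record object in place (whether the real clear_record() does this is not shown in the snippet); `Record` and `demonstrate_race` are hypothetical names. A `threading.Event` forces the unlucky interleaving so the outcome is deterministic.

```python
import threading


class Record:
    """Hypothetical stand-in for the aggregated record."""

    def __init__(self):
        self.entries = []

    def clear(self):
        # Assumption: clearing empties this same object in place.
        self.entries.clear()


def demonstrate_race():
    current_record = Record()
    current_record.entries.extend(["a", "b", "c"])
    cleared = threading.Event()
    seen = []

    def callback(record):
        # Wait until the producer has cleared the record, forcing the
        # worst-case ordering so the demo is deterministic.
        cleared.wait()
        seen.append(list(record.entries))

    out_record = current_record  # plain assignment: same object, not a copy
    worker = threading.Thread(target=callback, args=(out_record,))
    worker.start()
    current_record.clear()  # producer clears before the callback reads
    cleared.set()
    worker.join()
    return seen[0]


print(demonstrate_race())  # → []
```

Because `out_record` and `current_record` name the same object, the callback sees the emptied record rather than the full one.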

A simple workaround is to stop using threads for callbacks.
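
The snippet stores an `execute_on_new_thread` flag per callback, so running callbacks inline avoids the race: each callback returns before the record is cleared. A minimal sketch of that ordering, using a hypothetical `SyncAggregator` with a hypothetical `add_callback` method (not the library's API), and again assuming clearing happens in place:

```python
class Record:
    """Hypothetical stand-in for the aggregated record."""

    def __init__(self):
        self.entries = []

    def clear(self):
        self.entries.clear()


class SyncAggregator:
    """Hypothetical aggregator that only runs callbacks on the caller's thread."""

    def __init__(self):
        self.current_record = Record()
        self.callbacks = []

    def add_callback(self, callback):
        self.callbacks.append(callback)

    def flush(self):
        out_record = self.current_record
        for callback in self.callbacks:
            # Runs synchronously, so it finishes before clear() below.
            callback(out_record)
        self.current_record.clear()


received = []
agg = SyncAggregator()
agg.add_callback(lambda rec: received.append(list(rec.entries)))
agg.current_record.entries.extend(["a", "b"])
agg.flush()
print(received)  # → [['a', 'b']]
```

The trade-off is that a slow callback (for example, one that sends the record over the network) now blocks the thread that is adding user records.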

IanMeyers commented 3 years ago

I think line 215 should actually use copy.deepcopy() instead of plain assignment. In principle, though, I agree that this is an issue, and I will look at patching it.
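
A sketch of the copy.deepcopy() approach suggested here, under the same in-place-clearing assumption and the same forced interleaving; `Record` and `hand_off_snapshot` are hypothetical, and this is not the actual patch:

```python
import copy
import threading


class Record:
    """Hypothetical stand-in for the aggregated record."""

    def __init__(self):
        self.entries = []

    def clear(self):
        self.entries.clear()


def hand_off_snapshot():
    current_record = Record()
    current_record.entries.extend(["a", "b", "c"])
    cleared = threading.Event()
    seen = []

    def callback(record):
        cleared.wait()  # same worst-case ordering: read only after clear()
        seen.append(list(record.entries))

    # Deep copy: the thread receives an independent object, so clearing
    # current_record afterwards cannot change what the callback reads.
    out_record = copy.deepcopy(current_record)
    worker = threading.Thread(target=callback, args=(out_record,))
    worker.start()
    current_record.clear()
    cleared.set()
    worker.join()
    return seen[0]


print(hand_off_snapshot())  # → ['a', 'b', 'c']
```

A deep copy costs one full copy per emitted record; allocating a fresh record object instead of clearing the old one in place would give the same isolation without copying.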

IanMeyers commented 3 years ago

This is fixed in commit 85e8678 and released in version 1.1.4 on PyPI.