confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

KUDF/KUDAF thread-safety #828

Closed rodesai closed 6 years ago

rodesai commented 6 years ago

KUDAF and KUDF objects are instantiated during analysis and shared by all streams threads processing a query. We should go through them and make sure they are all thread-safe. We already know that TimestampToString and StringToTimestamp are not. Both create a DateFormat member that can be clobbered by multiple threads trying to parse dates concurrently.
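For illustration, one way to avoid the shared `DateFormat` problem is to build on `java.time.format.DateTimeFormatter`, which is immutable and thread-safe. The sketch below is a minimal example under stated assumptions, not the actual KSQL code: the class name `StatelessTimestampFormatter`, its methods, and the fixed UTC zone are all hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical stand-in for a UDF such as TimestampToString / StringToTimestamp.
// This is NOT the KSQL implementation; names and the UTC zone are assumptions.
final class StatelessTimestampFormatter {
  // DateTimeFormatter is immutable and thread-safe, so a single instance
  // can be shared by every thread that calls format() or parse().
  private final DateTimeFormatter formatter;

  StatelessTimestampFormatter(String pattern) {
    this.formatter = DateTimeFormatter.ofPattern(pattern).withZone(ZoneOffset.UTC);
  }

  // Formats epoch milliseconds using the configured pattern.
  String format(long epochMillis) {
    return formatter.format(Instant.ofEpochMilli(epochMillis));
  }

  // Parses text in the configured pattern back to epoch milliseconds.
  long parse(String text) {
    return formatter.parse(text, Instant::from).toEpochMilli();
  }
}
```

Because the object holds no mutable state, it no longer matters whether it is shared between threads.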

Alternatively, we can look at making these objects thread-local so the implementations don't need to worry about thread-safety.
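As a rough sketch of the thread-local alternative, each thread could lazily create its own `SimpleDateFormat` via `ThreadLocal.withInitial`. The class `ThreadLocalTimestampParser`, its `release()` method, and the UTC zone are hypothetical names chosen for this example and do not come from the KSQL code base.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Hypothetical example of wrapping a non-thread-safe SimpleDateFormat
// in a ThreadLocal. Not the KSQL implementation.
final class ThreadLocalTimestampParser {
  // Each thread gets its own SimpleDateFormat the first time it calls get().
  private final ThreadLocal<SimpleDateFormat> format;

  ThreadLocalTimestampParser(String pattern) {
    this.format = ThreadLocal.withInitial(() -> {
      SimpleDateFormat f = new SimpleDateFormat(pattern);
      f.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: timestamps are UTC
      return f;
    });
  }

  // Parses text to epoch milliseconds using the calling thread's own formatter.
  long parse(String text) {
    try {
      return format.get().parse(text).getTime();
    } catch (ParseException e) {
      throw new IllegalArgumentException("Unparseable timestamp: " + text, e);
    }
  }

  // Drops the calling thread's formatter. Something has to call this (or the
  // thread has to die) for the per-thread instance to become collectable.
  void release() {
    format.remove();
  }
}
```

The trade-off is that per-thread instances live as long as their thread unless they are removed explicitly, so this approach needs a clear point at which `release()` is called.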

big-andy-coates commented 6 years ago

@rodesai making these objects thread-local would probably be less work, and more future-proof, than checking and fixing each impl.

If this just requires the FunctionRegistry to use thread-local, then I don't mind picking this up... just let me know which branch it's needed on.

rodesai commented 6 years ago

I agree that using thread-locals would be the extensible approach, though making these functions stateless may be more "correct" from a streams pov.

The fix is more involved than updating the FunctionRegistry though. The FunctionRegistry just has descriptions of all the available KUD(A)Fs, including the class to use for the actual Kudf/Kudaf objects, which are created for each query and shared amongst all the StreamThreads. We should also go through all our KafkaStreams API calls that build the query topology and make sure we aren't passing in stateful or non-thread-safe objects.

big-andy-coates commented 6 years ago

Yer, I poked around a bit more and saw that FunctionRegistry was the wrong place. Looks like CodeGenRunner is at least one place that would need to be changed.

The main issue with using TLS would be the clean up and potential memory leaks.

bluemonk3y commented 6 years ago

I agree with you both. Stateless is always ideal but when you have expensive Java calendar idiosyncrasies then thread local works best. Anything that is expensive to create should be thread local and avoid locking. This also follows the stream thread model. :)

dguy commented 6 years ago

Let's not do thread-local if we can avoid it! Can't we just make a change so that we get a Supplier rather than the instance?
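A minimal sketch of the Supplier idea follows: instead of receiving one shared instance, each consumer receives a `Supplier` and creates its own copy. All names here (`SupplierSketch`, `Concat`, `Worker`) are hypothetical and only illustrate the pattern; they are not KSQL classes.

```java
import java.util.function.Supplier;

// Hypothetical illustration of handing out a Supplier instead of an instance.
final class SupplierSketch {

  // A deliberately stateful function: it reuses a mutable buffer between
  // calls, so sharing one instance across threads would be unsafe.
  static final class Concat {
    private final StringBuilder buffer = new StringBuilder();

    String evaluate(String a, String b) {
      buffer.setLength(0);
      return buffer.append(a).append(b).toString();
    }
  }

  // Stand-in for whatever runs on a single thread or task. It asks the
  // supplier for its own private instance rather than sharing one.
  static final class Worker {
    private final Concat udf;

    Worker(Supplier<Concat> supplier) {
      this.udf = supplier.get();
    }

    String process(String a, String b) {
      return udf.evaluate(a, b);
    }

    Concat udf() {
      return udf;
    }
  }

  public static void main(String[] args) {
    Supplier<Concat> supplier = Concat::new;
    Worker first = new Worker(supplier);
    Worker second = new Worker(supplier);
    System.out.println(first.process("a", "b"));
    System.out.println(first.udf() != second.udf());
  }
}
```

With this shape the function implementations can stay stateful, as long as every thread or task obtains its instance through the supplier.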

big-andy-coates commented 6 years ago

I'll take a look and report back.

apurvam commented 6 years ago

@big-andy-coates, are you picking this up?

big-andy-coates commented 6 years ago

@apurvam yep. Well, I'm looking into the KUDFs and KUDAFs at least. @rodesai mentioned there might be more to it than that, but hasn't elaborated as yet.

big-andy-coates commented 6 years ago

Looking into this and chatting with @dguy, it looked like the best approach was to swap out the map() and filter() calls with matching transformValues() calls, so that UDFs could be instantiated per-task, which would fix threading issues and fit nicely with streaming.

Unfortunately, while KStream has transformValues(), KTable does not, nor does it have anything else suitable. A ktable.toStream().transformValues().groupByKey(..).reduce(..) might functionally work, but would impact performance adversely.

So I propose just fixing up any non-thread-safe UDFs for GA and creating another issue to track a more long-term solution, which will either involve switching to the processor API or adding KTable.transformValues().

big-andy-coates commented 6 years ago

Created #873 to look at a better, long-term solution that avoids the need for all UDFs to be thread-safe.