DataCater / datacater

The developer-friendly ETL platform for transforming data in real-time. Based on Apache Kafka® and Kubernetes®.
https://datacater.io

Improve call stack for calls to `/pipeline/{uuid}/edit`. #38

Closed by HknLof 1 year ago

HknLof commented 1 year ago

It seems that combining database calls with a remote Kafka broker makes the initial retrieval quite slow. Below is an example stack trace for such a case.

Desirable performance / behaviour

Any given call should return within ~1000 ms for 90% of requests.
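To make the target checkable, one option is to compute the 90th percentile of recorded request latencies and compare it with the 1000 ms budget. A minimal sketch, assuming the nearest-rank percentile definition and latencies already collected in milliseconds; the class name and sample values are illustrative, not measurements from DataCater.

```java
import java.util.Arrays;

final class LatencyBudget {
    // Target from this issue: ~1000 ms for 90% of requests.
    static final long BUDGET_MS = 1000;

    // Nearest-rank percentile: the smallest sample such that at least
    // `percent`% of all samples are <= it.
    static long percentile(long[] samplesMs, int percent) {
        if (samplesMs.length == 0 || percent < 1 || percent > 100) {
            throw new IllegalArgumentException("need samples and 1 <= percent <= 100");
        }
        long[] sorted = samplesMs.clone();
        Arrays.sort(sorted);
        // rank = ceil(percent * n / 100), computed in integer arithmetic.
        int rank = (percent * sorted.length + 99) / 100;
        return sorted[rank - 1];
    }

    static boolean meetsTarget(long[] samplesMs) {
        return percentile(samplesMs, 90) <= BUDGET_MS;
    }

    public static void main(String[] args) {
        // Illustrative latencies only.
        long[] samples = {120, 300, 450, 500, 650, 700, 800, 900, 950, 4163};
        System.out.println("p90 = " + percentile(samples, 90)
                + " ms, within budget: " + meetsTarget(samples));
    }
}
```

With ten samples, one 4 s outlier does not break a p90 target, whereas a second slow request above 1000 ms would. This is why a percentile target is more forgiving of occasional slow calls than a maximum.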

Actions / Solutions

Refs:

2022-11-12 13:22:06,884 WARN  [io.ver.cor.imp.BlockedThreadChecker] (vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 4163 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked
        at java.base@17.0.2/sun.nio.ch.EPoll.wait(Native Method)
        at java.base@17.0.2/sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:118)
        at java.base@17.0.2/sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:129)
        at java.base@17.0.2/sun.nio.ch.SelectorImpl.select(SelectorImpl.java:141)
        at org.apache.kafka.common.network.Selector.select(Selector.java:873)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:465)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchOffsetsByTimes(Fetcher.java:595)
        at org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:626)
        at org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:610)
        at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2177)
        at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2154)
        at io.datacater.core.stream.KafkaStreamsAdmin.setPartitionOffsets(KafkaStreamsAdmin.java:209)
        at io.datacater.core.stream.KafkaStreamsAdmin.inspect(KafkaStreamsAdmin.java:189)
        at io.datacater.core.stream.StreamsUtilities.lambda$getStreamMessages$0(StreamsUtilities.java:35)
        at io.datacater.core.stream.StreamsUtilities$$Lambda$1150/0x00000008014037c0.apply(Unknown Source)
        at io.smallrye.mutiny.unchecked.UncheckedFunction.lambda$toFunction$0(UncheckedFunction.java:45)
        at io.smallrye.mutiny.unchecked.UncheckedFunction$$Lambda$1151/0x0000000801403c18.apply(Unknown Source)
        at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
        at io.smallrye.mutiny.groups.UniOnNotNull.lambda$transform$4(UniOnNotNull.java:117)
        at io.smallrye.mutiny.groups.UniOnNotNull$$Lambda$1152/0x0000000801406000.apply(Unknown Source)
        at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor.onItem(UniOnItemTransform.java:36)
        at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onItem(UniOperatorProcessor.java:46)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage$CompletionStageUniSubscription.forwardResult(UniCreateFromCompletionStage.java:63)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage$CompletionStageUniSubscription$$Lambda$934/0x000000080137e0d0.accept(Unknown Source)
        at java.base@17.0.2/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base@17.0.2/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base@17.0.2/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base@17.0.2/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at io.vertx.core.Future.lambda$toCompletionStage$2(Future.java:360)
        at io.vertx.core.Future$$Lambda$506/0x00000008010a4d18.handle(Unknown Source)
        at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
        at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
        at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
        at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
        at io.vertx.sqlclient.impl.QueryResultBuilder.tryComplete(QueryResultBuilder.java:102)
        at io.vertx.sqlclient.impl.QueryResultBuilder.tryComplete(QueryResultBuilder.java:35)
        at io.vertx.core.Promise.complete(Promise.java:66)
        at io.vertx.core.Promise.handle(Promise.java:51)
        at io.vertx.core.Promise.handle(Promise.java:29)
        at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
        at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
        at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
        at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
        at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
        at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
        at io.vertx.sqlclient.impl.TransactionImpl.lambda$wrap$0(TransactionImpl.java:72)
        at io.vertx.sqlclient.impl.TransactionImpl$$Lambda$1086/0x00000008013f01f8.handle(Unknown Source)
        at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
        at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
        at io.vertx.core.impl.future.FutureBase$$Lambda$541/0x000000080111e718.run(Unknown Source)
        at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:86)
        at io.vertx.core.impl.DuplicatedContext.execute(DuplicatedContext.java:163)
        at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:51)
        at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
        at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
        at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
        at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
        at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
        at io.vertx.sqlclient.impl.command.CommandResponse.fire(CommandResponse.java:46)
        at io.vertx.sqlclient.impl.SocketConnectionBase.handleMessage(SocketConnectionBase.java:287)
        at io.vertx.pgclient.impl.PgSocketConnection.handleMessage(PgSocketConnection.java:97)
        at io.vertx.sqlclient.impl.SocketConnectionBase.lambda$init$0(SocketConnectionBase.java:100)
        at io.vertx.sqlclient.impl.SocketConnectionBase$$Lambda$535/0x00000008010db668.handle(Unknown Source)
        at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
        at io.vertx.core.impl.ContextBase.emit(ContextBase.java:239)
        at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:394)
        at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:155)
        at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at io.vertx.pgclient.impl.codec.PgEncoder.lambda$write$0(PgEncoder.java:98)
        at io.vertx.pgclient.impl.codec.PgEncoder$$Lambda$538/0x00000008010de4b0.handle(Unknown Source)
        at io.vertx.pgclient.impl.codec.PgCommandCodec.handleReadyForQuery(PgCommandCodec.java:139)
        at io.vertx.pgclient.impl.codec.PgDecoder.decodeReadyForQuery(PgDecoder.java:237)
        at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:96)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base@17.0.2/java.lang.Thread.run(Thread.java:833)
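The trace shows `KafkaConsumer.beginningOffsets` (via `KafkaStreamsAdmin.setPartitionOffsets`) running inside a Mutiny transform that fires on `vert.x-eventloop-thread-0` as soon as the Postgres query completes. A slow broker round trip therefore stalls the event loop, which triggers the `BlockedThreadChecker` warning. A common remedy is to hop to a worker pool before the blocking step. In Mutiny this would likely be `emitOn(Infrastructure.getDefaultWorkerPool())` ahead of the transform. The sketch below models the idea with plain `CompletableFuture`s instead of Mutiny. The executors and `blockingKafkaLookup` are hypothetical stand-ins, not DataCater code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

final class OffloadSketch {
    // Single thread standing in for vert.x-eventloop-thread-0.
    static final ExecutorService EVENT_LOOP = Executors.newSingleThreadExecutor(named("event-loop"));
    // Pool standing in for a worker pool where blocking calls are acceptable.
    static final ExecutorService WORKERS = Executors.newFixedThreadPool(4, named("worker"));

    // Daemon threads so the JVM can exit without an explicit shutdown.
    static ThreadFactory named(String name) {
        return runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        };
    }

    // Hypothetical stand-in for KafkaConsumer.beginningOffsets(): a blocking network call.
    static String blockingKafkaLookup(String pipelineRow) {
        try {
            Thread.sleep(100); // simulated round trip to a remote broker
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Report which thread paid for the blocking call.
        return Thread.currentThread().getName();
    }

    static CompletableFuture<String> inspect(boolean offload) {
        // The DB row is delivered on the event loop, as with the reactive Postgres client.
        CompletableFuture<String> dbRow =
                CompletableFuture.supplyAsync(() -> "pipeline-row", EVENT_LOOP);
        return offload
                ? dbRow.thenApplyAsync(OffloadSketch::blockingKafkaLookup, WORKERS)
                : dbRow.thenApplyAsync(OffloadSketch::blockingKafkaLookup, EVENT_LOOP);
    }

    public static void main(String[] args) {
        System.out.println("not offloaded, blocked thread: " + inspect(false).join());
        System.out.println("offloaded, blocked thread:     " + inspect(true).join());
    }
}
```

Offloading does not make the Kafka call itself faster. It only keeps the event loop free to serve other requests while the broker responds.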
flippingbits commented 1 year ago

Which API endpoint are you referencing here? This route looks like it is coming from the frontend.

I agree, no communication with Kafka should be necessary to update a pipeline definition. Let's make it faster :)

HknLof commented 1 year ago

This occurs after opening the edit view for a given pipeline. I assume it is the call to pipeline/preview.

https://github.com/DataCater/datacater/blob/main/ui/src/actions/pipelines.js#L183

I assume that toggling `showGrid` triggers the initial fetch / re-fetch.

https://github.com/DataCater/datacater/blob/main/ui/src/components/pipelines/PipelineDesigner.js#L143

flippingbits commented 1 year ago

When loading the pipeline designer, i.e., when accessing /pipelines/$uuid/edit in the frontend, the endpoint /api/streams/$uuid/inspect is called, so Kafka is accessed.

Since we call an external service, we might have a hard time staying under 1000ms. cc @ChrisRousey What do you think?

ChrisRousey commented 1 year ago

Ever since we improved payload retrieval from streams, it has almost always been under 1000 ms, so I'm not sure this is caused only by accessing Kafka.

The inspect endpoint fetches records from Kafka, sends them to the python-runner, and waits for the transforms/filters, all in one thread. That is a heavy load for one API call, but I think most of the time is spent in the python-runner.
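One way to settle whether the time goes to Kafka or to the python-runner is to time each stage of an inspect request separately. A minimal sketch, assuming hypothetical stage names `kafka-fetch` and `python-runner` and simulated work. It is not the actual endpoint code.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

final class StageTimer {
    // Insertion-ordered so the report lists stages in execution order.
    private final Map<String, Long> elapsedMs = new LinkedHashMap<>();

    // Runs one stage and adds its wall-clock duration (ms) under the given name.
    <T> T time(String stage, Supplier<T> work) {
        long start = System.nanoTime();
        try {
            return work.get();
        } finally {
            elapsedMs.merge(stage, (System.nanoTime() - start) / 1_000_000, Long::sum);
        }
    }

    Map<String, Long> report() {
        return Collections.unmodifiableMap(elapsedMs);
    }

    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        StageTimer timer = new StageTimer();
        // Hypothetical stages of an inspect call; sleeps simulate the real work.
        List<String> records = timer.time("kafka-fetch", () -> {
            sleep(50);
            return List.of("record-1", "record-2");
        });
        List<String> transformed = timer.time("python-runner", () -> {
            sleep(150);
            return records;
        });
        System.out.println(transformed.size() + " records, stage timings (ms): " + timer.report());
    }
}
```

If `python-runner` dominates even against a local broker, that would support the python-runner explanation. If `kafka-fetch` dominates only with a remote broker, the network round trip is the more likely cause.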

@HknLof, you said only some calls were slow? Maybe some slow-running transforms/filters caused this. And was the Kafka broker local or remote?