DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0
4.01k stars 1.69k forks source link

standalone 模式下同步 1000 多万条数据到 StarRocks 时内存溢出（OOM） #1751

Open cs3163077 opened 1 year ago

cs3163077 commented 1 year ago

Search before asking

Description

====================Flink日志====================

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.dtstack.chunjun.throwable.ChunJunRuntimeException: write starRocks failed.
    at com.dtstack.chunjun.connector.starrocks.sink.StarRocksOutputFormat.writeRecordInternal(StarRocksOutputFormat.java:109)
    at com.dtstack.chunjun.connector.starrocks.sink.StarRocksOutputFormat.writeRecord(StarRocksOutputFormat.java:82)
    at com.dtstack.chunjun.connector.starrocks.sink.StarRocksOutputFormat.writeRecord(StarRocksOutputFormat.java:39)
    at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.invoke(DtOutputFormatSinkFunction.java:117)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:135)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
Caused by: java.lang.Exception: Writing records to StarRocks failed.
    at com.dtstack.chunjun.connector.starrocks.streamload.StreamLoadManager.write(StreamLoadManager.java:94)
    at com.dtstack.chunjun.connector.starrocks.sink.NormalWriteProcessor.write(NormalWriteProcessor.java:58)
    at com.dtstack.chunjun.connector.starrocks.sink.StarRocksOutputFormat.writeMultipleRecordsInternal(StarRocksOutputFormat.java:70)
    at com.dtstack.chunjun.connector.starrocks.sink.StarRocksOutputFormat.writeRecordInternal(StarRocksOutputFormat.java:97)
    ... 15 more
Caused by: java.lang.RuntimeException: Writing records to StarRocks failed.
    at com.dtstack.chunjun.connector.starrocks.streamload.StreamLoadManager.checkFlushException(StreamLoadManager.java:190)
    at com.dtstack.chunjun.connector.starrocks.streamload.StreamLoadManager.write(StreamLoadManager.java:66)
    ... 18 more
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:649)
    at java.lang.StringBuilder.append(StringBuilder.java:202)
    at org.apache.http.impl.conn.Wire.wire(Wire.java:80)
    at org.apache.http.impl.conn.Wire.output(Wire.java:111)
    at org.apache.http.impl.conn.LoggingOutputStream.write(LoggingOutputStream.java:73)
    at org.apache.http.impl.io.SessionOutputBufferImpl.streamWrite(SessionOutputBufferImpl.java:124)
    at org.apache.http.impl.io.SessionOutputBufferImpl.write(SessionOutputBufferImpl.java:160)
    at org.apache.http.impl.io.ContentLengthOutputStream.write(ContentLengthOutputStream.java:113)
    at org.apache.http.entity.ByteArrayEntity.writeTo(ByteArrayEntity.java:114)
    at org.apache.http.impl.DefaultBHttpClientConnection.sendRequestEntity(DefaultBHttpClientConnection.java:156)
    at org.apache.http.impl.conn.CPoolProxy.sendRequestEntity(CPoolProxy.java:160)
    at org.apache.http.protocol.HttpRequestExecutor.doSendRequest(HttpRequestExecutor.java:238)
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:111)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
    at com.dtstack.chunjun.connector.starrocks.streamload.StarRocksStreamLoadVisitor.doHttpPut(StarRocksStreamLoadVisitor.java:307)
    at com.dtstack.chunjun.connector.starrocks.streamload.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:88)
    at com.dtstack.chunjun.connector.starrocks.streamload.StreamLoadManager.asyncFlush(StreamLoadManager.java:241)
    at com.dtstack.chunjun.connector.starrocks.streamload.StreamLoadManager.lambda$startAsyncFlushing$2(StreamLoadManager.java:201)
    at com.dtstack.chunjun.connector.starrocks.streamload.StreamLoadManager$$Lambda$636/785271024.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:748)

Code of Conduct

panshi106 commented 9 months ago

感觉问题出在 `StarRocksOutputFormat` 类的 `writeMultipleRecordsInternal` 方法中的这段判断：`if (rows.size() != batchSize) { streamLoadManager.flush(null, false); }`。这个 if 条件会让数据一直积压，直到最后一次才全部 flush，从而导致 OOM。去掉这个 if 判断后问题就解决了。