dynatrace-oss / logstash-output-dynatrace

A Logstash output plugin for sending logs to Dynatrace

Manticore::SocketException / Broken pipe #30

Closed cscotti closed 1 year ago

cscotti commented 1 year ago

We use your plugin for a large volume of logs and have found that it is not stable. After a few hours, we get errors like the following and the service gets stuck, even though the Logstash status is green. Restarting Logstash fixes the problem.

[2023-09-11T09:16:58,461][ERROR][logstash.outputs.dynatrace][main][logstash_output] Could not fetch URL {:ingest_endpoint_url=>"https://##DYN_TENANT##.live.dynatrace.com/api/v2/logs/ingest", :message=>"Broken pipe", :class=>Manticore::SocketException, :will_retry=>true}

The pipeline looks like this:

    input {
      file {
        path => "/share/logs/**/*.log"
        sincedb_path => "/usr/share/logstash/data/file-sincedb.db"
        start_position => end
        mode => "tail"

        codec => multiline {
          pattern => "^\[[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3} GMT\] "
          negate => true
          what => "previous"
        }
      }
    }
    filter{
      mutate {
        replace => [ "host", "logstash-prod" ]
      }
      grok {
        match => [ "[@metadata][path]", "%{GREEDYDATA:log.source}" ]
      }
    }
    output {
      dynatrace {
        id => "logstash_output"
        ingest_endpoint_url => "https://xxxxxx.live.dynatrace.com/api/v2/logs/ingest"
        api_key => "xxxxxx"
      }
    }

Expected: We would like the service not to get stuck and to manage its sockets correctly.

Versions:

- Logstash is running on Kubernetes. The Logstash image is docker.elastic.co/logstash/logstash:8.5.1.
- The logstash-output-dynatrace plugin version is 0.5.0. Logstash is running as a StatefulSet.
- The env var LS_JAVA_OPTS is set to "-Xmx11g -Xms11g", and the StatefulSet resources are defined like this:

        resources:
          requests:
            cpu: "5"
            memory: 15Gi
          limits:
            cpu: "5"
            memory: 15Gi

Additional information: When the Logstash service is stuck with a lot of "Could not fetch URL" errors, a curl command run from the Logstash pod against the same target, "https://xxxxxx.live.dynatrace.com/api/v2/logs/ingest", succeeds without an HTTP error code, and the log injected with curl can be seen on the Dynatrace dashboard.

cscotti commented 1 year ago

I think that the logstash-output-dynatrace plugin does not check whether the payload exceeds the Dynatrace log ingest API limit (the maximum payload size of a single request is 5 MB -> https://www.dynatrace.com/support/help/dynatrace-api/environment-api/log-monitoring-v2/post-ingest-logs). When there are too many calls with a big payload, the sockets are closed one by one and cannot be reopened (Dynatrace log ingest API protection?).

[2023-09-12T07:52:59,544][DEBUG][org.apache.http.wire     ][main][logstash_output] http-outgoing-8 >> "[write] I/O error: Broken pipe"
[2023-09-12T07:52:59,544][DEBUG][org.apache.http.impl.conn.DefaultManagedHttpClientConnection][main][logstash_output] http-outgoing-8: Close connection
[2023-09-12T07:52:59,546][DEBUG][org.apache.http.impl.conn.DefaultManagedHttpClientConnection][main][logstash_output] http-outgoing-8: Shutdown connection
[2023-09-12T07:52:59,546][DEBUG][org.apache.http.impl.execchain.MainClientExec][main][logstash_output] Connection discarded
[2023-09-12T07:52:59,546][DEBUG][org.apache.http.impl.conn.PoolingHttpClientConnectionManager][main][logstash_output] Connection released: [id: 8][route: {s}->https://##DYN_TENANT##.live.dynatrace.com:443][total available: 0; route allocated: 2 of 25; total allocated: 2 of 50]
[2023-09-12T07:52:59,546][INFO ][org.apache.http.impl.execchain.RetryExec][main][logstash_output] I/O exception (java.net.SocketException) caught when processing request to {s}->https://##DYN_TENANT##.live.dynatrace.com:443: Broken pipe
[2023-09-12T07:52:59,547][DEBUG][org.apache.http.impl.execchain.RetryExec][main][logstash_output] Broken pipe
java.net.SocketException: Broken pipe
    at sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:425) ~[?:?]
    at sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:445) ~[?:?]
    at sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:831) ~[?:?]
    at java.net.Socket$SocketOutputStream.write(Socket.java:1035) ~[?:?]
    at sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:345) ~[?:?]
    at sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1308) ~[?:?]
    at org.apache.http.impl.conn.LoggingOutputStream.write(LoggingOutputStream.java:74) ~[httpclient-4.5.13.jar:4.5.13]
    at org.apache.http.impl.io.SessionOutputBufferImpl.streamWrite(SessionOutputBufferImpl.java:124) ~[httpcore-4.4.14.jar:4.4.14]
    at org.apache.http.impl.io.SessionOutputBufferImpl.write(SessionOutputBufferImpl.java:160) ~[httpcore-4.4.14.jar:4.4.14]
    at org.apache.http.impl.io.ContentLengthOutputStream.write(ContentLengthOutputStream.java:113) ~[httpcore-4.4.14.jar:4.4.14]
    at org.apache.http.impl.io.ContentLengthOutputStream.write(ContentLengthOutputStream.java:120) ~[httpcore-4.4.14.jar:4.4.14]
    at org.apache.http.entity.StringEntity.writeTo(StringEntity.java:167) ~[httpcore-4.4.14.jar:4.4.14]
    at org.apache.http.impl.DefaultBHttpClientConnection.sendRequestEntity(DefaultBHttpClientConnection.java:156) ~[httpcore-4.4.14.jar:4.4.14]
    at org.apache.http.impl.conn.CPoolProxy.sendRequestEntity(CPoolProxy.java:152) ~[httpclient-4.5.13.jar:4.5.13]
    at org.apache.http.protocol.HttpRequestExecutor.doSendRequest(HttpRequestExecutor.java:238) ~[httpcore-4.4.14.jar:4.4.14]
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) ~[httpcore-4.4.14.jar:4.4.14]
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) ~[httpclient-4.5.13.jar:4.5.13]
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[httpclient-4.5.13.jar:4.5.13]
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[httpclient-4.5.13.jar:4.5.13]
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[httpclient-4.5.13.jar:4.5.13]
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[httpclient-4.5.13.jar:4.5.13]
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:72) ~[httpclient-4.5.13.jar:4.5.13]
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:221) ~[httpclient-4.5.13.jar:4.5.13]
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:165) ~[httpclient-4.5.13.jar:4.5.13]
    at jdk.internal.reflect.GeneratedMethodAccessor27.invoke(Unknown Source) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:427) ~[jruby.jar:?]
    at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:294) ~[jruby.jar:?]
    at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:24) ~[jruby.jar:?]
    at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:86) ~[jruby.jar:?]
    at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:85) ~[jruby.jar:?]
    at org.jruby.ir.instructions.CallBase.interpret(CallBase.java:549) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:361) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:80) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:164) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:151) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:210) ~[jruby.jar:?]
    at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:142) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:345) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:80) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:164) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:151) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:210) ~[jruby.jar:?]
    at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:142) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:345) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:80) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:164) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:151) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:210) ~[jruby.jar:?]
    at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:142) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:345) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:86) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:201) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:188) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:218) ~[jruby.jar:?]
    at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:173) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:316) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:92) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:238) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:225) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:226) ~[jruby.jar:?]
    at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:204) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:325) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:86) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:201) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:188) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:218) ~[jruby.jar:?]
    at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:173) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:316) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72) ~[jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:86) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:201) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:188) ~[jruby.jar:?]
    at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:218) ~[jruby.jar:?]
    at org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.invokeOutput(OutputStrategyExt.java:153) ~[logstash-core.jar:?]
    at org.logstash.config.ir.compiler.OutputStrategyExt$SimpleAbstractOutputStrategyExt.doOutput(OutputStrategyExt.java:279) ~[logstash-core.jar:?]
    at org.logstash.config.ir.compiler.OutputStrategyExt$SharedOutputStrategyExt.output(OutputStrategyExt.java:312) ~[logstash-core.jar:?]
    at org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.multiReceive(OutputStrategyExt.java:143) ~[logstash-core.jar:?]
    at org.logstash.config.ir.compiler.OutputDelegatorExt.doOutput(OutputDelegatorExt.java:111) ~[logstash-core.jar:?]
    at org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.lambda$multiReceive$0(AbstractOutputDelegatorExt.java:121) ~[logstash-core.jar:?]
    at co.elastic.logstash.api.TimerMetric.lambda$time$0(TimerMetric.java:35) ~[logstash-core.jar:?]
    at org.logstash.instrument.metrics.timer.ConcurrentLiveTimerMetric.time(ConcurrentLiveTimerMetric.java:47) ~[logstash-core.jar:?]
    at co.elastic.logstash.api.TimerMetric.time(TimerMetric.java:34) ~[logstash-core.jar:?]
    at org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.multiReceive(AbstractOutputDelegatorExt.java:121) ~[logstash-core.jar:?]
    at org.logstash.generated.CompiledDataset3.compute(Unknown Source) ~[?:?]
    at org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(CompiledPipeline.java:351) ~[logstash-core.jar:?]
    at org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(CompiledPipeline.java:341) ~[logstash-core.jar:?]
    at org.logstash.execution.ObservedExecution.lambda$compute$0(ObservedExecution.java:17) ~[logstash-core.jar:?]
    at org.logstash.execution.WorkerObserver.lambda$observeExecutionComputation$0(WorkerObserver.java:39) ~[logstash-core.jar:?]
    at org.logstash.instrument.metrics.timer.ConcurrentLiveTimerMetric.time(ConcurrentLiveTimerMetric.java:47) ~[logstash-core.jar:?]
    at org.logstash.execution.WorkerObserver.lambda$executeWithTimers$1(WorkerObserver.java:50) ~[logstash-core.jar:?]
    at org.logstash.instrument.metrics.timer.ConcurrentLiveTimerMetric.time(ConcurrentLiveTimerMetric.java:47) [logstash-core.jar:?]
    at org.logstash.execution.WorkerObserver.executeWithTimers(WorkerObserver.java:50) [logstash-core.jar:?]
    at org.logstash.execution.WorkerObserver.observeExecutionComputation(WorkerObserver.java:38) [logstash-core.jar:?]
    at org.logstash.execution.ObservedExecution.compute(ObservedExecution.java:17) [logstash-core.jar:?]
    at org.logstash.execution.WorkerLoop.abortableCompute(WorkerLoop.java:113) [logstash-core.jar:?]
    at org.logstash.execution.WorkerLoop.run(WorkerLoop.java:86) [logstash-core.jar:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:442) [jruby.jar:?]
    at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:306) [jruby.jar:?]
    at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32) [jruby.jar:?]
    at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:142) [jruby.jar:?]
    at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:345) [jruby.jar:?]
    at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72) [jruby.jar:?]
    at org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:116) [jruby.jar:?]
    at org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136) [jruby.jar:?]
    at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66) [jruby.jar:?]
    at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58) [jruby.jar:?]
    at org.jruby.runtime.Block.call(Block.java:143) [jruby.jar:?]
    at org.jruby.RubyProc.call(RubyProc.java:309) [jruby.jar:?]
    at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:107) [jruby.jar:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
cscotti commented 1 year ago

To reduce the payload size, one idea would be to not send a JSON array (because it could exceed the Dynatrace limit). This would mean adding a new boolean parameter to the output (json_array => false):

    output {
      dynatrace {
        id => "logstash_output"
        ingest_endpoint_url => "https://xxxxxx.live.dynatrace.com/api/v2/logs/ingest"
        api_key => "xxxxxx"
        json_array => false
      }
    }
cscotti commented 1 year ago

Another idea is to look at how the logstash-output-google_pubsub plugin manages payload size with the parameters:

dyladan commented 1 year ago

@cscotti I created a PR to limit the batch sizes and split into multiple requests when necessary #31

Individual logs larger than 5MB obviously MUST be dropped but it should work in all other cases.
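For readers following along, the idea of limiting batch sizes and splitting into multiple requests can be sketched roughly as below. This is an illustrative Python sketch only, not the plugin's actual Ruby code from #31. The function name `split_batches`, the constant `MAX_PAYLOAD_BYTES`, and the reading of "5 MB" as 5 * 1024 * 1024 bytes are all assumptions made for the example.

```python
import json

# Assumption: "5 MB" is interpreted as 5 * 1024 * 1024 bytes for this sketch.
MAX_PAYLOAD_BYTES = 5 * 1024 * 1024


def split_batches(events, max_bytes=MAX_PAYLOAD_BYTES):
    """Yield JSON-array payloads (as str), each at most max_bytes when UTF-8 encoded.

    Hypothetical helper: events that cannot fit in a request on their own
    are dropped, mirroring the behaviour described in the comment above.
    """
    current = []       # serialized events in the batch being built
    current_size = 2   # bytes for the enclosing "[" and "]"
    for event in events:
        encoded = json.dumps(event)
        size = len(encoded.encode("utf-8"))
        if size + 2 > max_bytes:
            # A single event larger than the limit can never be sent.
            continue
        # One extra byte for the comma when the batch already has events.
        extra = size + (1 if current else 0)
        if current_size + extra > max_bytes:
            # Adding this event would exceed the limit: flush and start over.
            yield "[" + ",".join(current) + "]"
            current, current_size = [], 2
            extra = size
        current.append(encoded)
        current_size += extra
    if current:
        yield "[" + ",".join(current) + "]"
```

Each yielded string would then be sent as the body of its own request to the ingest endpoint, so no single request exceeds the limit regardless of how many events Logstash hands to the output at once.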

cscotti commented 1 year ago

Thanks @dyladan ;-)

dyladan commented 1 year ago

@cscotti please try https://github.com/dynatrace-oss/logstash-output-dynatrace/releases/tag/v0.5.1 and let me know if it works for you