wazuh / wazuh-indexer

Wazuh indexer, the Wazuh search engine
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

Amazon Security Lake integration - DTD - Make integration exit cleanly on pipe close #168

Closed f-galland closed 3 months ago

f-galland commented 4 months ago

Description

Currently, when Logstash tries to stop the integration, it is unable to do so. This happens because the script deliberately enters an infinite loop after opening the standard input file descriptor.
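As a minimal sketch of the behavior the issue title asks for, the read loop could end on its own once standard input reaches end-of-file, instead of looping forever. The names `consume`, `handle`, and `main` are hypothetical and are not taken from the integration's `run.py`; the real script also buffers and uploads events, which is left out here.

```python
import sys


def consume(stream, handle):
    """Pass each line of `stream` to `handle`; return the line count at EOF."""
    count = 0
    # Iterating over a file object stops at end-of-file, which is what the
    # reader sees once the writing side of the pipe has been closed.
    for line in stream:
        handle(line.rstrip("\n"))
        count += 1
    return count


def main():
    # Hypothetical entry point: a real integration would process each line
    # rather than discard it.
    processed = consume(sys.stdin, lambda line: None)
    print(f"stdin closed after {processed} line(s); exiting", file=sys.stderr)
    return 0
```

Calling `main()` from the script's entry point would let the process return as soon as its input is closed, so the parent would not be left waiting on it.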

Steps to reproduce the behavior:

Log output

[2024-02-28T12:34:23,330][INFO ][logstash.pipelineaction.reload] Reloading pipeline {"pipeline.id"=>:main}
[2024-02-28T12:34:23,675][INFO ][logstash.outputs.pipe    ][main] close: closing pipes
[2024-02-28T12:34:23,677][INFO ][logstash.outputs.pipe    ][main] Closing pipe "/usr/share/logstash/bin/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --bucketname securitylake --s3endpoint s3.ninja:9000 --s3profile default"
[2024-02-28T12:34:28,630][WARN ][org.logstash.execution.ShutdownWatcherExt] {"inflight_count"=>0, "stalling_threads_info"=>{"other"=>[{"thread_id"=>36, "name"=>"[main]-pipeline-manager", "current_call"=>"[...]/vendor/bundle/jruby/3.1.0/gems/logstash-output-pipe-3.0.6/lib/logstash/outputs/pipe.rb:115:in `close'"}]}}
[2024-02-28T12:34:28,638][ERROR][org.logstash.execution.ShutdownWatcherExt] The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.
[2024-02-28T12:34:33,653][WARN ][org.logstash.execution.ShutdownWatcherExt] {"inflight_count"=>0, "stalling_threads_info"=>{"other"=>[{"thread_id"=>36, "name"=>"[main]-pipeline-manager", "current_call"=>"[...]/vendor/bundle/jruby/3.1.0/gems/logstash-output-pipe-3.0.6/lib/logstash/outputs/pipe.rb:115:in `close'"}]}}

f-galland commented 4 months ago

To determine whether this behavior also occurs without our script, I used a simple tee command as the pipeline's destination:

output {
    pipe
    {
        id => "securityLake"
        command => "tee /tmp/fede.txt"
    }
}

To test whether Logstash can properly close the pipe when needed, I made a small change to the configuration while running Logstash with the --config.reload.automatic flag. This should restart the pipe plugin so that it picks up the new configuration.

I get results similar to those reported above, even with this simpler tee pipeline:

[2024-02-28T19:21:05,754][INFO ][logstash.pipelineaction.reload] Reloading pipeline {"pipeline.id"=>:main}
[2024-02-28T19:21:10,889][WARN ][org.logstash.execution.ShutdownWatcherExt] {"inflight_count"=>100, "stalling_threads_info"=>{"other"=>[{"thread_id"=>113, "name"=>"[main]>worker0", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>114, "name"=>"[main]>worker1", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>115, "name"=>"[main]>worker2", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>116, "name"=>"[main]>worker3", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>117, "name"=>"[main]>worker4", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>118, "name"=>"[main]>worker5", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>119, "name"=>"[main]>worker6", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>120, "name"=>"[main]>worker7", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>121, "name"=>"[main]>worker8", "current_call"=>"[...]/vendor/bundle/jruby/3.1.0/gems/logstash-output-pipe-3.0.6/lib/logstash/outputs/pipe.rb:122:in `puts'"}, {"thread_id"=>122, "name"=>"[main]>worker9", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>123, "name"=>"[main]>worker10", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>124, "name"=>"[main]>worker11", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>125, "name"=>"[main]>worker12", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>126, "name"=>"[main]>worker13", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>127, "name"=>"[main]>worker14", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>128, "name"=>"[main]>worker15", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in multi_receive'"}, {"thread_id"=>96, "name"=>"[main]-pipeline-manager", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:479:in `block in join'"}]}}
[2024-02-28T19:21:10,891][ERROR][org.logstash.execution.ShutdownWatcherExt] The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.

I believe this is caused by Logstash's output-pipe plugin itself, and I'm researching ways to resolve it.

f-galland commented 4 months ago

I also tried checking whether the Python process received a termination signal with:

import os
import signal
import time


def handler(signum, frame):
    # Report any signal delivered to this process.
    print('Received signal: {}'.format(signum))


if __name__ == "__main__":
    print('My PID is:', os.getpid())
    # Register the same handler for every catchable signal of interest.
    signal.signal(signal.SIGHUP, handler)
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGQUIT, handler)
    signal.signal(signal.SIGILL, handler)
    signal.signal(signal.SIGTRAP, handler)
    signal.signal(signal.SIGABRT, handler)
    signal.signal(signal.SIGBUS, handler)
    signal.signal(signal.SIGFPE, handler)
    signal.signal(signal.SIGUSR1, handler)
    signal.signal(signal.SIGSEGV, handler)
    signal.signal(signal.SIGUSR2, handler)
    signal.signal(signal.SIGPIPE, handler)
    signal.signal(signal.SIGALRM, handler)
    signal.signal(signal.SIGTERM, handler)
    # Idle forever so the process stays alive while waiting for signals.
    while True:
        time.sleep(1)

However, I never saw a termination signal arrive when Logstash tried to close the pipe.
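One possible reading of this result, which is an assumption and is not confirmed anywhere in this thread, is that the plugin only closes its end of the pipe. On POSIX systems, closing the write end of a pipe does not deliver a signal to the reading process; the reader simply gets end-of-file on its next read. The sketch below illustrates that general behavior with an in-process pipe. The function name `read_until_pipe_closed` and the sample lines are made up for illustration.

```python
import os
import signal

received = []  # signal numbers recorded by the handler


def handler(signum, frame):
    received.append(signum)


def read_until_pipe_closed():
    """Write two lines into a pipe, close the write end, then read to EOF."""
    read_fd, write_fd = os.pipe()
    os.write(write_fd, b"event 1\nevent 2\n")
    os.close(write_fd)  # stands in for the parent closing its end of the pipe
    with os.fdopen(read_fd, "r") as reader:
        # Iteration ends at EOF, which is how the reader observes the close.
        return [line.rstrip("\n") for line in reader]


if __name__ == "__main__":
    # POSIX-only signals; handlers must be installed from the main thread.
    for sig in (signal.SIGTERM, signal.SIGPIPE, signal.SIGHUP):
        signal.signal(sig, handler)
    print(read_until_pipe_closed())
    print("signals received:", received)
```

If this reading applies, a script that waits for signals instead of reading its standard input would never notice that the pipe was closed.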

AlexRuiz7 commented 3 months ago

We'll reopen if necessary.