wazuh / wazuh-indexer-plugins

GNU Affero General Public License v3.0

Implement Job Scheduler logic #103

Open f-galland opened 1 month ago

f-galland commented 1 month ago

Description

This PR adds job scheduler logic to the command-manager plugin.

Issues Resolved

Resolves #87

mcasas993 commented 2 weeks ago

When I tested this branch with ./gradlew run or ./gradlew run --debug, the following error appeared in the terminal after a couple of minutes:

[2024-11-07T09:48:36,565][ERROR][o.o.a.s.TransportCreatePitAction] [integTest-0] PIT creation failed while updating PIT ID for indices [[.commands]]
[2024-11-07T09:48:36,565][ERROR][c.w.c.j.PointInTime      ] [integTest-0] all shards failed

This is the complete log file: integTest.log

f-galland commented 3 days ago

An issue was observed where the plugin entered an infinite loop whenever API calls were received while outgoing HTTP requests were being performed. The following logs were output in quick succession:

[2024-11-07T12:03:52,564][ERROR][c.w.c.j.PointInTime      ] [integTest-0] all shards failed
[2024-11-07T12:03:52,566][ERROR][o.o.a.s.TransportCreatePitAction] [integTest-0] PIT creation failed while updating PIT ID for indices [[.commands]]

I haven't been able to pinpoint the exact cause, but I took the following measures to try to fix it:

With these changes, the problem no longer seems to manifest. However, this introduces a new issue: we now need to manage the lifecycle of the SearchJob object. I will research how this is usually done in OpenSearch and report back.

f-galland commented 13 hours ago

After indexing one new command per second for an extended period (more than ~30 min), the following error is logged:

[2024-11-25T06:48:13,445][ERROR][c.w.c.j.SearchJob        ] [integTest-0] RuntimeException retrieving page: java.util.concurrent.TimeoutException: Timeout waiting for task.

I'm not 100% sure whether this is related, but when I stop indexing commands, the following error appears:

»  fatal error in thread [opensearch[integTest-0][search][T#23]], exiting
»  java.lang.AssertionError
»   at org.opensearch.index.search.stats.ShardSearchStats.lambda$onFetchPhase$5(ShardSearchStats.java:155)
»   at org.opensearch.index.search.stats.ShardSearchStats.computeStats(ShardSearchStats.java:160)
»   at org.opensearch.index.search.stats.ShardSearchStats.onFetchPhase(ShardSearchStats.java:152)
»   at org.opensearch.index.shard.SearchOperationListener$CompositeListener.onFetchPhase(SearchOperationListener.java:284)
»   at org.opensearch.search.SearchService$SearchOperationListenerExecutor.close(SearchService.java:1893)
»   at org.opensearch.search.SearchService.executeFetchPhase(SearchService.java:746)
»   at org.opensearch.search.SearchService.executeQueryPhase(SearchService.java:714)
»   at org.opensearch.search.SearchService$2.lambda$onResponse$0(SearchService.java:676)
»   at org.opensearch.action.ActionRunnable.lambda$supply$0(ActionRunnable.java:74)
»   at org.opensearch.action.ActionRunnable$2.doRun(ActionRunnable.java:89)
»   at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
»   at org.opensearch.threadpool.TaskAwareRunnable.doRun(TaskAwareRunnable.java:78)
»   at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
»   at org.opensearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:59)
»   at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1005)
»   at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
»   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
»   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
»   at java.base/java.lang.Thread.run(Thread.java:1583)
f-galland commented 13 hours ago

It looks like java.util.concurrent.TimeoutException is thrown by blocking operations:

Exception thrown when a blocking operation times out. Blocking operations for which a timeout is specified need a means to indicate that the timeout has occurred. For many such operations it is possible to return a value that indicates timeout; when that is not possible or desirable then TimeoutException should be declared and thrown.
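For illustration only, here is a minimal standalone sketch using plain java.util.concurrent (not the plugin's code) of a blocking call with a timeout: Future.get(long, TimeUnit) throws TimeoutException when the result is not ready in time. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedBlockingGet {

    /**
     * Hypothetical helper: blocks for at most timeoutMillis waiting for the future.
     * Returns the value, or null if the wait timed out.
     */
    static String getOrNull(Future<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The blocking operation gave up before the task completed.
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // A future that is never completed simulates a task that never finishes.
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(getOrNull(pending, 50));

        // An already-completed future returns its value immediately.
        CompletableFuture<String> done = CompletableFuture.completedFuture("ok");
        System.out.println(getOrNull(done, 50));
    }
}
```

Returning null here is just one way of "returning a value that indicates timeout", as the Javadoc puts it; code that cannot pick such a sentinel should let TimeoutException propagate instead.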

f-galland commented 11 hours ago

The timeout for the client.index operation was expected in milliseconds, but we were passing a value in seconds.
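The exact fix isn't shown here, so the following is only a standalone sketch of this class of bug: a wait that takes milliseconds receiving a number meant as seconds. TIMEOUT_SECONDS, completesWithin and slowTask are hypothetical names, and CompletableFuture stands in for the OpenSearch client. If the call went through OpenSearch's ActionFuture.actionGet(long), that overload also takes milliseconds, while the TimeValue and TimeUnit overloads make the unit explicit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutUnits {

    // Hypothetical setting: the intended timeout, expressed in seconds.
    static final long TIMEOUT_SECONDS = 10;

    /** Hypothetical stand-in for a client call whose timeout parameter is in milliseconds. */
    static boolean completesWithin(CompletableFuture<?> task, long timeoutMillis) {
        try {
            task.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Simulates a task that needs roughly 200 ms to finish. */
    static CompletableFuture<Void> slowTask() {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public static void main(String[] args) {
        // Bug: the value 10 is read as 10 ms, so the 200 ms task times out.
        System.out.println(completesWithin(slowTask(), TIMEOUT_SECONDS));
        // Fix: convert explicitly, so 10 s becomes 10000 ms and the task completes.
        System.out.println(completesWithin(slowTask(), TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS)));
    }
}
```

Converting at the call site with TimeUnit (or passing a typed duration) keeps the unit visible in the code, which is what makes this kind of mismatch easy to miss when a bare long is used.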

After fixing that, I haven't observed the above exceptions again. Leaving the engine running for an extended period while indexing one command per second works as expected:


$ curl -X GET http://localhost:9200/.commands/_count
{"count":2612,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}

$ curl -X GET http://localhost:9200/.commands/_count -H 'Content-type: application/json' -d '
{
  "query": {
    "term": {
      "command.status.keyword": {
        "value": "SENT"
      }
    }
  }
}
'
{"count":2612,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}

Both counts are 2612, so every received command was updated to status SENT.