AlexRuiz7 opened 1 month ago
Other plugins seem to interface with JobScheduler through its Service Provider Interface.
It looks like the Plugin class (the main class inheriting from OpenSearch's `Plugin`) needs to implement `JobSchedulerExtension`.
A separate class implements `ScheduledJobRunner`'s `runJob()`, which pushes the task to its own thread.
A javadoc in this class reads as follows:
* The job runner class for scheduling async query.
*
* <p>The job runner should be a singleton class if it uses OpenSearch client or other objects
* passed from OpenSearch. Because when registering the job runner to JobScheduler plugin,
* OpenSearch has not invoked plugins' createComponents() method. That is saying the plugin is not
* completely initialized, and the OpenSearch {@link org.opensearch.client.Client}, {@link
* ClusterService} and other objects are not available to plugin and this job runner.
*
* <p>So we have to move this job runner initialization to {@link Plugin} createComponents() method,
* and using singleton job runner to ensure we register a usable job runner instance to JobScheduler
* plugin.
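To illustrate the pattern this javadoc describes, here is a minimal sketch, with assumed class names based on the command manager classes discussed later in this issue: the runner is created eagerly so it can be registered with JobScheduler, and its dependencies are injected later from the plugin's `createComponents()`.

```java
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.threadpool.ThreadPool;

public class SingletonJobRunner implements ScheduledJobRunner {

    // Created eagerly so it can be handed to JobScheduler before
    // createComponents() has run.
    private static final SingletonJobRunner INSTANCE = new SingletonJobRunner();

    private Client client;
    private ClusterService clusterService;
    private ThreadPool threadPool;

    private SingletonJobRunner() {}

    public static SingletonJobRunner getInstance() {
        return INSTANCE;
    }

    // Called from the plugin's createComponents(), once OpenSearch has
    // built these objects.
    public void init(Client client, ClusterService clusterService, ThreadPool threadPool) {
        this.client = client;
        this.clusterService = clusterService;
        this.threadPool = threadPool;
    }

    @Override
    public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
        // Push the actual work to a thread pool so the scheduler thread
        // is not blocked.
        threadPool.generic().submit(() -> {
            // job logic goes here, using this.client etc.
        });
    }
}
```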
The SQL plugin uses a model class for scheduled jobs which implements `ScheduledJobParameter` from JobScheduler.
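A minimal sketch of such a model class could look as follows. Field names are assumptions, and import paths may vary across OpenSearch versions; `ScheduledJobParameter` extends `ToXContentObject`, so `toXContent()` is what gets stored in the jobs index.

```java
import java.io.IOException;
import java.time.Instant;

import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.schedule.Schedule;

public class CommandJobParameter implements ScheduledJobParameter {

    private String name;
    private Instant lastUpdateTime;
    private Instant enabledTime;
    private Schedule schedule;
    private boolean enabled;

    @Override public String getName() { return name; }
    @Override public Instant getLastUpdateTime() { return lastUpdateTime; }
    @Override public Instant getEnabledTime() { return enabledTime; }
    @Override public Schedule getSchedule() { return schedule; }
    @Override public boolean isEnabled() { return enabled; }

    // Serializes the job so it can be indexed as a document.
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
        builder.startObject()
            .field("name", name)
            .field("enabled", enabled)
            .field("enabled_time", enabledTime.toEpochMilli())
            .field("last_update_time", lastUpdateTime.toEpochMilli());
        builder.field("schedule");
        schedule.toXContent(builder, params); // Schedule is itself a ToXContentObject
        return builder.endObject();
    }
}
```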
That research was already performed in #65, where `job-scheduler` was added to the command manager's Gradle task. Job Scheduler classes are not really being used over there.

Summary of the classes in JobScheduler's sample extension plugin (a minimal sketch of the four SPI methods follows the list):

- `SampleExtensionRestHandler`:
  - Builds a `SampleJobParameter` with parameters from the POST call.
  - Indexes the `SampleJobParameter` as a JSON object.
- `SampleExtensionPlugin`:
  - `getJobType()`: Returns a string with the job type.
  - `getJobIndex()`: Returns the name of the index that holds the scheduled jobs' parameters.
  - `getJobRunner()`: Returns the singleton instance of the plugin's Runner class.
  - `getJobParser()`: Returns a `ScheduledJobParser` object that can parse the task's parameters.
- `SampleJobParameter`:
  - Implements `toXContent()`, which is used to index the job.
- `SampleJobRunner`:
  - Implements `runJob()`, which contains the job's logic. This method:
    - Receives a `ScheduledJobParameter`, which gives it access to the task's details.
    - Receives a `JobExecutionContext`, which allows it to acquire a lock during the task execution time window.
    - Creates a `Runnable` object that gets submitted to an OpenSearch thread.
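For reference, a minimal sketch of those four SPI methods, with names assumed to mirror the sample extension (`SampleJobParameter.parse()` is a hypothetical helper standing in for the parsing logic):

```java
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.Plugin;

public class SampleExtensionPluginSketch extends Plugin implements JobSchedulerExtension {

    @Override
    public String getJobType() {
        // String identifying this kind of job.
        return "scheduler_sample_extension";
    }

    @Override
    public String getJobIndex() {
        // Index that holds the scheduled jobs' parameters.
        return ".scheduler_sample_extension";
    }

    @Override
    public ScheduledJobRunner getJobRunner() {
        // Singleton instance of the plugin's runner class.
        return SampleJobRunner.getJobRunnerInstance();
    }

    @Override
    public ScheduledJobParser getJobParser() {
        // Turns a job document back into a ScheduledJobParameter;
        // SampleJobParameter.parse() is assumed here.
        return (parser, id, jobDocVersion) -> SampleJobParameter.parse(parser, id, jobDocVersion);
    }
}
```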
It seems like the only proper way to schedule tasks using the job scheduler is to store them as documents in an index. This is evidenced by the fact that the only call to `runJob()` comes from the `reschedule()` method of the `JobScheduler` class. The job parameters to this `runJob()` call can in turn be traced back to the `sweep()` method of the `JobSweeper` class. Lastly, the `sweep()` method seems to parse the job parameters from a provided index.
Search results pagination can be achieved by means of two distinct methods:

1. `SearchSourceBuilder`'s `from()` and `size()`, which appear to be meant for user-facing interfaces.
2. `Scroll` and other related classes.

Solution 2 seems more robust (and is suggested for larger data batches). A short sketch of both options follows.
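A hedged sketch of both options against the `.commands` index; `client` is assumed to be the node `Client` handed to the plugin, and page sizes are arbitrary:

```java
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

public class PaginationSketch {

    // Option 1: from()/size() offset pagination, apparently meant for
    // user-facing interfaces.
    SearchRequest firstTenAfterTwenty() {
        return new SearchRequest(".commands")
            .source(new SearchSourceBuilder()
                .query(QueryBuilders.matchAllQuery())
                .from(20)    // skip the first 20 hits
                .size(10));  // return 10 hits per page
    }

    // Option 2: Scroll, suggested for larger data batches.
    void scrollAllPages(Client client) {
        SearchResponse page = client.search(
            new SearchRequest(".commands")
                .source(new SearchSourceBuilder().size(100))
                .scroll(TimeValue.timeValueMinutes(1L))).actionGet();
        while (page.getHits().getHits().length > 0) {
            // process the current page here ...
            page = client.searchScroll(
                new SearchScrollRequest(page.getScrollId())
                    .scroll(TimeValue.timeValueMinutes(1L))).actionGet();
        }
        // remember to clear the scroll context when done
    }
}
```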
I'm researching how official plugins handle iterating over the search result pages without blocking execution. We have used the provided `ThreadPool` for this in past tests alongside simple `while` loops, but there seem to be more elegant solutions.
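For reference, this is roughly how such a loop can be offloaded; `fetchAndProcessNextPage()` is a hypothetical helper, not actual plugin code:

```java
import org.opensearch.threadpool.ThreadPool;

public class PollingSketch {

    private final ThreadPool threadPool;

    PollingSketch(ThreadPool threadPool) {
        this.threadPool = threadPool;
    }

    void start() {
        // Submit the loop to the generic executor so the calling thread
        // is not blocked while pages are being consumed.
        threadPool.generic().submit(() -> {
            boolean morePages = true;
            while (morePages) {
                morePages = fetchAndProcessNextPage(); // hypothetical helper
            }
        });
    }

    private boolean fetchAndProcessNextPage() {
        // issue a search, process the hits, return whether more pages remain
        return false;
    }
}
```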
As of commit 3fc33ea, the JobSchedulerExtension has been implemented as explained below (a sketch of the pagination loop follows the list):

- `CommandManagerPlugin` implements the `JobSchedulerExtension` interface.
- `CommandManagerJobRunner` implements the `ScheduledJobRunner` interface, which provides the `runJob()` method.
  - The `runJob()` method calls `SearchJob`'s `searchJobRunnable()`, which returns a `Runnable`.
- `searchJobRunnable()` is structured around a do-while loop that issues a Point In Time search.
  - The `handlePage()` method takes each resulting `SearchResponse` (a page) and iterates over the `SearchHit`s to update the `command.status` field from `PENDING` to `SENT`. This method also submits the command to the destination HTTP API.
  - The `runPitQuery()` method is a wrapper around `client.search()`, using the synchronous variant of the method without any Future handling (this is all run in a separate thread at `runJob()`, as described above, anyway).
  - `pitSearchRequest()` composes a `SearchRequest` object that consumes a `SearchSourceBuilder` object.
    - The `SearchSourceBuilder` object currently uses a hardcoded `term` query that looks for the value `PENDING` within the `command.status` field of the `.commands` index documents. The results are sorted by `order_id` and `timeout` in order for the Point In Time pagination to work.
    - The `SearchSourceBuilder` object consumes a `PointInTimeBuilder` object that is handled by a custom `PointInTime` class.
    - Unless a `searchAfter` `Object[]` is provided, this method returns a `SearchRequest` that can be used to pull the first page. Otherwise, it will create one that requests a subsequent page according to the Point In Time functionality.
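The loop can be pictured roughly as follows. This is a hedged sketch, not the actual plugin code: `PAGE_SIZE`, the `pitId` acquisition, and the `handlePage()` body are assumptions, and the sort fields mirror the ones mentioned above. Note that with a Point In Time, the `SearchRequest` carries no index name, since the PIT already pins the target indices.

```java
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortOrder;

public class PitPaginationSketch {

    private static final int PAGE_SIZE = 100; // assumed page size

    void run(Client client, String pitId) {
        Object[] searchAfter = null;
        SearchResponse response;
        do {
            SearchSourceBuilder source = new SearchSourceBuilder()
                .query(QueryBuilders.termQuery("command.status", "PENDING"))
                .size(PAGE_SIZE)
                .sort("order_id", SortOrder.ASC)  // stable sort keys are required
                .sort("timeout", SortOrder.ASC)   // for search_after pagination
                .pointInTimeBuilder(new PointInTimeBuilder(pitId)
                    .setKeepAlive(TimeValue.timeValueMinutes(1L)));
            if (searchAfter != null) {
                source.searchAfter(searchAfter); // request a subsequent page
            }
            // Synchronous search; no index name, the PIT pins the indices.
            response = client.search(new SearchRequest().source(source)).actionGet();
            SearchHit[] hits = response.getHits().getHits();
            if (hits.length > 0) {
                handlePage(response); // PENDING -> SENT, submit to the HTTP API
                searchAfter = hits[hits.length - 1].getSortValues();
            }
            // Keep looping while full pages come back.
        } while (response.getHits().getHits().length == PAGE_SIZE);
    }

    private void handlePage(SearchResponse page) {
        // update command.status and forward each command (elided)
    }
}
```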
Description

As part of the command manager plugin development and in continuation of #65, we are going to implement the job-scheduler logic to prioritize the commands and send them to the Wazuh Server's Management API.

Plan

Functional requirements

- [x] The job runner reads commands from the `.commands` index in `PENDING` status.
- [x] #126
- [x] The job runner sends the commands to the Management API (needs HTTP service implementation).*
- [ ] The job runner logs each of its actions properly.
The job runner sends the commands to an external function for its processing. For the time being, we can just print these commands. Once the HTTP service implementation is completed, we can assemble both pieces.
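A placeholder for that external function could be as simple as the following hypothetical sketch (not the final HTTP client):

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.search.SearchHit;

public class CommandForwarderStub {

    private static final Logger log = LogManager.getLogger(CommandForwarderStub.class);

    // Temporary stand-in for the HTTP service: just log each command until
    // the real client is wired in.
    void forward(SearchHit hit) {
        log.info("Would send command to the Management API: {}", hit.getSourceAsString());
    }
}
```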