OpenCTI-Platform / client-python

OpenCTI Python Client
https://www.opencti.io
Apache License 2.0
116 stars 130 forks source link

[client] Connector queuing auto backpressure - Scheduler (#6325) #698

Closed Megafredo closed 3 months ago

Megafredo commented 4 months ago

Proposed changes

Related issues

Checklist

Further comments

If this is a relatively large or complex change, kick off the discussion by explaining why you chose the solution you did and what alternatives you considered, etc...

helene-nguyen commented 4 months ago

Tested locally and everything seems good to me, here are different cases I've tested:

Steps to Reproduce and my understanding:

  1. Stop the Worker:

    • Begin by stopping the worker process.
  2. Run the Connector:

    • Start the connector process.
  3. Scheduler Checks duration_period Configuration:

    • The scheduler will review the duration_period setting.
  4. Main Process Execution:

    • The main process of the connector will start.
    • We must wait for this process to complete before entering the scheduler configuration and initialization.
  5. Scheduler Activation:

    • The scheduler activates once the specified period time is reached.
  6. Queue Size Threshold Check:

    • The scheduler checks the queue size threshold, defined as queue_threshold.
    • If no environment variable is set, the default threshold is 500MB.
  7. Queue Size Condition:

    • If the connector's queue size exceeds the queue threshold, the message callback process (connector's main process) will pause.
    • The process will not resume until the queue is ingested and reduced sufficiently, allowing it to restart during the next scheduler check.

During the test, the connector was modified to run the scheduler as following:

    def process_message(self):
        """Main process of the connector."""
        [...]

    def run(self):
        self.helper.schedule_iso(
            message_callback=self.process_message,
            duration_period=self.config.duration_period,
        )