insight-platform / Replay

Video archive and on-demand re-streaming database

Job that is configured to stop by realtime delta runs ~5s longer than expected #10

Open aletharel opened 3 months ago

aletharel commented 3 months ago

There is a test that checks the job stop condition based on the real-time delta. Here is the config fragment with the real-time delta set to 30 seconds:

    "stop_condition": {
        "real_time_delta_ms": {
            "configured_delta_ms": 30000
        }
    },

The test records start_timestamp, then receives all frames and updates end_timestamp on each one, like this:

    # read stop condition > real time delta from job config
    realtime_delta = job_config.get("stop_condition", {}).get("real_time_delta_ms", {}).get("configured_delta_ms", 0) * 1000000
    if not realtime_delta:
        pytest.skip("Job stop condition is expected to be a realtime delta")
    end_timestamp = 0

    sink_runner = (SinkBuilder()
                    .with_socket('router+bind:tcp://0.0.0.0:6666')
                    .with_idle_timeout(30)
                    .build()
                    )

    start_timestamp = time.monotonic_ns()
    for result in sink_runner:
        end_timestamp = time.monotonic_ns()
    job_status = rest_api.get_job_status(replay_job)
    job_lifetime = end_timestamp - start_timestamp

The sink timeout cannot affect end_timestamp, since the last timestamp is taken inside the loop. The difference (end_timestamp - start_timestamp) is about 35 seconds, roughly 5 seconds more than the configured 30-second delta, so the test fails:

    FAILED tests/test_job.py::TestJob::test_stop_condition_realtime_delta - AssertionError: assert (<JobStatus.STOPPED: 'Stopped'> == <JobStatus.STOPPED: 'Stopped'> and 30000000000 == 35098027418)

I am not sure whether this is a bug or just an acceptable margin of error.
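
For reference, the overshoot can be read directly from the two nanosecond values in the assertion message above. A minimal sketch of the arithmetic follows; the variable names are illustrative and not taken from the test:

```python
# Values copied from the AssertionError above, both in nanoseconds.
expected_ns = 30000000000   # configured_delta_ms (30000) * 1_000_000
measured_ns = 35098027418   # end_timestamp - start_timestamp

NS_PER_SECOND = 1_000_000_000

# How much longer the job ran than the configured delta.
overshoot_ns = measured_ns - expected_ns
overshoot_s = overshoot_ns / NS_PER_SECOND

print(f"measured lifetime: {measured_ns / NS_PER_SECOND:.3f} s")
print(f"overshoot: {overshoot_s:.3f} s")
```

The measured lifetime exceeds the configured delta by a little over 5 seconds.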

bwsw commented 3 months ago

@aletharel, I do not think the test is correct, because the job was created before the sink. The job then starts waiting and retrying, so execution can take longer than configured.

aletharel commented 2 months ago

@bwsw, I updated the test to start the Replay job after the sink is created. The code is now as follows:

    def test_stop_condition_realtime_delta(self, job_config, stream_id):
        # Read configuration > stop condition > realtime delta from job config
        realtime_delta = job_config.get("stop_condition", {}).get("real_time_delta_ms", {}).get(
            "configured_delta_ms", 0) * 1000000
        if not realtime_delta:
            pytest.skip("Job stop condition is expected to be a realtime delta")
        end_timestamp = 0

        # Create sink
        sink_runner = (SinkBuilder()
                       .with_socket('router+bind:tcp://0.0.0.0:6666')
                       .with_idle_timeout(30)
                       .build()
                       )

        # Start Replay job
        keyframe = rest_api.find_keyframe(stream_id)
        resulting_stream_id = "{0}_result".format(stream_id)
        job_id = rest_api.new_job(job_config, keyframe, stream_id, resulting_stream_id)

        start_timestamp = time.monotonic_ns()
        for result in sink_runner:
            end_timestamp = time.monotonic_ns()
        job_status = rest_api.get_job_status(job_id)
        job_lifetime = end_timestamp - start_timestamp

        rest_api.delete_job(job_id)

        assert (
                job_status == rest_api.JobStatus.STOPPED
                and realtime_delta <= job_lifetime < realtime_delta + 1000000000
        )

The job lifetime is still about 5 seconds longer than expected.
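
To make the failure easier to read, the lifetime window from the assertion above can be checked separately from the job status. This is only a sketch: `lifetime_overshoot_ns` and `within_tolerance` are hypothetical helpers, not part of the test suite or of Replay, and the 1-second tolerance is the one the test already uses.

```python
NS_PER_MS = 1_000_000
NS_PER_SECOND = 1_000_000_000


def lifetime_overshoot_ns(configured_delta_ms, job_lifetime_ns):
    """Return how far the measured lifetime exceeds the configured delta (ns).

    A negative value means the job stopped earlier than configured.
    """
    return job_lifetime_ns - configured_delta_ms * NS_PER_MS


def within_tolerance(configured_delta_ms, job_lifetime_ns, tolerance_ns=NS_PER_SECOND):
    """Same window as the test: delta <= lifetime < delta + tolerance."""
    overshoot = lifetime_overshoot_ns(configured_delta_ms, job_lifetime_ns)
    return 0 <= overshoot < tolerance_ns


# Example with the lifetime reported in the earlier failure (35098027418 ns).
overshoot = lifetime_overshoot_ns(30000, 35098027418)
print(f"overshoot: {overshoot / NS_PER_SECOND:.3f} s, "
      f"within tolerance: {within_tolerance(30000, 35098027418)}")
```

Asserting the status and the lifetime window in two separate `assert` statements, with the overshoot in the message, would show directly which condition failed and by how much.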