ooni / data

OONI Data CLI and Pipeline v5
https://docs.ooni.org/data

Investigate corrupt file entries inside of buckets #93

Open hellais opened 2 months ago

hellais commented 2 months ago

While reprocessing data, we encountered some corrupt files. Because the Temporal Cloud interface lacks adequate logging, it's unclear exactly which file was affected, but it is definitely inside the 2024-06-23 bucket and the failure is caused by an empty JSON file. See the following stack trace:

{"message":"Input is a zero-length, empty document: line 1 column 1 (char 0)","stackTrace":"  File \"/home/art/repos/ooni/data/oonipipeline/.venv/lib/python3.11/site-packages/temporalio/worker/_activity.py\", line 453, in _run_activity\n    result = await impl.execute_activity(input)\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/home/art/repos/ooni/data/oonipipeline/.venv/lib/python3.11/site-packages/temporalio/contrib/opentelemetry.py\", line 280, in execute_activity\n    return await super().execute_activity(input)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/home/art/repos/ooni/data/oonipipeline/.venv/lib/python3.11/site-packages/temporalio/worker/_interceptor.py\", line 119, in execute_activity\n    return await self.next.execute_activity(input)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/home/art/repos/ooni/data/oonipipeline/.venv/lib/python3.11/site-packages/temporalio/worker/_activity.py\", line 711, in execute_activity\n    return await input.fn(*input.args)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/home/art/repos/ooni/data/oonipipeline/src/oonipipeline/temporal/activities/observations.py\", line 229, in make_observations\n    measurement_count = sum(await asyncio.gather(*awaitables))\n                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n","cause":{"message":"\n\"\"\"\nTraceback (most recent call last):\n  File \"/usr/lib/python3.11/concurrent/futures/process.py\", line 256, in _process_worker\n    r = call_item.fn(*call_item.args, **call_item.kwargs)\n        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/home/art/repos/ooni/data/oonipipeline/src/oonipipeline/temporal/activities/observations.py\", line 130, in make_observations_for_file_entry_batch\n    measurement_count, failure_count = make_observations_for_file_entry(\n                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File 
\"/home/art/repos/ooni/data/oonipipeline/src/oonipipeline/temporal/activities/observations.py\", line 74, in make_observations_for_file_entry\n    for msmt_dict in stream_measurements(\n  File \"/home/art/repos/ooni/data/oonipipeline/src/oonidata/dataclient.py\", line 249, in stream_measurements\n    yield from stream_postcan(body)\n  File \"/home/art/repos/ooni/data/oonipipeline/src/oonidata/dataclient.py\", line 162, in stream_postcan\n    post = orjson.loads(in_file.read())\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^\norjson.JSONDecodeError: Input is a zero-length, empty document: line 1 column 1 (char 0)\n\"\"\"","applicationFailureInfo":{"type":"_RemoteTraceback"}},"applicationFailureInfo":{"type":"JSONDecodeError"}}

For the moment, to avoid blocking the reprocessing, we bypass the problem by catching any exception raised by the stream_measurements call.
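To make the investigation easier, the workaround could also record which file entry failed. What follows is a minimal sketch of that idea, not the actual pipeline code: `stream_skipping_corrupt`, `fake_stream`, and the entry names are hypothetical, file entries are represented as plain strings, and the standard-library `json` module stands in for `orjson` so the example has no dependencies.

```python
import json
import logging
from typing import Callable, Iterable, Iterator

log = logging.getLogger("corrupt_file_entries")


def stream_skipping_corrupt(
    file_entries: Iterable[str],
    stream_fn: Callable[[str], Iterator[dict]],
    corrupt: list[str],
) -> Iterator[dict]:
    """Yield measurements from every entry, recording the entries that fail.

    `stream_fn` is a stand-in for whatever function streams measurements
    from a single file entry (an assumption, not the real signature).
    """
    for entry in file_entries:
        try:
            yield from stream_fn(entry)
        except Exception as exc:
            # Log the entry identifier so the corrupt file can be found later,
            # then move on to the next entry instead of failing the batch.
            log.error("failed to stream %s: %r", entry, exc)
            corrupt.append(entry)


def fake_stream(bodies: dict[str, bytes]) -> Callable[[str], Iterator[dict]]:
    """Build a toy stream function backed by in-memory bodies (illustration only)."""

    def _stream(entry: str) -> Iterator[dict]:
        # json.loads raises JSONDecodeError on an empty body,
        # which mimics the failure in the stack trace above.
        yield json.loads(bodies[entry])

    return _stream
```

With this shape, the caller passes in a list and, after the stream is exhausted, that list holds the identifiers of every entry that raised, which can then be inspected by hand. Measurements yielded from an entry before it failed are kept.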