vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

Avro files miss headers #16994

Open razumau opened 1 year ago

razumau commented 1 year ago

Problem

Avro files generated by sinks are missing the header.

According to Avro’s specification, Avro files should begin with a header that contains the schema. However, Avro files produced by the File or S3 sinks start directly with data and have no header.

This seems to happen because the only Avro-writing function Vector uses is to_avro_datum (https://github.com/vectordotdev/vector/blob/6542778af87ec8324ff1e75b9e68cb3251d1931c/lib/codecs/src/encoding/format/avro.rs#L73), and its documentation in apache-avro says that it doesn’t generate headers. That usage makes sense, because headers aren’t needed at that point. However, the header doesn’t seem to be written anywhere else either.
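
To make the difference concrete, here is a minimal sketch in plain Python (standard library only, not Vector's or apache-avro's code). It encodes the single string field from the schema in the configuration below as a bare datum and checks for the four magic bytes that open every Avro object container file. The function names are made up for illustration.

```python
MAGIC = b"Obj\x01"  # first four bytes of an Avro object container file


def encode_long(n: int) -> bytes:
    """Encode an Avro long: zigzag first, then variable-length base-128."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s: str) -> bytes:
    """Encode an Avro string: byte length as a long, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def has_container_header(data: bytes) -> bool:
    """Hypothetical helper: does this buffer start like a container file?"""
    return data[:4] == MAGIC


# A record with one string field is just that field's encoding:
# no field names, no schema, no magic bytes.
datum = encode_string("id_1")
print(datum, has_container_header(datum))
```

A file that consists only of such datums cannot be recognized as Avro by tools that look for the header first.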

Am I missing something? Should there be more in my config? I’ve attached a minimal config that creates a small Avro file. The resulting Avro files are in this gist.

Configuration

[sources.logs]
type = "file"
include = [ "./*.log" ]
data_dir = "."

[transforms.interaction_id]
type = "remap"
inputs = ["logs"]
source = """
  del(.message)
  .id = "id_1"
"""

[sinks.console]
type = "console"
encoding.codec = "json"
inputs = [ "interaction_id" ]

[sinks.avro]
type = "file"
inputs = [ "interaction_id" ]
path = "./%+.avro"
encoding.codec = "avro"
encoding.avro.schema = """
  {
    "type": "record",
    "name": "interaction",
    "fields": [
      {
        "name": "id",
        "type": "string"
      }
    ]
  }
"""
idle_timeout_secs = 5

Version

0.28.1

Debug Output

2023-03-29T10:54:48.030503Z  INFO vector::app: Internal log rate limit configured. internal_log_rate_secs=10
2023-03-29T10:54:48.032027Z  INFO vector::app: Log level is enabled. level="vector=trace,codec=trace,vrl=trace,file_source=trace,tower_limit=trace,rdkafka=trace,buffers=trace,lapin=trace,kube=trace"
2023-03-29T10:54:48.033921Z  INFO vector::app: Loading configs. paths=["vector.toml"]
2023-03-29T10:54:48.048454Z DEBUG vector::config::loading: No secret placeholder found, skipping secret resolution.
2023-03-29T10:54:48.066133Z DEBUG vector::topology::builder: Building new source. component=logs
2023-03-29T10:54:48.069107Z DEBUG vector::topology::builder: Building new transform. component=interaction_id
2023-03-29T10:54:48.070912Z DEBUG vector::topology::builder: Building new sink. component=avro
2023-03-29T10:54:48.073570Z DEBUG vector::topology::builder: Building new sink. component=console
2023-03-29T10:54:48.074486Z  INFO vector::topology::running: Running healthchecks.
2023-03-29T10:54:48.074793Z DEBUG vector::topology::running: Connecting changed/added component(s).
2023-03-29T10:54:48.074812Z DEBUG vector::topology::running: Configuring outputs for source. component=logs
2023-03-29T10:54:48.075053Z  INFO vector::topology::builder: Healthcheck: Passed.
2023-03-29T10:54:48.075070Z  INFO vector::topology::builder: Healthcheck: Passed.
2023-03-29T10:54:48.075168Z DEBUG vector::topology::running: Configuring output for component. component=logs output_id=None
2023-03-29T10:54:48.075187Z DEBUG vector::topology::running: Configuring outputs for transform. component=interaction_id
2023-03-29T10:54:48.075198Z DEBUG vector::topology::running: Configuring output for component. component=interaction_id output_id=None
2023-03-29T10:54:48.075210Z DEBUG vector::topology::running: Connecting inputs for transform. component=interaction_id
2023-03-29T10:54:48.075226Z DEBUG vector::topology::running: Adding component input to fanout. component=interaction_id fanout_id=logs
2023-03-29T10:54:48.075542Z DEBUG vector::topology::running: Connecting inputs for sink. component=console
2023-03-29T10:54:48.075560Z DEBUG vector::topology::running: Adding component input to fanout. component=console fanout_id=interaction_id
2023-03-29T10:54:48.075573Z DEBUG vector::topology::running: Connecting inputs for sink. component=avro
2023-03-29T10:54:48.075586Z DEBUG vector::topology::running: Adding component input to fanout. component=avro fanout_id=interaction_id
2023-03-29T10:54:48.075606Z DEBUG vector::topology::running: Spawning new source. key=logs
2023-03-29T10:54:48.075636Z DEBUG vector::topology::running: Spawning new transform. key=interaction_id
2023-03-29T10:54:48.075665Z TRACE vector::topology::running: Spawning new sink. key=console
2023-03-29T10:54:48.075679Z TRACE vector::topology::running: Spawning new sink. key=avro
2023-03-29T10:54:48.076006Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::topology::builder: Sink starting.
2023-03-29T10:54:48.076008Z DEBUG source{component_kind="source" component_id=logs component_type=file component_name=logs}: vector::topology::builder: Source pump supervisor starting.
2023-03-29T10:54:48.076035Z DEBUG transform{component_kind="transform" component_id=interaction_id component_type=remap component_name=interaction_id}: vector::topology::builder: Synchronous transform starting.
2023-03-29T10:54:48.076034Z DEBUG source{component_kind="source" component_id=logs component_type=file component_name=logs}: vector::topology::builder: Source starting.
2023-03-29T10:54:48.076007Z DEBUG sink{component_kind="sink" component_id=avro component_type=file component_name=avro}: vector::topology::builder: Sink starting.
2023-03-29T10:54:48.076167Z  INFO vector: Vector has started. debug="false" version="0.26.0" arch="x86_64" revision="c6b5bc2 2022-12-05"
2023-03-29T10:54:48.076208Z  INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.
2023-03-29T10:54:48.076398Z DEBUG source{component_kind="source" component_id=logs component_type=file component_name=logs}: vector::topology::builder: Source pump starting.
2023-03-29T10:54:48.076906Z  INFO source{component_kind="source" component_id=logs component_type=file component_name=logs}: vector::sources::file: Starting file server. include=["./*.log"] exclude=[]
2023-03-29T10:54:48.077561Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.05467326165853459
2023-03-29T10:54:48.077568Z DEBUG sink{component_kind="sink" component_id=avro component_type=file component_name=avro}: vector::utilization: utilization=0.013746270045153053
2023-03-29T10:54:48.077568Z TRACE vector: Beep.
2023-03-29T10:54:48.078319Z  INFO source{component_kind="source" component_id=logs component_type=file component_name=logs}:file_server: file_source::checkpointer: Loaded checkpoint data.
2023-03-29T10:54:48.080497Z  INFO source{component_kind="source" component_id=logs component_type=file component_name=logs}:file_server: vector::internal_events::file::source: Resuming to watch file. file=test.log file_position=90
2023-03-29T10:54:48.081239Z TRACE source{component_kind="source" component_id=logs component_type=file component_name=logs}:file_server: file_source::file_server: Continue watching file. path="test.log"
2023-03-29T10:54:49.077739Z TRACE vector: Beep.
2023-03-29T10:54:49.110461Z DEBUG vector::internal_events::file::source: Files checkpointed. count=2 duration_ms=26
2023-03-29T10:54:49.117644Z DEBUG source{component_kind="source" component_id=logs component_type=file component_name=logs}:file_server: file_source::file_server: event_throughput=0.000/sec bytes_throughput=0.000/sec ratios={"discovery": 0.0004887969, "other": 0.0001565995, "reading": 9.403917e-5, "sending": 0.00042390157, "sleeping": 0.9988414}
2023-03-29T10:54:49.118790Z TRACE source{component_kind="source" component_id=logs component_type=file component_name=logs}:file_server: file_source::file_server: Continue watching file. path="test.log"
2023-03-29T10:54:50.077711Z TRACE vector: Beep.
2023-03-29T10:54:50.112952Z DEBUG vector::internal_events::file::source: Files checkpointed. count=2 duration_ms=0
2023-03-29T10:54:50.143356Z DEBUG source{component_kind="source" component_id=logs component_type=file component_name=logs}:file_server: file_source::file_server: event_throughput=0.000/sec bytes_throughput=0.000/sec ratios={"discovery": 0.00074738025, "other": 0.00016802402, "reading": 4.9383467e-5, "sending": 0.00021879365, "sleeping": 0.99881643}
2023-03-29T10:54:50.143898Z TRACE source{component_kind="source" component_id=logs component_type=file component_name=logs}:file_server: file_source::file_server: Continue watching file. path="test.log"
2023-03-29T10:54:51.079108Z TRACE vector: Beep.
2023-03-29T10:54:51.115406Z DEBUG vector::internal_events::file::source: Files checkpointed. count=2 duration_ms=0
2023-03-29T10:54:52.077955Z TRACE vector: Beep.
2023-03-29T10:54:52.117571Z DEBUG vector::internal_events::file::source: Files checkpointed. count=2 duration_ms=0
2023-03-29T10:54:52.192274Z DEBUG source{component_kind="source" component_id=logs component_type=file component_name=logs}:file_server: file_source::file_server: event_throughput=0.000/sec bytes_throughput=0.000/sec ratios={"discovery": 0.000500229, "other": 9.997211e-5, "reading": 2.5505993e-5, "sending": 0.000111648216, "sleeping": 0.9992627}
2023-03-29T10:54:52.192762Z TRACE source{component_kind="source" component_id=logs component_type=file component_name=logs}:file_server: file_source::file_server: Continue watching file. path="test.log"
2023-03-29T10:54:53.077392Z TRACE vector: Beep.
2023-03-29T10:54:53.077455Z DEBUG sink{component_kind="sink" component_id=avro component_type=file component_name=avro}: vector::utilization: utilization=0.0013746405052619257
2023-03-29T10:54:53.077459Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.0054673385865286455
2023-03-29T10:54:53.118897Z DEBUG vector::internal_events::file::source: Files checkpointed. count=2 duration_ms=0
2023-03-29T10:54:54.077563Z TRACE vector: Beep.
2023-03-29T10:54:54.120289Z DEBUG vector::internal_events::file::source: Files checkpointed. count=2 duration_ms=0
2023-03-29T10:54:54.242407Z DEBUG source{component_kind="source" component_id=logs component_type=file component_name=logs}:file_server: file_source::file_server: event_throughput=0.000/sec bytes_throughput=0.000/sec ratios={"discovery": 0.00040951258, "other": 7.652749e-5, "reading": 1.7471024e-5, "sending": 7.586726e-5, "sleeping": 0.99942064}
2023-03-29T10:54:54.242984Z TRACE source{component_kind="source" component_id=logs component_type=file component_name=logs}:file_server: file_source::file_server: Continue watching file. path="test.log"
2023-03-29T10:54:55.078887Z TRACE vector: Beep.
2023-03-29T10:54:55.122676Z DEBUG vector::internal_events::file::source: Files checkpointed. count=2 duration_ms=0
2023-03-29T10:54:56.078487Z TRACE vector: Beep.
2023-03-29T10:54:56.124779Z DEBUG vector::internal_events::file::source: Files checkpointed. count=2 duration_ms=0
2023-03-29T10:54:56.293111Z DEBUG source{component_kind="source" component_id=logs component_type=file component_name=logs}:file_server: file_source::file_server: event_throughput=0.000/sec bytes_throughput=0.000/sec ratios={"discovery": 0.00037476403, "other": 6.58176e-5, "reading": 1.3513954e-5, "sending": 5.807708e-5, "sleeping": 0.9994879}
2023-03-29T10:54:56.293634Z TRACE source{component_kind="source" component_id=logs component_type=file component_name=logs}:file_server: file_source::file_server: Continue watching file. path="test.log"

jszwedko commented 1 year ago

Thanks for the report, @razumau!

Yes, you are correct that the Avro encoding doesn't write the schema as a header. I think the common usage up to this point has been with known schemas that users then apply to the file separately when consuming it. It is an obvious shortcoming, though.

Fixing this would require a bit of an update to the encoding system within Vector, which only deals with encoding individual events and has no way of, for example, providing additional data to write at the beginning of each "batch". The same issue would come up with a CSV codec or any other codec that requires "headers".
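
As a rough illustration of what "additional data at the beginning of each batch" would mean for Avro, here is a sketch in plain Python of wrapping already-encoded datums in the object container layout from the Avro specification: magic bytes, a metadata map holding the schema, a 16-byte sync marker, then a data block. This is not how Vector is implemented; `wrap_batch` and the other function names are hypothetical.

```python
import os

MAGIC = b"Obj\x01"


def encode_long(n: int) -> bytes:
    """Avro long: zigzag, then variable-length base-128."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_bytes(b: bytes) -> bytes:
    """Avro bytes/string: length as a long, then the raw bytes."""
    return encode_long(len(b)) + b


def container_header(schema_json: str, sync: bytes) -> bytes:
    """Header: magic, metadata map (string -> bytes), sync marker."""
    meta = {"avro.schema": schema_json.encode("utf-8"), "avro.codec": b"null"}
    out = bytearray(MAGIC)
    out += encode_long(len(meta))          # one map block with all entries
    for key, value in meta.items():
        out += encode_bytes(key.encode("utf-8")) + encode_bytes(value)
    out += encode_long(0)                  # zero count ends the map
    out += sync
    return bytes(out)


def data_block(datums, sync: bytes) -> bytes:
    """Block: object count, byte size, the datums, sync marker."""
    body = b"".join(datums)
    return encode_long(len(datums)) + encode_long(len(body)) + body + sync


def wrap_batch(schema_json: str, datums, sync: bytes = b"") -> bytes:
    """Hypothetical per-batch hook: header once, then one data block."""
    sync = sync or os.urandom(16)
    return container_header(schema_json, sync) + data_block(datums, sync)
```

The per-event encoder would stay unchanged in such a design. Only the code that opens a new file or object would need to know about the header.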

manevant commented 1 month ago

@jszwedko Hello! Is there any news on this issue? Are there any plans to write the header bytes and schema at the start of the file, so that the resulting file is truly Avro-compatible?

I'm afraid that, in real use, this sink is currently almost useless, because almost every tool or system I tried for processing the resulting file expects a schema, or at least a header.

I'm not an expert on the Avro standard, but is a file that has a header but not the data schema itself a valid Avro file?

If so, perhaps it would be possible to add only a header to the Avro file the sink produces, so that the result can be read by standard libraries (in my case, I tried to process the resulting file with Python, Apache Spark, and Apache Impala).

jszwedko commented 1 month ago

I'm not an Avro expert, but my understanding is that the schema can be provided when decoding data files, so the output is usable, albeit less convenient. Ideally, the schema would be included in the file itself too.
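
A minimal sketch of what "providing the schema when decoding" amounts to, in plain Python and hard-coded for the one-field `interaction` schema from the report: the reader has to know in advance that each record is a single string, because nothing in the bytes says so. It assumes the buffer holds exactly one datum and does not address how datums are delimited in the files Vector writes. The function names are hypothetical.

```python
def decode_long(buf: bytes, pos: int = 0):
    """Decode an Avro long (base-128 varint, then un-zigzag)."""
    shift = 0
    acc = 0
    while True:
        b = buf[pos]
        pos += 1
        acc |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1), pos


def decode_string(buf: bytes, pos: int = 0):
    """Decode an Avro string: length as a long, then UTF-8 bytes."""
    length, pos = decode_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_interaction(buf: bytes, pos: int = 0):
    """Decode one 'interaction' record; the field order comes from the schema."""
    value, pos = decode_string(buf, pos)
    return {"id": value}, pos


record, end = decode_interaction(b"\x08id_1")
print(record, end)
```

General-purpose readers, in contrast, expect to learn the schema from the file header, which is why they reject header-less files.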

NitriKx commented 1 month ago

+👍 as this issue prevents using Google BigQuery with the generated files. Unfortunately, BigQuery expects the header to be present and refuses to import the file or use it as source data in an external table.