delta-io / kafka-delta-ingest

A highly efficient daemon for streaming data from Kafka into Delta Lake
Apache License 2.0

Add flag to support gzip compressed messages #179

Closed: geosach closed this pull request 2 weeks ago.

geosach commented 2 weeks ago

Gzip decompression flag

Summary

This pull request adds support for ingesting gzip-compressed messages in kafka-delta-ingest. When enabled, gzip-compressed Kafka messages are decompressed before further processing and ingestion into Delta Lake.

Changes

  1. New Feature: Gzip Decompression
    • Added the --decompress_gzip flag to the command-line options.
    • Modified the MessageDeserializerFactory and related deserialization logic to support gzip decompression.
    • Updated main.rs to handle the new command-line argument and pass it to the deserialization process.
    • Updated lib.rs to integrate the new gzip decompression functionality.
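The changes above place a decompression step in front of the existing deserialization. A minimal sketch of that wiring follows, assuming a simplified deserializer interface. MessageDeserializer, TextDeserializer, GzipDeserializer, Decompress, and build_deserializer are hypothetical names, not the PR's actual types. The gzip decoder is passed in as a function rather than tied to a particular crate.

```rust
use std::io;

/// Hypothetical stand-in for the project's deserializer interface; the real
/// trait in kafka-delta-ingest may have a different name and signature.
pub trait MessageDeserializer {
    fn deserialize(&self, payload: &[u8]) -> Result<String, String>;
}

/// Hypothetical inner deserializer that accepts UTF-8 text (e.g. JSON) as-is.
pub struct TextDeserializer;

impl MessageDeserializer for TextDeserializer {
    fn deserialize(&self, payload: &[u8]) -> Result<String, String> {
        std::str::from_utf8(payload)
            .map(|s| s.to_string())
            .map_err(|e| format!("invalid UTF-8: {}", e))
    }
}

/// Signature of the injected gzip decoder (an assumption of this sketch).
pub type Decompress = Box<dyn Fn(&[u8]) -> io::Result<Vec<u8>>>;

/// Decompresses each payload, then hands the raw bytes to the inner deserializer.
pub struct GzipDeserializer<D> {
    inner: D,
    // A real build could implement this by reading through a gzip decoder such as
    // flate2::read::GzDecoder; that crate choice is an assumption, not from the PR.
    decompress: Decompress,
}

impl<D: MessageDeserializer> MessageDeserializer for GzipDeserializer<D> {
    fn deserialize(&self, payload: &[u8]) -> Result<String, String> {
        let raw = (self.decompress)(payload)
            .map_err(|e| format!("gzip decompression failed: {}", e))?;
        self.inner.deserialize(&raw)
    }
}

/// Hypothetical factory: the --decompress_gzip flag decides whether the wrapper is applied.
pub fn build_deserializer(decompress_gzip: bool, decompress: Decompress) -> Box<dyn MessageDeserializer> {
    if decompress_gzip {
        Box::new(GzipDeserializer { inner: TextDeserializer, decompress })
    } else {
        Box::new(TextDeserializer)
    }
}
```

In this sketch, payloads go straight to the inner deserializer when the flag is off. When it is on, a payload that fails to decompress surfaces as a deserialization error instead of reaching the Delta writer.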

Notes

The test run reports one failure:

---- test_start_from_latest stdout ----
thread 'test_start_from_latest' panicked at tests/helpers/mod.rs:361:13:
File was not created before timeout
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

failures:
    test_start_from_latest

test result: FAILED. 4 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 203.42s

However, since this failure also occurs on older commits, it does not appear to be caused by this new functionality.

Usage

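Use the --decompress_gzip flag to enable gzip decompression. Ensure that the Kafka producer writes gzip-compressed message payloads. Kafka's own producer-side compression (compression.type=gzip) is decompressed transparently by the consumer client, so it does not require this flag.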

RUST_LOG=debug cargo run ingest <kafka_topic> <delta_table_path> \
  --allowed_latency 60 \
  --app_id <app_id> \
  --decompress_gzip \
  --auto_offset_reset earliest

This command enables gzip decompression for Kafka messages ingested into the Delta Lake table.
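Since the flag expects gzip-compressed payloads, it can help to spot-check a few raw messages before enabling it. The following is a minimal std-only sketch, assuming single-member gzip payloads. looks_like_gzip and gzip_isize are hypothetical helper names. They only read the RFC 1952 header (magic bytes 0x1f 0x8b, compression method 8) and trailer (ISIZE); they do not decompress the data or verify the CRC32.

```rust
/// Gzip member header magic (RFC 1952: ID1 = 0x1f, ID2 = 0x8b).
const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b];
/// Compression method 8 = deflate, the only method defined by RFC 1952.
const CM_DEFLATE: u8 = 8;

/// Returns true if `payload` starts with a gzip member header.
/// Only the fixed 10-byte header prefix is checked; the compressed data is not validated.
pub fn looks_like_gzip(payload: &[u8]) -> bool {
    payload.len() >= 10 && payload.starts_with(&GZIP_MAGIC) && payload[2] == CM_DEFLATE
}

/// Reads ISIZE from the 8-byte gzip trailer: the uncompressed size modulo 2^32,
/// stored little-endian in the last four bytes. Only meaningful for a single member.
pub fn gzip_isize(payload: &[u8]) -> Option<u32> {
    // 10-byte header + 8-byte trailer is the smallest layout that has an ISIZE field.
    if !looks_like_gzip(payload) || payload.len() < 18 {
        return None;
    }
    let n = payload.len();
    Some(u32::from_le_bytes([payload[n - 4], payload[n - 3], payload[n - 2], payload[n - 1]]))
}
```

A payload that fails this check, such as plain JSON, would be rejected by a gzip decoder once --decompress_gzip is enabled.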

Conclusion

This enhancement lets kafka-delta-ingest consume topics whose message payloads are gzip-compressed, which is particularly useful in environments where data compression is essential. Please review the changes and let me know if any adjustments or additional tests are required. Thank you for considering this contribution.