Apache License 2.0

= kafka-delta-ingest

The kafka-delta-ingest project aims to build a highly efficient daemon for streaming data through link:https://kafka.apache.org[Apache Kafka] into link:https://delta.io[Delta Lake].

This project is currently in production in a number of organizations and is still actively evolving in tandem with the link:https://github.com/delta-io/delta-rs[delta-rs] bindings.

To contribute, see the link:https://github.com/delta-io/kafka-delta-ingest/blob/main/doc/HACKING.adoc[hacking document].

== Features

See the link:https://github.com/delta-io/kafka-delta-ingest/blob/main/doc/DESIGN.md[design doc] for more details.

=== Example

The repository includes an example for trying out the application locally with some fake web request data.

The included docker-compose.yml contains link:https://github.com/wurstmeister/kafka-docker/issues[kafka] and link:https://github.com/localstack/localstack[localstack] services you can run kafka-delta-ingest against locally.

==== Starting Worker Processes

  1. Launch test services: `docker-compose up setup`
  2. Compile: `cargo build`
  3. Run kafka-delta-ingest against the `web_requests` example topic and table (customize arguments as desired):

[source,bash]
----
export AWS_ENDPOINT_URL=http://0.0.0.0:4566
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test

RUST_LOG=debug cargo run ingest web_requests ./tests/data/web_requests \
  --allowed_latency 60 \
  --app_id web_requests \
  --transform 'date: substr(meta.producer.timestamp, `0`, `10`)' \
  --transform 'meta.kafka.offset: kafka.offset' \
  --transform 'meta.kafka.partition: kafka.partition' \
  --transform 'meta.kafka.topic: kafka.topic' \
  --auto_offset_reset earliest
----
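
To make the first `--transform` concrete: it appears to derive a `date` field from the leading characters of `meta.producer.timestamp`. The sketch below mimics that result in plain shell, using the sample timestamp from the example payload later in this document. It assumes the two backtick-quoted numbers select the first ten characters; the variable names are illustrative only and are not part of kafka-delta-ingest.

```shell
# Sample value of meta.producer.timestamp, taken from the example payload.
timestamp="2021-03-24T15:06:17.321710+00:00"

# Assumption: substr(..., `0`, `10`) keeps the first ten characters,
# which for an ISO 8601 timestamp is the calendar date.
date_value=$(printf '%s' "$timestamp" | cut -c1-10)

printf 'date: %s\n' "$date_value"
```

Under that assumption the derived value is `2021-03-24`, a plain calendar date taken from the full timestamp.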

==== Kafka SSL

If your Kafka topics are secured by SSL client certificates, you can provide the secrets as environment variables.

For the certificate chain, put the PEM content in an environment variable named `KAFKA_DELTA_INGEST_CERT`. For the certificate's private key, put the PEM content in an environment variable named `KAFKA_DELTA_INGEST_KEY`.

These are set as the `ssl.certificate.pem` and `ssl.key.pem` Kafka settings respectively.

Also pass the additional option `-K security.protocol=SSL` when invoking the CLI command.
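
Because the variables hold the PEM content itself rather than a file path, a small helper can load them from files. The following is a minimal sketch, not part of kafka-delta-ingest: the function name `load_kafka_ssl_env` and the file paths in the usage comment are hypothetical.

```shell
# Hypothetical helper: read two PEM files and export their contents (not their
# paths) under the variable names kafka-delta-ingest reads.
load_kafka_ssl_env() {
  if [ ! -r "$1" ] || [ ! -r "$2" ]; then
    echo "usage: load_kafka_ssl_env CERT_CHAIN_PEM PRIVATE_KEY_PEM" >&2
    return 1
  fi
  # Command substitution strips trailing newlines; the PEM body is unchanged.
  KAFKA_DELTA_INGEST_CERT=$(cat "$1")
  KAFKA_DELTA_INGEST_KEY=$(cat "$2")
  export KAFKA_DELTA_INGEST_CERT KAFKA_DELTA_INGEST_KEY
}

# Example (paths are placeholders):
#   load_kafka_ssl_env ./client-chain.pem ./client-key.pem
```

Exporting the variables in the same shell session that launches kafka-delta-ingest keeps the key material out of the command line itself.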

=== Using Azure Event Hubs

Azure Event Hubs (with pricing tier "Standard" or higher) has a Kafka Surface that can be used with kafka-delta-ingest.

Azure Event Hubs doesn't have a local emulator, so an actual Azure Event Hubs resource is required. As a result, there's no need for the docker-compose application described above.

==== Starting Worker Processes

  1. link:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create[Create] an Azure Event Hubs Namespace and, within it, an Event Hub (which corresponds to a Kafka topic).

  2. Set these environment variables, which are required by the delta-rs library:

    * `AZURE_STORAGE_ACCOUNT_NAME` (just the storage account name, not the FQDN)
    * `AZURE_STORAGE_ACCOUNT_KEY` (just the key, not the connection string)

  3. Create the `_delta_log` directory in the `web_requests` directory in Azure Storage and upload the link:https://github.com/delta-io/kafka-delta-ingest/blob/main/tests/data/web_requests/_delta_log/00000000000000000000.json[first Delta transaction containing the schema] to this directory.

  4. In the docker command below, replace the following placeholders with your values:

    * `AZURE_STORAGE_ACCOUNT_NAME` (just the storage account name, not the FQDN)
    * `AZURE_STORAGE_ACCOUNT_KEY` (just the key, not the connection string)
    * `FILESYSTEM_NAME` (the storage file system that contains the `web_requests` directory)
    * `EVENTHUBS_NAMESPACE_NAME` (just the namespace name, not the FQDN)
    * `EVENTHUBS_KEY_NAME`
    * `EVENTHUBS_KEY`

  5. Build the docker image:

----
docker build -t kdi:0.1 . -f Dockerfile.Debian
----

  6. Execute this docker command to run kafka-delta-ingest (the `^` characters are Windows Command Prompt line continuations):
----
docker run -it --network=host ^
  -e RUST_LOG="debug" ^
  -e SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt ^
  -e AZURE_STORAGE_ACCOUNT_NAME={AZURE_STORAGE_ACCOUNT_NAME} ^
  -e "AZURE_STORAGE_ACCOUNT_KEY={AZURE_STORAGE_ACCOUNT_KEY}" ^
  kdi:0.1 ^
  ingest web_requests adls2://{AZURE_STORAGE_ACCOUNT_NAME}/{FILESYSTEM_NAME}/web_requests ^
  --allowed_latency 5 ^
  --kafka {EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093 ^
  --Kafka security.protocol=SASL_SSL ^
  --Kafka sasl.mechanism=PLAIN ^
  --Kafka sasl.username=$ConnectionString ^
  --Kafka sasl.password=Endpoint=sb://{EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net/;SharedAccessKeyName={EVENTHUBS_KEY_NAME};SharedAccessKey={EVENTHUBS_KEY} ^
  --Kafka socket.keepalive.enable=true ^
  --Kafka metadata.max.age.ms=180000 ^
  --Kafka heartbeat.interval.ms=3000 ^
  --Kafka session.timeout.ms=30000 ^
  --Kafka debug=broker,security,protocol ^
  --app_id web_requests ^
  --transform "date: substr(meta.producer.timestamp, `0`, `10`)" ^
  --transform "meta.kafka.offset: kafka.offset" ^
  --transform "meta.kafka.partition: kafka.partition" ^
  --transform "meta.kafka.topic: kafka.topic" ^
  --auto_offset_reset earliest
----
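
The docker command in this section uses `^` line continuations, so it is written for the Windows Command Prompt. If you adapt it to a POSIX shell, the SASL values need careful quoting. The sketch below shows how the placeholders combine into the broker address and SASL credentials. The placeholder values and the variable names `KAFKA_BROKER`, `SASL_USERNAME` and `SASL_PASSWORD` are illustrative assumptions, not names kafka-delta-ingest reads.

```shell
# Hypothetical values; replace with your own namespace and shared access key.
EVENTHUBS_NAMESPACE_NAME="my-namespace"
EVENTHUBS_KEY_NAME="my-key-name"
EVENTHUBS_KEY="my-key-value"

# Kafka endpoint of the namespace (port 9093, as in the docker command).
KAFKA_BROKER="${EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093"

# Single quotes keep $ConnectionString literal; it is not a shell variable.
SASL_USERNAME='$ConnectionString'

# The password is the Event Hubs connection string. Double quotes are needed
# because an unquoted ';' would end the command in a POSIX shell.
SASL_PASSWORD="Endpoint=sb://${EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net/;SharedAccessKeyName=${EVENTHUBS_KEY_NAME};SharedAccessKey=${EVENTHUBS_KEY}"

printf '%s\n' "$KAFKA_BROKER" "$SASL_USERNAME" "$SASL_PASSWORD"
```

The resulting values would then be passed as `--kafka "$KAFKA_BROKER"`, `--Kafka "sasl.username=$SASL_USERNAME"` and `--Kafka "sasl.password=$SASL_PASSWORD"`.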

==== Sending data to Event Hubs

On Windows, link:https://github.com/paolosalvatori/ServiceBusExplorer[Service Bus Explorer] can be used to send data to Event Hubs.

The following payload should be sent for the web_requests Delta table:

[source,json]
----
{
  "status": 200,
  "session_id": "7c28bcf9-be26-4d0b-931a-3374ab4bb458",
  "method": "GET",
  "meta": {
    "producer": {
      "timestamp": "2021-03-24T15:06:17.321710+00:00"
    }
  },
  "uuid": "831c6afa-375c-4988-b248-096f9ed101f8",
  "url": "http://www.example.com"
}
----

==== Verifying data from Event Hub using kcat

kcat can be run on Windows via docker using the command below, which prints the last message (`-o -1`).

Make sure to first replace the following placeholders:

* `EVENTHUBS_NAMESPACE_NAME`
* `EVENTHUBS_KEY_NAME`
* `EVENTHUBS_KEY`

----
docker run -it --network=host edenhill/kcat:1.7.1 -C -o -1 -b {EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093 -t web_requests -X security.protocol=SASL_SSL -X sasl.mechanism=PLAIN -X sasl.username=$ConnectionString -X sasl.password=Endpoint=sb://{EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net/;SharedAccessKeyName={EVENTHUBS_KEY_NAME};SharedAccessKey={EVENTHUBS_KEY} -X socket.keepalive.enable=true -X metadata.max.age.ms=180000 -X heartbeat.interval.ms=3000 -X session.timeout.ms=30000
----

== Writing to S3

When writing to S3, you may encounter an error like `source: StorageError { source: S3Generic("dynamodb locking is not enabled") }`.

A locking mechanism is needed to prevent unsafe concurrent writes to a Delta Lake directory, and DynamoDB is one option for this. To use DynamoDB, set the `AWS_S3_LOCKING_PROVIDER` environment variable to `dynamodb` and create a table named `delta_rs_lock_table` in DynamoDB. An example table creation snippet using the aws CLI follows; customize it for your environment's needs (e.g. read/write capacity modes):

[source,bash]
----
aws dynamodb create-table --table-name delta_rs_lock_table \
    --attribute-definitions \
        AttributeName=key,AttributeType=S \
    --key-schema \
        AttributeName=key,KeyType=HASH \
    --provisioned-throughput \
        ReadCapacityUnits=10,WriteCapacityUnits=10
----
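
Once the table exists, the remaining step is to set the locking provider in the environment that launches kafka-delta-ingest. The sketch below does that and adds an optional status check. The helper name `lock_table_status` is hypothetical; it assumes the aws CLI is installed and has credentials, and it is only defined here, not invoked.

```shell
# Tell delta-rs to use DynamoDB for locking, as described above.
export AWS_S3_LOCKING_PROVIDER=dynamodb

# Hypothetical helper: report the status of the lock table via the AWS CLI.
# Takes an optional table name and defaults to delta_rs_lock_table.
lock_table_status() {
  aws dynamodb describe-table \
    --table-name "${1:-delta_rs_lock_table}" \
    --query 'Table.TableStatus' \
    --output text
}
```

Running `lock_table_status` should print `ACTIVE` once the table is ready for use.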

== Schema Support

This application supports both Avro and JSON formats via command line arguments. If no format argument is provided, the default behavior is to use JSON. The table below shows the result for each combination of arguments.

|===
| Argument | Value | Result

| (none) | | default json behavior
| `--json` | | default json behavior
| `--json` | (schema registry URL) | will connect schema registry to deserialize json
| `--avro` | `""` | expects all messages in avro format
| `--avro` | (Avro schema) | will use the provided avro schema for deserialization
| `--avro` | (schema registry URL) | will connect schema registry to deserialize avro
|===

For more information on the DynamoDB locking provider used when writing to S3, see link:https://github.com/delta-io/delta-rs/tree/dbc2994c5fddfd39fc31a8f9202df74788f59a01/dynamodb_lock[DynamoDB lock].

== Verifying data in Azure Storage

Use the Azure Portal to browse the file system.

== Get Involved

Join link:https://delta-users.slack.com/archives/C01Q2RXCVSQ[#kafka-delta-ingest in the Delta Lake Slack workspace]