microsoft / kafka-connect-cosmosdb

Kafka Connect connectors for Azure Cosmos DB
MIT License

Kafka Connect for Azure Cosmos DB (SQL API)



Introduction

File any issues, feature requests, or questions you may have in the Issues section of this repo.

This project provides connectors for Kafka Connect to read data from and write data to Azure Cosmos DB (SQL API).

The connectors in this repository are specifically for the Cosmos DB SQL API. If you are using Cosmos DB with a different API, there is likely a connector specific to that API, but it is not this one.

Exactly-Once Support

Supported Data Formats

The sink and source connectors can be configured to support the following data formats:

Format Name | Description
JSON (Plain) | JSON record structure without any attached schema.
JSON with Schema | JSON record structure with explicit schema information to ensure the data matches the expected format.
AVRO | A row-oriented remote procedure call and data serialization framework developed within Apache's Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format.

Since key and value settings, including the format and serialization, can be configured independently in Kafka, records can use one data format for their keys and a different one for their values.

To cater for this, converters are configured separately for keys and values through the key.converter and value.converter properties.
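As an illustration of keys and values using different formats, here is a minimal sketch that pairs plain-string keys with JSON values. It assumes the standard Apache Kafka converter classes (StringConverter and JsonConverter); the particular pairing and the class name MixedConverterConfig are hypothetical examples, not a recommended setup for this connector.

```java
import java.util.Properties;

/**
 * Hypothetical example: key and value converters are configured independently,
 * so keys can be plain strings while values are JSON.
 */
public class MixedConverterConfig {

    static Properties mixedConverters() {
        Properties props = new Properties();
        // Keys are passed through as plain strings (no JSON parsing, no schema).
        props.setProperty("key.converter", "org.apache.kafka.connect.storage.StringConverter");
        // Values are parsed as JSON; schemas.enable=false means no schema envelope is expected.
        props.setProperty("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        props.setProperty("value.converter.schemas.enable", "false");
        return props;
    }

    public static void main(String[] args) {
        // Print in key=value form, as it would appear in a connector properties file.
        mixedConverters().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Every other converter-related setting is prefixed with either key.converter. or value.converter., which is what keeps the two sides independent.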

Converter Configuration Examples

JSON (Plain)
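The original configuration example for this format is not reproduced here. As a sketch, assuming the standard Apache Kafka JsonConverter, plain JSON usually means using that converter for both key and value with the schema envelope turned off. The class name PlainJsonConverterConfig is hypothetical.

```java
import java.util.Properties;

/**
 * Sketch of converter settings for plain JSON (no embedded schema),
 * assuming the standard Apache Kafka JsonConverter.
 */
public class PlainJsonConverterConfig {

    static Properties plainJson() {
        Properties props = new Properties();
        for (String side : new String[] {"key", "value"}) {
            props.setProperty(side + ".converter", "org.apache.kafka.connect.json.JsonConverter");
            // false: messages are bare JSON objects, not {"schema":...,"payload":...} envelopes.
            props.setProperty(side + ".converter.schemas.enable", "false");
        }
        return props;
    }

    public static void main(String[] args) {
        plainJson().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In a properties file this corresponds to four lines: key.converter and value.converter set to org.apache.kafka.connect.json.JsonConverter, and key.converter.schemas.enable and value.converter.schemas.enable set to false.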

JSON with Schema

NOTE: Each message written in this format is made up of the schema plus the payload. Pay attention to the size of the message, and to how much of it is schema rather than payload: the full schema is repeated in every message you write to Kafka. In scenarios like this, you may want to use a serialisation format like JSON Schema or Avro, where the schema is stored separately and the message holds just the payload.
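To make the overhead concrete, the sketch below hand-builds the {"schema": ..., "payload": ...} envelope that JsonConverter produces when schemas.enable=true, for a made-up two-field record, and measures what share of the message is payload. The record, its schema, and the class name SchemaOverhead are illustrative assumptions; the connector does not build messages this way.

```java
/**
 * Illustrative only: hand-builds a JSON-with-schema message for a made-up
 * record and reports how much of it is payload versus schema.
 */
public class SchemaOverhead {

    static final String PAYLOAD = "{\"id\":\"1\",\"name\":\"cosmos\"}";

    static final String SCHEMA =
        "{\"type\":\"struct\",\"optional\":false,\"fields\":["
            + "{\"field\":\"id\",\"type\":\"string\",\"optional\":false},"
            + "{\"field\":\"name\",\"type\":\"string\",\"optional\":true}]}";

    static String envelope(String schema, String payload) {
        // Every message carries the full schema alongside its payload.
        return "{\"schema\":" + schema + ",\"payload\":" + payload + "}";
    }

    static double payloadFraction(String schema, String payload) {
        return (double) payload.length() / envelope(schema, payload).length();
    }

    public static void main(String[] args) {
        String msg = envelope(SCHEMA, PAYLOAD);
        System.out.println("message length (chars): " + msg.length());
        System.out.printf("payload share: %.0f%%%n", 100 * payloadFraction(SCHEMA, PAYLOAD));
    }
}
```

For this small record, the schema is several times larger than the payload, and that cost is paid again on every message.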

AVRO

The example below shows the AvroConverter key and value properties that are added to the configuration:

  key.converter=io.confluent.connect.avro.AvroConverter
  key.converter.schema.registry.url=http://schema-registry:8081
  value.converter=io.confluent.connect.avro.AvroConverter
  value.converter.schema.registry.url=http://schema-registry:8081

Choosing a conversion format

Common Errors

The following are common errors you can get if the converters in Kafka Connect are misconfigured. They show up in the sinks you configure for Kafka Connect, because that is the point at which the messages already stored in Kafka are deserialized. Converter problems tend not to occur in sources, because the source is where the serialization format is set.
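When a sink fails with a converter error, it can help to check what format the data in the topic actually has. The sketch below is a hypothetical diagnostic helper (not part of this connector) that guesses the format from a record value's raw bytes. It uses heuristics: Confluent's Avro serializer writes a 0x0 magic byte followed by a 4-byte schema ID, and the JSON-with-schema check is a naive string match rather than real JSON parsing.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical diagnostic helper: guesses which converter produced a record
 * value so it can be compared with the sink's converter settings.
 * Heuristic only; it does not validate the data.
 */
public class FormatGuess {

    static String guess(byte[] value) {
        if (value == null || value.length == 0) {
            return "EMPTY";
        }
        // Confluent wire format: magic byte 0x0, 4-byte schema ID, then the Avro body.
        if (value[0] == 0x0 && value.length >= 5) {
            return "AVRO";
        }
        String text = new String(value, StandardCharsets.UTF_8).trim();
        if (text.startsWith("{")) {
            // Naive check for the {"schema": ..., "payload": ...} envelope.
            boolean envelope = text.contains("\"schema\"") && text.contains("\"payload\"");
            return envelope ? "JSON_WITH_SCHEMA" : "JSON_PLAIN";
        }
        return "UNKNOWN";
    }

    public static void main(String[] args) {
        byte[] plain = "{\"id\":\"1\"}".getBytes(StandardCharsets.UTF_8);
        byte[] withSchema = "{\"schema\":{\"type\":\"string\"},\"payload\":\"x\"}"
            .getBytes(StandardCharsets.UTF_8);
        byte[] avro = {0x0, 0x0, 0x0, 0x0, 0x1, 0x2};
        System.out.println(guess(plain));      // JSON_PLAIN
        System.out.println(guess(withSchema)); // JSON_WITH_SCHEMA
        System.out.println(guess(avro));       // AVRO
    }
}
```

A result that disagrees with the sink's configuration, for example JSON_PLAIN data read by a JsonConverter with schemas.enable=true, points to the kind of converter misconfiguration described above.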

Converter Configuration Errors

Configuration

Common Configuration Properties

The Sink and Source connectors share the following common configuration properties:

Name | Type | Description | Required/Optional
connect.cosmos.connection.endpoint | uri | Cosmos endpoint URI string | Required
connect.cosmos.master.key | string | The Cosmos primary key that the connector connects with | Required
connect.cosmos.databasename | string | The name of the Cosmos database the connector reads from or writes to | Required
connect.cosmos.containers.topicmap | string | Mapping between Kafka topics and Cosmos containers, formatted as CSV: topic#container,topic2#container2 | Required
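The topicmap value packs several topic-to-container pairs into one string. The sketch below shows how such a value breaks down into pairs. It is an illustrative parser written for this README, not the connector's own parsing code; the class name TopicMapParser and its error handling are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Illustrative parser for a connect.cosmos.containers.topicmap value such as
 * "topic#container,topic2#container2" (not the connector's own code).
 */
public class TopicMapParser {

    static Map<String, String> parse(String topicMap) {
        // LinkedHashMap keeps the entries in the order they were written.
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : topicMap.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray or trailing commas
            }
            String[] parts = trimmed.split("#", -1);
            if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
                throw new IllegalArgumentException("Expected topic#container, got: " + trimmed);
            }
            result.put(parts[0], parts[1]); // topic -> container
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("topic#container,topic2#container2"));
        // {topic=container, topic2=container2}
    }
}
```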

For Sink connector specific configuration, please refer to the Sink Connector Documentation.

For Source connector specific configuration, please refer to the Source Connector Documentation.

Project Setup

Please refer to the Developer Walkthrough and Project Setup for initial setup instructions.

Performance Testing

For more information on the performance tests run for the Sink and Source connectors, refer to the Performance testing document.

Refer to the Performance Environment Setup for exact steps on deploying the performance test environment for the Connectors.

Dead Letter Queue

We added support for the standard Kafka Connect dead letter queue. For more information, see: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues
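As a sketch, the dead letter queue is enabled through Kafka Connect's standard errors.* properties, which apply to sink connectors. The example below assumes those standard property names. The topic name cosmos-sink-dlq, the replication factor, and the class name DeadLetterQueueConfig are hypothetical values chosen for illustration.

```java
import java.util.Properties;

/**
 * Sketch of the standard Kafka Connect error-handling properties that route
 * failed records to a dead letter queue (sink connectors only).
 * Topic name and replication factor are example values.
 */
public class DeadLetterQueueConfig {

    static Properties dlqProps(String dlqTopic) {
        Properties props = new Properties();
        // Keep the task running on bad records instead of failing it.
        props.setProperty("errors.tolerance", "all");
        // Records that fail conversion or transformation are written to this topic.
        props.setProperty("errors.deadletterqueue.topic.name", dlqTopic);
        // 1 suits a single-broker dev cluster; production clusters usually need more.
        props.setProperty("errors.deadletterqueue.topic.replication.factor", "1");
        // Attach headers describing why each record failed.
        props.setProperty("errors.deadletterqueue.context.headers.enable", "true");
        return props;
    }

    public static void main(String[] args) {
        dlqProps("cosmos-sink-dlq").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With errors.tolerance=all and no dead letter queue topic, failing records are skipped silently, so setting the topic name is what makes them inspectable later.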

Resources