Swift Kafka Client

The Swift Kafka Client library provides a convenient way to interact with Apache Kafka by leveraging Swift's new concurrency features. This package wraps the native librdkafka library.

Adding Kafka as a Dependency

To use the Kafka library in a SwiftPM project, add the following line to the dependencies in your Package.swift file:

.package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main")

Include "Kafka" as a dependency for your executable target:

.target(name: "<target>", dependencies: [
    .product(name: "Kafka", package: "swift-kafka-client"),

Finally, add import Kafka to your source code.


Kafka should be used within a Swift Service Lifecycle ServiceGroup for proper startup and shutdown handling. Both the KafkaProducer and the KafkaConsumer implement the Service protocol.

Producer API

The send(_:) method of KafkaProducer returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the events AsyncSequence. Each acknowledgement indicates that producing a message was successful or returns an error.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [brokerAddress])

let (producer, events) = try KafkaProducer.makeProducerWithEvents(
    configuration: configuration,
    logger: logger

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [producer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        try await serviceGroup.run()

    // Task sending message and receiving events
    group.addTask {
        let messageID = try producer.send(
                topic: "topic-name",
                value: "Hello, World!"

        for await event in events {
            switch event {
            case .deliveryReports(let deliveryReports):
                // Check what messages the delivery reports belong to
                break // Ignore any other events

Consumer API

After initializing the KafkaConsumer with a topic-partition pair to read from, messages can be consumed using the messages AsyncSequence.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .partition(
        KafkaPartition(rawValue: 0),
        topic: "topic-name"
    bootstrapBrokerAddresses: [brokerAddress]

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        try await serviceGroup.run()

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message

Consumer Groups

Kafka also allows users to subscribe to an array of topics as part of a consumer group.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
    bootstrapBrokerAddresses: [brokerAddress]

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        try await serviceGroup.run()

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message

Manual commits

By default, the KafkaConsumer automatically commits message offsets after receiving the corresponding message. However, we allow users to disable this setting and commit message offsets manually.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
var configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
    bootstrapBrokerAddresses: [brokerAddress]
configuration.isAutoCommitEnabled = false

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        try await serviceGroup.run()

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message
            // ...
            try await consumer.commitSync(message)

Security Mechanisms

Both the KafkaProducer and the KafkaConsumer can be configured to use different security mechanisms.


var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .plaintext


var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .tls()


let kerberosConfiguration = KafkaConfiguration.SASLMechanism.KerberosConfiguration(
    keytab: "KEYTAB_FILE"

var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslPlaintext(
    mechanism: .gssapi(kerberosConfiguration: kerberosConfiguration)


let saslMechanism = KafkaConfiguration.SASLMechanism.scramSHA256(
    username: "USERNAME",
    password: "PASSWORD"

var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslTLS(
    saslMechanism: saslMechanism


The Package depends on the librdkafka library, which is included as a git submodule. It has source files that are excluded in Package.swift.

Development Setup

We provide a Docker environment for this package. This will automatically start a local Kafka server and run the package tests.

docker-compose -f docker/docker-compose.yaml run test