F#-friendly wrapper for Confluent.Kafka, with minimal dependencies or additional abstractions (but see related repos).
FsKafka
wraps Confluent.Kafka
to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation. Depends on
Confluent.Kafka [1.9.2]
,
librdkafka.redist [1.9.2]
(pinned to ensure we use a tested pairing),
Serilog
(but no specific Serilog sinks, i.e. you configure to emit to NLog
etc) and
Newtonsoft.Json
(used internally to parse Broker-provided Statistics for logging purposes).
FsKafka is delivered as a NuGet package targeting netstandard2.0
and F# >= 4.5.
dotnet add package FsKafka
or, for paket, use:
paket add FsKafka
dotnet new
templates repo's proProjector
template (in -k
mode) for example producer logic using the BatchedProducer
and the proConsumer
template for examples of using the BatchedConsumer
from FsKafka
, alongside the extended modes in Propulsion.
Equinox.Cosmos
and/or Equinox.EventStore.
Contributions of all sizes are warmly welcomed. See our contribution guide.
The best place to start, sample-wise, is with the dotnet new
templates, which are stored in a dedicated repo.
The templates are the best way to see how to consume it; these instructions are intended mainly for people looking to make changes.
NB The tests are reliant on a TEST_KAFKA_BROKER
environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run writes to a guid-named topic)
export TEST_KAFKA_BROKER="<server>:9092"
dotnet build build.proj -v n
breaking off polling
... resuming polling
spam?
The BatchedConsumer
implementation tries to give clear feedback as to when reading is not keeping up, for diagnostic purposes. As of #32, such messages are tagged with the type FsKafka.Core.InFlightMessageCounter
, and as such can be silenced by including the following in one's LoggerConfiguration()
:
.MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, Serilog.Events.LogEventLevel.Warning)
This code results from building out an end-to-end batteries-included set of libraries and templates as part of the Equinox project.
Equinox places some key constraints on all components and dependencies:-
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka

// Serilog logger handed to the producer (no sinks are configured in this sample)
let log = Serilog.LoggerConfiguration().CreateLogger()

// Producer configuration: Batching.Linger built from a 10ms TimeSpan, with Acks.All
let batching = Batching.Linger (System.TimeSpan.FromMilliseconds 10.)
let producerConfig = KafkaProducerConfig.Create("MyClientId", "kafka:9092", Acks.All, batching)
let producer = KafkaProducer.Create(log, producerConfig, "MyTopic")

// Guid is fully qualified because this script does not `open System`
let key = System.Guid.NewGuid().ToString()
// Run the produce call synchronously and keep its result
let deliveryResult = producer.ProduceAsync(key, "Hello World!") |> Async.RunSynchronously
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka

// Serilog logger handed to the consumer (no sinks are configured in this sample)
let log = Serilog.LoggerConfiguration().CreateLogger()

// Batch handler: prints the value of each message in the batch it is given
let handler (messages : ConsumeResult<string,string> []) =
    async {
        for message in messages do
            printfn "Received: %s" message.Message.Value
    }

let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)

// Start the consumer and run the workflow synchronously, awaiting consumer.AwaitShutdown()
async {
    use consumer = BatchedConsumer.Start(log, cfg, handler)
    return! consumer.AwaitShutdown()
}
|> Async.RunSynchronously
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka

// Serilog logger shared by the consumer and the monitor (no sinks are configured in this sample)
let log = Serilog.LoggerConfiguration().CreateLogger()

// Batch handler: prints the value of each message in the batch it is given
let handler (messages : ConsumeResult<string,string> []) =
    async {
        for message in messages do
            printfn "Received: %s" message.Message.Value
    }

let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)

// Start the consumer, attach a KafkaMonitor for its consumer group, then await consumer.AwaitShutdown()
async {
    use consumer = BatchedConsumer.Start(log, cfg, handler)
    // The monitor handle is held only so that it is disposed together with the consumer
    use _ = KafkaMonitor(log).Start(consumer.Inner, cfg.Inner.GroupId)
    return! consumer.AwaitShutdown()
}
|> Async.RunSynchronously
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka

// Serilog logger shared by both consumers and their monitors (no sinks are configured in this sample)
let log = Serilog.LoggerConfiguration().CreateLogger()

// Batch handler used by both consumers: prints the value of each message in the batch
let handler (messages : ConsumeResult<string,string> []) =
    async {
        for message in messages do
            printfn "Received: %s" message.Message.Value
    }

// Builds a consumer configuration that differs only in the topic it subscribes to
let config topic = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", [topic], "MyGroupId", AutoOffsetReset.Earliest)
let cfg1 = config "MyTopicA"
let cfg2 = config "MyTopicB"

// Start one consumer (plus one monitor) per topic and await both in parallel
async {
    use consumer1 = BatchedConsumer.Start(log, cfg1, handler)
    use consumer2 = BatchedConsumer.Start(log, cfg2, handler)
    // Monitor handles are held only so that they are disposed together with the consumers
    use _ = KafkaMonitor(log).Start(consumer1.Inner, cfg1.Inner.GroupId)
    use _ = KafkaMonitor(log).Start(consumer2.Inner, cfg2.Inner.GroupId)
    return! Async.Parallel [ consumer1.AwaitWithStopOnCancellation()
                             consumer2.AwaitWithStopOnCancellation() ]
}
|> Async.RunSynchronously