This repository contains the Kafka binding extensions for the Azure WebJobs SDK. Communication with Kafka is based on the Confluent.Kafka library.
Please find the samples here.
DISCLAIMER: This library is supported in the Premium Plan along with support for scaling as Go-Live - supported in Production with an SLA. It is also fully supported when using Azure Functions on Kubernetes, where scaling is handled by KEDA (scaling based on Kafka queue length). It is currently not supported on the Consumption plan (there is no scale from zero) - this is something the Azure Functions team is still working on.
This library provides a Quick Start for each language. For general information about the samples, refer to:
Language | Description | Link | DevContainer |
---|---|---|---|
C# | C# precompiled sample with Visual Studio | Readme | No |
Java | Java 8 sample | Readme | Yes |
JavaScript | Node 12 sample | Readme | Yes |
PowerShell | PowerShell 6 Sample | Readme | No |
Python | Python 3.8 sample | Readme | Yes |
TypeScript | TypeScript sample (Node 12) | Readme | Yes |
The following instructions are for C#. However, the other languages are built on top of the C# extension, so the same configuration parameters apply to them as well.
There are two binding types in this repo: trigger and output. To get started using the extension in a WebJobs project, add a reference to the Microsoft.Azure.WebJobs.Extensions.Kafka project and call AddKafka() on startup:
```csharp
static async Task Main(string[] args)
{
    var builder = new HostBuilder()
        .UseEnvironment("Development")
        .ConfigureWebJobs(b =>
        {
            b.AddKafka();
        })
        .ConfigureAppConfiguration(b =>
        {
        })
        .ConfigureLogging((context, b) =>
        {
            b.SetMinimumLevel(LogLevel.Debug);
            b.AddConsole();
        })
        .ConfigureServices(services =>
        {
            services.AddSingleton<Functions>();
        })
        .UseConsoleLifetime();

    var host = builder.Build();
    using (host)
    {
        await host.RunAsync();
    }
}
```
```csharp
public class Functions
{
    const string Broker = "localhost:9092";
    const string StringTopicWithOnePartition = "stringTopicOnePartition";
    const string StringTopicWithTenPartitions = "stringTopicTenPartitions";

    /// <summary>
    /// Trigger for the topic
    /// </summary>
    public void MultiItemTriggerTenPartitions(
        [KafkaTrigger(Broker, StringTopicWithTenPartitions, ConsumerGroup = "myConsumerGroup")] KafkaEventData<string>[] events,
        ILogger log)
    {
        foreach (var kafkaEvent in events)
        {
            log.LogInformation(kafkaEvent.Value);
        }
    }
}
```
Trigger bindings are designed to consume messages from a Kafka topic.
```csharp
public static void StringTopic(
    [KafkaTrigger("BrokerList", "myTopic", ConsumerGroup = "myGroupId")] KafkaEventData<string>[] kafkaEvents,
    ILogger logger)
{
    foreach (var kafkaEvent in kafkaEvents)
    {
        logger.LogInformation(kafkaEvent.Value);
    }
}
```
Kafka messages can be serialized in multiple formats. Currently, the following formats are supported: string, Avro, and Protobuf.
The Kafka trigger supports two methods for consuming Avro format:
Using Avro specific
1. Create a class that implements the Avro ISpecificRecord interface.
2. The function parameter where the KafkaTrigger attribute is added should have a value type of the class defined in the previous step: KafkaEventData<MySpecificRecord>.
```csharp
public class UserRecord : ISpecificRecord
{
    public const string SchemaText = @"{
        ""type"": ""record"",
        ""name"": ""UserRecord"",
        ""namespace"": ""KafkaFunctionSample"",
        ""fields"": [
            { ""name"": ""registertime"", ""type"": ""long"" },
            { ""name"": ""userid"", ""type"": ""string"" },
            { ""name"": ""regionid"", ""type"": ""string"" },
            { ""name"": ""gender"", ""type"": ""string"" }
        ]
    }";

    public static Schema _SCHEMA = Schema.Parse(SchemaText);

    [JsonIgnore]
    public virtual Schema Schema => _SCHEMA;

    public long RegisterTime { get; set; }
    public string UserID { get; set; }
    public string RegionID { get; set; }
    public string Gender { get; set; }

    public virtual object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return this.RegisterTime;
            case 1: return this.UserID;
            case 2: return this.RegionID;
            case 3: return this.Gender;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        }
    }

    public virtual void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: this.RegisterTime = (long)fieldValue; break;
            case 1: this.UserID = (string)fieldValue; break;
            case 2: this.RegionID = (string)fieldValue; break;
            case 3: this.Gender = (string)fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        }
    }
}
```
```csharp
public static void User(
    [KafkaTrigger("BrokerList", "users", ConsumerGroup = "myGroupId")] KafkaEventData<UserRecord>[] kafkaEvents,
    ILogger logger)
{
    foreach (var kafkaEvent in kafkaEvents)
    {
        var user = kafkaEvent.Value;
        logger.LogInformation(JsonConvert.SerializeObject(user));
    }
}
```
Using Avro Generic
In the KafkaTrigger attribute, set the value of AvroSchema to the string representation of the schema. The trigger parameter should be of type KafkaEventData<GenericRecord>. The sample function contains one consumer using Avro generic; check the class AvroGenericTriggers:
```csharp
public static class AvroGenericTriggers
{
    const string PageViewsSchema = @"{
        ""type"": ""record"",
        ""name"": ""pageviews"",
        ""namespace"": ""ksql"",
        ""fields"": [
            { ""name"": ""viewtime"", ""type"": ""long"" },
            { ""name"": ""userid"", ""type"": ""string"" },
            { ""name"": ""pageid"", ""type"": ""string"" }
        ]
    }";

    [FunctionName(nameof(PageViews))]
    public static void PageViews(
        [KafkaTrigger("BrokerList", "pageviews", AvroSchema = PageViewsSchema, ConsumerGroup = "myGroupId")] KafkaEventData<GenericRecord> kafkaEvent,
        ILogger logger)
    {
        if (kafkaEvent.Value != null)
        {
            // Get the field values manually from the GenericRecord (kafkaEvent.Value)
        }
    }
}
```
Protobuf is supported in the trigger based on the Google.Protobuf NuGet package. To consume a topic that uses Protobuf serialization, set the TValue generic argument to a type that implements Google.Protobuf.IMessage. The sample producer has a producer for topic protoUser (which must be created). The sample function has a trigger handler for this topic in the class ProtobufTriggers.
```csharp
public static class ProtobufTriggers
{
    [FunctionName(nameof(ProtobufUser))]
    public static void ProtobufUser(
        [KafkaTrigger("BrokerList", "protoUser", ConsumerGroup = "myGroupId")] KafkaEventData<ProtoUser>[] kafkaEvents,
        ILogger logger)
    {
        foreach (var kafkaEvent in kafkaEvents)
        {
            var user = kafkaEvent.Value;
            logger.LogInformation(JsonConvert.SerializeObject(user));
        }
    }
}
```
Output bindings are designed to produce messages to a Kafka topic. They support different key and value types; Avro and Protobuf serialization are built in.
[FunctionName("ProduceStringTopic")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
[Kafka("stringTopicTenPartitions", BrokerList = "LocalBroker")] IAsyncCollector<KafkaEventData<string>> events,
ILogger log)
{
var kafkaEvent = new KafkaEventData<string>()
{
Value = await new StreamReader(req.Body).ReadToEndAsync(),
};
await events.AddAsync(kafkaEvent);
return new OkResult();
}
To set a key, use KafkaEventData<string, string> to define a key of type string (supported key types: int, long, string, byte[]).
To produce messages using Protobuf serialization, use KafkaEventData<MyProtobufClass> as the message type; MyProtobufClass must implement the IMessage interface.
For Avro, provide a type that implements ISpecificRecord.
If nothing is defined, the value will be of type byte[] and no key will be set.
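For example, here is a minimal sketch of a keyed producer, assuming KafkaEventData<string, string> exposes Key and Value properties as described above; the topic name keyedTopic and the query parameter are illustrative, not part of the samples:

```csharp
[FunctionName("ProduceKeyedMessage")]
public static async Task<IActionResult> ProduceKeyedMessage(
    [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest req,
    [Kafka("keyedTopic", BrokerList = "LocalBroker")] IAsyncCollector<KafkaEventData<string, string>> events,
    ILogger log)
{
    var kafkaEvent = new KafkaEventData<string, string>()
    {
        Key = req.Query["key"],  // illustrative: key of type string taken from the query string
        Value = await new StreamReader(req.Body).ReadToEndAsync(),
    };

    await events.AddAsync(kafkaEvent);
    return new OkResult();
}
```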
Customization of the Kafka extension is available in the host.json file. As mentioned before, the interface to Kafka is built on the Confluent.Kafka library, so some of the configuration is simply a bridge to the underlying producer/consumer.
```json
{
    "version": "2.0",
    "extensions": {
        "kafka": {
            "maxBatchSize": 100
        }
    }
}
```
Confluent.Kafka is based on the librdkafka C library. Some of the configuration required by the library is exposed by the extension in this repository. The complete librdkafka configuration reference can be found here.
Setting | Description | Default Value |
---|---|---|
MaxBatchSize | Maximum batch size when calling a Kafka trigger function | 64 |
SubscriberIntervalInSeconds | Defines the minimum frequency (in seconds) at which messages will be dispatched to the function, applied when the message volume is lower than MaxBatchSize / SubscriberIntervalInSeconds | 1 |
ExecutorChannelCapacity | Defines the capacity of the channel through which messages are sent to functions. Once the capacity is reached, the Kafka subscriber pauses until the function catches up | 1 |
ChannelFullRetryIntervalInMs | Defines the interval in milliseconds at which the subscriber retries adding items to the channel once capacity is reached | 50 |
The settings exposed here are targeted at more advanced users who want to customize how librdkafka works. Please check the librdkafka documentation for more information.
Setting | librdkafka property | Trigger or Output |
---|---|---|
ReconnectBackoffMs | reconnect.backoff.ms | Trigger |
ReconnectBackoffMaxMs | reconnect.backoff.max.ms | Trigger |
StatisticsIntervalMs | statistics.interval.ms | Trigger |
SessionTimeoutMs | session.timeout.ms | Trigger |
MaxPollIntervalMs | max.poll.interval.ms | Trigger |
QueuedMinMessages | queued.min.messages | Trigger |
QueuedMaxMessagesKbytes | queued.max.messages.kbytes | Trigger |
MaxPartitionFetchBytes | max.partition.fetch.bytes | Trigger |
FetchMaxBytes | fetch.max.bytes | Trigger |
AutoCommitIntervalMs | auto.commit.interval.ms | Trigger |
AutoOffsetReset | auto.offset.reset | Trigger |
LibkafkaDebug | debug | Both |
MetadataMaxAgeMs | metadata.max.age.ms | Both |
SocketKeepaliveEnable | socket.keepalive.enable | Both |
LingerMs | linger.ms | Output |
NOTE: The default for MetadataMaxAgeMs is 180000 and the default for SocketKeepaliveEnable is true; otherwise, the default values are the same as in the librdkafka configuration properties. For the reasoning behind these defaults, refer to this issue.
NOTE: The default for AutoOffsetReset is Earliest. Allowed values are Earliest and Latest.
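As a hedged sketch, assuming these settings follow the same camelCase host.json convention as maxBatchSize above, overriding a few of them might look like:

```json
{
    "version": "2.0",
    "extensions": {
        "kafka": {
            "maxBatchSize": 100,
            "autoOffsetReset": "Earliest",
            "maxPollIntervalMs": 300000,
            "metadataMaxAgeMs": 180000
        }
    }
}
```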
If you are missing a configuration setting, please create an issue and describe why you need it.
Both trigger and output bindings can connect to a secure Kafka broker. The following attribute properties are available to establish a secure connection:
Setting | librdkafka property | Description |
---|---|---|
AuthenticationMode | sasl.mechanism | SASL mechanism to use for authentication |
Username | sasl.username | SASL username for use with the PLAIN and SASL-SCRAM mechanisms |
Password | sasl.password | SASL password for use with the PLAIN and SASL-SCRAM mechanisms |
Protocol | security.protocol | Security protocol used to communicate with brokers |
SslKeyLocation | ssl.key.location | Path to client's private key (PEM) used for authentication |
SslKeyPassword | ssl.key.password | Password for client's certificate |
SslCertificateLocation | ssl.certificate.location | Path to client's certificate |
SslCaLocation | ssl.ca.location | Path to CA certificate file for verifying the broker's certificate |
OAuthBearerMethod | sasl.oauthbearer.method | OAuth bearer method, either 'default' or 'oidc'. AuthenticationMode must be set to OAuthBearer |
OAuthBearerClientId | sasl.oauthbearer.client.id | OIDC ClientId |
OAuthBearerClientSecret | sasl.oauthbearer.client.secret | OIDC ClientSecret |
OAuthBearerScope | sasl.oauthbearer.scope | OIDC Scope |
OAuthBearerTokenEndpointUrl | sasl.oauthbearer.token.endpoint.url | Token endpoint URL |
OAuthBearerExtensions | sasl.oauthbearer.extensions | Comma-separated key/value pairs required by Confluent Kafka |
The username and password should reference an Azure Functions configuration setting and should not be hardcoded.
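As a hedged sketch of the output side (the properties above apply to both bindings per this section; the topic and setting names below are illustrative):

```csharp
[FunctionName("ProduceSecureTopic")]
public static async Task<IActionResult> ProduceSecureTopic(
    [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest req,
    [Kafka("myTopic", BrokerList = "BootstrapServer",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain,
        Username = "KafkaUsername",    // name of an app setting, not a literal credential
        Password = "KafkaPassword")]   // name of an app setting, not a literal credential
    IAsyncCollector<KafkaEventData<string>> events,
    ILogger log)
{
    var kafkaEvent = new KafkaEventData<string>
    {
        Value = await new StreamReader(req.Body).ReadToEndAsync(),
    };
    await events.AddAsync(kafkaEvent);
    return new OkResult();
}
```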
For non-C# languages, you can specify cardinality to choose whether the KafkaTrigger executes in batch mode; see the function.json sketch after this table.
Setting | Description | Options |
---|---|---|
cardinality | Set to MANY in order to enable batching. If omitted or set to ONE, a single message is passed to the function. For Java functions, if you set MANY, you also need to set a dataType | ONE, MANY |
dataType | For Java functions, the type to which a Kafka event is deserialized. Required when you use cardinality = MANY | string, binary |
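As an illustrative sketch of a function.json for a non-C# function consuming in batch mode (the topic, broker setting, and consumer group values are placeholders):

```json
{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "kafkaEvents",
            "direction": "in",
            "topic": "myTopic",
            "brokerList": "BrokerList",
            "consumerGroup": "myGroupId",
            "cardinality": "MANY",
            "dataType": "string"
        }
    ]
}
```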
Currently, when running a function in a Linux Premium plan environment, there will be an error indicating that the librdkafka library could not be loaded. To address the problem, at least for now, please add the setting below; it adds the extension location to the paths where libraries are searched. We are working on removing the need for this setting in future releases.
Setting | Value | Description |
---|---|---|
LD_LIBRARY_PATH | /home/site/wwwroot/bin/runtimes/linux-x64/native | librdkafka library path |
For samples, take a look at the samples folder.
Connecting to a managed Kafka cluster, such as the one provided by Confluent in Azure, requires a few additional steps:
```csharp
public static class ConfluentCloudTrigger
{
    [FunctionName(nameof(ConfluentCloudStringTrigger))]
    public static void ConfluentCloudStringTrigger(
        [KafkaTrigger("BootstrapServer", "my-topic",
            ConsumerGroup = "azfunc",
            Protocol = BrokerProtocol.SaslSsl,
            AuthenticationMode = BrokerAuthenticationMode.Plain,
            Username = "ConfluentCloudUsername",
            Password = "ConfluentCloudPassword")]
        KafkaEventData<string> kafkaEvent,
        ILogger logger)
    {
        logger.LogInformation(kafkaEvent.Value);
    }
}
```
This repo includes unit and end-to-end tests. End-to-end tests require a Kafka instance. A quick way to provide one is to use the Kafka quick start example mentioned previously, or a simpler single-node docker-compose solution (also based on Confluent Docker images).
To get a simple single-node Kafka instance running:
```bash
docker-compose -f ./test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/kafka-singlenode-compose.yaml up -d
```
To shut down the single-node Kafka instance:
```bash
docker-compose -f ./test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/kafka-singlenode-compose.yaml down
```
By default, end-to-end tests will try to connect to Kafka on localhost:9092. If your Kafka broker is located elsewhere, create a local.appsettings.tests.json file in the folder ./test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/, overriding the value of the LocalBroker setting as in the example below:
```json
{
    "LocalBroker": "location-of-your-kafka-broker:9092"
}
```
Handling errors in Azure Functions is important for avoiding lost data and missed events, and for monitoring the health of your application. It's also important to understand the retry behaviors of event-based triggers.
The Kafka extension supports function-level retries, which are evaluated when a trigger function raises an uncaught exception. As a best practice, you should catch all exceptions in your code and rethrow any errors that you want to result in a retry.
There are two retry strategies you can configure through the retry policy:
- Fixed delay: a specified amount of time is allowed to elapse between each retry.
- Exponential backoff: the first retry waits for the minimum delay. On subsequent retries, time is added exponentially to the initial duration for each retry, until the maximum delay is reached. Exponential backoff adds some small randomization to delays to stagger retries in high-throughput scenarios.
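As a hedged sketch in C# (these retry attributes come from the WebJobs SDK's general function-level retry support rather than anything Kafka-specific in this repo; the counts and delays are illustrative):

```csharp
// Fixed delay: up to 5 retries, waiting 10 seconds between attempts.
[FunctionName("RetryFixedDelay")]
[FixedDelayRetry(5, "00:00:10")]
public static void RetryFixedDelay(
    [KafkaTrigger("BrokerList", "myTopic", ConsumerGroup = "myGroupId")] KafkaEventData<string> kafkaEvent,
    ILogger logger)
{
    logger.LogInformation(kafkaEvent.Value);
}

// Exponential backoff: up to 5 retries, delays growing from 4 seconds
// toward a 15-minute maximum.
[FunctionName("RetryExponentialBackoff")]
[ExponentialBackoffRetry(5, "00:00:04", "00:15:00")]
public static void RetryExponentialBackoff(
    [KafkaTrigger("BrokerList", "myTopic", ConsumerGroup = "myGroupId")] KafkaEventData<string> kafkaEvent,
    ILogger logger)
{
    logger.LogInformation(kafkaEvent.Value);
}
```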
For more information, please check the official documentation.