= kafkactl
:toc:
:toclevels: 2

A command-line interface for interaction with Apache Kafka
image:https://github.com/deviceinsight/kafkactl/workflows/Lint%20%2F%20Test%20%2F%20IT/badge.svg?branch=main[Build Status,link=https://github.com/deviceinsight/kafkactl/actions] | image:https://img.shields.io/badge/command-docs-blue.svg[command docs,link=https://deviceinsight.github.io/kafkactl/]
== Features

* command auto-completion for bash, zsh and fish
* configuration of multiple contexts/clusters
* support for Avro and Protobuf encoded messages
* access to Kafka clusters running in Kubernetes
* plugin support for OAuth token providers
== Installation
You can install the pre-compiled binary or compile from source.
=== Install the pre-compiled binary
homebrew:

[source,bash]
----
brew tap deviceinsight/packages
brew install deviceinsight/packages/kafkactl
----
deb/rpm:
Download the `.deb` or `.rpm` from the https://github.com/deviceinsight/kafkactl/releases[releases page] and install with `dpkg -i` and `rpm -i` respectively.
yay (AUR):
There's a kafkactl https://aur.archlinux.org/packages/kafkactl/[AUR package] available for Arch. Install it with your AUR helper of choice (e.g. https://github.com/Jguer/yay[yay]):
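For example, with yay (package name as published on AUR):

[source,bash]
----
yay -S kafkactl
----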
snap:
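A sketch, assuming the snap is published under the name kafkactl:

[source,bash]
----
snap install kafkactl
----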
manually:
Download the pre-compiled binaries from the https://github.com/deviceinsight/kafkactl/releases[releases page] and copy to the desired location.
=== Compiling from source
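One possible approach, assuming a standard Go toolchain and that the module path matches the repository URL (verify against the repository before use):

[source,bash]
----
# module path is an assumption based on the repository URL
go install github.com/deviceinsight/kafkactl@latest
----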
NOTE: make sure that `kafkactl` is on `PATH`, otherwise auto-completion won't work.
== Configuration
If no config file is found, a default config is generated in `$HOME/.config/kafkactl/config.yml`.
This configuration is suitable to get started with a single node cluster on a local machine.
=== Create a config file
Create `$HOME/.config/kafkactl/config.yml` with a definition of contexts that should be available:
[source,yaml]
----
contexts:
  default:
    brokers:
      - localhost:9092
  remote-cluster:
    brokers:
      - remote-cluster001:9092
      - remote-cluster002:9092
      - remote-cluster003:9092
    # optional: tls config
    tls:
      enabled: true
      ca: my-ca
      cert: my-cert
      certKey: my-key
      insecure: false
    # optional: sasl support
    sasl:
      enabled: true
      username: admin
      password: admin
      mechanism: oauth
      tokenprovider:
        plugin: azure
        # optional: additional options passed to the plugin
        options:
          key: value
    # optional: access clusters running in kubernetes
    kubernetes:
      enabled: false
      binary: kubectl #optional
      kubeConfig: ~/.kube/config #optional
      kubeContext: my-cluster
      namespace: my-namespace
      # optional: docker image to use (-scratch or -ubuntu is appended depending on the command)
      image: private.registry.com/deviceinsight/kafkactl
      imagePullSecret: registry-secret
      serviceAccount: my-service-account
      keepPod: true
      labels:
        key: value
      annotations:
        key: value
      nodeSelector:
        key: value
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: "<key>"
                    operator: "<operator>"
                    values: [ "<value>" ]
      tolerations:
        - key: "<key>"
          operator: "<operator>"
          value: "<value>"
          effect: "<effect>"
    clientID: my-client-id
    kafkaVersion: 1.1.1
    requestTimeout: 10s
    # optional: avro / schema registry configuration
    avro:
      schemaRegistry: localhost:8081
      jsonCodec: avro
      requestTimeout: 10s
      username: admin
      password: admin
      tls:
        enabled: true
        ca: my-ca
        cert: my-cert
        certKey: my-key
        insecure: false
    # optional: protobuf import paths
    protobuf:
      importPaths: []
    producer:
      partitioner: "hash"
      requiredAcks: "WaitForAll"
      maxMessageBytes: 1000000
    consumer:
      isolationLevel: ReadUncommitted
----
[#_config_file_read_order]
The config file location is resolved by

. checking for a provided commandline argument: `--config-file=$PATH_TO_CONFIG`
. evaluating the environment variable: `export KAFKA_CTL_CONFIG=$PATH_TO_CONFIG`
. checking for a project config file in the working directory (see <<_project_config_files>>)
. as default the config file is looked up from one of the following locations:
* `$HOME/.config/kafkactl/config.yml`
* `$HOME/.kafkactl/config.yml`
* `$APPDATA/kafkactl/config.yml`
* `$SNAP_REAL_HOME/.kafkactl/config.yml`
* `$SNAP_DATA/kafkactl/config.yml`
* `/etc/kafkactl/config.yml`
[#_project_config_files]
==== Project config files

In addition to the config file locations above, kafkactl allows you to create a config file on project level. A project config file is meant to be placed at the root level of a git repo and declares the kafka configuration for this repository/project.

In order to identify the config file as belonging to kafkactl the following names can be used:

* `kafkactl.yml`
* `.kafkactl.yml`

During initialization kafkactl starts from the current working directory and recursively looks for a project level config file. The recursive lookup ends at the boundary of a git repository (i.e. if a `.git` folder is found). This way, kafkactl can be used conveniently anywhere in the git repository.

Additionally, project config files can be used read-only. Typically, if you configure more than one context in a config file and switch the context with `kafkactl config use-context xy`, this leads to a write operation on the config file to save the current context. To avoid this for project config files, simply omit the `current-context` parameter from the config file. In this case kafkactl delegates read and write operations for the current context to the next configuration file according to <<_config_file_read_order, the config file read order>>.
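A minimal read-only project config file (values follow the configuration format above; `current-context` is deliberately omitted) might look like this:

[source,yaml]
----
# kafkactl.yml at the root of the git repository
contexts:
  default:
    brokers:
      - localhost:9092
----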
=== Auto completion
==== bash
NOTE: if you installed via snap, bash completion should work automatically.
To load completions for each session, execute the command for your platform once; typical target locations for Linux and macOS are shown below.
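A sketch using the cobra-generated `completion` subcommand (the target paths are the usual bash-completion locations and may differ on your system):

[source,bash]
----
# Linux:
kafkactl completion bash > /etc/bash_completion.d/kafkactl

# macOS (with Homebrew's bash-completion):
kafkactl completion bash > /usr/local/etc/bash_completion.d/kafkactl
----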
==== zsh
If shell completion is not already enabled in your environment, you will need to enable it and then install the kafkactl completions; both steps, shown below, only need to be executed once.
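A sketch following the usual zsh/cobra completion setup (the `fpath` index and paths are assumptions about your zsh configuration):

[source,bash]
----
# enable shell completion, if not already enabled:
echo "autoload -U compinit; compinit" >> ~/.zshrc

# install the kafkactl completions for each session:
kafkactl completion zsh > "${fpath[1]}/_kafkactl"
----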
You will need to start a new shell for this setup to take effect.
==== Fish
To load completions for each session, execute once:
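A sketch using the cobra-generated `completion` subcommand (the completions directory is the conventional fish location):

[source,bash]
----
kafkactl completion fish > ~/.config/fish/completions/kafkactl.fish
----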
== Documentation
The documentation for all available commands can be found here:
image::https://img.shields.io/badge/command-docs-blue.svg[command docs,link=https://deviceinsight.github.io/kafkactl/]
== Running in docker
Assuming your Kafka brokers are accessible under `kafka1:9092` and `kafka2:9092`, you can list topics by running:
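A sketch, assuming the public image is published as `deviceinsight/kafkactl` (the `BROKERS` environment variable is described below under configuration via environment variables):

[source,bash]
----
# image name is an assumption
docker run --env BROKERS="kafka1:9092 kafka2:9092" deviceinsight/kafkactl:latest get topics
----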
If a more elaborate config is needed, you can mount it as a volume:
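For example, mounting a local config file to `/etc/kafkactl/config.yml`, one of the documented config locations (image name again assumed):

[source,bash]
----
docker run -v /absolute/path/to/config.yml:/etc/kafkactl/config.yml deviceinsight/kafkactl:latest get topics
----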
== Running in Kubernetes
:construction: This feature is still experimental.
If your kafka cluster is not directly accessible from your machine, but it is accessible from a kubernetes cluster which in turn is accessible via `kubectl` from your machine, you can configure kubernetes support:
[source,yaml]
----
contexts:
  kafka-cluster:
    brokers:
      - broker1:9092
      - broker2:9092
    kubernetes:
      enabled: true
      binary: kubectl #optional
      kubeContext: my-cluster
      namespace: my-namespace
----
Instead of directly talking to kafka brokers, a kafkactl docker image is deployed as a pod into the kubernetes cluster in the defined namespace. Standard input and standard output are then wired between the pod and your shell running kafkactl.
There are two options:
. You can run `kafkactl attach` with your kubernetes cluster configured. This will use `kubectl run` to create a pod in the configured kubeContext/namespace which runs an image of kafkactl and gives you a `bash` into the container. Standard-in is piped to the pod and standard-out, standard-err directly to your shell. You even get auto-completion.
. You can run any other kafkactl command with your kubernetes cluster configured. Instead of directly querying the cluster, a pod is deployed, and input/output are wired between the pod and your shell.
The names of the brokers have to match the service names used to access kafka in your cluster. A command like this should give you this information:
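For instance, a sketch that lists candidate service names (adjust the namespace to wherever kafka runs):

[source,bash]
----
kubectl -n my-namespace get services
----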
:bulb: The first option takes a bit longer to start up since an Ubuntu-based docker image is used in order to have a bash available. The second option uses a docker image built from scratch and should therefore be quicker. Which option is more suitable will depend on your use-case.
:warning: currently kafkactl must NOT be installed via snap in order for the kubernetes feature to work. The snap runs in a sandbox and is therefore unable to access the `kubectl` binary.
== Configuration via environment variables
Every key in the `config.yml` can be overwritten via environment variables. The corresponding environment variable for a key can be found by applying the following rules:
. replace `.` with `_`
. replace `-` with `_`
. write the key name in ALL CAPS
e.g. the key `contexts.default.tls.certKey` has the corresponding environment variable `CONTEXTS_DEFAULT_TLS_CERTKEY`.
NOTE: an array variable can be written using whitespace as delimiter. For example `BROKERS` can be provided as `BROKERS="broker1:9092 broker2:9092 broker3:9092"`.
If environment variables for the `default` context should be set, the prefix `CONTEXTS_DEFAULT_` can be omitted. So, instead of `CONTEXTS_DEFAULT_TLS_CERTKEY` one can also set `TLS_CERTKEY`.
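For example, a one-off override of the broker list for a single invocation (a sketch; `get topics` is used as the command here):

[source,bash]
----
BROKERS="broker1:9092 broker2:9092" kafkactl get topics
----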
See root_test.go for more examples.
== Plugins
kafkactl supports plugins to cope with specifics when using Kafka-compatible clusters available from cloud providers such as Azure or AWS.
At the moment, plugins can only be used to implement a `tokenProvider` for oauth authentication.
In the future, plugins might implement additional commands to query data or configuration which is not part of the Kafka-API. One example would be Eventhub consumer groups/offsets for Azure.
See the plugin documentation for details and usage examples.
Available plugins:
== Examples
=== Consuming messages
Consuming messages from a topic can be done with:
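For example (topic name is a placeholder):

[source,bash]
----
kafkactl consume my-topic
----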
In order to consume starting from the oldest offset use:
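A sketch, assuming a `--from-beginning` flag selects the oldest offset:

[source,bash]
----
# --from-beginning flag name is an assumption
kafkactl consume my-topic --from-beginning
----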
The following example prints message `key` and `timestamp` as well as `partition` and `offset` in `yaml` format:
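A sketch; `--print-keys` and `--print-timestamps` appear elsewhere in this README, while `-o yaml` is assumed to select the output format:

[source,bash]
----
# -o yaml is an assumption for the output format flag
kafkactl consume my-topic --print-keys --print-timestamps -o yaml
----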
To print partition in default output format use:
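A sketch, assuming a `--print-partitions` flag:

[source,bash]
----
# --print-partitions flag name is an assumption
kafkactl consume my-topic --print-partitions
----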
Headers of kafka messages can be printed with the parameter `--print-headers`, e.g.:
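For example:

[source,bash]
----
kafkactl consume my-topic --print-headers
----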
If one is only interested in the last `n` messages, this can be achieved by `--tail`, e.g.:
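For example, to read only the last five messages:

[source,bash]
----
kafkactl consume my-topic --tail=5
----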
The consumer can be stopped when the latest offset is reached using the `--exit` parameter, e.g.:
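A sketch that reads the topic from the beginning and stops at the latest offset (`--from-beginning` is an assumed flag name):

[source,bash]
----
kafkactl consume my-topic --from-beginning --exit
----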
The consumer can compute the offset it starts from using a timestamp:
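For example (the timestamp value is illustrative; both epoch milliseconds and the supported date formats work, as described below):

[source,bash]
----
kafkactl consume my-topic --from-timestamp 2014-04-26T17:24:37.123Z
----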
The `from-timestamp` parameter supports different timestamp formats. It can either be a number representing the epoch milliseconds or a string with a timestamp in one of the https://github.com/deviceinsight/kafkactl/blob/main/util/util.go#L10[supported date formats].
NOTE: `--from-timestamp` is not designed to schedule the beginning of a consumer's consumption. The offset corresponding to the timestamp is computed at the beginning of the process. So if you set it to a date in the future, the consumer will start from the latest offset.
The consumer can be stopped when the offset corresponding to a particular timestamp is reached:
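For example (timestamps are illustrative):

[source,bash]
----
kafkactl consume my-topic --from-timestamp 2014-04-26T17:24:37.123Z --to-timestamp 2014-04-27T00:00:00.000Z
----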
The `to-timestamp` parameter supports the same formats as `from-timestamp`.
NOTE: `--to-timestamp` is not designed to schedule the end of a consumer's consumption. The offset corresponding to the timestamp is computed at the beginning of the process. So if you set it to a date in the future, the consumer will stop at the current latest offset.
The following example prints keys in hex and values in base64:
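A sketch, assuming `--key-encoding` and `--value-encoding` flags on `consume`:

[source,bash]
----
# flag names are assumptions
kafkactl consume my-topic --key-encoding=hex --value-encoding=base64
----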
The consumer can convert protobuf messages to JSON in keys (optional) and values:
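For example, using the protobuf flags described in the protobuf section below (`TopicMessage` and `kafkamsg.proto` refer to the sample schema there):

[source,bash]
----
kafkactl consume my-topic --value-proto-type TopicMessage --proto-file kafkamsg.proto
----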
To join a consumer group and consume messages as a member of the group:
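A sketch, assuming a `--group` flag for the consumer group name:

[source,bash]
----
# --group flag name is an assumption
kafkactl consume my-topic --group my-group
----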
If you want to limit the number of messages that will be read, specify `--max-messages`:
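For example, to read at most two messages:

[source,bash]
----
kafkactl consume my-topic --max-messages 2
----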
=== Producing messages
Producing messages can be done in multiple ways. If we want to produce a message with key `my-key` and value `my-value` to the topic `my-topic`, this can be achieved with one of the following commands:
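For example, passing key and value directly (the same form used in the partitioner example later in this section):

[source,bash]
----
kafkactl produce my-topic --key=my-key --value=my-value
----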
If we have a file containing messages where each line contains `key` and `value` separated by `#`, the file can be used as input to produce messages to topic `my-topic`:
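A sketch, assuming a `--separator` flag for the key/value delimiter (`input.txt` is a placeholder file):

[source,bash]
----
# --separator flag name is an assumption
cat input.txt | kafkactl produce my-topic --separator=#
----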
The same can be accomplished without piping the file to stdin with the `--file` parameter:
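For example:

[source,bash]
----
kafkactl produce my-topic --file input.txt --separator=#
----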
If the messages in the input file need to be split by a different delimiter than `\n`, a custom line separator can be provided:
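A sketch, assuming a `--lineSeparator` flag:

[source,bash]
----
# --lineSeparator flag name is an assumption
kafkactl produce my-topic --file input.txt --separator=# --lineSeparator='##'
----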
NOTE: if the file was generated with `kafkactl consume --print-keys --print-timestamps my-topic`, the `produce` command is able to detect the message timestamp in the input and will ignore it.
It is also possible to produce messages in json format:
The number of messages produced per second can be controlled with the `--rate` parameter:
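For example, to produce at most 200 messages per second from a file:

[source,bash]
----
kafkactl produce my-topic --file input.txt --separator=# --rate 200
----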
It is also possible to specify the partition to which the message is produced:
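A sketch, assuming a `--partition` flag on `produce`:

[source,bash]
----
# --partition flag name is an assumption
kafkactl produce my-topic --key=my-key --value=my-value --partition=2
----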
Additionally, a different partitioning scheme can be used. When a `key` is provided, the default partitioner uses the `hash` of the `key` to assign a partition, so the same `key` will end up in the same partition:
[source,bash]
----
kafkactl produce my-topic --key=my-key --value=my-value
kafkactl produce my-topic --key=my-key --value=my-value
kafkactl produce my-topic --key=my-key --value=my-value
----
Message headers can also be written:
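A sketch, assuming a repeatable `--header` flag with `key:value` syntax:

[source,bash]
----
# --header flag name and syntax are assumptions
kafkactl produce my-topic --key=my-key --value=my-value --header key1:value1 --header key2:value2
----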
The following example writes the key from base64 and value from hex:
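A sketch, assuming `--key-encoding` and `--value-encoding` flags (the key is the base64 encoding of `test-key`, the value is arbitrary hex):

[source,bash]
----
# flag names are assumptions
kafkactl produce my-topic --key=dGVzdC1rZXk= --key-encoding=base64 --value=0000000000000000 --value-encoding=hex
----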
You can control how many replica acknowledgements are needed for a response:
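A sketch, assuming a `--required-acks` flag matching the `requiredAcks` config value:

[source,bash]
----
# --required-acks flag name is an assumption
kafkactl produce my-topic --key=my-key --value=my-value --required-acks=WaitForAll
----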
Producing null values (tombstone record) is also possible:
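A sketch, assuming a `--null-value` flag:

[source,bash]
----
# --null-value flag name is an assumption
kafkactl produce my-topic --key=my-key --null-value
----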
Producing protobuf message converted from JSON:
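For example, using the sample schema from the protobuf section below (the JSON value is illustrative; pbjson renders `google.protobuf.Timestamp` as an RFC 3339 string and `int64` as a string):

[source,bash]
----
kafkactl produce my-topic --value='{"producedAt":"2021-03-04T13:19:05Z","num":"1"}' --value-proto-type TopicMessage --proto-file kafkamsg.proto
----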
A more complex protobuf message converted from a multi-line JSON string can be produced using a file input with custom separators. For example, if you have the following protobuf definition (`complex.proto`):
[source,protobuf]
----
syntax = "proto3";

import "google/protobuf/timestamp.proto";

message ComplexMessage {
  CustomerInfo customer_info = 1;
  DeviceInfo device_info = 2;
}

message CustomerInfo {
  string customer_id = 1;
  string name = 2;
}
----
And you have the following file (`complex-msg.txt`) that contains the key and value of the message:
The command to produce the protobuf message using sample protobuf definition and input file would be:
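A sketch of such a command (the separator values are assumptions and must match the delimiters actually used in `complex-msg.txt`):

[source,bash]
----
# --separator and --lineSeparator values are assumptions
kafkactl produce my-topic --file complex-msg.txt --separator='|' --lineSeparator='##' \
  --value-proto-type ComplexMessage --proto-file complex.proto
----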
=== Avro support
In order to enable avro support you just have to add the schema registry to your configuration:
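For example, in `config.yml` (matching the `avro` section of the configuration format shown above):

[source,yaml]
----
contexts:
  default:
    avro:
      schemaRegistry: localhost:8081
----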
==== Producing to an avro topic
`kafkactl` will lookup the topic in the schema registry in order to determine if key or value needs to be avro encoded.
If producing with the latest `schemaVersion` is sufficient, no additional configuration is needed and `kafkactl` handles this automatically.
If however one needs to produce an older `schemaVersion`, this can be achieved by providing the parameters `keySchemaVersion`, `valueSchemaVersion`.
===== Example
[source,bash]
----
kafkactl create topic avro_topic

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"record\", \"name\": \"LongList\", \"fields\" : [{\"name\": \"next\", \"type\": [\"null\", \"LongList\"], \"default\": null}]}"}' \
  http://localhost:8081/subjects/avro_topic-value/versions

kafkactl produce avro_topic --value {\"next\":{\"LongList\":{}}}
----
==== Consuming from an avro topic
As for producing, `kafkactl` will also lookup the topic in the schema registry to determine if key or value needs to be decoded with an avro schema.
The `consume` command handles this automatically and no configuration is needed.
An additional parameter `print-schema` can be provided to display the schema used for decoding.
=== Protobuf support
`kafkactl` can consume and produce protobuf-encoded messages. In order to enable protobuf serialization/deserialization you should add the flag `--value-proto-type` and optionally `--key-proto-type` (if keys are encoded in protobuf format) with a type name. Protobuf-encoded messages are mapped with https://developers.google.com/protocol-buffers/docs/proto3#json[pbjson].
`kafkactl` will search for message types in the following order:
. Protoset files specified in the `--protoset-file` flag
. Protoset files specified in the `context.protobuf.protosetFiles` config value
. Proto files specified in the `--proto-file` flag
. Proto files specified in the `context.protobuf.protoFiles` config value
Proto files may require some dependencies in their `import` sections. To specify additional lookup paths use the `--proto-import-path` flag or the `context.protobuf.importPaths` config value.
If the provided message type is not found, `kafkactl` will return an error.
Note that if you want to use raw proto files, `protoc` does not need to be installed.

Also note that protoset files must be compiled with imports included:
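For example, using `protoc` (file names are placeholders taken from the example below):

[source,bash]
----
protoc -o kafkamsg.protoset --include_imports kafkamsg.proto
----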
==== Example
Assume you have the following proto schema in `kafkamsg.proto`:
[source,protobuf]
----
syntax = "proto3";

import "google/protobuf/timestamp.proto";

message TopicMessage {
  google.protobuf.Timestamp produced_at = 1;
  int64 num = 2;
}
----

The "well-known" `google/protobuf` types are included, so no additional proto files are needed.
To produce a message run:
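A sketch; the JSON value is illustrative (pbjson renders `google.protobuf.Timestamp` as an RFC 3339 string and `int64` as a string):

[source,bash]
----
kafkactl produce my-topic --value='{"producedAt":"2021-03-04T13:19:05Z","num":"1"}' --value-proto-type TopicMessage --proto-file kafkamsg.proto
----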
or with a protoset:
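The same sketch using a compiled protoset file instead of the raw proto file:

[source,bash]
----
kafkactl produce my-topic --value='{"producedAt":"2021-03-04T13:19:05Z","num":"1"}' --value-proto-type TopicMessage --protoset-file kafkamsg.protoset
----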
To consume messages run:
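For example:

[source,bash]
----
kafkactl consume my-topic --value-proto-type TopicMessage --proto-file kafkamsg.proto
----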
or with a protoset:
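Again a sketch, with the compiled protoset file:

[source,bash]
----
kafkactl consume my-topic --value-proto-type TopicMessage --protoset-file kafkamsg.protoset
----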
=== Create topics
The `create topic` command allows you to create one or multiple topics.
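A sketch, assuming `--partitions` and `--replication-factor` flags:

[source,bash]
----
# flag names are assumptions
kafkactl create topic my-topic --partitions 6 --replication-factor 3
----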
=== Altering topics
Using the `alter topic` command allows you to change the partition count, replication factor and topic-level configurations of an existing topic.
The partition count can be increased with:
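A sketch, assuming a `--partitions` flag:

[source,bash]
----
kafkactl alter topic my-topic --partitions 32
----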
The replication factor can be altered with:
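A sketch, assuming a `--replication-factor` flag:

[source,bash]
----
kafkactl alter topic my-topic --replication-factor 2
----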
:information_source: when altering the replication factor, kafkactl tries to keep the number of replicas assigned to each broker balanced. If you need more control over the assigned replicas, use `alter partition` directly.
The topic configs can be edited by supplying key value pairs as follows:
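A sketch, assuming a repeatable `--config key=value` flag (the configs shown are standard Kafka topic configs):

[source,bash]
----
kafkactl alter topic my-topic --config retention.ms=3600000 --config cleanup.policy=compact
----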
:bulb: use the flag `--validate-only` to perform a dry-run without actually modifying the topic.
=== Altering partitions
The assigned replicas of a partition can directly be altered with:
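A sketch, assuming a `--replicas` flag taking the list of broker ids (here for partition 1 of `my-topic`):

[source,bash]
----
# --replicas flag name and syntax are assumptions
kafkactl alter partition my-topic 1 --replicas 102,103,104
----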
=== Clone topic
A new topic may be created from an existing topic as follows:
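A sketch of the clone command (sub-command name assumed from the section title):

[source,bash]
----
kafkactl clone topic source-topic target-topic
----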
The source topic must exist, the target topic must not exist. `kafkactl` clones the partition count, replication factor and config entries.
=== Consumer groups
In order to get a list of consumer groups the `get consumer-groups` command can be used:
[source,bash]
----
kafkactl get consumer-groups
kafkactl get consumer-groups --topic my-topic
----
To get detailed information about a consumer group use `describe consumer-group`. If the parameter `--partitions` is provided, details will be printed for each partition, otherwise the partitions are aggregated to the clients.
[source,bash]
----
kafkactl describe consumer-group my-group
kafkactl describe consumer-group my-group --only-with-lag
kafkactl describe consumer-group my-group --topic my-topic
----
=== Delete records from a topic
This command deletes all records from a partition whose offset is smaller than the provided offset.
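A sketch, assuming a `delete records` sub-command with a repeatable `--offset partition=offset` flag:

[source,bash]
----
# sub-command and flag syntax are assumptions
kafkactl delete records my-topic --offset 0=1000 --offset 1=500
----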
=== Create consumer groups
A consumer-group can be created as follows:
[source,bash]
----
kafkactl create consumer-group my-group --topic my-topic --oldest
kafkactl create consumer-group my-group --topic my-topic --newest
kafkactl create consumer-group my-group --topic my-topic --partition 5 --offset 100
----
=== Clone consumer group
A consumer group may be created as a clone of another consumer group as follows:
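A sketch of the clone command (sub-command name assumed from the section title):

[source,bash]
----
kafkactl clone consumer-group source-group target-group
----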
The source group must exist and have committed offsets. The target group must not exist or must not have committed offsets. `kafkactl` clones the topic assignment and partition offsets.
=== Reset consumer group offsets
In order to ensure the reset does what is expected, by default only the results are printed without actually executing the reset. Use the additional parameter `--execute` to perform the reset.
[source,bash]
----
kafkactl reset offset my-group --topic my-topic --oldest
kafkactl reset offset my-group --topic my-topic --newest
kafkactl reset offset my-group --topic my-topic --partition 5 --offset 100
kafkactl reset offset my-group --all-topics --newest
kafkactl reset offset my-group --topic my-topic-a --topic my-topic-b --oldest
kafkactl reset offset my-group --topic my-topic-a --to-datetime 2014-04-26T17:24:37.123Z
----
=== Delete consumer group offsets
In order to delete a consumer group offset use `delete offset`:

[source,bash]
----
kafkactl delete offset my-group --topic my-topic
----
=== Delete consumer groups
In order to delete a consumer group or a list of consumer groups use `delete consumer-group`.
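For example:

[source,bash]
----
kafkactl delete consumer-group my-group
----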
=== ACL Management
Available ACL operations are documented https://docs.confluent.io/platform/current/kafka/authorization.html#operations[here].
==== Create a new ACL
[source,bash]
----
kafkactl create acl --topic my-topic --operation read --principal User:consumer --allow
kafkactl create acl --topic my-topic --operation write --host 1.2.3.4 --principal User:consumer --deny
kafkactl create acl --topic my-topic --operation read --operation describe --principal User:consumer --allow
----
==== List ACLs
[source,bash]
----
kafkactl get acl
kafkactl get access-control-list
kafkactl get acl --topics
----
==== Delete ACLs
[source,bash]
----
kafkactl delete acl --topics --operation read --pattern any
kafkactl delete acl --topics --operation any --pattern any
kafkactl delete acl --cluster --operation any --pattern any
----
=== Getting Brokers
To get the list of brokers of a kafka cluster use `get brokers`.
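For example:

[source,bash]
----
kafkactl get brokers
----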
=== Describe Broker
To view configs for a single broker use `describe broker`.
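A sketch (the broker id is a placeholder):

[source,bash]
----
kafkactl describe broker 1
----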
== Development
In order to see linter errors before commit, add the following pre-commit hook:
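A minimal sketch of such a hook, assuming https://golangci-lint.run[golangci-lint] is used for linting (the repository may ship its own hook; this is only an illustration):

[source,bash]
----
# illustrative pre-commit hook, not the repository's own script
cat > .git/hooks/pre-commit <<'EOF'
#!/bin/sh
golangci-lint run
EOF
chmod +x .git/hooks/pre-commit
----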
=== Pull requests
[source,bash]
----
PULL_REQUEST_ID=123
LOCAL_BRANCH_NAME=feature/abc
git fetch origin pull/${PULL_REQUEST_ID}/head:${LOCAL_BRANCH_NAME}
git checkout ${LOCAL_BRANCH_NAME}
----