A framework around KafkaJS that transparently uses Schema Registry, for building applications that consume, produce, and react to different Kafka topics. Supports consumption in batches or in parallel. Schemas and message types can be statically defined and verified in TypeScript.
Packages:
@ovotech/avro-ts
yarn add @ovotech/castle
import { createCastle, produce, consumeEachMessage } from '@ovotech/castle';
import { Event, EventSchema } from './avro';

// Define producers as pure functions
// With statically setting the typescript types and avro schemas
const mySender = produce<Event>({ topic: 'my-topic-1', schema: EventSchema });

// Define consumers as pure functions
// With statically setting which types it will accept
const eachEvent = consumeEachMessage<Event>(async ({ message }) => {
  console.log(message.value);
});

const main = async () => {
  const castle = createCastle({
    schemaRegistry: { uri: 'http://localhost:8081' },
    kafka: { brokers: ['localhost:29092'] },
    consumers: [{ topic: 'my-topic-1', groupId: 'my-group-1', eachMessage: eachEvent }],
  });

  // Start all consumers and producers
  await castle.start();

  await mySender(castle.producer, [{ value: { field1: 'my-string' } }]);
};

main();
You can connect to multiple topics, each of which will have its own independent consumer group. You can also pass SSL and SASL authentication options, as well as credentials for the schema registry. More about the castle package can be found in packages/castle/README.md.
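As a rough sketch of what such a configuration might look like, the object below follows the same shape as the createCastle call above, with two consumers in separate groups. It is built on assumptions: the ssl and sasl fields are assumed to be handed through to the underlying Kafka client as its usual connection options, the second topic, the credentials and the CastleConfigSketch type are hypothetical, and the eachMessage handlers are left out to keep the block self-contained. packages/castle/README.md remains the reference for the real options.

```typescript
// Hypothetical local types that mirror the shape of the example above.
// They are NOT imported from @ovotech/castle.
interface ConsumerSketch {
  topic: string;
  groupId: string;
}

interface CastleConfigSketch {
  schemaRegistry: { uri: string };
  kafka: {
    brokers: string[];
    ssl?: boolean;
    sasl?: { mechanism: 'plain'; username: string; password: string };
  };
  consumers: ConsumerSketch[];
}

const config: CastleConfigSketch = {
  // Assumption: registry credentials are carried in the URL itself
  schemaRegistry: { uri: 'http://user:pass@localhost:8081' },
  kafka: {
    brokers: ['localhost:29092'],
    // Assumption: these are passed through to the Kafka client unchanged
    ssl: true,
    sasl: { mechanism: 'plain', username: 'my-user', password: 'my-pass' },
  },
  // One entry per topic, each with its own consumer group
  consumers: [
    { topic: 'my-topic-1', groupId: 'my-group-1' },
    { topic: 'my-topic-2', groupId: 'my-group-2' },
  ],
};

// Collect the group ids to confirm every topic has an independent group
const groupIds = config.consumers.map((consumer) => consumer.groupId);
console.log(groupIds);
```

In a real application each consumer entry would also carry its handler (for example eachMessage: eachEvent), and the whole object would be passed to createCastle.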
yarn global add @ovotech/castle-cli
You can read about the various commands available with
castle --help
There are 4 main subcommand groups:
You can configure access to a Kafka server, here named uat, if you have the TLS key, cert and certificate authority as text files. The schema registry is set as a URL; a username and password can be supplied with URL-provided auth, like http://user:pass@localhost:8081. The config file is saved to the $HOME/.castle-cli/ folder.
castle config set uat --kafka-broker localhost:3203 --key private.pem --ca ca.pem --cert cert.pem --schema-registry http://localhost:8081
After that is set, you can use it in any command by stating --config uat (or -C uat):
castle schema search my-topic --config uat
castle schema show my-topic-full-name --config uat
castle topic search my-topic --config uat
castle topic consume my-topic-full-name --config uat
Using it without a specified config connects to the default local Kafka server.
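To illustrate the URL-provided auth format used for the schema registry setting, here is a small sketch using the standard URL class available in Node.js and browsers. It is not part of castle-cli; it only shows how a username and password travel inside such a URL, and how characters like @ or : in a password get percent-encoded. The password value is made up for the example.

```typescript
// Credentials embedded in a URL are exposed as separate fields
const registryUrl = new URL('http://user:pass@localhost:8081');
console.log(registryUrl.username); // "user"
console.log(registryUrl.password); // "pass"
console.log(registryUrl.host); // "localhost:8081"

// Building the URL through the setters percent-encodes reserved characters,
// so a password containing "@" or ":" does not break the URL structure
const encoded = new URL('http://localhost:8081');
encoded.username = 'user';
encoded.password = 'p@ss:word'; // hypothetical password
console.log(encoded.href);
```

The resulting href is the kind of value that could be passed to --schema-registry.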
You can run the tests with:
yarn test
Style is maintained with prettier and eslint:
yarn lint
Deployment is performed by lerna automatically on merge / push to main, but you'll need to bump the package version numbers yourself. Only updated packages with newer versions will be pushed to the npm registry.
Have a bug? File an issue with a simple example that reproduces it so we can take a look and confirm.
Want to make a change? Submit a PR, explain why it's useful, and make sure you've updated the docs (this file) and the tests (see test folder).
This project is licensed under Apache 2 - see the LICENSE file for details
It's Kafka's greatest work :)