SeaQL / sea-streamer

🌊 A real-time stream processing toolkit for Rust
https://www.sea-ql.org/SeaStreamer/
Apache License 2.0

Add a MKSTREAM option when creating Redis consumer groups. #4

Closed hgzimmerman closed 1 year ago

hgzimmerman commented 1 year ago

Adds a boolean option controlling whether `MKSTREAM` is included in `XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]` when creating Redis consumer groups. This happens when the consumer performs its first read.

PR Info

New Features

Adds the ability for Redis consumers to create consumer groups even when the stream does not yet exist. This is useful for us because we may start a producer and a consumer separately during local development, and we would prefer not to have to ensure the producer writes first before the consumer can run for the first time.

I have tested this locally with a single Redis node/shard and can confirm that it works in that scenario. I cannot speak to whether this causes problems when connecting to multiple nodes, but I don't immediately see a reason why it would.

Bug Fixes

Breaking Changes

Changes

I am unsure whether you would prefer an enum for the config option instead of a bool, or whether the comments need further workshopping. I couldn't think of good names for the enum variants, so I opted for a bool. Any suggestions welcome.
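To make the bool-vs-enum trade-off concrete, here is an illustrative sketch of how a boolean `mkstream` flag could shape the command that gets sent. The struct and method names here are hypothetical, not sea-streamer's actual API:

```rust
// Illustrative sketch only: a boolean option deciding whether MKSTREAM
// is appended to XGROUP CREATE. Names are hypothetical, not the real API.
#[derive(Default)]
pub struct RedisConsumerOptions {
    /// When true, append MKSTREAM so Redis creates the stream if it is missing.
    pub mkstream: bool,
}

impl RedisConsumerOptions {
    /// Build the argument list for `XGROUP CREATE <key> <group> <id>`.
    pub fn xgroup_create_args(&self, key: &str, group: &str, id: &str) -> Vec<String> {
        let mut args: Vec<String> = ["XGROUP", "CREATE", key, group, id]
            .iter()
            .map(|s| s.to_string())
            .collect();
        if self.mkstream {
            args.push("MKSTREAM".to_string());
        }
        args
    }
}
```

An enum (e.g. two variants meaning "create the stream" / "fail if missing") would carry the same bit of information but name it at the call site; the boolean keeps the option surface minimal.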


Thanks again for developing this project! 0.2.0 released right when we needed a flexible streaming library and incorporating it into our product was super easy.

tyt2y3 commented 1 year ago

Oh wow! Thank you for the first PR!

> Thanks again for developing this project! 0.2.0 released right when we needed a flexible streaming library and incorporating it into our product was super easy.

Just to know a little bit more. Do you use the socket API or the vanilla API?

hgzimmerman commented 1 year ago

We intend to use Kafka in production against an Azure EventHub, but for local development and CI runs we already have a Redis instance, and we don't want the trouble of spinning up Kafka for every developer unless we need to debug some specific Kafka behavior. Stdio doesn't seem to fit our needs at first glance: we already write logs to stdout, we don't want to switch them to stderr at the moment, and I'm unsure whether that interface could be wrangled to work in the context of unit tests. It's possible that files, or heck, just a normal flume channel wrapped in your interface (I guess providing the sender and receiver in the options) would work as well for unit tests, but Redis exists and doesn't seem to have any limitations aside from the one addressed here.

Having the main sea-streamer crate support both under an API that mostly acts the same is enough for us to evaluate that our application logic does the correct things in unit tests, while relying on test/staging environments to catch issues with the specific streaming behavior with EventHub.
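The "same API over multiple backends" pattern described above can be sketched with a small trait: application logic is written against the trait, a real backend (Kafka, Redis) implements it in deployment, and an in-memory stand-in implements it in unit tests. The trait and types below are purely illustrative and are not sea-streamer's actual interface:

```rust
// Illustrative sketch only: backend-agnostic application logic behind a
// hypothetical trait, with an in-memory stand-in for unit tests.
use std::collections::VecDeque;

pub trait MessageSource {
    /// Return the next message payload, or None when the stream is exhausted.
    fn next_payload(&mut self) -> Option<String>;
}

/// Application logic written against the trait, not a concrete backend.
pub fn count_matching<S: MessageSource>(src: &mut S, needle: &str) -> usize {
    let mut hits = 0;
    while let Some(payload) = src.next_payload() {
        if payload.contains(needle) {
            hits += 1;
        }
    }
    hits
}

/// In-memory stand-in backend, usable in unit tests without any server.
pub struct InMemorySource(pub VecDeque<String>);

impl MessageSource for InMemorySource {
    fn next_payload(&mut self) -> Option<String> {
        self.0.pop_front()
    }
}
```

In a test, `count_matching` runs against `InMemorySource`; in staging, the same function would run against the real streaming backend, which is where backend-specific behavior gets exercised.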

tyt2y3 commented 1 year ago

> because we already write logs to stdout, and don't want to change that to stderr at the moment

Fair point. I am developing the file backend atm.

> Having the main sea-streamer crate support both under an API that mostly acts the same is enough for us to evaluate that our application logic does the correct things in unit tests, while relying on test/staging environments to catch issues with the specific streaming behavior with EventHub.

Thank you for sharing your use case! It's exactly a scenario I have in mind.

Never mind the Kafka errors, we will take care of those.

hgzimmerman commented 1 year ago

A file backend would be excellent for our needs.


The only other pain point we had was setting up SASL authentication for Kafka, which might deserve a separate issue.

We had to write the following:

    options.set_kafka_connect_options(|options| {
        options.add_custom_option("bootstrap.servers", auth.bootstrap_host());
        options.add_custom_option("security.protocol", "SASL_SSL");
        options.add_custom_option("sasl.mechanism", "PLAIN");
        options.add_custom_option("sasl.username", "$ConnectionString");
        options.add_custom_option("sasl.password", connection_string);
    });

and specify

    rdkafka = { version = "0.29.0", features = ["gssapi-vendored", "ssl-vendored"] }

in our Cargo.toml to get it to connect to our EventHub.

This would be nice to have as a predefined option function and feature flag for Kafka. That said, I don't know whether there is an alternate/preferred auth mechanism for vanilla Kafka, or whether this is just an EventHub-specific thing, so I'm not sure how generally applicable it is; I'm really only in a position to contribute the solution that works for EventHub.

tyt2y3 commented 1 year ago

I think it's a good idea to add a `set_sasl_auth` method taking a `SaslAuthOptions` struct.

The reason I have not done so is that there are so many auth modes for Kafka that I am confused about which to support.

We can pass-through those feature flags too.
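One possible shape for that struct, sketched purely as an assumption: it would bundle the mechanism and credentials and expand into the same custom options that are currently set by hand above. `SaslAuthOptions` and `to_custom_options` are hypothetical names; the key strings mirror the librdkafka properties from the snippet earlier in the thread:

```rust
// Illustrative sketch only: a hypothetical SaslAuthOptions that expands
// into the librdkafka custom options set by hand earlier in this thread.
pub struct SaslAuthOptions {
    pub mechanism: String, // e.g. "PLAIN"
    pub username: String,  // e.g. "$ConnectionString" for EventHub
    pub password: String,  // e.g. the EventHub connection string
}

impl SaslAuthOptions {
    /// Expand into (key, value) pairs for add_custom_option.
    pub fn to_custom_options(&self) -> Vec<(String, String)> {
        vec![
            ("security.protocol".into(), "SASL_SSL".into()),
            ("sasl.mechanism".into(), self.mechanism.clone()),
            ("sasl.username".into(), self.username.clone()),
            ("sasl.password".into(), self.password.clone()),
        ]
    }
}
```

A `set_sasl_auth(options: SaslAuthOptions)` method could then loop over these pairs internally, so callers never touch the raw librdkafka property names.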

tyt2y3 commented 1 year ago

https://github.com/SeaQL/sea-streamer/releases/tag/0.2.1

tyt2y3 commented 1 year ago

@hgzimmerman I just released 0.3 with an initial implementation for the File backend I've been using in an internal project. Let me know what you think!