thatdot / quine

Quine • a streaming graph • https://quine.io • Discord: https://discord.gg/GMhd8TE4MR

Add ability to use kafka ingest that is secured with Plain SASL #23

Closed - charliemenke closed this 1 year ago

charliemenke commented 1 year ago

Description

Added additional configuration that allows users to use a Kafka broker secured with SASL (PLAIN) as an ingest stream. Currently only unsecured Kafka topics can be configured in the recipe.yaml file.

Here is an example of how you would use the new configuration options. saslAuthentication is optional and does not need to be set, so the existing flow for unsecured Kafka brokers works as it did before.

ingestStreams:
  - type: KafkaIngest
    topics:
      - upserts
    bootstrapServers: <server>
    saslAuthentication:
      type: Plain
      jaasConfig: org.apache.kafka.common.security.plain.PlainLoginModule required username="<user>" password="<pass>";
      saslMechanism: PLAIN
    securityProtocol: SASL_SSL


How Has This Been Tested?

I am new to this project, so I have been testing against Kafka brokers secured with Plain SASL using a very simple recipe configuration, and it has been working so far. I would love input if anyone has any.


emanb29 commented 1 year ago

Thanks for the contribution, this is a nice feature addition. sbt fixall should clean up the CI failures and automatically rewrite any formatting / imports to be consistent with the rest of the codebase.

charliemenke commented 1 year ago

Ahh thanks @emanb29, I was forgetting to run sbt fixall. Thanks for the tip, just pushed the reformatted changes.

If it does come up in the future that more Kafka SASL authentication mechanisms are needed, like GSSAPI, OAUTHBEARER, SCRAM, etc., it might be worth exploring a more generic way of adding the relevant Kafka properties through .withProperties. As it stands, a separate match case would be needed if, for example, we also added a KafkaSaslAuthentication.GSSAPI class, which could become overly messy.
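To make the concern concrete, here is a rough Scala sketch (not the actual Quine code; KafkaSaslAuthentication and the helper names are illustrative, based on the shape of this PR) contrasting per-mechanism wrapping with a generic pass-through via Alpakka's .withProperties:

import akka.kafka.ConsumerSettings
import org.apache.kafka.common.config.SaslConfigs

object KafkaSaslSketch {
  // Illustrative only: a per-mechanism ADT means every new SASL mechanism
  // (GSSAPI, SCRAM, OAUTHBEARER, ...) needs another case in the match below.
  sealed trait KafkaSaslAuthentication
  final case class PlainSasl(jaasConfig: String) extends KafkaSaslAuthentication

  def withSasl[K, V](
    settings: ConsumerSettings[K, V],
    auth: Option[KafkaSaslAuthentication]
  ): ConsumerSettings[K, V] =
    auth match {
      case Some(PlainSasl(jaas)) =>
        settings
          .withProperty(SaslConfigs.SASL_MECHANISM, "PLAIN")
          .withProperty(SaslConfigs.SASL_JAAS_CONFIG, jaas)
      case None => settings
    }

  // The more generic alternative: accept arbitrary client properties and pass
  // them straight through to the Kafka client via Alpakka's withProperties.
  def withRawProperties[K, V](
    settings: ConsumerSettings[K, V],
    kafkaProperties: Map[String, String]
  ): ConsumerSettings[K, V] =
    settings.withProperties(kafkaProperties)
}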

LeifW commented 1 year ago

Thanks for the contribution! I was thinking that whether or not to use SSL encryption for the transport, and which auth mechanism to use, are somewhat orthogonal concerns (unless you're going to use an SSL cert as your auth mechanism). So having securityProtocol and saslAuthentication settings that more or less map directly to the underlying Kafka settings (e.g. requiring the former to be set to SASL_SSL (or SASL_PLAINTEXT, which we don't currently expose) if you're going to set the latter to PLAIN) might be more confusing than it needs to be. Kafka's settings on this are already confusing - if we're going to wrap this, we could do so in a way that perhaps makes the options easier to understand and separates orthogonal concerns, e.g. perhaps something like:

transport: PLAINTEXT | SSL
authentication: Option[SSL | SASL]

where SASL is further broken out into things like Plain, OAuth, Kerberos, etc. (or those options could be flattened out into the top level, so they're siblings of SSL).
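A minimal Scala sketch of how that separation might be modeled (all type names here are hypothetical, just to illustrate the shape being proposed, not existing Quine types):

sealed trait KafkaTransport
object KafkaTransport {
  case object Plaintext extends KafkaTransport
  case object Ssl extends KafkaTransport
}

// Authentication is independent of the transport (except that cert-based
// auth presupposes SSL), so it is modeled as a separate, optional choice.
sealed trait KafkaAuthentication
object KafkaAuthentication {
  case object SslClientCert extends KafkaAuthentication
  final case class SaslPlain(username: String, password: String) extends KafkaAuthentication
  // further SASL mechanisms (OAuth, Kerberos, SCRAM, ...) would be added as siblings
}

final case class KafkaSecuritySettings(
  transport: KafkaTransport,
  authentication: Option[KafkaAuthentication]
)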

We could also just expose a raw Map[String, String] to allow end-users to directly specify whatever Kafka settings they want, to be passed directly to the Kafka client.

charliemenke commented 1 year ago

Thanks for the input @LeifW. I agree with a lot of what you said about trying to handle the complexities of Kafka properties. I think it comes down to a few things:

  1. What kind of control/responsibility does Quine want to have over what sort of configuration is passed for a Kafka ingest stream?
     a. The PRO of being very explicit about which properties Quine supports is that there is no question that Quine supports GSSAPI SASL authentication if we have explicit fields for it in a recipe.
     b. The CON of being explicit is that, without a doubt, some user will have an obscure Kafka property that they need to set for their use case to be satisfied, and we won't have a wrapper around it.

Like you mentioned above:

We could also just expose a raw Map[String, String] to allow end-users to directly specify whatever Kafka settings they want, to be passed directly to the Kafka client.

If you go the route of less control/responsibility, you lose out on being able to directly express what sort of Kafka properties Quine supports (I don't know if this is an issue, as every Kafka client property I can think of should just be supported out of the box by the underlying Akka/Alpakka Kafka client), but nonetheless, you wouldn't get the nice Kafka config template.

In my opinion, most products/services that I have used allow you to directly pass whatever kafka properties you want, even if those properties don't make sense, and I sort of like it that way? I feel like we don't need to overly handhold or dictate what properties a user of Quine should have access to when it comes to Kafka.

charliemenke commented 1 year ago

If it's worth anything, my first thought when asked for help connecting Quine to a SASL-secured Kafka broker was that there must be some place to just pass Kafka properties directly. The reason my PR does not reflect that is because I didn't want to go completely against the existing pattern of how we configure a Kafka consumer that auto-commits, explicitly commits, or doesn't commit. But I think the conversation is worth having.

LeifW commented 1 year ago

Yeah, that's kind of my thinking - that people using Kafka in the first place would know how to use Kafka. Those settings are pretty standard for any use of Kafka, and are what's documented elsewhere. And it seems like there's a huge number of them that people might want to pass. As I see it, the pro of wrapping these settings is that we can make things a little more user-friendly for people unfamiliar with Kafka settings (e.g. "say transport = SSL and give us a username and password, and we'll configure the rest"), and we get some validation, in that we can validate strings like "SASL_SSL" as a valid option; if someone passes "FOO" we can give a nicer error message explaining how that's not valid and what the valid options are, whereas the Kafka client might have some more inscrutable behavior were you to pass that directly... I consider your expectations a vote in favor of just exposing raw Kafka settings - though I suppose you're a biased sample - if you didn't know Kafka in the first place I'd guess you wouldn't be submitting this PR?
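As a tiny illustration of that validation benefit (a sketch only, not Quine's actual error handling; the object and method names are made up):

object SecurityProtocolValidation {
  private val validSecurityProtocols = Set("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL")

  // Wrapping lets us reject a bad value like "FOO" up front with a readable
  // message, instead of letting the Kafka client fail in a less obvious way.
  def parseSecurityProtocol(raw: String): Either[String, String] =
    if (validSecurityProtocols.contains(raw)) Right(raw)
    else Left(s"'$raw' is not a valid security protocol; valid options are: ${validSecurityProtocols.mkString(", ")}")
}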

We could of course offer both - e.g. wrapped settings for ease of use for people unfamiliar with Kafka settings, and raw settings for those who are. But then we have to decide on a precedence or check for conflicts between them, and I feel it could be confusing for users: "which do I set? both?". If we do just expose that Map[String, String], we could remove our securityProtocol: KafkaSecurityProtocol and autoOffsetReset: KafkaAutoOffsetReset = KafkaAutoOffsetReset.Latest settings, which are pretty much just a direct pass-through to Kafka settings. That would still leave us wrapping the Kafka settings:

ENABLE_AUTO_COMMIT_CONFIG -> "true",
AUTO_COMMIT_INTERVAL_MS_CONFIG -> commitIntervalMs.toString

which are exposed as one of the options in offsetCommitting: Option[KafkaOffsetCommitting]. I'm thinking we either leave that, at least for now - the other option for that setting (explicit offset committing) requires a bunch of code on our side, and it seems mutually exclusive with auto-commit (why would you want both?) - or we also remove auto-commit as an option from KafkaOffsetCommitting and say to just set the Kafka properties if you want to use it. Actually, maybe we should do that anyway - it doesn't seem to work properly in Alpakka Kafka (which is maybe why they disabled it by default?) - I've seen it commit offsets beyond what is consumed in my testing.

LeifW commented 1 year ago

FWIW, the Alpakka Kafka client we're using already gives you a way to specify arbitrary properties to be passed directly to the underlying Kafka client, via the "kafka-clients" Typesafe Config key mentioned at https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#settings. So you could specify these kinds of settings either via a Typesafe Config file or by setting them as system properties, e.g. passing -Dakka.kafka.consumer.kafka-clients.security.protocol=SASL_SSL to the java invocation on startup.

That's not so convenient to use from a recipe, though, and restricts you to using the same settings for all Kafka ingest streams, so it seems we should just add a Map[String, String] of Kafka settings to our Kafka ingest definition.

charliemenke commented 1 year ago

I consider your expectations a vote in favor of just exposing raw Kafka settings - though I suppose you're a biased sample - if you didn't know Kafka in the first place I'd guess you wouldn't be submitting this PR?

I am currently leaning that way, and yes, you probably could consider me a biased sample :).

So it seems like the best way forward is to move to a more open-ended Map[String, String] input that can be passed by a user through a recipe, and also remove the wrapped settings mentioned above (securityProtocol: KafkaSecurityProtocol and autoOffsetReset: KafkaAutoOffsetReset).

KafkaOffsetCommitting.ExplicitCommit will remain as a template option because of the LOE to implement, for now.

I think we can also avoid most issues that would come from new users who do not have a great understanding of Kafka by defaulting some values in exampleIngestStreamInfo (if I am understanding how that val gets used).

If that all sounds good, I can take some time to make those changes. I'll also see how obtuse the error information is when supplying bogus Kafka properties.

LeifW commented 1 year ago

Actually, on second thought - maybe we should leave our Kafka setting wrappers KafkaSecurityProtocol and KafkaAutoOffsetReset (feel free to add your SASL_SSL option to the former) in for now, just to avoid breaking people currently relying on them, and we could mark them as deprecated (maybe at some point in the future), with instructions to switch to using raw Kafka settings instead?

charliemenke commented 1 year ago

Ok, got some time to work on this. I implemented it as best I could from what we discussed. Below is an example of how we can pass arbitrary Kafka properties in the recipe.

ingestStreams:
  - type: KafkaIngest
    topics:
      - upsert
    bootstrapServers: <server>
    securityProtocol: SASL_SSL
    kafkaProperties:
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="<user>" password="<pass>";
      sasl.mechanism: PLAIN

Note: I have set it up so that the existing templated properties like KafkaSecurityProtocol take precedence over what is set in kafkaProperties. So, for example, if I do something like this ->

ingestStreams:
  - type: KafkaIngest
    topics:
      - <topic>
    bootstrapServers: <server>
    securityProtocol: SASL_SSL
    kafkaProperties:
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="<user>" password="<pass>";
      sasl.mechanism: PLAIN
      security.protocol: PLAINTEXT

under the hood, security.protocol will be set to SASL_SSL.
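Roughly, the precedence comes from the order in which the properties are applied: the raw kafkaProperties first, then the templated settings on top (a sketch of the idea; the object and method names are hypothetical, not the exact Quine code):

import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.CommonClientConfigs

object KafkaPropertyPrecedence {
  // Applying the user-supplied map first and the templated securityProtocol
  // last means the templated value wins whenever the two overlap.
  def applySecurity[K, V](
    settings: ConsumerSettings[K, V],
    kafkaProperties: Map[String, String],
    securityProtocol: String // e.g. "SASL_SSL"
  ): ConsumerSettings[K, V] =
    settings
      .withProperties(kafkaProperties)
      .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol)
}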

Additionally, it seems to me that if you do pass an incorrect value in kafkaProperties, the underlying Kafka client will error out with decent error info, albeit with a long stack trace.

charliemenke commented 1 year ago

Merged changes to use a default Map[String, String] at the IngestRoutes.scala level.
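For readers following along, the shape of the change is roughly an extra map on the Kafka ingest definition that defaults to empty (a simplified sketch; the actual class in IngestRoutes.scala carries more fields, including the wrapped settings discussed above):

// Hypothetical, simplified sketch of the ingest definition after this change.
sealed trait KafkaSecurityProtocol
case object SaslSsl extends KafkaSecurityProtocol

final case class KafkaIngest(
  topics: Set[String],
  bootstrapServers: String,
  securityProtocol: KafkaSecurityProtocol,
  kafkaProperties: Map[String, String] = Map.empty // new: raw pass-through to the Kafka client
)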

LeifW commented 1 year ago

Merged into our upstream internal monorepo and pushed back out to here in 24638fbc026087e38ff3ce9a05b853d04dbede1e. Thanks so much!