robinhood / faust

Python Stream Processing

Using faust with Azure EventHub for Kafka #483

Open therhaag opened 4 years ago

therhaag commented 4 years ago

Dear faust developers, thank you for creating this great library. I was wondering if there is an example of how to use faust with a Kafka-enabled instance of Azure EventHub (https://docs.microsoft.com/en-gb/azure/event-hubs/event-hubs-for-kafka-ecosystem-overview). It would be important to know whether this is supported, as my team aims to migrate from a Kafka cluster to managed services on Azure.

jbooth-mastery commented 4 years ago

I'm not a faust developer, but I am using Faust and Azure. We discarded EventHubs as a solution because of the time-limited retention. Since you can only hold events for X days, that breaks both the Confluent Avro schema registry (which we're using) and Faust's tables (which store their changelog in a topic, so that topic needs to be key-compacted, not time-limited).
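For reference, on a self-managed Kafka cluster the changelog topic would be key-compacted roughly like this (a sketch only: broker address and topic name are placeholders, and Faust names changelog topics along the lines of `<app-id>-<table-name>-changelog`; Event Hubs does not expose this setting, which is exactly the limitation described above):

```shell
# Sketch: switch a Faust changelog topic from time-based retention
# to key compaction on a self-managed Kafka cluster.
kafka-configs --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name myapp-mytable-changelog \
  --add-config cleanup.policy=compact
```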

therhaag commented 4 years ago

Hi Jonathan - thank you for pointing out the retention time issue. Would that affect only certain features, or does it render using faust with EventHub impossible altogether? We would still like to give it a try but are facing connectivity/authentication issues, and I would like to know whether it's worth investigating these or whether that's a waste of time due to a fundamental incompatibility.

jbooth-mastery commented 4 years ago

I don't believe Faust itself relies on Tables, so you should be able to use EventHub + Kafka with Faust in general. I don't know whether Tables would throw exceptions or quietly appear to work, right up until the point you need to rebuild a table and discover data is missing because it expired.

I can't provide more pointers on how, though; I never hooked Faust up with EventHubs myself.

ryou90 commented 4 years ago

Hi, are there any new insights or experiences on this topic? I'd also like to use the Azure Event Hub for Faust.

Thx :)

databasedav commented 4 years ago

This works for me.

import aiokafka.helpers
import faust

EVENTHUB_NAMESPACE = ...
SASL_USERNAME = "$ConnectionString"
SASL_PASSWORD = f"Endpoint=sb://{EVENTHUB_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={...};SharedAccessKey={...}"

app = faust.App(
    'app',
    broker=f"kafka://{EVENTHUB_NAMESPACE}.servicebus.windows.net:9093",
    broker_credentials=faust.SASLCredentials(
        username=SASL_USERNAME,
        password=SASL_PASSWORD,
        ssl_context=aiokafka.helpers.create_ssl_context()
    ),
)

Edit: actually it doesn't work unless you change this line to res = await (await authenticator.step(auth_bytes)). I can fix this with a PR, but I'm not sure whether I should make it against the main aiokafka repo or the robinhood one?

Edit: looks like installing aiokafka from my fork of robinhood aiokafka isn't working? How do I make this change apply to Faust?

Edit: so Faust complains if you use a fork of robinhood's aiokafka, and other problems arose when I tried disabling that check, so for now I'm just overwriting the installed aiokafka/conn.py with an edited copy that has these lines

res = await authenticator.step(auth_bytes)
if res is None:
    break
payload, expect_response = res

changed to

res = await authenticator.step(auth_bytes)
if asyncio.isfuture(res):
    res = await res
if res is None:
    break
payload, expect_response = res

in my Dockerfile after installing the packages.
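The guard in that patch can be demonstrated in isolation: asyncio.isfuture distinguishes a plain return value from a Future, so one code path handles both results. A minimal sketch, where step below is a stand-in for the authenticator (not aiokafka's actual API):

```python
import asyncio

async def step(value, as_future):
    # Stand-in for authenticator.step(): may return either a plain
    # result or a Future that resolves to the result.
    if as_future:
        fut = asyncio.get_running_loop().create_future()
        fut.set_result(value)
        return fut
    return value

async def resolve(value, as_future):
    res = await step(value, as_future)
    if asyncio.isfuture(res):  # same guard as the patched conn.py
        res = await res
    return res

print(asyncio.run(resolve(42, True)), asyncio.run(resolve(42, False)))
# prints: 42 42
```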

taybin commented 3 years ago

FWIW, this worked for me:

import ssl
import faust

EVENTHUB_NAMESPACE = ...
SASL_USERNAME = "$ConnectionString"
SASL_PASSWORD = f"Endpoint=sb://{EVENTHUB_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={...};SharedAccessKey={...}"

app = faust.App(
    'app',
    broker=f"kafka://{EVENTHUB_NAMESPACE}.servicebus.windows.net:9093",
    broker_credentials=faust.SASLCredentials(
        username=SASL_USERNAME,
        password=SASL_PASSWORD,
        ssl_context=ssl.create_default_context()
    ),
)

without having to make any changes to which version of aiokafka was installed.
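Since the entire connection string doubles as the SASL password here, it can help to sanity-check it before handing it to Faust. A hypothetical helper (not part of Faust or the Azure SDK) that splits the string into its parts:

```python
def parse_connection_string(conn_str: str) -> dict:
    # Split "Endpoint=...;SharedAccessKeyName=...;SharedAccessKey=..."
    # into a dict. partition() splits only on the first "=", so base64
    # key values containing "=" padding survive intact.
    parts = {}
    for segment in conn_str.strip().split(";"):
        if segment:
            key, _, value = segment.partition("=")
            parts[key] = value
    return parts

cs = ("Endpoint=sb://myns.servicebus.windows.net/;"
      "SharedAccessKeyName=RootManageSharedAccessKey;"
      "SharedAccessKey=c2VjcmV0a2V5PT0=")
info = parse_connection_string(cs)
print(info["Endpoint"])  # sb://myns.servicebus.windows.net/
```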

VatsalP commented 3 years ago

FWIW, this worked for me:

EVENTHUB_NAMESPACE = ...
SASL_USERNAME = "$ConnectionString"
SASL_PASSWORD = f"Endpoint=sb://{EVENTHUB_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={...};SharedAccessKey={...}"

app = faust.App(
    'app',
    broker=f"kafka://{EVENTHUB_NAMESPACE}.servicebus.windows.net:9093",
    broker_credentials=faust.SASLCredentials(
        username=SASL_USERNAME,
        password=SASL_PASSWORD,
        ssl_context=ssl.create_ssl_context()
    ),
)

without having to make any changes to which version of aiokafka was installed.

It should be ssl.create_default_context(); I couldn't find any create_ssl_context in the ssl lib.

taybin commented 3 years ago

You are correct. That was some copy-paste coding. I'll fix it above.

TiagoDufau commented 3 years ago


I can confirm databasedav's solution worked for me too.