superstreamlabs / memphis

Memphis.dev is a highly scalable and effortless data streaming platform
https://docs.memphis.dev

Feature request: Connector to Prefect for triggering flows #1070

Open yanivbh1 opened 1 year ago

yanivbh1 commented 1 year ago

Asked by Peppie (Discord)

rizziemma commented 1 year ago

Linking documentation to launch Prefect flows:

g41797 commented 12 months ago

Are we talking about the following flow?

rizziemma commented 11 months ago

I've implemented a very basic connector: a Memphis consumer plus a handler that turns each message into a Prefect flow run. I call the Prefect REST API directly so that it can easily be generalized into an HTTP connector.

from __future__ import annotations
import asyncio
from memphis import Memphis, MemphisError, MemphisConnectError, MemphisHeaderError
import os
import httpx
import ast

# Prefect REST API settings, read from PREFECT_API_URL and PREFECT_API_KEY
api_url = os.getenv('PREFECT_API_URL')
headers = {
  "Authorization": f"Bearer {os.getenv('PREFECT_API_KEY')}"
}

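async def handle_message(msgs, error, context):
    # Check the error first: previously it was only inspected inside the
    # message loop, so an error delivered without messages was never printed.
    if error:
        print(error)
        return
    try:
        # Reuse one HTTP client for the whole batch instead of one per message.
        async with httpx.AsyncClient() as client:
            for msg in msgs:
                print("message: ", msg.get_data())
                # The body must be a Python dict literal, e.g.
                # {"deployment_id": "<id>", "parameters": {}}
                data = ast.literal_eval(msg.get_data().decode("UTF-8"))

                payload = {"parameters": data.get("parameters", {})}
                deployment_id = data["deployment_id"]
                url = f"{api_url}/deployments/{deployment_id}/create_flow_run"
                print(f"sending payload {payload} to url {url}")
                response = await client.post(url, headers=headers, json=payload)
                response.raise_for_status()

                # Acknowledge only after Prefect accepted the request.
                await msg.ack()
    except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
        print(e)
        return
    except Exception as e:
        print(e)
        return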

async def main():
    # Create the client outside the try block so `memphis` is always bound in `finally`.
    memphis = Memphis()
    try:
        await memphis.connect(
            host=os.getenv("MEMPHIS_HOST"),
            username=os.getenv("MEMPHIS_USERNAME"),
            password=os.getenv("MEMPHIS_PASSWORD"),
            account_id=os.getenv("MEMPHIS_ACCOUNT"),
        )

        consumer = await memphis.consumer(
            station_name="prefect",
            consumer_name=os.getenv("MEMPHIS_CONSUMER"),
        )
        consumer.set_context({"target": "prefect"})
        consumer.consume(handle_message)
        # Keep your main thread alive so the consumer will keep receiving data
        await asyncio.Event().wait()

    except (MemphisError, MemphisConnectError) as e:
        print(e)

    finally:
        await memphis.close()

if __name__ == "__main__":
    asyncio.run(main())
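
For anyone producing messages for this connector, here is a rough sketch of the body the handler expects and how it maps onto the Prefect request. The helper names `decode_message` and `build_flow_run_request` are hypothetical and not part of the Memphis or Prefect SDKs. The JSON-first decoding is also an assumption added here: the handler above only uses `ast.literal_eval`, which would reject JSON literals such as `true` or `null`.

```python
import ast
import json


def decode_message(raw: bytes) -> dict:
    """Decode a message body, trying JSON first and a Python literal second."""
    text = raw.decode("utf-8")
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Fallback for producers that send str(dict), e.g. "{'deployment_id': 'abc'}".
        data = ast.literal_eval(text)
    if not isinstance(data, dict):
        raise ValueError("message body must decode to a dict")
    return data


def build_flow_run_request(raw: bytes, api_url: str) -> tuple[str, dict]:
    """Return the (url, json_payload) pair for a create_flow_run call."""
    data = decode_message(raw)
    deployment_id = data["deployment_id"]  # required; raises KeyError if missing
    url = f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run"
    # "parameters" is optional and defaults to an empty dict, as in the handler.
    return url, {"parameters": data.get("parameters", {})}
```

Keeping this step as a pure function means the message format can be checked without a running Memphis broker or Prefect API, and the same split would probably carry over if the handler is later generalized to other HTTP targets.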

I'm currently adapting it to run as a Kubernetes deployment, which should be sufficient for our use case right now. Let me know if we can make anything useful from this!

yanivbh1 commented 11 months ago

Hey @rizziemma, it's a great boilerplate and should be shared with the community! A) @Avitaltrifsik will send you some cool swag. B) Once Memphis Functions is released, it would be great to integrate it there.

Avitaltrifsik commented 11 months ago

Hey @rizziemma, I would be happy for you to share it on Discord with the whole community. I am sure it can benefit more users! I would also love to contact you there to get your details for the swag pack :) Here is an invite!