Propan - just an another one HTTP a declarative Python Messaging Framework. It's inspired by FastAPI and Kombu, simplify Message Brokers around code writing and provides a helpful development toolkit, which existed only in HTTP-frameworks world until now.
It's designed to create reactive microservices around Messaging Architecture.
It is a modern, high-level framework on top of popular specific Python brokers libraries, based on pydantic and FastAPI, pytest concepts.
This project is superceeded by FastStream.
FastStream is a new package based on the ideas and experiences gained from FastKafka and Propan. By joining our forces, we picked up the best from both packages and created a unified way to write services capable of processing streamed data regardless of the underlying protocol.
I’ll continue to maintain Propan package, but new development will be in FastStream. If you are starting a new service, FastStream is the recommended way to do it.
For now FastStream supports Kafka and RabbitMQ. Other brokers support will be added in a few months.
You can find a detail migration guide in the documentation
Documentation: https://lancetnik.github.io/Propan/
async | sync | |
---|---|---|
RabbitMQ | :heavy_check_mark: stable :heavy_check_mark: | :hammer_and_wrench: WIP :hammer_and_wrench: |
Redis | :heavy_check_mark: stable :heavy_check_mark: | :mag: planning :mag: |
Nats | :heavy_check_mark: stable :heavy_check_mark: | :mag: planning :mag: |
Kafka | :warning: beta :warning: | :mag: planning :mag: |
SQS | :warning: beta :warning: | :mag: planning :mag: |
NatsJS | :warning: beta :warning: | :mag: planning :mag: |
ZeroMQ | :hammer_and_wrench: WIP :hammer_and_wrench: | :mag: planning :mag: |
MQTT | :mag: planning :mag: | :mag: planning :mag: |
Redis Streams | :mag: planning :mag: | :mag: planning :mag: |
Pulsar | :mag: planning :mag: | :mag: planning :mag: |
ActiveMQ | :mag: planning :mag: | :mag: planning :mag: |
AzureSB | :mag: planning :mag: | :mag: planning :mag: |
If you are interested in this project, please give me feedback by:
giving the repository a star
tweet about Propan and let me and others know why you use it
joining Discord server
Your support helps me to stay in touch with you and encourages to continue developing and improving the library. Thank you for your support!
Really, share information about this project with others. The bigger community we have - the better project will be!
With declarative tools you can define what you need to get. With traditional imperative tools you must write what you need to do.
Take a look at classic imperative tools, such as aio-pika, pika, redis-py, nats-py, etc.
This is the Quickstart with the aio-pika:
import asyncio
import aio_pika
async def main():
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/"
)
queue_name = "test_queue"
async with connection:
channel = await connection.channel()
queue = await channel.declare_queue(queue_name)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print(message.body)
asyncio.run(main())
aio-pika is a great tool with a really easy learning curve. But it's still imperative. You need to connect, declare channel, queues, exchanges by yourself. Also, you need to manage connection, message, queue context to avoid any troubles.
It is not a bad way, but it can be much easier.
from propan import PropanApp, RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)
@broker.handle("test_queue")
async def base_handler(body):
print(body)
This is the Propan declarative way to write the same code. That is so much easier, isn't it?
Install using pip
:
pip install "propan[async-rabbit]"
# or
pip install "propan[async-nats]"
# or
pip install "propan[async-redis]"
# or
pip install "propan[async-kafka]"
# or
pip install "propan[async-sqs]"
Create an application with the following code at serve.py
:
from propan import PropanApp
from propan import RabbitBroker
# from propan import RedisBroker
# from propan import NatsBroker
# from propan import SQSBroker
# from propan import KafkaBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222")
# broker = RedisBroker("redis://localhost:6379")
# broker = SQSBroker("http://localhost:9324", ...)
# broker = KafkaBroker("localhost:9092")
app = PropanApp(broker)
@broker.handle("test")
async def base_handler(body):
print(body)
And just run it:
propan run serve:app --workers 3
Propan uses pydantic
to cast incoming function arguments to types according to their annotation.
from pydantic import BaseModel
from propan import PropanApp, RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)
class SimpleMessage(BaseModel):
key: int
@broker.handle("test2")
async def second_handler(body: SimpleMessage):
assert isinstance(body.key, int)
Propan a has dependencies management policy close to pytest fixtures
and FastAPI Depends
at the same time.
You can specify in functions arguments which dependencies
you would to use. Framework passes them from the global Context object.
Also, you can specify your own dependencies, call dependencies functions and more.
from propan import PropanApp, RabbitBroker, Context, Depends
rabbit_broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(rabbit_broker)
async def dependency(user_id: int) -> bool:
return True
@rabbit_broker.handle("test")
async def base_handler(user_id: int,
dep: bool = Depends(dependency),
broker: RabbitBroker = Context()):
assert dep is True
assert broker is rabbit_broker
Also, Propan allows you to use RPC requests over your broker with a simple way:
from propan import PropanApp, RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(rabbit_broker)
@broker.handle("ping")
async def base_handler():
return "pong"
@app.after_startup
async def self_ping():
assert (
await broker.publish("", "ping", callback=True)
) == "pong"
Propan automatically generates documentation for your project according to the AsyncAPI specification. You can work with both generated artifacts and place a Web view of your documentation on resources available to related teams.
The availability of such documentation significantly simplifies the integration of services: you can immediately see what channels and message format the application works with. And most importantly, it doesn't cost you anything - Propan has already done everything for you!
Propan has its own CLI tool that provided the following features:
For example: pass your current .env project setting to context
propan run serve:app --env=.env.dev
from propan import PropanApp, RabbitBroker
from propan.annotations import ContextRepo
from pydantic_settings import BaseSettings
broker = RabbitBroker()
app = PropanApp(broker)
class Settings(BaseSettings):
url: str = "amqp://guest:guest@localhost:5672/"
@app.on_startup
async def setup(env: str, context: ContextRepo):
settings = Settings(_env_file=env)
await broker.connect(settings.url)
context.set_global("settings", settings)
Also, Propan CLI is able to generate a production-ready application template:
propan create async rabbit [projectname]
Notice: project template require pydantic[dotenv]
installation.
Run the created project:
# Run rabbimq first
docker compose --file [projectname]/docker-compose.yaml up -d
# Run project
propan run [projectname].app.serve:app --env=.env --reload
Now you can enjoy a new development experience!
You can use Propan MQBrokers
without PropanApp
.
Just start and stop them according to your application lifespan.
from propan import NatsBroker
from sanic import Sanic
app = Sanic("MyHelloWorldApp")
broker = NatsBroker("nats://localhost:4222")
@broker.handle("test")
async def base_handler(body):
print(body)
@app.after_server_start
async def start_broker(app, loop):
await broker.start()
@app.after_server_stop
async def stop_broker(app, loop):
await broker.close()
Also, Propan can be used as part of FastAPI.
Just import a PropanRouter you need and declare the message handler
using the @event
decorator. This decorator is similar to the decorator @handle
for the corresponding brokers.
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import RabbitRouter
router = RabbitRouter("amqp://guest:guest@localhost:5672")
app = FastAPI(lifespan=router.lifespan_context)
class Incoming(BaseModel):
username: str
def call():
return True
@router.event("test")
async def hello(m: Incoming, d = Depends(call)):
return { "response": f"Hello, {m.username}!" }
app.include_router(router)
To see more framework usages go to examples/
Thanks for all of these amazing peoples made the project better!