Open zerlok opened 2 years ago
AsyncAPI spec:
asyncapi: 2.2.0 info: title: Dillery Scheduler Service version: 0.1.0 description: Background task scheduling service with AsyncAPI specification channels: dillery/task/posted: publish: message: name: PostedTask title: Posted Task payload: $ref: '#/components/schemas/postedTask' bindings: amqp: cc: - task.posted traits: - $ref: '#/components/operationTraits/amqp' components: schemas: taskMeta: name: TaskMeta title: Task Meta type: object required: - taskId - groupId - authorId properties: taskId: type: string description: A unique identifier of the task groupId: type: string description: A unique identifier of the group the task belongs to authorId: type: string description: A unique identifier to an author who posted the task taskInput: name: TaskInput title: Task Input type: object required: - inputBody properties: inputBody: type: string format: byte description: An input data for task execution postedTask: name: PostedTaskMeta title: Posted Task Meta allOf: - $ref: '#/components/schemas/taskMeta' - $ref: '#/components/schemas/taskInput' - name: PostedTaskMeta title: Posted Task Meta type: object properties: postedAt: type: string format: date-time description: A time when task was posted operationTraits: amqp: bindings: amqp: is: routingKey exchange: name: dillery type: topic autoDelete: true
Generator run:
poetry run asynchron-codegen generate python-aio-pika -o src/dillery
Generated src/dillery/consumer.py content:
src/dillery/consumer.py
# ... class DillerySchedulerServiceConsumerFacade(metaclass=abc.ABCMeta): """Background task scheduling service with AsyncAPI specification""" def __init__( self, controller: AioPikaBasedAmqpController, ) -> None: pass pass # ...
Expected content (something like):
# ... class DillerySchedulerServiceConsumerFacade(metaclass=abc.ABCMeta): """Background task scheduling service with AsyncAPI specification""" def __init__( self, controller: AioPikaBasedAmqpController, ) -> None: controller.bind_consumer( decoder=PydanticMessageSerializer( model=PostedTaskMeta, # type: ignore[misc] ), consumer=CallableMessageConsumer( consumer=self.consume_dillery_task_posted, ), bindings=AmqpConsumerBindings( exchange_name="dillery", binding_keys=( "task.posted", ), queue_name="posted-tasks", is_auto_delete_enabled=None, is_exclusive=None, is_durable=None, prefetch_count=None, ), ) @abc.abstractmethod async def consume_dillery_task_posted( self, message: PostedTaskMeta, ) -> None: raise NotImplementedError # ...
This may help https://github.com/asyncapi/bindings/issues/44
AsyncAPI spec:
Generator run:
Generated
src/dillery/consumer.py
content:Expected content (something like):