streamdal / plumber

A swiss army knife CLI tool for interacting with Kafka, RabbitMQ and other messaging systems.
https://streamdal.com
MIT License
2.07k stars 71 forks source link

Add support for cloudEvents #338

Closed chris-aeviator closed 1 year ago

chris-aeviator commented 1 year ago

What would you like to see added? Cloudevents (https://cloudevents.io/) is a specification for describing event data in a common way. Cloud events are used in a variety of application platforms and are independent of the underlying transport layer, so plumber would be the ideal tool to send them from a terminal.

Use Case

When developing a service that expects cloudevents structured messages, plumber would act as the swiss army knife for sending them in an easy way.

Supported adapters are e.g.

more at https://github.com/cloudevents/spec#cloudevents-documents

Proposed Change

add a CLI flag --cloudEvent that turns --input {"data": {"userSignedUpEvent": ...} into a cloudEvent

Who Benefits from the Change(s)?

All users working with the serverlessworkflow.io specification and cloudEvent users in general.

Additional context example of a cloud event message

{
  "data":{ EVENT_DATA

 }
  "datacontenttype": "application/json; charset=utf-8",
  "id": "MESSAGE_ID

",
  "source": "//cloudaudit.googleapis.com/projects/PROJECT_ID

/logs/data_access",
  "specversion": "1.0",
  "type": "google.cloud.audit.log.v1.written",
  "time": "EVENT_GENERATION_TIME

",
  "dataschema": "https://googleapis.github.io/google-cloudevents/jsonschema/google/events/cloud/audit/v1/LogEntryData.json",
  "methodName": "jobservice.jobcompleted",
  "resourceName": "projects/my-project/jobs/bqjob_r3ac45813612fa2d6_0000017d591922c9_1",
  "serviceName": "bigquery.googleapis.com",
  "subject": "bigquery.googleapis.com/projects/my-project/jobs/bqjob_r3ac45813612fa2d6_0000017d591922c9_1",
}

sample python code to construct a cloud event

import datetime
import json
import uuid

event = {}
event['id'] = str(uuid.uuid4())
event['source'] = 'my-event-source'
event[type] = 'my-event-type'
event['specversion'] = '0.3'
event['time'] = datetime.datetime.utcnow().isoformat('T') + 'Z'
event['data'] = 'Hello World!'
event_json_str = json.dumps(event)

# Alternatively with Cloud Events SDK
from cloudevents.sdk.event import v03

event = (
    v03.Event().
    SetEventID('my-event-id').
    SetSource('my-event-source').
    SetEventType('my-event-type').
    SetEventTime(datetime.datetime.utcnow().isoformat('T') + 'Z').
    SetData('Hello World!')
blinktag commented 1 year ago

Thanks for the suggestion! We are working on getting this added to Plumber this week

blinktag commented 1 year ago

@chris-aeviator I'm trying to gauge how people might typically use this. Would the use-case just be a single flag, which unmarshals the JSON into a cloud event, and then uses the SDK's libs to ship the message correctly? Or would it also need flags for all the message options such as --id, --subject, --data-schema, etc?

Also is there any functionality you are looking for on reads? Currently plumber exposes the message bus' headers and metadata, which should by default include the ce_* attributes

blinktag commented 1 year ago

Hi @chris-aeviator, can you check my previous reply. I want to make sure I grokked the intended behavior before releasing this. Thank you!

chris-aeviator commented 1 year ago

Sorry for the late reply - most fields are standardized (so the flag logic would be appropriate) but there’s also the possibility to add custom fields - for a standard use case this is optional though

Am 10.04.2023 um 16:03 schrieb Mark G. @.***>:

 Hi @chris-aeviator, can you check my previous reply. I want to make sure I grokked the intended behavior before releasing this. Thank you!

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you were mentioned.

blinktag commented 1 year ago

Perfect thank you! I'll get this feature released this week

blinktag commented 1 year ago

@chris-aeviator Cloudevents support has landed in https://github.com/batchcorp/plumber/releases/tag/v2.3.0 It is currently supported for Kafka, NATS, NATS streaming, and NATS jetstream. Example usage is here: https://github.com/batchcorp/plumber/blob/master/docs/examples.md#publish-cloudevents

Let me know if you have any issues with the functionality