Python SDK for CDEvents
The SDK can be used to create CDEvents in CloudEvents form, for use with the CloudEvents Python SDK.
This SDK is a work in progress. It will be maintained in sync with the specification, and it now covers all events from the specification.
Clone the repo and install the Python SDK as a package:
cd src && pip install .
Then import the module in your code:
import cdevents
To create a CDEvent, for instance a pipelineRun queued one:
import datetime

import cdevents

# Build a pipelineRun queued event with a custom JSON payload
event = cdevents.new_pipelinerun_queued_event(
    context_id="my-context-id",
    context_source="my/first/cdevent/program",
    context_timestamp=datetime.datetime.now(),
    subject_id="myPipelineRun1",
    custom_data={"hello_message": "hi!"},
    subject_source="subjectSource",
    custom_data_content_type="application/json",
    pipeline_name="myPipeline",
    url="https://example.com/myPipeline",
)
To send a CDEvent as a CloudEvent:
import cdevents
import requests
from cloudevents.http import to_structured

# Create a CloudEvent from the CDEvent
cloudevent = cdevents.to_cloudevent(event)

# Create the HTTP request representation of the CloudEvent in structured content mode
headers, body = to_structured(cloudevent)

# POST the event to the receiver
requests.post("<some-url>", data=body, headers=headers)
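For readers unfamiliar with structured content mode, the following is a minimal, standard-library-only sketch of what the headers and body roughly look like on the wire. It is not the implementation used by this SDK or by the CloudEvents Python SDK. The helper name `to_structured_sketch` and the `type` string are hypothetical and chosen only for illustration.

```python
import json
import uuid
from datetime import datetime, timezone


def to_structured_sketch(attributes, data):
    """Return (headers, body) for a structured-mode HTTP CloudEvent.

    Hypothetical helper for illustration; not part of any SDK.
    """
    # In structured content mode the context attributes and the data
    # are serialized together into one JSON document in the request body.
    envelope = dict(attributes)
    envelope["data"] = data
    headers = {"content-type": "application/cloudevents+json"}
    body = json.dumps(envelope).encode("utf-8")
    return headers, body


attributes = {
    # Required CloudEvents context attributes
    "specversion": "1.0",
    "id": str(uuid.uuid4()),
    "source": "my/first/cdevent/program",
    # Assumed type string, for illustration only
    "type": "example.pipelinerun.queued",
    # Optional attributes
    "time": datetime.now(timezone.utc).isoformat(),
    "datacontenttype": "application/json",
}

headers, body = to_structured_sketch(attributes, {"hello_message": "hi!"})
```

The resulting `headers` and `body` have the same shape as the pair passed to `requests.post` above: a single content type header marking the payload as a JSON-encoded CloudEvent, and a byte string containing the whole event.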
See the CloudEvents docs as well.