edgexfoundry / app-functions-sdk-go

Owner: Applications WG
Apache License 2.0
43 stars 83 forks source link

Custom dynamic mqtt topics in SetDefaultFunctionsPipeline, all pub and sub in the single mqtt connection #1558

Closed jiekechoo closed 8 months ago

jiekechoo commented 8 months ago

I want to set an AddFunctionsPipelineForTopics with custom topic(s), such as edgex/events/my-topic, but how do I send a message with the custom topic in application service for the trigger?

jiekechoo commented 8 months ago

Solved. Use the func PublishWithTopic. https://github.com/edgexfoundry/app-functions-sdk-go/blob/3bc857698cc13a1282509d5e63119d2caf80d765/pkg/interfaces/service.go#L202

jiekechoo commented 8 months ago

@cloudxxx8 , I use the func PublishWithTopic to publish message with custom topic device/message to the MessageBus, but it can't catch the message in the AddFunctionsPipelineForTopics, why? How could I do?

cloudxxx8 commented 8 months ago

what is the full scenario you want to achieve? are you tring to send the message and receive from the same App Service?

jiekechoo commented 8 months ago

@cloudxxx8 Thanks for your reply.

The full scenario in my app service:

There are two questions:

  1. How to use more mqtt topics for publish in single connection?
  2. How to use a signal in the MessageBus for trigger an event?

Here is my idea:

but nothing happened.

majidalikhn commented 8 months ago

We implemented a solution for dynamically setting MQTT topics based on event data using placeholders and context manipulation. Here's a simplified overview:

Define a Placeholder in the Configuration: In our configuration, we specify a placeholder for the MQTT topic, allowing dynamic replacement during runtime.

[AppServiceMQTT.MqttExportConfig]
# Additional configurations as necessary
Topic = "{mqttPublishTopic}"

Populate Placeholder Values Dynamically: During event processing, we determine the appropriate MQTT topic based on the event's content. We then update the context to reflect this dynamic topic selection.


// Simplified function to illustrate the concept
func ProcessEvent(event Event, ctx Context) error {
    // Logic to determine the topic based on the event
    dynamicTopicValue := determineDynamicTopic(event)

    // Update the context with the determined topic
    ctx.AddValue("mqttPublishTopic", dynamicTopicValue)

    return nil
}

Utilizing Config in Pipelines: We then use this configured context in our pipelines to ensure that the MQTT topics are dynamically adjusted based on the readings from each event:

functionsLibrary := functions.NewFunctionsLibrary()
mqttConfig := app.serviceConfig.AppServiceMQTT.MqttExportConfig
err := app.service.SetDefaultFunctionsPipeline(
    functionsLibrary.ProcessEvent(event(app.serviceConfig.AppServiceMQTT.Mappings),
    transforms.NewMQTTSecretSender(mqttConfig, false).MQTTSend,
)

I hope this helps, otherwise, feel free to reach me out, I will be more than happy to help.

jiekechoo commented 8 months ago

@majidalikhn Thanks for your help. I'll try to use the dynamic topics in the default pipeline rather than using multiple pipelines. Maybe it's a common issue.

majidalikhn commented 8 months ago

@jiekechoo try it, and if you need some help sure, let me know, and if you find some better ways, please share.

jiekechoo commented 8 months ago

@majidalikhn Vielen Dank. The first issue has been solved that the function PublishWithTopic send the Event to the MessageBus should be use the topic start with events/.

jiekechoo commented 8 months ago

@majidalikhn I followed your suggestion, it worked.

so, the issue solved.

Thanks.

majidalikhn commented 8 months ago

Wonderful. Happy to help.