apache / rocketmq-streams

Apache rocketmq
https://rocketmq.apache.org/
Apache License 2.0

Provide RocketMQ Connect sink/source #163

Closed ni-ze closed 2 years ago

ni-ze commented 2 years ago

I would like to provide a RocketMQ Connect sink/source in addition to the existing sinks/sources in RocketMQ Streams. Data in other components, such as MySQL, would be imported into RocketMQ through RocketMQ Connect. This way, the source and sink processes are separated from the streaming process: the sources/sinks become more stable, and the streaming process can focus on its own business logic.

jonahcui commented 2 years ago

Taking MQTT as an example, would we need to remove the sink methods from DataStream:

public DataStream toMqtt(String url, String clientId, String topic) {
    PahoSink pahoSink = new PahoSink(url, clientId, topic);
    ChainStage<?> output = this.mainPipelineBuilder.createStage(pahoSink);
    this.mainPipelineBuilder.setTopologyStages(currentChainStage, output);
    return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, output);
}

public DataStream toMqtt(String url, String clientId, String topic, String username, String password) {
    PahoSink pahoSink = new PahoSink(url, clientId, topic, username, password);
    ChainStage<?> output = this.mainPipelineBuilder.createStage(pahoSink);
    this.mainPipelineBuilder.setTopologyStages(currentChainStage, output);
    return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, output);
}

and change the example code to pass the MQTT sink explicitly:

.to(new PahoSink("tcp://localhost:1883", "test", "test"))

Is that the intent?

ni-ze commented 2 years ago

The old sinks/sources will not be removed; the new sink/source will be supported through RocketMQ Connect.
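
For readers comparing the two API styles discussed in this thread, here is a minimal, self-contained sketch. It assumes nothing about the real rocketmq-streams classes: `Sink`, `RecordingMqttSink`, and `MiniStream` are hypothetical stand-ins invented for illustration, and the sink only records messages in memory instead of connecting to an MQTT broker. It shows how a dedicated `toMqtt(...)` convenience method can coexist with a generic `to(sink)` method by delegating to it, which is one way both styles could stay available.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkApiSketch {

    /** Hypothetical sink abstraction; not the real rocketmq-streams interface. */
    interface Sink {
        void write(String message);
    }

    /** Hypothetical in-memory sink that mirrors the (url, clientId, topic) constructor shape. */
    static class RecordingMqttSink implements Sink {
        final String url;
        final String clientId;
        final String topic;
        final List<String> written = new ArrayList<>();

        RecordingMqttSink(String url, String clientId, String topic) {
            this.url = url;
            this.clientId = clientId;
            this.topic = topic;
        }

        @Override
        public void write(String message) {
            // Record the message instead of publishing it to a broker.
            written.add(topic + ":" + message);
        }
    }

    /** Hypothetical miniature stream exposing both API styles. */
    static class MiniStream {
        private final List<String> records;
        private final List<Sink> sinks = new ArrayList<>();

        MiniStream(List<String> records) {
            this.records = records;
        }

        /** Style 1: a dedicated convenience method per connector. */
        MiniStream toMqtt(String url, String clientId, String topic) {
            return to(new RecordingMqttSink(url, clientId, topic));
        }

        /** Style 2: a generic method that accepts any sink. */
        MiniStream to(Sink sink) {
            sinks.add(sink);
            return this;
        }

        /** Push every record to every registered sink. */
        void start() {
            for (String record : records) {
                for (Sink sink : sinks) {
                    sink.write(record);
                }
            }
        }
    }

    public static void main(String[] args) {
        RecordingMqttSink sink = new RecordingMqttSink("tcp://localhost:1883", "test", "test");
        new MiniStream(Arrays.asList("a", "b")).to(sink).start();
        System.out.println(sink.written); // prints [test:a, test:b]
    }
}
```

In this sketch the convenience method is only a thin wrapper, so removing it or keeping it does not change how the generic path behaves.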