apache / rocketmq-externals

Mirror of Apache RocketMQ (Incubating)

[rocketmq-spark] Help needed: messages are consumed again when the program restarts #901

Open kai23333 opened 1 year ago

kai23333 commented 1 year ago

Reference code: https://github.com/apache/rocketmq-externals/blob/master/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMqUtilsTest.java


In my code, the consumer strategy is set to ConsumerStrategy.lastest(), but the setting does not seem to take effect: every time the program restarts, it consumes historical messages again.

How can I solve this problem?

The complete code is as follows:

        try {

            Map<String, String> optionParams = new HashMap<>();
            optionParams.put(RocketMQConfig.NAME_SERVER_ADDR, nameSrvAddr);
            SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver").setMaster("local[*]");
            JavaStreamingContext sc = new JavaStreamingContext(sparkConf, new Duration(duration));

            List<String> topics = new ArrayList<>();
            if (StringUtils.hasText(topic)) {
                for (String s : topic.split(";")) {
                    topics.add(s);
                }
            }

            LocationStrategy locationStrategy = LocationStrategy.PreferConsistent();

            // The three boolean arguments are, in order: autoCommit, forceSpecial, failOnDataLoss.
            JavaInputDStream<MessageExt> stream = RocketMqUtils.createJavaMQPullStream(sc, groupId,
                    topics, ConsumerStrategy.lastest(), false, false, false, locationStrategy, optionParams);

            stream.foreachRDD(new VoidFunction<JavaRDD<MessageExt>>() {

                private static final long serialVersionUID = 1L;

                @Override
                public void call(JavaRDD<MessageExt> messageExtJavaRDD) throws Exception {

                    JavaRDD<GPSRDD> GPSRDDJavaRDD = messageExtJavaRDD.map(new Function<MessageExt, GPSRDD>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public GPSRDD call(MessageExt messageExt) throws Exception {

                            GPSRDD gps = new GPSRDD();
                            String xxx = new String(messageExt.getBody());
                            System.out.println(xxx);
                            return gps;
                        }
                    });

                }
            });

            sc.start();

        } catch (Exception e) {
            e.printStackTrace();
        }
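
One possible explanation, sketched below as a toy model rather than as the connector's actual code: as far as I understand the rocketmq-spark README, when the broker already stores an offset for the consumer group, that stored offset is used regardless of the chosen strategy unless forceSpecial is true. With autoCommit set to false and no manual commit, a stale stored offset would then be reused on every restart. The class `OffsetResolutionSketch`, the `Strategy` enum, and `resolveStartOffset` are hypothetical names for illustration only and do not exist in rocketmq-spark.

```java
import java.util.OptionalLong;

public class OffsetResolutionSketch {

    /** Hypothetical stand-in for the starting-position choice; not the rocketmq-spark ConsumerStrategy class. */
    public enum Strategy { EARLIEST, LATEST }

    /**
     * Toy model (an assumption, not library code) of how a pull consumer
     * might choose its starting offset for a single message queue.
     */
    public static long resolveStartOffset(Strategy strategy,
                                          boolean forceSpecial,
                                          OptionalLong committedOffset,
                                          long minOffset,
                                          long maxOffset) {
        if (!forceSpecial && committedOffset.isPresent()) {
            // A stored group offset takes priority over the strategy;
            // clamp it into the range the broker still holds.
            long stored = committedOffset.getAsLong();
            return Math.max(minOffset, Math.min(stored, maxOffset));
        }
        // No stored offset, or the strategy is forced: follow the strategy.
        return strategy == Strategy.LATEST ? maxOffset : minOffset;
    }

    public static void main(String[] args) {
        long min = 0L;
        long max = 1000L;
        OptionalLong stale = OptionalLong.of(200L);

        // Stale stored offset, strategy not forced: resumes from 200 on every restart.
        System.out.println(resolveStartOffset(Strategy.LATEST, false, stale, min, max));
        // Same stored offset, strategy forced: starts from the newest offset.
        System.out.println(resolveStartOffset(Strategy.LATEST, true, stale, min, max));
        // No stored offset at all: the strategy decides.
        System.out.println(resolveStartOffset(Strategy.LATEST, false, OptionalLong.empty(), min, max));
    }
}
```

If this model matches the connector's behavior, two things would be worth trying in the call above: pass true for forceSpecial so that lastest() is honored on restart, or make sure offsets are actually committed (autoCommit set to true, or a manual commit after each batch) so that the stored offset advances.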