apache / rocketmq-streams

Apache rocketmq
https://rocketmq.apache.org/
Apache License 2.0

[ISSUE #272]Support subscribe topic from first #276

Closed Jargon9 closed 1 year ago

Jargon9 commented 1 year ago

Hi team, I have completed support for subscribing to a topic from the first offset. Here is an example: image image

Could you please review the code?

ni-ze commented 1 year ago

@Jargon9 I will close this PR until the problem is resolved.

Jargon9 commented 1 year ago

@ni-ze Dear Sir Ni, I did not see your code comment. What is the problem you mentioned? I would greatly appreciate it if you could point it out.

ni-ze commented 1 year ago

image You can probably find it under Files changed. @Jargon9

Jargon9 commented 1 year ago

@ni-ze It's a bit strange that I didn't see this comment under Files changed or in the conversation. Thank you very much for pointing it out; I will fix it.

image

image

ni-ze commented 1 year ago

@Jargon9 If you can fix this, I would really appreciate it.

Jargon9 commented 1 year ago

@ni-ze Sorry, I had important things to deal with during the May Day holiday, which delayed progress. I will fix it in the next two weeks.

Jargon9 commented 1 year ago

@ni-ze I have fixed it. Could you please review the code?

Jargon9 commented 1 year ago

@ni-ze Dear Mr. Ni, if you have any comments, please leave them in the conversation so that I do not miss them as I did before. Thanks!

ni-ze commented 1 year ago

@Jargon9 The unionConsumer in PlanetaryEngine is shared by two kinds of topics: the source topic and the shuffle topic. For the source topic, it is up to the user to decide whether to consume from the latest or the earliest offset. The shuffle topic, however, is always consumed from the latest offset.

Jargon9 commented 1 year ago

@ni-ze Got it, I have filtered out the shuffle topic (ending with -shuffleTopic). Is it also necessary to filter out the state topic (ending with -stateTopic)?
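
The suffix-based filtering discussed above could look roughly like the following. This is only a minimal sketch, not the code from this PR: the class `TopicKinds` and its methods are hypothetical names, and treating `-stateTopic` as an internal topic is an assumption, since the thread leaves that question open.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; the names are illustrative and not taken from rocketmq-streams.
final class TopicKinds {
    static final String SHUFFLE_SUFFIX = "-shuffleTopic";
    // Assumption: state topics are treated like shuffle topics (undecided in the thread).
    static final String STATE_SUFFIX = "-stateTopic";

    private TopicKinds() {
    }

    // True for topics that should keep their default start position.
    static boolean isInternalTopic(String topic) {
        return topic.endsWith(SHUFFLE_SUFFIX) || topic.endsWith(STATE_SUFFIX);
    }

    // Keeps only the user (source) topics, the ones eligible for "consume from first".
    static List<String> userTopics(List<String> subscribed) {
        return subscribed.stream()
                .filter(topic -> !isInternalTopic(topic))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> subscribed = List.of("orders", "orders-shuffleTopic", "orders-stateTopic");
        System.out.println(userTopics(subscribed));
    }
}
```

Keeping the suffix check in one place means the "consume from first" setting is only ever applied to the topics returned by `userTopics`, whichever way the state-topic question is decided.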

Jargon9 commented 1 year ago

image

Jargon9 commented 1 year ago

@ni-ze Dear Mr. Ni, If there are any further issues, could you please help point them out?

ni-ze commented 1 year ago

@Jargon9 It does need to seek to the beginning of the user topic. But we can reset the offset after the rebalance, in the same way the state of the shuffle topic is recovered. The details can be found in the class MessageQueueListenerWrapper, where the state is recovered after rebalance. I think the reset of the user topic offset can be invoked there. Sorry for the late reply.
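
The idea of resetting user-topic offsets from a rebalance callback can be sketched as below. This is a self-contained illustration under stated assumptions, not the actual MessageQueueListenerWrapper: `QueueListener`, `OffsetResetter`, and `ResettingListener` are hypothetical stand-ins, queues are represented as plain strings, and running the reset after the delegate is an assumption rather than something this thread settles.

```java
import java.util.HashSet;
import java.util.Set;

final class RebalanceResetSketch {
    // Hypothetical stand-in for a rebalance callback; not the RocketMQ interface.
    interface QueueListener {
        void queuesChanged(String topic, Set<String> assignedQueues);
    }

    // Hypothetical stand-in for whatever moves a queue's consume position.
    interface OffsetResetter {
        void seekToBeginning(String queue);
    }

    // Wraps an existing listener and resets newly assigned user-topic queues once.
    static final class ResettingListener implements QueueListener {
        private final QueueListener origin;
        private final OffsetResetter resetter;
        private final Set<String> alreadyReset = new HashSet<>();

        ResettingListener(QueueListener origin, OffsetResetter resetter) {
            this.origin = origin;
            this.resetter = resetter;
        }

        @Override
        public void queuesChanged(String topic, Set<String> assignedQueues) {
            // Let the wrapped listener do its work first (assumed ordering).
            origin.queuesChanged(topic, assignedQueues);
            if (topic.endsWith("-shuffleTopic")) {
                return; // shuffle topics keep consuming from the latest offset
            }
            for (String queue : assignedQueues) {
                // Set.add returns true only the first time, so later rebalances
                // do not rewind a queue that is already being consumed.
                if (alreadyReset.add(queue)) {
                    resetter.seekToBeginning(queue);
                }
            }
        }
    }

    public static void main(String[] args) {
        ResettingListener listener = new ResettingListener(
                (topic, queues) -> { },
                queue -> System.out.println("reset " + queue));
        listener.queuesChanged("orders", Set.of("orders-q0"));
        listener.queuesChanged("orders-shuffleTopic", Set.of("shuffle-q0"));
    }
}
```

Tracking the queues that were already reset is one way to keep a later rebalance from rewinding a queue mid-consumption; a real implementation would also need to decide what happens when a queue is revoked and later reassigned.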

Jargon9 commented 1 year ago

@ni-ze Got it, thanks. I will handle this within this week.

Jargon9 commented 1 year ago

@ni-ze Dear sir, I have noticed a problem: if I place resetOffset before originListener.messageQueueChanged, there may be an error (still in rebalance), while if I place it after originListener.messageQueueChanged, consumption may already have started. Do you have any suggestions for this issue?

Jargon9 commented 1 year ago

image

ni-ze commented 1 year ago

@Jargon9 Using the DefaultMQAdminExt tool to reset the offset before starting the consumer might work.

ni-ze commented 1 year ago

Can I have your WeChat? I would like to finish this as fast as possible; it has been a while since this began. My number: 790174623.