turms-im / turms

🕊️ The world's most advanced open source instant messaging engine for 100K~10M concurrent users https://turms-im.github.io/docs
Apache License 2.0

How to trigger custom NotificationHandler when user is offline ? #1283

Open chinamcafee opened 1 year ago

chinamcafee commented 1 year ago

To use a third-party push service such as JPush or GeTui, I think I need to implement a custom NotificationHandler for the gateway. When the user is online, my custom NotificationHandler works well, but when the user is offline, it is never triggered.

It seems that when the user is offline, the activeNodeIds obtained in OutboundMessageService#forwardClientMessageByRecipientId is empty, so the method returns PublisherPool.emptySet() without forwarding the message to any gateway node, and my custom NotificationHandler is never triggered.

But I am not sure if I have some misunderstandings about the code or it is a bug indeed.

Any help is appreciated

JamesChenX commented 1 year ago

Your understanding of the code is correct, but you need to implement im.turms.service.infra.plugin.extension.RequestHandlerResultHandler instead of im.turms.gateway.infra.plugin.extension.NotificationHandler.

Turms has a plugin named turms-plugin-push for push notifications, and you can reference it to implement your own plugin.


But I recommend adding support for JPush and GeTui to turms-plugin-push instead of developing a new plugin, so that we can maintain them together and make them robust (e.g. save failed push notifications in Redis when the current node goes down, so that other nodes can fetch them and retry sending them to users).

If you also prefer this solution, you can reference im.turms.plugin.push.core.sender.fcm.FcmSender to implement your own JPushSender and GeTuiSender. I will polish your code once you open a PR, so you don't need to worry too much about whether your code works well with Turms.

Alternatively, you can wait for me to implement them. However, I am currently busy developing turms-client-cpp (a C++-based client SDK), so I can only start developing them from scratch next week or the week after.

chinamcafee commented 1 year ago

Thank you for your explanation.

I will start working on understanding the code implementation in turms-plugin-push and try to reference FcmSender to create the JPushSender implementation.

Since I am also studying turms in my spare time, I am not certain if I will be able to provide my implementation quickly. Moreover, this feature is not urgent for me. Therefore, please feel free to decide whether to implement this functionality based on your time schedule.

If I manage to have time to implement it before you do, I would be very happy to submit a pull request to contribute the code, even though my implementation might not be professional enough in Reactive Programming.

Once again, thank you for your assistance.

chinamcafee commented 1 year ago

Hello, I have started learning the code of turms-plugin-push, but I have some confusion.

I read the logic in the im.turms.plugin.push.NotificationPusher#afterNotify method. My thinking is that when a user goes offline, the cached record for that user ID in Redis should be deleted.

However, before the im.turms.plugin.push.NotificationPusher#sendNotification method is invoked, the im.turms.server.common.domain.session.service.UserStatusService#fetchDeviceDetails method is called.

Its first parameter is offlineRecipientIds. If Redis is queried with the IDs of offline users, shouldn't the result be empty? And if the result is empty, can the subsequent logic still be triggered?

JamesChenX commented 1 year ago

My thinking is that when a user goes offline, the corresponding user ID cached record in redis should be deleted.

turms-gateway stores two entries for each user in Redis:

  1. The current user session information. This entry is removed when the user goes offline.
  2. The user device information, which the client can send to turms-gateway by calling turmsClient.userService.login(deviceDetails={}, ...). This entry is not removed when the user goes offline. Note that turms-gateway only stores in Redis the device details fields that are configured in im.turms.server.common.infra.property.env.gateway.session.DeviceDetailsProperties#items. For example, to store the apns device details field sent by the client, that field must be added to these properties. The plugin can then fetch the device details when im.turms.server.common.domain.session.service.UserStatusService#fetchDeviceDetails is called, and use them to find the push notification provider token for pushing notifications to offline users.
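
The two entries described above can be pictured with a small in-memory model. This is only a sketch under stated assumptions: the class UserStatusStoreSketch, its method signatures, and the storedFields set (standing in for DeviceDetailsProperties#items) are hypothetical and are not the Turms API or its Redis layout.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for the two per-user Redis entries.
class UserStatusStoreSketch {
    // Plays the role of DeviceDetailsProperties#items: only these fields are stored.
    private final Set<String> storedFields;
    // Entry 1: session information, removed when the user goes offline.
    private final Map<Long, String> sessions = new HashMap<>();
    // Entry 2: device details, kept after the user goes offline.
    private final Map<Long, Map<String, String>> deviceDetails = new HashMap<>();

    UserStatusStoreSketch(Set<String> storedFields) {
        this.storedFields = storedFields;
    }

    void login(long userId, String nodeId, Map<String, String> details) {
        sessions.put(userId, nodeId);
        Map<String, String> kept = new HashMap<>();
        details.forEach((field, value) -> {
            if (storedFields.contains(field)) {
                kept.put(field, value);
            }
        });
        if (!kept.isEmpty()) {
            deviceDetails.put(userId, kept);
        }
    }

    void goOffline(long userId) {
        // Only the session entry is removed.
        sessions.remove(userId);
    }

    boolean isOnline(long userId) {
        return sessions.containsKey(userId);
    }

    Map<Long, Map<String, String>> fetchDeviceDetails(List<Long> userIds) {
        Map<Long, Map<String, String>> found = new HashMap<>();
        for (Long userId : userIds) {
            Map<String, String> details = deviceDetails.get(userId);
            if (details != null) {
                found.put(userId, details);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        UserStatusStoreSketch store = new UserStatusStoreSketch(Set.of("apns"));
        store.login(1L, "node-a", Map.of("apns", "token-1", "unlisted", "x"));
        store.goOffline(1L);
        System.out.println(store.isOnline(1L));
        System.out.println(store.fetchDeviceDetails(List.of(1L)));
    }
}
```

In this model, a lookup by an offline user's ID still returns the stored apns token, while a client that logs in without any configured device details leaves no second entry at all.
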
chinamcafee commented 1 year ago

Thank you very much for your response.

It seems that I didn't understand the login logic deeply enough. Indeed, I did not pass the deviceDetails parameter on the client side, so I never observed a second cache entry for the user in Redis. That's my mistake. I'll add the parameter in business.js and study how the server handles deviceDetails.

I'll continue to understand this portion of the code logic, in order to implement the plugin correctly.

Wujerry commented 10 months ago

Same question here:

    Set<String> activeNodeIds = sessionsStatus.getActiveNodeIds();
    if (activeNodeIds.isEmpty()) {
        notificationData.release();
        // Maybe this line should return the offline recipient ID;
        // otherwise offlineRecipientIds is always empty. I'm not sure.
        return PublisherPool.<Long>emptySet();
    }
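
The idea in the comment above can be sketched in isolation. This is a hypothetical, simplified model (the class OfflineRecipientSketch and its method are invented for illustration); the actual fix applied in Turms is not shown in this thread.

```java
import java.util.Map;
import java.util.Set;

class OfflineRecipientSketch {
    // Returns the recipient IDs that should be treated as offline.
    static Set<Long> forwardAndCollectOffline(Long recipientId,
            Map<Long, Set<String>> activeNodeIdsByUser) {
        Set<String> activeNodeIds = activeNodeIdsByUser.getOrDefault(recipientId, Set.of());
        if (activeNodeIds.isEmpty()) {
            // No gateway node holds a session for this user:
            // report the recipient as offline instead of returning an empty set.
            return Set.of(recipientId);
        }
        // Forwarding to the active gateway nodes is omitted in this sketch.
        return Set.of();
    }

    public static void main(String[] args) {
        System.out.println(forwardAndCollectOffline(7L, Map.of()));
        System.out.println(forwardAndCollectOffline(7L, Map.of(7L, Set.of("node-a"))));
    }
}
```
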
JamesChenX commented 10 months ago

Hi @Wujerry, Thanks for pointing the bug out. The bug has been fixed and you can pull the latest code or docker images to have a try.

chinamcafee commented 10 months ago

@JamesChenX It seems that there was indeed a bug here. Does this bug affect the logic of turms-plugin-push? I am still unable to retrieve offline users correctly while implementing the JPushSender logic in turms-plugin-push, but I assumed that was because I misunderstood Turms' code and should study the plugin logic again first. Because of that, and a lack of free time, I have not successfully implemented JPushSender yet.

I will study this bug fix later and continue trying to implement JPushSender in my spare time. Thanks for the fix; it will help me a lot.

Wujerry commented 10 months ago

In the class ServiceRequestDispatcher, the code in Mono.defer is never executed:

    return mono.map(offlineRecipientIds -> pluginManager.invokeExtensionPointsSequentially(
                    RequestHandlerResultHandler.class,
                    RESULT_AFTER_NOTIFY_METHOD,
                    (cur, pre) -> pre.switchIfEmpty(Mono.defer(() -> cur
                            .afterNotify(result, requesterId, requesterDevice, offlineRecipientIds))))) // this line is never executed
                    .then();

The same happens in the plugin method NotificationPusher.afterNotify(): the code in Mono.deferContextual is never executed.

I'm not familiar with the reactive style in Java; I just set a breakpoint in the IDE and it is never hit.

JamesChenX commented 9 months ago

Hi @Wujerry, this is expected behavior. The code in Mono.defer(() -> ...) only executes when:

  1. There is more than one registered extension point implementing RequestHandlerResultHandler.
  2. The return value of the first extension point is an empty Mono (e.g. Mono.empty()), which means that the next handler should handle the request handler result.

I will also add method comments later to make their behavior clearer.
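
The "run the next handler only if the previous result is empty" rule can be illustrated with a plain-Java analogy. This sketch uses java.util.Optional rather than Reactor's Mono, so it is eager instead of lazy, and the class SwitchIfEmptyAnalogy and its step method are hypothetical names; it only mirrors the shape of pre.switchIfEmpty(Mono.defer(() -> cur...)).

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class SwitchIfEmptyAnalogy {
    // The current handler is consulted only when the previous result is empty.
    static <T> Optional<T> step(Optional<T> pre, Supplier<Optional<T>> cur) {
        return pre.or(cur);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Supplier<Optional<String>> next = () -> {
            calls.incrementAndGet();
            return Optional.of("handled by next");
        };
        // Previous result is empty: the next handler runs.
        System.out.println(step(Optional.empty(), next).get());
        // Previous result is present: the next handler is skipped.
        System.out.println(step(Optional.of("handled by previous"), next).get());
        System.out.println(calls.get());
    }
}
```
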

JamesChenX commented 9 months ago

Hi @chinamcafee, the bug will suppress some calls to methods of turms-plugin-push, so you should upgrade if you need to implement JPushSender.

And feel free to open a new issue to ask if you encounter any problems.

chinamcafee commented 9 months ago

OK, thank you for your help. I will continue working on JPushSender as soon as possible.

Wujerry commented 9 months ago

Hi @JamesChenX, thanks for the explanation. I merged the newest code and found some problems:

  1. I created a group with only two members, one online and the other offline, and then sent a message, but offlineRecipientIds contained two IDs.
  2. I have already loaded the push plugin, but the code in Mono.defer is never executed.
Wujerry commented 9 months ago

I also found a bug in the push plugin: the default implementation of beforeNotify returns an empty Mono, so if you have not implemented this method, afterNotify is never reached.

    private Mono<Void> notifyRelatedUsersOfAction(
            @NotNull RequestHandlerResult result,
            @NotNull Long requesterId,
            @NotNull DeviceType requesterDevice) {
        Mono<RequestHandlerResult> notify =
                pluginManager.invokeExtensionPointsSequentially(RequestHandlerResultHandler.class,
                        RESULT_BEFORE_NOTIFY_METHOD,
                        result,
                        (resultNotifier, pre) -> pre.flatMap(handlerResult -> resultNotifier
                                .beforeNotify(handlerResult, requesterId, requesterDevice)));

        // The default implementation of beforeNotify in the push plugin returns an empty Mono,
        // so if you have not implemented this method, afterNotify is never reached.
        return notify
                .flatMap(handlerResult -> notifyRelatedUsersOfAction0(handlerResult,
                        requesterId,
                        requesterDevice))
                .then();
    }

    default Mono<RequestHandlerResult> beforeNotify(
            @NotNull RequestHandlerResult result,
            @NotNull Long requesterId,
            @NotNull DeviceType requesterDevice) {
        // return Mono.empty();
        return Mono.just(result); // I changed it to this, but I don't know if it is the best way
    }
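
The mechanism described above (an empty result from beforeNotify means the following flatMap stage is skipped) can be shown with a plain-Java analogy. This sketch uses java.util.Optional instead of Reactor's Mono, and the names BeforeNotifyAnalogy, Handler, DEFAULT and PASS_THROUGH are hypothetical; it illustrates the mechanism only and makes no claim about which default is the right design for Turms.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

class BeforeNotifyAnalogy {
    interface Handler {
        // Mirrors a default that yields "empty".
        default Optional<String> beforeNotify(String result) {
            return Optional.empty();
        }
    }

    // Handler that keeps the default behavior.
    static final Handler DEFAULT = new Handler() {};

    // Handler that passes the result through to the next stage.
    static final Handler PASS_THROUGH = new Handler() {
        @Override
        public Optional<String> beforeNotify(String result) {
            return Optional.of(result);
        }
    };

    // Reports whether the stage chained after beforeNotify is entered.
    static boolean reachesNotifyStage(Handler handler, String result) {
        AtomicBoolean reached = new AtomicBoolean(false);
        handler.beforeNotify(result).flatMap(handlerResult -> {
            reached.set(true);
            return Optional.of(handlerResult);
        });
        return reached.get();
    }

    public static void main(String[] args) {
        System.out.println(reachesNotifyStage(DEFAULT, "result"));
        System.out.println(reachesNotifyStage(PASS_THROUGH, "result"));
    }
}
```
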
JamesChenX commented 9 months ago

@Wujerry

  1. I have already loaded the push plugin, but Mono.defer never executed

If you load a plugin via HTTP, you need to start it to make it work. And if there is a running extension, code in Mono.defer() will run.

I will write a doc for the plugin lifecycle this week. https://github.com/turms-im/turms/issues/1340

The default function of beforeNotify in push plugin return an empty mono, if u have not implement this method, the AfterNotify will never be reached.

It is designed in this way.

Wujerry commented 9 months ago

If you load a plugin via HTTP, you need to start it to make it work. And if there is a running extension, code in Mono.defer() will run.

That's weird. The extension is running, invoker.invoke is executed, and pre is an empty Mono. I read up on Mono.defer and now understand why Turms is written this way, but it just does not work as expected.

I put the turms-push-plugin jar into the plugin directory as the documentation says:

  1. I implemented the beforeNotify method and set a breakpoint in the IDE; the breakpoint is hit.
  2. The size of offlineRecipientIds is greater than 0, but the breakpoint in afterNotify is never hit.
  3. When I remove the Mono.defer() wrapper, the breakpoint in afterNotify is hit.

Then I changed the code to this:

        return mono.map(offlineRecipientIds -> pluginManager.invokeExtensionPointsSequentially(
                RequestHandlerResultHandler.class,
                RESULT_AFTER_NOTIFY_METHOD,
                (cur, pre) -> Mono.defer(() -> {
                    System.out.println("----------------!");
                    return Mono.empty();
                })))
                .then();

The breakpoint still cannot be hit.
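
The thread does not establish why the breakpoint is never hit. One thing worth checking, offered here as an unconfirmed assumption: the snippets above call mono.map(...) with a function that itself returns a Mono. In Reactor, map only transforms the emitted value and does not subscribe to a publisher returned by the mapping function, whereas flatMap does. The sketch below illustrates the difference with a hypothetical plain-Java Lazy type (not Reactor's Mono), in which nothing runs until run() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical minimal "cold" computation: nothing executes until run() is called.
final class Lazy<T> {
    private final Supplier<T> body;

    Lazy(Supplier<T> body) {
        this.body = body;
    }

    // Transforms the value; if the function returns another Lazy, it is NOT run.
    <R> Lazy<R> map(Function<T, R> f) {
        return new Lazy<>(() -> f.apply(body.get()));
    }

    // Transforms the value into another Lazy and runs that inner Lazy as well.
    <R> Lazy<R> flatMap(Function<T, Lazy<R>> f) {
        return new Lazy<>(() -> f.apply(body.get()).run());
    }

    T run() {
        return body.get();
    }
}

class MapVsFlatMapSketch {
    static List<String> sideEffects(boolean useFlatMap) {
        List<String> effects = new ArrayList<>();
        Lazy<Integer> offlineCount = new Lazy<>(() -> 2);
        // Stands in for a call that returns a lazy pipeline with a side effect.
        Function<Integer, Lazy<String>> invokePlugins = count -> new Lazy<>(() -> {
            effects.add("afterNotify for " + count + " offline recipients");
            return "done";
        });
        if (useFlatMap) {
            offlineCount.flatMap(invokePlugins).run();
        } else {
            // The inner Lazy is created and returned as a value, but never run.
            offlineCount.map(invokePlugins).run();
        }
        return effects;
    }

    public static void main(String[] args) {
        System.out.println(sideEffects(false));
        System.out.println(sideEffects(true));
    }
}
```

With map, the inner Lazy is produced but never run, so the side effect is skipped; with flatMap, it runs once. Whether this is what happens in ServiceRequestDispatcher depends on what invokeExtensionPointsSequentially returns there, which this thread does not show.
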

chinamcafee commented 9 months ago

    default Mono<RequestHandlerResult> beforeNotify(
            @NotNull RequestHandlerResult result,
            @NotNull Long requesterId,
            @NotNull DeviceType requesterDevice) {
        // return Mono.empty();
        return Mono.just(result); // I changed it to this, but I don't know if it is the best way
    }

@Wujerry It seems that we have the same confusion. I made the same modification, but I thought it broke Turms' original design, so I assumed that my understanding of Turms' source code was not deep enough. It seems that you have run into a similar situation, and you also changed return Mono.empty() to return Mono.just(result).

Wujerry commented 9 months ago

any news?

JamesChenX commented 9 months ago

@Wujerry I ran some quick tests, but they did not reproduce the problems you mentioned. I will run full tests over the weekend.

By the way, when someone asks multiple questions in one issue, I prefer to answer them all in one comment after testing. That also means people may need to wait longer, because it is hard for me to find enough time to check multiple problems, especially after work. In other words, I tend to check them only on weekends, when I have more free time.

Wujerry commented 9 months ago

@JamesChenX Thanks a lot. I don't know how to say a big thank-you in English, but thank you.