firebase / firebase-admin-java

Firebase Admin Java SDK
https://firebase.google.com/docs/admin/setup
Apache License 2.0

FCM Batch sending deprecated but no migration path #834

Open 19lmyers opened 1 year ago

19lmyers commented 1 year ago

I received an email from Firebase about the deprecation of legacy FCM APIs. Notably, the sendAll() method that employs batch sending has been deprecated, and I saw the Node Admin SDK has a new sendEach() method, but the Java SDK does not provide this method.

How should I approach migrating from the sendAll() method?

AndrWeisR commented 1 year ago

Ditto. I have firebase-admin-9.1.1.jar, which does not expose these new methods. I've seen mention that they are in firebase-admin 9.2.0, but that version doesn't seem to be available anywhere yet.

xak2000 commented 1 year ago

I see the sendEach* methods are already implemented on master.

The question is: when will 9.2.0 be released?

adhirajsinghchauhan commented 1 year ago

@xak2000 v9.2.0 was released just a day after your comment.


But I wonder if it'd be feasible to walk back the decision to discontinue the batch send API, or at least move it into the Blaze plan? For people who want a fast, fire-and-forget delivery system, sendEach does not fit the bill. Both theoretically and in real-world tests, sendAll far outperforms its replacement.

Quick tests with 2,000 messages (each to a different topic, but otherwise with the same config) on a few of my own servers. No code changes apart from calling a different function, and no retry or delay mechanisms in between beyond the library's defaults:

sendAll: 5-10 s
sendEach: 1-5 min

The overhead of 2,000 HTTP calls instead of just 4 has obvious effects, and it only gets worse as the number of messages meant to be sent approximately "together" increases.
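Where the "4 calls" figure comes from: sendAll() accepted at most 500 messages per call, so 2,000 messages fit into 4 batch requests, while sendEach() issues one HTTP v1 request per message. A small stdlib-only sketch of that arithmetic; plain strings stand in for Message objects, and Batching/chunk are illustrative names, not SDK APIs:

```java
import java.util.ArrayList;
import java.util.List;

final class Batching {
    // Legacy sendAll()/sendMulticast() accepted at most 500 messages per call.
    static final int MAX_BATCH = 500;

    // Split a list into consecutive sublists of at most `size` elements.
    static <T> List<List<T>> chunk(List<T> items, int size) {
        if (size <= 0) throw new IllegalArgumentException("size must be positive");
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            int end = Math.min(start + size, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 2000; i++) messages.add("message-" + i);
        // Batch API: one HTTP request per chunk of up to 500 messages.
        System.out.println("sendAll requests:  " + chunk(messages, MAX_BATCH).size());
        // sendEach: one HTTP v1 request per message.
        System.out.println("sendEach requests: " + messages.size());
    }
}
```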

xak2000 commented 1 year ago

In the FAQ there is a dedicated topic about sending messages to multiple tokens in one request.

Q. Does the HTTP v1 API support sending messages to multiple tokens in one request? A. No. This feature, called "multicast" in legacy HTTP APIs, is not supported by the HTTP v1 API.

For use cases where end-to-end latency is critical, or where total fanout size is small (fewer than 1 million), Google recommends the HTTP V1 API. The HTTP V1 API over HTTP/2 can achieve up to ~1/3 of total message throughput rate compared to legacy multicast when sending on a single connection.

For use cases where throughput and egress bandwidth are prioritized or where total fanout size is large (greater than 1 million), Google recommends topic messaging. While topic messaging requires a one-time action to subscribe recipients to a topic, it offers up to a 10,000 QPS per project fanout rate without a maximum limit on topic size.

The question is: does firebase-admin-java use the HTTP v1 API over HTTP/2? And if so, how do we make the SDK use it? Probably only the Async variants of the methods use HTTP/2 multiplexing, right? Update: I checked the code, and both the async and non-async variants send each request concurrently; the only difference is in how the result is handled. So the only remaining question is: does it use HTTP/2 multiplexing, or does each concurrent HTTP v1 API request create its own TCP connection?

emindeniz99 commented 10 months ago

Hello, I think it creates too many connections instead of using HTTP/2. We ran into this issue in our system: https://github.com/firebase/firebase-admin-java/issues/849

LucasJC commented 9 months ago

Hey guys! I had the same experience using sendEach from previous comments. Are there any plans for improving this new alternative?

meor9zcu12 commented 9 months ago

Hi, I encountered the same issue. I plan to migrate from sendMulticast to sendEachForMulticast, but the performance of sendEachForMulticast is very poor.

In my testing, sending to 20,000 tokens took:
sendMulticast (sendAll API): 4 seconds
sendEachForMulticast (sendEach API): 40 seconds

sendEach / sendEachForMulticast is unusable for me until it is officially confirmed that it supports HTTP/2 and is restricted to HTTP/2 only. Otherwise, opening connections at such a high frequency may crash the system.


In addition, I can't find any information indicating that google-http-client:1.43.1 supports HTTP/2.

AntQwanlity commented 5 months ago

Found this: https://eladnava.com/send-multicast-notifications-using-node-js-http-2-and-the-fcm-http-v1-api/ It refers to this package: https://github.com/eladnava/fcm-v1-http2/tree/main Might be useful for some.

However, I believe the Admin SDK should handle this under the hood. Having to manually add an HTTP/2 layer above the SDK is terrible DX. Might as well use OneSignal.

emindeniz99 commented 5 months ago

The main issue is that google-http-client does not support HTTP/2: its default transport (NetHttpTransport) is built on the JDK 1.1 HttpURLConnection. I have tried different HTTP transport layers for the FCM library, such as Reactor Netty, the Java 11 HttpClient, and Spring WebFlux: https://github.com/googleapis/google-http-java-client/compare/main...emindeniz99:google-http-java-client:main https://github.com/emindeniz99/google-http-java-client

Note: the Java 11 HttpClient is not a good fit for these HTTP/2 requests. It creates only one connection per host, so to send many requests in parallel you have to create multiple clients for the same target host. I ran into this issue :D so I switched to Reactor Netty. All of my attempts are quick solutions for sending HTTP/2 requests and may need improvements.
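A minimal sketch of the multiple-clients workaround described above for the JDK 11 java.net.http client, assuming (as this comment reports) that each HttpClient multiplexes all requests to a host over a single connection. Http2ClientPool and the pool size are hypothetical; building requests, adding auth headers, and calling sendAsync are left out.

```java
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class Http2ClientPool {
    private final List<HttpClient> clients = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    Http2ClientPool(int size) {
        if (size < 1) throw new IllegalArgumentException("size must be >= 1");
        for (int i = 0; i < size; i++) {
            clients.add(HttpClient.newBuilder()
                    .version(HttpClient.Version.HTTP_2) // HTTP/2 is negotiated via ALPN on https URLs
                    .connectTimeout(Duration.ofSeconds(10))
                    .build());
        }
    }

    // Round-robin over the clients so concurrent requests are spread across
    // several connections instead of all sharing one.
    HttpClient nextClient() {
        return clients.get(Math.floorMod(next.getAndIncrement(), clients.size()));
    }
}
```

A caller would then use something like pool.nextClient().sendAsync(request, HttpResponse.BodyHandlers.ofString()) for each FCM request. How many clients to keep is a tuning choice; Reactor Netty's Http2AllocationStrategy is meant to handle this by opening additional connections, up to maxConnections, when streams run out.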

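// Imports for this snippet. ReactorNettyHttpTransport is not part of google-http-client;
// it is the custom class from the fork linked above.
import com.google.auth.oauth2.GoogleCredentials;
import com.google.firebase.FirebaseApp;
import com.google.firebase.FirebaseOptions;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.Http2AllocationStrategy;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

public FirebaseApp firebaseApp() throws IOException {
  // credentialsPath points to the service account JSON file; close the stream after reading it.
  GoogleCredentials credential;
  try (InputStream in = new FileInputStream(credentialsPath)) {
    credential = GoogleCredentials.fromStream(in);
  }

  // HTTP/2 pool: 4 to 8 connections, each carrying up to 100 concurrent streams.
  ConnectionProvider connectionProvider = ConnectionProvider.builder("myConnectionPool")
    .allocationStrategy(Http2AllocationStrategy.builder()
      .maxConnections(8)
      .minConnections(4)
      .maxConcurrentStreams(100) // Google's API advertises 100 as the HTTP/2 max concurrent streams
      .build())
    .pendingAcquireMaxCount(100_000) // requests allowed to queue while waiting for a stream
    .build();

  // Force HTTP/2. Optionally, .runOn(...) can pin the client to a custom event loop group.
  HttpClient client = HttpClient.create(connectionProvider)
    .protocol(HttpProtocol.H2);

  FirebaseOptions options = FirebaseOptions.builder()
    .setCredentials(credential)
    .setHttpTransport(new ReactorNettyHttpTransport(client))
    .build();
  return FirebaseApp.initializeApp(options);
}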

This is better: it does not open many sockets, and HTTP/2 is used, which solves our main problem. However, the FCM library creates a thread for each request. For example, sending 6 batch requests of 500 device tokens each creates 6 * 500 = 3,000 threads. As far as I know, each thread requires about 1 MB of memory, so this is another problem.

Overriding the HTTP transport (FirebaseOptions...setHttpTransport) does not completely solve the batch request issues. I do not want to modify the Firebase library: it requires a lot of effort, I am not sure which approach is best for building a reactive or async library (CompletableFuture, Mono, ...), and I do not have enough time. We are waiting for an official solution :D
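To make the thread-count concern concrete, here is a stdlib-only sketch (not the SDK's internals) that runs 6 * 500 = 3,000 simulated blocking sends on a fixed pool and records how many ran at once. The 1 ms sleep stands in for an HTTP request and the pool size of 16 is an arbitrary assumption. With a bounded pool, peak concurrency, and therefore the number of platform threads, stays at the pool size instead of growing with the request count.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedSendDemo {
    // Runs `tasks` simulated blocking sends on `poolSize` threads; returns the peak concurrency observed.
    static int peakConcurrency(int tasks, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(1); // stand-in for one blocking HTTP v1 request
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // 6 batches x 500 tokens = 3,000 requests, but never more than 16 threads.
        System.out.println("peak concurrent sends: " + peakConcurrency(6 * 500, 16));
    }
}
```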

https://github.com/firebase/firebase-admin-java/issues/849

sntg-p commented 5 months ago

I was wondering if it makes sense to create a topic, make all your target devices subscribe to it and then send your message to that topic. Would that be an acceptable strategy?

sebphil commented 5 months ago

It would only work if you want to send the same message to multiple devices. The batch feature can (or rather could) also be used to send different messages in a single HTTP request, and there doesn't seem to be a workaround for that.

stager0909 commented 5 months ago

Hello,

I believe that the sendAll(Async) API should either be maintained or replaced with a better performing API.

Sending messages to devices subscribed to a topic, as suggested, does not meet many business requirements. It can only be used when it is fine for everyone to receive the same message, such as live sports updates. However, most apps send personalized messages, so this method cannot be used for them. Also, creating and configuring new topics every time the target audience changes can be difficult.

In addition, based on the results provided by the testers you mentioned, sendEach is several times slower than sendAll. I'm not sure why someone would want to get rid of sendAll(Async), but an API that provides the same or better performance should be provided.

Lastly, I hope that Firebase-admin will provide support for netty-based HTTP/2.

Thank you

honorhs commented 5 months ago

Is the firebase-admin-java project abandoned? We are forced to migrate to SDK v9.2.0 or higher by June 2024. Issues related to performance keep coming up, but there are no fixes or feedback on the posts about these issues. Other firebase-admin-* projects, like Python, are actively maintained. Why isn't the firebase-admin-java project tracking issues?

raychongtk commented 5 months ago

The sendEachForMulticast method calls the single-message send API concurrently. Does anyone know whether FCM applies a rate limit at the API level?

In our testing, the replacement for the Batch API does not perform as well as the Batch API itself, because its latency is quite high.
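The fan-out described here (one single-message call per token, issued concurrently, with results collected back in input order) can be sketched with CompletableFuture. sendOne is a hypothetical stand-in for one HTTP v1 send that returns a message ID or throws; FanOut and Result are illustrative names that loosely mirror what SendResponse/BatchResponse report, not the SDK's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class FanOut {
    // Outcome of one send: a message ID on success, otherwise the error (hypothetical type).
    static final class Result {
        private final String messageId;
        private final Throwable error;

        Result(String messageId, Throwable error) {
            this.messageId = messageId;
            this.error = error;
        }

        String messageId() { return messageId; }
        Throwable error() { return error; }
        boolean isSuccessful() { return error == null; }
    }

    // Issues one call per message concurrently and returns the results in input order.
    static List<Result> sendEach(List<String> messages, Function<String, String> sendOne, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Result>> futures = new ArrayList<>();
            for (String message : messages) {
                futures.add(CompletableFuture
                        .supplyAsync(() -> sendOne.apply(message), pool)
                        // handle() turns a failure into a Result instead of failing the whole batch;
                        // err is typically a CompletionException wrapping the original exception.
                        .handle((id, err) -> new Result(err == null ? id : null, err)));
            }
            List<Result> results = new ArrayList<>();
            for (CompletableFuture<Result> f : futures) {
                results.add(f.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    static long successCount(List<Result> results) {
        return results.stream().filter(Result::isSuccessful).count();
    }
}
```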

LebonNic commented 4 months ago

As explained earlier by @stager0909, in practice, the topics system is virtually unusable in most apps. In reality, the jobs that handle the sending of notifications will most of the time customise the payload for each user (number of unseen notifications on iOS for example, badge if the user is connected to several accounts simultaneously on their device, etc).

It is absolutely essential to retain the sendAll method (which was originally introduced to avoid the performance problems raised by individual calls to send).

emindeniz99 commented 1 month ago

@jonathanedey Hello Jonathan, do you have any idea when support for HTTP/2 multiplexing will be implemented in the Java Admin SDK? It is important for us in deciding whether to implement our own HTTP v1 REST API calls before June 2024.

bivrantoshakil commented 1 month ago

Hi @emindeniz99, thanks for sharing all the details about trying out HTTP/2 to use fewer socket connections. Can I ask how and where I should set my HTTP proxy? I tried a couple of ways that I know, but they didn't work. Thanks in advance

emindeniz99 commented 1 month ago

While setting up the Firebase app, you can pass your HttpTransport implementation to the builder. Example code is in this comment: https://github.com/firebase/firebase-admin-java/issues/834#issuecomment-1890969169

ReactorNettyHttpTransport is a custom class available in my fork of google-http-java-client; the link is in that comment.

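import com.google.auth.oauth2.GoogleCredentials;
import com.google.firebase.FirebaseApp;
import com.google.firebase.FirebaseOptions;
import com.google.firebase.messaging.FirebaseMessaging;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FirebaseConfig {

    @Value("${notification.google.application.credentials}")
    private String credentialsPath;

    @Bean
    public GoogleCredentials credential() throws IOException {
        // Close the credentials file once it has been read.
        try (InputStream in = new FileInputStream(credentialsPath)) {
            return GoogleCredentials.fromStream(in)
                                    .createScoped("https://www.googleapis.com/auth/firebase.messaging");
        }
    }

    @Bean
    public FirebaseApp firebaseApp(GoogleCredentials credential) {
        FirebaseOptions options = FirebaseOptions.builder()
                                                 .setCredentials(credential)
                                                 // TODO: .setHttpTransport(new ReactorNettyHttpTransport(client))
                                                 .build();
        return FirebaseApp.initializeApp(options);
    }

    @Bean
    public FirebaseMessaging firebaseMessaging(FirebaseApp firebaseApp) {
        return FirebaseMessaging.getInstance(firebaseApp);
    }
}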

This way, you can reduce the connection count, but the thread count is still high, as I mentioned in that comment.

bivrantoshakil commented 1 month ago

@emindeniz99 Thanks for the forked code, that helped me a lot. Using Netty with HTTP/2 fixed the excessive connection count, and I also used a thread pool of virtual threads on JDK 21 to solve the problem of too many native threads. I am very satisfied with this combination because it feels like sending notifications in a batch, the same as before. It performs great.

emindeniz99 commented 1 month ago

I have implemented an HTTP/2-based sendMulticast with Spring WebFlux WebClient in our project.

This class must be in the com.google.firebase.messaging package. You can put it in your own codebase, but the package name has to match so that it can access package-private methods such as SendResponse.fromMessageId (those could be copied instead, but that seems ugly in a codebase). Our project uses Java 17, so we cannot benefit from virtual threads; with WebClient, the requests do not block threads anyway. 8 WebClient threads handle 600k+ requests per minute on my MacBook. We haven't used the code in production yet, so feedback is welcome. Thanks

You can use sendMulticastV2.

TODO: a retry policy should be added, like the one in the FCM library.

package com.google.firebase.messaging;

import com.google.api.client.http.HttpStatusCodes;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.firebase.ErrorCode;
import com.google.firebase.FirebaseApp;
import com.google.firebase.FirebaseException;
import com.google.firebase.internal.FirebaseProcessEnvironment;
import com.google.firebase.messaging.internal.MessagingServiceErrorResponse;
import com.google.firebase.messaging.internal.MessagingServiceResponse;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.Http2AllocationStrategy;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

import java.io.IOException;
import java.util.List;
import java.util.Map;

@Slf4j
public class DGFirebaseMessaging {

    private final FirebaseApp firebaseApp;
    private final WebClient webClient;
    private final GoogleCredentials credential;

    public DGFirebaseMessaging(FirebaseApp firebaseApp, GoogleCredentials credential) {
        this.firebaseApp = firebaseApp;

        ConnectionProvider connectionProvider = ConnectionProvider.builder("myConnectionPool")
                                                                  .allocationStrategy(Http2AllocationStrategy.builder()
                                                                                                             .maxConnections(200)
                                                                                                             .minConnections(4)
                                                                                                             // Google's API advertises 100 as the HTTP/2 max concurrent streams
                                                                                                             .maxConcurrentStreams(100)
                                                                                                             .build())
                                                                  .pendingAcquireMaxCount(100_000)
                                                                  // Do not also call .maxConnections(...) on this builder: in Reactor Netty
                                                                  // it replaces the Http2AllocationStrategy configured above.
                                                                  .build();

        HttpClient client = HttpClient.create(connectionProvider)
                                      .protocol(HttpProtocol.H2);

        // Pre-initialize Netty resources (event loops, DNS resolver) before the first request.
        client.warmup().block();

        this.webClient = WebClient.builder()
                                  .baseUrl(FCM_URL.formatted(getProjectId(firebaseApp, credential)))
                                  .clientConnector(new ReactorClientHttpConnector(client))
                                  .build();

        this.credential = credential;
    }

    //    com.google.firebase.FirebaseApp.getProjectId
    static String getProjectId(FirebaseApp firebaseApp, GoogleCredentials credential) {

        // Try to get project ID from user-specified options.
        String projectId = firebaseApp.getOptions().getProjectId();

        // Try to get project ID from the credentials.
        if (Strings.isNullOrEmpty(projectId)) {
            if (credential instanceof ServiceAccountCredentials) {
                projectId = ((ServiceAccountCredentials) credential).getProjectId();
            }
        }

        // Try to get project ID from the environment.
        if (Strings.isNullOrEmpty(projectId)) {
            projectId = FirebaseProcessEnvironment.getenv("GOOGLE_CLOUD_PROJECT");
        }
        if (Strings.isNullOrEmpty(projectId)) {
            projectId = FirebaseProcessEnvironment.getenv("GCLOUD_PROJECT");
        }
        return projectId;
    }

    //    com.google.firebase.messaging.Message.wrapForTransport
    private ImmutableMap<String, Object> wrap(Message message, boolean dryRun) {
        ImmutableMap.Builder<String, Object> payload = ImmutableMap.<String, Object>builder().put("message", message);
        if (dryRun) {
            payload.put("validate_only", true);
        }
        return payload.build();
    }

    //    com.google.firebase.messaging.FirebaseMessagingClientImpl.fromApp
    @SneakyThrows(IOException.class)
    private String getReqBody(Message message, boolean dryRun) {
        return firebaseApp.getOptions().getJsonFactory().toString(wrap(message, dryRun));
    }

    @SneakyThrows(IOException.class)
    private <T> T parse(String value, Class<T> destinationClass) {
        return firebaseApp.getOptions().getJsonFactory().fromString(value, destinationClass);
    }

    //com.google.firebase.messaging.FirebaseMessagingClientImpl#FCM_URL
    private static final String FCM_URL = "https://fcm.googleapis.com/v1/projects/%s/messages:send";

    //    com.google.firebase.internal.AbstractHttpErrorHandler.HTTP_ERROR_CODES
    private static final Map<Integer, ErrorCode> HTTP_ERROR_CODES = ImmutableMap.<Integer, ErrorCode>builder()
                                                                                .put(HttpStatusCodes.STATUS_CODE_BAD_REQUEST,
                                                                                     ErrorCode.INVALID_ARGUMENT)
                                                                                .put(HttpStatusCodes.STATUS_CODE_UNAUTHORIZED,
                                                                                     ErrorCode.UNAUTHENTICATED)
                                                                                .put(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
                                                                                     ErrorCode.PERMISSION_DENIED)
                                                                                .put(HttpStatusCodes.STATUS_CODE_NOT_FOUND, ErrorCode.NOT_FOUND)
                                                                                .put(HttpStatusCodes.STATUS_CODE_CONFLICT, ErrorCode.CONFLICT)
                                                                                .put(429, ErrorCode.RESOURCE_EXHAUSTED)
                                                                                .put(HttpStatusCodes.STATUS_CODE_SERVER_ERROR, ErrorCode.INTERNAL)
                                                                                .put(HttpStatusCodes.STATUS_CODE_SERVICE_UNAVAILABLE,
                                                                                     ErrorCode.UNAVAILABLE)
                                                                                .build();

    //    com.google.firebase.messaging.FirebaseMessagingClientImpl.MessagingBatchCallback.createFirebaseException
    //    private FirebaseException createFirebaseException(MessagingServiceErrorResponse error) {
    //        String status = error.getStatus();
    //        ErrorCode errorCode = Strings.isNullOrEmpty(status) ? ErrorCode.UNKNOWN : Enum.valueOf(ErrorCode.class, status);
    //
    //        String msg = error.getErrorMessage();
    //        if (Strings.isNullOrEmpty(msg)) {
    //            msg = String.format("Unexpected HTTP response: %s", error.toString());
    //        }
    //
    //        return new FirebaseException(errorCode, msg, null);
    //    }

    private Mono<MessagingServiceResponse> sendno(Message message, boolean dryRun) {
        var body = getReqBody(message, dryRun);
        return webClient.post().headers(h -> {

                            h.setContentType(MediaType.APPLICATION_JSON);
                            try {
                                // TODO: refresh credentials periodically in a scheduled method
                                credential.getRequestMetadata();
                                //                com.google.auth.http.HttpCredentialsAdapter.initialize
                                var accessToken = credential.getAccessToken();
                                h.setBearerAuth(accessToken.getTokenValue());
                            }
                            catch (IOException e) {
                                log.error("Error getting request metadata", e);
                                throw new RuntimeException(e);
                            }

                        }).bodyValue(body).retrieve().bodyToMono(String.class).map(i -> {
                            //                com.google.firebase.messaging.FirebaseMessagingClientImpl.MessagingBatchCallback.onSuccess
                            return parse(i, MessagingServiceResponse.class);

                        })
                        //                        todo: handle WebClientRequestException
                        .onErrorMap(WebClientResponseException.class, e -> {
                            //com.google.firebase.internal.AbstractHttpErrorHandler.handleHttpResponseException
                            var base = httpResponseErrorToBaseException(e);
                            var resBody = e.getResponseBodyAsString();
                            var errorBody = parse(resBody, MessagingServiceErrorResponse.class);
                            return FirebaseMessagingException.withMessagingErrorCode(base, errorBody.getMessagingErrorCode());
                        });
    }

    private Mono<SendResponse> sendnoAndWrapWithResponse(Message message, boolean dryRun) {

        return sendno(message, dryRun).map(i -> {
            return SendResponse.fromMessageId(i.getMessageId());
        }).onErrorResume(FirebaseMessagingException.class, i -> {
            return Mono.just(SendResponse.fromException(i));
        });
    }

    //    com.google.firebase.internal.AbstractHttpErrorHandler.httpResponseErrorToBaseException

    protected FirebaseException superHttpResponseErrorToBaseException(WebClientResponseException webClientResponseException) {
        ErrorCode code = HTTP_ERROR_CODES.get(webClientResponseException.getStatusCode().value());
        if (code == null) {
            code = ErrorCode.UNKNOWN;
        }

        String message = String.format("Unexpected HTTP response with status: %d\n%s",
                                       webClientResponseException.getStatusCode().value(),
                                       webClientResponseException.getResponseBodyAsString());
        return new FirebaseException(code, message, webClientResponseException);
    }

    //    com.google.firebase.internal.AbstractPlatformErrorHandler.httpResponseErrorToBaseException
    protected final FirebaseException httpResponseErrorToBaseException(WebClientResponseException webClientResponseException) {
        var base = superHttpResponseErrorToBaseException(webClientResponseException);
        var parsedError = parse(webClientResponseException.getResponseBodyAsString(), MessagingServiceErrorResponse.class);

        ErrorCode code = base.getErrorCode();
        String status = parsedError.getStatus();
        if (!Strings.isNullOrEmpty(status)) {
            code = Enum.valueOf(ErrorCode.class, parsedError.getStatus());
        }

        String message = parsedError.getErrorMessage();
        if (Strings.isNullOrEmpty(message)) {
            message = base.getMessage();
        }

        return new FirebaseException(code, message, webClientResponseException);
    }

    public Mono<BatchResponse> sendMulticastV2(MulticastMessage message, boolean dryRun) {
        return sendMulticastV2(message.getMessageList(), dryRun);
    }

    public @NotNull Mono<BatchResponse> sendMulticastV2(List<Message> messages, boolean dryRun) {
        var ls = messages.stream().map(i -> sendnoAndWrapWithResponse(i, dryRun)).toList();

        var finalistMono = Flux.mergeSequential(ls).collectList();

        return finalistMono.map(finalist -> {
            return new BatchResponse() {

                public List<SendResponse> getResponses() {
                    return finalist;
                }

                public int getSuccessCount() {
                    return finalist.stream().filter(i -> i.isSuccessful()).toList().size();
                }

                public int getFailureCount() {
                    return finalist.size() - getSuccessCount();
                }
            };
        });
    }

}
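For the retry-policy TODO above, one possible shape is exponential backoff on statuses that are usually transient. This is a stdlib-only sketch under stated assumptions: the retryable set (429, 500, 503), the attempt count, and the delays are illustrative rather than the SDK's actual retry configuration, and attempt is a hypothetical callback that performs one send and returns its HTTP status.

```java
import java.util.Set;
import java.util.function.IntSupplier;

final class RetryPolicy {
    // Status codes this sketch treats as transient (an assumption, not the SDK's list).
    static final Set<Integer> RETRYABLE = Set.of(429, 500, 503);

    private final int maxAttempts;
    private final long initialDelayMillis;
    private final long maxDelayMillis;

    RetryPolicy(int maxAttempts, long initialDelayMillis, long maxDelayMillis) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        this.maxAttempts = maxAttempts;
        this.initialDelayMillis = initialDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
    }

    // Delay before the n-th retry (1-based): initial * 2^(n-1), capped at maxDelayMillis.
    long delayMillis(int retry) {
        long delay = initialDelayMillis << Math.min(retry - 1, 30);
        return Math.min(delay, maxDelayMillis);
    }

    // Calls `attempt` until it returns a non-retryable status or attempts run out;
    // returns the last status seen.
    int execute(IntSupplier attempt) {
        int status = attempt.getAsInt();
        for (int retry = 1; retry < maxAttempts && RETRYABLE.contains(status); retry++) {
            sleep(delayMillis(retry));
            status = attempt.getAsInt();
        }
        return status;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during backoff", e);
        }
    }
}
```

In the WebClient code, the same idea could instead be expressed with Reactor's retryWhen(Retry.backoff(...)); a production version would also add jitter and honor a Retry-After header when FCM sends one.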
rodrii127 commented 1 month ago

Hi, I have a few questions about the deprecation itself. In Java, is it mandatory to upgrade the Admin SDK to 9.2.0, or can we still use other versions such as 9.1? If we can still use 9.1, what about the sendMulticast and sendAll operations? Will these methods stop working after June 20th? Sorry for the confusion, but I can't find any information on these points.

jonathanedey commented 1 month ago

Hey Folks, We are aware of the issues you have mentioned above and are working to address these.

Currently, support for HTTP/2 in Node.js is underway and our next focus is exploring similar options for Java.

We can't provide a timeline for the completion of these projects but we are working to have these resolved as soon as we can and appreciate your continued patience as we do so.

In the meantime, you can consider some of the workarounds mentioned above.

jonathanedey commented 1 month ago

Hi @rodrii127, The sendMulticast and sendAll operations both use the backend batch send API. When that endpoint is turned down, after June 21st, these operations will no longer be supported. For more information refer to: https://firebase.google.com/support/faq#fcm-23-deprecation

bivrantoshakil commented 1 month ago

@emindeniz99 thanks for sharing another alternative. Have you tried overriding ThreadManager to use your own thread pool instead of letting the SDK spin up tons of threads itself?

And instead of rewriting DGFirebaseMessaging class, isn't it better to use your own HttpTransport implementation and pass that to firebase app builder?

meor9zcu12 commented 1 month ago

@jonathanedey When exploring similar options for Java, is the FCM team considering Apache HttpComponents Client 5, which supports HTTP/2, instead of com.google.http-client?

emindeniz99 commented 1 month ago

@bivrantoshakil Hello, thank you, I did not know about the thread factory option. I think it would need to behave like virtual threads to really optimize the system, which may not be easy. I have not investigated it thoroughly.

bivrantoshakil commented 1 month ago

Creating your own ThreadManager implementation is simple and straightforward: you can pass in your own ExecutorService instance. I tried it with JDK 17, without virtual threads, and got very good results.
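For reference, the ExecutorService that such a custom ThreadManager returns from getExecutor can be built with the JDK alone. A sketch, assuming a fixed pool size and an "fcm-sender" thread-name prefix (both illustrative); plugging it into a ThreadManager subclass and FirebaseOptions.Builder#setThreadManager is not shown here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class FcmExecutors {
    // Thread factory producing daemon threads named "<prefix>-0", "<prefix>-1", ...
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true); // idle sender threads do not keep the JVM alive
            return t;
        };
    }

    // Bounded pool: at most maxThreads sends run at once; the rest wait in the queue.
    static ExecutorService boundedPool(int maxThreads, String prefix) {
        return Executors.newFixedThreadPool(maxThreads, named(prefix));
    }
}
```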

emindeniz99 commented 1 month ago

I don't understand the benefit of the new threads. Is it to limit the number of parallel threads? Doesn't that slow down sending notifications? If not, can you share the implementation logic? Thank you.

bivrantoshakil commented 1 month ago

If you provide your own implementation of the ThreadManager class, you can control the number of threads as needed. You can also use any of the executors built into the JDK. Here is one example in Kotlin:

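```kotlin
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.google.firebase.FirebaseApp
import com.google.firebase.ThreadManager
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory

// Shares one bounded thread pool across all FirebaseApp instances that use this manager.
class FCMThreadManager(private val maxConcurrentThreadCount: Int) : ThreadManager() {
    private val apps = mutableSetOf<String>()
    private var executorService: ExecutorService? = null

    @Synchronized
    override fun getExecutor(app: FirebaseApp): ExecutorService {
        apps.add(app.name)
        // Create the pool lazily, and recreate it if an earlier release shut it down.
        val current = executorService
        if (current != null && !current.isShutdown) return current
        return Executors.newFixedThreadPool(maxConcurrentThreadCount, getThreadFactory())
            .also { executorService = it }
    }

    @Synchronized
    override fun releaseExecutor(app: FirebaseApp, executor: ExecutorService) {
        // Shut the shared pool down only when the last app using it is released.
        if (apps.remove(app.name) && apps.isEmpty()) {
            executor.shutdownNow()
            executorService = null
        }
    }

    override fun getThreadFactory(): ThreadFactory =
        ThreadFactoryBuilder().setNameFormat("fcm-%d").build()
}
```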

But since you're using an HTTP client with HTTP/2, limiting the number of threads will not reduce your overall throughput.
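For plain-Java users who would rather not depend on Guava's ThreadFactoryBuilder, the bounded, named pool that a custom ThreadManager's getExecutor could return can be built with the JDK alone. This is a minimal sketch; `FcmExecutors`, `namedFactory` and `boundedPool` are hypothetical names, not part of firebase-admin. On Java 21, `Executors.newVirtualThreadPerTaskExecutor()` would be the virtual-thread alternative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: builds the kind of pool a custom ThreadManager could hand to the SDK.
final class FcmExecutors {
    private FcmExecutors() {}

    // Names threads "<prefix>0", "<prefix>1", ... so they are easy to spot in thread dumps.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + counter.getAndIncrement());
    }

    // At most maxThreads tasks run at once; further tasks wait in the pool's queue.
    static ExecutorService boundedPool(int maxThreads, String prefix) {
        return Executors.newFixedThreadPool(maxThreads, namedFactory(prefix));
    }
}
```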

sarismet commented 2 weeks ago

Hi,

I have tried virtual threads by overriding ThreadManager and building the FirebaseApp with it. I can reach 2 million requests per minute; however, I get errors such as "Error writing request body to server", "Remote host terminated the handshake" and "Unexpected end of file from server". When I reduce the load with a Semaphore, the errors decrease as well. Do you have any idea what causes these errors? Does FCM throttle my IP, or should I change my Java Docker image?

@bivrantoshakil could you please share how you initialize the Firebase app? You mentioned you used virtual threads.

Docker image: eclipse-temurin:21.0.3_9-jre-alpine

bivrantoshakil commented 2 weeks ago

@sarismet the implementation you did sounds right. But I think the FCM rate limit is 600k requests per minute, not 2M.

sarismet commented 2 weeks ago

@bivrantoshakil Yes, you are right, but I would expect the error code and response to indicate rate limiting, not UNKNOWN. I will try reading the trace logs of com.google.firebase.
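sarismet's observation that throttling with a Semaphore reduced the errors suggests capping the number of in-flight sends rather than the number of threads. Below is a minimal JDK-only sketch of that idea; `InFlightLimiter` is a hypothetical name. It assumes each send is exposed as a `CompletableFuture`. The SDK's `FirebaseMessaging.sendAsync(Message)` returns an `ApiFuture<String>` instead, so you would need to adapt it (for example via `ApiFutures.addCallback`).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper: caps the number of asynchronous sends that are in flight at once.
// Once maxInFlight sends are outstanding, submit() blocks the caller, which applies
// back-pressure instead of flooding the HTTP connection pool.
final class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    <R> CompletableFuture<R> submit(Supplier<CompletableFuture<R>> send) {
        permits.acquireUninterruptibly(); // wait for a free slot
        CompletableFuture<R> future;
        try {
            future = send.get();
        } catch (RuntimeException e) {
            permits.release(); // the send never started, so give the slot back
            throw e;
        }
        // Release the slot when the send completes, whether it succeeded or failed.
        return future.whenComplete((result, error) -> permits.release());
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

When the limiter is used from virtual threads, blocking in `submit()` is cheap. The permit count can be tuned down until the connection errors disappear.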

KrishnakanthYachareni commented 1 week ago

I have implemented an HTTP/2-based sendMulticast with the Spring WebFlux WebClient in our project.

This class must be in the com.google.firebase.messaging package. You can create it in your own codebase, but the package name must match so that it can access package-private members of SendResponse and similar classes (those could be copied instead, but that seems ugly in a codebase). Our project is on Java 17, so we cannot benefit from virtual threads. With WebClient, the requests do not block threads: 8 WebClient threads handle 600k+ RPM on my MacBook. We haven't used the code in production yet, and it is open to your feedback, thanks.

You can use sendMulticastV2.

TODO: a retry policy should be added, like the one in the FCM library.

package com.google.firebase.messaging;

import com.google.api.client.http.HttpStatusCodes;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.firebase.ErrorCode;
import com.google.firebase.FirebaseApp;
import com.google.firebase.FirebaseException;
import com.google.firebase.internal.FirebaseProcessEnvironment;
import com.google.firebase.messaging.internal.MessagingServiceErrorResponse;
import com.google.firebase.messaging.internal.MessagingServiceResponse;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.Http2AllocationStrategy;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

import java.io.IOException;
import java.util.List;
import java.util.Map;

@Slf4j
public class DGFirebaseMessaging {

    private final FirebaseApp firebaseApp;
    private final WebClient webClient;
    private final GoogleCredentials credential;

    public DGFirebaseMessaging(FirebaseApp firebaseApp, GoogleCredentials credential) {
        this.firebaseApp = firebaseApp;

        //        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Nio-firebase-http-client-%d").build();

        ConnectionProvider connectionProvider = ConnectionProvider.builder("myConnectionPool")
                                                                  .allocationStrategy(Http2AllocationStrategy.builder()
                                                                                                             .maxConnections(200)
                                                                                                             .minConnections(4)
                                                                                                             .maxConcurrentStreams(100) // google api
                                                                                                             // responds http2 connection with 100 as max concurrent stream header
                                                                                                             .build())
                                                                  .pendingAcquireMaxCount(100_000)
                                                                  .maxConnections(200)
                                                                  .build();

        //        EventLoopGroup bossGroup = new NioEventLoopGroup(2, namedThreadFactory);

        var client = HttpClient.create(connectionProvider)
                               // .runOn(bossGroup)
                               .protocol(HttpProtocol.H2);

        client.warmup().block();

        this.webClient = WebClient.builder()
                                  .baseUrl(FCM_URL.formatted(getProjectId(firebaseApp, credential)))
                                  .clientConnector(new ReactorClientHttpConnector(client))
                                  .build();

        this.credential = credential;
    }

    //    com.google.firebase.FirebaseApp.getProjectId
    static String getProjectId(FirebaseApp firebaseApp, GoogleCredentials credential) {

        // Try to get project ID from user-specified options.
        String projectId = firebaseApp.getOptions().getProjectId();

        // Try to get project ID from the credentials.
        if (Strings.isNullOrEmpty(projectId)) {
            if (credential instanceof ServiceAccountCredentials) {
                projectId = ((ServiceAccountCredentials) credential).getProjectId();
            }
        }

        // Try to get project ID from the environment.
        if (Strings.isNullOrEmpty(projectId)) {
            projectId = FirebaseProcessEnvironment.getenv("GOOGLE_CLOUD_PROJECT");
        }
        if (Strings.isNullOrEmpty(projectId)) {
            projectId = FirebaseProcessEnvironment.getenv("GCLOUD_PROJECT");
        }
        return projectId;
    }

    //    com.google.firebase.messaging.Message.wrapForTransport
    private ImmutableMap<String, Object> wrap(Message message, boolean dryRun) {
        ImmutableMap.Builder<String, Object> payload = ImmutableMap.<String, Object>builder().put("message", message);
        if (dryRun) {
            payload.put("validate_only", true);
        }
        return payload.build();
    }

    //    com.google.firebase.messaging.FirebaseMessagingClientImpl.fromApp
    @SneakyThrows(IOException.class)
    private String getReqBody(Message message, boolean dryRun) {
        return firebaseApp.getOptions().getJsonFactory().toString(wrap(message, dryRun));
    }

    @SneakyThrows(IOException.class)
    private <T> T parse(String value, Class<T> destinationClass) {
        return firebaseApp.getOptions().getJsonFactory().fromString(value, destinationClass);
    }

    //com.google.firebase.messaging.FirebaseMessagingClientImpl#FCM_URL
    private static final String FCM_URL = "https://fcm.googleapis.com/v1/projects/%s/messages:send";

    //    com.google.firebase.internal.AbstractHttpErrorHandler.HTTP_ERROR_CODES
    private static final Map<Integer, ErrorCode> HTTP_ERROR_CODES = ImmutableMap.<Integer, ErrorCode>builder()
                                                                                .put(HttpStatusCodes.STATUS_CODE_BAD_REQUEST,
                                                                                     ErrorCode.INVALID_ARGUMENT)
                                                                                .put(HttpStatusCodes.STATUS_CODE_UNAUTHORIZED,
                                                                                     ErrorCode.UNAUTHENTICATED)
                                                                                .put(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
                                                                                     ErrorCode.PERMISSION_DENIED)
                                                                                .put(HttpStatusCodes.STATUS_CODE_NOT_FOUND, ErrorCode.NOT_FOUND)
                                                                                .put(HttpStatusCodes.STATUS_CODE_CONFLICT, ErrorCode.CONFLICT)
                                                                                .put(429, ErrorCode.RESOURCE_EXHAUSTED)
                                                                                .put(HttpStatusCodes.STATUS_CODE_SERVER_ERROR, ErrorCode.INTERNAL)
                                                                                .put(HttpStatusCodes.STATUS_CODE_SERVICE_UNAVAILABLE,
                                                                                     ErrorCode.UNAVAILABLE)
                                                                                .build();

    //    com.google.firebase.messaging.FirebaseMessagingClientImpl.MessagingBatchCallback.createFirebaseException
    //    private FirebaseException createFirebaseException(MessagingServiceErrorResponse error) {
    //        String status = error.getStatus();
    //        ErrorCode errorCode = Strings.isNullOrEmpty(status) ? ErrorCode.UNKNOWN : Enum.valueOf(ErrorCode.class, status);
    //
    //        String msg = error.getErrorMessage();
    //        if (Strings.isNullOrEmpty(msg)) {
    //            msg = String.format("Unexpected HTTP response: %s", error.toString());
    //        }
    //
    //        return new FirebaseException(errorCode, msg, null);
    //    }

    private Mono<MessagingServiceResponse> sendno(Message message, boolean dryRun) {
        var body = getReqBody(message, dryRun);
        return webClient.post().headers(h -> {

                            h.setContentType(MediaType.APPLICATION_JSON);
                            try {
                                // Refreshes the cached OAuth2 access token if it is missing or expired
                                // (as com.google.auth.http.HttpCredentialsAdapter.initialize does); this can block.
                                credential.getRequestMetadata();
                                var accessToken = credential.getAccessToken();
                                h.setBearerAuth(accessToken.getTokenValue());
                            }
                            catch (IOException e) {
                                log.error("Error getting request metadata", e);
                                throw new RuntimeException(e);
                            }

                        }).bodyValue(body).retrieve().bodyToMono(String.class).map(i -> {
                            //                com.google.firebase.messaging.FirebaseMessagingClientImpl.MessagingBatchCallback.onSuccess
                            return parse(i, MessagingServiceResponse.class);

                        })
                        //                        todo: handle WebClientRequestException
                        .onErrorMap(WebClientResponseException.class, e -> {
                            //com.google.firebase.internal.AbstractHttpErrorHandler.handleHttpResponseException
                            var base = httpResponseErrorToBaseException(e);
                            var resBody = e.getResponseBodyAsString();
                            var errorBody = parse(resBody, MessagingServiceErrorResponse.class);
                            return FirebaseMessagingException.withMessagingErrorCode(base, errorBody.getMessagingErrorCode());
                        });
    }

    private Mono<SendResponse> sendnoAndWrapWithResponse(Message message, boolean dryRun) {

        return sendno(message, dryRun).map(i -> {
            return SendResponse.fromMessageId(i.getMessageId());
        }).onErrorResume(FirebaseMessagingException.class, i -> {
            return Mono.just(SendResponse.fromException(i));
        });
    }

    //    com.google.firebase.internal.AbstractHttpErrorHandler.httpResponseErrorToBaseException

    protected FirebaseException superHttpResponseErrorToBaseException(WebClientResponseException webClientResponseException) {
        ErrorCode code = HTTP_ERROR_CODES.get(webClientResponseException.getStatusCode().value());
        if (code == null) {
            code = ErrorCode.UNKNOWN;
        }

        String message = String.format("Unexpected HTTP response with status: %d\n%s",
                                       webClientResponseException.getStatusCode().value(),
                                       webClientResponseException.getResponseBodyAsString());
        return new FirebaseException(code, message, webClientResponseException);
    }

    //    com.google.firebase.internal.AbstractPlatformErrorHandler.httpResponseErrorToBaseException
    protected final FirebaseException httpResponseErrorToBaseException(WebClientResponseException webClientResponseException) {
        var base = superHttpResponseErrorToBaseException(webClientResponseException);
        var parsedError = parse(webClientResponseException.getResponseBodyAsString(), MessagingServiceErrorResponse.class);

        ErrorCode code = base.getErrorCode();
        String status = parsedError.getStatus();
        if (!Strings.isNullOrEmpty(status)) {
            code = Enum.valueOf(ErrorCode.class, parsedError.getStatus());
        }

        String message = parsedError.getErrorMessage();
        if (Strings.isNullOrEmpty(message)) {
            message = base.getMessage();
        }

        return new FirebaseException(code, message, webClientResponseException);
    }

    public Mono<BatchResponse> sendMulticastV2(MulticastMessage message, boolean dryRun) {
        return sendMulticastV2(message.getMessageList(), dryRun);
    }

    public @NotNull Mono<BatchResponse> sendMulticastV2(List<Message> messages, boolean dryRun) {
        var ls = messages.stream().map(i -> sendnoAndWrapWithResponse(i, dryRun)).toList();

        var finalistMono = Flux.mergeSequential(ls).collectList();

        return finalistMono.map(finalist -> {
            return new BatchResponse() {

                public List<SendResponse> getResponses() {
                    return finalist;
                }

                public int getSuccessCount() {
                    return (int) finalist.stream().filter(SendResponse::isSuccessful).count();
                }

                public int getFailureCount() {
                    return finalist.size() - getSuccessCount();
                }
            };
        });
    }

}
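For the retry-policy TODO above, here is a minimal, framework-free sketch of the retry decisions. `FcmRetryPolicy` and its parameters are hypothetical. The status classification (retry 429 and 5xx, honor `Retry-After` when present, don't retry other 4xx) reflects my reading of FCM's error-handling guidance, so check it against the current docs before relying on it.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of firebase-admin: retry decisions for FCM v1 HTTP calls.
final class FcmRetryPolicy {
    private final Duration initialDelay;
    private final Duration maxDelay;
    private final int maxAttempts;

    FcmRetryPolicy(Duration initialDelay, Duration maxDelay, int maxAttempts) {
        this.initialDelay = initialDelay;
        this.maxDelay = maxDelay;
        this.maxAttempts = maxAttempts;
    }

    // 429 (quota exceeded) and 5xx (server-side) are treated as transient;
    // other 4xx responses (e.g. 400 invalid argument, 404 unregistered token) are not.
    boolean isRetryable(int httpStatus) {
        return httpStatus == 429 || (httpStatus >= 500 && httpStatus <= 599);
    }

    // attempt = number of attempts already made (1-based).
    boolean shouldRetry(int attempt, int httpStatus) {
        return attempt < maxAttempts && isRetryable(httpStatus);
    }

    // Exponential backoff: initialDelay * 2^(attempt - 1), capped at maxDelay.
    Duration backoff(int attempt) {
        long cap = maxDelay.toMillis();
        long delay = initialDelay.toMillis();
        for (int i = 1; i < attempt && delay < cap; i++) {
            delay *= 2;
        }
        return Duration.ofMillis(Math.min(delay, cap));
    }

    // Prefer the server's Retry-After value (in seconds) when the response carried one.
    Duration delayFor(int attempt, Long retryAfterSeconds) {
        if (retryAfterSeconds != null && retryAfterSeconds >= 0) {
            return Duration.ofSeconds(retryAfterSeconds);
        }
        return backoff(attempt);
    }

    // "Full jitter": a random delay in [0, delay] so that many senders don't retry in lockstep.
    Duration jittered(Duration delay) {
        return Duration.ofMillis(ThreadLocalRandom.current().nextLong(delay.toMillis() + 1));
    }
}
```

In the WebClient code this could be wired into Reactor's `retryWhen`. reactor-core also ships `reactor.util.retry.Retry.backoff(maxAttempts, minBackoff)` with `filter` and `maxBackoff`, which may be the simpler route. Be careful about retrying read timeouts or premature closes: FCM may already have accepted the message, so a retry can produce a duplicate notification.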

@emindeniz99 I've implemented it in a similar way, but about 1% of our total traffic has been failing due to the following issues. Did you come across these issues, and how did you fix them?

  1. org.springframework.web.reactive.function.client.WebClientRequestException: nested exception is io.netty.handler.timeout.ReadTimeoutException
    at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
    Error has been observed at the following site(s):
    *__checkpoint ⇢ Request to POST https://fcm.googleapis.com/v1/projects/dummy/messages:send [DefaultWebClient]
    Original Stack Trace:
  2. org.springframework.web.reactive.function.client.WebClientRequestException: Connection prematurely closed BEFORE response; nested exception is reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response
    at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
emindeniz99 commented 1 week ago

@KrishnakanthYachareni Hi, I handed the migration over to another team member, and he completed the load tests etc. I did not hear of read timeout exceptions in our tests or in production. They may be caused by your reactor.netty.http.client.HttpClient configuration; can you share your configs? You can also implement retry logic for timeouts. As for the PrematureCloseException, if it is only logged at 'warn' level, you can ignore it. If not, did you limit the max connection count in the HTTP client config? I ran into this error when I set the limit very high: the client could not open new connections and did not get any responses.

While doing the performance testing with the above WebClient config, did you look at the Reactor metric reactor_netty_eventloop_pending_tasks?

No, we did not look at the WebClient metrics.

What is the maximum duration that a connection can remain idle without being closed by Firebase servers? What is the maximum lifespan of a live connection, assuming continuous traffic flow?

We use the default values; I don't know.

Are there any restrictions on the number of accepted connections that a given instance can have on Firebase servers?

I did not see any limit in the Firebase docs. But on our side, each server has a limit on open connections, so we should limit the connection count. One connection can handle 100 parallel requests, so you can set the connection count in the Http2AllocationStrategy to match your target (to send 3k parallel requests, 3000/100 = 30 connections are required). :D We allocated double that number of connections in case a connection is dropped, so the others can handle the new requests. By setting the min and max counts to the same number, we keep the connections warmed up.

ConnectionProvider connectionProvider = ConnectionProvider.builder("DGFirebaseMessagingPool")
  .allocationStrategy(Http2AllocationStrategy.builder()
    .maxConnections(120)
    .minConnections(120)
    .maxConcurrentStreams(100) // google api
    // responds http2 connection with 100 as max concurrent stream header
    .build())
  .pendingAcquireMaxCount(100_000)
  .maxConnections(120)
  .build();

var client = HttpClient.create(connectionProvider).protocol(HttpProtocol.H2);

client.warmup().block();

this.webClient = WebClient.builder()
  .baseUrl(FCM_URL.formatted(getProjectId(firebaseApp, credential)))
  .clientConnector(new ReactorClientHttpConnector(client))
  .build();
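The sizing rule above (100 streams per connection, doubled for headroom) can be combined with Little's law (average in-flight requests = request rate × latency) to pick a connection count from a throughput target. Here is a minimal sketch with hypothetical names. As a worked example using figures from this thread, 600k requests/minute is 10,000 requests/s. At the ~800 ms average response time reported later in the thread, that is about 8,000 in-flight requests, or 80 connections, or 160 with 2× headroom. Tail latency, not the average, is what really drives the peak, so treat this as a lower bound.

```java
// Hypothetical sizing helper based on Little's law: in-flight requests = arrival rate * latency.
final class Http2PoolSizing {
    private Http2PoolSizing() {}

    // Average number of concurrent requests needed to sustain requestsPerSecond at latencyMillis each.
    static long requiredInFlight(long requestsPerSecond, long latencyMillis) {
        return ceilDiv(requestsPerSecond * latencyMillis, 1000);
    }

    // HTTP/2 connections needed when each carries at most maxStreamsPerConnection concurrent streams;
    // headroomFactor (e.g. 2) leaves spare capacity in case connections are dropped.
    static long requiredConnections(long inFlight, int maxStreamsPerConnection, int headroomFactor) {
        return ceilDiv(inFlight, maxStreamsPerConnection) * headroomFactor;
    }

    private static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }
}
```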
KrishnakanthYachareni commented 1 week ago

@emindeniz99 Thanks for the response. We have a DLQ set up for timed-out requests, but we've been observing that 1% of 180 million requests fail due to the errors mentioned above. Following is our WebClient configuration, replicated across 10 Kubernetes pods in production. Do you have any suggestions on the configs?

 public WebClient webClient() {
        var connectionProvider = ConnectionProvider.builder("fcmConnection")
                .maxConnections(500) // Max no of connections that the connection pool can maintain at any given time.
                .build();
        HttpClient httpClient = getHttpClient(connectionProvider);

        httpClient.warmup().block(); // Eager Initialization of Event loop group, host name resolver and native transport libs.

        return WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .baseUrl(firebaseCCMConfig.getFcmHttpEndpoint())
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
    }

    private HttpClient getHttpClient(ConnectionProvider connectionProvider) {
        return HttpClient.create(connectionProvider)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 45000)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .responseTimeout(ofSeconds(5))
                .keepAlive(true)
                .metrics(true, Function.identity())
                .protocol(HttpProtocol.H2);
    }

Note: the Reactor metric reactor_netty_eventloop_pending_tasks indicates how many requests are pending, waiting for worker threads to pick them up. So I bumped the Reactor threads from 4 to 8.

emindeniz99 commented 1 week ago

Can you increase the response timeout from 5 seconds to something higher? Google is probably responding to your requests slowly. When we used the batch API, we saw very high response times, but I have not looked at response times since moving to the v1 API.

Edit: I checked the average response time of the POST requests on mobile; it seems to be about 800 ms at high throughput. Tomorrow I will try to check the 95th and 99th percentiles, but we may not be tracking the response time of each request.

Edit2: Did you investigate the allocationStrategy option that I shared? Setting those values may help, but I'm not too hopeful :) I haven't looked into the difference between maxConnections on the ConnectionProvider and minConnections/maxConnections on the allocation strategy, so I'm not sure which one is the correct way.
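Several open points in this exchange (idle time, connection lifetime, event-loop thread count and the 5 s timeout) map onto reactor-netty 1.x settings. Below is an untested configuration sketch that pulls them together. `FcmHttpClientFactory` is a hypothetical name, and every duration is a placeholder rather than an FCM-documented limit. Whether `maxIdleTime`/`maxLifeTime` are honored together with `Http2AllocationStrategy` may depend on your reactor-netty version, so verify against its reference docs.

```java
import java.time.Duration;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.Http2AllocationStrategy;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

// Hypothetical factory: an HTTP/2 client for the FCM v1 endpoint with explicit pool and timeout settings.
final class FcmHttpClientFactory {
    private FcmHttpClientFactory() {}

    static HttpClient create(int connections, int ioThreads) {
        ConnectionProvider provider = ConnectionProvider.builder("fcm-pool")
            // Fixed-size HTTP/2 pool; 100 streams per connection as observed in this thread.
            .allocationStrategy(Http2AllocationStrategy.builder()
                .minConnections(connections)
                .maxConnections(connections)
                .maxConcurrentStreams(100)
                .build())
            // Placeholder values: recycle idle and long-lived connections proactively.
            .maxIdleTime(Duration.ofSeconds(60))
            .maxLifeTime(Duration.ofMinutes(10))
            // Fail fast instead of queueing indefinitely when every stream is busy.
            .pendingAcquireTimeout(Duration.ofSeconds(10))
            .pendingAcquireMaxCount(100_000)
            .build();

        // Dedicated event loop with an explicit worker count (daemon threads).
        LoopResources loops = LoopResources.create("fcm-io", ioThreads, true);

        return HttpClient.create(provider)
            .runOn(loops)
            .protocol(HttpProtocol.H2)
            // More generous than 5 s, to absorb slow FCM responses.
            .responseTimeout(Duration.ofSeconds(30));
    }
}
```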

emindeniz99 commented 6 days ago

The shutdown date for the old API has been updated: it has been postponed by one month. Additionally, they have provided a form link to request extensions. Old date: 6/21/2024 (June). New date: 7/22/2024 (July).


Source for the new announcement: https://firebase.google.com/support/faq#deprecated-api-shutdown