BingAds / BingAds-Java-SDK

Other
43 stars 47 forks source link

ReportingServiceManager.downloadFileAsync blocks internally #166

Open torbengreulich opened 1 year ago

torbengreulich commented 1 year ago

We had issues with ReportingServiceManager.downloadFileAsync(). Some request did not return any answer and were blocked for days until we killed them. So we did some investigation:

This is what we call:

return reportingServiceManager.downloadFileAsync(parameters, null)
                    .get(TIMEOUT.toSeconds(), SECONDS);

Unfortunately the call of downloadFileAsync() was blocked and no future was returned, so the get with timeout could not work and we never get any response or TimeOutException.

We debugged into the code and we found, that the internal call to service.submitGenerateReportAsync(submitRequest, new AsyncHandler() did not answer and through this the whole call is blocked. We were not able to check what happens inside of submitGenerateReportAsync but in our opinion the call should always return the future or throw an exception.

Not relevant for this issue, but here is another call that may be blocking. the serviceClient.getService() may do some OAuthToken refreshing in refreshOAuthTokensIfNeeded() which do some external calls, which then may be blocking. This would be another case that would block the whole downloadCall without returning any future.

torbengreulich commented 1 year ago

We still have this issue and don't know why this is happening. As quick fix we did a hack to prevent our application from being blocked. We copied the ReportingServiceManager and wrapped the blocking code within downloadFileAsync as Future. With this we were able to cancel the execution after 10s.

public Future<ReportingDownloadOperation> submitDownloadAsync(ReportRequest reportRequest, AsyncCallback<ReportingDownloadOperation> callback) {
        Authentication auth = authorizationData.getAuthentication();

        if (auth == null) {
            throw new IllegalArgumentException("Missing authentication");
        }

        final ResultFuture<ReportingDownloadOperation> resultFuture = new ResultFuture<ReportingDownloadOperation>(callback);

        try (ExecutorService executorService = Executors.newSingleThreadExecutor()) {
            var submittedFuture = executorService.submit(() -> {
                final IReportingService service = serviceClient.getService();
                SubmitGenerateReportRequest submitRequest = new SubmitGenerateReportRequest();
                submitRequest.setReportRequest(reportRequest);

                service.submitGenerateReportAsync(submitRequest, new AsyncHandler<SubmitGenerateReportResponse>() {
                    @Override
                    public void handleResponse(Response<SubmitGenerateReportResponse> res) {
                        try {
                            SubmitGenerateReportResponse response;

                            response = res.get();

                            String trackingId = ServiceUtils.GetTrackingId(res);

                            ReportingDownloadOperation operation = new ReportingDownloadOperation(response.getReportRequestId(), authorizationData, apiEnvironment);
                            operation.setTrackingId(trackingId);

                            operation.setStatusPollIntervalInMilliseconds(statusPollIntervalInMilliseconds);

                            operation.setDownloadHttpTimeoutInMilliseconds(downloadHttpTimeoutInMilliseconds);

                            resultFuture.setResult(operation);
                        } catch (InterruptedException e) {
                            resultFuture.setException(new CouldNotSubmitReportingDownloadException(e));
                        } catch (ExecutionException e) {
                            resultFuture.setException(new CouldNotSubmitReportingDownloadException(e));
                        }
                    }
                });
            });
            for(int i = 1; i < 11 ; i++) {
                if(submittedFuture.isDone()) {
                    break;
                }
                Thread.sleep(1000);
            }
            if(!submittedFuture.isDone()) {
                log.warn("TimeoutException for internal call of submitGenerateReportAsync in submitDownloadAsync for {}", reportRequest.getReportName());
                submittedFuture.cancel(true);
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            log.warn("InterruptedException for internal call of submitGenerateReportAsync in submitDownloadAsync for {}.", reportRequest.getReportName(), e);
        } catch (Exception e) {
            log.warn("Exception for internal call of submitGenerateReportAsync in submitDownloadAsync for {}.", reportRequest.getReportName(), e);
        }

        return resultFuture;
    }
markusheiden commented 1 year ago

https://issues.apache.org/jira/browse/CXF-8895 maybe causes this. Please release a new SDK with the fixed CXF 4.0.3 dependency.

markusheiden commented 1 year ago

Generally, the "async" API here is a bit misleading, because the requests are always executed synchronously (that is in the caller thread). In our case, even the (exception) response seems to be handled synchronously. That way callbacks do not make sense at all but just increase the complexity. Will a separate thread (pool) be used at least for successful responses?

We expected the requests to be executed in a separate thread (pool) for real asynchronicity.

Currently, the blocking gets with timeout at the futures returned by the SDK "async" methods cannot work. That is because when the future is returned, some of the the main work (authentication + sending the request) that needs to be covered by the timeout already has been done.

IMO the async code in the SDK needs an overhaul.

markusheiden commented 1 year ago

Example stacktrace of a stuck call:

com.microsoft.bingads.v13.reporting.CouldNotSubmitReportingDownloadException: java.util.concurrent.ExecutionException: org.apache.cxf.binding.soap.SoapFault: Problem writing SAAJ model to stream: null
    at com.microsoft.bingads.v13.reporting.ReportingServiceManager$4.handleResponse(ReportingServiceManager.java:...)
    at org.apache.cxf.jaxws.JaxwsClientCallback.handleException(JaxwsClientCallback.java:90)
    at org.apache.cxf.interceptor.ClientOutFaultObserver.onMessage(ClientOutFaultObserver.java:61)
    at org.apache.cxf.endpoint.ClientImpl$2.onMessage(ClientImpl.java:519)
    at org.apache.cxf.phase.PhaseInterceptorChain.wrapExceptionAsFault(PhaseInterceptorChain.java:373)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:331)
    at org.apache.cxf.endpoint.ClientImpl.doInvoke(ClientImpl.java:528)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:432)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:410)
    at org.apache.cxf.jaxws.JaxWsClientProxy.invokeAsync(JaxWsClientProxy.java:326)
    at org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:138)
    at jdk.proxy2/jdk.proxy2.$Proxy384.submitGenerateReportAsync(Unknown Source)
    at com.microsoft.bingads.v13.reporting.ReportingServiceManager.lambda$submitDownloadAsync$1(ReportingServiceManager.java:...)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.ExecutionException: org.apache.cxf.binding.soap.SoapFault: Problem writing SAAJ model to stream: null
    at org.apache.cxf.jaxws.JaxwsClientCallback$2.get(JaxwsClientCallback.java:103)
    at com.microsoft.bingads.v13.reporting.ReportingServiceManager$4.handleResponse(ReportingServiceManager.java:...)
    at org.apache.cxf.jaxws.JaxwsClientCallback.handleException(JaxwsClientCallback.java:90)
    at org.apache.cxf.interceptor.ClientOutFaultObserver.onMessage(ClientOutFaultObserver.java:61)
    at org.apache.cxf.endpoint.ClientImpl$2.onMessage(ClientImpl.java:519)
    at org.apache.cxf.phase.PhaseInterceptorChain.wrapExceptionAsFault(PhaseInterceptorChain.java:373)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:331)
    at org.apache.cxf.endpoint.ClientImpl.doInvoke(ClientImpl.java:528)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:432)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:410)
    at org.apache.cxf.jaxws.JaxWsClientProxy.invokeAsync(JaxWsClientProxy.java:326)
    at org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:138)
    at jdk.proxy2/jdk.proxy2.$Proxy384.submitGenerateReportAsync(Unknown Source)
    at com.microsoft.bingads.v13.reporting.ReportingServiceManager.lambda$submitDownloadAsync$1(ReportingServiceManager.java:...)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.cxf.binding.soap.SoapFault: Problem writing SAAJ model to stream: null
    at org.apache.cxf.binding.soap.saaj.SAAJOutInterceptor$SAAJOutEndingInterceptor.handleMessage(SAAJOutInterceptor.java:223)
    at org.apache.cxf.binding.soap.saaj.SAAJOutInterceptor$SAAJOutEndingInterceptor.handleMessage(SAAJOutInterceptor.java:173)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:307)
    at org.apache.cxf.endpoint.ClientImpl.doInvoke(ClientImpl.java:528)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:432)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:410)
    at org.apache.cxf.jaxws.JaxWsClientProxy.invokeAsync(JaxWsClientProxy.java:326)
    at org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:138)
    at jdk.proxy2/jdk.proxy2.$Proxy384.submitGenerateReportAsync(Unknown Source)
    at com.microsoft.bingads.v13.reporting.ReportingServiceManager.lambda$submitDownloadAsync$1(ReportingServiceManager.java:...)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.ctc.wstx.exc.WstxIOException: null
    at com.ctc.wstx.sw.BaseStreamWriter.flush(BaseStreamWriter.java:262)
    at org.apache.cxf.binding.soap.saaj.SAAJOutInterceptor$SAAJOutEndingInterceptor.handleMessage(SAAJOutInterceptor.java:214)
    at org.apache.cxf.binding.soap.saaj.SAAJOutInterceptor$SAAJOutEndingInterceptor.handleMessage(SAAJOutInterceptor.java:173)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:307)
    at org.apache.cxf.endpoint.ClientImpl.doInvoke(ClientImpl.java:528)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:432)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:410)
    at org.apache.cxf.jaxws.JaxWsClientProxy.invokeAsync(JaxWsClientProxy.java:326)
    at org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:138)
    at jdk.proxy2/jdk.proxy2.$Proxy384.submitGenerateReportAsync(Unknown Source)
    at com.microsoft.bingads.v13.reporting.ReportingServiceManager.lambda$submitDownloadAsync$1(ReportingServiceManager.java:...)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.InterruptedIOException: null
    at java.base/java.io.PipedInputStream.awaitSpace(Unknown Source)
    at java.base/java.io.PipedInputStream.receive(Unknown Source)
    at java.base/java.io.PipedOutputStream.write(Unknown Source)
    at org.apache.cxf.io.AbstractWrappedOutputStream.write(AbstractWrappedOutputStream.java:51)
    at org.apache.cxf.io.AbstractThresholdOutputStream.write(AbstractThresholdOutputStream.java:69)
    at com.ctc.wstx.io.UTF8Writer.flush(UTF8Writer.java:100)
    at com.ctc.wstx.sw.BufferingXmlWriter.flush(BufferingXmlWriter.java:242)
    at com.ctc.wstx.sw.BaseStreamWriter.flush(BaseStreamWriter.java:260)
    at org.apache.cxf.binding.soap.saaj.SAAJOutInterceptor$SAAJOutEndingInterceptor.handleMessage(SAAJOutInterceptor.java:214)
    at org.apache.cxf.binding.soap.saaj.SAAJOutInterceptor$SAAJOutEndingInterceptor.handleMessage(SAAJOutInterceptor.java:173)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:307)
    at org.apache.cxf.endpoint.ClientImpl.doInvoke(ClientImpl.java:528)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:432)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:410)
    at org.apache.cxf.jaxws.JaxWsClientProxy.invokeAsync(JaxWsClientProxy.java:326)
    at org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:138)
    at jdk.proxy2/jdk.proxy2.$Proxy384.submitGenerateReportAsync(Unknown Source)
    at com.microsoft.bingads.v13.reporting.ReportingServiceManager.lambda$submitDownloadAsync$1(ReportingServiceManager.java:...)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

The thread pool visible in this stack trace has been introduced by us to aid debugging. The remaining stack traces show that this is a pure synchronous call that handles even the (exception) response.

dennis-yemelyanov commented 8 months ago

Seems like there is at least one issue in CXF, which is causing the Future returned from a service method to never complete, if the client AsyncHandler throws an exception. For example, the following code hangs forever when using CXF:

ServiceClient<ICampaignManagementService> ServiceClient = getServiceClient();

AddBudgetsRequest request = getRequest();

Future<?> future = ServiceClient.getService().addBudgetsAsync(request, new AsyncHandler<AddBudgetsResponse>() {
  @Override
  public void handleResponse(Response<AddBudgetsResponse> res) {
    throw new RuntimeException("something happened");
  }  
});

System.out.println("Getting result from " + future.getClass());
try {
    future.get();
} catch (Exception ex) {
    System.out.println("Got exception from Future: " + ex);
}
System.out.println("Done");

Output:

Getting result from class org.apache.cxf.jaxws.JaxwsResponseCallback

The same code works fine when using JAX-WS RI (after excluding the CXF dependencies and including jaxws-rt):

Getting result from class com.sun.xml.ws.client.AsyncResponseImpl Got exception from Future: java.util.concurrent.ExecutionException: java.lang.RuntimeException: something happened Done

The reason for this behavior is that there is no exception handling when calling the client handler in JaxwsClientCallback: https://github.com/apache/cxf/blob/afc92891fba122f216a3b617599db5b87a704a50/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxwsClientCallback.java#L44

There is a try-catch at a higher level in ClientImpl: https://github.com/apache/cxf/blob/afc92891fba122f216a3b617599db5b87a704a50/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java#L947

However inside that catch it simply calls callback.handleException(), which brings us back to JaxwsClientCallback, so it invokes the client handler again, causing it to throw a new exception. But this time there is nothing to catch the new exception (no try-catch around the original catch).

So the exception gets lost somewhere up the stack, and the returned Future never gets completed.

JAX-WS RI does handle the exception and update the Future object, so the issue doesn't happen: https://github.com/eclipse-ee4j/metro-jax-ws/blob/4e66cdc71a91ac8ccb9e6890f380b1e523d5b1e9/jaxws-ri/runtime/rt/src/main/java/com/sun/xml/ws/client/AsyncResponseImpl.java#L104

I'm planning to open a CXF issue so hopefully this can be fixed.

I also noticed that it's possible for the SDK to run into this issue since not all exception types are handled, for example, here: https://github.com/BingAds/BingAds-Java-SDK/blob/7a9739d6df9dc109e769ed67fba05d2056499bf0/src/main/java/com/microsoft/bingads/v13/reporting/ReportingServiceManager.java#L244

We should probably fix it as well to catch a Throwable instead of specific exception types