PlaytikaOSS / feign-reactive

Reactive Feign client inspired by https://github.com/OpenFeign project
Apache License 2.0
618 stars 126 forks source link

PATCH MultiPart request ERROR #684

Closed ebett-bayer closed 2 weeks ago

ebett-bayer commented 2 weeks ago

Hi, I need to send a PATCH multipart request like this.

  @PatchMapping(value = "/xxxx", consumes = "multipart/form-data")
  Mono<ResponseDto> saveInsightAttachment(
        @RequestHeader(BASE_URL) String baseUrl,
        @RequestPart("File") final MultipartFile file,
        @RequestPart("InsightId") final String insightId);

I am also using this interceptor:


@Bean
  public ReactiveHttpRequestInterceptor reactiveHttpRequestInterceptor() {
    return request -> {
      Map<String, List<String>> headers = request.headers();
      String baseUrl = headers.get(BASE_URL).get(0);
      if (baseUrl.endsWith("/")) {
        baseUrl = baseUrl.substring(0, baseUrl.length() - 1);
      }
      var instanceUri = URI.create(baseUrl + request.uri().getPath());
      headers.remove(BASE_URL);

      headers.put(ACCEPT, List.of(APPLICATION_JSON_VALUE));
      headers.put(HttpHeaders.AUTHORIZATION, List.of(getToken()));

      return Mono.just(new ReactiveHttpRequest(request, instanceUri));
    };

Is there any missing here? the request doesn't work.

Error while running request: ReactiveHttpRequest{, uri=https://xxxxx, target=HardCodedTarget(type=AdmaReactiveServiceClient, url=http://base-url)} \nType definition error: [simple type, class java.io.ByteArrayInputStream].",

Note that GetMapping requests works well.

StackTrace:

Error while running request: ReactiveHttpRequest{, uri=https://xxxx,
 target=HardCodedTarget(type=AdmaReactiveServiceClient, url=http://base-url)}
 \nType definition error: [simple type, class java.io.ByteArrayInputStream]\n\t
 at reactivefeign.webclient.client.WebReactiveHttpClient.lambda$executeRequest$3(WebReactiveHttpClient.java:124)
 \n\tat reactor.core.publisher.Mono.lambda$onErrorMap$29(Mono.java:3862)
 \n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
 \n\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
 \n\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onError(FluxDoFinally.java:119)
 \n\tat reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
 \n\tat reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onError(FluxPeekFuseable.java:903)
 \n\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2236)
 \n\tat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:544)
 \n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
 \n\tat reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
 \n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
 \n\tat reactor.core.publisher.Operators.error(Operators.java:198)
 \n\tat reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
 \n\tat reactor.core.publisher.Mono.subscribe(Mono.java:4576)
 \n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
 \n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
 \n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
 n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
 \n\tat reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
 \n\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:205)
 \n\tat reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
 \n\tat reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:229)
 \n\tat reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:279)
 \n\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
 \n\tat reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.maybeOnError(FluxConcatMapNoPrefetch.java:327)
 \n\tat reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:212)
 \n\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
 \n\tat reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:476)
 \n\tat reactor.core.publisher.SinkManyEmitterProcessor$EmitterInner.drainParent(SinkManyEmitterProcessor.java:620)
 \n\tat reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:874
 )\n\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
 \n\tat reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:337)
 \n\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
 \n\tat reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1743)
 \n\tat reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:196)
 \n\tat reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:205)
 \n\tat reactor.netty.http.client.HttpClientConnect$HttpObserver.onUncaughtException(HttpClientConnect.java:403)
 \n\tat reactor.netty.ReactorNetty$CompositeConnectionObserver.onUncaughtException(ReactorNetty.java:708)
 \n\tat reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onUncaughtException(DefaultPooledConnectionProvider.java:223)
 \n\tat reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onUncaughtException(DefaultPooledConnectionProvider.java:476)
 \n\tat reactor.netty.http.client.HttpClientOperations.onOutboundError(HttpClientOperations.java:672)
 \n\tat reactor.netty.channel.ChannelOperations.onError(ChannelOperations.java:262)
 \n\tat reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onError(FluxHide.java:142)
 \n\tat reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
 \n\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2236)
 \n\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:280)
 \n\tat reactor.netty.channel.MonoSendMany$SendManyInner.run(MonoSendMany.java:352)
 \n\tat reactor.netty.channel.MonoSendMany$SendManyInner.trySchedule(MonoSendMany.java:434)
 \n\tat reactor.netty.channel.MonoSendMany$SendManyInner.trySuccess(MonoSendMany.java:598)
 \n\tat reactor.netty.channel.MonoSendMany$SendManyInner$1.trySuccess(MonoSendMany.java:564)
 \n\tat reactor.netty.channel.MonoSendMany$SendManyInner$1.trySuccess(MonoSendMany.java:553)
 \n\tat io.netty.util.concurrent.PromiseCombiner.tryPromise(PromiseCombiner.java:170)
 \n\tat io.netty.util.concurrent.PromiseCombiner.access$600(PromiseCombiner.java:35)
 \n\tat io.netty.util.concurrent.PromiseCombiner$1.operationComplete0(PromiseCombiner.java:62)
 \n\tat io.netty.util.concurrent.PromiseCombiner$1.operationComplete(PromiseCombiner.java:44)
 \n\tat io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
 \n\tat io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
 \n\tat io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
 \n\tat io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
 \n\tat io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
 \n\tat io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
 \n\tat io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
 \n\tat io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:52)
 \n\tat io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:31)
 \n\tat io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
 \n\tat io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
 \n\tat io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
 \n\tat io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
 \n\tat io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
 \n\tat io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
 \n\tat io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
 \n\tat io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
 \n\tat io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:748)
 \n\tat io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:303)
 \n\tat io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:383)
 \n\tat io.netty.channel.kqueue.AbstractKQueueChannel.doWriteBytes(AbstractKQueueChannel.java:285)
 \n\tat io.netty.channel.kqueue.AbstractKQueueStreamChannel.writeBytes(AbstractKQueueStreamChannel.java:110)
 \n\tat io.netty.channel.kqueue.AbstractKQueueStreamChannel.doWriteSingle(AbstractKQueueStreamChannel.java:322)
 \n\tat io.netty.channel.kqueue.AbstractKQueueStreamChannel.doWrite(AbstractKQueueStreamChannel.java:280)
 \n\tat io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:929)
 \n\tat io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.flush0(AbstractKQueueChannel.java:515)
 \n\tat io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:893)
 \n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1369)
 \n\tat io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:935)
 \n\tat io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921)
 \n\tat io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907)
 \n\tat io.netty.handler.ssl.SslHandler.forceFlush(SslHandler.java:2243)
 \n\tat io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:824)
 \n\tat io.netty.handler.ssl.SslHandler.flush(SslHandler.java:801)
..
\n**Caused by: org.springframework.core.codec.CodecException: Type definition error: [simple type, class java.io.ByteArrayInputStream]
 \n\tat org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:256)
 \n\tSuppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
 \nError has been observed at the following site(s):
 \n\t*__checkpoint ⇢ Request to PATCH https://xxxxx [DefaultWebClient]\nOriginal Stack Trace:
 \n\t\tat org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:256)
 \n\t\tat org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$0(AbstractJackson2Encoder.java:158)
 \n\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)**
 \n\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
 \n\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)\
ebett-bayer commented 2 weeks ago

Deps

    implementation 'org.springframework:spring-webmvc:6.1.13'
    implementation 'org.springframework.cloud:spring-cloud-starter-openfeign:4.1.3'
    implementation 'com.playtika.reactivefeign:feign-reactor-core:4.2.1'
    implementation 'com.playtika.reactivefeign:feign-reactor-spring-configuration:4.2.1'
    implementation 'com.playtika.reactivefeign:feign-reactor-webclient:4.2.1'
    implementation 'io.github.openfeign:feign-hc5:12.4'
    implementation 'io.github.openfeign:feign-micrometer:12.4'
ebett-bayer commented 2 weeks ago

I found this No serializer found for class java.io.ByteArrayInputStream and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)

After disable it, I get an exeption

MultipartFile resource [2018-01-01_2018-01-02.csv] cannot be resolved to absolute file path

Is the MultipartFile class right for feign-reactive?

NOTE: The file bytes comes form an string variable

ebett-bayer commented 2 weeks ago

I found the fix, I had to change MultipartFile class in method signature with org.springframework.core.io.Resource

Regards.