spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0
Request-Reply in Spring Cloud Stream #815

Closed jonathanborenstein closed 3 months ago

jonathanborenstein commented 7 years ago

Does any capability of sending a request and receiving a reply exist in Spring Cloud Stream? Something that is similar to a Gateway in Spring Integration.

Or is Spring Cloud Stream just for one direction messaging?

hamzaislam commented 7 years ago

Second this, can anyone who has done this provide a sample?

mbogoevici commented 7 years ago

Currently, Spring Cloud Stream has no dedicated support for request/reply interaction. If necessary, this can be achieved by using a Spring Integration Gateway.

artembilan commented 6 years ago

There are already some community requests and some ideas on the matter: https://stackoverflow.com/questions/47800497/how-can-messaginggateway-be-configured-with-spring-cloud-stream-messagechannels.

I've put together a simple PoC that shows how it works without any modifications to the framework:

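import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
@SpringBootApplication
public class CloudStreamGatewayApplication {

    // Client-side bindings: outbound requests and inbound replies.
    interface GatewayChannels {

        String REQUEST = "request";

        @Output(REQUEST)
        MessageChannel request();

        String REPLY = "reply";

        @Input(REPLY)
        SubscribableChannel reply();
    }

    // Internal channel between the gateway and the header enricher.
    private static final String ENRICH = "enrich";

    // Blocking request-reply entry point for callers.
    @MessagingGateway
    public interface StreamGateway {

        @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
        String process(String payload);

    }

    // Converts the channel headers to strings so they survive transport,
    // then forwards the message to the bound request channel.
    @Bean
    public IntegrationFlow headerEnricherFlow() {
        return IntegrationFlows.from(ENRICH)
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel(GatewayChannels.REQUEST)
                .get();
    }

    // Server side: upper-cases the payload and echoes the request headers
    // (including replyChannel) back with the reply.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> process(Message<String> request) {
        return MessageBuilder.withPayload(request.getPayload().toUpperCase())
                .copyHeaders(request.getHeaders())
                .build();
    }

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext =
                SpringApplication.run(CloudStreamGatewayApplication.class, args);

        StreamGateway gateway = applicationContext.getBean(StreamGateway.class);

        String result = gateway.process("foo");

        System.out.println(result);
    }

}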

The application properties:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: requests
        output:
          destination: replies
        request:
          destination: requests
        reply:
          destination: replies

So the idea is to send a message to one bound destination from the @MessagingGateway and wait for the reply on another bound destination. Those two destinations are simply the INPUT and OUTPUT of the processor on the other side.

As for an out-of-the-box solution, it seems enough to add logic to a ChannelInterceptor on the outbound side that converts the TemporaryReplyChannel to its string representation via the existing HeaderChannelRegistry. On the consumer side, the appropriate replyChannel and errorChannel headers should be copied from the request message to the reply message when @SendTo is processed (@ServiceActivator already does that automatically).

Any other thoughts are welcome!

mpmiszczyk commented 6 years ago

@artembilan Does this example use some kind of correlationId?

If not, what would you suggest when dealing with multiple concurrent requests/responses?

garyrussell commented 6 years ago

The correlation is provided by this

.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)

Each request gets a dedicated replyChannel which is stored in a header; the enricher stores the channel in a HeaderChannelRegistry and changes the header to a String value.

When the reply is received, the gateway resolves the string back to the actual reply channel for this request.

For this reason, the server side must echo back the replyChannel header.

Hence .copyHeaders(request.getHeaders()).

This works out-of-the-box with the Rabbit binder.

When using the Kafka binder, you must add the replyChannel to the binder's headers property (or use the kafka11 binder or 2.0.M4 which uses kafka 1.0.)
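The correlation mechanism described above can be sketched in plain Java with no Spring dependencies. This is only an illustration of the idea: `ReplyRegistrySketch` and its methods are hypothetical names, not the Spring Integration `HeaderChannelRegistry` API, and a `CompletableFuture` stands in for the per-request reply channel.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the idea behind a header channel registry.
final class ReplyRegistrySketch {

    // Maps the string placed in the replyChannel header to the waiting caller.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Client side: store the per-request reply holder and return its string id.
    String register(CompletableFuture<String> replyHolder) {
        String id = UUID.randomUUID().toString();
        pending.put(id, replyHolder);
        return id;
    }

    // Client side: resolve the echoed header back to the holder and complete it.
    // Returns false when the id is unknown, e.g. the reply reached another instance.
    boolean resolve(String id, String payload) {
        CompletableFuture<String> holder = pending.remove(id);
        if (holder == null) {
            return false;
        }
        holder.complete(payload);
        return true;
    }

    // Server side: process the payload and echo the replyChannel header unchanged.
    static Map<String, String> serve(Map<String, String> request) {
        return Map.of(
                "replyChannel", request.get("replyChannel"),
                "payload", request.get("payload").toUpperCase());
    }
}
```

Because every request gets its own id, concurrent requests cannot receive each other's replies, provided the server echoes the header and the reply arrives at the instance that holds the registry entry.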

kabennett commented 6 years ago

@artembilan @garyrussell I know this has been targeted as a possible future feature, but I have a question about Artem's solution in the meantime. In Artem's comments within the solution to my original post, he stated:

Only the problem here that your producer application must be as single consumer in the group for the AccountChannels.ACCOUNT_CREATED - we have to ensure that only one instance in the cloud is operating at a time. Just because only one instance has that TemporaryReplyChannel in its memory.

Any ideas on how this solution could possibly work in a clustered environment where more than one consumer in the group can exist? I am using NGINX to load balance across multiple servers, and ideally I'd like to have the consumer running on each server in a horizontal scaling configuration. True to Artem's comment, when I run the consumer on each server, my test cases fail. When I only run the consumer on one of the servers (and thusly only one is available within the cloud via Eureka), the test cases work. However, this isn't a resilient enough configuration considering server and/or microservice failures.

dblqt commented 6 years ago

Hi,

I am using version 2.0.0.RELEASE of the Kafka and Kafka Streams binders like so:

plugins {
    id 'java'
}

group 'com.projectdrgn'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile "org.springframework.boot:spring-boot-starter-webflux:2.0.1.RELEASE"
    compile "org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:2.0.0.RELEASE"
    compile "org.springframework.cloud:spring-cloud-stream-binder-kafka:2.0.0.RELEASE"

    testCompile group: 'junit', name: 'junit', version: '4.12'
}

I copied Artem's sample + application properties directly into my intellij. And it works with just one exception. Instead of FOO I receive a byte array encoded into a string saying 70,79,79. Anything I am missing?

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
@SpringBootApplication
public class CloudStreamGatewayApplication {
    interface GatewayChannels {

        String REQUEST = "request";

        @Output(REQUEST)
        MessageChannel request();

        String REPLY = "reply";

        @Input(REPLY)
        SubscribableChannel reply();
    }

    private static final String ENRICH = "enrich";

    @MessagingGateway
    public interface StreamGateway {

        @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
        String process(String payload);

    }

    @Bean
    public IntegrationFlow headerEnricherFlow() {
        return IntegrationFlows.from(ENRICH)
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel(GatewayChannels.REQUEST)
                .get();
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> process(Message<String> request) {
        return MessageBuilder.withPayload(request.getPayload().toUpperCase())
                .copyHeaders(request.getHeaders())
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext applicationContext =
                SpringApplication.run(CloudStreamGatewayApplication.class, args);

        StreamGateway gateway = applicationContext.getBean(StreamGateway.class);

        Thread.sleep(5000);

        while (true) {
            String result = gateway.process("foo");

            System.out.println(result);
            Thread.sleep(1000);
        }
    }

}

applications.yml

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: requests
        output:
          destination: replies
        request:
          destination: requests
        reply:
          destination: replies

Thanks, Stefan

artembilan commented 6 years ago

@stebart,

Please, follow this fix: https://github.com/titoc/rest-gateway/pull/1

garyrussell commented 6 years ago

@kabennett Sorry for the delay.

Any ideas on how this solution could possibly work in a clustered environment where more than one consumer in the group can exist?

It's not a problem on the consumer side; one of the instances will handle the request and send the reply. The issue is getting the reply back to the right client-side instance.

The simplest solution for that is to have a dedicated reply topic for each instance and use dynamic reply routing via a BinderAwareChannelResolver as is used in the router app.
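A minimal in-memory sketch of that idea, assuming each client instance owns a destination named `replies.<instanceId>` and advertises it in a header. The header name `replyDestination`, the destination naming scheme, and the class itself are hypothetical; in a real application the lookup by name would be done by the binder-aware resolver rather than by a map.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// In-memory stand-in for a broker: destination name -> queue of messages.
final class ReplyRoutingSketch {

    static final String REPLY_DESTINATION = "replyDestination"; // hypothetical header name

    private final Map<String, Queue<Map<String, String>>> destinations = new HashMap<>();

    void send(String destination, Map<String, String> message) {
        destinations.computeIfAbsent(destination, d -> new ArrayDeque<>()).add(message);
    }

    // Returns the next message on the destination, or null if there is none.
    Map<String, String> poll(String destination) {
        Queue<Map<String, String>> queue = destinations.get(destination);
        return queue == null ? null : queue.poll();
    }

    // Client instance: stamp the request with this instance's own reply destination.
    void request(String instanceId, String payload) {
        send("requests", Map.of(REPLY_DESTINATION, "replies." + instanceId, "payload", payload));
    }

    // Any server instance: handle one request and route the reply dynamically
    // to the destination named in the request header.
    void serveOne() {
        Map<String, String> request = poll("requests");
        if (request == null) {
            return;
        }
        String target = request.get(REPLY_DESTINATION);
        send(target, Map.of("payload", request.get("payload").toUpperCase()));
    }
}
```

Each client instance then consumes only its own reply destination, so no reply is delivered to an instance that has no waiting caller.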

vladmelnyk commented 5 years ago

I copied Artem's sample + application properties directly into my intellij. And it works with just one exception. Instead of FOO I receive a byte array encoded into a string saying 70,79,79. Anything I am missing?

Having the same problem myself

artembilan commented 5 years ago

As I said in the mentioned PR: You need to have this in the gateway configuration:

@ServiceActivator(inputChannel = GatewayChannels.TO_UPPERCASE_REPLY)
public MyMessage getReply(MyMessage myMessage) {
    return myMessage;
}

The gateway interface must be declared without replyChannel:

@MessagingGateway
public interface StreamGateway {

    @Gateway(requestChannel = ENRICH)
    String process(String payload);
}

This way Spring Cloud Stream will perform its content negotiation logic and convert the incoming bytes to the expected type with the built-in message converters. Since the service activator is configured without an outputChannel, the replyChannel header is going to be selected and the gateway will receive the desired reply.
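The "70,79,79" symptom reported earlier is what an unconverted byte[] payload looks like when it is rendered as text. The plain-Java sketch below (hypothetical class and method names, no Spring involved) shows both renderings of the same payload; it illustrates the symptom only, not the framework's converter chain.

```java
import java.nio.charset.StandardCharsets;

final class PayloadConversionSketch {

    // Renders the raw byte values separated by commas, as seen in the report.
    static String asCommaSeparatedBytes(byte[] payload) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < payload.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(payload[i]);
        }
        return sb.toString();
    }

    // Decodes the bytes as UTF-8 text, which is what the caller expected.
    static String asText(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

For the payload `"FOO".getBytes(StandardCharsets.UTF_8)`, the first method returns `70,79,79` and the second returns `FOO`.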

alwyn commented 5 years ago

@garyrussell

It's not a problem on the consumer side; one of the instances will handle the request and send the reply. The issue is getting the reply back to the right client-side instance.

The simplest solution for that is to have a dedicated reply topic for each instance and use dynamic reply routing via a BinderAwareChannelResolver as is used in the router app.

Anything in the Greenwich release that assists in implementing this?

olegz commented 5 years ago

Greenwich? That is spring-cloud. I think you meant Fishtown. Outside of what's available in SI, no there is nothing in Fishtown

hanishi commented 5 years ago

Hi, I was trying to accomplish something similar, although it is HTTP specific and not a generic request/reply mechanism out of the box.

  1. I used AsyncContext from Servlet 3.1

  2. The AsyncContext returned for each request is stored in a ConcurrentMap under a key computed with System.identityHashCode(request), where request is the HttpServletRequest. That key becomes the correlationId, which is stored in the headers of the Message<?>; the component that receives the Message<?> is responsible for putting the same id into the Message<?> it sends back to the origin.

  3. Because it uses the AsyncContext, the thread that handled the request does not block. A thread is reattached when AsyncContextServletMessagingGateway retrieves a returned message from replyChannel, finds the AsyncContext with that correlationId, and calls dispatch() on it.

The original idea came from Jetty's Continuations, which have been deprecated in favor of native AsyncContext use. It is therefore nothing new and has been used in all kinds of projects, including CometD.

Here is the entire code I am using so far for my PoC. I hope it gives a hint to anyone thinking of implementing the request/reply pattern (for HTTP) with Spring Cloud Stream.

https://gist.github.com/hanishi/e2f2e83a168e192f681f3a85379019d6

AsyncContext can also make HttpRequestHandlingMessagingGateway non-blocking, which is a good thing. @artembilan You could probably make the AsyncContextServletMessagingGateway approach more portable so the mechanism can be used in both Spring Integration and Spring Cloud Stream.
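The parking of in-flight requests described in the steps above can be sketched without the Servlet API. In this sketch a `CompletableFuture` stands in for the AsyncContext, and the class and method names are hypothetical. One deliberate difference from the description: the key comes from a counter, because System.identityHashCode is not guaranteed to be unique across objects.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

final class PendingRequestsSketch {

    private final ConcurrentMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    // Request thread: park the response under a fresh correlation id and return
    // immediately. The entry is removed whenever the response completes,
    // whether normally, exceptionally, or by cancellation.
    String park(CompletableFuture<String> response) {
        String correlationId = Long.toString(nextId.incrementAndGet());
        pending.put(correlationId, response);
        response.whenComplete((body, error) -> pending.remove(correlationId));
        return correlationId;
    }

    // Reply listener thread: complete the parked response for this id.
    // Returns false if no request is waiting under that id.
    boolean complete(String correlationId, String body) {
        CompletableFuture<String> response = pending.get(correlationId);
        return response != null && response.complete(body);
    }

    int inFlight() {
        return pending.size();
    }
}
```

Removing the entry on any completion keeps the map from growing when a caller gives up and cancels before the reply arrives.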

cjrequena commented 5 years ago

@kabennett Sorry for the delay.

Any ideas on how this solution could possibly work in a clustered environment where more than one consumer in the group can exist?

It's not a problem on the consumer side; one of the instances will handle the request and send the reply. The issue is getting the reply back to the right client-side instance.

The simplest solution for that is to have a dedicated reply topic for each instance and use dynamic reply routing via a BinderAwareChannelResolver as is used in the router app.

Hi do you know if there is any solution for this issue now? Thanks.

burakhelvaci commented 5 years ago

@kabennett Sorry for the delay.

Any ideas on how this solution could possibly work in a clustered environment where more than one consumer in the group can exist?

It's not a problem on the consumer side; one of the instances will handle the request and send the reply. The issue is getting the reply back to the right client-side instance. The simplest solution for that is to have a dedicated reply topic for each instance and use dynamic reply routing via a BinderAwareChannelResolver as is used in the router app.

Hi do you know if there is any solution for this issue now? Thanks.

An alternative to Gary's solution: I'm using a filtering mechanism. For example:

@MessagingGateway
public interface StreamGateway {

  String ENRICH = "enrich";
  String FILTER = "filter";

  @Gateway(requestChannel = ENRICH, replyChannel = FILTER)
  Mono<byte[]> sendAndReceive(Message<String> message);
  //or byte[] sendSynchronousEvent(Message<String> message);
}

@Configuration
public class BeanContainer {

  // Unique per application instance; used to recognize replies meant for this instance.
  public static final UUID instanceUUID = UUID.randomUUID();

  // Outbound: make the channel headers transportable and stamp the request with the instance id.
  // The channel constants are referenced through StreamGateway, where they are declared.
  @Bean
  public IntegrationFlow headerEnricherFlow() {
    return IntegrationFlows.from(StreamGateway.ENRICH)
        .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
        .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header("instanceId", instanceUUID))
        .channel(Source.OUTPUT)
        .get();
  }

  // Inbound: drop replies whose instanceId header belongs to another instance.
  @Bean
  public IntegrationFlow replyFiltererFlow() {
    return IntegrationFlows.from(Sink.INPUT)
        .filter(Message.class, message -> instanceUUID.equals(message.getHeaders().get("instanceId")))
        .channel(StreamGateway.FILTER)
        .get();
  }
}

This produces a warning. It may not be best practice, but it works with multiple producer instances.

Explanation: when the consumer replies to the producer and there are multiple producer instances, every producer instance receives the reply message. That is the problem this code solves: if you set a dedicated instance id in the message header, each instance can filter reply messages by its own instance id.
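The filtering rule itself can be sketched as a plain-Java predicate over message headers. The names below are hypothetical. The sketch compares string forms on the assumption that a header set as a UUID may come back as a String after crossing the broker; whether that happens depends on the binder's header mapping.

```java
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;

final class InstanceFilterSketch {

    static final String INSTANCE_ID = "instanceId";

    // Accepts only replies stamped with this instance's id. Comparing string
    // forms makes the check independent of whether the header value arrives
    // as a UUID or as a String.
    static Predicate<Map<String, Object>> repliesFor(UUID instanceId) {
        String expected = instanceId.toString();
        return headers -> expected.equals(String.valueOf(headers.get(INSTANCE_ID)));
    }
}
```

A reply without the header, or with another instance's id, is rejected and never reaches the gateway's reply channel.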

cjrequena commented 5 years ago

Hi @burakhelvaci, I have built a sample project using your approach and it seems to work well. See https://github.com/cjrequena/spring-integration-gateway-kafka-sample

artembilan commented 5 years ago

IntegrationFlows.from(streamName)
...
.channel(streamName)

You are setting a trap for yourself: essentially you loop messages from the channel back to itself. It might work some of the time in your target solution and fail at other times, because such a channel declaration gives us a DirectChannel instance with a UnicastingDispatcher. If there are several subscribers to such a channel, the messages are handed to them in round-robin manner. I'm not sure what your logic is, but the gateway's input channel must be different from the output channel for the target binding, so that you don't loop accidentally.

Also keep in mind that filtering by instance id works, but it is not an efficient solution, since all your instances are going to consume the same message from the Kafka topic. Moreover, it works only when all your instances are in different consumer groups, so that reply delivery is true publish-subscribe.

I don't have a perfect answer for how it should be done, which might be a reason why we still don't have request-reply as an out-of-the-box solution.
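The round-robin behavior mentioned above is why a channel that is both the source and the target of a flow works only intermittently. The plain-Java sketch below (a hypothetical class, not the Spring Integration dispatcher) shows unicast delivery with two subscribers: each message reaches exactly one of them, in rotation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class RoundRobinSketch {

    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private int next = 0;

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Unicast: each message goes to exactly one subscriber, chosen in rotation.
    void send(String message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("no subscribers");
        }
        subscribers.get(next).accept(message);
        next = (next + 1) % subscribers.size();
    }
}
```

With the binding and the flow both subscribed to the same channel, only every other message would reach the binding; the rest would be fed back into the flow.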

cjrequena commented 5 years ago

Thanks for your comments. I see your point that it is not the best solution in terms of efficiency.

sabareeshkkanan commented 5 years ago

This would be an exciting feature. Do we know what is planned?

5aab commented 3 years ago

Thanks @cjrequena and @artembilan. I implemented something similar for RabbitMQ with a mix of s-c-stream and integration. It has two producers and a consumer, and it is working fine. Here is the sample: https://github.com/5aab/request-reply-pattern

Hassen-BENNOUR commented 2 years ago

Here is a fully working example of request-reply with Spring Cloud Stream & Spring Integration. A REST service sends the received data to an EDA service (which upper-cases the received String) by doing request-reply through the "example.request-reply" exchange.


import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.function.Function;

@RequestMapping(path = "/api", produces = MediaType.TEXT_PLAIN_VALUE)
@RestController
@Slf4j
public class ClientController {

    @Autowired
    private Function<String, String> convertSendAndReceive;

    @Operation(summary = "do request-reply", description = "send message to ", tags = {"SCS"})
    @ApiResponses(value = {
            @ApiResponse(responseCode = "200", description = "successful operation")
    })
    @PostMapping(value = "/sendToRabbit", consumes = MediaType.TEXT_PLAIN_VALUE)
    ResponseEntity<String> sendToRabbit(@RequestBody String message) {
        try {
            // Delegate to the injected request-reply function.
            String response = convertSendAndReceive.apply(message);
            log.info("response : " + response);
            return new ResponseEntity<>(response, HttpStatus.OK);
        } catch (Exception exception) {
            log.error(exception.getMessage());
            return new ResponseEntity<>(exception.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }
}

//

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;

import java.util.function.Consumer;

@Configuration
public class ResponseConfiguration {

    @Bean
    public Consumer<Message<String>> respond(MessageHandler amqpOutboundEndpoint) {
        return amqpOutboundEndpoint::handleMessage;
    }

    @Bean
    public MessageHandler amqpOutboundEndpoint(AmqpTemplate amqpTemplate) {
        AmqpOutboundEndpoint amqpOutboundEndpoint = new AmqpOutboundEndpoint(amqpTemplate);
        amqpOutboundEndpoint.setRoutingKeyExpressionString("headers['" + AmqpHeaders.REPLY_TO + "']");
        return amqpOutboundEndpoint;
    }
}

//

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Function;

@Configuration
public class ServerConfiguration {

    @Bean
    public Function<String, String> uppercase() {
        return String::toUpperCase;
    }
}

//

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.function.Function;

@Configuration
public class ClientConfiguration {

    private static final String EXCHANGE_NAME = "EXCHANGE_NAME";

    private String DEFAULT_EXCHANGE_NAME = "example.request-reply";

    @Bean
    @DependsOn("amqpOutbound")
    public <T, R> Function<T, R> convertSendAndReceive(Function<Message<T>, R> sendToRabbitFunction) {
        return message -> {
            Message<T> msg = MessageBuilder.withPayload(message)
                    .setHeader(EXCHANGE_NAME, DEFAULT_EXCHANGE_NAME)
                    .build();
            return sendToRabbitFunction.apply(msg);
        };
    }

    @Bean
    public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
        return IntegrationFlows.from(Function.class, gateway -> gateway.beanName("sendToRabbitFunction"))
                .handle(Amqp.outboundGateway(amqpTemplate)
                        .exchangeNameExpression("headers['" + EXCHANGE_NAME + "']"))
                .get();
    }
}

//yml config

spring:
  cloud:
    stream:
      function:
        definition: uppercase|respond
      rabbit:
        bindings:
          uppercaserespond-in-0:
            consumer:
              queueNameGroupOnly: true
              exchangeType: topic
              autoBindDlq: true
      bindings:
        uppercaserespond-in-0:
          group: uppercase
          destination: request-reply

pom.xml


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <artifactId>spring-cloud-stream-request-reply</artifactId>
    <groupId>demo</groupId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spring-cloud.version>2020.0.1</spring-cloud.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!-- OpenAPI-->
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-ui</artifactId>
            <version>1.4.8</version>
        </dependency>
        <!--  workaround openapi-core-->
        <dependency>
            <groupId>io.github.classgraph</groupId>
            <artifactId>classgraph</artifactId>
            <version>4.8.44</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

pdalfarr commented 3 months ago

Hi. Is there any update on this feature? Is @Hassen-BENNOUR's example heading in the right direction for implementing this?

Thanks

artembilan commented 3 months ago

Thank you for the interest! We doubt it would make sense to have this in the framework as a first-class citizen. Please see an implementation of that pattern here: https://github.com/artembilan/microservices-patterns-spring-integration/tree/main/mrpc

pdalfarr commented 3 months ago

OK, thanks. So this issue could be closed, right?

olegz commented 3 months ago

Yes, we can resolve it as won't fix. Let me elaborate a bit. Streaming is generally a one-way concept; at least, spring-cloud-stream was designed with that in mind. All the hooks and features in it are tailored to provide fast and reliable connectivity to a variety of messaging systems, to stream data from them and to them. For traditional request/reply we already have frameworks such as spring-integration and spring-messaging, so trying to introduce it into spring-cloud-stream would not make much sense and would certainly go against the core principles behind the framework's design.