spring-cloud / spring-cloud-function

SCF multithreading issue #1136

Open GioTem opened 2 months ago

GioTem commented 2 months ago

Hi everyone, I've developed a server application that receives data in parallel from a client. When I send two packets simultaneously, they are processed in parallel up to my function composition. To verify that the data are actually processed in parallel, I deliberately added a sleep at the end of the function composition (to simulate the blocking operations present in the real case, such as file writing). Unfortunately, the logs show that the packets are processed one every 3 seconds, as if they were being executed sequentially. Could someone help me understand what's happening?

My code:

```java
@Bean
public TcpConnectionFactoryFactoryBean tcpSourceConnectionFactory() {
    TcpConnectionFactoryFactoryBean factoryBean = new TcpConnectionFactoryFactoryBean();
    factoryBean.setType("server");
    factoryBean.setPort(12345);
    factoryBean.setUsingNio(false);
    factoryBean.setDeserializer(new MyDeserializer());
    return factoryBean;
}

@Bean
public TcpInboundGateway gateway(@Qualifier("tcpSourceConnectionFactory") AbstractConnectionFactory connectionFactory) {
    TcpInboundGateway gateway = new TcpInboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setAutoStartup(false);
    return gateway;
}

@Bean
public Publisher<Message<String>> tcpSupplierFlow(TcpInboundGateway gateway) {
    return IntegrationFlow.from(gateway)
            .headerFilter(IpHeaders.LOCAL_ADDRESS)
            .toReactivePublisher();
}

@Bean
public Supplier<Flux<Message<String>>> tcpSupplier(
        Publisher<Message<String>> tcpSupplierFlow,
        TcpInboundGateway gateway) {
    return () -> Flux.from(tcpSupplierFlow)
            .doOnSubscribe(subscription -> gateway.start());
}

@Bean
public ApplicationListener<TcpConnectionCloseEvent> onClose() {
    return event -> logger.warn("Closing connection...");
}

@Bean
public Function<Message<String>, Message<String>> messageHandler() {
    return m -> {
        String message = m.getPayload();
        if (!message.isEmpty()) {
            try {
                logger.error("RECEIVED MESSAGE: %s | receivedTimestamp %s".formatted(m.getPayload(), new Date().getTime()));
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return MessageBuilder.withPayload(m.getPayload()).build();
    };
}
```

Output logs:

2024-04-15T15:44:38.755+02:00 DEBUG 26152 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Accepted connection from 127.0.0.1:64720
2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] o.s.i.i.tcp.connection.TcpNetConnection : New connection 127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea
2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : tcpSourceConnectionFactory: Added new connection: 127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea
2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.i.i.tcp.connection.TcpNetConnection : 127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea Reading...
2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Accepted connection from 127.0.0.1:64719
2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] o.s.i.i.tcp.connection.TcpNetConnection : New connection 127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a
2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : tcpSourceConnectionFactory: Added new connection: 127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a
2024-04-15T15:44:38.759+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.i.tcp.connection.TcpNetConnection : 127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a Reading...
2024-04-15T15:44:38.763+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.i.tcp.connection.TcpNetConnection : Message received GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755n, headers={ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=8d07d3e2-5008-a609-1987-f1057e262712, ip_hostname=127.0.0.1, timestamp=1713188678763}]
2024-04-15T15:44:38.763+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.i.i.tcp.connection.TcpNetConnection : Message received GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755n, headers={ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=0b49f68f-5f2b-eea4-45a1-5cdedecd39fa, ip_hostname=127.0.0.1, timestamp=1713188678763}]
2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'tcpSupplierFlow.channel#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=5083e134-7990-7ef2-0379-364bde276e6b, ip_hostname=127.0.0.1, timestamp=1713188678765}]
2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'tcpSupplierFlow.channel#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=ebbe5efc-0a4b-6b18-ca11-d86ba28acdc0, ip_hostname=127.0.0.1, timestamp=1713188678765}]
2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.t.MessageTransformingHandler : bean 'tcpSupplierFlow.header-filter#1' for component 'tcpSupplierFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow' received message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=ebbe5efc-0a4b-6b18-ca11-d86ba28acdc0, ip_hostname=127.0.0.1, timestamp=1713188678765}]
2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.i.t.MessageTransformingHandler : bean 'tcpSupplierFlow.header-filter#1' for component 'tcpSupplierFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow' received message: GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=5083e134-7990-7ef2-0379-364bde276e6b, ip_hostname=127.0.0.1, timestamp=1713188678765}]
2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.channel.FluxMessageChannel : preSend on channel 'bean 'tcpSupplierFlow.channel#1'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_address=127.0.0.1, id=47ea1331-8e47-4618-d208-e84d0b70f2b0, ip_hostname=127.0.0.1, timestamp=1713188678765}]
2024-04-15T15:44:38.765+02:00 DEBUG 26152 --- [pool-5-thread-2] o.s.i.channel.FluxMessageChannel : preSend on channel 'bean 'tcpSupplierFlow.channel#1'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_address=127.0.0.1, id=3693d126-16c9-059d-e820-7af715e1337a, ip_hostname=127.0.0.1, timestamp=1713188678765}]
2024-04-15T15:44:38.767+02:00 DEBUG 26152 --- [pool-5-thread-3] c.f.c.c.BeanFactoryAwareFunctionRegistry : Converted Message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_address=127.0.0.1, id=47ea1331-8e47-4618-d208-e84d0b70f2b0, ip_hostname=127.0.0.1, timestamp=1713188678765}] to: class org.springframework.messaging.support.GenericMessage
2024-04-15T15:44:38.767+02:00 DEBUG 26152 --- [pool-5-thread-3] c.f.c.c.BeanFactoryAwareFunctionRegistry : Invoking function: messageHandler<org.springframework.messaging.Message<java.lang.String>, org.springframework.messaging.Message<java.lang.String>>with input type: org.springframework.messaging.Message<java.lang.String>
2024-04-15T15:44:38.767+02:00 ERROR 26152 --- [pool-5-thread-3] com.example.demo.Conf$$SpringCGLIB$$0 : RECEIVED MESSAGE: Hello server from threadId 23 | timestamp 1713188678755n | receivedTimestamp 1713188678767
2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] o.s.i.channel.FluxMessageChannel : preSend on channel 'bean 'tcpSupplier|messageHandler_integrationflow.channel#0'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755n, headers={id=7c3be25e-23cc-7054-c017-988dd42029a0, timestamp=1713188681769}]
2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] o.s.i.router.MethodInvokingRouter : bean 'tcpSupplier|messageHandler_integrationflow.router#0' for component 'tcpSupplier|messageHandler_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' received message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755n, headers={id=7c3be25e-23cc-7054-c017-988dd42029a0, timestamp=1713188681769}]
2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel : preSend on channel 'bean 'output'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755n, headers={id=7c3be25e-23cc-7054-c017-988dd42029a0, timestamp=1713188681769}]
2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] tractMessageChannelBinder$SendingHandler : org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@231e4dda received message: GenericMessage [payload=byte[57], headers={contentType=application/json, id=c38b7db9-1727-a2b3-d95c-9a5fcedafaec, timestamp=1713188681769}]
2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] o.s.i.a.outbound.AmqpOutboundEndpoint : org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@4331b295 received message: GenericMessage [payload=byte[57], headers={contentType=application/json, id=ae45cb45-95bd-67eb-3de3-7a858b37b37f, timestamp=1713188681769}]
2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.i.channel.FluxMessageChannel : postSend (sent=true) on channel 'bean 'tcpSupplierFlow.channel#1'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_address=127.0.0.1, id=47ea1331-8e47-4618-d208-e84d0b70f2b0, ip_hostname=127.0.0.1, timestamp=1713188678765}]
2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [pool-5-thread-3] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'tcpSupplierFlow.channel#0'; defined in: 'class path resource [com/example/demo/Conf.class]'; from source: 'bean method tcpSupplierFlow'', message: GenericMessage [payload=Hello server from threadId 23 | timestamp 1713188678755n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10b01e60, ip_tcp_remotePort=64719, ip_connectionId=127.0.0.1:64719:12345:2a1da428-fbd9-4b5d-b83a-5b5a4b26bf1a, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=ebbe5efc-0a4b-6b18-ca11-d86ba28acdc0, ip_hostname=127.0.0.1, timestamp=1713188678765}]
2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[contentType] WILL be mapped, matched pattern=*
2024-04-15T15:44:41.769+02:00 INFO 26152 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [rabbitmq:5672]
2024-04-15T15:44:41.769+02:00 DEBUG 26152 --- [oundedElastic-1] c.r.client.impl.ConsumerWorkService : Creating executor service with 16 thread(s) for consumer work service
2024-04-15T15:44:41.785+02:00 DEBUG 26152 --- [pool-5-thread-2] c.f.c.c.BeanFactoryAwareFunctionRegistry : Converted Message: GenericMessage [payload=Hello server from threadId 24 | timestamp 1713188678755n, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@22a5034d, ip_tcp_remotePort=64720, ip_connectionId=127.0.0.1:64720:12345:93d694a1-a774-4cb7-911a-e18e40047bea, ip_address=127.0.0.1, id=3693d126-16c9-059d-e820-7af715e1337a, ip_hostname=127.0.0.1, timestamp=1713188678765}] to: class org.springframework.messaging.support.GenericMessage
2024-04-15T15:44:41.785+02:00 DEBUG 26152 --- [pool-5-thread-2] c.f.c.c.BeanFactoryAwareFunctionRegistry : Invoking function: messageHandler<org.springframework.messaging.Message<java.lang.String>, org.springframework.messaging.Message<java.lang.String>>with input type: org.springframework.messaging.Message<java.lang.String>
2024-04-15T15:44:41.785+02:00 ERROR 26152 --- [pool-5-thread-2] com.example.demo.Conf$$SpringCGLIB$$0 : RECEIVED MESSAGE: Hello server from threadId 24 | timestamp 1713188678755n | receivedTimestamp 1713188681785

In my test, I opened two socket connections on the client and sent a packet for each connection. As seen from the logs, the server processes the packet with threadId 24 with a delay of 3 seconds (from 2024-04-15T15:44:38.767+02:00 to 2024-04-15T15:44:41.785+02:00).
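
The 3-second figure can be checked directly from the two `RECEIVED MESSAGE` log timestamps quoted above. A minimal sketch using `java.time`; the class and method names (`LogGap`, `gapMillis`) are made up for illustration and are not part of the application:

```java
import java.time.Duration;
import java.time.OffsetDateTime;

public class LogGap {

    // Hypothetical helper: milliseconds between two ISO-8601 offset timestamps.
    static long gapMillis(String first, String second) {
        return Duration.between(OffsetDateTime.parse(first),
                                OffsetDateTime.parse(second)).toMillis();
    }

    public static void main(String[] args) {
        // Timestamps of the two "RECEIVED MESSAGE" entries (threadId 23, then threadId 24).
        long gap = gapMillis("2024-04-15T15:44:38.767+02:00",
                             "2024-04-15T15:44:41.785+02:00");
        System.out.println(gap + " ms"); // prints "3018 ms"
    }
}
```

The gap is 3018 ms, which is the 3000 ms sleep plus a few milliseconds of overhead.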

olegz commented 2 months ago

A composed function is a single function; it is not a chain of things to be executed separately. It's as if you had written all your code in one function instead of writing multiple functions and then composing them, so look at it as such.
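
The same point can be illustrated with plain `java.util.function.Function`. This is only an analogy, assuming nothing about how Spring Cloud Function implements composition internally; the stage names (`trim`, `upper`) are hypothetical:

```java
import java.util.function.Function;

public class ComposedFunctionDemo {

    // Two hypothetical stages; they are not taken from the application in this thread.
    static final Function<String, String> trim = String::trim;
    static final Function<String, String> upper = String::toUpperCase;

    // andThen returns ONE Function object. Calling apply() runs both stages
    // back to back on whichever thread called apply().
    static final Function<String, String> composed = trim.andThen(upper);

    // The same logic written by hand as a single function.
    static final Function<String, String> handWritten = s -> s.trim().toUpperCase();

    public static void main(String[] args) {
        System.out.println(composed.apply("  hello "));    // prints HELLO
        System.out.println(handWritten.apply("  hello ")); // prints HELLO
    }
}
```

In this analogy, a blocking call inside either stage blocks the whole composed call, because there is no hand-off between stages.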

GioTem commented 2 months ago

Thank you for your response. I understand that function composition is seen as a single function. However, I can't understand why requests are serialized. After all, the tcpSupplierFlow is outside the function composition and therefore should handle requests in parallel. Can you give me some explanation about it?
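
Nothing in this thread confirms the root cause. As an illustration only, the sketch below shows how the observed timing can arise in plain Java if a blocking handler runs on a stage that handles one element at a time, and how the timing changes when two workers are available. It is an assumption-laden analogy, not Spring Integration or Reactor code; `BlockingStageDemo`, `blockingHandler`, and `elapsedMillis` are hypothetical names, and the sleep is shortened from 3000 ms to 300 ms.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BlockingStageDemo {

    // Stand-in for the blocking handler in the question (sleep, then return the payload).
    static Function<String, String> blockingHandler(long sleepMillis) {
        return payload -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return payload;
        };
    }

    // Submits every payload to the executor, waits for all of them,
    // shuts the executor down, and returns the elapsed wall-clock time in ms.
    static long elapsedMillis(ExecutorService executor, List<String> payloads,
                              Function<String, String> handler) {
        long start = System.nanoTime();
        List<CompletableFuture<String>> futures = payloads.stream()
                .map(p -> CompletableFuture.supplyAsync(() -> handler.apply(p), executor))
                .collect(Collectors.toList());
        futures.forEach(CompletableFuture::join);
        executor.shutdown();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        List<String> payloads = List.of("packet-23", "packet-24");
        Function<String, String> handler = blockingHandler(300);

        // One worker: the second payload waits for the first one's sleep to finish.
        long serialized = elapsedMillis(Executors.newSingleThreadExecutor(), payloads, handler);
        // Two workers: both sleeps overlap.
        long parallel = elapsedMillis(Executors.newFixedThreadPool(2), payloads, handler);

        System.out.println("single worker: " + serialized + " ms");
        System.out.println("two workers:   " + parallel + " ms");
    }
}
```

With one worker the total is roughly twice the sleep; with two workers it is roughly one sleep. Whether this matches what happens inside the flow above would need the reproducer requested later in the thread.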

olegz commented 2 months ago

Perhaps I am missing something. It would be simpler if you created a sample app that reproduces the issue and pushed it to GitHub somewhere, so I can run it and better understand what you are trying to accomplish.

olegz commented 3 weeks ago

Any follow-up?

GioTem commented 1 week ago

Sorry for the delayed response, but I wanted to ensure everything was working correctly. I chose another solution to prevent the issue at its root.