apache / eventmesh

EventMesh is a new generation serverless event middleware for building distributed event-driven applications.
https://eventmesh.apache.org/
Apache License 2.0

Sending CloudEvents-format messages over gRPC is slow (I don't know whether all producers are slow) #5030

Open LeeMoonCh opened 4 months ago

LeeMoonCh commented 4 months ago

Question

Sending CloudEvents-format messages over gRPC is slow (I'm not sure whether all producers are affected): publishing a batch of 101 events takes more than 60 seconds.

The code looks like this:


            String gPort = ConfigUtil.getProperty("em.grpc.port");
            String host = ConfigUtil.getProperty("em.host");
            String pwd = ConfigUtil.getProperty("em.pwd");
            String user = ConfigUtil.getProperty("em.user");

            EventMeshGrpcClientConfig config = EventMeshGrpcClientConfig.builder()
                .serverAddr("172.16.15.136")
                .serverPort(51112)
                .consumerGroup(PRODUCER_GROUP)
                .password(pwd)
                .userName(user)
                .env(ENV)
                .idc(IDC)
                .sys(SYSID)
                .build();

            EventMeshGrpcProducer producer = new EventMeshGrpcProducer(config);

....

                CloudEvent event = CloudEventBuilder.v1()
                    .withId(UUID.randomUUID().toString())
                    .withSubject("tc_event_device") //topic
                    .withSource(URI.create("/"))
                    .withDataContentType(CLOUDEVENT_CONTENT_TYPE)
                    .withType(CLOUD_EVENTS_PROTOCOL_NAME)
                    .withData(eventJson.toJSONString().getBytes(StandardCharsets.UTF_8))
                    .withExtension("ttl", String.valueOf(4 * 1000))
                    .build();

                list.add(event);
                // send the batch once more than 100 events have accumulated
                if (list.size() > 100) {
                    sendMsg2EventMesh(producer);
                }

......

private void sendMsg2EventMesh(EventMeshGrpcProducer producer) {
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
    logger.info("Number of events to send: " + list.size());
    long start = System.currentTimeMillis();
    // begin is reset at the end of each send, so (start - begin) is the time spent building this batch
    System.out.println("Finished preparing data: " + start + " elapsed: " + (start - begin));
    CloudEvent cloudEvent = list.get(list.size() - 1);
    // decode with an explicit charset to match the UTF-8 bytes written in withData(...)
    String cs = JSON.parseObject(new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8)).getString("customCode");
    producer.publish(list);
    System.out.println("Sent successfully! " + cs + " elapsed: " + (System.currentTimeMillis() - start) + " events sent: " + list.size());
    list.clear();
    begin = System.currentTimeMillis();
}
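
To narrow down where the 60 seconds go, it may help to time the publish call in isolation from the batch-building code. The following is a minimal, self-contained sketch of the same "flush once more than N events are buffered" pattern with the flush timed separately. `BatchTimer` is a hypothetical helper, not part of the EventMesh SDK, and the `Consumer` passed to it only stands in for `producer.publish(list)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper (not an EventMesh class): buffers items and times each flush.
public class BatchTimer<T> {
    private final List<T> buffer = new ArrayList<>();
    private final int threshold;
    private final Consumer<List<T>> sink; // stand-in for producer.publish(list)
    private long lastFlushNanos = -1;     // duration of the most recent sink call
    private int flushCount = 0;

    public BatchTimer(int threshold, Consumer<List<T>> sink) {
        this.threshold = threshold;
        this.sink = sink;
    }

    // Mirrors the original condition: flush when size() > threshold.
    public void add(T item) {
        buffer.add(item);
        if (buffer.size() > threshold) {
            flush();
        }
    }

    // Times only the sink call, so batch-building time is excluded.
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        long start = System.nanoTime();
        sink.accept(batch);
        lastFlushNanos = System.nanoTime() - start;
        flushCount++;
        buffer.clear();
    }

    public long getLastFlushNanos() { return lastFlushNanos; }
    public int getFlushCount() { return flushCount; }
    public int pending() { return buffer.size(); }
}
```

With a threshold of 100, the 101st `add` triggers one flush of 101 items, matching the batch size reported above. Comparing `getLastFlushNanos()` across batches would show whether the delay is in the publish call itself or in the code that prepares the events.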

The results are as shown: [image]

The server is configured as follows:

eventmesh.properties

###############################EVENTMESH-runtime ENV#################################
eventMesh.server.idc=idc
eventMesh.server.env=env
eventMesh.server.provide.protocols=HTTP,TCP,GRPC
eventMesh.server.cluster=COMMON
eventMesh.server.name=EVENTMESH-runtime
eventMesh.sysid=1234
eventMesh.server.http.port=51113
eventMesh.server.grpc.port=51112
########################## eventMesh tcp configuration ############################
eventMesh.server.tcp.enabled=true
eventMesh.server.tcp.port=51111
eventMesh.server.tcp.readerIdleSeconds=120
eventMesh.server.tcp.writerIdleSeconds=120
eventMesh.server.tcp.allIdleSeconds=120
eventMesh.server.tcp.clientMaxNum=10000
# client isolation time if the message send failure
eventMesh.server.tcp.pushFailIsolateTimeInMills=30000
# rebalance interval
eventMesh.server.tcp.RebalanceIntervalInMills=30000
# session expire time about client
eventMesh.server.session.expiredInMills=60000
# flow control, include the global level and session level
eventMesh.server.tcp.msgReqnumPerSecond=15000
eventMesh.server.http.msgReqnumPerSecond=15000
eventMesh.server.session.upstreamBufferSize=20

# for single event publish, maximum size allowed per event
eventMesh.server.maxEventSize=1048576
# for batch event publish, maximum number of events allowed in one batch
eventMesh.server.maxEventBatchSize=1000

# thread number about global scheduler
eventMesh.server.global.scheduler=5
eventMesh.server.tcp.taskHandleExecutorPoolSize=8
#retry
eventMesh.server.retry.async.pushRetryTimes=3
eventMesh.server.retry.sync.pushRetryTimes=3
eventMesh.server.retry.async.pushRetryDelayInMills=500
eventMesh.server.retry.sync.pushRetryDelayInMills=500
eventMesh.server.retry.pushRetryQueueSize=10000
#admin
eventMesh.server.admin.http.port=10106
#registry
eventMesh.server.registry.registerIntervalInMills=10000
eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
#auto-ack
#eventMesh.server.defibus.client.comsumeTimeoutInMin=5

#sleep interval between closing client of different group in server graceful shutdown
eventMesh.server.gracefulShutdown.sleepIntervalInMills=1000
eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200

#ip address blacklist
eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8

#connector plugin
eventMesh.connector.plugin.type=rocketmq

#storage plugin
eventMesh.storage.plugin.type=rocketmq

#security plugin
#eventMesh.server.security.enabled=false
#eventMesh.security.plugin.type=security
#eventMesh.security.validation.type.token=false
#eventMesh.security.publickey=

#registry plugin
eventMesh.registry.plugin.enabled=false
eventMesh.registry.plugin.type=nacos
eventMesh.registry.plugin.server-addr=127.0.0.1:8848
eventMesh.registry.plugin.username=nacos
eventMesh.registry.plugin.password=nacos

# The TLS configuration of registry plugin consul
# keyStoreInstanceType's value can refer to com.ecwid.consul.transport.TLSConfig.KeyStoreInstanceType
#eventMesh.registry.consul.tls.keyStoreInstanceType=
#eventMesh.registry.consul.tls.certificatePath=
#eventMesh.registry.consul.tls.certificatePassword=
#eventMesh.registry.consul.tls.keyStorePath=
#eventMesh.registry.consul.tls.keyStorePassword=

# metrics plugin, if you have multiple plugin, you can use ',' to split
#eventMesh.metrics.plugin=prometheus

# trace plugin
eventMesh.server.trace.enabled=false
eventMesh.trace.plugin=zipkin

# webhook
# Start webhook admin service
eventMesh.webHook.admin.start=true
# Webhook event configuration storage mode. Currently, only file and Nacos are supported
eventMesh.webHook.operationMode=file
# The file storage path of the file storage mode. If #{eventmeshhome} is written, it is in the eventmesh root directory
eventMesh.webHook.fileMode.filePath= #{eventMeshHome}/webhook
# Nacos storage mode, and the configuration naming rule is eventmesh webHook. nacosMode. {nacos native configuration key} please see the specific configuration [nacos github api](https://github.com/alibaba/nacos/blob/develop/api/src/main/java/com/alibaba/nacos/api/SystemPropertyKeyConst.java)
## Address of Nacos
eventMesh.webHook.nacosMode.serverAddr=0.0.0.0:8848
# Webhook eventcloud sending mode. And eventmesh connector. plugin. The type configuration is the same
eventMesh.webHook.producer.storage=standalone

eventMesh.server.flushDiskType=ASYNC_FLUSH

server.env

APP_START_JVM_OPTION:::-server -Xms1g -Xmx16g -Xmn4g  -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:ConcGCThreads=8 -XX:ParallelGCThreads=8 -XX:MaxDirectMemorySize=16G -Dio.netty.eventLoopThreads=32 -Dio.netty.maxDirectMemory=16G -XX:SurvivorRatio=4 -Duser.language=zh

What am I doing wrong? I don't think a producer should be this slow.

LeeMoonCh commented 4 months ago

2024-07-12 22:31:33,260 WARN  [eventMesh-tcpNettyEpoll-Boss-1] ServerBootstrap(AbstractBootstrap.java:464) - Unknown channel option 'SO_TIMEOUT' for channel '[id: 0xb8d69f6f, L:/172.16.15.136:51111 - R:/10.2.1.116:51565]'
2024-07-12 22:31:33,263 INFO  [eventMesh-tcp-worker-6] EventMeshTcpConnectionHandler(EventMeshTcpConnectionHandler.java:48) - client|tcp|channelRegistered|remoteAddress=10.2.1.116:51565|msg=
2024-07-12 22:31:33,263 INFO  [eventMesh-tcp-worker-6] EventMeshTcpConnectionHandler(EventMeshTcpConnectionHandler.java:62) - client|tcp|channelActive|remoteAddress=10.2.1.116:51565|msg=
2024-07-12 22:31:33,692 INFO  [eventMesh-tcp-worker-6] message(EventMeshTcpMessageDispatcher.java:100) - pkg|c2eventMesh|cmd=HELLO_REQUEST|pkg=org.apache.eventmesh.common.protocol.tcp.Package@63c8d488
2024-07-12 22:31:33,693 ERROR [eventMesh-tcp-task-handle-6] message(HelloTask.java:95) - HelloTask failed|address=/10.2.1.116:51565,errMsg=java.lang.Exception: client purpose config is error
2024-07-12 22:31:33,695 INFO  [eventMesh-tcp-worker-6] message(Utils.java:128) - pkg|eventMesh2c|cmd=HELLO_RESPONSE|pkg=org.apache.eventmesh.common.protocol.tcp.Package@5e6d5da9|user=UserAgent{env='env', subsystem='5556', group='EventmeshTestGroup', path='C:/em', pid=32893, host='10.2.1.116', port=8362, version='2.0.11', idc='1234', purpose='null', unack='0'}|wait=0ms|cost=2ms

In addition, when I try to send a message over TCP, the server reports the error shown in the log above ("client purpose config is error").

My code:

            UserAgent userAgent = UserAgent.builder()
                .env(ENV)
                .host("10.2.1.116")
                .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
                .username("admin")
                .group(UtilsConstants.GROUP)
                .path("C:/em")
                .port(8362)
                .subsystem("5556")
                .pid(32_893)
                .version(UtilsConstants.VERSION)
                .idc(SYSID)
                .build();
            EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
                .host("172.16.15.136")
                .port(51111)
                .userAgent(userAgent)
                .build();
            final EventMeshTCPClient<CloudEvent> producer =
                EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
            producer.init();

So now I am confused. Neither the official documentation nor the examples describe the UserAgent fields in any detail, which is odd.

xwm1992 commented 4 months ago

client purpose config is error

This error means your UserAgent is missing the purpose property. Take a look at the EventMeshTestUtils.generateClient1() method; it may help you.
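
The HELLO_RESPONSE log line earlier in the thread shows `purpose='null'` in the UserAgent, which is consistent with this explanation. As a rough illustration only, the sketch below models the kind of check that would reject such a client. `PurposeCheck` is a hypothetical class, not EventMesh code, and the accepted values `"pub"` and `"sub"` are an assumption that is not confirmed anywhere in this thread; the actual constants should be taken from EventMeshTestUtils.generateClient1() and the SDK sources.

```java
// Hypothetical stand-in for the server-side validation; not EventMesh code.
public class PurposeCheck {
    // Assumed accepted values; verify against the EventMesh sources before relying on them.
    public static final String PURPOSE_PUB = "pub";
    public static final String PURPOSE_SUB = "sub";

    // Returns true only when the purpose is one of the assumed accepted values.
    public static boolean isValidPurpose(String purpose) {
        return PURPOSE_PUB.equals(purpose) || PURPOSE_SUB.equals(purpose);
    }

    // Throws with the same message text seen in the server log when the purpose is missing or unknown.
    public static void validate(String purpose) {
        if (!isValidPurpose(purpose)) {
            throw new IllegalArgumentException("client purpose config is error");
        }
    }
}
```

Under these assumptions, a UserAgent built without a purpose (so the field stays null) would fail the check, while one that sets a publisher purpose on the builder would pass.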
