allegro / hermes

Fast and reliable message broker built on top of Kafka.
http://hermes.allegro.tech

Help required to pass external configuration of zookeeper / kafka zk to Hermes management #410

Closed: hikrrish closed this issue 8 years ago

hikrrish commented 8 years ago

I am trying to run Hermes management standalone on an EC2 instance using the sh script. However, I am not able to set up the management instance to talk to ZooKeeper running on a different host. Configuration:

export HERMES_MANAGEMENT_OPTS="-Dstorage.connectionString=zk-metadata-instance -Dkafka.clusters.connectionString=kafka-zk-instance"

When I run management with the above configuration, it first tries to connect to zk-metadata-instance and then tries to connect to localhost:2181. I think it is not able to pick up kafka-zk-instance. Please correct me if the above is not correct.

gamefundas commented 8 years ago

For the Hermes Management Spring config, the format should be as follows. Unlike the other modules, which take Java options, there is no "-D".

export HERMES_MANAGEMENT_OPTS="--<property>=<value>"

I am saying this based only on what the documentation says. Please try it out; it might work.

hikrrish commented 8 years ago

Thanks gamefundas. Please help with this one:

Now I am trying with the following:

export HERMES_MANAGEMENT_OPTS="-Dstorage.connectionString=zk-hermes.i5tst.cltest.krishna.com:80 -Dspring.config.location=app.yml"

app.yml:

kafka:
  defaultNamespace:
  clusters:
    - clusterName:primary
      connectionString:zookeeper.iaca5tst.cltest.krishna.com:80

I'm getting the following exception:

Field error in object 'kafka' on field 'clusters': rejected value [--- clusterName:primary connectionString:zookeeper.iaca5tst.cltest.krishna.com:80]; codes [typeMismatch.kafka.clusters,typeMismatch.clusters,typeMismatch.java.util.List,typeMismatch]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [kafka.clusters,clusters]; arguments []; default message [clusters]]; default message [Failed to convert property value of type 'java.lang.String' to required type 'java.util.List' for property 'clusters'; nested exception is java.lang.IllegalStateException: Cannot convert value of type [java.lang.String] to required type [pl.allegro.tech.hermes.management.config.kafka.KafkaProperties] for property 'clusters[0]': no matching editors or conversion strategy found]; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'topicService' defined in URL [jar:file:/usr/local/hermes-management/lib/hermes-management-0.8.5-hotfix2.jar!/pl/allegro/tech/hermes/management/domain/topic/TopicService.class]: Unsatisfied dependency expressed through constructor argument with index 0 of type [pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService]: : Error creating bean with name 'kafkaConfiguration': Injection of autowired dependencies failed; nested exception is org.springframework.beans.factory.BeanCreationException: Could not autowire field: pl.allegro.tech.hermes.management.config.kafka.KafkaClustersProperties pl.allegro.tech.hermes.management.config.kafka.KafkaConfiguration.kafkaClustersProperties; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafka.CONFIGURATION_PROPERTIES': Could not bind properties to [unknown](target=kafka, ignoreInvalidFields=false, ignoreUnknownFields=true, ignoreNestedProperties=false); nested exception is org.springframework.validation.BindException:
org.springframework.validation.BeanPropertyBindingResult: 1 errors

gamefundas commented 8 years ago

Did you use the following? I still see -D in your comment. Also note that you can load/override defaults using the property file directly via Spring Boot's "spring.config.location" instead of SPRING_CONFIG_LOCATION.

export HERMES_MANAGEMENT_OPTS="--<property>=<value>"

hikrrish commented 8 years ago

If you look at the logs above, it actually honors JVM args with -D, so the first issue is resolved.

The issue now is really converting the app.yml format to the KafkaProperties format:

Cannot convert value of type [java.lang.String] to required type [pl.allegro.tech.hermes.management.config.kafka.KafkaProperties] for property 'clusters[0]': no matching editors or conversion strategy found];

adamdubiel commented 8 years ago

That's actually a good question: how to write properties like this in Java properties format. I'm digging.
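For reference, Spring Boot's relaxed binding generally accepts indexed keys for list-valued properties, so kafka.clusters can in principle be expressed as kafka.clusters[0].clusterName=... either in a properties file or as -D system properties. The sketch below builds such options; the helper name hermes_cluster_opts and its name=zkConnectionString argument format are hypothetical, and it assumes the Hermes start script passes HERMES_MANAGEMENT_OPTS to the JVM as system properties (as the -D options working in this thread suggest).

```shell
#!/bin/sh
# Hypothetical helper: turn "name=zkConnectionString" pairs into indexed
# Spring Boot system properties for the list-valued kafka.clusters property.
hermes_cluster_opts() {
  local i=0 opts="" entry name zk
  for entry in "$@"; do
    name="${entry%%=*}"   # text before the first '='
    zk="${entry#*=}"      # text after the first '='
    opts="$opts -Dkafka.clusters[$i].clusterName=$name"
    opts="$opts -Dkafka.clusters[$i].connectionString=$zk"
    i=$((i + 1))
  done
  printf '%s' "${opts# }"   # drop the leading space
}

# Usage (host names and port are placeholders):
# export HERMES_MANAGEMENT_OPTS="-Dstorage.connectionString=zk-metadata-instance $(hermes_cluster_opts primary=kafka-zk-instance:2181)"
```

The options contain square brackets, so if the start script expands the variable unquoted, a file name that happens to match the bracket pattern could be substituted by globbing; with the default shell settings an unmatched pattern is left as-is.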

adamdubiel commented 8 years ago

I think you could use spring.application.json (see https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-external-config.html) as a workaround, and it is probably quite an elegant way to solve the problem.
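A minimal sketch of that workaround, assuming the Spring Boot version bundled with Hermes management reads the SPRING_APPLICATION_JSON environment variable (Spring Boot 1.3 and later; the equivalent system property is spring.application.json). The helper name and host name are hypothetical, and values are inserted without JSON escaping, which is fine for plain names and host:port strings.

```shell
#!/bin/sh
# Hypothetical helper: render one Kafka cluster entry as inline JSON.
# No JSON escaping is done, so only use plain names and host:port lists.
kafka_clusters_json() {
  printf '{"kafka":{"clusters":[{"clusterName":"%s","connectionString":"%s"}]}}' "$1" "$2"
}

# Spring Boot reads this variable at startup as an extra property source.
export SPRING_APPLICATION_JSON="$(kafka_clusters_json primary myzkhost:80)"
```

Since the JSON carries a real array, the list binding problem with clusters[0] does not arise in this form.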

hikrrish commented 8 years ago

Thanks adamdubiel. I tried the YAML format below and it worked; I passed this YAML file as a JVM argument:

kafka:
  clusters:
    - clusterName: primary
      connectionString: myzkhost:80

Question: How can I manage logging? Specifically, the path where the log file is created and changing log levels.

adamdubiel commented 8 years ago

We use logback for logging, so configure it using its format. You can point to logback.xml (or logback.groovy, or similar) using:

-Dlogback.configurationFile=/etc/hermes/logback.xml
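If HERMES_MANAGEMENT_OPTS already carries other -D options, the logback location can be appended instead of overwriting them. A small sketch; the helper name is hypothetical, and the readability check is only a convenience so that a typo in the path fails before the JVM starts.

```shell
#!/bin/sh
# Hypothetical helper: append -Dlogback.configurationFile to an existing
# options string, refusing to do so if the config file is not readable.
with_logback() {
  local opts="$1" config="$2"
  if [ ! -r "$config" ]; then
    echo "logback config not readable: $config" >&2
    return 1
  fi
  # ${opts:+...} adds "$opts " only when opts is non-empty (no leading space).
  printf '%s' "${opts:+$opts }-Dlogback.configurationFile=$config"
}

# Usage:
# export HERMES_MANAGEMENT_OPTS="$(with_logback "$HERMES_MANAGEMENT_OPTS" /etc/hermes/logback.xml)"
```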

Our configuration (with async appenders) for the frontend looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>/var/log/hermes/hermes-frontend.log</file>
        <encoder>
            <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</Pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
            <fileNamePattern>/var/log/hermes/hermes-frontend-%i.log</fileNamePattern>
            <minIndex>1</minIndex>
            <maxIndex>4</maxIndex>
        </rollingPolicy>

        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <maxFileSize>100MB</maxFileSize>
        </triggeringPolicy>
    </appender>

    <appender name="ASYNC_FILE" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="FILE" />
    </appender>

    <logger name="org.apache.zookeeper" level="ERROR" />

    <!--Selector spams ERROR level messages every 100ms on IOException-->
    <logger name="org.apache.kafka.common.network.Selector" level="OFF"/>

    <logger name="kafka" level="WARN"/>

    <root level="INFO">
        <appender-ref ref="ASYNC_FILE" />
    </root>
</configuration>

This information should be included in our deployment docs; I will take care of it tomorrow.
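For the management module specifically, a trimmed-down variant with a chosen log path and level can be generated per host. This sketch writes a plain (non-rolling, non-async) config using logback's standard FileAppender; the helper name, the example paths, and scoping the level to the pl.allegro.tech.hermes package are assumptions, not the project's shipped configuration.

```shell
#!/bin/sh
# Hypothetical helper: write a minimal logback config with a chosen log file
# and a level for Hermes' own packages (default INFO).
write_logback_config() {
  local out="$1" logfile="$2" level="${3:-INFO}"
  cat > "$out" <<EOF
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <file>${logfile}</file>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Level for Hermes' own packages; everything else stays at INFO. -->
    <logger name="pl.allegro.tech.hermes" level="${level}"/>

    <root level="INFO">
        <appender-ref ref="FILE"/>
    </root>
</configuration>
EOF
}

# Usage (example paths):
# write_logback_config /etc/hermes/logback-management.xml /var/log/hermes/hermes-management.log DEBUG
```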

hikrrish commented 8 years ago

I have a doubt about the consumer: when I create a subscription, nothing new appears in the logs; however, if I restart it, it gets the subscription messages.

Consumers: http://pastebin.com/xMSL8ZLw

Frontend: http://pastebin.com/tRx1qsyV

adamdubiel commented 8 years ago

Okay, so I see connectivity issues. Hermes was not able to get topic metadata from Kafka (which happens on topic init):

Response already sent. Error message Broker seems to be down, cause: Failed to update metadata after 500 ms.,

Did you test connectivity between the Hermes and Kafka hosts in AWS? Telnet or anything...
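Where telnet isn't installed, bash's /dev/tcp pseudo-device gives a quick reachability check. A sketch with hypothetical helper names; it needs bash and the coreutils timeout command. As the resolution of this thread shows, reaching the broker IPs is not the whole story: Kafka clients connect to whatever host name each broker advertises, so those advertised names are worth checking as well.

```shell
#!/bin/sh
# Hypothetical helpers: test TCP reachability of host:port endpoints.
# Uses bash's /dev/tcp pseudo-device, wrapped in coreutils `timeout`.
check_port() {
  local host="${1%:*}" port="${1##*:}" secs="${2:-3}"
  # Inside the bash -c script, $0 and $1 are the host and port passed after it.
  timeout "$secs" bash -c ': > "/dev/tcp/$0/$1"' "$host" "$port" 2>/dev/null
}

# Print OK/FAIL per endpoint; return non-zero if any endpoint failed.
check_all() {
  local rc=0 ep
  for ep in "$@"; do
    if check_port "$ep"; then
      printf 'OK   %s\n' "$ep"
    else
      printf 'FAIL %s\n' "$ep"
      rc=1
    fi
  done
  return "$rc"
}

# Usage (broker addresses as used in this thread):
# check_all 10.81.86.117:9092 10.81.85.208:9092 10.81.89.72:9092
```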

hikrrish commented 8 years ago

Telnet seems to be working fine; I tested from the Hermes instance against all Kafka brokers. I had already verified that my Kafka cluster is in sync, and I am able to use the Kafka producer/consumer on a topic created by Hermes management.

[ec2-user@ip-10-81-87-13 hermes]$ telnet 10.81.86.117 9092
Trying 10.81.86.117...
Connected to 10.81.86.117.
Escape character is '^]'.
^CConnection closed by foreign host.
[ec2-user@ip-10-81-87-13 hermes]$ telnet 10.81.85.208 9092
Trying 10.81.85.208...
Connected to 10.81.85.208.
Escape character is '^]'.
^CConnection closed by foreign host.
[ec2-user@ip-10-81-87-13 hermes]$ telnet 10.81.89.72 9092
Trying 10.81.89.72...
Connected to 10.81.89.72.
Escape character is '^]'.

hikrrish commented 8 years ago

Please let me know if we need more detailed logging around this issue. I will set up logback at debug level and send you the logs on pastebin.

adamdubiel commented 8 years ago

Okay, so some things you could consider:

hikrrish commented 8 years ago

I appreciate all your help, but I think something is still wrong. After your suggestions above, I get 202 when I publish a message. Also, if I query the message status for the subscription, it shows “PENDING”.

- Tested topic creation using the list command
- Tested the Kafka console producer/consumer on the same topic

The last thing to try is running Hermes on the Kafka server.

Hermes management logs (after group/topic creation):

8090 (http)
2016-03-22 18:29:00.869 INFO 30394 --- [ main] p.a.t.h.management.HermesManagement : Started HermesManagement in 4.614 seconds (JVM running for 4.9)
2016-03-22 18:29:44.776 INFO 30394 --- [nio-8090-exec-1] p.a.t.h.i.z.ZookeeperGroupRepository : Creating group hermes for path /root/groups/hermes
2016-03-22 18:30:03.052 INFO 30394 --- [nio-8090-exec-2] p.a.t.h.i.z.ZookeeperTopicRepository : Creating topic for path /root/groups/hermes/topics/cash
2016-03-22 18:30:03.381 INFO 30394 --- [nio-8090-exec-2] kafka.admin.AdminUtils$ : Topic creation {"version":1,"partitions":{"8":[163],"4":[130],"9":[101],"5":[163],"6":[101],"1":[130],"0":[101],"2":[163],"7":[130],"3":[101]}}
2016-03-22 18:32:31.742 INFO 30394 --- [nio-8090-exec-7] .t.h.i.z.ZookeeperSubscriptionRepository : Creating subscription for path /root/groups/hermes/topics/cash/subscriptions/mySubscription

I am able to send/receive using the topic created by Hermes Management (Kafka producer and consumer application): hermes.cash

Hermes Frontend logs

2016-03-22 18:28:43.706 INFO org.xnio - XNIO version 3.3.1.Final
2016-03-22 18:28:43.714 INFO org.xnio.nio - XNIO NIO Implementation Version 3.3.1.Final
2016-03-22 18:29:44.822 INFO p.a.t.h.c.cache.zookeeper.NodeCache - Got entry change event for path /root/groups/hermes
2016-03-22 18:30:03.187 INFO p.a.t.h.f.c.t.z.TopicsNodeCache - Got topic change event for path /root/groups/hermes/topics/cash type CHILD_ADDED
2016-03-22 18:30:03.190 INFO p.a.t.h.d.t.schema.SchemaRepository - Loading schema source for topic hermes.cash
2016-03-22 18:33:31.740 ERROR p.a.t.h.f.publishing.HttpResponder - Async timeout, cause: unknown, publishing on topic hermes.cash, remote host 10.81.92.136, message state SENDING_TO_KAFKA_PRODUCER_QUEUE

Consumer logs:

2016-03-22 18:28:48.973 INFO c.n.c.sources.URLConfigurationSource - URLs to be used as dynamic configuration source: [file:/opt/hermes/conf/hermes-frontend.properties]
2016-03-22 18:28:49.001 INFO c.n.config.DynamicPropertyFactory - DynamicPropertyFactory is initialized with configuration sources: com.netflix.config.ConcurrentCompositeConfiguration@1b1473ab
2016-03-22 18:28:49.124 INFO o.a.c.f.imps.CuratorFrameworkImpl - Starting
2016-03-22 18:28:49.160 INFO o.a.c.f.state.ConnectionStateManager - State change: CONNECTED
2016-03-22 18:28:49.413 INFO org.eclipse.jetty.util.log - Logging initialized @891ms
2016-03-22 18:28:49.784 INFO o.a.c.f.imps.CuratorFrameworkImpl - Starting
2016-03-22 18:28:49.792 INFO o.a.c.f.state.ConnectionStateManager - State change: CONNECTED
2016-03-22 18:28:49.838 INFO p.a.t.h.c.s.w.m.LegacyMirroringSupervisorController - Consumer boot complete. Workload config: [consumer.workload.algorithm=legacy.mirror]
2016-03-22 18:29:44.842 INFO p.a.t.h.c.cache.zookeeper.NodeCache - Got entry change event for path /root/groups/hermes
2016-03-22 18:30:03.083 INFO p.a.t.h.c.cache.zookeeper.NodeCache - Got entry change event for path /root/groups/hermes/topics/cash
2016-03-22 18:32:31.832 INFO p.a.t.h.c.s.c.z.SubscriptionsNodeCache - Got subscription change event for path /root/groups/hermes/topics/cash/subscriptions/mySubscription type CHILD_ADDED
2016-03-22 18:32:31.837 INFO p.a.t.h.c.s.ConsumersSupervisor - Creating consumer for hermes_cash_mySubscription
2016-03-22 18:32:32.074 INFO org.I0Itec.zkclient.ZkEventThread - Starting ZkClient event thread.
2016-03-22 18:32:32.078 INFO org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)

hikrrish commented 8 years ago

We get a TIMEOUT the first time we publish after a new topic is created; afterwards it's always 202.

Do you recommend using the latest code? I am using Hermes version 0.8.5.

adamdubiel commented 8 years ago

I think this is a configuration issue - the code you are running is fine. Can you create a minimal config file containing only:

Frontend

kafka.broker.list=broker1:9092,broker2:9092
kafka.zookeeper.connect.string=zk1:2181,zk2:2181

zookeeper.connect.string=storageZk:2181
zookeeper.root=/hermes

Consumers

kafka.broker.list=broker1:9092,broker2:9092
kafka.zookeeper.connect.string=zk1:2181,zk2:2181

zookeeper.connect.string=storageZk:2181
zookeeper.root=/hermes

kafka.consumer.offsets.storage=kafka
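To produce these files consistently on every host, a small generator can be sketched as follows. The keys and values are the ones listed above; the function name, argument order, and output paths are hypothetical, and pointing each module at the generated file is left to your existing setup.

```shell
#!/bin/sh
# Hypothetical helper: write the minimal frontend or consumers config.
# Arguments: role (frontend|consumers), output file, Kafka broker list,
# Kafka ZooKeeper connect string, Hermes storage ZooKeeper connect string.
write_minimal_config() {
  local role="$1" out="$2" brokers="$3" kafka_zk="$4" storage_zk="$5"
  {
    printf 'kafka.broker.list=%s\n' "$brokers"
    printf 'kafka.zookeeper.connect.string=%s\n' "$kafka_zk"
    printf 'zookeeper.connect.string=%s\n' "$storage_zk"
    printf 'zookeeper.root=/hermes\n'
    # Only the consumers config carries the offsets-storage setting.
    if [ "$role" = "consumers" ]; then
      printf 'kafka.consumer.offsets.storage=kafka\n'
    fi
  } > "$out"
}

# Usage (example paths):
# write_minimal_config consumers /etc/hermes/consumers.properties broker1:9092,broker2:9092 zk1:2181,zk2:2181 storageZk:2181
```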

hikrrish commented 8 years ago

I'm getting a timeout on the frontend:

{ "message": "Async timeout, cause: unknown", "code": "TIMEOUT" }

And the Kafka logs show (10.81.85.136 is the Hermes frontend):

[2016-03-24 02:51:19,468] WARN Partition [happygroup.happyTopic,3] on broker 163: No checkpointed highwatermark is found for partition happygroup.happyTopic,3
[2016-03-24 03:03:29,212] INFO Closing socket connection to /10.81.85.136 due to invalid request: -720899 is not a valid request size. (kafka.network.Processor)

(negative request bytes)

hikrrish commented 8 years ago

Hi Adam, it was an issue with Kafka connectivity, and yesterday we were able to get push notifications. We ran frontend/consumer/management on a single node with a 3-node cluster of Kafka and ZooKeeper.

Now I am trying to create a 3-node cluster of frontend/consumer, plus a single node running management and console. The issue I am facing right now is that messages are not received at the HTTP endpoint. I am getting 201 at each step, and the Kafka consumer client does receive the messages posted via the Hermes frontend. Could you suggest where I should look for this issue?

hikrrish commented 8 years ago

Not sure if it is an issue with ports/security groups on AWS. I tried again with frontend/consumer and management on the same node, and it's working.

hikrrish commented 8 years ago

We have the consumer service running on port 8000. Does management connect to the consumer on 8000? My understanding is that Hermes components are independent and connect to ZK for any metadata.

adamdubiel commented 8 years ago

The consumer needs the port only to expose healthchecks, so it doesn't actually matter. You are right, they are not connected in any way except through the shared ZK and Kafka. Consumers need to be able to connect to Kafka and ZK. I'm not familiar with security group settings on AWS, but that might be worth checking.

gamefundas commented 8 years ago

Adam, thanks for all the help. We managed to resolve all the configuration issues needed to get the stack running, some of them related to AWS security groups. Kafka's "advertised.host.name" was one important setting that was missing; adding it solved the Kafka access issue.
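For anyone hitting the same problem: in the Kafka versions of that era, advertised.host.name in the broker's server.properties is the host name the broker publishes to ZooKeeper for clients to use, so it has to be resolvable and reachable from the Hermes hosts (newer Kafka releases replace it with advertised.listeners). A sketch of an idempotent edit; the helper name, file path, and value are hypothetical examples, and the broker needs a restart to pick up the change.

```shell
#!/bin/sh
# Hypothetical helper: set key=value in a Java properties file, replacing an
# existing "key=" line or appending one. Matching is literal (no regex), so
# the dots in keys like advertised.host.name are not treated as wildcards.
set_property() {
  local file="$1" key="$2" value="$3" tmp
  tmp="$(mktemp)" || return 1
  awk -v k="$key" -v v="$value" '
    index($0, k "=") == 1 { print k "=" v; found = 1; next }
    { print }
    END { if (!found) print k "=" v }
  ' "$file" > "$tmp" || { rm -f "$tmp"; return 1; }
  # Copy back instead of mv so the original file keeps its permissions.
  cat "$tmp" > "$file" && rm -f "$tmp"
}

# Usage on each broker, with an address the Hermes hosts can reach (example values):
# set_property /opt/kafka/config/server.properties advertised.host.name 10.81.86.117
```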

No change required on the Hermes end, we should be able to close this issue.

hikrrish commented 8 years ago

The issue is resolved; we can close this. Thanks!

adamdubiel commented 8 years ago

Glad to hear it :) Closing the issue now.

adamdubiel commented 8 years ago

By the way, I added logging docs: http://hermes-pubsub.readthedocs.org/en/latest/deployment/logging/