odpi / egeria

Egeria core
https://egeria-project.org
Apache License 2.0

fix infinite loop while sending message #1876

Closed Shrinivas-Kane closed 4 years ago

Shrinivas-Kane commented 5 years ago

Scenario: if Kafka throws a "message is too big" error or another Kafka-specific exception, the producer goes into an infinite loop.

We need to fix infinite loop here: https://github.com/odpi/egeria/blob/master/open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/src/main/java/org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventProducer.java#L101

mandy-chessell commented 4 years ago

Thanks for spotting this. It could stop immediately if it gets an exception that is not an ExecutionException or WakeupException. Alternatively, it could retry a few times with a sleep, in the hope that the problem may clear.

Any preferences?

planetf1 commented 4 years ago

From https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html the exceptions from Producer.send seem to be:

AuthenticationException - seems fairly unlikely that will fix itself (though there could be a transient issue - if so it's more likely to last for a while due to bad config - so retrying quickly probably isn't that helpful)
AuthorizationException - as above
IllegalStateException - coding error?
InterruptException - could be worth a retry or two?
SerializationException - unlikely to help?
TimeoutException - probably worth a retry or two
KafkaException - see below

I would be inclined to only try a couple of times then abort. Maybe even only on timeout & interrupt? Ideally I'd consider retrying after a delay - which means more async handling would be needed for the retry - perhaps unnecessarily complex?
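
A minimal sketch of the "retry a couple of times then abort" idea (a hypothetical helper, not the connector's actual code; the retry count, sleep interval and the choice of retryable exceptions are assumptions):

```java
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;

// Hypothetical helper illustrating a bounded retry around a blocking send.
public class BoundedRetrySender {
    private static final int MAX_ATTEMPTS = 3;        // assumption: small, fixed retry budget
    private static final long RETRY_SLEEP_MS = 1000L; // assumption: short pause between attempts

    public void send(Producer<String, String> producer,
                     ProducerRecord<String, String> record) throws Exception {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                producer.send(record).get(); // block until the broker acknowledges (or fails)
                return;                      // success: stop retrying
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                // Only retry the exceptions that might clear (timeouts, interrupts); abort on the rest.
                boolean retryable = cause instanceof TimeoutException
                        || cause instanceof InterruptException;
                if (!retryable || attempt == MAX_ATTEMPTS) {
                    throw e;                 // permanent problem, or retry budget exhausted
                }
                Thread.sleep(RETRY_SLEEP_MS);
            }
        }
    }
}
```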

planetf1 commented 4 years ago

And further, under KafkaException there are: ApiException, BufferExhaustedException, CommitFailedException, ConfigException, ConnectException, InterruptException, InvalidOffsetException, SerializationException, StreamsException, WakeupException.

mandy-chessell commented 4 years ago

Billy is looking at this ...

wbittles commented 4 years ago

There may be some confusion around which exceptions we are actually concerned with. I've copied and pasted the javadoc below; I assumed it was the three exceptions thrown from Future.get:

V get() throws InterruptedException, ExecutionException

Waits if necessary for the computation to complete, and then retrieves its result.

Returns: the computed result
Throws:
CancellationException - if the computation was cancelled
ExecutionException - if the computation threw an exception
InterruptedException - if the current thread was interrupted while waiting

The Kafka exceptions should be in the middle of the exception "cause" chain, with the original exception as the root cause. Theoretically it should be possible to iterate through each of the "causes" and use instanceof to introduce specific handling for specific exceptions, assuming that the Kafka code preserves the originals?
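
A rough illustration of that cause-chain walk (a hypothetical handler; it assumes Kafka preserves the original exceptions as causes, and the specific exception types chosen here are just examples):

```java
import java.util.concurrent.ExecutionException;

import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;

public final class SendFailureClassifier {

    // Walk the "cause" chain of the ExecutionException from Future.get() and
    // pick out the Kafka exception we actually care about.
    // Returns true if a retry looks worthwhile.
    public static boolean isWorthRetrying(ExecutionException executionException) {
        for (Throwable cause = executionException.getCause();
             cause != null;
             cause = cause.getCause()) {
            if (cause instanceof RecordTooLargeException
                    || cause instanceof AuthorizationException) {
                return false;   // permanent problem: retrying will not help
            }
            if (cause instanceof TimeoutException) {
                return true;    // transient problem: worth another attempt
            }
        }
        return false;           // unknown cause: treat as non-retryable by default
    }
}
```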

However, I'm inclined to add "retries" to the producer properties and let the Kafka client worry about shuffling exceptions across thread contexts and being a good citizen; then all we have to do is handle the three exceptions above. I know this is bordering on a religious point of view, but I am concerned about the catch (Throwable t) clause: while the information is kept in human-readable form, it is actually swallowing what could be a JVM error, and the error handler will not get called. I'd be inclined to at least catch Exception instead of Throwable.

planetf1 commented 4 years ago

The link originally posted refers to line 101, for which I think my comments are still valid - at least by inspection? This is the code that publishes the event, and I think it could indeed loop - regardless of how the results of that thread are eventually treated (given the use of futures... if it ever gets there).

That being said, the point about futures is an important one too - for example, if I look at DataPlatform, when it issues the Future.get() it is only capturing an I/O exception - nothing more specific than this, and none of the more in-depth decoding that is needed as per the above.

Typically, if we fail to handle such an exception, it's likely we'll just misunderstand what happened with that attempted send - not necessarily loop (though we need to inspect usages of the get). It's the actual implementation of a task that has been submitted which could loop in the example I referred to.

I wonder if we need to do more inspection of all our concurrency usage to determine how concurrency exceptions are handled? (As an aside, I don't see any future-related issues reported by Sonar.)

That being said, I'm not overly familiar with Java futures (oops).

wbittles commented 4 years ago

It looks like the Kafka producer already has "retries" set to 1: https://egeria.odpi.org/open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/

planetf1 commented 4 years ago

Thanks to @wbittles for pointing out the .get() in the code fragment above. I'm sure I was looking at different code, but it's there, so I completely agree with the comments around futures. If we can let the Kafka producer do some appropriate retries, that sounds like a good solution.

mandy-chessell commented 4 years ago

Just had a look at this fix and it is not what we need, because it has removed the retry loop for the temporary problem where Kafka is down or incorrectly configured. The fix should only stop the loop if there is a permanent problem with the event.

wbittles commented 4 years ago

@mandy-chessell all that retry logic is already present in the Kafka client code and can be configured by setting a property. By the time the Future throws an exception, the Kafka code has already decided whether the error can be recovered.
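
For reference, a rough sketch of how the relevant producer properties could be tuned - the class and the values shown here are illustrative assumptions, not the connector's actual defaults:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerRetryConfigExample {
    public static KafkaProducer<String, String> buildProducer(String brokers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Client-side retry behaviour (illustrative values only):
        props.put(ProducerConfig.RETRIES_CONFIG, 10);                 // how many times the client retries a failed send
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);      // pause between retries
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 300000); // total time before a record is expired
        props.put(ProducerConfig.ACKS_CONFIG, "all");                 // wait for full acknowledgement

        return new KafkaProducer<>(props);
    }
}
```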

mandy-chessell commented 4 years ago

How do we know that events are being buffered and which ones are being rejected? We used to be able to output audit log messages that indicated that events are being buffered because Kafka is down or incorrectly configured. Are we saying that Egeria should delegate that management and diagnostics to the event bus?

planetf1 commented 4 years ago

I ran a test with the new code (in master)

Reviewing the logs, it is clear that events weren't sent during initialization:

2019-12-20 08:14:04.928  INFO 822 --- [nio-8080-exec-4] o.o.o.r.auditlog.OMRSAuditLog            : cocoMDS2 OMRS-AUDIT-0032 The local repository outbound event manager is sending out the 470 type definition events that were generated and buffered during server initialization
Fri Dec 20 08:14:04 GMT 2019 cocoMDS2 Information OMRS-AUDIT-0032 The local repository outbound event manager is sending out the 470 type definition events that were generated and buffered during server initialization
2019-12-20 08:14:04.955  WARN 822 --- [ad | producer-3] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-3] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

I then waited 5 minutes or so, and restarted Kafka/ZooKeeper.

On restart we see kafka reconnecting

2019-12-20 08:19:21.982  INFO 822 --- [ohort.OMRSTopic] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2f8a0f62-f1c5-45d2-b339-c064dcecebd8-7, groupId=2f8a0f62-f1c5-45d2-b339-c064dcecebd8] Revoke previously assigned partitions egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic-0
2019-12-20 08:19:21.982  INFO 822 --- [ohort.OMRSTopic] o.a.e.t.k.KafkaOpenMetadataEventConsumer : Lost partitions in rebalance. Committing current offsets:{}
2019-12-20 08:19:21.982  INFO 822 --- [ohort.OMRSTopic] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2f8a0f62-f1c5-45d2-b339-c064dcecebd8-7, groupId=2f8a0f62-f1c5-45d2-b339-c064dcecebd8] (Re-)joining group
2019-12-20 08:19:21.986  INFO 822 --- [ohort.OMRSTopic] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2f8a0f62-f1c5-45d2-b339-c064dcecebd8-7, groupId=2f8a0f62-f1c5-45d2-b339-c064dcecebd8] Finished assignment for group at generation 2: {consumer-2f8a0f62-f1c5-45d2-b339-c064dcecebd8-8-d5277cf9-3985-4d30-9ff7-b158fa5d1bc8=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@d387137f, consumer-2f8a0f62-f1c5-45d2-b339-c064dcecebd8-7-79d4d38f-3b24-4b4a-9922-2b9163283a3e=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@73ca6071}
2019-12-20 08:19:21.988  INFO 822 --- [ohort.OMRSTopic] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2f8a0f62-f1c5-45d2-b339-c064dcecebd8-7, groupId=2f8a0f62-f1c5-45d2-b339-c064dcecebd8] Successfully joined group with generation 2
2019-12-20 08:19:21.988  INFO 822 --- [ohort.OMRSTopic] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2f8a0f62-f1c5-45d2-b339-c064dcecebd8-7, groupId=2f8a0f62-f1c5-45d2-b339-c064dcecebd8] Adding newly assigned partitions: egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic-0
2019-12-20 08:19:21.989  INFO 822 --- [ohort.OMRSTopic] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2f8a0f62-f1c5-45d2-b339-c064dcecebd8-7, groupId=2f8a0f62-f1c5-45d2-b339-c064dcecebd8] Setting offset for partition egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic-0 to the committed offset FetchPosition{offset=113913, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 0 rack: null), epoch=-1}}
2019-12-20 08:19:22.552  INFO 822 --- [nsumer.outTopic] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2f8a0f62-f1c5-45d2-b339-c064dcecebd8-8, groupId=2f8a0f62-f1c5-45d2-b339-c064dcecebd8] Successfully joined group with generation 2
2019-12-20 08:19:22.552  INFO 822 --- [nsumer.outTopic] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2f8a0f62-f1c5-45d2-b339-c064dcecebd8-8, groupId=2f8a0f62-f1c5-45d2-b339-c064dcecebd8] Adding newly assigned partitions: egeria.omag.server.cocoMDS3.omas.assetconsumer.outTopic-0
2019-12-20 08:19:22.553  INFO 822 --- [nsumer.outTopic] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2f8a0f62-f1c5-45d2-b339-c064dcecebd8-8, groupId=2f8a0f62-f1c5-45d2-b339-c064dcecebd8] Found no committed offset for partition egeria.omag.server.cocoMDS3.omas.assetconsumer.outTopic-0
2019-12-20 08:19:22.557  INFO 822 --- [nsumer.outTopic] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-2f8a0f62-f1c5-45d2-b339-c064dcecebd8-8, groupId=2f8a0f62-f1c5-45d2-b339-c064dcecebd8] Resetting offset for partition egeria.omag.server.cocoMDS3.omas.assetconsumer.outTopic-0 to offset 2862.

In between we see other messages, including LEADER_NOT_AVAILABLE, and messages about rejoining the partition.

2019-12-20 08:19:13.782  WARN 823 --- [engine.outTopic] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-e2bc07b0-5496-41c2-be5f-4141a4baa53b-5, groupId=e2bc07b0-5496-41c2-be5f-4141a4baa53b] Error while fetching metadata with correlation id 143 : {egeria.omag.server.cocoMDS1.omas.governanceengine.outTopic=LEADER_NOT_AVAILABLE}
2019-12-20 08:19:14.230  INFO 823 --- [onview.outTopic] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-e2bc07b0-5496-41c2-be5f-4141a4baa53b-6, groupId=e2bc07b0-5496-41c2-be5f-4141a4baa53b] (Re-)joining group

The 'offset' is interesting - but importantly we don't see these events (if indeed sent) reaching Egeria.

If we now look at the Egeria audit log messages, we basically see nothing - we see the connector starting up and reporting ready, but nothing else.

Logging at debug level again, we do get entries like:

Fri Dec 20 08:19:09 GMT 2019 cocoMDS1 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  0 events successfully sent; 501 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic not present in metadata after 60000 ms

So the events were buffered, but it's not clear they came through. Looking directly at the Kafka topics, there's no evidence of any messages there.

So whilst I get the point of trying to use what Kafka offers, I think the scenario here is a fail. We are not resilient.

MY PROPOSAL

Given we are approaching the time to freeze for 1.3, and this looks like a regression (I should have tested it more thoroughly too), I suggest

planetf1 commented 4 years ago

Having run with a draft PR

We do see more in the audit log, for example:

2019-12-20 09:05:24.437  INFO 12098 --- [ohort.OMRSTopic] o.o.o.r.auditlog.OMRSAuditLog            : cocoMDS3 OMRS-AUDIT-8006 Processing incoming event of type NewRelationshipEvent for instance ba9abf66-c765-48b5-981d-785fc1a5b56a from: OMRSEventOriginator{metadataCollectionId='980c7a8e-a79f-4ebc-901d-96435c6d6dfa', serverName='cocoMDS1', serverType='Open Metadata Server', organizationName='Coco Pharmaceuticals'}
Fri Dec 20 09:05:24 GMT 2019 cocoMDS3 Event OMRS-AUDIT-8006 Processing incoming event of type NewRelationshipEvent for instance ba9abf66-c765-48b5-981d-785fc1a5b56a from: OMRSEventOriginator{metadataCollectionId='980c7a8e-a79f-4ebc-901d-96435c6d6dfa', serverName='cocoMDS1', serverType='Open Metadata Server', organizationName='Coco Pharmaceuticals'}

These were missing from the first test, confirming they were lost.

However, in the notebook the assets that should have been known to the other cohort members still aren't. This may indicate that whilst the events were sent, they were not processed correctly.

Here's the master log

Conclusion so far

Comments: @wbittles @mandy-chessell ?

planetf1 commented 4 years ago

In the example above we did not get as far as setting up the cohort, as Kafka was down - hence assets were not visible across the (later set-up) cohort.

In another test I allowed the cohort to stabilize and then stopped Kafka. This time we can see the assets just fine, since the enterprise connector is aware of the other servers in the cohort and can retrieve from them (though events will not be fired into OMASs etc.). We also see behaviour like:

Fri Dec 20 14:27:33 GMT 2019 cocoMDS1 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  473 events successfully sent; 44 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic-0:120000 ms has passed since batch creation
Fri Dec 20 14:29:33 GMT 2019 cocoMDS1 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  473 events successfully sent; 43 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic-0:120000 ms has passed since batch creation
Fri Dec 20 14:31:33 GMT 2019 cocoMDS1 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  473 events successfully sent; 42 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic-0:120000 ms has passed since batch creation
Fri Dec 20 14:33:33 GMT 2019 cocoMDS1 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  473 events successfully sent; 41 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic-0:120001 ms has passed since batch creation
Fri Dec 20 14:35:33 GMT 2019 cocoMDS1 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  473 events successfully sent; 40 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic-0:120001 ms has passed since batch creation
Fri Dec 20 14:37:33 GMT 2019 cocoMDS1 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  473 events successfully sent; 39 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic-0:120001 ms has passed since batch creation
Fri Dec 20 14:39:33 GMT 2019 cocoMDS1 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  473 events successfully sent; 38 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic-0:120000 ms has passed since batch creation

We gradually seem to lose messages, which by default are expiring after 120s due to the Kafka client timeout (maybe request.timeout.ms?). Each time we try to resend the backlog we will block - and then drop.

wbittles commented 4 years ago

It makes sense that the Egeria audit log is missing entries. Before the change, the Egeria producer would have retried infinitely, auditing every 20th attempt (10 times through the Egeria retry logic, and each send has a retry of 1). The change will only try twice, logging the second attempt.

If we want to compare similar behaviours, we would need to set the retry count higher. I suspect that if we set it high enough we will see the same behaviour, where the client is able to tolerate the shutdown and restart because it's in an infinite loop until it has returned a successful send - but the Egeria audit log will not contain any entries.

The Kafka QOS is controlled by specific client code and properties, none of which was changed as part of this fix. It's not clear which QOS Egeria requires, as there seems to be a disconnect between expecting guaranteed delivery and our current configuration, which seems to be "best effort". Either way, I'm sure we can achieve what we need.

@mandy-chessell To be honest, I don't see the value in filling the Egeria audit logs with Kafka debug messages, but I would expect Kafka to generate its own debug info as necessary. That said, if you would like them back, I think I can arrange it.

planetf1 commented 4 years ago

I repeated the prior test and actually see identical behaviour - every 2 minutes we lose a buffered message. These tests may explain why I initially thought there was a before/after difference.

At this point it's not clear that reverting is going to make much difference (maybe for shorter glitches), but we absolutely do need to get into a deeper review as per @wbittles' comments above.

We should also have some FVTs that focus on driving the topic connector with specific error situations (much quicker, reproducible), and then a broader cohort test covering similar scenarios.

So at this point I'm inclined to leave the code as-is, and not revert

mandy-chessell commented 4 years ago

@wbittles the audit log messages are not Kafka debug messages - they are reporting the status of the server's ability to replicate. If Kafka is down, we need to buffer messages until it is available. I am wondering if the loop should be in OMRS rather than the connector, because we do not know what event bus is being used - it may not be Kafka if the connector implementation is swapped out.

It is important to log audit log messages at a slow rate - not on every failed message. This is what we had before, and it was useful for knowing that Kafka was the reason that metadata was not available.

From what I understood from the docs around the futures APIs, we need to extract the real exception and work on that, rather than the top-level exception.

mandy-chessell commented 4 years ago

These are the error messages I am seeing in the current master. There are a number of issues: firstly, the logging is happening at a rapid rate; also, we are gradually losing OMRSEvents just because Kafka is down.

Thu Jan 02 12:39:52 GMT 2020 cocoMDS2 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  0 events successfully sent; 482 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:39:53 GMT 2020 cocoMDS2 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.devCohort.OMRSTopic.  0 events successfully sent; 482 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.devCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:39:53 GMT 2020 cocoMDS2 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.iotCohort.OMRSTopic.  0 events successfully sent; 482 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.iotCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:39:55 GMT 2020 cocoMDS3 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  0 events successfully sent; 482 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:39:56 GMT 2020 cocoMDS5 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  0 events successfully sent; 482 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:39:56 GMT 2020 cocoMDS6 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  0 events successfully sent; 482 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:39:56 GMT 2020 cocoMDS6 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.iotCohort.OMRSTopic.  0 events successfully sent; 482 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.iotCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:40:52 GMT 2020 cocoMDS2 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  0 events successfully sent; 481 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:40:53 GMT 2020 cocoMDS2 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.devCohort.OMRSTopic.  0 events successfully sent; 481 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.devCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:40:53 GMT 2020 cocoMDS2 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.iotCohort.OMRSTopic.  0 events successfully sent; 481 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.iotCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:40:55 GMT 2020 cocoMDS3 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  0 events successfully sent; 481 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:40:56 GMT 2020 cocoMDS5 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  0 events successfully sent; 481 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:40:56 GMT 2020 cocoMDS6 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  0 events successfully sent; 481 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:40:56 GMT 2020 cocoMDS6 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.iotCohort.OMRSTopic.  0 events successfully sent; 481 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.iotCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:41:52 GMT 2020 cocoMDS2 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  0 events successfully sent; 480 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:41:53 GMT 2020 cocoMDS2 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.devCohort.OMRSTopic.  0 events successfully sent; 480 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.devCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:41:53 GMT 2020 cocoMDS2 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.iotCohort.OMRSTopic.  0 events successfully sent; 480 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.iotCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:41:55 GMT 2020 cocoMDS3 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  0 events successfully sent; 480 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:41:56 GMT 2020 cocoMDS5 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  0 events successfully sent; 480 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:41:56 GMT 2020 cocoMDS6 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic.  0 events successfully sent; 480 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.cocoCohort.OMRSTopic not present in metadata after 60000 ms.
Thu Jan 02 12:41:56 GMT 2020 cocoMDS6 Error OCF-KAFKA-TOPIC-CONNECTOR-0012 Unable to send event on topic egeria.omag.openmetadata.repositoryservices.cohort.iotCohort.OMRSTopic.  0 events successfully sent; 480 events buffered. Latest error message is org.apache.kafka.common.errors.TimeoutException: Topic egeria.omag.openmetadata.repositoryservices.cohort.iotCohort.OMRSTopic not present in metadata after 60000 ms.

The first event is the cohort registration event; the subsequent events are the type def validation events. All are important to communicate before sending metadata instances.

planetf1 commented 4 years ago

Agreed on the gradual loss of messages. However, see my note from 13 days ago when I ran with the reverted (original) code - I still lost messages in a similar way. I'll build a reversion patch now and retry.

planetf1 commented 4 years ago

Having tested a little more today, messages appear to get lost with both implementations - or at least the buffered message count decreases even when Kafka is not active.

The conclusion so far remains that both implementations have problems. Both lose messages, neither will necessarily report exceptions from futures properly, and it's not clear that either is significantly worse than the other. A loop is bad, but unknowns and less testing are bad too.

A detailed walkthrough and rigid tests are needed to firmly conclude and validate the implementation.

wbittles commented 4 years ago

If I can address the issues as raised - I'm still confused:

1) The logging rate is higher because I didn't change the default retries property; if retries is increased to 20, you'll get the same logging activity.
2) The only logging messages we will lose with the change are those temporary issues that were only logged if they occurred 20 times in the original implementation; again, if retries is set to 20 you can recreate the same behaviour.
3) If the requirement is to buffer events when Kafka is down, and not risk losing them, we will need to persist them locally; currently anything buffered will be lost if the JVM dies. As we block when sending anyway, perhaps it would be easier if Kafka was configured to be transactional (see the sketch below)? Either way, Kafka allows the configuration of the required QOS.
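
A minimal sketch of the transactional option mentioned in point 3, assuming a placeholder topic and transactional id - this is not a proposal for the connector as it stands:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalSendExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-topic-connector-tx"); // placeholder id

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("example.topic", "event-payload")); // placeholder topic/event
                producer.commitTransaction();
            } catch (KafkaException e) {
                producer.abortTransaction(); // for fenced/fatal errors the producer should be closed instead
                throw e;
            }
        }
    }
}
```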

@mandy-chessell I suspect that we have another infinite-loop issue, in that it's not obvious how the Egeria calling thread handles a non-recoverable exception. It looks like the thread just sleeps and retries infinitely, with the following debug message: log.error("Bad exception from sending events " + error.getMessage()); I suspect that we need another handler in this catch block to decide whether we need to restart the Kafka client or simply retry. With this fix we know that the error was unrecoverable.

planetf1 commented 4 years ago

Agreed new staged approach for handling kafka - over to you @wbittles

planetf1 commented 4 years ago

I've closed #2119; however, once we have done some rework on this issue, it may be worth revisiting that scenario (removing the sleep) as part of testing.

planetf1 commented 4 years ago

I had a think about error conditions. I think we should walk through the design considering the following - and also look at what testing we can do for each of these:

There are also some permanent errors:

incompatible versions

More advanced tests are, I think, out of immediate scope, but should be looked at when we do more testing around full HA and high-scalability scenarios - where Kafka is of course just one part.

cc: @mandy-chessell @grahamwallis (moved from PR)

wbittles commented 4 years ago

@planetf1 @mandy-chessell @grahamwallis It's important to remember that we can only work with what Kafka presents us: mainly an ExecutionException with a cause list. If we work with the retryable distinction in our error handling, then we only need three test cases: a retryable exception, a non-retryable exception, and an uncaught runtime-type exception (a thread killer). The underlying cause of the error shouldn't matter, as we just retry until the issue is resolved.
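
One way to make those three groups concrete is to lean on Kafka's RetriableException marker type when scanning the cause chain - a sketch, not the agreed design:

```java
import org.apache.kafka.common.errors.RetriableException;

public final class KafkaErrorGroups {

    public enum Outcome { RETRYABLE, NON_RETRYABLE, FATAL }

    // Classify a send failure into the three groups discussed above.
    public static Outcome classify(Throwable failure) {
        for (Throwable cause = failure; cause != null; cause = cause.getCause()) {
            if (cause instanceof Error) {
                return Outcome.FATAL;        // JVM-level problem ("thread killer"): do not swallow
            }
            if (cause instanceof RetriableException) {
                return Outcome.RETRYABLE;    // Kafka marks transient errors with this supertype
            }
        }
        return Outcome.NON_RETRYABLE;        // log, audit and decide whether to drop or escalate
    }
}
```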

As for the message-too-big case, which we could check for before even attempting to send the event, I'm just struggling to understand where such an event would actually come from in a production environment.

If we take the above approach, we can mock up a Kafka client with error-injection code to throw the required exceptions in a timely, controlled way. All we need is the ability to generate an exception that matches the Kafka exception's cause list. Outside of that, it's difficult to see how you would go about driving the necessary exceptions from Kafka itself.
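
For the mocked-client idea, the Kafka client library's MockProducer can inject failures into pending sends; a rough sketch of how a test could drive one of the cases (the topic name is a placeholder and the connector-specific assertions are left out):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class ErrorInjectionSketch {
    public static void main(String[] args) throws Exception {
        // autoComplete=false means each send stays pending until we complete or fail it.
        MockProducer<String, String> producer =
                new MockProducer<>(false, new StringSerializer(), new StringSerializer());

        Future<RecordMetadata> future =
                producer.send(new ProducerRecord<>("example.topic", "event-payload")); // placeholder topic

        // Fail the pending send with the exception under test (here, a retryable timeout).
        producer.errorNext(new TimeoutException("injected for the retryable test case"));

        try {
            future.get();
        } catch (ExecutionException e) {
            // The connector's error handling would be exercised here; a real test
            // would assert on audit log entries / retry behaviour instead of printing.
            System.out.println("Send failed with cause: " + e.getCause());
        }
    }
}
```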

grahamwallis commented 4 years ago

@wbittles @planetf1 I agree with Billy that emulating the error conditions in a mocked client would be a good way to test this. Coupled with walking through at a design level, that should be sufficient.

planetf1 commented 4 years ago

My comment above was more about the external scenario, and ensuring we are ok in our design, and then in tests, that we can sensibly address those circumstances. Mostly they reflect the kind of typical issues I've seen in the past, or what I've read specifically about kafka.

For each, we should indeed map through to whether we handle it as retryable or non-retryable - or as needing a 'reset' (which I think is what you mean by thread killer).

The event-too-big case will depend on how the Kafka infrastructure is set up, so although we hope not to get it, we do need to consider what we do if we see it - definitely an audit log message. It may be a one-off, or it could be every message, though I think figuring out how urgent it is probably falls to the organisation's log analysis tools/rules.

Whilst some of the scenarios I mention may be harder to inject into Kafka, I think many are easier - usually to do with the network (connections, timeouts, slowness, DNS). Some, a little harder, can be injected through external tools - for example, on macOS there's a network tester that can introduce packet loss and latency.

Failed nodes/isolation are just down to having a Kafka cluster and zapping the relevant node during the test.

Buffers/offsets can probably be managed by intentionally setting them small and running under load (CTS?), or by setting the expiry time low and pausing (offsets).

All of these do require work, I realise, and having a test infrastructure to automate them is far from a trivial exercise. We won't do them all straight away, but I think this level of checking is critical to ensuring we have a reliable infrastructure - at least with Kafka. Once we have another topic connector, the same discussion may follow!

wbittles commented 4 years ago

@planetf1 It's not obvious what we gain from going to this level of test effort if we are only going to be handling three groups of exceptions, only two of which originate from Kafka and the other from the JVM. As long as we handle those three types of exception, the actual underlying reason isn't considered in the exception handling. So, for instance, if a producer can't connect to the broker, it doesn't matter to Egeria whether it's a network issue, a system error or a Kafka fault - we carry out the same backout/retry. And by backout I mean log and throw away.

As for the RecordTooLargeException, it's not obvious why it needs special attention: it's an API exception, so the event producer must have deliberately configured the system to allow events of that size. I agree that it would be dangerous to allow a rogue event producer to join a cohort; it's just not clear why the TopicConnector is the right place to deal with that - could be worth another issue?

As for the reliable-infrastructure point, it's currently the responsibility of the hosting environment to provide stable Kafka services, networking, storage, databases, etc. Are you thinking that Egeria should take responsibility for ensuring that all these systems behave as expected when an error occurs anywhere in the platform stack?

Even if we were to trust Kafka to handle them, the list of possible exceptions is here: https://kafka.apache.org/10/javadoc/org/apache/kafka/common/errors/package-summary.html - are you suggesting a test case be developed for each of these possible errors?

planetf1 commented 4 years ago

The loop has been fixed in PR #2638

The remaining questions around design will be moved to #1206 to ensure the current behaviour is reviewed and documented