NSLS-II / nslsii

NSLS-II related devices
BSD 3-Clause "New" or "Revised" License

Add function to subscribe a Kafka Publisher to the RunEngine #86

Closed: jklynch closed this 4 years ago

jklynch commented 4 years ago

This PR adds functions for working with Kafka, including one that subscribes a Kafka publisher to the RunEngine.

codecov-commenter commented 4 years ago

Codecov Report

Merging #86 into master will decrease coverage by 0.28%. The diff coverage is 30.00%.


@@            Coverage Diff             @@
##           master      #86      +/-   ##
==========================================
- Coverage   53.39%   53.11%   -0.29%     
==========================================
  Files          13       13              
  Lines         809      819      +10     
==========================================
+ Hits          432      435       +3     
- Misses        377      384       +7     
Impacted Files          Coverage Δ
nslsii/__init__.py      25.00% <30.00%> (+0.28%) ↑


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update 47cb469...50a7adb.

gwbischof commented 4 years ago

I think that _kafka_publisher should run on a thread so that it doesn't block, and inside a try-except so that it doesn't interrupt an experiment if it fails.

gwbischof commented 4 years ago

Here is an example: https://github.com/NSLS-II/nslsii/blob/176a45802a9413660f9e1f424d7ae8bed227497c/nslsii/__init__.py#L454

gwbischof commented 4 years ago

What about making a decorator that runs a function on a thread inside a try-except, so we don't have to duplicate the code?
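A minimal sketch of such a decorator, assuming a RunEngine callback with the usual (name, doc) signature. The names run_in_thread_safely and kafka_publisher are hypothetical; kafka_publisher only stands in for the PR's _kafka_publisher and deliberately fails, to show that the exception is logged instead of reaching the RunEngine.

```python
import functools
import logging
import threading

logger = logging.getLogger(__name__)


def run_in_thread_safely(func):
    """Run func on a new daemon thread; log any exception instead of raising it.

    Hypothetical helper illustrating the suggestion in this thread; it is not
    part of nslsii or bluesky.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        def target():
            try:
                func(*args, **kwargs)
            except Exception:
                # Record the traceback, but never propagate into the scan.
                logger.exception("callback %r failed", func.__name__)

        thread = threading.Thread(target=target, daemon=True)
        thread.start()
        # Returned only so callers (or tests) can join; the RunEngine ignores it.
        return thread

    return wrapper


@run_in_thread_safely
def kafka_publisher(name, doc):
    # Hypothetical stand-in for _kafka_publisher; always fails for illustration.
    raise RuntimeError("broker unavailable")
```

One design caveat: starting a new thread per document does not guarantee that documents are published in the order the RunEngine emitted them, which matters for consumers that expect start, descriptor, event, stop in sequence.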

danielballan commented 4 years ago

Extending @gwbischof's suggestion: utility decorators that do this (for a simple function callback and a class-based DocumentRouter callback, respectively) were recently added to bluesky by @tacaswell, but I think they are not yet documented. They are here: https://github.com/bluesky/bluesky/blob/4fab894bddbd4a563f28852ea3171b87140ae7b9/bluesky/callbacks/core.py#L21-L117
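The bluesky decorators in the linked file are not reproduced here. As a rough illustration of the same catch-and-log idea, this plain-Python sketch wraps a function callback and, separately, the document-named methods of a DocumentRouter-style class. The names catch_and_log, catch_and_log_class, and DOCUMENT_METHODS are hypothetical and are not bluesky's API.

```python
import functools
import logging

logger = logging.getLogger(__name__)

# Method names a DocumentRouter-style class uses, one per document type.
DOCUMENT_METHODS = ("start", "descriptor", "event", "event_page",
                    "stop", "resource", "datum", "datum_page")


def catch_and_log(func):
    """Wrap a callback so exceptions are logged and None is returned instead."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            logger.exception("callback %r failed", func.__qualname__)
            return None

    return wrapper


def catch_and_log_class(cls):
    """Apply catch_and_log to every document method the class itself defines."""
    for name in DOCUMENT_METHODS:
        method = cls.__dict__.get(name)
        if callable(method):
            setattr(cls, name, catch_and_log(method))
    return cls
```

As the next comment points out, this only stops exceptions from propagating; a callback that hangs would still block the RunEngine.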

But these decorators just catch and log exceptions; they aren't robust against callbacks that hang for a long time or fall far behind. A variation could be added here (and upstreamed later) that adds the extra protection of a queue and a background thread, as implemented for olog.
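A minimal sketch of the queue-plus-background-thread variation, assuming a plain (name, doc) callback. QueuedCallback and its maxsize default are hypothetical, and this is not the olog implementation linked earlier in the thread. The RunEngine-facing __call__ only enqueues, so a slow or hanging callback cannot stall the scan, and a single worker thread keeps documents in order.

```python
import logging
import queue
import threading

logger = logging.getLogger(__name__)


class QueuedCallback:
    """Hand (name, doc) pairs to callback from one background worker thread."""

    def __init__(self, callback, maxsize=10_000):
        self._callback = callback
        # Bounded, so a stuck callback cannot make memory grow without limit.
        self._queue = queue.Queue(maxsize=maxsize)
        self.dropped = 0
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

    def __call__(self, name, doc):
        # Called on the RunEngine's thread: never blocks, never raises.
        try:
            self._queue.put_nowait((name, doc))
        except queue.Full:
            self.dropped += 1
            logger.warning("queue full; dropping %s document", name)

    def _worker(self):
        while True:
            name, doc = self._queue.get()
            try:
                self._callback(name, doc)
            except Exception:
                logger.exception("callback failed on %s document", name)
            finally:
                self._queue.task_done()

    def join(self):
        """Block until every queued document has been processed."""
        self._queue.join()
```

Dropping documents when the queue is full is a deliberate trade-off in this sketch: it favors keeping the experiment running over completeness of the secondary copy. Blocking on put() instead would preserve every document but would reintroduce the stall the queue is meant to avoid.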

As an aside, the need for this fault-tolerant subscription is probably a temporary state of affairs because in the future we can just rely on Kafka for fault tolerance. Specifically:

jklynch commented 4 years ago

As far as I know, the Kafka client should not need to run on a separate thread. Based on what it does, I think it must be threaded internally.

jklynch commented 4 years ago

If a Kafka failure interrupts an experiment, I think that may be exactly the behavior we want. Otherwise, documents would not be saved.

gwbischof commented 4 years ago

I don't think we would want to interrupt an experiment at this time. We may find bugs in our code when we start using it, and our Kafka cluster is a test setup that is not production ready. If there are problems with it, or if we need to change the Kafka configuration, no one could run experiments during that time. The documents would still be saved, because they are also going, in parallel, to the beamline Mongo database.

gwbischof commented 4 years ago

Like @jklynch said, produce is asynchronous: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.Producer.produce

jklynch commented 4 years ago

I see what you mean about continuing an experiment if kafka fails, @gwbischof. There is a "message delivery report" function that will let us know if delivery is failing. The client will not block.
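A minimal sketch of a non-blocking publisher with a delivery report, assuming a producer that behaves like confluent_kafka.Producer: produce() only enqueues the message locally, and delivery callbacks (err, msg) are served when poll() or flush() is called. produce() can still raise BufferError when the local queue is full, so the sketch catches it. KafkaDocumentPublisher, the JSON encoding of [name, doc], and the topic argument are assumptions for illustration, not the PR's code. Any object with the same produce/poll methods works, which also makes it easy to test with a stand-in.

```python
import json
import logging

logger = logging.getLogger(__name__)


class KafkaDocumentPublisher:
    """RunEngine callback that hands documents to a Kafka producer without waiting."""

    def __init__(self, producer, topic):
        self._producer = producer
        self._topic = topic
        self.failed_deliveries = 0

    def _delivery_report(self, err, msg):
        # Invoked from poll()/flush() once the broker acknowledges or the send fails.
        if err is not None:
            self.failed_deliveries += 1
            logger.error("Kafka delivery failed: %s", err)

    def __call__(self, name, doc):
        try:
            self._producer.produce(
                self._topic,
                value=json.dumps([name, doc]),  # assumes JSON-serializable documents
                on_delivery=self._delivery_report,
            )
        except BufferError:
            # The producer's local queue is full; drop rather than stop the scan.
            logger.warning("Kafka local queue full; dropping %s document", name)
        # Serve any delivery reports that are already available, without waiting.
        self._producer.poll(0)
```

With the real client, this would be constructed around confluent_kafka.Producer(config) and subscribed with RE.subscribe(publisher); calling producer.flush() at shutdown delivers anything still buffered.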

danielballan commented 4 years ago

Gotcha. Then I agree that no special wrapping or handling is necessary, as long as the producer will not raise under any circumstances (even inability to connect).

Rather, once Kafka is promoted to be our “consumer of record” at NSLS-II, we will need to go out of our way to ensure that a failed delivery does kill the scan. We could do this by wrapping the produce call in a function that blocks until the delivery callback is called (slow, but it requires no changes to bluesky), or we could extend bluesky to handle non-blocking subscriptions that can tell us after the fact that we need to raise. That feature was first pitched in https://github.com/bluesky/bluesky/issues/1137. I would guess that we will want that when the time comes; I just mentioned the “or we could block...” idea for completeness.
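A minimal sketch of the "block until the callback is called" option, assuming a producer with confluent_kafka.Producer's produce/poll behavior, where delivery callbacks run only inside poll() or flush(). produce_blocking, KafkaDeliveryError, the 10 s default timeout, and the 0.1 s poll interval are hypothetical choices. Raising from a RunEngine callback is what would abort the scan.

```python
import time


class KafkaDeliveryError(RuntimeError):
    """Raised when Kafka reports that a message was not delivered."""


def produce_blocking(producer, topic, value, timeout=10.0):
    """Produce one message and wait for its own delivery report.

    Raises KafkaDeliveryError on a failed delivery and TimeoutError if no
    report arrives within timeout seconds.
    """
    outcome = []  # filled with err (None on success) by the delivery callback

    def on_delivery(err, msg):
        outcome.append(err)

    producer.produce(topic, value=value, on_delivery=on_delivery)
    deadline = time.monotonic() + timeout
    # Delivery callbacks only fire inside poll()/flush(), so poll until ours has run.
    while not outcome:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"no delivery report for {topic!r} within {timeout} s")
        producer.poll(min(remaining, 0.1))
    if outcome[0] is not None:
        raise KafkaDeliveryError(f"delivery to {topic!r} failed: {outcome[0]}")
```

This waits for a full broker round trip per document, which is the slowness mentioned above. Calling producer.flush() would also wait, but for every buffered message rather than just this one.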

jklynch commented 4 years ago

test_Eurotherm in nslsii/tests/temperature_controllers_test.py is the only failing test.

jklynch commented 4 years ago

I am doing something kind of questionable in using broker_name to get the beamline name. It would be nice to have a better way to get the beamline name.