General notes and possible approaches:
The general idea is for northbound applications to publish command messages on a Kafka topic and to receive command response messages by subscribing to a Kafka topic. In order for applications to only be able to publish/receive command (response) messages for their specific tenant, the topic names should contain the tenant identifier, so that corresponding ACLs can be put in place in Kafka. This could mean using `command-$tenantId` as the topic for sending commands and `command_response-$tenantId` for receiving command responses. For these topics, the device identifier would be used as the partition key to ensure correct ordering of messages related to one specific device.
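To make the topic naming and partition-key idea concrete, here is a minimal sketch of a northbound application publishing a command with the plain Kafka Java producer. The broker address, tenant/device identifiers, payload and class name are illustrative assumptions, not part of any Hono API; only the `command-$tenantId` topic scheme and the device id as record key follow the description above.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Map;

public class CommandPublisherSketch {

    public static void main(String[] args) {
        // illustrative identifiers, not taken from a real Hono setup
        String tenantId = "my-tenant";
        String deviceId = "my-device";

        Map<String, Object> config = Map.of(
                "bootstrap.servers", "localhost:9092",
                "key.serializer", StringSerializer.class.getName(),
                "value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
            // the device id is used as the record key, so all commands for one device
            // end up in the same partition and keep their order
            ProducerRecord<String, String> command = new ProducerRecord<>(
                    "command-" + tenantId, deviceId, "{\"brightness\": 80}");
            producer.send(command);
            producer.flush();
        }
    }
}
```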
For the AMQP-based Command & Control API, the outcome of sending a command message was indicated back to the sending application by means of AMQP delivery states. Since Kafka provides no such protocol-level acknowledgement mechanism, the question is whether and how to support acknowledgements.
Either:
Don't support (technical-level) command acknowledgements: applications will no longer know whether commands were actually forwarded to or even acknowledged by the target device (depending on the device protocol). Applications will then have to use application-level mechanisms to find out whether a command has reached the device. Usually that would mean requiring a command response message from the device.
Or mimic/implement technical-level command acknowledgements by letting the protocol adapters publish corresponding messages on a topic like `command_ack-$tenantId`, which the northbound application will have to subscribe to. For commands for which a reply is expected, this would probably not offer much benefit. And for the case of one-way commands where the application wants an acknowledgement, the alternative would be to just use a command that requests a command response from the device instead.
Let each protocol adapter (with a command subscription for some device of tenant X) consume each command (targeted at any device of tenant X) directly. Let each adapter then decide whether it should handle or ignore the command, by checking its local command subscriptions, mapping the device of the command to a gateway if needed.
Evaluation:
Commands are received by a new CommandRouting component which manages the mapping between a device id and the id of the adapter instance with a command subscription for that device.
Each received command is mapped to the adapter instance and published on the `cmd_internal-$adapterInstanceId` topic. The target protocol adapter will then consume/handle the message.
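As a rough illustration of this routing step (a sketch derived from the description above, not Hono's actual implementation), the CommandRouting component could work along these lines; the in-memory map merely stands in for whatever key-value store holds the device-to-adapter-instance mapping:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CommandRoutingSketch {

    // stands in for the Device Connection data: device id -> adapter instance id
    private final Map<String, String> adapterInstancePerDevice = new ConcurrentHashMap<>();

    private final KafkaProducer<String, byte[]> producer;

    public CommandRoutingSketch(KafkaProducer<String, byte[]> producer) {
        this.producer = producer;
    }

    public void route(ConsumerRecord<String, byte[]> command) {
        String deviceId = command.key();
        // a real implementation would also map the device to its gateway if needed
        String adapterInstanceId = adapterInstancePerDevice.get(deviceId);
        if (adapterInstanceId == null) {
            // no adapter instance has a command subscription for this device: drop/report
            return;
        }
        // the target protocol adapter consumes from its cmd_internal-$adapterInstanceId topic
        producer.send(new ProducerRecord<>(
                "cmd_internal-" + adapterInstanceId, deviceId, command.value()));
    }
}
```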
A one-to-one adaptation of the current (as of Hono 1.4) AMQP Command&Control handling:
Only one protocol adapter initially consumes a `command-$tenant` topic message (because of the shared consumer group). The adapter maps the command to a gateway and target adapter instance and either handles it itself or publishes it on the `cmd_internal-$adapterInstanceId` topic (see approach 2).
Additional `cmd_internal-$adapterInstance` consumers are needed (they need to be in their own consumer group, not in the shared one across protocol adapters). Probably only relevant for a large number of protocol adapters. EDIT: updated version below.
A new CommandForwarder component (the name was chosen to set it apart from the above CommandRouter concept) receives all commands first and then forwards them via AMQP to the one corresponding protocol adapter instance registered via the Device Connection service.
Evaluation:
Thanks for the detailed overview of options :+1: IMHO option 4 looks most promising as it reduces complexity in the adapters and is mostly compliant with #2029. In fact, I do not see why we would still need a separate Device Connection Service in this case. Couldn't that be part of the Command Forwarder as proposed in #2029 (where the component is called Command Routing Service)?
@sophokles73 Yes, the design of option 4 above would just be the first, "least implementation effort" approach (also making use of the optimized HotRod-based DeviceConnection client).
Developing this further, it can really be just one component and we can tweak the Command Routing service design of #2029 in that direction (with emphasis on the fact that routing doesn't involve forwarding back to Qdrouter/Kafka here).
In fact, the bottom image of https://github.com/eclipse/hono/issues/2029#issuecomment-643321500 ("Variation") is already close:
For the AMQP messaging network approach we would just require multicast message routing for the `command/$tenantId` address, so that commands don't need to be transferred from one CommandRouter instance to the other. Looking at the Qdrouter docs, I think using multicast shouldn't be a problem with regard to the disposition update to be sent back to the northbound application in case of AMQP C&C (the behaviour was fixed with Qdrouter 1.9 one year ago).
@calohmn thanks for this detailed analysis. On the first read, I also agree that solution 4 is the best next evolution of the current implementation and will align with other improvements already planned (as @sophokles73 pointed out).
I can also see that we can keep evolving this towards solution 2 in the future, probably by replacing Infinispan (caches) with a KTable. Also, if at some point we replace the adapter-registry communication from AMQP with HTTP, it would make sense to use Kafka for commands in the adapters (as everything else would be HTTP or Kafka in that scenario).
But again, if we go for solution 4 now, it doesn't stop us from further considering these changes in the future.
> I can also see that we can keep evolving this towards solution 2 in the future, probably by replacing Infinispan (caches) with a KTable [...]
The "Key-Value store" box drawn in the diagram for solution 2 is something that I left out in the figure for solution 4 (there it's abstracted away by the Device Connection service box). So yes, the different options for such a key-value store apply to solution 4 as well.
> it would make sense to use Kafka for commands in the adapters [...]
You mean in this new CommandRouter component, potentially using Kafka Streams there, right? Yes, that's something I would also see as a potential future optimization.
I also think that solution 4 looks like the best alternative. Here's an updated version of it. I've also updated the AMQP messaging network based design in #2029.
A new CommandRouter component receives all commands first and then forwards them via AMQP to the one corresponding protocol adapter instance that has registered itself for the device id of the command.
Evaluation:
Looks good to me.
About the usage of multiple `command-$tenant` consumer groups (one group per CommandRouter instance) in solution 4:
I'm wondering whether this could lead to a message getting delivered twice if a device reconnects and re-subscribes.
Let's say a device is connected to adapter A which in turn is connected with CommandRouter X. A high rate of commands flows down that path to the device. Now the device disconnects and reconnects (and re-subscribes), this time to adapter B which is connected to CommandRouter Y. CommandRouter Y could have been put under pressure somehow and could be slower in processing the incoming commands (reading is done independently of Router X because of the different consumer groups). This could mean that messages arrive at CommandRouter Y now which have already been received by CommandRouter X earlier. I could imagine that this could lead to the same command messages getting delivered to the device twice.
As a solution I currently only see forcing all CommandRouter instances to use the same, single consumer group to read from the `command-$tenant` topic (EDIT: another idea for a fix is given below).
The problem of transferring a command from one CommandRouter instance to another, in case the target protocol adapter is connected to that other instance, could be solved by letting all CommandRouter instances be part of a cluster (Infinispan/Hazelcast). Then messages could be forwarded using the clustered vert.x event bus for example.
I originally thought to rather avoid this and stick to Kafka/AMQP messaging channels, but without lots of other inter-component connections, I don't see how else we could get the command message across.
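Just to illustrate the clustered event bus idea (purely hypothetical, nothing of this exists in Hono), forwarding a command from one CommandRouter instance to the instance connected to the target adapter could look roughly like this; the address scheme and payload handling are made up for the example:

```java
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;

public class CommandForwardingSketch {

    // On the CommandRouter instance that the target protocol adapter is connected to:
    // register a consumer on a per-adapter-instance address (address scheme is hypothetical).
    static void registerReceiver(Vertx vertx, String adapterInstanceId) {
        vertx.eventBus().<Buffer>consumer("commandrouter.cmd." + adapterInstanceId,
                msg -> {
                    // a real implementation would forward the command over the existing
                    // command link to the protocol adapter; this sketch only logs it
                    System.out.println("received command of " + msg.body().length() + " bytes");
                });
    }

    // On the CommandRouter instance that consumed the command from Kafka:
    // send it across the cluster to the instance responsible for the target adapter.
    static void forwardCommand(Vertx vertx, String adapterInstanceId, Buffer commandPayload) {
        vertx.eventBus().send("commandrouter.cmd." + adapterInstanceId, commandPayload);
    }
}
```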
Of course this all could also mean that we rather choose solution 2 - although I would really like to skip the extra hop back via Kafka (and the extra topics involved).
@sophokles73 @dejanb WDYT?
@calohmn You are trying to solve the "exactly once" delivery/processing problem here, which is something that's hardly ever successfully implemented in a distributed system. For example, even if you implement all of the above, what could happen is that a protocol adapter sends a command to a device and fails immediately afterwards without sending an ack. So we don't know whether the device received/processed the command or not. So the command execution will fail and it will probably be retried by the client. This is just one example, but the point is that we need to make sure the device is an "idempotent consumer" of commands (meaning it can tolerate duplicate messages) and not try to solve the "exactly once" problem.
@dejanb I agree that we aren't aiming for "exactly once" qos here, rather "at least once". But still I think we should try to prevent possible scenarios where message duplication can occur. And if an approach (like solution 2 or 3) has fewer scenarios where such duplication can occur, that is an advantage of that approach to be taken into account.
For the scenario described above, I could also imagine the following fix for solution 4: when a device unsubscribes, associate the current consumer partition and offset information with that device id and store it in the Key/Value store. When the device subscribes again (possibly on another CommandRouter instance), read that entry and skip any command records at or below the stored partition offset. These Key/Value store entries would only be given a short lifespan then.
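A minimal sketch of that fix, under the assumption that the per-device offset is kept in such a short-lived Key/Value store entry (class and method names are made up; for brevity the partition is not tracked separately, relying on the device id being the partition key):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PerDeviceOffsetFilter {

    // stands in for the Key/Value store (entries would get a short lifespan there)
    private final Map<String, Long> lastDeliveredOffsetPerDevice = new ConcurrentHashMap<>();

    // called when the command subscription of the device is closed
    public void onDeviceUnsubscribed(String deviceId, long lastDeliveredOffset) {
        lastDeliveredOffsetPerDevice.put(deviceId, lastDeliveredOffset);
    }

    /** Returns true if the command record was most likely already delivered and should be skipped. */
    public boolean isAlreadyDelivered(String deviceId, long recordOffset) {
        Long lastDelivered = lastDeliveredOffsetPerDevice.get(deviceId);
        return lastDelivered != null && recordOffset <= lastDelivered;
    }
}
```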
> but the point is that we need to make sure the device is an "idempotent consumer" of commands (meaning it can tolerate duplicate messages)
@dejanb @calohmn IMHO the Command and Control API already provides the means to do that: the message-id needs to be unique on the sender side. I think it is fair to assume that if a solution does not already use idempotent commands, it could still use the message-id to perform de-duplication on the device.
In the duplicate-messages scenario described above, the message-id would be the same, so this is something to pay attention to. In general, I think we cannot fully prevent passing on duplicate messages (with the same message-id) - e.g. there could be a case of a CommandRouter instance crashing before it has committed the updated partition offsets.
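For illustration, device-side de-duplication based on the message-id could be as simple as the following sketch (class name, method names and the LRU bound are assumptions, not part of any Hono API):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class MessageIdDeduplicator {

    private final Set<String> recentMessageIds;

    public MessageIdDeduplicator(final int maxTrackedIds) {
        // LRU-style set of recently seen message-ids, bounded to keep memory usage constant;
        // duplicates caused by re-delivery arrive close together, so a small bound suffices
        this.recentMessageIds = Collections.newSetFromMap(
                new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(final Map.Entry<String, Boolean> eldest) {
                        return size() > maxTrackedIds;
                    }
                });
    }

    /** Returns true if the command with this message-id has not been seen yet and should be processed. */
    public boolean markIfNew(final String messageId) {
        return recentMessageIds.add(messageId);
    }
}
```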
> I'm wondering whether this could lead to a message getting delivered twice if a device reconnects and re-subscribes.
> Let's say a device is connected to adapter A which in turn is connected with CommandRouter X. A high rate of commands flows down that path to the device. Now the device disconnects and reconnects (and re-subscribes), this time to adapter B which is connected to CommandRouter Y. CommandRouter Y could have been put under pressure somehow and could be slower in processing the incoming commands (reading is done independently of Router X because of the different consumer groups). This could mean that messages arrive at CommandRouter Y now which have already been received by CommandRouter X earlier. I could imagine that this could lead to the same command messages getting delivered to the device twice.
There also seems to be a problem if router Y had been processing (and discarding) commands faster than router X. In that case, however, I believe that the device will not get some of the commands, namely those commands that had not yet been processed/forwarded by adapter A while the device was connected to adapter A but which had already been discarded by router Y before the device (re-)connected to adapter B.
Your proposed solution involving saving the command message offset with the mapping could solve this issue as well. However, we do not seem to be able to guarantee that the offset is being registered properly in case an adapter fails, can we?
The other alternative of using a single consumer group should not have this problem, I guess.
IMHO going with the single consumer group and then relying on yet another component to deliver messages between components reliably will introduce additional operational complexity (and I'm not sure it would solve all issues). For example, it's nice to think that the clustered event bus will forward messages between command routers, but what happens when Hazelcast/Infinispan crashes?
Maybe the solution would be that each "command router" opens a connection to each adapter (or they are connected via QDR), so that it can send a message directly to the `cmd_internal-$adapterId` address where the appropriate adapter will listen? Then the difference between solutions 2 and 4 from the adapters' perspective is just whether they listen for commands via AMQP or Kafka (and how the connection is initiated).
But in any case, if we need to make an extra hop, I think solution 2 is the best as it doesn't introduce new reliability or coordination concerns.
@sophokles73:
> There also seems to be a problem if router Y had been processing (and discarding) commands faster than router X. In that case, however, I believe that the device will not get some of the commands [...]. Your proposed solution involving saving the command message offset with the mapping could solve this issue as well.
It could solve this, but I guess this case would be more complicated since command processing in router Y would then have to read from an earlier offset just for this device. So, an incoming subscription would mean that any not-yet-delivered commands from an earlier offset would have to be handled first for that device, and processing of any new commands coming in for that device meanwhile would have to be postponed. I guess this could get quite complex when trying to take care of all possible error scenarios.
> However, we do not seem to be able to guarantee that the offset is being registered properly in case an adapter fails, can we?
Closing of the AMQP link to the adapter could trigger saving of the offsets for the devices that used that link. But again, this all sounds quite complex.
@dejanb:
> Maybe the solution would be that each "command router" opens a connection to each adapter (or they are connected via QDR), so that it can send a message directly to the `cmd_internal-$adapterId` address where the appropriate adapter will listen?
I can only imagine such an approach with a Qdrouter sitting in between (letting the CommandRouter connect to all adapter instances would require some discovery mechanism for the router to know their IPs). And an extra Qdrouter then brings up the topic of added operational complexity (also with questions like how to scale the Qdrouter?).
As much as I like solution 4 with its advantage of not having to use the extra hop and extra "cmd_internal" Kafka topics and not having to implement AMQP and Kafka specific logic in the protocol adapters, I think in terms of reliability and reduced complexity concerning the above corner cases, solution 2 looks better.
As discussed in the community call yesterday:
If we are going for solution 2, having Kafka command consumers included in the protocol adapters, we could also (with probably not much extra effort) implement solution 1 (each adapter gets every command for a tenant directly) as a configurable alternative alongside solution 2. Hono deployments with only a few protocol adapter instances could be configured to use that solution, thereby not having to run the extra CommandRouter component. (In the long run, we may even decide to provide a corresponding solution for AMQP, using `command/$tenant` multicast addresses.)
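For illustration, solution 1 seen from an adapter instance could look roughly like the following sketch (an assumption for this discussion, not Hono code): each adapter instance uses its own consumer group on the tenant's command topic, therefore sees every command, and simply ignores commands for devices it has no local command subscription for.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class AdapterLocalCommandConsumer {

    // device ids with an active command subscription on this adapter instance
    private final Set<String> localSubscriptions = ConcurrentHashMap.newKeySet();

    public void onCommandSubscriptionCreated(String deviceId) {
        localSubscriptions.add(deviceId);
    }

    public void consumeCommands(String tenantId, String adapterInstanceId) {
        Map<String, Object> config = Map.of(
                "bootstrap.servers", "localhost:9092",
                // own consumer group per adapter instance => every instance gets every command
                "group.id", "adapter-" + adapterInstanceId,
                "key.deserializer", StringDeserializer.class.getName(),
                "value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(List.of("command-" + tenantId));
            while (true) {
                for (ConsumerRecord<String, byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
                    String deviceId = record.key();
                    // a real adapter would also map the device to a gateway here if needed
                    if (localSubscriptions.contains(deviceId)) {
                        // a real adapter would forward the command to the connected device here
                    }
                    // otherwise: ignore, some other adapter instance has the subscription
                }
            }
        }
    }
}
```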
I would still see solution 4 as maybe a potential alternative to solution 2 (or the solution 2 and 1 combination).
In any case, we need the CommandRouter component containing consumers on the Kafka `command-$tenant` topics (or a `command-*` topic pattern).
Having developed that, we can run some tests to evaluate the above-mentioned potential issues with solution 4 and then decide which solution to develop further.
Kafka Topics
While the above discussion is about the Hono-internal routing of command messages, the choice of Kafka topics is relevant to the northbound applications sending the command messages.
The plan is to use `hono.command.${tenant_id}` as the topic for sending commands and `hono.command_response.${tenant_id}` for receiving command responses.
That means there is no longer a possibility to receive command responses on a specific channel with some given replyId (like when using `command_response/${tenant_id}/${replyId}` with AMQP).
@thjaeckle would that work for Ditto?
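For a northbound application, receiving command responses would then be a plain Kafka subscription on the tenant's response topic, roughly as in the sketch below (broker address, consumer group and class name are illustrative assumptions). Correlation with the original command would presumably happen via message properties such as a correlation-id rather than via a replyId-specific address.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Map;

public class CommandResponseReceiver {

    public static void main(String[] args) {
        String tenantId = "my-tenant"; // illustrative tenant

        Map<String, Object> config = Map.of(
                "bootstrap.servers", "localhost:9092",
                "group.id", "my-backend-app", // one shared consumer group per application
                "key.deserializer", StringDeserializer.class.getName(),
                "value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(List.of("hono.command_response." + tenantId));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // the record key is the device id; matching the response to the original
                    // command has to be done via message properties (e.g. a correlation-id)
                    System.out.println("response from " + record.key() + ": " + record.value());
                }
            }
        }
    }
}
```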
Acknowledgements
The other difference for northbound applications concerns (technical-level) command acknowledgements (see the 2nd chapter in https://github.com/eclipse/hono/issues/2273#issuecomment-718418108). I think here we should just define that there are no such acknowledgements with Kafka-based Command & Control and that applications should request a command response from the device if needed.
> @thjaeckle would that work for Ditto?
Yes, that should work - for the connection templates we always used a static "replyId" anyway, so Ditto does not need to specify a dynamic replyId.
My 2 cents for the "technical Acknowledgements" topic: Basically, with Kafka the "command sending QoS" would be changed to "at least once", correct? It could be quite problematic if e.g. a "toggle" command is sent to a lamp and is then delivered more than once ...
> My 2 cents for the "technical Acknowledgements" topic: Basically, with Kafka the "command sending QoS" would be changed to "at least once", correct?
Yes. From my understanding, in pub/sub messaging (as Kafka provides it), a publisher should not need to care about consumers (if or when the message is consumed, or by how many consumers).
> It could be quite problematic if e.g. a "toggle" command is sent to a lamp and is then delivered more than once ...
My two cents on that: I think we should inform and encourage users to use idempotent commands instead ("switchOn" and "switchOff"). The current AMQP-based Command & Control API already explains that even a positive acknowledgment of a command message may indicate any of the following:
- An attempt has been made to deliver the command to the device. However, it is unclear if the device has received (or processed) the command.
- The device has acknowledged the reception of the command but has not processed the command yet.
- The device has received and processed the command.
For a reliable "toggle" command, the device must always send a command response when it has changed the state of the lamp.
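As a small illustration of that recommendation (not taken from any Hono example), a device could handle idempotent "switchOn"/"switchOff" commands and always report the resulting state in a command response:

```java
public class LampDevice {

    private boolean lampOn;

    /** Returns the command response payload to be sent back to the application. */
    public String handleCommand(String commandName) {
        switch (commandName) {
            case "switchOn":
                lampOn = true;   // applying this twice has the same effect as applying it once
                break;
            case "switchOff":
                lampOn = false;
                break;
            default:
                return "{\"status\": \"unknown command\"}";
        }
        // the command response tells the application the actual resulting state,
        // even if the command was delivered more than once
        return "{\"status\": \"ok\", \"lampOn\": " + lampOn + "}";
    }
}
```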
@calohmn can this be closed?
Yes, it's implemented according to solution 2 above.
For a Hono instance that uses Kafka as messaging infrastructure for sending telemetry/event messages (see the corresponding proposal), it makes sense to provide Kafka support also for Command & Control messages.
This would mean northbound applications don't have to use both a Kafka API (for telemetry/events) and an AMQP-based API (for commands) to interact with Hono. It also means removing the necessity to use an AMQP messaging network in addition to a Kafka instance, if Kafka is to be used for telemetry/event messages.