Closed laroque closed 5 years ago
To implement the above, we need to include two new pieces of information in the message body:

1. an indication of whether a message is the last in a sequence or a non-final element of the sequence
2. a monotonic indicator of position in the sequence, so that the replies can be combined and the client can be sure that none were missed
Further, the standard needs to be very clear about how the message objects should be combined. Should subsequent messages mirror the structure of the first message fully, or should they have a different structure which specifies how to add the additional data to the prior message(s)?
Further-further, our implementations will need a clever way to take an existing data object, decide if it is "large," and then figure out how to split it. If we have the above then this doesn't need to be the same everywhere, but it might as well be at least a convention if not a rule.
options:
1. Unique keys are merged, and duplicated keys are recursed on by type: arrays are concatenated, while simple types are expected to match.

```python
a1 = {'k1': 1, 'k2': [1, 2, 3]}
a2 = {'k1': 1, 'k2': [4, 5, 6], 'k3': 'foo'}
A = special_merge(a1, a2)  # {'k1': 1, 'k2': [1, 2, 3, 4, 5, 6], 'k3': 'foo'}
```
2. Subsequent messages carry instruction sets for how to add additional values (add a new value at a position, concatenate to an existing array, replace an existing object).

```python
a1 = {'k1': 1, 'k2': [1, 2, 3]}
a2 = {'concat': [{'indices': ['k2'], 'values': [4, 5, 6]}],
      'add': [{'indices': ['k3'], 'values': 'foo'}]}
A = a1.special_merge(a2)
```
3. Split on string objects:

```python
a1 = '{"k1": 1, "k2": [1, 2, 3,'
a2 = '4, 5, 6], "k3": "foo"}'
A = json.loads(a1 + a2)
```
I don't like the third option very much. The first two are hard to weigh.
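For concreteness, option 1's recursive merge could be sketched as follows. The name `special_merge` comes from the examples above; everything else is an illustrative assumption, not part of the standard:

```python
def special_merge(a, b):
    """Recursively merge b into a: dicts merge by key, arrays concatenate,
    and simple (scalar) values are expected to match."""
    if isinstance(a, dict) and isinstance(b, dict):
        merged = dict(a)
        for key, value in b.items():
            merged[key] = special_merge(a[key], value) if key in a else value
        return merged
    if isinstance(a, list) and isinstance(b, list):
        return a + b
    if a != b:
        raise ValueError("simple-typed values differ: %r vs %r" % (a, b))
    return a

a1 = {'k1': 1, 'k2': [1, 2, 3]}
a2 = {'k1': 1, 'k2': [4, 5, 6], 'k3': 'foo'}
assert special_merge(a1, a2) == {'k1': 1, 'k2': [1, 2, 3, 4, 5, 6], 'k3': 'foo'}
```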
I've been giving this some thought, and one particular aspect seems rather challenging: splitting the payload. Given that we would need to be completely generic with respect to the shape of the payload, I'm not sure this is really possible.
In my opinion it would be more straightforward to serialize first and then break the serialized (string-like) object into smaller pieces. The sequence of events for sending a message would be: serialize --> split --> send --> receive --> join --> deserialize.
However, this sequence is a little awkward in our current setup, where we have dripline headers included in the AMQP message body. We can't simply split the serialized object because the dripline headers would only be included in the first of the split-up objects.
I would therefore propose a more significant change to the relationship between the dripline message structure and the AMQP message structure: make the dripline headers all AMQP message attributes, so that the AMQP message body comprises only the dripline payload. Some of the dripline headers can be matched with standard AMQP properties (e.g. the timestamp), but some will have to be custom properties (e.g. the lockout key).
In this case we can then do the serialize --> split --> send --> receive --> join --> deserialize sequence. In an implementation, the function for sending a dripline message, which is responsible for serialization, would serialize the payload, divide the string-like object up if necessary, and send messages with each part of the payload. Presumably we'd add header fields to indicate the position in the sequence of AMQP messages and either the final AMQP message or the total number of AMQP messages.
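The serialize --> split --> join --> deserialize part of that sequence could be sketched like this. The header names (`chunk-index`, `chunk-total`) and the chunk size are hypothetical placeholders, not settled standard fields:

```python
import json

# Hypothetical chunk size cap, kept tiny here for illustration; a real cap
# would be far larger (e.g. some fraction of the broker's frame size limit).
MAX_CHUNK = 16

def split_payload(payload, max_chunk=MAX_CHUNK):
    """Serialize the payload, then break the string into chunks, each tagged
    with hypothetical headers giving its position and the total count."""
    serialized = json.dumps(payload)
    chunks = [serialized[i:i + max_chunk]
              for i in range(0, len(serialized), max_chunk)]
    return [{'headers': {'chunk-index': i, 'chunk-total': len(chunks)},
             'body': chunk}
            for i, chunk in enumerate(chunks)]

def join_payload(messages):
    """Reorder received chunks, check that none are missing, and deserialize."""
    ordered = sorted(messages, key=lambda m: m['headers']['chunk-index'])
    if len(ordered) != ordered[0]['headers']['chunk-total']:
        raise ValueError('missing chunks in sequence')
    return json.loads(''.join(m['body'] for m in ordered))

payload = {'k1': 1, 'k2': [1, 2, 3, 4, 5, 6], 'k3': 'foo'}
assert join_payload(split_payload(payload)) == payload
```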
Regarding implementations for my above suggestion:
In Python, you set the `headers` keyword in `pika.BasicProperties`; see the pika documentation for details.
In C++ you use the header table interface functions in `AmqpClient::BasicMessage`. An `AmqpClient::Table` is just a `std::map<TableKey, TableValue>`, where an `AmqpClient::TableKey` is just a `std::string`, and `AmqpClient::TableValue` is a union-type class that can hold a variety of different types.
I've implemented the above change in how dripline headers are handled in dripline-cpp as a test case. You can see the changes in this PR: https://github.com/driplineorg/dripline-cpp/pull/1.
I've tested this new system by validating the round trip from dripline to amqp and back to dripline.
I've made proposed modifications to the standards that correspond to the message integration suggested above. You can see the changes in this PR: https://github.com/driplineorg/dripline/pull/9.
I've also made proposed modifications to the standards for handling the large payload splitting suggested above. You can see those changes in this PR: https://github.com/driplineorg/dripline/pull/10.
In implementing the C++ version of this feature, I've found a necessary change to one aspect of the previous implementation. Previously, receiving a message, in particular a response, was implemented as a static function in `core`. That action did not require any statefulness, even when expecting multiple response messages. In this new scheme, however, we need an object to maintain its state while it waits for multiple chunks from each reply. My current plan for handling this is to put the basic receive-message functionality in a `receiver` class, including functions for waiting for a reply.
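As a rough illustration of the statefulness involved (sketched in Python rather than C++, with hypothetical header and class names), a `receiver`-like object might accumulate chunks per message until a reply is complete:

```python
# Hypothetical sketch: chunks for each in-flight reply are accumulated,
# keyed by message id, until the full sequence has arrived.
class Receiver:
    def __init__(self):
        # message-id -> {chunk-index: body}
        self._partial = {}

    def handle_chunk(self, headers, body):
        """Store one chunk; return the reassembled serialized payload once
        all chunks for that message have arrived, otherwise None."""
        chunks = self._partial.setdefault(headers['message-id'], {})
        chunks[headers['chunk-index']] = body
        if len(chunks) == headers['chunk-total']:
            del self._partial[headers['message-id']]
            return ''.join(chunks[i] for i in range(len(chunks)))
        return None

r = Receiver()
assert r.handle_chunk({'message-id': 'm1', 'chunk-index': 0, 'chunk-total': 2}, 'ab') is None
assert r.handle_chunk({'message-id': 'm1', 'chunk-index': 1, 'chunk-total': 2}, 'cd') == 'abcd'
```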
Ben and I have decided to go forward with the solution being implemented in C++.
Branch for documentation of this proposal: feature/split_message
The implementation is now in dripline-cpp#12.
Finished in dripline-cpp
Migrated from project8/dripline issue 168
Recommendation
There has been some discussion lately about how best to deal with large payloads. In particular, we've been trying to transfer hundreds of kB of data from an SQL select for storage elsewhere, though in the past we've also transferred long arrays of data from, say, the lockin or other instruments. I think the dripline standard itself should address this; a few options come to mind:

1. Set some cap on message payload size and define a means by which larger payloads can be split, sent as a set of multiple replies, and combined at the end. This is a non-trivial effort, but it scales and should be a "forever" solution.
2. An alternative (not preferred): dictate that dripline communication is for instructions and replies, but not larger data transfers. Qualitatively state that "large" transfers are not supported and/or discouraged, and then continue roughly as we are now. This is less precise but seems to be what is happening naturally.
3. Some other idea?