When using a TCP input, packets' data are buffered before logstash
tries to decode them. Therefore, our decode() function will receive
chunks of "random" sizes, that might contain 2 PDUs, 3.4 PDUs, etc.
The current code parses only one PDU and discards the rest of the
payload. Therefore, we can easily miss a PDU, and the next call will
most likely parse the middle of a PDU, which will result in an error.
The file ipfix.dat used during CI is actually a good example: it
contains 3 IPFIX messages. But so far, the code is only considering
the first one, hence the 7 flows returned instead of the 13 that the
file contains.
This commit makes sure each call consumes all the PDUs available in
the payload, and the remaining data (beginning of another PDU) are
buffered to be reused in the next call.
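The buffer-and-consume pattern described above can be sketched as follows. This is a minimal illustration, not the plugin's actual code: the class name, the simplified 4-byte header (2-byte version, 2-byte total length, as in an IPFIX message header), and the method shape are all assumptions made for the example.

```ruby
# Hypothetical sketch of consuming every complete PDU from a chunked
# TCP stream, buffering any trailing partial PDU for the next call.
class PduBuffer
  HEADER_SIZE = 4  # assumed: 2-byte version + 2-byte total length

  def initialize
    @buffer = "".b  # binary buffer carrying leftover bytes across calls
  end

  # Appends the new payload, yields each complete PDU, and keeps the
  # remainder (the beginning of the next PDU) in @buffer.
  def decode(payload)
    @buffer << payload
    loop do
      # Not even a full header yet: wait for more data.
      break if @buffer.bytesize < HEADER_SIZE
      # Total message length sits at offset 2 (big-endian 16-bit).
      length = @buffer[2, 2].unpack1("n")
      # Only part of this PDU has arrived: wait for the rest.
      break if @buffer.bytesize < length
      # Cut one complete PDU off the front and hand it out.
      yield @buffer.slice!(0, length)
    end
  end
end
```

With this shape, a chunk that ends mid-PDU yields every complete message it contains, and the next call picks up exactly where the previous one stopped, so no PDU is skipped and none is parsed from the middle.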