Closed by vcostet 7 years ago
@vcostet Your messages are likely truncated because the default buffer size of 4096 bytes is too small. You can try increasing it to a larger value, maybe something like 8192 or 16384.
In general with IPv4, I believe the limit is up to 65507 bytes of payload per datagram, but keep in mind you should only be sending messages to a protologbeat instance running locally on the same server. Sending to another host, even one on the same private network, would likely cause messages to be fragmented because of the MTU.
As for the buffer being initialized outside of the for loop, that is done on purpose so that the same buffer can be reused over and over, without allocating and initializing a new byte slice for each message, which would increase GC overhead. The ReadFrom method simply reuses this buffer by reading bytes from the socket connection into it.
The issue is not related to the buffer size (the messages are very short, < 20 characters, the UDP stream comes from a local service, and the messages are not split).
processMessage is called asynchronously, but it will always get a reference to the same buffer, and the content of the buffer could be altered before (or during) the execution of processMessage.
For example: if I receive two messages "abc" and "wxyz" almost at the same time, the execution order could likely be:
1. processMessage is scheduled (but not yet executed)
2. processMessage is scheduled (but not yet executed)
3. processMessage is executed, but it will produce the message "wxy" (the current value of the buffer, truncated because it uses a length of 3) instead of "abc"
4. processMessage is executed for the second time, and will produce the message "wxyz"

I can see two solutions:
1. Do not call processMessage asynchronously (but it will increase the risk of losing packets if processMessage is too slow)
2. Create a new buffer for each message (startTCP already creates a new buffer for each message, and processMessage will allocate a new string for each message in any case)

@vcostet Thanks for the good feedback. Taking a second look at it, yes, I now see that I overlooked this issue. What I failed to take into account before is that the byte slice is passed along essentially as a descriptor pointing to the underlying data, not as a copy.
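To make the "descriptor, not a copy" point concrete, here is a minimal, deterministic sketch of the same effect without any sockets or goroutines. The helper name aliasDemo is hypothetical and is not part of protologbeat; the two copy calls merely stand in for two consecutive ReadFrom calls on a reused buffer.

```go
package main

import "fmt"

// aliasDemo (hypothetical helper) simulates two consecutive reads into one
// reused buffer. It returns what a delayed consumer of the FIRST message
// would see when it was handed a sub-slice versus an independent copy.
func aliasDemo() (shared, copied string) {
	buffer := make([]byte, 16)

	// First "read": message "abc", length 3.
	n := copy(buffer, "abc")
	view := buffer[:n]                             // shares buffer's backing array
	snapshot := append([]byte(nil), buffer[:n]...) // independent copy of the bytes

	// Second "read" overwrites the same backing array before the first
	// message has been processed.
	copy(buffer, "wxyz")

	return string(view), string(snapshot)
}

func main() {
	shared, copied := aliasDemo()
	fmt.Println(shared) // wxy
	fmt.Println(copied) // abc
}
```

The sub-slice keeps its original length of 3 but sees the new bytes, which matches the "wxy" result described earlier in the thread.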
Although unlikely when logging a single message per second or less, I now see this could happen in environments with higher message volumes. Considering that the processMessage function ends up converting the byte slice to a string anyway, another option could be to convert the byte slice to a string immediately and pass that along as the function argument, like this:
    for {
        length, _, err := l.ReadFrom(buffer)
        if err != nil {
            logp.Err("Error reading from buffer: %v", err.Error())
            continue
        }
        if length == 0 {
            continue
        }
        go ll.processMessage(strings.TrimSpace(string(buffer[:length])))
    }
Since converting a byte slice to a string copies the bytes, and the arguments of a go statement are evaluated in the calling goroutine before the new goroutine starts, this would eliminate the risk of the data being modified prior to the actual execution of the goroutine.
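As a rough check of that reasoning, the sketch below pushes several messages through one reused buffer and hands each goroutine a string converted at the call site. The function name collect and the use of copy in place of ReadFrom are assumptions made purely for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// collect (hypothetical helper) feeds each message through ONE reused buffer
// and passes a string conversion to a goroutine, mirroring the loop above.
// It returns the set of messages the goroutines observed.
func collect(messages []string) map[string]bool {
	buffer := make([]byte, 16)
	seen := make(map[string]bool)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for _, m := range messages {
		n := copy(buffer, m) // stands in for ReadFrom filling the buffer
		wg.Add(1)
		// string(buffer[:n]) is evaluated here, in the calling goroutine,
		// and copies the bytes, so later writes to buffer cannot affect msg.
		go func(msg string) {
			defer wg.Done()
			mu.Lock()
			seen[msg] = true
			mu.Unlock()
		}(string(buffer[:n]))
	}

	wg.Wait()
	return seen
}

func main() {
	seen := collect([]string{"abc", "wxyz"})
	fmt.Println(len(seen), seen["abc"], seen["wxyz"]) // 2 true true
}
```

Each goroutine receives its own immutable string, so no message is lost or truncated regardless of when the goroutines actually run.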
What do you think about that solution? That said, the one you proposed would also work.
@vcostet I haven't heard anything back from you on this last post, so I'll simply merge your PR for the sake of simplicity, considering it should do the job just fine. Thanks again.
In startUDP, the buffer has to be created inside the for loop, otherwise processMessage will always get the same buffer (and the content could change before the processMessage goroutine is executed).
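A minimal sketch of that approach, assuming a stand-in for the socket read: fakeRead and processAll are hypothetical names, not functions from protologbeat, and the 4096-byte size is the default mentioned in the discussion.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fakeRead is a hypothetical stand-in for ReadFrom: it copies msg into buf
// and returns the number of bytes written.
func fakeRead(buf []byte, msg string) int {
	return copy(buf, msg)
}

// processAll (hypothetical helper) allocates a fresh buffer for every
// message, so each goroutine owns the slice it receives.
func processAll(messages []string) []string {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		out []string
	)

	for _, m := range messages {
		buffer := make([]byte, 4096) // new buffer per message, inside the loop
		n := fakeRead(buffer, m)

		wg.Add(1)
		go func(b []byte) {
			defer wg.Done()
			mu.Lock()
			out = append(out, string(b))
			mu.Unlock()
		}(buffer[:n])
	}

	wg.Wait()
	sort.Strings(out) // goroutine order is not deterministic, so sort for display
	return out
}

func main() {
	fmt.Println(processAll([]string{"abc", "wxyz"})) // [abc wxyz]
}
```

The trade-off is one allocation per message, which is the GC overhead the reused buffer was meant to avoid.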