Closed ForbiddenEra closed 2 years ago
So, after a bit of digging, it seems the difference between the FluentClient's messages and those posted from either td-agent or calyptia-fluentd (tried both) is that the content of the message isn't being parsed as a buffer, for some reason, when it comes from fluentd. Messages sent to self with FluentClient do come up as a Uint8Array.
message: [
'tag_prefix.tag',
<Buffer 92 ce 62 86 0b 4f 81 a7 6d 65 73 73 61 67 65 a7 6d 65 73 73 61 67 65>,
{ size: 1 }
]
message: [
'fluent.info',
'\x92׀b\x86\x0BP\x1E\x1AZ\x15\x84£pid͢S¤ppid͢N¦worker\x00§messageٵstarting fluentd worker pid=25171 ppid=25166 worker=0',
{ size: 1, compressed: 'text', chunk: 'XfWdomaRnTurg7tBufiwgA==\n' }
]
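For reference, the well-formed FluentClient entry above can be walked by hand against the msgpack spec (a minimal manual decode, not the library's code):

```javascript
// The 23 bytes of the FluentClient entry, read byte-by-byte:
// 0x92           -> fixarray, 2 elements: [time, record]
// 0xce           -> uint32 follows (event time in seconds)
// 62 86 0b 4f    -> 1652951887
// 0x81           -> fixmap, 1 key/value pair
// 0xa7 "message" -> fixstr of length 7 (the key)
// 0xa7 "message" -> fixstr of length 7 (the value)
const entry = Buffer.from("92ce62860b4f81a76d657373616765a76d657373616765", "hex");
const time = entry.readUInt32BE(2);
const key = entry.toString("utf8", 8, 15);
const value = entry.toString("utf8", 16, 23);
// -> decodes as [1652951887, { message: 'message' }]
```

So the FluentClient side is plain, spec-compliant msgpack.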
Not sure why this would be happening..? I've tried enabling gzip; no difference.
From my investigation, it seems the decode method in the extensionCodec never gets called (in either case); the encode method does get called.
Tried different versions of fluent-logger and msgpack.. not sure what's going on. The extensionCodec never gets called; decodeMultiAsync does see the second one as binary but decodes it differently?
Tried td-agent v3 as well.. I know I must be missing something, but I can't seem to figure out what it is..
On the protocol specification for fluentd, I noticed something that could maybe affect this..
https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1
Under "PackedForward Mode":
Note: out_forward plugin sends events by the PackedForward mode. It encloses event records with msgpack str format instead of bin format for a backward compatibility reason.
It seems odd that this would be the issue, as I'm basically using the config listed in the docs as shown above, and even so, wouldn't the lib need to be able to parse both anyhow..? If this is the issue, then is it possible the listed config/server never worked with forwarding from an actual fluentd..? Again, it seems odd that there would be a posted fluentd config for forwarding to the node FluentServer if it never worked..?
Node version..? Dependency version..? fluentd version..?
Currently on Node v17.9.0; @fluent-org/logger v1.0.8, @msgpack/msgpack ^2.7.1 (as specified by @fluent-org/logger)
:'(
So, looking even closer, it seems like some of the bytes are slightly different..?
The only thing I can think of now is that the extensionCodec isn't working to decode the time, causing it to come out as sort of garbage..?
But don't I need a 0xD4-0xD8 or 0xC7-0xC9 byte for decodeExtension to fire? I'm getting 0xDB?
I've referred to the protocol docs for fluentd and msgpack and can't find anything regarding this/updates/etc..? :'(
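(For reference, the fixext8 EventTime that should trigger decodeExtension looks like this on the wire; the bytes here are lifted from the "d7 00 62 86 5b 0e ..." sequence in the packet dump further down, and this manual decode is only a sketch of the layout:)

```javascript
// Fluentd's EventTime on the wire: fixext8 (0xd7), ext type 0,
// then 4 bytes of seconds and 4 bytes of nanoseconds, big-endian.
const ext = Buffer.from("d70062865b0e00000000", "hex");
const extType = ext.readInt8(1);   // 0 -> the EventTime ext type
const epoch = ext.readUInt32BE(2); // seconds since the epoch
const nano = ext.readUInt32BE(6);  // nanosecond part
```

A decoder only routes to decodeExtension when it hits one of 0xd4-0xd8 (fixext) or 0xc7-0xc9 (ext 8/16/32), which is why a leading 0xdb (str32) never gets there.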
I'm going to need to dig deeper on this, thanks for the detailed bug report! To be honest, I tested this configuration against Fluent Bit and assumed that Fluentd would work the same, but it doesn't :( It should, though; I'll dig into why that is.
It looks like it's not parsing it correctly at the msgpack level, but I did figure out where the 0xDB is coming from - see https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin/out_forward.rb#L665
hm - I wonder if that's related to one of my last posts, where the docs say that out_forward uses the string format, not binary, for something? At least, that's sort of what the comment there implies:
# https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#packedforward-mode
# out_forward always uses str32 type for entries.
I wasn't sure if the note in the docs meant just the record section, the whole forward entry, or what; this:
Note: out_forward plugin sends events by the PackedForward mode. It encloses event records with msgpack str format instead of bin format for a backward compatibility reason.
from https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#packedforward-mode
In the msgpack spec, 0xdb is str32 (a string with a 32-bit/4-byte length header), right? And it seems like it's indeed getting decoded AS that..? Right after 0xdb we see decodeUtf8String 98 4 (98 is from 'headerByte' and 4 is the length).
But the data is clearly NOT a UTF-8 string... is this actually, technically, a bug in fluentd vs the msgpack spec?!
Because that almost seems like the case here..? I'm about to head out so I haven't looked deeply, but thanks for finding where that comes from; I'm not a huge Ruby person but can probably at least read it :)
It does seem a little interesting that Fluent Bit and fluentd would be different; I'm almost tempted to go with Bit instead, as I'm setting up a new architecture here, but I thought I might need some extra features of fluentd itself. I'm still new to fluentd, hence my not being sure whether I just had a misconfiguration issue, but it seems like this goes deeper.
Thanks for getting back & looking into it, hope your con was great!
entries = [0xdb, chunk_io.size].pack('CN')
So I am guessing this is packing the 0xdb byte along with the size, but I'm not sure about the 'CN'?
To me it reads as packing 0xDB, then chunk_io.size, perhaps terminated by 'CN'? But I don't see 'CN' [43, 4e] anywhere in the decoded message, so I think I'm not reading it right..
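(For what it's worth, Ruby's pack('CN') isn't a terminator: 'C' packs one unsigned 8-bit byte, the 0xdb, and 'N' packs one 32-bit unsigned big-endian integer, the chunk size. Together they form exactly a msgpack str32 header. A rough Node equivalent, with a function name of my own invention:)

```javascript
// Equivalent of Ruby's [0xdb, chunk_io.size].pack('CN'):
// one type byte followed by a four-byte big-endian length.
function str32Header(size) {
  const header = Buffer.alloc(5);
  header.writeUInt8(0xdb, 0);    // 'C': the str32 type byte
  header.writeUInt32BE(size, 1); // 'N': payload length, big-endian
  return header;
}
// For a 104-byte chunk this yields <Buffer db 00 00 00 68>,
// matching the "db 00 00 00 68" visible in the captured packet below.
```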
It does decode the tag properly, and the options: size, the number of entries, and compressed, which is 'text' when not compressed and 'gzip' when compressed. There's also another entry when gzipped, but since part of the encoding is 'missing', I don't think the gzip data is getting inflated correctly either.
Not sure if the actual packed data is also using the extension byte? That's just for nanosecond-resolution time AFAIK..? I might have to go over each byte and reference it against the protocol table to see what it should be..
This is a full packet from my 'debug' thing and how it's decoded; hopefully my logging messages make sense. Basically it's just the method name and the chunk/byte that method is processing; the second number on decodeUtf8String I believe is the .length property..
You can see it decodes the tag fine, as well as the property names for the following 'options' object..
decodeMultiAsync:: <Buffer 93>
HeadByte: 0x93 147
decodeMultiAsync:: <Buffer af 73 79 73 74 65 6d 2e 61 75 74 68 2e 65 72 72 db 00 00 00 68 92 d7 00 62 86 5b 0e 00 00 00 00 84 a4 68 6f 73 74 c4 08 64 65 76 2d 74 65 73 74 a5 69 ... 98 more bytes>
HeadByte: 0xaf 175
decodeUtf8String 15 0
decodeUtf8String:: system.auth.err
HeadByte: 0xdb 219
decodeUtf8String 104 4
decodeUtf8String:: ׀b[¤hostĈdev-test¥identĄsshd£pidă971§messageĮerror: connect_to 127.0.0.1 port 9229: failed.
HeadByte: 0x82 130
HeadByte: 0xa4 164
decodeUtf8String 4 0
decodeUtf8String:: size
HeadByte: 0x1 1
HeadByte: 0xaa 170
decodeUtf8String 10 0
decodeUtf8String:: compressed
HeadByte: 0xa4 164
decodeUtf8String 4 0
decodeUtf8String:: text
message: [
'system.auth.err',
'\x92׀b\x86[\x0E\x00\x00\x00\x00\x84¤hostĈdev-test¥identĄsshd£pidă971§messageĮerror: connect_to 127.0.0.1 port 9229: failed.',
{ size: 1, compressed: 'text' }
]
So, it seems you are correct about fluent-bit.
I set up a docker fluent-bit instance that simply has a forward input from fluentd and a forward output to my test app, and that seems to work.
I'm not sure it's 100% correct, though; some messages still get delivered as a buffer when it seems parts should be UTF-8/strings, maybe..?
In the first screenshot, you can see the line that says:
fluent.info EventTime { _epoch: 1653209822, _nano: 244054614 } { worker: 0, message: 'fluentd worker is now running worker=0' }
That's the line that actually responds to the library's event and prints it. The fluent messages (which are still coming from fluentd) get decoded fine, but further on, messages from syslog are returned as buffers, i.e.:
system.daemon.err EventTime { _epoch: 1653209961, _nano: 0 } {
host: <Buffer 64 65 76 2d 74 65 73 74>,
ident: <Buffer 63 6f 6c 6c 65 63 74 64>,
pid: <Buffer 31 38 30 35>,
message: <Buffer 6d 61 74 63 68 5f 74 68 72 6f 74 74 6c 65 5f 6d 65 74 61 64 61 74 61 5f 6b 65 79 73 3a 20 6d 74 67 5f 75 70 64 61 74 65 5f 73 74 61 74 73 20 66 61 69 ... 4 more bytes>
}
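If those Buffer-valued fields are known to hold UTF-8 text (as these syslog fields are), a simple workaround on the receiving side is to convert them on receipt; this helper is just a sketch, not part of the library:

```javascript
// Convert any Buffer values in a decoded record back to strings,
// assuming they contain plain UTF-8 text.
function stringifyBuffers(record) {
  const out = {};
  for (const [key, value] of Object.entries(record)) {
    out[key] = Buffer.isBuffer(value) ? value.toString("utf8") : value;
  }
  return out;
}
// stringifyBuffers({ host: Buffer.from("dev-test"), worker: 0 })
// -> { host: 'dev-test', worker: 0 }
```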
This is probably unrelated to the original issue and potentially a quirk of the syslog pipeline; it's not a huge deal if I have to convert it myself, but in case it is supposed to be handled, I thought I'd point it out.
Edit: Hmm.. Perhaps it's not completely related to syslog.. I tried re-enabling my tail input as well to see how that came out..
It doesn't decode it properly at all, thus not even firing the event. I don't get any info back now from the tail packets. This is, again, forwarded "through" fluent-bit from fluentd.
This is how it looks directly piped from fluentd without fluent-bit in the middle:
I was hoping I could just put fluent-bit in the middle as a 'proxy' at least temporarily but unfortunately I guess that's not going to work for me.
So the tail events that are getting forwarded but not fired are actually passing through the extensionCodec properly and getting decoded (when forwarded through fluent-bit).
I tried to make the logging more clear; when you see "Entry Received::", that's a fired event from the library. ":Message:" is when protocol.decodeClientStream returns a new message in the for await loop in server.js.
I also tried just forcing the result to be a buffer in isPackedForwardMode, which does result in it trying to pass the packed message through the decoder again, but I just end up with more GIGO.
My tail log comes out even worse, like this.
So it really just seems like the content of the actual messages, which should be packed, isn't parsing right or something.
Even pasting the bytes into an online msgpack converter comes up with the same result. It does pass through decodeExtension when it hits byte 0xd7, but.. something isn't right!
If you paste
93 ab 66 6c 75 65 6e 74 2e 69 6e 66 6f db 00 00 00 86 92 d7 00 62 8a 42 58 39 24 35 67 81 a7 6d 65 73 73 61 67 65 d9 70 5b 69 6e 70 75 74 5f 74 61 69 6c 5d 20 66 6f 6c 6c 6f 77 69 6e 67 20 74 61 69 6c 20 6f 66 20 2f 68 6f 6d 65 2f 66 6f 72 62 69 64 64 65 6e 65 72 61 2f 61 74 62 53 65 72 76 69 63 65 2f 6c 6f 67 73 2f 50 61 79 54 72 69 65 41 54 42 53 65 72 76 69 63 65 2d 30 2e 30 2e 32 2d 61 6c 70 68 61 2d 61 74 62 2d 30 35 2d 32 32 2d 32 32 2e 6c 6f 67 82 a4 73 69 7a 65 01 aa 63 6f 6d 70 72 65 73 73 65 64 a4 74 65 78 74
into https://toolslick.com/conversion/data/messagepack-to-json
it shows the binary chars in the decoded message as well.. I suppose if it didn't have the right extension this could be expected; however, after d7 is 80, which decodeExtension reads as type -128 (0x80 as a signed byte); the only type we're adding here is 0, correct?
If you paste in only the part containing the actual message, the converter returns false (and indeed the first byte, 0xc2, is msgpack false):
c2 92 d7 80 62 c2 8a 42 58 2f c2 8f 51 c4 84 c2 a3 70 69 64 cd 88 e9 a4 b0 70 69 64 cd 88 e3 a6 b7 6f 72 6b 65 72 00 c2 a7 6d 65 73 73 61 67 65 d9 b5 73 74 61 72 74 69 6e 67 20 66 6c 75 65 6e 74 64 20 77 6f 72 6b 65 72 20 70 69 64 3d 31 38 36 36 35 20 70 70 69 64 3d 31 38 36 35 39 20 77 6f 72 6b 65 72 3d 30
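(The c2 bytes sprinkled through that dump are the giveaway: somewhere the raw bytes were treated as text code points and re-encoded as UTF-8, which turns every byte at or above 0x80 into a two-byte sequence. A minimal demonstration:)

```javascript
// If the raw msgpack byte 0x92 (a fixarray header) is read as the
// code point U+0092 and then re-encoded as UTF-8, it becomes the
// two-byte sequence c2 92 - exactly the pattern in the dump above.
const raw = 0x92;
const mangled = Buffer.from(String.fromCharCode(raw), "utf8");
// mangled -> <Buffer c2 92>
```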
This one is really bugging me, and I'm still not entirely sure if the issue is in this lib, the msgpack lib, or fluentd not following the spec properly??
Dug into this today; it turns out this is a UTF-8 decoding issue. @msgpack/msgpack completely mangles the bytes in the string when it decodes them :/
Fix released in v1.0.9
Awesome!!! Thanks so much!
Glad you were able to figure it out; a UTF-8 issue was on my radar, but not as high up as thoughts of some sort of secondary-decoding issue (i.e., having to decode a msgpack message within the main message).
Will try this when I can and let you know if I run into any issues.
Wait a minute; can we clarify something here?
You said:
When data is sent in the msgpack str format, the data ends up getting deserialized as UTF8 in @msgpack/msgpack.
Now, the msgpack spec says that the str format holds UTF-8.
At first I thought perhaps @msgpack/msgpack was the culprit, but if the spec says UTF-8 and @msgpack/msgpack is decoding as UTF-8, should that not be correct?
Where is it becoming NOT UTF-8 (ASCII?)? Is it fluentd encoding it this way, or does that somehow happen within this library?
Just - if this issue is actually rooted in another package, then while I definitely appreciate your fix, it would probably be ideal long-term to notify the maintainers of the package that has the core issue.
Yeah, I think the actual problem is in msgpack-ruby; at least @msgpack/msgpack seems to be doing the right thing according to the spec.
The thing is, bytes like 0x92 aren't even valid on their own in UTF-8, so I'm not sure how this would ever have worked. I also got wildly different results when I ran with TEXT_DECODING=force, which forces the msgpack decoder to use TextDecoder instead of its custom one, so it's implementation-dependent. I initially looked at ways to reverse the UTF-8 decoding process, but it looked lossy, and also seemed harder than just messing with the parsing instance.
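(A quick way to see that these bytes were never valid UTF-8: a strict TextDecoder rejects a standalone 0x92 outright, while the default lossy mode substitutes U+FFFD:)

```javascript
// TextDecoder with { fatal: true } throws on invalid UTF-8 input
// instead of silently substituting the replacement character.
let rejected = false;
try {
  new TextDecoder("utf-8", { fatal: true }).decode(Uint8Array.of(0x92));
} catch {
  rejected = true; // 0x92 alone is not a valid UTF-8 sequence
}
const lossy = new TextDecoder("utf-8").decode(Uint8Array.of(0x92)); // "\uFFFD"
```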
The data was never in UTF-8; fluentd was encoding it this way. All it does is encode it in msgpack bin format, but sticks a 0xd9/0xda/0xdb in front instead of a 0xc4/0xc5/0xc6.
I'm actually somewhat confused about why fluentd sends its data this way. It says it's for backwards-compatibility reasons, but fluentd accepts bin-formatted data as well. I imagine msgpack-ruby had a bug in it that caused this behavior, and once it got fixed they couldn't change both at once? idk
Another completely valid way to fix this would have been to save the buffer, parse it once, and then swap the 0xd9/0xda/0xdb in the buffer at the right spot for the corresponding bin format byte. Would be slower though :p
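That swap idea can be sketched in a few lines (this is only an illustration of the workaround described above, not the fix that actually shipped):

```javascript
// Rewrite a msgpack str8/str16/str32 header byte into the matching
// bin8/bin16/bin32 header, so a spec-compliant decoder treats the
// PackedForward payload as raw bytes rather than a UTF-8 string.
const STR_TO_BIN = { 0xd9: 0xc4, 0xda: 0xc5, 0xdb: 0xc6 };

function strHeaderToBin(buf, offset) {
  const bin = STR_TO_BIN[buf[offset]];
  if (bin === undefined) {
    throw new Error("no str8/16/32 header at offset " + offset);
  }
  buf[offset] = bin; // mutate in place; the length bytes stay valid
  return buf;
}
// e.g. the header "db 00 00 00 68" becomes "c6 00 00 00 68".
```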
Well, your insight is much appreciated.
I did create issues on the fluent/fluentd tracker; I think I also did on @msgpack/msgpack, although perhaps I should close that one and open one for the Ruby version..
A little bit of a weird one!
Hi,
Trying to use this as a server/sink to have fluent forward logs into a node app, but nothing seems to get emitted when forwarded in by an actual fluentd..
Using this test code:
I do immediately see the message forwarded into itself with this test app; however, I see nothing emitted when my local fluentd connects and flushes its buffer.
I've tried basically the config posted in the readme; I have my local syslog as a source tagged as system.
I've also tried <match **> for all messages.. I do not ever see a forward-failed file. The fluentd log complains when my node app isn't listening and then shows that it flushed when it does start listening.
I've set up a copy output to copy to the forward for the node sink and to a local file, and I can indeed see the log entries in the local file.
Perhaps I'm reading it wrong, but "retry succeeded" with a chunk id sounds like it's definitely pushing a chunk, yet I'm not getting any event emitted, while I do when I push locally from node.
Any ideas what my issue might be? Is there a possible version issue or something?
thanks