lf-edge / ekuiper

Lightweight data stream processing engine for IoT edge
https://ekuiper.org
Apache License 2.0
1.48k stars, 413 forks

File Sink Output - JSON Lines type - Not completing the last log line at the end of the file #2343

Closed canob closed 1 year ago

canob commented 1 year ago

I configured different sources on eKuiper:

The really strange thing here is that after some time (sometimes minutes), eKuiper completes the truncated last JSON line of the output file.

Environment:

What happened and what you expected to happen: Two things happened:

How to reproduce it (as minimally and precisely as possible):

Anything else we need to know?: Nothing in particular.

canob commented 1 year ago

To continue testing this, I added a REST sink Output that sends the result of a rule to the FluentBit HTTP Input, and eKuiper is reporting this error: unexpected end of JSON input: http error. | method=POST path="http://192.168.1.50:8888/" status=201 request_body="{"Mem.free":2312340,"Mem.total":32819404}" response_body="" It is not clear to me whether this problem is related to the one already mentioned (the last line in the logs is truncated when I send the output to a file), but maybe it is.

ngjaying commented 1 year ago

@canob Appreciate the detailed description. Are you trying to use files to transfer data between rules? That's not the purpose of the file sink. It is not designed to write each JSON output to the file individually (which is what "completing the last line" implies). Instead, the file sink is used to save batch data.

To transfer data between rules, try using the memory sink/source pair. Check https://ekuiper.org/docs/en/latest/guide/rules/rule_pipeline.html.
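For reference, a minimal Go sketch that prints the definitions for such a memory sink/source pipeline. It assumes the rule JSON shape (`id`, `sql`, `actions`) and the `TYPE="memory"` stream option as described in the rule pipeline docs linked above; the topic `pipeline/dns`, the stream `dnsPipeline` and both rule IDs are hypothetical. The printed definitions would then be submitted through the REST API or CLI.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Rule mirrors the basic shape of an eKuiper rule definition (assumed: id, sql, actions).
type Rule struct {
	ID      string                      `json:"id"`
	SQL     string                      `json:"sql"`
	Actions []map[string]map[string]any `json:"actions"`
}

// Hypothetical in-process topic shared by the two rules.
const topic = "pipeline/dns"

// Stream that reads whatever the upstream rule publishes to the memory topic.
const pipelineStream = `CREATE STREAM dnsPipeline () WITH (TYPE="memory", DATASOURCE="` + topic + `", FORMAT="JSON")`

func pipelineRules() (up, down []byte, err error) {
	upstream := Rule{
		ID:  "upstream",
		SQL: "SELECT * FROM dns",
		// The memory sink publishes results to the topic instead of a file.
		Actions: []map[string]map[string]any{{"memory": {"topic": topic}}},
	}
	downstream := Rule{
		ID:      "downstream",
		SQL:     "SELECT qname, source_ip FROM dnsPipeline",
		Actions: []map[string]map[string]any{{"log": {}}},
	}
	if up, err = json.Marshal(upstream); err != nil {
		return nil, nil, err
	}
	if down, err = json.Marshal(downstream); err != nil {
		return nil, nil, err
	}
	return up, down, nil
}

func main() {
	up, down, err := pipelineRules()
	if err != nil {
		panic(err)
	}
	fmt.Println(pipelineStream)
	fmt.Println(string(up))
	fmt.Println(string(down))
}
```

The stream is created once; the upstream rule writes to the topic and the downstream rule reads from it, so no file is involved between the two rules.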

The REST error suggests a problem with the JSON output format. Please try debugging the rule as described in https://ekuiper.org/docs/en/latest/getting_started/debug_rules.html

canob commented 1 year ago

Hi @ngjaying. Thanks for your answer. I'm not trying to transfer data between rules through files. Each rule consumes a source (a file in one rule, an HTTP Push in the other, both with JSON content) and sends that data to an output (a different file output for each rule), so nothing is shared between rules. Since the output file in both rules is not completed in real time (the "completing the last line" idea), I thought the REST errors happened because eKuiper was trying to process the file and not finding the last line complete, so it was not valid JSON. I'm going to try the debug rule option to get more information about the error and add it to this post, in case it helps you debug the problem.

ngjaying commented 1 year ago

Thanks for clarifying. You said "and I cannot process that file with other piece of software in that case."

I don't think the file sink is designed for "realtime" data transfer. Would you consider using the MQTT or REST sink to publish "realtime" data to an external system? The file sink has a rolling strategy (https://ekuiper.org/docs/en/latest/guide/sinks/builtin/file.html#rolling-strategy), and it guarantees that the writing is complete for each roll.

canob commented 1 year ago

Following your suggestion to use another type of sink (not the file sink) to publish "realtime" data to an external system, I configured a REST sink Output to send data to the FluentBit HTTP Input (my second post in this issue). In that case, however, I got the unexpected end of JSON input error, which I still need to debug with the procedure you linked in your other answer. I even tried this path, for example: FluentBit HTTP Output -> eKuiper HTTP Push Source -> a filter rule -> REST Sink -> FluentBit HTTP Input, and it fails with the same JSON error message, with no file involved anywhere in that data path. Please don't misunderstand my questions about eKuiper's functionality: it's been a long time since I reviewed a project as promising and with as many interesting features as this one. I'm really amazed by eKuiper's capabilities and want to understand better how to use it.

ngjaying commented 1 year ago

Each rule can have multiple sink actions. When composing the rule, you can add an additional log action to watch the output. I guess you just need to set "sendSingle" to true in the sink properties.
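As a rough illustration of what "sendSingle" changes, here is a Go sketch. It assumes, as the "sending data: [map[...]]" log lines later in this thread suggest, that a sink receives each result as a list of rows, and that with sendSingle disabled the whole list is sent as one JSON array. The function `encodeBodies` is hypothetical and is not the REST sink's code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeBodies is a hypothetical model of the sendSingle switch: with
// sendSingle=false the rows go out as one JSON array (assumption); with
// sendSingle=true each row becomes its own request body.
func encodeBodies(rows []map[string]any, sendSingle bool) ([][]byte, error) {
	if !sendSingle {
		b, err := json.Marshal(rows)
		if err != nil {
			return nil, err
		}
		return [][]byte{b}, nil
	}
	bodies := make([][]byte, 0, len(rows))
	for _, row := range rows {
		b, err := json.Marshal(row)
		if err != nil {
			return nil, err
		}
		bodies = append(bodies, b)
	}
	return bodies, nil
}

func main() {
	// One row, with the values from the eKuiper debug log in this thread.
	rows := []map[string]any{{"Mem.free": 711716.0, "Mem.total": 3.2819404e+07}}
	for _, single := range []bool{false, true} {
		bodies, err := encodeBodies(rows, single)
		if err != nil {
			panic(err)
		}
		for _, b := range bodies {
			fmt.Printf("sendSingle=%v: %s (%d bytes)\n", single, b, len(b))
		}
	}
}
```

With that row, the single-object body is 40 bytes, which matches the ContentLength:40 of the requests in the debug log later in this thread.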

canob commented 1 year ago

I had already set "sendSingle" on the sink output the first time I saw the unexpected end of JSON input: http error error. Today I'm going to enable the log action and debugging on the rules to try to find the problem. Thanks for all your help.

canob commented 1 year ago

Hi again @ngjaying,

So, I ran many tests to try to understand what is happening, but it is still not clear to me why I'm getting the unexpected end of JSON input: http error error.

My conclusion is that if I create this path:

Fluentbit Instance 1 HTTP output (memory statistics) to eKuiper
-> eKuiper HTTP Push Source
    -> eKuiper rule to select only two fields of the received JSON, Mem.free and Mem.total
         -> eKuiper REST Sink
              -> Fluentbit Instance 2 HTTP input from eKuiper
                  -> Fluentbit Instance 2 File output to ekuiper file

I receive the error, but I'm not losing any events.

As you can see, I sent 6 events from Fluentbit to eKuiper, and I ended up with 6 events in the ekuiper file (you will see an additional field, "tag", because I add that field with a filter in Fluentbit before sending the events to the ekuiper file).

The debug log of eKuiper is not showing any particular error:

time="2023-10-23 13:07:50" level=info msg="Serving kuiper (version - 1.11.4) on port 20498, and restful api on http://0.0.0.0:9081." file="server/server.go:208"
Serving kuiper (version - 1.11.4) on port 20498, and restful api on http://0.0.0.0:9081.
time="2023-10-23 13:08:00" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:08:10" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:08:20" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:08:30" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:08:40" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:08:50" level=debug msg="receive http request: /api/data" file="httpserver/data_server.go:83" httppush_connection=0
time="2023-10-23 13:08:50" level=debug msg="httppush received message map[Mem.free:%!s(float64=711716) Mem.total:%!s(float64=3.2819404e+07) Mem.used:%!s(float64=3.2107688e+07) Swap.free:%!s(float64=7.761148e+06) Swap.total:%!s(float64=7.995388e+06) Swap.used:%!s(float64=234240) date:2023-10-23T13:08:49.461940Z tag:memory]" file="httpserver/data_server.go:92" httppush_connection=0
time="2023-10-23 13:08:50" level=debug msg="memory source broadcast from topic $$httppush//api/data to fluentbit-output_HTTPInput_0 done" file="pubsub/manager.go:137" httppush_connection=0
time="2023-10-23 13:08:50" level=debug msg="filter plan receive &{HTTPInput map[Mem.free:711716 Mem.total:3.2819404e+07 Mem.used:3.2107688e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:49.461940Z tag:memory] 1698066530479 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/filter_operator.go:37" rule=fluentbit-output
time="2023-10-23 13:08:50" level=debug msg="project plan receive &{HTTPInput map[Mem.free:711716 Mem.total:3.2819404e+07 Mem.used:3.2107688e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:49.461940Z tag:memory] 1698066530479 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/project_operator.go:54" rule=fluentbit-output
time="2023-10-23 13:08:50" level=debug msg="sending data: [map[Mem.free:711716 Mem.total:3.2819404e+07]]" file="node/sink_node.go:242" rule=fluentbit-output
time="2023-10-23 13:08:50" level=debug msg="rest sink receive map[Mem.free:%!s(float64=711716) Mem.total:%!s(float64=3.2819404e+07)]" file="http/rest_sink.go:87" rule=fluentbit-output
time="2023-10-23 13:08:50" level=debug msg="do request: &http.Request{Method:\"POST\", URL:(*url.URL)(0xc0000d5200), Proto:\"HTTP/1.1\", ProtoMajor:1, ProtoMinor:1, Header:http.Header{\"Content-Type\":[]string{\"application/json\"}}, Body:io.nopCloserWriterTo{Reader:(*bytes.Buffer)(0xc0003e6db0)}, GetBody:(func() (io.ReadCloser, error))(0xfcaea0), ContentLength:40, TransferEncoding:[]string(nil), Close:false, Host:\"192.168.1.50:8888\", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:\"\", RequestURI:\"\", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:(*context.emptyCtx)(0xc000046080)}" file="httpx/http.go:99" rule=fluentbit-output
time="2023-10-23 13:08:50" level=debug msg="rest sink got response &{201 Created  201 HTTP/1.1 1 1 map[Content-Length:[0] Ok:[ok] Server:[Fluent Bit v2.1.10]] 0xc0001f0dc0 0 [] false false map[] 0xc0006e5600 <nil>}" file="http/rest_sink.go:67" rule=fluentbit-output
time="2023-10-23 13:08:50" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:08:51" level=debug msg="receive http request: /api/data" file="httpserver/data_server.go:83" httppush_connection=0
time="2023-10-23 13:08:51" level=debug msg="httppush received message map[Mem.free:%!s(float64=711100) Mem.total:%!s(float64=3.2819404e+07) Mem.used:%!s(float64=3.2108304e+07) Swap.free:%!s(float64=7.761148e+06) Swap.total:%!s(float64=7.995388e+06) Swap.used:%!s(float64=234240) date:2023-10-23T13:08:50.461888Z tag:memory]" file="httpserver/data_server.go:92" httppush_connection=0
time="2023-10-23 13:08:51" level=debug msg="memory source broadcast from topic $$httppush//api/data to fluentbit-output_HTTPInput_0 done" file="pubsub/manager.go:137" httppush_connection=0
time="2023-10-23 13:08:51" level=debug msg="filter plan receive &{HTTPInput map[Mem.free:711100 Mem.total:3.2819404e+07 Mem.used:3.2108304e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:50.461888Z tag:memory] 1698066531477 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/filter_operator.go:37" rule=fluentbit-output
time="2023-10-23 13:08:51" level=debug msg="project plan receive &{HTTPInput map[Mem.free:711100 Mem.total:3.2819404e+07 Mem.used:3.2108304e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:50.461888Z tag:memory] 1698066531477 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/project_operator.go:54" rule=fluentbit-output
time="2023-10-23 13:08:51" level=debug msg="sending data: [map[Mem.free:711100 Mem.total:3.2819404e+07]]" file="node/sink_node.go:242" rule=fluentbit-output
time="2023-10-23 13:08:51" level=debug msg="rest sink receive map[Mem.free:%!s(float64=711100) Mem.total:%!s(float64=3.2819404e+07)]" file="http/rest_sink.go:87" rule=fluentbit-output
time="2023-10-23 13:08:51" level=debug msg="do request: &http.Request{Method:\"POST\", URL:(*url.URL)(0xc000699320), Proto:\"HTTP/1.1\", ProtoMajor:1, ProtoMinor:1, Header:http.Header{\"Content-Type\":[]string{\"application/json\"}}, Body:io.nopCloserWriterTo{Reader:(*bytes.Buffer)(0xc0007da960)}, GetBody:(func() (io.ReadCloser, error))(0xfcaea0), ContentLength:40, TransferEncoding:[]string(nil), Close:false, Host:\"192.168.1.50:8888\", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:\"\", RequestURI:\"\", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:(*context.emptyCtx)(0xc000046080)}" file="httpx/http.go:99" rule=fluentbit-output
time="2023-10-23 13:08:51" level=debug msg="rest sink got response &{201 Created  201 HTTP/1.1 1 1 map[Content-Length:[0] Ok:[ok] Server:[Fluent Bit v2.1.10]] 0xc0001f4a40 0 [] false false map[] 0xc000689c00 <nil>}" file="http/rest_sink.go:67" rule=fluentbit-output
time="2023-10-23 13:08:52" level=debug msg="receive http request: /api/data" file="httpserver/data_server.go:83" httppush_connection=0
time="2023-10-23 13:08:52" level=debug msg="httppush received message map[Mem.free:%!s(float64=710848) Mem.total:%!s(float64=3.2819404e+07) Mem.used:%!s(float64=3.2108556e+07) Swap.free:%!s(float64=7.761148e+06) Swap.total:%!s(float64=7.995388e+06) Swap.used:%!s(float64=234240) date:2023-10-23T13:08:51.461825Z tag:memory]" file="httpserver/data_server.go:92" httppush_connection=0
time="2023-10-23 13:08:52" level=debug msg="memory source broadcast from topic $$httppush//api/data to fluentbit-output_HTTPInput_0 done" file="pubsub/manager.go:137" httppush_connection=0
time="2023-10-23 13:08:52" level=debug msg="filter plan receive &{HTTPInput map[Mem.free:710848 Mem.total:3.2819404e+07 Mem.used:3.2108556e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:51.461825Z tag:memory] 1698066532466 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/filter_operator.go:37" rule=fluentbit-output
time="2023-10-23 13:08:52" level=debug msg="project plan receive &{HTTPInput map[Mem.free:710848 Mem.total:3.2819404e+07 Mem.used:3.2108556e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:51.461825Z tag:memory] 1698066532466 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/project_operator.go:54" rule=fluentbit-output
time="2023-10-23 13:08:52" level=debug msg="sending data: [map[Mem.free:710848 Mem.total:3.2819404e+07]]" file="node/sink_node.go:242" rule=fluentbit-output
time="2023-10-23 13:08:52" level=debug msg="rest sink receive map[Mem.free:%!s(float64=710848) Mem.total:%!s(float64=3.2819404e+07)]" file="http/rest_sink.go:87" rule=fluentbit-output
time="2023-10-23 13:08:52" level=debug msg="do request: &http.Request{Method:\"POST\", URL:(*url.URL)(0xc0006993b0), Proto:\"HTTP/1.1\", ProtoMajor:1, ProtoMinor:1, Header:http.Header{\"Content-Type\":[]string{\"application/json\"}}, Body:io.nopCloserWriterTo{Reader:(*bytes.Buffer)(0xc0007db050)}, GetBody:(func() (io.ReadCloser, error))(0xfcaea0), ContentLength:40, TransferEncoding:[]string(nil), Close:false, Host:\"192.168.1.50:8888\", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:\"\", RequestURI:\"\", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:(*context.emptyCtx)(0xc000046080)}" file="httpx/http.go:99" rule=fluentbit-output
time="2023-10-23 13:08:52" level=debug msg="rest sink got response &{201 Created  201 HTTP/1.1 1 1 map[Content-Length:[0] Ok:[ok] Server:[Fluent Bit v2.1.10]] 0xc0001f4dc0 0 [] false false map[] 0xc00091c000 <nil>}" file="http/rest_sink.go:67" rule=fluentbit-output
time="2023-10-23 13:08:53" level=debug msg="receive http request: /api/data" file="httpserver/data_server.go:83" httppush_connection=0
time="2023-10-23 13:08:53" level=debug msg="httppush received message map[Mem.free:%!s(float64=728528) Mem.total:%!s(float64=3.2819404e+07) Mem.used:%!s(float64=3.2090876e+07) Swap.free:%!s(float64=7.761148e+06) Swap.total:%!s(float64=7.995388e+06) Swap.used:%!s(float64=234240) date:2023-10-23T13:08:52.461838Z tag:memory]" file="httpserver/data_server.go:92" httppush_connection=0
time="2023-10-23 13:08:53" level=debug msg="memory source broadcast from topic $$httppush//api/data to fluentbit-output_HTTPInput_0 done" file="pubsub/manager.go:137" httppush_connection=0
time="2023-10-23 13:08:53" level=debug msg="filter plan receive &{HTTPInput map[Mem.free:728528 Mem.total:3.2819404e+07 Mem.used:3.2090876e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:52.461838Z tag:memory] 1698066533466 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/filter_operator.go:37" rule=fluentbit-output
time="2023-10-23 13:08:53" level=debug msg="project plan receive &{HTTPInput map[Mem.free:728528 Mem.total:3.2819404e+07 Mem.used:3.2090876e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:52.461838Z tag:memory] 1698066533466 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/project_operator.go:54" rule=fluentbit-output
time="2023-10-23 13:08:53" level=debug msg="sending data: [map[Mem.free:728528 Mem.total:3.2819404e+07]]" file="node/sink_node.go:242" rule=fluentbit-output
time="2023-10-23 13:08:53" level=debug msg="rest sink receive map[Mem.free:%!s(float64=728528) Mem.total:%!s(float64=3.2819404e+07)]" file="http/rest_sink.go:87" rule=fluentbit-output
time="2023-10-23 13:08:53" level=debug msg="do request: &http.Request{Method:\"POST\", URL:(*url.URL)(0xc0005bc240), Proto:\"HTTP/1.1\", ProtoMajor:1, ProtoMinor:1, Header:http.Header{\"Content-Type\":[]string{\"application/json\"}}, Body:io.nopCloserWriterTo{Reader:(*bytes.Buffer)(0xc0005beab0)}, GetBody:(func() (io.ReadCloser, error))(0xfcaea0), ContentLength:40, TransferEncoding:[]string(nil), Close:false, Host:\"192.168.1.50:8888\", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:\"\", RequestURI:\"\", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:(*context.emptyCtx)(0xc000046080)}" file="httpx/http.go:99" rule=fluentbit-output
time="2023-10-23 13:08:53" level=debug msg="rest sink got response &{201 Created  201 HTTP/1.1 1 1 map[Content-Length:[0] Ok:[ok] Server:[Fluent Bit v2.1.10]] 0xc0001f4f20 0 [] false false map[] 0xc0005dc600 <nil>}" file="http/rest_sink.go:67" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="receive http request: /api/data" file="httpserver/data_server.go:83" httppush_connection=0
time="2023-10-23 13:08:54" level=debug msg="httppush received message map[Mem.free:%!s(float64=726552) Mem.total:%!s(float64=3.2819404e+07) Mem.used:%!s(float64=3.2092852e+07) Swap.free:%!s(float64=7.761148e+06) Swap.total:%!s(float64=7.995388e+06) Swap.used:%!s(float64=234240) date:2023-10-23T13:08:53.461865Z tag:memory]" file="httpserver/data_server.go:92" httppush_connection=0
time="2023-10-23 13:08:54" level=debug msg="memory source broadcast from topic $$httppush//api/data to fluentbit-output_HTTPInput_0 done" file="pubsub/manager.go:137" httppush_connection=0
time="2023-10-23 13:08:54" level=debug msg="filter plan receive &{HTTPInput map[Mem.free:726552 Mem.total:3.2819404e+07 Mem.used:3.2092852e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:53.461865Z tag:memory] 1698066534466 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/filter_operator.go:37" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="project plan receive &{HTTPInput map[Mem.free:726552 Mem.total:3.2819404e+07 Mem.used:3.2092852e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:53.461865Z tag:memory] 1698066534466 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/project_operator.go:54" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="sending data: [map[Mem.free:726552 Mem.total:3.2819404e+07]]" file="node/sink_node.go:242" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="rest sink receive map[Mem.free:%!s(float64=726552) Mem.total:%!s(float64=3.2819404e+07)]" file="http/rest_sink.go:87" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="do request: &http.Request{Method:\"POST\", URL:(*url.URL)(0xc0000d5cb0), Proto:\"HTTP/1.1\", ProtoMajor:1, ProtoMinor:1, Header:http.Header{\"Content-Type\":[]string{\"application/json\"}}, Body:io.nopCloserWriterTo{Reader:(*bytes.Buffer)(0xc0009a2240)}, GetBody:(func() (io.ReadCloser, error))(0xfcaea0), ContentLength:40, TransferEncoding:[]string(nil), Close:false, Host:\"192.168.1.50:8888\", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:\"\", RequestURI:\"\", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:(*context.emptyCtx)(0xc000046080)}" file="httpx/http.go:99" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="rest sink got response &{201 Created  201 HTTP/1.1 1 1 map[Content-Length:[0] Ok:[ok] Server:[Fluent Bit v2.1.10]] 0xc00007e860 0 [] false false map[] 0xc000766700 <nil>}" file="http/rest_sink.go:67" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="receive http request: /api/data" file="httpserver/data_server.go:83" httppush_connection=0
time="2023-10-23 13:08:54" level=debug msg="httppush received message map[Mem.free:%!s(float64=725800) Mem.total:%!s(float64=3.2819404e+07) Mem.used:%!s(float64=3.2093604e+07) Swap.free:%!s(float64=7.761148e+06) Swap.total:%!s(float64=7.995388e+06) Swap.used:%!s(float64=234240) date:2023-10-23T13:08:54.461890Z tag:memory]" file="httpserver/data_server.go:92" httppush_connection=0
time="2023-10-23 13:08:54" level=debug msg="memory source broadcast from topic $$httppush//api/data to fluentbit-output_HTTPInput_0 done" file="pubsub/manager.go:137" httppush_connection=0
time="2023-10-23 13:08:54" level=debug msg="filter plan receive &{HTTPInput map[Mem.free:725800 Mem.total:3.2819404e+07 Mem.used:3.2093604e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:54.461890Z tag:memory] 1698066534789 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/filter_operator.go:37" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="project plan receive &{HTTPInput map[Mem.free:725800 Mem.total:3.2819404e+07 Mem.used:3.2093604e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:54.461890Z tag:memory] 1698066534789 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/project_operator.go:54" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="sending data: [map[Mem.free:725800 Mem.total:3.2819404e+07]]" file="node/sink_node.go:242" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="rest sink receive map[Mem.free:%!s(float64=725800) Mem.total:%!s(float64=3.2819404e+07)]" file="http/rest_sink.go:87" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="do request: &http.Request{Method:\"POST\", URL:(*url.URL)(0xc0006994d0), Proto:\"HTTP/1.1\", ProtoMajor:1, ProtoMinor:1, Header:http.Header{\"Content-Type\":[]string{\"application/json\"}}, Body:io.nopCloserWriterTo{Reader:(*bytes.Buffer)(0xc0007db9b0)}, GetBody:(func() (io.ReadCloser, error))(0xfcaea0), ContentLength:40, TransferEncoding:[]string(nil), Close:false, Host:\"192.168.1.50:8888\", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:\"\", RequestURI:\"\", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:(*context.emptyCtx)(0xc000046080)}" file="httpx/http.go:99" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="rest sink got response &{201 Created  201 HTTP/1.1 1 1 map[Content-Length:[0] Ok:[ok] Server:[Fluent Bit v2.1.10]] 0xc0001f56c0 0 [] false false map[] 0xc00091c700 <nil>}" file="http/rest_sink.go:67" rule=fluentbit-output
time="2023-10-23 13:09:00" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:09:10" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"

So, my questions are basically:

Thanks in advance for your help.

ngjaying commented 1 year ago

@canob The error happens when eKuiper parses the response from Fluentbit. It looks like the response from Fluentbit is not a JSON string. The error message is misleading; we'll improve it.

I guess you have set the debugResp option to true for the REST sink. You can set it to false to skip parsing the response and avoid this problem.
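The failure can be reproduced with Go's standard library alone: decoding an empty body with `encoding/json` returns exactly "unexpected end of JSON input", and the Fluent Bit reply in the debug log above is a 201 with Content-Length 0. The sketch below is a hypothetical model of response handling with and without `debugResp`; `handleResponse` and `fluentBitReply` are made-up names, not the REST sink's actual code.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// handleResponse is a hypothetical model of a REST sink's response handling.
// When debugResp is true it tries to decode the body as JSON.
func handleResponse(resp *http.Response, debugResp bool) error {
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("http status %d", resp.StatusCode)
	}
	if !debugResp {
		return nil // a 2xx status is enough; the body is ignored
	}
	var parsed map[string]any
	// For an empty body this returns "unexpected end of JSON input".
	return json.Unmarshal(body, &parsed)
}

// fluentBitReply builds a response shaped like the one in the debug log:
// 201 Created with an empty body.
func fluentBitReply() *http.Response {
	return &http.Response{
		StatusCode: http.StatusCreated,
		Body:       io.NopCloser(strings.NewReader("")),
	}
}

func main() {
	fmt.Println("debugResp=true: ", handleResponse(fluentBitReply(), true))
	fmt.Println("debugResp=false:", handleResponse(fluentBitReply(), false))
}
```

The request itself succeeds in both cases; only the optional parsing step fails, which is why no events were lost.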

canob commented 1 year ago

Thanks @ngjaying ! With debugResp set to false I don't see the unexpected end of JSON input: http error error anymore. Amazing.

canob commented 1 year ago

Hi @ngjaying, back to the title of this issue, "File Sink Output not completing the last log line". Thanks to your help, I already managed to use the REST Sink Output without any errors, but I really want to understand the behavior of the File Sink, because I still think that something is not working as expected. Maybe I did not explain the problem clearly enough before, so I'm going to try again with a simpler explanation. For example, I have this workflow:

Fluentbit HTTP Output
-> Stream eKuiper HTTP Push Source, named dns
    -> Rule with "select * FROM dns"
        -> File Sink, rolling by time (as you explained to me in one of your earlier answers)

What is not OK for me, or not the behavior I expected (in other words, I have not seen this behavior in other stream processing solutions), is that for many seconds the last line of the newest file (the one currently being written) contains a truncated JSON line when I run "cat filename". After 10-20 seconds (one or two more "cat filename"), that line is completed, additional JSON lines appear, and the file is closed and eKuiper creates a new file to continue appending JSON lines. Here is a simple example of what I'm trying to explain (my comments are in parentheses):

root@ar09-ubuntu:/disk/ekuiper/output# ls -lah
total 17.6K
drwxr-xr-x 2 root root 4.0K Oct 25 18:55 .
drwxr-xr-x 5 root root 4.0K Oct 24 14:35 ..
-rw-r--r-- 1 root root 6.7K Oct 25 18:54 dns-query-1698260023842.json
-rw-r--r-- 1 root root 6.9K Oct 25 18:55 dns-query-1698260065851.json
-rw-r--r-- 1 root root 4.0K Oct 25 18:55 dns-query-1698260107832.json (this file is incomplete for now at 4.0K, but it can be 8.0K, 12.0K, and so on, depending on how many logs it receives before the rotation)

root@ar09-ubuntu:/disk/ekuiper/output# cat dns-query-1698260107832.json
... (more lines here that I deleted for the example)
{"date":"2023-10-25T18:56:00.000000Z","qname":"encrypted-tbn0.gstatic.com","query_type":"A","source_ip":"10.11.21.43","tag":"dns_query","timestamp":"Oct 25 15:56:00"}
{"date":"2023-10-25T18:56:01.000000Z","qname":"login.microsoftonline.com","query_type":"A","source_ip":"10.199.0.65","tag":"dns_query" (this last JSON line is not complete)

root@ar09-ubuntu:/disk/ekuiper/output# ls -lah
total 17.6K
drwxr-xr-x 2 root root 4.0K Oct 25 18:55 .
drwxr-xr-x 5 root root 4.0K Oct 24 14:35 ..
-rw-r--r-- 1 root root 6.7K Oct 25 18:54 dns-query-1698260023842.json
-rw-r--r-- 1 root root 6.9K Oct 25 18:55 dns-query-1698260065851.json
-rw-r--r-- 1 root root 4.0K Oct 25 18:55 dns-query-1698260107832.json (this file is incomplete for now at 4.0K, but it can be 8.0K, 12.0K, and so on, depending on how many logs it receives before the rotation)

root@ar09-ubuntu:/disk/ekuiper/output# cat dns-query-1698260107832.json (executing this 10 seconds later)
... (more lines here that I deleted for the example)
{"date":"2023-10-25T18:56:00.000000Z","qname":"encrypted-tbn0.gstatic.com","query_type":"A","source_ip":"10.11.21.43","tag":"dns_query","timestamp":"Oct 25 15:56:00"}
{"date":"2023-10-25T18:56:01.000000Z","qname":"login.microsoftonline.com","query_type":"A","source_ip":"10.199.0.65","tag":"dns_query" (again, the same last JSON line is not complete)

root@ar09-ubuntu:/disk/ekuiper/output# cat dns-query-1698260107832.json (executing this 10 seconds later)
... (more lines here that I deleted for the example)
{"date":"2023-10-25T18:56:00.000000Z","qname":"encrypted-tbn0.gstatic.com","query_type":"A","source_ip":"10.11.21.43","tag":"dns_query","timestamp":"Oct 25 15:56:00"}
{"date":"2023-10-25T18:56:01.000000Z","qname":"login.microsoftonline.com","query_type":"A","source_ip":"10.199.0.65","tag":"dns_query","timestamp":"Oct 25 15:56:01"} (the JSON line is now complete, and more lines follow)
{"date":"2023-10-25T18:56:02.000000Z","qname":"www.googleapis.com","query_type":"HTTPS","source_ip":"10.11.21.43","tag":"dns_query","timestamp":"Oct 25 15:56:02"}
{"date":"2023-10-25T18:56:03.000000Z","qname":"ssl01.sjc2.goskope.com","query_type":"AAAA","source_ip":"10.199.0.190","tag":"dns_query","timestamp":"Oct 25 15:56:03"}
{"date":"2023-10-25T18:56:05.000000Z","qname":"nam.loki.delve.office.com","query_type":"A","source_ip":"10.199.0.190","tag":"dns_query","timestamp":"Oct 25 15:56:05"}
{"date":"2023-10-25T18:56:10.000000Z","qname":"dit.whatsapp.net","query_type":"A","source_ip":"10.199.0.39","tag":"dns_query","timestamp":"Oct 25 15:56:10"}
{"date":"2023-10-25T18:56:17.000000Z","qname":"www.msftconnecttest.com","query_type":"A","source_ip":"10.11.21.37","tag":"dns_query","timestamp":"Oct 25 15:56:17"}
{"date":"2023-10-25T18:56:18.000000Z","qname":"ssl01.sjc2.goskope.com","query_type":"AAAA","source_ip":"10.199.0.190","tag":"dns_query","timestamp":"Oct 25 15:56:18"}

root@ar09-ubuntu:/disk/ekuiper/output# ls -lah
total 19.4K
drwxr-xr-x 2 root root 4.0K Oct 25 18:55 .
drwxr-xr-x 5 root root 4.0K Oct 24 14:35 ..
-rw-r--r-- 1 root root 6.7K Oct 25 18:54 dns-query-1698260023842.json
-rw-r--r-- 1 root root 6.9K Oct 25 18:55 dns-query-1698260065851.json
-rw-r--r-- 1 root root 6.8K Oct 25 18:55 dns-query-1698260107832.json (the file, now complete)
-rw-r--r-- 1 root root    0 Oct 25 18:55 dns-query-1698260149841.json

My assumption is that eKuiper has some kind of 4K buffer/cache for writing the rule output to the file, because the file size after receiving the first batch of events is always 4.0K, and then it grows to 8.0K, 12.0K, etc. What I want to know is whether I can avoid that, that is, receive the JSON lines in the file in real time, or, if that is not possible, receive only complete JSON lines in the file, never truncated ones.

I already tried tweaking the cache and async settings in eKuiper, but nothing changed this behavior, so maybe it is by design. Still, I want to understand why, because I prefer the File Sink over the REST Sink for performance reasons.

Thanks in advance for your help, and sorry for the long explanation, but I feel that when you explain a problem, it is better to give all the details you can (especially if you are not a native English speaker, like me).

ngjaying commented 1 year ago

As the file sink is not designed for real-time data transfer, we use Go's bufio to write files for better I/O performance, so this is its default behavior. MQTT may be suitable for real-time transfer. We are also working on a WebSocket sink, which would also be suitable for data transfer.

Regarding the JSON being "not completed": that's because you cannot complete a JSON array in the middle. Suppose you expect to write 10 messages to a JSON array. When you get the first message, you cannot "complete" it as [{"id":1}], otherwise how would you append the second message? [{"id":1}],[{"id":2}] is not valid JSON; the content should be written as [{"id":1},{"id":2}, with the closing ] appended only once all expected messages have arrived. There is another format, lines, which does not need a final hook to "complete" the file.

Finally, as we are an open source project, we encourage you to read the source code directly if needed.

canob commented 1 year ago

Hi @ngjaying. Thanks for your answer. Perfect, so the File Sink Output is not an option for me. I can use MQTT, but only as an input in FluentBit (MQTT Sink on the eKuiper side), because MQTT output is not supported: https://github.com/fluent/fluent-bit/issues/674 (in that feature request you can see that I already mentioned eKuiper as a really good companion for FluentBit). About "JSON not completed", I'm already using the JSON lines format for my File Output, so every line (that is, every log) should be valid JSON on its own, and that was the reason for my question, nothing more. I tried to read all the docs I could before asking, but sometimes I can't find the answer, and that is why I'm bothering you with maybe stupid questions; I'm sorry about that. Regarding the source code, I have some notions of programming, but I'm not a developer, so understanding the source code is sometimes complicated for me. Again, thanks for all your help and patience. I really think eKuiper is an amazing project, and I think that in the end, with my questions and your answers, we are helping to clarify things for everyone.

ngjaying commented 1 year ago

@canob You're welcome! About the incomplete JSON, sorry, I did not read carefully. We should expect complete JSON outputs each time. Let me look into that.