brimdata / zed

A novel data lake based on super-structured data
https://zed.brimdata.io/
BSD 3-Clause "New" or "Revised" License

"Live" data import with Fluentd #4271

Open philrz opened 1 year ago

philrz commented 1 year ago

In an exercise similar to the Logstash one in #3151, I've run a test using the external agent Fluentd to continuously push "live" data to a Zed lake.

tl;dr: Fluentd is a more modern tool than Logstash, and in the end it did indeed seem to play more nicely with Zed in a minimal out-of-the-box config.

The test was performed with a Zed lake running behind Zui insiders 0.30.1-142 (which is Zed commit 101d358), Fluentd 1.15.3, and Zeek 5.1.1 (with the json-streaming-logs package) sniffing the coincidental live local traffic on my wireless interface during a typical work-from-home day.

The fluentd.conf I ultimately settled on:

<source>
  @type tail
  path /usr/local/zeek-5.1.1/logs/current/json_streaming_*
  follow_inodes true
  pos_file /Users/phil/work/fluentd/fluentd.pos
  tag zeek
  <parse>
    @type json
  </parse>
</source>

<match zeek>
  @type http
  endpoint http://127.0.0.1:9988/pool/zeek/branch/main
  content_type application/json

  <format>
    @type json
  </format>
</match>

Then to start Fluentd:

$ ulimit -n 65535
$ fluentd -c fluentd.conf 

The ulimit guidance came from Fluentd's "Before Installation" docs and turned out to be significant: the default setting on my MacBook was 256, and in an early run Fluentd did indeed complain of running out of file descriptors.
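The `ulimit -n` step can also be done programmatically. Below is a minimal Python sketch (not from the Fluentd docs) that raises the calling process's soft open-file limit with the standard `resource` module. Because resource limits are inherited across fork/exec, a wrapper script could call it and then launch `fluentd -c fluentd.conf` via `subprocess`. The helper name `raise_nofile_limit` is hypothetical, and some platforms (macOS in particular) may reject large values, which the sketch treats as non-fatal.

```python
import resource


def raise_nofile_limit(desired: int = 65535) -> int:
    """Raise the soft RLIMIT_NOFILE toward `desired`; return the resulting soft limit.

    Hypothetical helper: the equivalent of `ulimit -n` for this process and
    any children it spawns later (rlimits are inherited across fork/exec).
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    # An unprivileged process may only raise its soft limit up to the hard limit.
    target = desired if hard == resource.RLIM_INFINITY else min(desired, hard)
    if soft != resource.RLIM_INFINITY and soft < target:
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
        except (ValueError, OSError):
            # Some platforms cap the value below `hard`; keep the old limit.
            pass
    return resource.getrlimit(resource.RLIMIT_NOFILE)[0]


if __name__ == "__main__":
    print("soft open-file limit:", raise_nofile_limit())
```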

The only minor speed bump I hit along the way was needing to specify content_type application/json. After reading the relevant docs, I first thought the defaults would work: the docs explain that json_array is false by default, so the plugin posts NDJSON (which zed serve accepts) and sets the Content-Type to application/x-ndjson. However, this caused an HTTP 400 error, because the Zed backend handles both regular JSON and NDJSON through the same reader and that reader is wired up only for Content-Type application/json. Hence I had to set content_type explicitly.
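To show what the working request looks like on the wire, here's a minimal Python sketch that builds (but doesn't send) an NDJSON POST carrying Content-Type application/json. The helper `build_load_request` and the sample records are hypothetical; the endpoint is the one from the config above, and the Content-Type reflects the Zed version tested here, which may have changed since.

```python
import json
import urllib.request


def build_load_request(endpoint: str, records: list[dict]) -> urllib.request.Request:
    """Build (but do not send) a POST carrying `records` as NDJSON.

    Hypothetical helper. Uses application/json rather than
    application/x-ndjson, matching the workaround described above.
    """
    # One JSON object per line: the NDJSON body Fluentd sends when json_array is false.
    body = "".join(json.dumps(rec) + "\n" for rec in records).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    req = build_load_request(
        "http://127.0.0.1:9988/pool/zeek/branch/main",
        [{"_path": "conn", "uid": "C1"}, {"_path": "dns", "uid": "C2"}],
    )
    print(req.get_method(), req.full_url, req.get_header("Content-type"))
    # urllib.request.urlopen(req) would actually send it to a running `zed serve`.
```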

Other than that, the defaults are much friendlier than what we saw with Logstash. As #3151 details, Logstash by default sends a single JSON object per post, or it can be configured to send a JSON array of objects. The former is known to perform poorly (#4266), and with the latter each post lands in the pool as a whole array, so it currently needs some kind of post-processing if its elements are to be treated as individual Zed records. By comparison, Fluentd by default not only sends NDJSON but also does so at a flush_interval of 60s, which means more records per commit and hence better performance. That docs page also lists many tuning parameters, so users have plenty of flexibility if these defaults don't suit them.
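To make the batching point concrete, here's a minimal Python sketch of time-based buffering: records accumulate and are sent as one NDJSON body once `flush_interval` seconds have passed, so each post (and hence each commit) carries many records. The `NdjsonBatcher` class is hypothetical and far simpler than Fluentd's buffer. In particular, Fluentd also flushes from a timer when no new events arrive, whereas this sketch only checks the clock when a record is added.

```python
import json
import time
from typing import Callable, List


class NdjsonBatcher:
    """Hypothetical, simplified time-based buffer (not Fluentd's implementation).

    Records accumulate in memory; once `flush_interval` seconds have elapsed
    since the last flush, all pending records are sent as one NDJSON body.
    """

    def __init__(self, flush_interval: float, send: Callable[[bytes], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.flush_interval = flush_interval
        self.send = send
        self.clock = clock
        self.pending: List[dict] = []
        self.last_flush = clock()

    def add(self, record: dict) -> None:
        self.pending.append(record)
        # Only checked on arrival; Fluentd also flushes from a background timer.
        if self.clock() - self.last_flush >= self.flush_interval:
            self.flush()

    def flush(self) -> None:
        if self.pending:
            body = "".join(json.dumps(r) + "\n" for r in self.pending).encode("utf-8")
            self.send(body)  # one POST, and hence one commit, per batch
            self.pending = []
        self.last_flush = self.clock()
```

With a 60-second interval and a steady trickle of Zeek events, each `send()` call carries a minute's worth of records rather than a single one.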

After I let it run for a couple of hours, 7523 Zeek events had accumulated in my pool. Here's a look at the metadata showing each object's size and record count:

$ zed query -z 'from zeek@main:objects'
{id:0x10281d7b0f05a343fe5296fe38a4604340a312da(=ksuid.KSUID),meta:{first:"2022-12-15T00:33:54.369077Z",last:"2022-12-15T00:31:38.906444Z",count:55(uint64),size:4195}(=data.Meta)}(=data.Object)
{id:0x102815f489f922f3d2c91aa854ca9a1daa22a9ee(=ksuid.KSUID),meta:{first:"2022-12-15T00:01:36.401594Z",last:"2022-12-14T23:36:13.903690Z",count:66(uint64),size:4248}(=data.Meta)}(=data.Object)
{id:0x1028167ce874f348a0b2b3bd50a80d13365db4ee(=ksuid.KSUID),meta:{first:"2022-12-15T00:04:01.581654Z",last:"2022-12-14T23:58:46.339061Z",count:80(uint64),size:5928}(=data.Meta)}(=data.Object)
{id:0x10281aaa1cb9e06d8ee6cdc53391cd6feac65216(=ksuid.KSUID),meta:{first:"2022-12-15T00:21:55.553981Z",last:"2022-12-15T00:16:02.116655Z",count:57(uint64),size:4240}(=data.Meta)}(=data.Object)
{id:0x10281318731c08fb28a40deaaf75c62386cb2d30(=ksuid.KSUID),meta:{first:"2022-12-14T23:49:36.320922Z",last:"2022-12-14T23:47:25.507561Z",count:67(uint64),size:4598}(=data.Meta)}(=data.Object)
{id:0x10280eafda094eeb2a72b43a25f20acc5ce290c7(=ksuid.KSUID),meta:{first:"2022-12-14T23:30:41.635227Z",last:"2022-12-14T23:24:14.233836Z",count:51(uint64),size:3814}(=data.Meta)}(=data.Object)
{id:0x10281bb0893b3e78776ab521e967eadf3af98680(=ksuid.KSUID),meta:{first:"2022-12-15T00:26:00.496052Z",last:"2022-12-15T00:13:58.991809Z",count:107(uint64),size:6939}(=data.Meta)}(=data.Object)
{id:0x10281d3899c455779dea527350e3425204d56647(=ksuid.KSUID),meta:{first:"2022-12-15T00:32:39.517490Z",last:"2022-12-15T00:15:30.676887Z",count:61(uint64),size:4406}(=data.Meta)}(=data.Object)
{id:0x1028195d6d2753600cfb04852c0ab53297d7d210(=ksuid.KSUID),meta:{first:"2022-12-15T00:16:23.722289Z",last:"2022-12-15T00:13:47.880190Z",count:142(uint64),size:8860}(=data.Meta)}(=data.Object)
{id:0x102813e2e6d1a9be970674aea46ba794ab38505d(=ksuid.KSUID),meta:{first:"2022-12-14T23:53:05.837437Z",last:"2022-12-14T23:50:24.830662Z",count:42(uint64),size:2835}(=data.Meta)}(=data.Object)
{id:0x1028190caf5bb74b52c1a5f0a22d9dddbcffbb35(=ksuid.KSUID),meta:{first:"2022-12-15T00:14:38.921771Z",last:"2022-12-15T00:06:56.790906Z",count:67(uint64),size:4451}(=data.Meta)}(=data.Object)
{id:0x10281aebb786aa1c2d12018f8aabfdc36eb58fc9(=ksuid.KSUID),meta:{first:"2022-12-15T00:23:02.237135Z",last:"2022-12-15T00:21:07.734245Z",count:149(uint64),size:10789}(=data.Meta)}(=data.Object)
{id:0x102810f799f3aef602b047e881573b2386f93577(=ksuid.KSUID),meta:{first:"2022-12-14T23:40:36.288578Z",last:"2022-12-14T23:26:59.060221Z",count:82(uint64),size:5420}(=data.Meta)}(=data.Object)
{id:0x102818ca249e2221d4c37a8b7edb65d0b7e751b9(=ksuid.KSUID),meta:{first:"2022-12-15T00:13:59.395104Z",last:"2022-12-15T00:11:47.185874Z",count:64(uint64),size:4479}(=data.Meta)}(=data.Object)
{id:0x10280faf96b5ea336207d34a980d3d1c50b9c079(=ksuid.KSUID),meta:{first:"2022-12-14T23:34:49.089985Z",last:"2022-12-14T23:24:12.728754Z",count:96(uint64),size:7282}(=data.Meta)}(=data.Object)
{id:0x1028135f7642ceda3e3c7fdfac16a89920838080(=ksuid.KSUID),meta:{first:"2022-12-14T23:50:36.789345Z",last:"2022-12-14T23:23:55.939031Z",count:57(uint64),size:4065}(=data.Meta)}(=data.Object)
{id:0x10280f6e6c4b1478313b60adbe2e03059a61988d(=ksuid.KSUID),meta:{first:"2022-12-14T23:33:58.709723Z",last:"2022-12-14T23:24:16.419988Z",count:70(uint64),size:4209}(=data.Meta)}(=data.Object)
{id:0x10281a2738714dcd940d567fd7cfc9edba5c9cd6(=ksuid.KSUID),meta:{first:"2022-12-15T00:19:40.686016Z",last:"2022-12-15T00:14:04.289765Z",count:60(uint64),size:4365}(=data.Meta)}(=data.Object)
{id:0x10281e030145e71c84e9b343384a4ccd9379f9a9(=ksuid.KSUID),meta:{first:"2022-12-15T00:36:04.653440Z",last:"2022-12-15T00:32:37.305235Z",count:99(uint64),size:5285}(=data.Meta)}(=data.Object)
{id:0x10281a68855c9701915c102c854ad9821db06f8d(=ksuid.KSUID),meta:{first:"2022-12-15T00:20:50.017824Z",last:"2022-12-15T00:16:01.573861Z",count:104(uint64),size:7887}(=data.Meta)}(=data.Object)
{id:0x10281c33bcd9eb58dd17bcc8052e6cca62b493b1(=ksuid.KSUID),meta:{first:"2022-12-15T00:28:22.641059Z",last:"2022-12-15T00:22:24.852142Z",count:70(uint64),size:5887}(=data.Meta)}(=data.Object)
{id:0x10280ff1b203645bd50cf056858c157227166f32(=ksuid.KSUID),meta:{first:"2022-12-14T23:36:15.207764Z",last:"2022-12-14T23:24:13.058901Z",count:89(uint64),size:7917}(=data.Meta)}(=data.Object)
{id:0x1028188878a3cf9bd2a6265637ecaaea2946cc2d(=ksuid.KSUID),meta:{first:"2022-12-15T00:12:55.221145Z",last:"2022-12-15T00:09:03.450834Z",count:50(uint64),size:4568}(=data.Meta)}(=data.Object)
{id:0x1028200e566e5b0274981f56b59ac002a2607b9e(=ksuid.KSUID),meta:{first:"2022-12-15T00:44:36.617342Z",last:"2022-12-15T00:37:33.515044Z",count:43(uint64),size:3181}(=data.Meta)}(=data.Object)
{id:0x10281c74dd87d80501d6f1447c618277fc1d7de6(=ksuid.KSUID),meta:{first:"2022-12-15T00:29:36.543174Z",last:"2022-12-15T00:18:07.726186Z",count:93(uint64),size:5721}(=data.Meta)}(=data.Object)
{id:0x102817c9cedd477a6144a75c58cc0adcc3983d0d(=ksuid.KSUID),meta:{first:"2022-12-15T00:09:37.863995Z",last:"2022-12-15T00:06:17.892402Z",count:74(uint64),size:4722}(=data.Meta)}(=data.Object)
{id:0x102812956425670086d023734747f1240d362349(=ksuid.KSUID),meta:{first:"2022-12-14T23:47:25.497124Z",last:"2022-12-14T23:44:53.188852Z",count:59(uint64),size:3580}(=data.Meta)}(=data.Object)
{id:0x102814239c83873459591c0486bc9ef17d632259(=ksuid.KSUID),meta:{first:"2022-12-14T23:53:54.960568Z",last:"2022-12-14T23:48:36.878593Z",count:85(uint64),size:5183}(=data.Meta)}(=data.Object)
{id:0x102810744e6db746bab27b72910c430ac659ec6f(=ksuid.KSUID),meta:{first:"2022-12-14T23:38:07.649214Z",last:"2022-12-14T23:23:54.665245Z",count:69(uint64),size:4572}(=data.Meta)}(=data.Object)
{id:0x102810329aa3670ea85ee553104ec911524bd305(=ksuid.KSUID),meta:{first:"2022-12-14T23:37:06.508527Z",last:"2022-12-14T23:24:16.757055Z",count:55(uint64),size:4388}(=data.Meta)}(=data.Object)
{id:0x10281b2d27b326d4aa9d0596829a5403e9811119(=ksuid.KSUID),meta:{first:"2022-12-15T00:24:05.366952Z",last:"2022-12-15T00:01:09.722919Z",count:127(uint64),size:9169}(=data.Meta)}(=data.Object)
{id:0x102813a185f347c8eca873ef93ad2928bed22507(=ksuid.KSUID),meta:{first:"2022-12-14T23:51:41.267126Z",last:"2022-12-14T23:23:37.202062Z",count:59(uint64),size:5991}(=data.Meta)}(=data.Object)
{id:0x102815713cf94e52da146bb4b01bea7068092da4(=ksuid.KSUID),meta:{first:"2022-12-14T23:59:36.393177Z",last:"2022-12-14T23:57:32.147785Z",count:89(uint64),size:6226}(=data.Meta)}(=data.Object)
{id:0x1028199f3b4a3d3fe174d4d5c388e39fcee942fc(=ksuid.KSUID),meta:{first:"2022-12-15T00:17:25.834405Z",last:"2022-12-15T00:15:31.970388Z",count:99(uint64),size:5415}(=data.Meta)}(=data.Object)
{id:0x10280ef07498a13017c3f72b4814716bd5a8b8d7(=ksuid.KSUID),meta:{first:"2022-12-14T23:31:57.672281Z",last:"2022-12-14T23:23:47.391406Z",count:115(uint64),size:10644}(=data.Meta)}(=data.Object)
{id:0x10281dc16398b7dfd2fe1601dd4fc5d06ac32431(=ksuid.KSUID),meta:{first:"2022-12-15T00:35:06.089253Z",last:"2022-12-15T00:27:50.310906Z",count:67(uint64),size:4630}(=data.Meta)}(=data.Object)
{id:0x10281bf2f364aefb042f757d3402681d177446d6(=ksuid.KSUID),meta:{first:"2022-12-15T00:27:01.122826Z",last:"2022-12-15T00:22:23.086189Z",count:64(uint64),size:2973}(=data.Meta)}(=data.Object)
{id:0x102810b57b2a75f0c4f843a8a2a950f91505c254(=ksuid.KSUID),meta:{first:"2022-12-14T23:39:20.038697Z",last:"2022-12-14T23:24:19.610793Z",count:98(uint64),size:5979}(=data.Meta)}(=data.Object)
{id:0x1028120294a6bb54036d4c82e7968adbbba8414d(=ksuid.KSUID),meta:{first:"2022-12-14T23:44:57.519133Z",last:"2022-12-14T23:23:40.779648Z",count:65(uint64),size:5148}(=data.Meta)}(=data.Object)
{id:0x102812d72523ae00b60987cbb575a72469ecc77e(=ksuid.KSUID),meta:{first:"2022-12-14T23:48:36.320776Z",last:"2022-12-14T23:44:38.466381Z",count:91(uint64),size:4993}(=data.Meta)}(=data.Object)
{id:0x10280da9410592ff805786f1fa76d26eb22845ff(=ksuid.KSUID),meta:{first:"2022-12-14T23:26:32.092939Z",last:"2022-12-14T23:24:12.664567Z",count:420(uint64),size:33356}(=data.Meta)}(=data.Object)
{id:0x1028163baa13feae4272420cbae287e852b522c6(=ksuid.KSUID),meta:{first:"2022-12-15T00:02:51.618986Z",last:"2022-12-15T00:00:18.357330Z",count:43(uint64),size:3068}(=data.Meta)}(=data.Object)
{id:0x102816ffc6f45ad917a48863e720c811d2e4858f(=ksuid.KSUID),meta:{first:"2022-12-15T00:06:17.884326Z",last:"2022-12-15T00:02:51.593496Z",count:76(uint64),size:4223}(=data.Meta)}(=data.Object)
{id:0x10281cb63d125bce69a5d1c3ff23c5a23d5a9935(=ksuid.KSUID),meta:{first:"2022-12-15T00:30:39.326320Z",last:"2022-12-15T00:24:05.339656Z",count:54(uint64),size:4006}(=data.Meta)}(=data.Object)
{id:0x10281805f4da43015e3c2cd1f6529cd9433068fb(=ksuid.KSUID),meta:{first:"2022-12-15T00:10:39.101937Z",last:"2022-12-15T00:08:20.966425Z",count:94(uint64),size:6081}(=data.Meta)}(=data.Object)
{id:0x1028117ae81f92957be828fb380ea87c1e31915c(=ksuid.KSUID),meta:{first:"2022-12-14T23:42:46.337119Z",last:"2022-12-14T23:38:14.970666Z",count:66(uint64),size:4657}(=data.Meta)}(=data.Object)
{id:0x10280deab296e587e50926bd9073ccfdfdad05f7(=ksuid.KSUID),meta:{first:"2022-12-14T23:27:36.864611Z",last:"2022-12-14T23:23:36.832279Z",count:189(uint64),size:15116}(=data.Meta)}(=data.Object)
{id:0x10281f8ce3043f8a24ea6448c05b0ba2ba9336a5(=ksuid.KSUID),meta:{first:"2022-12-15T00:42:36.611466Z",last:"2022-12-14T23:36:13.896447Z",count:55(uint64),size:4847}(=data.Meta)}(=data.Object)
{id:0x10281b6e32dd1fa2e27e893db79982c7adaf4980(=ksuid.KSUID),meta:{first:"2022-12-15T00:24:59.057296Z",last:"2022-12-15T00:09:42.396771Z",count:75(uint64),size:5777}(=data.Meta)}(=data.Object)
{id:0x102816b8f40f958c79c614ffe350488567ed0ed5(=ksuid.KSUID),meta:{first:"2022-12-15T00:05:07.322537Z",last:"2022-12-15T00:03:14.277248Z",count:45(uint64),size:3267}(=data.Meta)}(=data.Object)
{id:0x10281f4ac1a0148ffb16d69ccad117174e5943ee(=ksuid.KSUID),meta:{first:"2022-12-15T00:41:43.802271Z",last:"2022-12-15T00:39:07.101302Z",count:63(uint64),size:4517}(=data.Meta)}(=data.Object)
{id:0x1028146ad8046970038fff09cb8bb6098f6e6430(=ksuid.KSUID),meta:{first:"2022-12-14T23:55:18.123145Z",last:"2022-12-14T23:49:05.166232Z",count:52(uint64),size:4708}(=data.Meta)}(=data.Object)
{id:0x102817465357b8452efff0523797909cfe3a12b4(=ksuid.KSUID),meta:{first:"2022-12-15T00:07:14.611369Z",last:"2022-12-15T00:05:16.962595Z",count:105(uint64),size:5311}(=data.Meta)}(=data.Object)
{id:0x10281f09903cc45af868de8c4c55268a554eec0b(=ksuid.KSUID),meta:{first:"2022-12-15T00:40:36.597157Z",last:"2022-12-15T00:30:16.617628Z",count:71(uint64),size:4556}(=data.Meta)}(=data.Object)
{id:0x10281ec728a7971a45505307b653937ffe72c834(=ksuid.KSUID),meta:{first:"2022-12-15T00:39:33.545691Z",last:"2022-12-15T00:23:25.942687Z",count:91(uint64),size:6931}(=data.Meta)}(=data.Object)
{id:0x10281cf78b4811f22c6aac60bd7f15be3e7e2d61(=ksuid.KSUID),meta:{first:"2022-12-15T00:31:36.559041Z",last:"2022-12-15T00:20:36.725557Z",count:55(uint64),size:3240}(=data.Meta)}(=data.Object)
{id:0x10280e6d0c60dad134c5adbe34a3684b9c12cb24(=ksuid.KSUID),meta:{first:"2022-12-14T23:29:36.231974Z",last:"2022-12-14T23:23:54.241076Z",count:68(uint64),size:4860}(=data.Meta)}(=data.Object)
{id:0x102819e0037cb4488e80c3e2e2fa9c7cad5dd750(=ksuid.KSUID),meta:{first:"2022-12-15T00:18:36.472696Z",last:"2022-12-15T00:15:30.643509Z",count:106(uint64),size:6686}(=data.Meta)}(=data.Object)
{id:0x102814eebbf0a8ed36becb7b3cbaa61d7a814260(=ksuid.KSUID),meta:{first:"2022-12-14T23:57:32.329728Z",last:"2022-12-14T23:38:37.652742Z",count:99(uint64),size:5472}(=data.Meta)}(=data.Object)
{id:0x10280d6738f083c8d4a537d85d350532e3603d4b(=ksuid.KSUID),meta:{first:"2022-12-14T23:25:26.379656Z",last:null,count:1406(uint64),size:81180}(=data.Meta)}(=data.Object)
{id:0x102817882a024ff97342d1399df5613189628ef0(=ksuid.KSUID),meta:{first:"2022-12-15T00:08:36.442121Z",last:"2022-12-14T23:59:54.624202Z",count:98(uint64),size:5805}(=data.Meta)}(=data.Object)
{id:0x10281847966536a427cc07b001634ea41e55f313(=ksuid.KSUID),meta:{first:"2022-12-15T00:11:36.448901Z",last:"2022-12-15T00:08:56.725171Z",count:53(uint64),size:3877}(=data.Meta)}(=data.Object)
{id:0x102815b2beae1ffb64fb340ba1ef47f6bc129348(=ksuid.KSUID),meta:{first:"2022-12-15T00:00:32.684574Z",last:"2022-12-14T23:50:06.399812Z",count:94(uint64),size:6027}(=data.Meta)}(=data.Object)
{id:0x10281530ab54ed8b1240209839a583e2272276ed(=ksuid.KSUID),meta:{first:"2022-12-14T23:58:36.391613Z",last:"2022-12-14T23:50:08.179374Z",count:82(uint64),size:4683}(=data.Meta)}(=data.Object)
{id:0x102811bbbe7b2416576b55dc8a1f617b1c19dbfc(=ksuid.KSUID),meta:{first:"2022-12-14T23:43:36.296832Z",last:"2022-12-14T23:31:59.853610Z",count:93(uint64),size:6482}(=data.Meta)}(=data.Object)
{id:0x10280f2c5f055e896e0c79e307fd10b0076f562a(=ksuid.KSUID),meta:{first:"2022-12-14T23:32:44.208336Z",last:"2022-12-14T23:24:17.843368Z",count:87(uint64),size:5548}(=data.Meta)}(=data.Object)
{id:0x10281e85d9d6a5ecbc7de64fe65b74618aea8343(=ksuid.KSUID),meta:{first:"2022-12-15T00:38:28.405089Z",last:"2022-12-15T00:23:25.878890Z",count:89(uint64),size:7257}(=data.Meta)}(=data.Object)
{id:0x10281fcdec55f5346dd0081a5e5f0ecf93ed9eb8(=ksuid.KSUID),meta:{first:"2022-12-15T00:43:40.947908Z",last:"2022-12-15T00:36:59.909246Z",count:66(uint64),size:4999}(=data.Meta)}(=data.Object)
{id:0x10281e44e5f881970da05290410a1719386477a2(=ksuid.KSUID),meta:{first:"2022-12-15T00:37:04.864535Z",last:"2022-12-15T00:35:06.103223Z",count:84(uint64),size:4437}(=data.Meta)}(=data.Object)
{id:0x10281138462519fc5b7233bf6181838bda026b0a(=ksuid.KSUID),meta:{first:"2022-12-14T23:41:37.048916Z",last:"2022-12-14T23:39:37.850641Z",count:73(uint64),size:5265}(=data.Meta)}(=data.Object)
{id:0x10280e2c58e2668f51c47e590a924846a9b5fa66(=ksuid.KSUID),meta:{first:"2022-12-14T23:28:42.249016Z",last:"2022-12-14T23:23:37.146459Z",count:91(uint64),size:7556}(=data.Meta)}(=data.Object)
{id:0x102814ad319ba0609dc0f08b3977a530f8b57568(=ksuid.KSUID),meta:{first:"2022-12-14T23:56:08.768224Z",last:"2022-12-14T23:54:31.534224Z",count:93(uint64),size:5490}(=data.Meta)}(=data.Object)
{id:0x10281254e6df56cc20f5c7b502c1533521a19003(=ksuid.KSUID),meta:{first:"2022-12-14T23:46:11.789390Z",last:"2022-12-14T23:23:47.079369Z",count:150(uint64),size:8723}(=data.Meta)}(=data.Object)

Applying some crude math, the average data object size/count:

$ zed query -Z 'from zeek@main:objects | AvgSize:=avg(meta.size),AvgCount:=avg(meta.count)'
{
    AvgSize: 6904.77027027027,
    AvgCount: 102.70270270270271
}

That size is, of course, still far below the default 500 MB object size, so we'd be waiting a long time before compaction offers any benefit. However, compared to the single-object commits discussed in #3151 and #4266, the data as stored here is much more forgiving at query time; for example, counting is still very quick:

$ time zed query 'from zeek | count()'
{count:7819(uint64)}

real    0m0.054s
user    0m0.013s
sys     0m0.010s
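To put "waiting a long time" into rough numbers, here's a back-of-the-envelope Python calculation using the AvgSize result above. It assumes, without having verified either point, that the default threshold is 500 MiB (rather than 500 MB) and that each 60-second Fluentd flush produces one data object.

```python
# Rough estimate of how long commits like these would take to add up to the
# default object-size threshold. Assumptions (not measured): the threshold is
# 500 MiB, and each Fluentd flush (every 60 s by default) yields one object.
THRESHOLD_BYTES = 500 * 1024 * 1024
AVG_OBJECT_BYTES = 6904.77027027027  # AvgSize from the query above
FLUSH_INTERVAL_S = 60

objects_needed = THRESHOLD_BYTES / AVG_OBJECT_BYTES
days_needed = objects_needed * FLUSH_INTERVAL_S / 86400

print(f"~{objects_needed:,.0f} objects, ~{days_needed:.0f} days of flushes")
```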

philrz commented 1 year ago

I revived this config while testing the compaction currently performed by the lake manager (#3923). This made me wish the JSON Zeek data were properly shaped so I'd have real ts timestamps, etc. I came up with this revised config, which currently seems to work OK with the reference shaper for Zeek.

$ cat fluentd-shaped.conf 
<source>
  @type tail
  path /usr/local/var/logs/current/json_streaming_*
  follow_inodes true
  pos_file /Users/phil/work/fluentd/fluentd.pos
  tag zeek
  <parse>
    @type json
  </parse>
</source>

<match zeek>
  @type exec_filter
  command zq -z -I shaper.zed '| quiet(this)' -
  tag shaped
  <format>
    @type json
  </format>
  <parse>
    @type none
  </parse>
  <buffer>
    flush_interval 1s
  </buffer>
</match>

<match shaped>
  @type http
  endpoint http://127.0.0.1:9988/pool/zeek/branch/main
  content_type application/x-zson
  <format>
    @type single_value
  </format>
</match>
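In this config, each line zq writes to stdout is read back by exec_filter's `none` parser into a record whose `message` field holds the whole line, re-tagged `shaped`; the http output's `single_value` formatter then emits just that field plus a newline. Here's a minimal Python sketch of the resulting request, assuming the formatter defaults (`message_key message`, `add_newline true`). The helper `build_zson_request` and the sample ZSON lines are hypothetical, and the request is built but not sent.

```python
import urllib.request


def build_zson_request(endpoint: str, records: list[dict],
                       message_key: str = "message") -> urllib.request.Request:
    """Build (but do not send) the POST the `shaped` match would produce.

    Hypothetical helper mimicking the single_value formatter's defaults:
    write only record[message_key], followed by a newline.
    """
    # Each record's message is already one line of ZSON from zq.
    body = "".join(rec[message_key] + "\n" for rec in records).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        method="POST",
        headers={"Content-Type": "application/x-zson"},
    )


if __name__ == "__main__":
    req = build_zson_request(
        "http://127.0.0.1:9988/pool/zeek/branch/main",
        [{"message": '{_path:"conn",uid:"C1"}'}],
    )
    print(req.data)
```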

Things to note:

  1. The quiet(this) handles what appear to be new log types, added in more recent Zeek releases, that the reference shaper doesn't yet cover. As the shaper is currently written, attempts to shape these just yield error(missing). I plan to add these new logs to the reference shaper and also to improve the error messages so they tell the user precisely what went wrong.

  2. I'd have preferred transporting the shaped data as ZNG rather than ZSON, but Fluentd seems fussy about data being text lines. I may tinker more with binary ZNG the next time I revisit this.

  3. The buffering config may need more work. When I experimented with sending the shaped data to stdout instead of posting it to the Zed lake, it seemed to wait excessively (forever?) for events to accumulate before producing output. I'm not sure the flush_interval 1s is having much effect right now, but my live data does seem to get posted periodically, so I'm OK with it for now.
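For experimenting with the exec_filter plumbing without zq, here's a minimal, hypothetical Python stand-in for the `command`. It reads one JSON record per line on stdin (as the json `<format>` writes them) and writes one line per record to stdout. Like `quiet(this)`, it silently drops records it can't handle; here that's any `_path` outside a made-up `KNOWN_PATHS` set, since this is not the reference shaper and does no real shaping. It also flushes stdout after every line so output never sits in a pipe buffer, which is worth ruling out when output seems delayed.

```python
import json
import sys
from typing import TextIO

# Hypothetical: the log types this stand-in "knows". Not the reference shaper's list.
KNOWN_PATHS = {"conn", "dns", "http", "ssl"}


def run_filter(inp: TextIO, out: TextIO) -> int:
    """Copy known records from inp to out as JSON lines; return how many were written."""
    written = 0
    for line in inp:
        line = line.strip()
        if not line:
            continue
        rec = json.loads(line)
        if rec.get("_path") not in KNOWN_PATHS:
            continue  # drop silently, in the spirit of quiet(this)
        out.write(json.dumps(rec) + "\n")
        out.flush()  # don't let output sit in a pipe buffer
        written += 1
    return written


if __name__ == "__main__":
    run_filter(sys.stdin, sys.stdout)
```

It could be wired in with something like `command python3 filter.py` in place of the zq command (the filename is hypothetical). Its output is JSON rather than ZSON, so the `shaped` match's content_type would need to go back to application/json.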

mrbluecoat commented 1 month ago

This looks awesome! +1 for ZNG support <3

https://zed.brimdata.io/docs/commands/zq#performance-comparisons

mrbluecoat commented 1 month ago

I wonder what would happen if you replaced https://github.com/corelight/json-streaming-logs/blob/master/scripts/main.zeek#L127 with filt$config["tsv"] = "T";

https://github.com/zeek/zeek/blob/master/scripts/base/frameworks/logging/writers/ascii.zeek#L12

philrz commented 1 month ago

@mrbluecoat: Glad to hear you think it sounds encouraging! This material actually hasn't been updated in a while, and the Zed lake has evolved a bit in that time (and I imagine Fluentd might have changed a bit too). I've started documenting these kinds of topics more formally in the "Integrations" area of the Zed docs and would be keen to do the same with this Fluentd material if it's something you could see putting to use. However, as I do that, I'd like to make sure whatever it covers is relevant to your goals. GitHub Issue comments might not be the best forum for laying out those details, so could you ping me through other channels so we can discuss? If you like Slack, you could send yourself an invite to our community Slack workspace and message me @Phil, or send an email to support@brimdata.io. Hope to hear from you!

mrbluecoat commented 1 month ago

Will do, thanks