elastic / beats

:tropical_fish: Beats - Lightweight shippers for Elasticsearch & Logstash
https://www.elastic.co/products/beats

[filebeat][azure-blob-storage] - Fix concurrency issues and flakey tests. #35983

Closed ShourieG closed 1 year ago

ShourieG commented 1 year ago

There are a couple of concurrency issues inside the azure blob storage input, along with some flakey tests. This issue describes what needs fixing.

1) Fix concurrency issues related to dynamic cursor and map iterations.
2) Decouple object timeout from scheduler timeout.
3) Fix flakey tests.
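As a rough illustration of point 1 (not the actual change made to the input), one common way to avoid iterating a map while other goroutines write to it is to guard the map with a mutex and hand out a copy taken under the lock. The type and method names below (`cursorState`, `save`, `snapshot`) are hypothetical and are not taken from the Beats code base.

```go
package main

import (
	"fmt"
	"sync"
)

// cursorState is a hypothetical stand-in for an input's checkpoint state.
// The real azure blob storage input has its own types; these names are
// illustrative only.
type cursorState struct {
	mu      sync.Mutex
	offsets map[string]int64 // blob name -> bytes already processed
}

func newCursorState() *cursorState {
	return &cursorState{offsets: make(map[string]int64)}
}

// save records progress for one blob. Workers may call this concurrently.
func (s *cursorState) save(name string, offset int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.offsets[name] = offset
}

// snapshot returns a copy of the map taken under the lock. The copy can be
// passed to code that iterates it later (for example a serializer) without
// racing against concurrent save calls.
func (s *cursorState) snapshot() map[string]int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make(map[string]int64, len(s.offsets))
	for k, v := range s.offsets {
		out[k] = v
	}
	return out
}

func main() {
	s := newCursorState()
	var wg sync.WaitGroup
	// Four workers write distinct keys and take snapshots at the same time.
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				s.save(fmt.Sprintf("blob-%d-%d", w, i), int64(i))
				_ = s.snapshot()
			}
		}(w)
	}
	wg.Wait()
	fmt.Println(len(s.snapshot())) // prints 400: 4 workers x 100 distinct keys
}
```

The trade-off in this sketch is an extra map copy per snapshot in exchange for never sharing the live map with a reader that iterates it outside the lock.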

Related Issues: #35126

elasticmachine commented 1 year ago

Pinging @elastic/security-external-integrations (Team:Security-External Integrations)

sebastienminne commented 1 year ago

@ShourieG Thanks for the fix; I'm facing this issue currently. Any idea when it will be released?

ShourieG commented 1 year ago

@sebastienminne It should be available with the release of 8.10 on September 15th

sebastienminne commented 1 year ago

@ShourieG

I'm not sure the fix is effective; it still seems impossible to have more than one max_worker.

I'm getting this stack trace:

goroutine 60 [running]:
github.com/elastic/go-structform.extObjVisitor.OnInt64Object({{0x55cd10a95660?, 0xc0036d4000?}}, 0xc001345cb0)
	github.com/elastic/go-structform@v0.0.10/map.go:114 +0xac
github.com/elastic/go-structform/gotype.foldMapInt64(0x55cd102be2a0?, {0x55cd102be2a0?, 0xc001345cb0?})
	github.com/elastic/go-structform@v0.0.10/gotype/fold_map.go:83 +0x3d
github.com/elastic/go-structform/gotype.liftFold.func1(0x55cd105edd80?, {0x55cd102be2a0?, 0xc001345ca8?, 0x55cd13034ac0?})
	github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:605 +0xda
github.com/elastic/go-structform/gotype.makeFieldFold.func1(0xc003dc1950, {0x55cd105edd80?, 0xc001345c80?, 0xc0065851a0?})
	github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:311 +0xa9
github.com/elastic/go-structform/gotype.makeFieldsFold.func1(0xc002a608b0?, {0x55cd105edd80?, 0xc001345c80?, 0x16?})
	github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:198 +0x89
github.com/elastic/go-structform/gotype.makeStructFold.func1(0xc003dc1950, {0x55cd105edd80?, 0xc001345c80?, 0xc002a608d8?})
	github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:188 +0x83
github.com/elastic/go-structform/gotype.makePointerFold.func1(0xc003dc1950, {0x55cd0ffc3e00?, 0xc001345c80?, 0x55cd0bb90bf4?})
	github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:124 +0x115
github.com/elastic/go-structform/gotype.foldAnyReflect(0x55cd0ffc3e00?, {0x55cd0ffc3e00?, 0xc001345c80?, 0xc002a609a8?})
	github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:568 +0x9d
github.com/elastic/go-structform/gotype.foldInterfaceValue(0xc003dc1950, {0x55cd0ffc3e00?, 0xc001345c80})
	github.com/elastic/go-structform@v0.0.10/gotype/fold.go:116 +0x27f
github.com/elastic/go-structform/gotype.(*Iterator).Fold(...)
	github.com/elastic/go-structform@v0.0.10/gotype/fold.go:93
github.com/elastic/beats/v7/libbeat/common/transform/typeconv.(*Converter).Convert(0xc0039e5420, {0x55cd0ffe4bc0, 0xc0002468d8}, {0x55cd0ffc3e00, 0xc001345c80})
	github.com/elastic/beats/v7/libbeat/common/transform/typeconv/typeconv.go:108 +0x13a
github.com/elastic/beats/v7/libbeat/common/transform/typeconv.Convert({0x55cd0ffe4bc0, 0xc0002468d8}, {0x55cd0ffc3e00, 0xc001345c80})
	github.com/elastic/beats/v7/libbeat/common/transform/typeconv/typeconv.go:130 +0xcb
github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(*updateOp).Execute(0xc0063ddd80, 0x0?)
	github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/publish.go:136 +0x199
github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.newInputACKHandler.func1(0xc002a60c78?, {0xc001580000?, 0x376?, 0x2?})
	github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/input.go:199 +0x9f
github.com/elastic/beats/v7/libbeat/common/acker.(*eventDataACKer).onACK(0xc000fa1580, 0x55cd0bbc1260?, 0x376)
	github.com/elastic/beats/v7/libbeat/common/acker/acker.go:257 +0x134
github.com/elastic/beats/v7/libbeat/common/acker.(*trackingACKer).ACKEvents(0xc001345a10, 0x376)
	github.com/elastic/beats/v7/libbeat/common/acker/acker.go:206 +0x203
github.com/elastic/beats/v7/libbeat/common/acker.ackerList.ACKEvents(...)
	github.com/elastic/beats/v7/libbeat/common/acker/acker.go:294
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*bufferingEventLoop).processACK.func1()
	github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/eventloop.go:515 +0x24
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*bufferingEventLoop).processACK(0xc00018b7a0, {0x0, 0x0}, 0x800)
	github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/eventloop.go:525 +0x188
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).handleBatchSig(0xc00012bf80)
	github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/ackloop.go:73 +0xa4
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).run(0xc00012bf80)
	github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/ackloop.go:52 +0x187
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue.func2()
	github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/broker.go:201 +0x65
created by github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue
	github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/broker.go:199 +0x52d

goroutine 1 [chan receive, 3 minutes]: github.com/elastic/beats/v7/filebeat/beater.(signalWait).Wait(...) github.com/elastic/beats/v7/filebeat/beater/signalwait.go:44 github.com/elastic/beats/v7/filebeat/beater.(Filebeat).Run(0xc00066bd80, 0xc000fa2a80) github.com/elastic/beats/v7/filebeat/beater/filebeat.go:438 +0x17bf github.com/elastic/beats/v7/libbeat/cmd/instance.(Beat).launch(0xc000fa2a80, {{0x55cd0f51db37, 0x8}, {0x55cd0f51db37, 0x8}, {0x0, 0x0}, 0x1, 0x1, {{0x0, ...}, ...}, ...}, ...) github.com/elastic/beats/v7/libbeat/cmd/instance/beat.go:520 +0x9d3 github.com/elastic/beats/v7/libbeat/cmd/instance.Run.func1(0xc001a37ca8, 0x0?) github.com/elastic/beats/v7/libbeat/cmd/instance/beat.go:216 +0x145 github.com/elastic/beats/v7/libbeat/cmd/instance.Run({{0x55cd0f51db37, 0x8}, {0x55cd0f51db37, 0x8}, {0x0, 0x0}, 0x1, 0x1, {{0x0, 0x0}, ...}, ...}, ...) github.com/elastic/beats/v7/libbeat/cmd/instance/beat.go:217 +0x25 github.com/elastic/beats/v7/libbeat/cmd.genRunCmd.func1(0xc000006600?, {0x55cd0f514d42?, 0x3?, 0x3?}) github.com/elastic/beats/v7/libbeat/cmd/run.go:36 +0x58 github.com/spf13/cobra.(Command).execute(0xc000006600, {0xc000076050, 0x3, 0x3}) github.com/spf13/cobra@v1.7.0/command.go:944 +0x847 github.com/spf13/cobra.(Command).ExecuteC(0xc000006600) github.com/spf13/cobra@v1.7.0/command.go:1068 +0x3bd github.com/spf13/cobra.(Command).Execute(0xc0000081a0?) github.com/spf13/cobra@v1.7.0/command.go:992 +0x19 main.main() github.com/elastic/beats/v7/x-pack/filebeat/main.go:22 +0x25

goroutine 22 [chan receive]: k8s.io/klog/v2.(*loggingT).flushDaemon(0x0?) k8s.io/klog/v2@v2.30.0/klog.go:1181 +0x6a created by k8s.io/klog/v2.init.0 k8s.io/klog/v2@v2.30.0/klog.go:420 +0xf6

goroutine 10 [select]: go.opencensus.io/stats/view.(*worker).start(0xc00044ff80) go.opencensus.io@v0.24.0/stats/view/worker.go:292 +0xad created by go.opencensus.io/stats/view.init.0 go.opencensus.io@v0.24.0/stats/view/worker.go:34 +0x96

goroutine 16 [IO wait, 3 minutes]: internal/poll.runtime_pollWait(0x7f9254442fd8, 0x72) runtime/netpoll.go:306 +0x89 internal/poll.(pollDesc).wait(0xc00044ef00?, 0xc000507cb0?, 0x0) internal/poll/fd_poll_runtime.go:84 +0x32 internal/poll.(pollDesc).waitRead(...) internal/poll/fd_poll_runtime.go:89 internal/poll.(FD).Accept(0xc00044ef00) internal/poll/fd_unix.go:614 +0x2bd net.(netFD).accept(0xc00044ef00) net/fd_unix.go:172 +0x35 net.(TCPListener).accept(0xc000134a08) net/tcpsock_posix.go:148 +0x25 net.(TCPListener).Accept(0xc000134a08) net/tcpsock.go:297 +0x3d net/http.(Server).Serve(0xc001f8a0f0, {0x55cd10a575a0, 0xc000134a08}) net/http/server.go:3059 +0x385 net/http.Serve(...) net/http/server.go:2581 github.com/elastic/beats/v7/libbeat/api.(Server).Start.func1({0x55cd10a575a0, 0xc000134a08}) github.com/elastic/beats/v7/libbeat/api/server.go:72 +0x14c created by github.com/elastic/beats/v7/libbeat/api.(*Server).Start github.com/elastic/beats/v7/libbeat/api/server.go:70 +0x12d

goroutine 82 [chan receive, 3 minutes]: github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(azurebsInput).run.func1() github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/input.go:135 +0x3c created by github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(azurebsInput).run github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/input.go:134 +0x2fd

goroutine 57 [select]: github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*eventConsumer).run(0xc0003dea00) github.com/elastic/beats/v7/libbeat/publisher/pipeline/consumer.go:161 +0x325 github.com/elastic/beats/v7/libbeat/publisher/pipeline.newEventConsumer.func1() github.com/elastic/beats/v7/libbeat/publisher/pipeline/consumer.go:91 +0x5a created by github.com/elastic/beats/v7/libbeat/publisher/pipeline.newEventConsumer github.com/elastic/beats/v7/libbeat/publisher/pipeline/consumer.go:89 +0x1b8

goroutine 58 [select]: github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(broker).Get(0xc000115f20, 0x800) github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/broker.go:228 +0xc5 github.com/elastic/beats/v7/libbeat/publisher/pipeline.(queueReader).run(0xc0003dea30, 0xc0005cae50) github.com/elastic/beats/v7/libbeat/publisher/pipeline/queue_reader.go:57 +0x11d created by github.com/elastic/beats/v7/libbeat/publisher/pipeline.newEventConsumer github.com/elastic/beats/v7/libbeat/publisher/pipeline/consumer.go:102 +0x218

goroutine 43 [chan receive, 3 minutes]: github.com/elastic/elastic-agent-libs/service.HandleSignals.func1() github.com/elastic/elastic-agent-libs@v0.3.13/service/service.go:50 +0x5e created by github.com/elastic/elastic-agent-libs/service.HandleSignals github.com/elastic/elastic-agent-libs@v0.3.13/service/service.go:49 +0x18f

goroutine 59 [select]: github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*bufferingEventLoop).run(0xc00018b7a0) github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/eventloop.go:311 +0x1d4 github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue.func1() github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/broker.go:197 +0x67 created by github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/broker.go:195 +0x4d6

goroutine 65 [select, 3 minutes]: github.com/elastic/go-concert/timed.Periodic({0x7f92543cb608, 0xc00135c0a0}, 0x0?, 0xc0000a9e58) github.com/elastic/go-concert@v0.2.0/timed/timed.go:76 +0x10f github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(cleaner).run(0xc000658238, {0x7f92543cb5e0?, 0xc00135c0a0}, 0xc00135c050, 0x0?) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/clean.go:47 +0xb1 github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(InputManager).Init.func1({0x55cd10a5bd40?, 0xc00135c0a0}) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/manager.go:132 +0x10e github.com/elastic/go-concert/unison.(TaskGroup).Go.func1() github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:163 +0xc3 created by github.com/elastic/go-concert/unison.(TaskGroup).Go github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:159 +0xcd

goroutine 152 [chan receive]: github.com/Shopify/sarama.(topicProducer).dispatch(0xc00202d900) github.com/Shopify/sarama@v1.27.0/async_producer.go:412 +0x54 github.com/Shopify/sarama.withRecover(0x7420666920202020?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newTopicProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:407 +0x1f6

goroutine 61 [select]: github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*netClientWorker).run(0xc00066ba80) github.com/elastic/beats/v7/libbeat/publisher/pipeline/client_worker.go:121 +0x96 created by github.com/elastic/beats/v7/libbeat/publisher/pipeline.makeClientWorker github.com/elastic/beats/v7/libbeat/publisher/pipeline/client_worker.go:74 +0x22f

goroutine 62 [select]: github.com/elastic/beats/v7/libbeat/monitoring/report/log.(*reporter).snapshotLoop(0xc0003deb90) github.com/elastic/beats/v7/libbeat/monitoring/report/log/log.go:161 +0x23c github.com/elastic/beats/v7/libbeat/monitoring/report/log.MakeReporter.func1() github.com/elastic/beats/v7/libbeat/monitoring/report/log/log.go:134 +0x5a created by github.com/elastic/beats/v7/libbeat/monitoring/report/log.MakeReporter github.com/elastic/beats/v7/libbeat/monitoring/report/log/log.go:132 +0x310

goroutine 64 [syscall, 3 minutes]: os/signal.signal_recv() runtime/sigqueue.go:152 +0x2f os/signal.loop() os/signal/signal_unix.go:23 +0x19 created by os/signal.Notify.func1.1 os/signal/signal.go:151 +0x2a

goroutine 66 [select, 3 minutes]: github.com/elastic/go-concert/timed.Periodic({0x7f92543cb608, 0xc00135c0a0}, 0x3?, 0xc001a8ae58) github.com/elastic/go-concert@v0.2.0/timed/timed.go:76 +0x10f github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(cleaner).run(0xc000658258, {0x7f92543cb5e0?, 0xc00135c0a0}, 0xc00135c0f0, 0x0?) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/clean.go:47 +0xb1 github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(InputManager).Init.func1({0x55cd10a5bd40?, 0xc00135c0a0}) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/manager.go:132 +0x10e github.com/elastic/go-concert/unison.(TaskGroup).Go.func1() github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:163 +0xc3 created by github.com/elastic/go-concert/unison.(TaskGroup).Go github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:159 +0xcd

goroutine 67 [select, 3 minutes]: github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile.(updateChan).Recv(0xc00132b920, {0x55cd10a5bd40, 0xc00135c1e0}) github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile/update_writer.go:151 +0x17d github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile.(updateWriter).run(0xc000fc1540, {0x55cd10a5bd40, 0xc00135c1e0}) github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile/update_writer.go:87 +0x65 github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile.newUpdateWriter.func1({0x55cd10a5bd40?, 0xc00135c1e0?}) github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile/update_writer.go:65 +0x30 github.com/elastic/go-concert/unison.(TaskGroup).Go.func1() github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:163 +0xc3 created by github.com/elastic/go-concert/unison.(TaskGroup).Go github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:159 +0xcd

goroutine 68 [select, 3 minutes]: github.com/elastic/go-concert/timed.Periodic({0x7f92543cb608, 0xc00135c0a0}, 0x0?, 0xc001a8ee58) github.com/elastic/go-concert@v0.2.0/timed/timed.go:76 +0x10f github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile.(cleaner).run(0xc0006582b8, {0x7f92543cb5e0?, 0xc00135c0a0}, 0xc00135c190, 0x0?) github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile/clean.go:50 +0xe5 github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile.(InputManager).Init.func1({0x55cd10a5bd40?, 0xc00135c0a0}) github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile/manager.go:142 +0x10e github.com/elastic/go-concert/unison.(TaskGroup).Go.func1() github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:163 +0xc3 created by github.com/elastic/go-concert/unison.(TaskGroup).Go github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:159 +0xcd

goroutine 69 [select, 3 minutes]: github.com/elastic/go-concert/timed.Periodic({0x7f92543cb608, 0xc00135c0a0}, 0x0?, 0xc0000abe58) github.com/elastic/go-concert@v0.2.0/timed/timed.go:76 +0x10f github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(cleaner).run(0xc000658398, {0x7f92543cb5e0?, 0xc00135c0a0}, 0xc00135c280, 0x0?) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/clean.go:47 +0xb1 github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(InputManager).Init.func1({0x55cd10a5bd40?, 0xc00135c0a0}) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/manager.go:132 +0x10e github.com/elastic/go-concert/unison.(TaskGroup).Go.func1() github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:163 +0xc3 created by github.com/elastic/go-concert/unison.(TaskGroup).Go github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:159 +0xcd

goroutine 70 [select, 3 minutes]: github.com/elastic/go-concert/timed.Periodic({0x7f92543cb608, 0xc00135c0a0}, 0x0?, 0xc0000ace58) github.com/elastic/go-concert@v0.2.0/timed/timed.go:76 +0x10f github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(cleaner).run(0xc000658448, {0x7f92543cb5e0?, 0xc00135c0a0}, 0xc00135c2d0, 0x0?) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/clean.go:47 +0xb1 github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(InputManager).Init.func1({0x55cd10a5bd40?, 0xc00135c0a0}) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/manager.go:132 +0x10e github.com/elastic/go-concert/unison.(TaskGroup).Go.func1() github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:163 +0xc3 created by github.com/elastic/go-concert/unison.(TaskGroup).Go github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:159 +0xcd

goroutine 71 [select, 3 minutes]: github.com/elastic/go-concert/timed.Periodic({0x7f92543cb608, 0xc00135c0a0}, 0x0?, 0xc0000ade58) github.com/elastic/go-concert@v0.2.0/timed/timed.go:76 +0x10f github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(cleaner).run(0xc000658470, {0x7f92543cb5e0?, 0xc00135c0a0}, 0xc00135c320, 0x0?) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/clean.go:47 +0xb1 github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(InputManager).Init.func1({0x55cd10a5bd40?, 0xc00135c0a0}) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/manager.go:132 +0x10e github.com/elastic/go-concert/unison.(TaskGroup).Go.func1() github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:163 +0xc3 created by github.com/elastic/go-concert/unison.(TaskGroup).Go github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:159 +0xcd

goroutine 72 [select, 3 minutes]: github.com/elastic/go-concert/timed.Periodic({0x7f92543cb608, 0xc00135c0a0}, 0x0?, 0xc0000a6e58) github.com/elastic/go-concert@v0.2.0/timed/timed.go:76 +0x10f github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(cleaner).run(0xc0006584a8, {0x7f92543cb5e0?, 0xc00135c0a0}, 0xc00135c370, 0x0?) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/clean.go:47 +0xb1 github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(InputManager).Init.func1({0x55cd10a5bd40?, 0xc00135c0a0}) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/manager.go:132 +0x10e github.com/elastic/go-concert/unison.(TaskGroup).Go.func1() github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:163 +0xc3 created by github.com/elastic/go-concert/unison.(TaskGroup).Go github.com/elastic/go-concert@v0.2.0/unison/taskgroup.go:159 +0xcd

goroutine 73 [select, 3 minutes]: github.com/elastic/beats/v7/filebeat/registrar.(Registrar).Run(0xc000eb60c0) github.com/elastic/beats/v7/filebeat/registrar/registrar.go:162 +0x286 github.com/elastic/beats/v7/filebeat/registrar.(Registrar).Start.func1() github.com/elastic/beats/v7/filebeat/registrar/registrar.go:122 +0x5a created by github.com/elastic/beats/v7/filebeat/registrar.(*Registrar).Start github.com/elastic/beats/v7/filebeat/registrar/registrar.go:120 +0xc5

goroutine 74 [semacquire, 3 minutes]: sync.runtimeSemacquire(0x18?) runtime/sema.go:62 +0x27 sync.(WaitGroup).Wait(0x55cd0bbc9780?) sync/waitgroup.go:116 +0x4b github.com/elastic/go-concert/unison.(MultiErrGroup).Wait(0xc0013459b0) github.com/elastic/go-concert@v0.2.0/unison/multierrgroup.go:54 +0x4b github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(*managedInput).Run(, {0xc00134a1f0, {0xc000336288, 0x16}, {{0x55cd0f51db37, 0x8}, {0x55cd0f51db37, 0x8}, {0x55cd0f5171c0, 0x6}, ...}, ...}, ...) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/input.go:129 +0x354 github.com/elastic/beats/v7/filebeat/input/v2/compat.(runner).Start.func1() github.com/elastic/beats/v7/filebeat/input/v2/compat/compat.go:122 +0x215 created by github.com/elastic/beats/v7/filebeat/input/v2/compat.(runner).Start github.com/elastic/beats/v7/filebeat/input/v2/compat/compat.go:119 +0xd8

goroutine 75 [chan receive, 3 minutes]: github.com/elastic/beats/v7/filebeat/beater.(signalWait).AddChan.func1() github.com/elastic/beats/v7/filebeat/beater/signalwait.go:74 +0x1f github.com/elastic/beats/v7/filebeat/beater.(signalWait).Add.func1() github.com/elastic/beats/v7/filebeat/beater/signalwait.go:51 +0x26 created by github.com/elastic/beats/v7/filebeat/beater.(*signalWait).Add github.com/elastic/beats/v7/filebeat/beater/signalwait.go:50 +0x78

goroutine 76 [chan send]: github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(limiter).acquire(...) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/scheduler.go:30 github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(scheduler).scheduleOnce(0xc000fe6840, {0x55cd10a5bd40?, 0xc0003de190}) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/scheduler.go:111 +0x44b github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(scheduler).schedule(0xc000fe6840, {0x55cd10a5bd40?, 0xc0003de190}) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/scheduler.go:81 +0xc6 github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(azurebsInput).run(, {0xc0023b4030, {0xc0023b2000, 0x3f}, {{0x55cd0f51db37, 0x8}, {0x55cd0f51db37, 0x8}, {0x55cd0f5171c0, 0x6}, ...}, ...}, ...) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/input.go:149 +0x5a6 github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(*azurebsInput).Run(, {0xc0023b4030, {0xc0023b2000, 0x3f}, {{0x55cd0f51db37, 0x8}, {0x55cd0f51db37, 0x8}, {0x55cd0f5171c0, 0x6}, ...}, ...}, ...) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/input.go:124 +0x225 github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(managedInput).runSource(_, {0xc0023b4030, {0xc0023b2000, 0x3f}, {{0x55cd0f51db37, 0x8}, {0x55cd0f51db37, 0x8}, {0x55cd0f5171c0, 0x6}, ...}, ...}, ...) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/input.go:168 +0x459 github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(managedInput).Run.func1() github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/input.go:122 +0x1bf github.com/elastic/go-concert/unison.(MultiErrGroup).Go.func1() github.com/elastic/go-concert@v0.2.0/unison/multierrgroup.go:42 +0x75 created by github.com/elastic/go-concert/unison.(MultiErrGroup).Go github.com/elastic/go-concert@v0.2.0/unison/multierrgroup.go:40 +0x8d

goroutine 77 [chan send]: github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(limiter).acquire(...) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/scheduler.go:30 github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(scheduler).scheduleOnce(0xc000eb69c0, {0x55cd10a5bd40?, 0xc00135c5f0}) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/scheduler.go:111 +0x44b github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(scheduler).schedule(0xc000eb69c0, {0x55cd10a5bd40?, 0xc00135c5f0}) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/scheduler.go:81 +0xc6 github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(azurebsInput).run(, {0xc00134a2c0, {0xc001331400, 0x3f}, {{0x55cd0f51db37, 0x8}, {0x55cd0f51db37, 0x8}, {0x55cd0f5171c0, 0x6}, ...}, ...}, ...) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/input.go:149 +0x5a6 github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(*azurebsInput).Run(, {0xc00134a2c0, {0xc001331400, 0x3f}, {{0x55cd0f51db37, 0x8}, {0x55cd0f51db37, 0x8}, {0x55cd0f5171c0, 0x6}, ...}, ...}, ...) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/input.go:124 +0x225 github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(managedInput).runSource(_, {0xc00134a2c0, {0xc001331400, 0x3f}, {{0x55cd0f51db37, 0x8}, {0x55cd0f51db37, 0x8}, {0x55cd0f5171c0, 0x6}, ...}, ...}, ...) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/input.go:168 +0x459 github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(managedInput).Run.func1() github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/input.go:122 +0x1bf github.com/elastic/go-concert/unison.(MultiErrGroup).Go.func1() github.com/elastic/go-concert@v0.2.0/unison/multierrgroup.go:42 +0x75 created by github.com/elastic/go-concert/unison.(MultiErrGroup).Go github.com/elastic/go-concert@v0.2.0/unison/multierrgroup.go:40 +0x8d

goroutine 78 [select, 3 minutes]: reflect.rselect({0xc000130b40, 0x5, 0x55cd0bb9c1cb?}) runtime/select.go:589 +0x2ee reflect.Select({0xc000f40160?, 0x5, 0x0?}) reflect/value.go:3052 +0x58a github.com/elastic/beats/v7/libbeat/publisher/pipeline.(Pipeline).runSignalPropagation(0xc0005287b0?) github.com/elastic/beats/v7/libbeat/publisher/pipeline/pipeline.go:325 +0x1c9 created by github.com/elastic/beats/v7/libbeat/publisher/pipeline.(Pipeline).registerSignalPropagation.func1 github.com/elastic/beats/v7/libbeat/publisher/pipeline/pipeline.go:310 +0x97

goroutine 79 [chan receive, 3 minutes]: github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(azurebsInput).run.func1() github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/input.go:135 +0x3c created by github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(azurebsInput).run github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/input.go:134 +0x2fd

goroutine 150 [chan receive]: github.com/elastic/beats/v7/libbeat/outputs/kafka.(client).successWorker(0x0?, 0x0?) github.com/elastic/beats/v7/libbeat/outputs/kafka/client.go:258 +0xc7 created by github.com/elastic/beats/v7/libbeat/outputs/kafka.(client).Connect github.com/elastic/beats/v7/libbeat/outputs/kafka/client.go:132 +0x2ea

goroutine 148 [chan receive]: github.com/Shopify/sarama.(*asyncProducer).dispatcher(0xc00024c2a0) github.com/Shopify/sarama@v1.27.0/async_producer.go:330 +0xd1 github.com/Shopify/sarama.withRecover(0xc001354a20?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.newAsyncProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:166 +0x285

goroutine 149 [chan receive]: github.com/Shopify/sarama.(*asyncProducer).retryHandler(0xc00024c2a0) github.com/Shopify/sarama@v1.27.0/async_producer.go:1038 +0x18c github.com/Shopify/sarama.withRecover(0xc00066ba80?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.newAsyncProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:167 +0x305

goroutine 147 [select, 3 minutes]: github.com/Shopify/sarama.(*client).backgroundMetadataUpdater(0xc001c683f0) github.com/Shopify/sarama@v1.27.0/client.go:823 +0x128 github.com/Shopify/sarama.withRecover(0xc001354a20?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.NewClient github.com/Shopify/sarama@v1.27.0/client.go:184 +0x405

goroutine 151 [chan receive, 3 minutes]: github.com/elastic/beats/v7/libbeat/outputs/kafka.(client).errorWorker(0xc000101400, 0x0?) github.com/elastic/beats/v7/libbeat/outputs/kafka/client.go:269 +0x10c created by github.com/elastic/beats/v7/libbeat/outputs/kafka.(client).Connect github.com/elastic/beats/v7/libbeat/outputs/kafka/client.go:133 +0x36a

goroutine 166 [chan receive]: github.com/rcrowley/go-metrics.(*meterArbiter).tick(0x55cd12ffef80) github.com/rcrowley/go-metrics@v0.0.0-20201227073835-cf1acfcdf475/meter.go:239 +0x2a created by github.com/rcrowley/go-metrics.NewMeter github.com/rcrowley/go-metrics@v0.0.0-20201227073835-cf1acfcdf475/meter.go:46 +0xd3

goroutine 141 [chan receive]: github.com/Shopify/sarama.(Broker).responseReceiver(0xc000360a80) github.com/Shopify/sarama@v1.27.0/broker.go:892 +0x77 github.com/Shopify/sarama.withRecover(0x0?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(Broker).Open.func1 github.com/Shopify/sarama@v1.27.0/broker.go:212 +0xb7c

goroutine 155 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d024e0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc00052c7b8?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 156 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d02660) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc00052efb8?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 157 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d02720) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc00052bfb8?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 158 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d02840) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0x0?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 159 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d02960) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc000097fb8?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 160 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d02a20) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0x0?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 161 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d02ae0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc001ad0900?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 178 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d02ba0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc000095fb8?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 179 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d02c60) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc001348d40?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 180 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d02d20) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0x0?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 181 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d02de0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0x6574756f6465722d?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 182 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d02ea0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0x0?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 183 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d02f60) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0x1f43d6a00000000?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 184 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d03020) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0x0?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 185 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d030e0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0x1?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 186 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d031a0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0x1c?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 187 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d03260) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc00156e180?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 188 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d03320) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc001e488c0?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 189 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d033e0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc001fa28c0?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 190 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d034a0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0x0?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 191 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d03560) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc001348880?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 192 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d03620) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc001341128?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 193 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d036e0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc001346e80?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 194 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d037a0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0x0?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 195 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d03860) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc001d1b840?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 196 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d03920) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc00066aac0?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 197 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d039e0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc001c264c0?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 198 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d03aa0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc001cbe580?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 199 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d03b60) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc001eda280?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 200 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d03c20) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc0020c0300?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 201 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d03ce0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc00126e180?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 202 [chan receive]: github.com/Shopify/sarama.(partitionProducer).dispatch(0xc001d03da0) github.com/Shopify/sarama@v1.27.0/async_producer.go:546 +0x1c5 github.com/Shopify/sarama.withRecover(0xc0013ec180?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newPartitionProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:512 +0x1ea

goroutine 2718 [runnable]: github.com/elastic/go-structform/gotype.reFoldString(0xc0064d0780?, {0x55cd100b7200?, 0xc001acc2d0?, 0x198?}) github.com/elastic/go-structform@v0.0.10/gotype/fold_primitives.go:55 +0x8a github.com/elastic/go-structform/gotype.makeFieldFold.func1(0xc0064d0780, {0x55cd105edd80?, 0xc001acc2d0?, 0xa?}) github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:311 +0xa9 github.com/elastic/go-structform/gotype.makeFieldsFold.func1(0xc003267320?, {0x55cd105edd80?, 0xc001acc2d0?, 0x16?}) github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:198 +0x89 github.com/elastic/go-structform/gotype.makeStructFold.func1(0xc0064d0780, {0x55cd105edd80?, 0xc001acc2d0?, 0xc003267348?}) github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:188 +0x83 github.com/elastic/go-structform/gotype.makePointerFold.func1(0xc0064d0780, {0x55cd0ffc3e00?, 0xc001acc2d0?, 0x55cd0bb90bf4?}) github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:124 +0x115 github.com/elastic/go-structform/gotype.foldAnyReflect(0x55cd0ffc3e00?, {0x55cd0ffc3e00?, 0xc001acc2d0?, 0xc003267418?}) github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:568 +0x9d github.com/elastic/go-structform/gotype.foldInterfaceValue(0xc0064d0780, {0x55cd0ffc3e00?, 0xc001acc2d0}) github.com/elastic/go-structform@v0.0.10/gotype/fold.go:116 +0x27f github.com/elastic/go-structform/gotype.(Iterator).Fold(...) 
github.com/elastic/go-structform@v0.0.10/gotype/fold.go:93 github.com/elastic/beats/v7/libbeat/common/transform/typeconv.(Converter).Convert(0xc00626acb0, {0x55cd0ffe4bc0, 0xc005757ba0}, {0x55cd0ffc3e00, 0xc001acc2d0}) github.com/elastic/beats/v7/libbeat/common/transform/typeconv/typeconv.go:108 +0x13a github.com/elastic/beats/v7/libbeat/common/transform/typeconv.Convert({0x55cd0ffe4bc0, 0xc005757ba0}, {0x55cd0ffc3e00, 0xc001acc2d0}) github.com/elastic/beats/v7/libbeat/common/transform/typeconv/typeconv.go:130 +0xcb github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.createUpdateOp(0xc00135c2d0, 0xc000246900, {0x55cd0ffc3e00?, 0xc001acc2d0}) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/publish.go:101 +0x1f7 github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(cursorPublisher).Publish(0xc001acc030, {{0xc13a21d61b692b63, 0x34e401ac91, 0x55cd13001260}, 0xc002bb11a0, 0xc002bb0ea0, {0x0, 0x0}, 0x0}, {0x55cd0ffc3e00, ...}) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/publish.go:71 +0x86 github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(job).readJsonAndPublish(0xc0003fc070, {0x55cd10a5bd40, 0xc0003de190}, {0x55cd10a0ad80?, 0xc00314c420}, {0xc001ac67e0, 0x54}) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/job.go:203 +0x374 github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(job).processAndPublishData(0xc0003fc070, {0x55cd10a5bd40, 0xc0003de190}, {0xc001ac67e0, 0x54}) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/job.go:128 +0x397 github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(job).do(0xc0003fc070, {0x55cd10a5bd40, 0xc0003de190}, {0xc001ac67e0, 0x54}) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/job.go:73 +0x12c github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(scheduler).scheduleOnce.func1() github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/scheduler.go:114 +0x70 created by 
github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(scheduler).scheduleOnce github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/scheduler.go:112 +0x2f6

goroutine 2705 [select]: net/http.(persistConn).writeLoop(0xc0038d25a0) net/http/transport.go:2410 +0xf2 created by net/http.(Transport).dialConn net/http/transport.go:1766 +0x173d

goroutine 2752 [select]: net/http.(persistConn).writeLoop(0xc002a32fc0) net/http/transport.go:2410 +0xf2 created by net/http.(Transport).dialConn net/http/transport.go:1766 +0x173d

goroutine 2779 [chan receive]: github.com/Shopify/sarama.(asyncProducer).newBrokerProducer.func1() github.com/Shopify/sarama@v1.27.0/async_producer.go:699 +0x65 github.com/Shopify/sarama.withRecover(0xc0038d0001?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newBrokerProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:698 +0x31b

goroutine 2511 [select]: net/http.(persistConn).writeLoop(0xc0030410e0) net/http/transport.go:2410 +0xf2 created by net/http.(Transport).dialConn net/http/transport.go:1766 +0x173d

goroutine 2704 [IO wait]: internal/poll.runtime_pollWait(0x7f9254442b28, 0x72) runtime/netpoll.go:306 +0x89 internal/poll.(pollDesc).wait(0xc001b5bf00?, 0xc003202000?, 0x0) internal/poll/fd_poll_runtime.go:84 +0x32 internal/poll.(pollDesc).waitRead(...) internal/poll/fd_poll_runtime.go:89 internal/poll.(FD).Read(0xc001b5bf00, {0xc003202000, 0xa000, 0xa000}) internal/poll/fd_unix.go:167 +0x299 net.(netFD).Read(0xc001b5bf00, {0xc003202000?, 0xc00320200d?, 0x602?}) net/fd_posix.go:55 +0x29 net.(conn).Read(0xc0006588a0, {0xc003202000?, 0x9ff3?, 0xc003202005?}) net/net.go:183 +0x45 crypto/tls.(atLeastReader).Read(0xc003bc7290, {0xc003202000?, 0xc003bc7290?, 0x0?}) crypto/tls/conn.go:788 +0x3d bytes.(Buffer).ReadFrom(0xc00326d410, {0x55cd10a0c680, 0xc003bc7290}) bytes/buffer.go:202 +0x98 crypto/tls.(Conn).readFromUntil(0xc00326d180, {0x55cd10a171a0?, 0xc0006588a0}, 0x0?) crypto/tls/conn.go:810 +0xe5 crypto/tls.(Conn).readRecordOrCCS(0xc00326d180, 0x0) crypto/tls/conn.go:617 +0x116 crypto/tls.(Conn).readRecord(...) crypto/tls/conn.go:583 crypto/tls.(Conn).Read(0xc00326d180, {0xc0035e3000, 0x1000, 0x2f?}) crypto/tls/conn.go:1316 +0x16f net/http.(persistConn).Read(0xc0038d25a0, {0xc0035e3000?, 0xc001c29b60?, 0xc004ca9d30?}) net/http/transport.go:1943 +0x4e bufio.(Reader).fill(0xc003e27080) bufio/bufio.go:106 +0xff bufio.(Reader).Peek(0xc003e27080, 0x1) bufio/bufio.go:144 +0x5d net/http.(persistConn).readLoop(0xc0038d25a0) net/http/transport.go:2107 +0x1ac created by net/http.(Transport).dialConn net/http/transport.go:1765 +0x16ea

goroutine 2510 [IO wait]: internal/poll.runtime_pollWait(0x7f9254442948, 0x72) runtime/netpoll.go:306 +0x89 internal/poll.(pollDesc).wait(0xc00044ff00?, 0xc003ed6000?, 0x0) internal/poll/fd_poll_runtime.go:84 +0x32 internal/poll.(pollDesc).waitRead(...) internal/poll/fd_poll_runtime.go:89 internal/poll.(FD).Read(0xc00044ff00, {0xc003ed6000, 0xa000, 0xa000}) internal/poll/fd_unix.go:167 +0x299 net.(netFD).Read(0xc00044ff00, {0xc003ed6000?, 0xc003ed600d?, 0x4010?}) net/fd_posix.go:55 +0x29 net.(conn).Read(0xc001cfa328, {0xc003ed6000?, 0x9ff3?, 0xc003ed6005?}) net/net.go:183 +0x45 crypto/tls.(atLeastReader).Read(0xc001ac3b18, {0xc003ed6000?, 0xc001ac3b18?, 0x0?}) crypto/tls/conn.go:788 +0x3d bytes.(Buffer).ReadFrom(0xc00004db10, {0x55cd10a0c680, 0xc001ac3b18}) bytes/buffer.go:202 +0x98 crypto/tls.(Conn).readFromUntil(0xc00004d880, {0x55cd10a171a0?, 0xc001cfa328}, 0x0?) crypto/tls/conn.go:810 +0xe5 crypto/tls.(Conn).readRecordOrCCS(0xc00004d880, 0x0) crypto/tls/conn.go:617 +0x116 crypto/tls.(Conn).readRecord(...) crypto/tls/conn.go:583 crypto/tls.(Conn).Read(0xc00004d880, {0xc003346000, 0x1000, 0x2f?}) crypto/tls/conn.go:1316 +0x16f net/http.(persistConn).Read(0xc0030410e0, {0xc003346000?, 0xc001336d20?, 0xc00647bd30?}) net/http/transport.go:1943 +0x4e bufio.(Reader).fill(0xc0032a03c0) bufio/bufio.go:106 +0xff bufio.(Reader).Peek(0xc0032a03c0, 0x1) bufio/bufio.go:144 +0x5d net/http.(persistConn).readLoop(0xc0030410e0) net/http/transport.go:2107 +0x1ac created by net/http.(Transport).dialConn net/http/transport.go:1765 +0x16ea

goroutine 2776 [sync.Mutex.Lock]: sync.runtime_SemacquireMutex(0xc0035f19e0?, 0x59?, 0x3?) runtime/sema.go:77 +0x26 sync.(Mutex).lockSlow(0xc0002468a0) sync/mutex.go:171 +0x165 sync.(Mutex).Lock(...) sync/mutex.go:90 github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.createUpdateOp(0xc00135c2d0, 0xc000246880, {0x55cd0ffc3e00?, 0xc001345c80}) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/publish.go:91 +0xb1 github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(cursorPublisher).Publish(0xc001345b90, {{0xc13a21d61b692bc7, 0x34e401ab65, 0x55cd13001260}, 0xc0035f19e0, 0xc0035f1860, {0x0, 0x0}, 0x0}, {0x55cd0ffc3e00, ...}) github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/publish.go:71 +0x86 github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(job).readJsonAndPublish(0xc003540690, {0x55cd10a5bd40, 0xc00135c5f0}, {0x55cd10a0ad80?, 0xc00595b7a0}, {0xc000083860, 0x54}) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/job.go:203 +0x374 github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(job).processAndPublishData(0xc003540690, {0x55cd10a5bd40, 0xc00135c5f0}, {0xc000083860, 0x54}) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/job.go:128 +0x397 github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(job).do(0xc003540690, {0x55cd10a5bd40, 0xc00135c5f0}, {0xc000083860, 0x54}) github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/job.go:73 +0x12c github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(scheduler).scheduleOnce.func1() github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/scheduler.go:114 +0x70 created by github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage.(scheduler).scheduleOnce github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/scheduler.go:112 +0x2f6

goroutine 2807 [chan receive]: github.com/Shopify/sarama.(Broker).responseReceiver(0xc000188000) github.com/Shopify/sarama@v1.27.0/broker.go:892 +0x77 github.com/Shopify/sarama.withRecover(0x0?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(Broker).Open.func1 github.com/Shopify/sarama@v1.27.0/broker.go:212 +0xb7c

goroutine 2751 [select]: net/http.(persistConn).readLoop(0xc002a32fc0) net/http/transport.go:2227 +0xd85 created by net/http.(Transport).dialConn net/http/transport.go:1765 +0x16ea

goroutine 2778 [select]: github.com/Shopify/sarama.(brokerProducer).run(0xc00026d8f0) github.com/Shopify/sarama@v1.27.0/async_producer.go:751 +0x197 github.com/Shopify/sarama.withRecover(0xc0005297b8?) github.com/Shopify/sarama@v1.27.0/utils.go:43 +0x3e created by github.com/Shopify/sarama.(asyncProducer).newBrokerProducer github.com/Shopify/sarama@v1.27.0/async_producer.go:695 +0x26d

ShourieG commented 1 year ago

@sebastienminne I don't see any specific error or panic in the stack trace, though. What exactly seems to be the issue, or am I missing something?

ShourieG commented 1 year ago

@sebastienminne Are you getting this stack trace every time you use a worker count > 1? Is Filebeat crashing immediately? Could you share your config?

sebastienminne commented 1 year ago

@ShourieG This happens when the worker count is > 1, sometimes at startup, sometimes a few minutes after start. It also happens when more than one container is watched (with one or more workers per container).

filebeat.inputs:

output.kafka:
  topic: aTopic
  required_acks: 1
  client_id: filebeat
  version: '1.0.0'
  hosts:

I can also observe an incredible number of 'listBlobs' operations (43 M a day), which costs a lot of money. It is running in Kubernetes on a pod with 5 CPUs and 10 GB of RAM.

sebastienminne commented 1 year ago

Sorry, I just noticed the first line of the error wasn't in the stack trace:

fatal error: concurrent map iteration and map write
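
For context, the Go runtime raises this fatal error when one goroutine ranges over a map while another goroutine writes to it without synchronization, and it cannot be caught with `recover`. The sketch below is not taken from the Beats code; `cursorState`, `Set` and `Total` are hypothetical names used only to illustrate the guarded pattern, where every write and every iteration goes through the same lock.

```go
package main

import (
	"fmt"
	"sync"
)

// cursorState is a hypothetical stand-in for per-blob cursor data
// that several workers share. It is not a type from Beats.
type cursorState struct {
	mu      sync.RWMutex
	offsets map[string]int64
}

func newCursorState() *cursorState {
	return &cursorState{offsets: make(map[string]int64)}
}

// Set records an offset while holding the write lock.
func (c *cursorState) Set(name string, off int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.offsets[name] = off
}

// Total ranges over the map while holding the read lock,
// so no write can overlap the iteration.
func (c *cursorState) Total() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	var sum int64
	for _, off := range c.offsets {
		sum += off
	}
	return sum
}

func main() {
	state := newCursorState()
	var wg sync.WaitGroup
	// Eight workers write and iterate concurrently; removing the
	// locks above would make this program liable to the fatal error.
	for w := 0; w < 8; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				state.Set(fmt.Sprintf("blob-%d", id), int64(i))
				_ = state.Total()
			}
		}(w)
	}
	wg.Wait()
	fmt.Println(state.Total())
}
```

Running a program like this with `go run -race` is a quick way to confirm that no unsynchronized access remains.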

ShourieG commented 1 year ago

@sebastienminne That's extremely weird, because with 8.10 we even introduced concurrency tests that pass with as many as 2000 workers after the fix. Could you clear your registry, do a clean stack update, and try once more? If it still occurs after that, could you check whether it happens with multiple containers or with a single container?

sebastienminne commented 1 year ago

@ShourieG The test was run after cleaning the registry. Both many workers on 1 container and 1 worker on multiple containers produce the same error.

ShourieG commented 1 year ago

@sebastienminne I was finally able to reproduce the issue locally, and the problem lies in how partial saves are currently done. Even with mutex locks, some resources are still being accessed concurrently, which should not be possible. Either way, in 8.11 partial saves in their current form are being removed, which will solve this issue for good.
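
As a general illustration of how a map can still be iterated concurrently despite a mutex: if the lock only covers the update and a reference to the live map is then handed to a serializer, the serializer may iterate it while another worker writes. This is an assumption about the general failure mode, not a confirmed description of the Beats code. One common remedy is to serialize a private copy taken under the lock. In the sketch below, `checkpoint`, `save`, `snapshot` and `persist` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// checkpoint is a hypothetical state object shared by workers;
// the field names are illustrative and not taken from Beats.
type checkpoint struct {
	mu      sync.Mutex
	offsets map[string]int64
}

func newCheckpoint() *checkpoint {
	return &checkpoint{offsets: make(map[string]int64)}
}

// save updates the live map under the lock.
func (c *checkpoint) save(name string, offset int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.offsets[name] = offset
}

// snapshot copies the live map while holding the lock and returns
// the copy, which no other goroutine can reach.
func (c *checkpoint) snapshot() map[string]int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	cp := make(map[string]int64, len(c.offsets))
	for k, v := range c.offsets {
		cp[k] = v
	}
	return cp
}

// persist serializes the private copy, so the encoder never
// iterates the map that workers are still writing to.
func persist(c *checkpoint) ([]byte, error) {
	return json.Marshal(c.snapshot())
}

func main() {
	cp := newCheckpoint()
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < 500; i++ {
				cp.save(fmt.Sprintf("blob-%d", id), int64(i))
				if _, err := persist(cp); err != nil {
					panic(err)
				}
			}
		}(w)
	}
	wg.Wait()
	out, _ := persist(cp)
	fmt.Println(string(out))
}
```

The copy costs an allocation per save, which is the trade-off for keeping the serializer outside the critical section.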

sebastienminne commented 1 year ago

@ShourieG Thanks for investigating, I'll give 8.11 a try.