Open amirbenun opened 1 year ago
I would like us to preserve the way we are treating the different steps of the vulnerability pipeline as a sort of micro-services that work on the channel queue in the background and are easily scalable.
If you look at the initial solution, each step works on a single entity from a different channel, and you can easily scale the steps with goroutines, increasing the concurrency to match our needs without any code changes.
This scaling is also dynamic and can be altered at any moment by bringing goroutines up or down as needed.
Obviously, this is only thread-wise right now, because we are running as a process under the elastic-agent, but we shouldn't block ourselves from scaling differently in the future.
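To illustrate what scaling a step with goroutines looks like in practice, here is a minimal sketch (not cloudbeat code; names such as processStep are made up): the same step function is simply launched N times against the same input and output channels, and the concurrency is just the number of goroutines started.

package main

import (
    "fmt"
    "strings"
    "sync"
)

// processStep is a stand-in for one pipeline step: it reads entities from in,
// does its work, and forwards the result on out.
func processStep(in <-chan string, out chan<- string) {
    for entity := range in {
        out <- strings.ToUpper(entity)
    }
}

func main() {
    in := make(chan string)
    out := make(chan string)

    // Scaling the step is just a matter of starting more goroutines on the
    // same channels; the step's code does not change.
    const concurrency = 4
    var wg sync.WaitGroup
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            processStep(in, out)
        }()
    }

    // Close out only after all step goroutines have finished (the writer side closes).
    go func() {
        wg.Wait()
        close(out)
    }()

    go func() {
        defer close(in)
        for _, e := range []string{"vuln-a", "vuln-b", "vuln-c"} {
            in <- e
        }
    }()

    for r := range out {
        fmt.Println(r)
    }
}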
This feels like an improvement that we can definitely prioritize and perform a quick POC on to ensure viability, but I think we have more important things to cover right now. Once we have a stable flow of the pipeline (including reclaiming of snapshots), it will probably be easier to refactor the pipeline to our tailored needs, and we will have more knowledge about which areas to focus on for improvements.
Our CNVM EKS implementation could be a good opportunity for this - we can see how stable it is, and then refactor our original implementation.
This is a toy implementation of my suggestion in the retro.
As a demo, I replaced the channel in the IAM fetcher with this structure. Notice how the fetcher no longer has access to the channel, so it's not possible to end up with a goroutine leak.
If we refactor everything like this, we are going to end up with
func NewCisAwsFactory(log *logp.Logger, cfg aws.Config, ch pipeline.ContextAwareChannel[fetching.ResourceInfo], identity *awslib.Identity) FetchersMap {
making it impossible to use the raw channel in new / old fetchers.
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index 55c7686..bcec271 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -47,3 +47,20 @@ func Step[In any, Out any](ctx context.Context, log *logp.Logger, inputChannel c
return outputCh
}
+
+type ContextAwareChannel[T any] struct {
+ ch chan T
+}
+
+func NewContextAwareChannel[T any](ch chan T) ContextAwareChannel[T] {
+ return ContextAwareChannel[T]{ch: ch}
+}
+
+func (r ContextAwareChannel[T]) Write(ctx context.Context, info T) error {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case r.ch <- info:
+ return nil
+ }
+}
diff --git a/resources/fetching/factory/aws_factory.go b/resources/fetching/factory/aws_factory.go
index a746f96..0c02761 100644
--- a/resources/fetching/factory/aws_factory.go
+++ b/resources/fetching/factory/aws_factory.go
@@ -21,6 +21,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/elastic/elastic-agent-libs/logp"
+ "github.com/elastic/cloudbeat/pipeline"
"github.com/elastic/cloudbeat/resources/fetching"
fetchers "github.com/elastic/cloudbeat/resources/fetching/fetchers/aws"
"github.com/elastic/cloudbeat/resources/providers/aws_cis/logging"
@@ -44,7 +45,7 @@ func NewCisAwsFactory(log *logp.Logger, cfg aws.Config, ch chan fetching.Resourc
m := make(FetchersMap)
iamProvider := iam.NewIAMProvider(log, cfg, &awslib.MultiRegionClientFactory[iam.AccessAnalyzerClient]{})
- iamFetcher := fetchers.NewIAMFetcher(log, iamProvider, ch, identity)
+ iamFetcher := fetchers.NewIAMFetcher(log, iamProvider, pipeline.NewContextAwareChannel(ch), identity)
m[fetching.IAMType] = RegisteredFetcher{Fetcher: iamFetcher}
kmsProvider := kms.NewKMSProvider(log, cfg, &awslib.MultiRegionClientFactory[kms.Client]{})
diff --git a/resources/fetching/fetchers/aws/iam_fetcher.go b/resources/fetching/fetchers/aws/iam_fetcher.go
index 09961ae..6bb3ba1 100644
--- a/resources/fetching/fetchers/aws/iam_fetcher.go
+++ b/resources/fetching/fetchers/aws/iam_fetcher.go
@@ -23,6 +23,7 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
+ "github.com/elastic/cloudbeat/pipeline"
"github.com/elastic/cloudbeat/resources/fetching"
"github.com/elastic/cloudbeat/resources/providers/awslib"
"github.com/elastic/cloudbeat/resources/providers/awslib/iam"
@@ -31,7 +32,7 @@ import (
type IAMFetcher struct {
log *logp.Logger
iamProvider iam.AccessManagement
- resourceCh chan fetching.ResourceInfo
+ resourceCh pipeline.ContextAwareChannel[fetching.ResourceInfo]
cloudIdentity *awslib.Identity
}
@@ -44,7 +45,7 @@ type IAMResource struct {
identity *awslib.Identity
}
-func NewIAMFetcher(log *logp.Logger, provider iam.AccessManagement, ch chan fetching.ResourceInfo, identity *awslib.Identity) *IAMFetcher {
+func NewIAMFetcher(log *logp.Logger, provider iam.AccessManagement, ch pipeline.ContextAwareChannel[fetching.ResourceInfo], identity *awslib.Identity) *IAMFetcher {
return &IAMFetcher{
log: log,
iamProvider: provider,
@@ -95,12 +96,18 @@ func (f IAMFetcher) Fetch(ctx context.Context, cMetadata fetching.CycleMetadata)
}
for _, iamResource := range iamResources {
- f.resourceCh <- fetching.ResourceInfo{
- Resource: IAMResource{
- AwsResource: iamResource,
- identity: f.cloudIdentity,
+ err = f.resourceCh.Write(
+ ctx,
+ fetching.ResourceInfo{
+ Resource: IAMResource{
+ AwsResource: iamResource,
+ identity: f.cloudIdentity,
+ },
+ CycleMetadata: cMetadata,
},
- CycleMetadata: cMetadata,
+ )
+ if err != nil {
+ return err
}
}
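To make the no-goroutine-leak point concrete, here is a small standalone demo (illustrative only; the wrapper is copied locally so the snippet is self-contained rather than importing the cloudbeat package): a writer blocked on Write returns as soon as its context is done, even if nobody ever reads from the channel.

package main

import (
    "context"
    "fmt"
    "time"
)

// Same shape as the ContextAwareChannel in the diff above, copied here so the
// demo is self-contained.
type ContextAwareChannel[T any] struct {
    ch chan T
}

func (r ContextAwareChannel[T]) Write(ctx context.Context, info T) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case r.ch <- info:
        return nil
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()

    // Unbuffered channel with no reader: a raw send would block forever and
    // leak the goroutine; Write returns once the context expires.
    cw := ContextAwareChannel[string]{ch: make(chan string)}
    err := cw.Write(ctx, "some resource")
    fmt.Println("write returned:", err) // context deadline exceeded
}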
This is a nice wrapper @orestisfl, though it can't protect against writing to a closed channel:
package main

import "sync"

func main() {
    ch := make(chan bool)
    close(ch)
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        // Sending on a closed channel panics, even inside a select.
        select {
        case ch <- true:
            return
        }
    }()
    wg.Wait()
}
@olegsu writing to a closed channel is a panic and we can't do anything about it; your program exits with a panic. This is why writers should always be responsible for closing channels (if they are to be closed at all).
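For reference, a minimal sketch of that convention (illustrative, not cloudbeat code): the goroutine that writes owns the channel and is the only one that closes it, and readers simply range until it is drained.

package main

import "fmt"

func main() {
    ch := make(chan int)

    // The writer owns the channel: it is the only one allowed to close it,
    // and it does so only after the last send.
    go func() {
        defer close(ch)
        for i := 0; i < 3; i++ {
            ch <- i
        }
    }()

    // The reader never closes the channel; range exits once it is closed and drained.
    for v := range ch {
        fmt.Println("got", v)
    }
}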
@jeniawhite @olegsu
After yesterday's discussion, here's a toy project that illustrates how we can work with multiple workers that can be independently restarted with separate contexts + safe context-aware channel + non-leaky reader: https://go.dev/play/p/0_Q6kyYeI2E
Explanation of each section of the output:
Initially, the workers are working without interruption:
2023/07/19 15:05:33 Got data: w2-i0
2023/07/19 15:05:33 Got data: w0-i0
2023/07/19 15:05:33 Got data: w1-i0
2023/07/19 15:05:33 Got data: w0-i1
2023/07/19 15:05:33 Got data: w1-i1
2023/07/19 15:05:33 Got data: w2-i1
2023/07/19 15:05:33 Got data: w2-i2
2023/07/19 15:05:33 Got data: w0-i2
2023/07/19 15:05:33 Got data: w1-i2
2023/07/19 15:05:33 Got data: w1-i3
2023/07/19 15:05:33 Got data: w2-i3
2023/07/19 15:05:33 Got data: w0-i3
2023/07/19 15:05:33 Got data: w1-i4
2023/07/19 15:05:33 Got data: w0-i4
2023/07/19 15:05:33 Got data: w2-i4
Then, workers 0 + 2 are stopped (without restart):
2023/07/19 15:05:33 Stopping workers 0 and 2
2023/07/19 15:05:33 Got data: w2-i5
2023/07/19 15:05:33 Got data: w1-i5
2023/07/19 15:05:33 Worker 0 stopping: context canceled
2023/07/19 15:05:33 Got data: w1-i6
2023/07/19 15:05:33 Got data: w2-i6
2023/07/19 15:05:33 Got data: w2-i7
2023/07/19 15:05:33 Got data: w1-i7
2023/07/19 15:05:33 Got data: w1-i8
2023/07/19 15:05:33 Worker 2 stopping: context canceled
2023/07/19 15:05:33 Got data: w1-i9
Notice how this (as expected) doesn't happen instantaneously. However, after the "Worker 0 stopping" message, we don't get any other data from that worker.
Later, workers 1 & 2 are restarted. Worker 2 was not running because it was stopped before, so we don't get another "Worker 2 stopping" message.
2023/07/19 15:05:33 Restarting workers 1 and 2
2023/07/19 15:05:33 Got data: w2-i0
2023/07/19 15:05:33 Got data: w1-i0
2023/07/19 15:05:33 Got data: w1-i10
2023/07/19 15:05:33 Got data: w1-i1
2023/07/19 15:05:33 Got data: w2-i1
2023/07/19 15:05:33 Worker 1 stopping: context canceled
2023/07/19 15:05:33 Got data: w2-i2
2023/07/19 15:05:33 Got data: w1-i2
2023/07/19 15:05:33 Got data: w1-i3
2023/07/19 15:05:33 Got data: w2-i3
2023/07/19 15:05:33 Got data: w2-i4
2023/07/19 15:05:33 Got data: w1-i4
Finally, the root context is canceled and all child contexts are subsequently canceled as well:
2023/07/19 15:05:33 Stopping all workers
2023/07/19 15:05:33 Read loop context done
2023/07/19 15:05:33 Read loop exiting
2023/07/19 15:05:33 Worker 1 stopping: context canceled
2023/07/19 15:05:33 Worker 2 stopping: context canceled
Process finished with the exit code 0
Note that worker 0 was already stopped before. Also note that the read loop is safely exited as well.
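For anyone who doesn't want to open the playground, here is a condensed sketch of the same pattern (not the exact playground code; the helper writeCtx is made up here): every worker gets its own cancelable context derived from the root, all writes go through a context-aware send, and the read loop exits when the root context is done.

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

// writeCtx sends v on ch unless ctx is canceled first, so a writer never
// stays blocked after its context is done.
func writeCtx[T any](ctx context.Context, ch chan<- T, v T) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case ch <- v:
        return nil
    }
}

// worker emits items until its own context is canceled.
func worker(ctx context.Context, id int, out chan<- string) {
    for i := 0; ; i++ {
        if err := writeCtx(ctx, out, fmt.Sprintf("w%d-i%d", id, i)); err != nil {
            log.Printf("Worker %d stopping: %v", id, err)
            return
        }
        time.Sleep(10 * time.Millisecond)
    }
}

func main() {
    root, cancelAll := context.WithCancel(context.Background())
    out := make(chan string)

    var wg sync.WaitGroup
    cancels := make([]context.CancelFunc, 3)

    // Each worker gets its own cancelable context derived from the root, so it
    // can be stopped or restarted independently of the others.
    start := func(id int) {
        ctx, cancel := context.WithCancel(root)
        cancels[id] = cancel
        wg.Add(1)
        go func() {
            defer wg.Done()
            worker(ctx, id, out)
        }()
    }
    for id := 0; id < 3; id++ {
        start(id)
    }

    // Non-leaky read loop: it exits as soon as the root context is done.
    done := make(chan struct{})
    go func() {
        defer close(done)
        for {
            select {
            case <-root.Done():
                log.Println("Read loop exiting")
                return
            case data := <-out:
                log.Println("Got data:", data)
            }
        }
    }()

    time.Sleep(50 * time.Millisecond)
    cancels[0]() // stop worker 0 for good
    cancels[1]() // restart worker 1 with a fresh context
    start(1)
    time.Sleep(50 * time.Millisecond)

    cancelAll() // cancel the root: remaining workers and the read loop exit
    wg.Wait()
    <-done
}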
Nice @orestisfl !!
Notice we already have a similar implementation in pipeline.go.
And yet, I don't think we should abstract away the channel; instead, I believe it will be more beneficial for the team if we all get more familiar with channel usage and common practices.
Eventually, Go-native code should be readable for the team, and in our current code base I don't see the benefit of maintaining that extra layer.
Hey @amirbenun
I am not sure the implementation in pipeline.go is similar, since it doesn't handle the context-done case; we will have to fix that usage as well.
The main reasons I am arguing for an abstraction are:
Motivation
Today in cloudbeat, we have our own custom implementation for handling asynchronous flows. In the posture flavor, we use Step, a thin generic function that supports multiple layers of asynchronous pipelines implemented over Go channels. In the vulnerability flavor, we use the worker, a main component that syncs between all the async providers; this implementation forces all the providers to be channel-aware and to focus on the pipeline instead of the business logic.
Proposition: RxGo
A well-tested library for handling async pipelines of events, which would allow us to focus on the business-logic implementation and move forward without worrying about synchronisation bugs. Maybe, as a further step, it could replace the pipeline package entirely. RxGo has 4.5K stars, and the organization is also popular across other programming languages.
Definition of done
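As a rough illustration of the proposition above (assuming RxGo v2; the identifiers and data here are made up, not cloudbeat code), a pipeline step could look roughly like this:

package main

import (
    "context"
    "fmt"

    "github.com/reactivex/rxgo/v2"
)

func main() {
    // A source observable standing in for fetched resources.
    observable := rxgo.Just("iam-policy", "kms-key", "s3-bucket")()

    // Each pipeline step becomes an operator; RxGo handles the channel
    // plumbing and synchronisation under the hood.
    observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
        return fmt.Sprintf("enriched(%v)", i), nil
    })

    // Consume the results; Observe returns a channel of items.
    for item := range observable.Observe() {
        if item.Error() {
            fmt.Println("error:", item.E)
            continue
        }
        fmt.Println(item.V)
    }
}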