open-telemetry / opentelemetry-collector-contrib

Contrib repository for the OpenTelemetry Collector
https://opentelemetry.io
Apache License 2.0

[receiver/filelog] A proposal for issue #17846 #18908

Closed VihasMakwana closed 1 year ago

VihasMakwana commented 1 year ago

Component(s)

receiver/filelog

Is your feature request related to a problem? Please describe.

This enhancement request is related to issue #17846.

Describe the solution you'd like

High Level Architecture:

The proposed solution is a thread-pool model: we start MaxConcurrentFiles/2 goroutines, which consume all the matched paths concurrently.

Go channels used:

  1. channel_1: Channel to send the readers to thread pool
  2. channel_2: Channel to receive the readers after all the logs have been emitted (this one will be explained later)

Architecture for thread pool function:

Screenshot 2023-02-24 at 5 57 31 PM

Steps per poll cycle:

  1. Call the FindFiles function, which returns all the matched files.
  2. Start MaxConcurrentFiles/2 goroutines, all of which listen to channel_1.
  3. Loop through all the matched files and send their readers to channel_1.
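
The poll-cycle steps above could be sketched roughly as follows. This is a minimal illustration rather than the receiver's actual code: `reader` and `pollOnce` are hypothetical names, and "reading" a file is reduced to recording its path.

```go
package main

import "sync"

// reader is a hypothetical stand-in for the receiver's file reader.
type reader struct{ path string }

// pollOnce runs one poll cycle: it starts maxConcurrentFiles/2 workers that
// listen on channel1, sends one reader per matched path, then closes the
// channel and waits for the workers to drain it.
// It returns the consumed paths in no particular order.
func pollOnce(matches []string, maxConcurrentFiles int) []string {
	channel1 := make(chan *reader)
	var (
		mu       sync.Mutex
		consumed []string
		wg       sync.WaitGroup
	)

	workers := maxConcurrentFiles / 2
	if workers < 1 {
		workers = 1 // always keep at least one worker so sends cannot block forever
	}
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for r := range channel1 {
				// A real worker would call ReadToEnd here.
				mu.Lock()
				consumed = append(consumed, r.path)
				mu.Unlock()
			}
		}()
	}

	for _, p := range matches {
		channel1 <- &reader{path: p}
	}
	close(channel1)
	wg.Wait()
	return consumed
}
```

Note that this version still waits for every file before returning, which matches the per-cycle pool described in this proposal.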

Architecture for channel_2:

Screenshot 2023-02-24 at 6 07 55 PM

Steps for thread pool function:

  1. Listen to channel_1.
  2. Receive the reader, call ReadToEnd(), and close the reader if deleteAfterRead is true (same logic as the current implementation).
  3. Send the reader to channel_2.
  4. Repeat step 1 until the channel is closed.
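
As a rough sketch of the worker steps above, assuming a hypothetical `reader` type whose `ReadToEnd` and `Close` methods only record that they were called (`runWorkers` is an illustrative driver, not part of the proposal):

```go
package main

import "sync"

// reader is a hypothetical stand-in for the receiver's file reader.
type reader struct {
	path   string
	read   bool
	closed bool
}

func (r *reader) ReadToEnd() { r.read = true }
func (r *reader) Close()     { r.closed = true }

// worker receives readers from channel1, reads each to the end, closes it if
// deleteAfterRead is set, and hands it to channel2. The range loop ends once
// channel1 is closed and drained.
func worker(channel1 <-chan *reader, channel2 chan<- *reader, deleteAfterRead bool) {
	for r := range channel1 {
		r.ReadToEnd()
		if deleteAfterRead {
			r.Close()
		}
		channel2 <- r
	}
}

// runWorkers feeds the readers to n workers (n must be at least 1) and
// returns everything that arrived on channel2.
func runWorkers(readers []*reader, n int, deleteAfterRead bool) []*reader {
	channel1 := make(chan *reader)
	channel2 := make(chan *reader, len(readers)) // buffered so workers never block
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker(channel1, channel2, deleteAfterRead)
		}()
	}
	for _, r := range readers {
		channel1 <- r
	}
	close(channel1)
	wg.Wait()
	close(channel2)

	var done []*reader
	for r := range channel2 {
		done = append(done, r)
	}
	return done
}
```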

Architecture for thread pool function:

Screenshot 2023-02-24 at 6 15 41 PM

Steps for channel_2:

  1. Listen to channel_2 for any readers.
  2. Append the reader to a list that keeps track of open readers.
  3. Close open readers once the length of the list exceeds MaxConcurrentFiles/2, because at any one time we're only allowed to keep MaxConcurrentFiles files open.
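
One possible reading of the channel_2 steps, sketched below. The proposal does not say which readers get closed once the list is too long, so closing the oldest first is an assumption here, as are the `reader` type and the `collect` name.

```go
package main

// reader is a hypothetical stand-in for the receiver's file reader.
type reader struct {
	path   string
	closed bool
}

func (r *reader) Close() { r.closed = true }

// collect drains channel2 and keeps at most maxConcurrentFiles/2 readers
// open. When the list grows past that limit, the oldest readers are closed
// first (an assumption; the proposal does not specify an order).
// It returns the readers that are still open once channel2 is closed.
func collect(channel2 <-chan *reader, maxConcurrentFiles int) []*reader {
	limit := maxConcurrentFiles / 2
	var open []*reader
	for r := range channel2 {
		open = append(open, r)
		for len(open) > limit {
			open[0].Close()
			open = open[1:]
		}
	}
	return open
}
```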

Describe alternatives you've considered

Another approach was to send the paths to the channel and create readers in goroutines itself.

Caveats of this approach

Additional context

No response

github-actions[bot] commented 1 year ago

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

VihasMakwana commented 1 year ago

cc: @atoulme

atoulme commented 1 year ago

OK. How will you test for this? Please try to code this up and see where that would live.

djaglowski commented 1 year ago

Thanks for putting this together @vihas-splunk.

There are a few cases that I am not sure how this proposal would handle.

First, to make sure I understand the overall goal correctly, I'll restate it here. We want to allow concurrency between poll cycles. e.g. p0 finds a very large file and takes a long time to consume it. Meanwhile, p1, p2, etc can still be triggered and consume other files.


The proposal states that we would create MaxConcurrentFiles/2 goroutines during each poll cycle. However, if we are allowing poll cycles to overlap, we are at risk of violating MaxConcurrentFiles/2 whenever we have 3 or more poll cycles running concurrently. I may be missing it but I don't think this is handled in this design.

It seems to me that the purpose of a pool in this case would be to limit the total number of open files, regardless of which poll cycles have found the files. Therefore, if we are to use a pool here, I think we'd need it to be shared across all poll cycles.

I think your design handles the notion of lost files, but I don't see how the same mechanism works if we account for the above problem. Ultimately, we have a difficult tension between the number of open files and our ability to detect lost files. If we manage all open files as a single pool, then we need a way to know which files in that pool were found by previous poll cycles.


We are already running each reader's ReadToEnd in a separate goroutine. This effectively means that 1) we know how many of these are running in parallel, and 2) we know when each is done. Do we really need to involve channels here? Perhaps the solution is to skip the wg.Wait at the end of each poll cycle, and instead to have the manager limit the number of files we open during each poll cycle.
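
A minimal sketch of that alternative, assuming a buffered channel used as a counting semaphore. The `manager` type, `newManager`, and the `read` callback are hypothetical names for illustration only, not the existing implementation.

```go
package main

// manager limits how many files are being read at once, across poll cycles.
type manager struct {
	slots chan struct{} // counting semaphore; capacity is the open-file limit
}

func newManager(maxOpen int) *manager {
	return &manager{slots: make(chan struct{}, maxOpen)}
}

// poll starts one goroutine per path while slots are available and returns
// immediately, without waiting for the reads to finish (no wg.Wait).
// Paths that do not fit under the limit are returned so that a later poll
// cycle can pick them up.
func (m *manager) poll(paths []string, read func(path string)) (deferred []string) {
	for _, p := range paths {
		select {
		case m.slots <- struct{}{}: // acquired a slot
			go func(p string) {
				defer func() { <-m.slots }() // release the slot when done
				read(p)
			}(p)
		default: // at the limit; leave the file for a later poll cycle
			deferred = append(deferred, p)
		}
	}
	return deferred
}
```

Because the semaphore lives on the manager rather than in a single poll cycle, the limit holds no matter how many cycles overlap.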


How will we ensure that concurrent poll cycles do not attempt to read the same file concurrently? e.g. p0 is consuming a large file. p1 finds the same file. How does p1 establish that this file is already being consumed?

VihasMakwana commented 1 year ago

The proposal states that we would create MaxConcurrentFiles/2 goroutines during each poll cycle. However, if we are allowing poll cycles to overlap, we are at risk of violating MaxConcurrentFiles/2 whenever we have 3 or more poll cycles running concurrently. I may be missing it but I don't think this is handled in this design.

It seems to me that the purpose of a pool in this case would be to limit the total number of open files, regardless of which poll cycles have found the files. Therefore, if we are to use a pool here, I think we'd need it to be shared across all poll cycles.

Yes, you're right. In my design, we fire a thread pool per poll cycle and wait for that poll cycle to finish consuming all the files. Only after that do we move to the next poll. Having one large pool shared between poll cycles sounds doable to me, but I have to improve the current design. I will work on that today and let you know. As you said, though, there's a difficult tension. I will answer all the questions after I improve this design.

VihasMakwana commented 1 year ago

How will we ensure that concurrent poll cycles do not attempt to read the same file concurrently? e.g. p0 is consuming a large file. p1 finds the same file. How does p1 establish that this file is already being consumed?

For this, we can use some form of hashing: store the path in a hash map while the file is being consumed and remove it once consumption is done. This way, p1 can check whether a particular path is in the hash map. If it is, the file is currently being consumed and p1 skips it. Otherwise, p1 starts consuming it.
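
A small sketch of that idea, assuming a mutex-guarded set keyed by path. The `inFlight` type and its method names are hypothetical. The check and the insert happen under one lock so that two overlapping poll cycles cannot both claim the same path.

```go
package main

import "sync"

// inFlight tracks the paths that some poll cycle is currently consuming.
type inFlight struct {
	mu    sync.Mutex
	paths map[string]struct{}
}

func newInFlight() *inFlight {
	return &inFlight{paths: make(map[string]struct{})}
}

// tryStart records path and returns true if nobody is consuming it yet.
// It returns false if the path is already being consumed.
func (s *inFlight) tryStart(path string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.paths[path]; ok {
		return false
	}
	s.paths[path] = struct{}{}
	return true
}

// finish removes path once the file has been fully consumed.
func (s *inFlight) finish(path string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.paths, path)
}
```

In this sketch p0 would call `tryStart` before reading and `finish` afterwards, and p1 would skip any path for which `tryStart` returns false.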

hvaghani221 commented 1 year ago

We can create a single channel to hold all the readers, and never close it. Also, maintain a hash map that keeps track of all the readers in the channel. In any poll cycle, do not add the reader to the channel if it is present in the hash map. The buffer length of the channel can be set to MaxConcurrentFiles. It makes sure we don't have too many overlapping poll cycles.

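type Manager struct {
    // ...
    readerChan chan *Reader

    // mu guards queuedPath, which poll and the workers access concurrently.
    mu         sync.Mutex
    queuedPath map[string]struct{}
}

func (m *Manager) Worker(ctx context.Context) {
    for {
        select {
        case r := <-m.readerChan:
            // open the file if not opened
            // consume file

            // Remove the path from queuedPath so later poll cycles can queue it again.
            // (r.path is an assumed field holding the reader's path.)
            m.mu.Lock()
            delete(m.queuedPath, r.path)
            m.mu.Unlock()

            // feed back the reader to lost file manager
        case <-ctx.Done():
            return
        }
    }
}

func (m *Manager) startPoller(ctx context.Context) {
    for i := 0; i < m.maxBatchFiles; i++ {
        go m.Worker(ctx)
    }
    // ...
}

func (m *Manager) poll(ctx context.Context) {
    // ...
    // matches := m.finder.FindFiles()
    var matches []string
    for _, path := range matches {
        // Check and mark the path in one critical section so that two
        // overlapping poll cycles cannot both queue it.
        m.mu.Lock()
        _, queued := m.queuedPath[path]
        if !queued {
            m.queuedPath[path] = struct{}{}
        }
        m.mu.Unlock()
        if queued {
            continue
        }
        // create a reader but don't open the file
        m.readerChan <- createReader(path)
    }
}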

Files will most likely be lost because of log rotation in a production environment. Files can be rotated for two reasons:

We can take advantage of this behaviour and implement the lost file manager as a priority queue. So, when we need to open more than MaxConcurrentFiles/2 files to track any lost files, the lost file manager can close the file that is the least likely to be rotated.
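
To illustrate the priority-queue idea, here is a sketch built on Go's `container/heap`. How to estimate the likelihood of rotation is not specified in this comment, so `rotationScore` is a pure placeholder, and `trackedFile`, `lostFileManager`, and `track` are hypothetical names.

```go
package main

import "container/heap"

// trackedFile is a hypothetical open file tracked for loss detection.
type trackedFile struct {
	path          string
	rotationScore float64 // placeholder: higher means more likely to be rotated
	closed        bool
}

// fileHeap is a min-heap ordered by rotationScore, so the file least likely
// to be rotated sits at the top.
type fileHeap []*trackedFile

func (h fileHeap) Len() int           { return len(h) }
func (h fileHeap) Less(i, j int) bool { return h[i].rotationScore < h[j].rotationScore }
func (h fileHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *fileHeap) Push(x any)        { *h = append(*h, x.(*trackedFile)) }
func (h *fileHeap) Pop() any {
	old := *h
	n := len(old)
	f := old[n-1]
	*h = old[:n-1]
	return f
}

// lostFileManager keeps at most limit files open.
type lostFileManager struct {
	limit int
	open  fileHeap
}

// track adds f to the set of open files. If that exceeds the limit, the file
// with the lowest rotationScore is closed and returned; otherwise track
// returns nil.
func (m *lostFileManager) track(f *trackedFile) *trackedFile {
	heap.Push(&m.open, f)
	if m.open.Len() <= m.limit {
		return nil
	}
	evicted := heap.Pop(&m.open).(*trackedFile)
	evicted.closed = true
	return evicted
}
```

With this ordering, a newly tracked file can itself be the one that is closed if its score is the lowest.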

The lost file manager can periodically check if any opened file is lost, and it can add the lost file reader to the readerChan.

VihasMakwana commented 1 year ago

We are already running each reader's ReadToEnd in a separate goroutine. This effectively means that 1) we know how many of these are running in parallel, and 2) we know when each is done. Do we really need to involve channels here? Perhaps the solution is to skip the wg.Wait at the end of each poll cycle, and instead to have the manager limit the number of files we open during each poll cycle.

We need channels for the thread pooling goroutines, nothing else. When we send the reader to a channel, any of the available goroutines will pick that reader up and start consuming it.

VihasMakwana commented 1 year ago

Yes @hvaghani221, that's exactly what I meant in my previous comment. Thanks for making it clear with that snippet.

djaglowski commented 1 year ago

In my design, we fire a thread pool per poll cycle and wait for that poll cycle to finish consuming all the files.

I don't think I understand what we are trying to accomplish then. The current implementation waits for each poll cycle to finish. What's the benefit of using a pool if we are going to wait?

VihasMakwana commented 1 year ago

I will tweak this current design to have concurrency between poll intervals. We can have a global pool for readers and use hashing. I will update you with the new design soon.

VihasMakwana commented 1 year ago

Hello @djaglowski

Design of the poller:

Screenshot 2023-03-01 at 12 15 02 PM

Design of per thread goroutine:

Screenshot 2023-03-01 at 12 16 17 PM

Answers:

How will we ensure that concurrent poll cycles do not attempt to read the same file concurrently? e.g. p0 is consuming a large file. p1 finds the same file. How does p1 establish that this file is already being consumed?

Since we use a global hash set, we'll store the file path once a reader starts consuming the file and remove it once consumption is done. This way, if we find the same file in a later poll cycle, we can look it up in the hash set, see that it's currently being read, and move on to the next file.

First, to make sure I understand the overall goal correctly, I'll restate it here. We want to allow concurrency between poll cycles. e.g. p0 finds a very large file and takes a long time to consume it. Meanwhile, p1, p2, etc can still be triggered and consume other files.

Yes, that's correct. The new design doesn't wait for files to be consumed; the readers are consumed asynchronously. The poller sends the readers to the channel and moves on to the next poll cycle.

Note:

djaglowski commented 1 year ago

@vihas-splunk, thanks for the detailed design.

Using the file path as a hash key is an interesting idea but it may have some subtle implications.

For example, it is possible that the file is rotated while we are reading it. In many cases this will result in a newer file with the same path that we would ignore until we've finished consuming the original file. This potentially adds some latency, and introduces a small amount of risk that we miss our chance to read the file before it is deleted (when rotation is occurring very rapidly). However, it also would mean that rotated files are consumed sequentially (excluding those found in the first poll cycle).

If you want to put a PR together, I'm happy to review it. Given the complexity, a clean diff will be more helpful than overall clean code. We can refactor later if this design proves to work.

VihasMakwana commented 1 year ago

If you want to put a PR together, I'm happy to review it. Given the complexity, a clean diff will be more helpful than overall clean code. We can refactor later if this design proves to work.

Sure @djaglowski, can you assign this to me?

hvaghani221 commented 1 year ago

I will be happy to review it as well :)

github-actions[bot] commented 1 year ago

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

github-actions[bot] commented 1 year ago

This issue has been closed as inactive because it has been stale for 120 days with no activity.