vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

Enable preserving event order between `vector` source and sink #13845

Open · PerfectDay20 opened this issue 2 years ago

PerfectDay20 commented 2 years ago

Problem

During a performance test between two Vector instances running on two machines in the same DC, I found that the data in the received file is out of order. (The machine1 and machine2 configurations are listed in the Configuration section below.)

The data file is 956 MB, with each line prefixed by its line number: 0, 1, 2, ... In the received file, the line numbers are out of order, like:

...
297999 ...
299000 ... (should be 298000 here)
299001 ...
...
299999 ...
298000 ... (the missing 298000-298999 now appear after 299000-299999)
298001 ...
...

The client that writes to the HTTP source is a simple Java program:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

public class VectorTest {

    public static void main(String[] args) throws Exception {
        String path = args[0];                    // input data file
        int capacity = Integer.parseInt(args[1]); // lines per request
        HttpClient client = HttpClient.newHttpClient();
        try (
                Scanner input = new Scanner(Path.of(path), StandardCharsets.UTF_8)
        ) {
            List<String> buffer = new ArrayList<>(capacity);
            while (input.hasNextLine()) {
                // fill a batch of up to `capacity` non-blank lines
                while (input.hasNextLine() && buffer.size() < capacity) {
                    String line = input.nextLine();
                    if (!line.isBlank()) {
                        buffer.add(line);
                    }
                }

                // POST the batch to the Vector http source as newline-delimited text
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create("http://127.0.0.1:9001/"))
                        .headers("Content-Type", "text/plain;charset=UTF-8")
                        .POST(HttpRequest.BodyPublishers.ofString(String.join("\n", buffer)))
                        .build();

                HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
                if (response.statusCode() != 200) {
                    System.out.println(response);
                }
                buffer.clear();
            }
        }
    }
}

The file written by machine1's file sink is ordered, while the file written by machine2's file sink is ordered in some tests and disordered in others.

At first, I thought this might be caused by the Vector sink's concurrent sending and retries. But when I disabled retries with request.retry_attempts = 0, the received file was still complete, yet the data was still out of order. So I assume the disorder is not caused by failed requests and retries.

I read through the docs and searched the issues but found no guarantees about data order, so I wonder what causes the disorder. Is this the expected behavior?

Configuration

# machine1
data_dir = "/root/foo/vector/inner_data"
acknowledgements.enabled = true

[sources.machine1_socket]
type = "http"
address = "127.0.0.1:9001"

[sinks.machine2_vector]
type = "vector"
inputs = [ "machine1_socket" ]
address = "machine2:9000"
version = "2"
compression = true

buffer.type = "disk"
buffer.max_size = 9_268_435_488
batch.max_events = 1_000
batch.timeout_secs = 1
request.retry_attempts = 0

[sinks.local_file]
type = "file"
inputs = [ "machine1_socket" ]
compression = "none"
path = "/root/foo/vector/vector-%Y-%m-%d.log"
encoding.codec = "text"
framing.method = "newline_delimited"

# machine2
data_dir = "/root/foo/vector/inner_data"
acknowledgements.enabled = true

[sources.machine2_vector]
type = "vector"
address = "machine2:9000"
version = "2"

[sinks.local_file]
type = "file"
inputs = [ "machine2_vector" ]
compression = "none"
path = "/root/foo/vector/vector-%Y-%m-%d.log"
encoding.codec = "text"
framing.method = "newline_delimited"

Version

vector 0.23.0 (x86_64-unknown-linux-gnu 38c2435 2022-07-11)

Debug Output

Sorry, the debug output is too large to upload: about 4 GB for the whole test.

Example Data

(line number + space + long text)
0 a842a1434a... (500 chars)
1 a842a1434a...
2 a842a1434a...
3 a842a1434a...
4 a842a1434a...
5 a842a1434a...
6 a842a1434a...
7 a842a1434a...
8 a842a1434a...
9 a842a1434a...

Additional Context

No response

References

No response

jszwedko commented 2 years ago

Hi @PerfectDay20 !

I think what is happening here is that the vector sink is sending the events in concurrent requests, which means they can arrive out-of-order. You can try setting request.concurrency = 1 on the vector sink to limit it to sending one request at a time, which should preserve ordering within the sink. However, the vector source also processes incoming requests concurrently, so even then, it may be possible for events to end up out-of-order, but it should be much less likely if the sink is only sending one request at a time.
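
On the configuration posted above, that would look something like this on the machine1 side (a sketch based on the sink definition in this issue; the buffer and batch settings are omitted for brevity):

# machine1: limit the vector sink to one in-flight request at a time
[sinks.machine2_vector]
type = "vector"
inputs = [ "machine1_socket" ]
address = "machine2:9000"
version = "2"
compression = true
request.concurrency = 1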

Let me know if that makes sense! This is the intended behavior of that source and sink, but I can see the use case for preserving order, so we can repurpose this issue as a feature request.

PerfectDay20 commented 2 years ago

Thanks, @jszwedko !

This makes sense. I tried request.concurrency = 1 and the data is now ordered in my simple tests, though the results may differ in a more extreme or production environment.

zamazan4ik commented 2 years ago

@jszwedko I suggest adding information about this scenario somewhere in the documentation. We also ran into the same problem with changed event order in Vector. Right now it is not clear what needs to be configured, and how, to get the desired result (preserving event order).

Ideally, a dedicated setting like `preserve_event_order = true/false` would help here and be much more understandable. However, I think we can start with a piece of documentation.

jszwedko commented 2 years ago

👍 agreed. A preserve_event_order option would be more discoverable.
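
For illustration only, such an option might look roughly like this (preserve_event_order is the name proposed in this thread, not an existing Vector setting):

[sinks.machine2_vector]
type = "vector"
# hypothetical option; not implemented in Vector today
preserve_event_order = true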

fitz123 commented 11 months ago

Hi guys! My setup involves sending data through Vector with HTTPS and certificate authentication, using the following configuration: file source to vector sink, and then vector source to file sink.

I only encounter reordering issues when the internet connectivity becomes exceptionally poor. It's worth noting that my Vector agent (sender) and aggregator (receiver) are geographically distant from each other, which could be contributing to the problem.

With the concurrency set to 1, the throughput is approximately 7 times slower than with "adaptive" (default) concurrency, which is not an acceptable trade-off for my use case.

This makes me wonder whether the Vector protocol is adequately optimized for high-latency, unstable internet connections. Do you have any recommended design solutions or best practices for high-volume, ordered delivery over long distances?

Ideally, I would prefer to keep the current setup with adaptive concurrency, which performs well, and implement a transform akin to `dedup`, but designed to ensure order consistency.

jszwedko commented 11 months ago

Hey! It's expected that you would see much lower throughput with a concurrency of 1, given that Vector will only send one request at a time and wait for the response before sending the next. If you need ordering guarantees, you could also consider putting a queue such as Kafka in the middle, which Vector writes to and reads from.
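
A rough sketch of that approach, using Vector's kafka sink on the sender and kafka source on the receiver (the source name, broker address, topic, and consumer group below are placeholders; note that Kafka only preserves order within a single partition, so the topic would need a single partition or a consistent partition key):

# sender: write events to a Kafka topic instead of the vector sink
[sinks.to_kafka]
type = "kafka"
inputs = [ "my_source" ]           # placeholder for your existing source
bootstrap_servers = "kafka:9092"   # placeholder broker address
topic = "vector-ordered"           # single-partition topic to keep ordering
encoding.codec = "text"

# receiver: read the same topic back in partition order
[sources.from_kafka]
type = "kafka"
bootstrap_servers = "kafka:9092"
group_id = "vector-consumer"
topics = [ "vector-ordered" ]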

mikelsid commented 10 months ago

@jszwedko does this mean that with a concurrency of 1 and end-to-end acknowledgements enabled, with in-memory buffers along the way, the size of these buffers will always be 1?

jszwedko commented 10 months ago

No, the buffers will still fill up as normal; they will just egress Vector one request at a time.

mikelsid commented 10 months ago

So the next batch might be sent before the acknowledgement is received for the previous one? If so, this could lead to reordering as well, if the retry for the first batch happens after the second batch was written. Otherwise, if the next batch is only sent after the ack on the previous one is received, it seems like there would only be one batch of events in the buffer at any given time. Am I missing something?

jszwedko commented 10 months ago

I believe retries are taken into account: that is, the next request won't be sent until the previous one is accepted. There will only be one batch in flight, but the in-memory buffers can still queue up events.
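
Putting the pieces from this thread together, an order-preserving sender configuration would look roughly like the following (a sketch combining the settings discussed above, based on the machine1 config from this issue; with a single in-flight request, retries no longer need to be disabled):

# machine1: one in-flight request at a time, end-to-end acks, retries left enabled
acknowledgements.enabled = true

[sinks.machine2_vector]
type = "vector"
inputs = [ "machine1_socket" ]
address = "machine2:9000"
version = "2"
compression = true
buffer.type = "disk"
request.concurrency = 1   # batches egress strictly one at a time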