jaegertracing / jaeger

CNCF Jaeger, a Distributed Tracing Platform
https://www.jaegertracing.io/
Apache License 2.0

Storage API to support batch writing #1696

Open burmanm opened 5 years ago

burmanm commented 5 years ago

Requirement - what kind of business use case are you trying to solve?

Allow storage backends to write faster by using batching. Currently there are open tickets because ES can't necessarily keep up with the writes, and about how to configure Jaeger with the correct queue sizes.

However, when emptying the queues, each write happens in its own transaction (from the storage layer's perspective) by calling WriteSpan(*model.Span). If there were instead a method that allowed writing multiple elements at the same time, such as WriteBatch([]*model.Span), certain backends could benefit from this through improved write speed.
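
For illustration, a minimal sketch of what this could look like next to the current spanstore interface (the BatchWriter name is hypothetical, nothing here is decided):

    package spanstore

    import "github.com/jaegertracing/jaeger/model"

    // Writer is the existing single-span interface mentioned above.
    type Writer interface {
        WriteSpan(span *model.Span) error
    }

    // BatchWriter is a hypothetical extension: backends that can batch
    // implement it, everything else keeps implementing only Writer.
    type BatchWriter interface {
        WriteBatch(spans []*model.Span) error
    }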

In ES this could mean using the bulk API, in the badger backend this would allow writing more keys during the same transaction, and even the Cassandra backend can use batched writes if multiple spans target the same hash range.
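
As a sketch of the badger case (using github.com/dgraph-io/badger; encodeSpan is a hypothetical helper that serializes a span to a key/value pair), the whole batch could commit in one transaction instead of one transaction per span:

    // writeBatch stores a batch of spans in a single badger transaction.
    // encodeSpan is a hypothetical helper returning the key/value pair.
    func writeBatch(db *badger.DB, spans []*model.Span) error {
        return db.Update(func(txn *badger.Txn) error {
            for _, span := range spans {
                key, value := encodeSpan(span)
                if err := txn.Set(key, value); err != nil {
                    // very large batches would also need to handle
                    // badger's transaction size limit (ErrTxnTooBig)
                    return err
                }
            }
            return nil // all keys commit together on success
        })
    }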

This would obviously create a somewhat more complicated interplay between "workers", "queue size" and an acceptable "batch size". But I believe most backends would still easily benefit from this, and setting the batch size to 1 would be equivalent to the current model in any case.
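
To make the batch-size-1 equivalence concrete, a backend without native batching could be wrapped with a trivial adapter (again just a sketch):

    // writerAdapter gives any plain Writer a WriteBatch method by
    // writing spans one at a time; with batches of size 1 this is
    // exactly the current behavior.
    type writerAdapter struct {
        w Writer
    }

    func (a writerAdapter) WriteBatch(spans []*model.Span) error {
        for _, span := range spans {
            if err := a.w.WriteSpan(span); err != nil {
                return err
            }
        }
        return nil
    }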

Problem - what in Jaeger blocks you from solving the requirement?

Input from others. Am I missing something obvious that would prevent doing this? Or are there better approaches to exploiting the same batching benefits?

yurishkuro commented 5 years ago

This would obviously create a somewhat more complicated setup

Yes, this sounds like a significant refactoring of the collector and/or ingester pipeline. With Cassandra it may actually be counterproductive: the shard is determined by span.traceID, but a batch from a queue is likely to contain spans from different traces, so instead of going directly to the correct shard (when using the token-aware LB in gocql) the whole batch would have to go to a coordinator node first. However, this is mostly speculation; it would be interesting to benchmark.

However, if the pipeline is refactored to be more batch-oriented, then it could both address batch writes per this ticket and still support the current behavior with a batchSize=1 parameter.
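
Roughly, such a pipeline could drain the queue in chunks; everything below is hypothetical naming, not the actual collector code:

    // consume flushes spans to storage in chunks of up to batchSize.
    // With batchSize == 1 it degenerates to the current
    // one-span-per-write model.
    func consume(queue <-chan *model.Span, writer BatchWriter, batchSize int) {
        batch := make([]*model.Span, 0, batchSize)
        for span := range queue {
            batch = append(batch, span)
            if len(batch) == batchSize {
                if err := writer.WriteBatch(batch); err != nil {
                    // a real implementation would retry or count drops
                }
                batch = batch[:0]
            }
        }
        if len(batch) > 0 {
            writer.WriteBatch(batch) // flush the tail when the queue closes
        }
    }

A real implementation would presumably also flush on a timeout, so that a slow trickle of spans doesn't sit in the buffer waiting for the batch to fill.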

burmanm commented 5 years ago

I'm not entirely sure how gocql handles the connections, but at least DataStax's java-driver connects to multiple nodes and thereby allows a mode where you don't always have to transfer data through a coordinator node. In hawkular-metrics we used this with the following code when inserting large amounts of data:

    /*
     * Applies micro-batching capabilities by taking advantage of token ranges in the Cassandra cluster.
     */
    private Observable.Transformer<BoundStatement, Integer> applyMicroBatching() {
        return tObservable -> tObservable
                // Group each statement by the token range that owns its routing
                // key, so every group maps to a single replica set.
                .groupBy(b -> {
                    ByteBuffer routingKey = b.getRoutingKey(ProtocolVersion.NEWEST_SUPPORTED,
                            codecRegistry);
                    Token token = metadata.newToken(routingKey);
                    for (TokenRange tokenRange : session.getCluster().getMetadata().getTokenRanges()) {
                        if (tokenRange.contains(token)) {
                            return tokenRange;
                        }
                    }
                    log.warn("Unable to find any Cassandra node to insert token " + token.toString());
                    return session.getCluster().getMetadata().getTokenRanges().iterator().next();
                })
                // Turn each per-token-range group into batch statements.
                .flatMap(g -> g.compose(new BoundBatchStatementTransformer()))
                // Execute each batch with the retry policy and emit its size.
                .flatMap(batch -> rxSession
                        .execute(batch)
                        .compose(applyInsertRetryPolicy())
                        .map(resultSet -> batch.size())
                );
    }

In short, it takes in a batch of inserts and builds smaller groups of those inserts, each targeting a certain token range (so each group can be sent directly to the correct node). We observed around a 50% increase in insert speed on our workloads (which are still quite similar to what Jaeger does: time series, in any case).
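
A rough Go analogue of that grouping step, in case the driver doesn't expose the token ring: since Jaeger's Cassandra schema shards by trace ID, grouping by trace ID at least keeps each batch single-partition (a sketch, not driver-aware code):

    // groupByTraceID splits a mixed batch into per-partition groups so
    // that each Cassandra BATCH stays within one partition and can be
    // routed directly to a replica by a token-aware policy.
    func groupByTraceID(spans []*model.Span) map[model.TraceID][]*model.Span {
        groups := make(map[model.TraceID][]*model.Span)
        for _, span := range spans {
            groups[span.TraceID] = append(groups[span.TraceID], span)
        }
        return groups
    }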

I think there was also a talk about this at one of the older Cassandra/DataStax conferences. I also remember profiling Cassandra, and there was a noticeable difference in where the time is spent.

Obviously, it could be that the java-driver is more advanced in this context and things don't work the same way for this workload with gocql.

yurishkuro commented 5 years ago

Right, if you do your own batching based on knowledge of the token distribution, then it won't require the coordinator node. I don't know if gocql exposes that. There are also bugs in gocql related to the token-aware policy, where it drops cluster gossip updates.

I kind of expect the benefits to be marginal in this case, since on a sizeable Cassandra cluster the spans from a single batch may be spread across many nodes. It would be interesting to run tests on real prod data to validate that.

Anyway, I am not saying that this is a bad idea, and it would probably help a lot with other storage engines.

guo0693 commented 5 years ago

Not sure if this is the right place, and it's a bit of a side track, but can we use Cassandra batch writes for the index updates in the Cassandra spanstore?

Currently one span will be indexed multiple times; it feels like a batch operation could benefit us a lot given the number of span writes per second we have right now.
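
For illustration, something like this with gocql's batch API (github.com/gocql/gocql); the table and column names below are made up, not the actual schema:

    // writeIndexes sends the index inserts for one span as a single
    // unlogged batch instead of separate round trips. The index tables
    // have different partition keys, so this is a multi-partition batch:
    // it saves round trips but puts extra work on the coordinator.
    func writeIndexes(session *gocql.Session, span *model.Span) error {
        b := session.NewBatch(gocql.UnloggedBatch)
        b.Query(`INSERT INTO service_name_index (service_name, start_time, trace_id) VALUES (?, ?, ?)`,
            span.Process.ServiceName, span.StartTime, span.TraceID.String())
        b.Query(`INSERT INTO operation_name_index (service_name, operation_name, start_time, trace_id) VALUES (?, ?, ?, ?)`,
            span.Process.ServiceName, span.OperationName, span.StartTime, span.TraceID.String())
        return session.ExecuteBatch(b)
    }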

Sasasu commented 5 years ago

I believe WriteBatch([]*model.Span, BatchWriteOption) is better: if the Writer interface has an options parameter, I can pass extra metadata to a custom storage without modifying the Writer interface. The same applies to the queue in the collector. Also, for the gRPC storage plugin, the reserved keyword could be used to tell others which fields may be used.
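
As a sketch of what that option parameter could look like in Go, using a functional-options style (all names hypothetical):

    // BatchWriteOption lets callers attach extra metadata without
    // changing the WriteBatch signature again later.
    type BatchWriteOption func(*batchWriteOptions)

    type batchWriteOptions struct {
        tenant string // example of backend-specific metadata
    }

    // WithTenant is an example option a custom storage could consume.
    func WithTenant(tenant string) BatchWriteOption {
        return func(o *batchWriteOptions) { o.tenant = tenant }
    }

    type BatchWriter interface {
        WriteBatch(spans []*model.Span, opts ...BatchWriteOption) error
    }

This way new metadata only adds options, and backends that don't understand an option can simply ignore it.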