influxdata / influxdb

Scalable datastore for metrics, events, and real-time analytics
https://influxdata.com
Apache License 2.0

Integrate Arrow to query / read #10116

Open stuartcarnie opened 6 years ago

stuartcarnie commented 6 years ago

This issue addresses the section titled Research best practices when working with Arrow of #10088. As outlined below, Arrow was not a simple drop-in for the existing storage read path. Some exploratory attempts to integrate it into the existing code-generated cursors and arrays failed or raised significant performance concerns. Taking a step back to explore the spec, the canonical C++ implementation and the existing Go code has proven fruitful in better understanding the right path. The first section details that research and also includes a high-level review of the TSM encoding and compaction process, to ensure the design is appropriate for moving this area to columnar data structures. See the Design / TODOs section below for the list of tasks to be completed.

A complementary issue and set of TODOs will be prepared for the encoding and compaction work.

Overview

Query and compaction are independent processes that leverage a number of shared APIs to manage the reading, decoding, deletion (tombstones) and merging of TSM and cache (WAL) data. The two processes diverge for specific use cases, which are identified later. Assume these processes are performed per-shard.

For the sake of simplicity, examples refer to the Float data type; however, any of the other supported data types (Integer, String, etc.) could be substituted.

At a high level, the common APIs used by query and compaction are:

The subsequent sections discuss the differences.

Query / Read

At a high level, the query process is encapsulated in a cursor. Concretely, a cursor is

Series keys are typically read in random order, as driven by the query engine.

To fulfill this requirement, a cursor is constructed with the series key to be read, any associated values from the cache, and a KeyCursor. The KeyCursor is a lower-level construct that produces merged blocks from the TSM files for the shard.
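By way of illustration, a minimal sketch of a typed cursor and the block it yields might look like the following. The names are modelled on the existing tsdb code but are not the exact interfaces:

```go
// Sketch only: the slice-backed block and the typed cursor that yields
// merged blocks for a single series key.
type FloatArray struct {
	Timestamps []int64   // nanosecond timestamps
	Values     []float64 // values decoded from TSM blocks and/or the cache
}

type FloatArrayCursor interface {
	// Next returns the next merged block, or an empty block once the
	// cursor is exhausted.
	Next() *FloatArray
	Close()
	Err() error
}
```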

Code generation (AKA "generics") is employed to produce concrete implementations for each supported value type, and for whether the data should be retrieved in ascending or descending time order. The InfluxQL query engine and the storage/read service use alternate implementations, which have significantly different performance characteristics. The focus will remain on the newer columnar implementations, which have been primed for moving to Arrow.

A query may span multiple shards and therefore the process will create cursors for each shard in order to aggregate or filter the data.

Arrow Challenges

As data flows between these layers, existing FloatArray blocks are mutated via the FloatArray.Include or FloatArray.Exclude APIs. In some cases, new arrays are allocated via the FloatArray.Merge API. These allocations occur in isolation, with no shared state or context, which makes tracking resource usage impractical; such tracking is a requirement for 2.0. The lack of shared context is particularly problematic for Arrow, which generally treats arrays as immutable and requires access to a memory.Allocator when building new arrays. It is worth noting that immutability may facilitate caching in the future.
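For context, this is roughly what constructing a new array looks like with the Go Arrow library: every builder needs a memory.Allocator up front, which is exactly the shared state the current Include / Exclude / Merge call sites do not have. A sketch only, not the final integration (the helper name is mine):

```go
import (
	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/memory"
)

// concatFloats concatenates two decoded value slices into a new, immutable
// Arrow array. Note the allocator must be supplied at construction time.
func concatFloats(alloc memory.Allocator, a, b []float64) *array.Float64 {
	bld := array.NewFloat64Builder(alloc)
	defer bld.Release()
	bld.AppendValues(a, nil)
	bld.AppendValues(b, nil)
	return bld.NewFloat64Array() // immutable once built
}
```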

Compaction

The equivalent merge operation for compaction is implemented by the tsmKeyIterator. The tsmKeyIterator is

A notable difference from the query process is that data is read sequentially.

Arrow Challenges

Similar to the KeyCursor used by the query process, the tsmKeyIterator leverages the equivalent Include, Exclude and Merge APIs during the merge process; however, the lack of shared state or context again presents challenges for Arrow.

NOTE: The compaction service has not been updated to use the columnar decoders; that work is out of scope for the goal of supporting columnar access patterns through the storage/read service. There is also additional work required to review and implement columnar encoders and to refactor tsmKeyIterator and the Cache to use the new columnar data structures.

Summary

Given the differences between the query and compaction processes, it may not be possible or practical to unify the entire LSM Tree merge operation. It is necessary that FloatArray and related types be altered to use a shared context, in order to provide an Arrow memory.Allocator and to facilitate tracking metrics in the future.

Design / TODOs

Although context.Context was considered, it is not appropriate in this scenario, as explicit shared state is required. A new data type will be created as a container for the shared state and made available through a significant portion of the read process. At a minimum, it will provide access to an Arrow memory.Allocator and will later be extended to supply various metrics / counters and resource tracking.
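A minimal sketch of such a container follows; the type and field names are illustrative, not final:

```go
import "github.com/apache/arrow/go/arrow/memory"

// execContext is a hypothetical container for state shared across the read
// path. It is passed explicitly rather than via context.Context.
type execContext struct {
	alloc memory.Allocator // used when building new Arrow arrays

	// Future additions: metrics / counters and resource tracking,
	// e.g. bytes allocated per query.
}

func newExecContext() *execContext {
	return &execContext{alloc: memory.NewGoAllocator()}
}
```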

Much of the existing code uses code-generation to produce type-specific cursors and arrays for each of the supported types, namely Integer, Float, Unsigned, String and Boolean. These are currently represented as simple Go slices, so the templated code to mutate and create new blocks is easily shared. Updating the fixed-width numeric types should present few challenges, as they map natively to the memory layout of the underlying architecture and Go slices. The packed types (String and Boolean) can no longer share this generic code and therefore will require the most changes. Additionally, whilst Arrow arrays don't require the validity vectors (used to track the positions of null values in an array) and nor does InfluxDB, the builders always create them. To avoid the overhead for manually building the fixed-width types, we'll simply create the arrays directly from the decoded blocks. We may also experiment with creating an additional set of builder types that do not create the validity vectors, if that permits more reuse in the templates.
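A sketch of what "creating the arrays directly from the decoded blocks" could look like with the Go Arrow library, assuming the decoded block is a plain []float64; the helper name is mine and the exact calls may change:

```go
import (
	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/memory"
)

// newFloat64Array wraps an already-decoded block in an Arrow array without
// going through a builder, and without allocating a validity bitmap.
func newFloat64Array(values []float64) *array.Float64 {
	buf := memory.NewBufferBytes(arrow.Float64Traits.CastToBytes(values))
	data := array.NewData(arrow.PrimitiveTypes.Float64, len(values),
		[]*memory.Buffer{nil, buf}, nil, 0, 0)
	defer data.Release()
	return array.NewFloat64Data(data)
}
```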

Create execute state / context

Update the following to accept / use the context

Update <Type>Array types

This collection of types represents the time / value blocks. They require significant changes to become containers for Arrow builders and arrays, versus the current implementation, which uses simple Go slices.

Float, Unsigned and Integer are fixed-width, so their in-memory representation matches a native slice; they should therefore be relatively easy to implement.

String and Boolean are densely packed in Arrow, so these are not a drop-in replacement. Some experimentation will be required to determine the best / most efficient approach for implementing the Include, Exclude and Merge APIs. Downstream filter query cursors will also require changes when mutating these blocks.
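To make the contrast concrete, here is an illustrative sketch using the Go Arrow library: booleans are stored one bit per value, so there is no slice view to reuse and values are read element by element.

```go
import (
	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/memory"
)

// packedBooleanExample is a sketch only: unlike the fixed-width types,
// a packed boolean array offers no []bool view, so Include / Exclude /
// Merge will need a different approach for these types.
func packedBooleanExample() {
	bld := array.NewBooleanBuilder(memory.NewGoAllocator())
	defer bld.Release()
	bld.AppendValues([]bool{true, false, true}, nil)

	arr := bld.NewBooleanArray()
	defer arr.Release()

	for i := 0; i < arr.Len(); i++ {
		_ = arr.Value(i) // decodes bit i of the packed buffer
	}
}
```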

Update fixed-width <Type>Array types:

Update packed StringArray type:

Update packed BooleanArray type:

Update cursors to use Arrow arrays

KeyCursor

Typed ascending / descending cursors

These implement the TSM + Cache LSM Tree merge operation

Update fixed-width cursor types:

Similarly to the <Type>Array types, the fixed-width cursors (Integer, Float and Unsigned) should be relatively easy to update, as the in-memory layout of the corresponding Arrow arrays matches Go slices.
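For example (illustrative only, using the Go Arrow library), a Float64 array exposes its backing buffer as a plain Go slice, so cursor code that currently ranges over []float64 changes very little:

```go
import "github.com/apache/arrow/go/arrow/array"

// sumFloats is a sketch showing that a fixed-width Arrow array can be
// consumed the same way a Go slice is today: Float64Values returns a
// []float64 view over the Arrow buffer, without copying.
func sumFloats(arr *array.Float64) float64 {
	var sum float64
	for _, v := range arr.Float64Values() {
		sum += v
	}
	return sum
}
```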

Update packed string cursor

Update packed boolean cursor

Update query cursors

These cursors are implemented in the storage/read package.

Update aggregate cursors

These cursors perform aggregations for the numeric cursor types (Integer, Float and Unsigned) and will be easy to update.

Update filter cursors

This is another cursor that mutates incoming data, and it will likely require different handling for the fixed-width vs packed data types. By this point, I expect patterns to be established for efficiently mutating both formats.
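To illustrate the difference (names and shapes are mine, sketch only): with slice-backed blocks a filter can compact the data in place, whereas with immutable Arrow arrays the equivalent operation means building a new array through the shared allocator.

```go
// filterGreater keeps only the points whose value exceeds min, compacting
// the parallel timestamp/value slices in place.
func filterGreater(timestamps []int64, values []float64, min float64) ([]int64, []float64) {
	n := 0
	for i, v := range values {
		if v > min {
			timestamps[n], values[n] = timestamps[i], v
			n++
		}
	}
	return timestamps[:n], values[:n]
}
```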

Switch query cursors to public API

Move these cursors to a public-facing API, so they can be reused directly in Cloud 2.0.

NOTE: Additional time should be scheduled to increase the reusability of the OSS storage service APIs, to avoid the constant duplication of work between OSS and Cloud 2.0.

nathanielc commented 6 years ago

@stuartcarnie Looks like you have a clear picture of what needs to be done. My only question is regarding the OSS and Cloud 2.0 duplication. Does it make sense to try and unify those code paths before or after the Arrow work? My understanding is that there is currently divergence between the two code bases because of the batch decoding work. And the Arrow work is going to change things more than we expected. In that light it makes sense to get Arrow fully integrated in OSS and then, once that code is stable, decide how to incorporate it in the Cloud 2.0 bits. Does that make sense? Or are there significant requirements that 2.0 places on the code that we need to consider upfront?

stuartcarnie commented 6 years ago

@nathanielc, the divergence occurred from the outset, when the read RPC was added to get things working quickly. I agree it is time to put some effort into sharing as much of the code as possible.

We can chat tomorrow to decide on scheduling. My gut says do that work now, so that InfluxDB updates don't impede other work.

stale[bot] commented 4 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.