trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
https://trino.io
Apache License 2.0
10.18k stars 2.93k forks source link

[Swift] Spooled client protocol extension #22662

Closed wendigo closed 5 days ago

wendigo commented 2 months ago

Spooled Trino protocol (protocol v1+)

[!IMPORTANT] This document is a design proposal and can change in the future. This is part of the ongoing effort to improve performance of the existing client protocol (https://github.com/trinodb/trino/issues/22271)

Background

The existing Trino client protocol is over 10 years old and served different purpose upon its inception than the use cases that we want to support today. These new requirements include:

To address aforementioned requirements, we are going to introduce an extension to the existing protocol called spooled protocol, which extend its semantics in a backward-compatible fashion.

Existing client protocol

The current protocol consists of two endpoints that are used to submit queries and retrieve partial results:

Both endpoints share the same QueryResults definition:

{
  "id": "20160128_214710_00012_rk68b",
  "infoUri": "http://coordinator/query.html?20160128_214710_00012_rk68b",
  "nextUri": null,
  "columns": [
    {
      "name": "_col0",
      "type": "bigint",
      "typeSignature": {
        "rawType": "bigint",
        "arguments": []
      }
    }
  ],
  "data": [[1],[5]]
  ...
}

[!NOTE] Some of the query results fields were omitted for brevity.

The important fields related to result set retrieval are:

Spooled protocol extension

To express the partial result sets having a different encoding, spooled protocol extension is introduced, which contains backward and forward-compatible changes to the existing protocol.

These changes are:

Trino-Query-Data-Encoding

The header is used by the client libraries to use the new spooled protocol extension and contains the list of encoding in the order of preference (comma separated). If any of the encodings is supported by the server, the client can expect the QueryResults.data to be returned in a new format. If it's not supported, server fallbacks to the existing behaviour for compatibility.

[!NOTE] Once the encoding is negotiated between the client and the server, it stays the same for the duration of the query which means that client can expect results in a single format - either existing one or extended.

EncodedQueryData

EncodedQueryData is the extension of the QueryResults.data field that has the following on-the-wire representation:

{
  "id": "20160128_214710_00012_rk68b",
  "infoUri": "http://coordinator/query.html?20160128_214710_00012_rk68b",
  "nextUri": null,
  "columns": [...],
  "data": {
    "encodingId": "json",
    "segments": [
      {
        "type": "inline",
        "data": "c3VwZXI=",
        "metadata": {
          "rowOffset": 0,
          "rowsCount": 100,
          "segmentSize": 5
        }
      },
      {
        "type": "spooled",
        "uri": "http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/2",
        "headers": {
            "x-amz-server-side​-encryption​-customer-key": ["key"],
            "x-amz-server-side​-encryption​-customer-algorithm": ["AES256"]
        },
        "metadata": {
          "rowOffset": 200,
          "rowsCount": 100,
          "segmentSize": 1024
        }
      }
    ]
  }
}

Meaning of the fields is following:

DataSegment

DataSegment is a representation of the encoded data and has two distinct types: inline and spooled with following semantics:

[!NOTE] Data segments are returned in the order specified by the query semantics. Data is spooled when client library requests next batch of result set data by following nextUri. It's up to the client library to decide whether all segments will be spooled at once, buffered and then read.

[!IMPORTANT] In order to support the spooled protocol, client implementations need to support both inline and spooled representations as server can use these interchangeably.

[!CAUTION] Client implementation must not send any additional information when retrieving spooled data segment, particularly the authentication headers used to authenticate to a Trino.

DataSegment contains a metadata field of type Map<String, Object> with attributes describing a data segment.

Following metadata attributes are always present:

Optional metadata attributes can be a part of the encoding definition shared between the client and server implementations (to be discussed).

Implementation considerations

Compatibility

The implementation of this design must to take into the consideration backward and forward compatibility which means two things:

As this is an extension of the protocol, it's an opt-in feature that requires a new client and a new server with additional configuration (spooling storage).

Encodings

Encoding describes the serialization format (like JSON) and other information required to both write (encode) and read (decode) result set as data segments. Example of encodings are:

Definition and meaning of the encoding is a contract between client and the server and will be specified separately in the future for each encoding.

Spooling

Spooling by definition means buffering data in a separate location by the server and retrieving it later by the client. In order to support spooled protocol extension servers (coordinator and workers) are required to be configured to use the spooling. In the initial implementation we plan to add a plugin SPI for the SpoolingManager and implementation based on the native file-system API. SpoolingManager is responsible for both storing and retrieving spooled data segments from the external storage.

Segments

It's up to the server implementation whether the partial result set will be inlined, spooled, either or both. It's required that client requesting a spooled protocol extensions supports both types of the data segments.

Plugabillity

An encoding is a shared definition between the client and the server, therefore it can't be a Plugin SPI interface. In the initial implementation, we do not plan to provide an external SPI for adding new encodings. These will be shipped as part of the implementation of the client and server libraries. Spooling process on the server-side is pluggable with the new SpoolingManager SPI.

Backward compatibility

Above and for all, the new spooled protocol can't break existing one as it's an opt-in client/server feature. At any given moment, the client or the server can fall back to the original, unmodified v1 protocol to maintain backward compatibility.

Performance

For small result sets, spooling on the storage adds an additional overhead which can be avoided by inlining the data in the response. For larger ones, spooling allows faster, parallel data retrieval as the result set can be fully spooled and query finished as fast as possible and then retrieved, decoded and processed by the client after the query has already completed.

We plan to implement spooling on the worker side which means that when the result set is partially or fully spooled, coordinator node is not involved in encoding of the data to a requested format and spooling it in the storage. According to our benchmarks, JSON encoding in the existing format accounts for significant amount of the CPU time during output processing. Initially we plan to support existing JSON format but the encoding will be moved to worker nodes which will reduce the load on the coordinator.

Security

Protocol supports server-side encryption of data. We've settled on server-side encryption, rather than client-side due to the following reasons:

Proposed API/SPI

Server-side

public interface QueryDataEncoder
{
    DataAttributes encodeTo(OutputStream stream, List<Page> pages)
            throws IOException; // returns segment-level attributes

    String encodingId(); // encoding id

    DataAttributes attributes(); // query-level attributes (like encryption key)

    interface Factory
    {
        QueryDataEncoder create(Session session, List<OutputColumn> columns);

        String encodingId();
    }
}

Used to encode the data in the specified output format.

// Implementation specific identifier
public interface SpooledSegmentHandle
{
}

public interface SpoolingManager
{
    SpooledSegmentHandle create(SpoolingContext spoolingContext);

    OutputStream createOutputStream(SpooledSegmentHandle segmentHandle)
            throws Exception;

    InputStream unspool(SpooledSegmentHandle segmentHandle)
            throws IOException;

    default void acknowledge(SpooledSegmentHandle segmentHandle)
            throws IOException
    {
    }

    default Optional<URI> directLocation(SpooledSegmentHandle segmentHandle)
    {
        return Optional.empty();
    }

    default Slice serialize(SpooledSegmentHandle handle)
    {
        throw new UnsupportedOperationException();
    }

    default SpooledSegmentHandle deserialize(Slice slice)
    {
        throw new UnsupportedOperationException();
    }

    default Map<String, String> headers(SpooledSegmentHandle deserialize)
    {
        return Map.of();
    }
}

Used to manage, spool and unspool data segments.

Client-side

public interface QueryDataDecoder
{
    Iterable<List<Object>> decode(InputStream inputStream, DataAttributes segmentAttributes)
            throws IOException;

    String encodingId();

    interface Factory
    {
        QueryDataDecoder create(List<Column> columns, DataAttributes queryAttributes);

        String encodingId();
    }
}

Used to decode the data segment.

sajjoseph commented 2 months ago

Really excited about this enhancement. Does this arrangement support retries for failed requests/corrupted data - especially when we can retrieve data in parallel?

wendigo commented 2 months ago

Yes. It will be possible to retrieve the same segment couple of times

sajjoseph commented 1 month ago

Thanks for the update. Few more thoughts.

  1. Spooled data transfer looks like a communication between client (cli/jdbc/odbc/python...) and Trino cluster. There is no opportunity to introduce independent layers (proxies, smart caching between client and coordinator etc). If we keep the URLs used (including dataUri in spooled segment) in HTTP headers, the proxies and other agents can process that data without decoding the entire http response payload.

  2. v1 protocol transmits metadata in each data chunk being transmitted. Will it continue to happen in spooled version (EncodedQueryData) as well? Is there any opportunity to improve that area?

  3. Please elaborate on the parallel data transfer aspect and how it works. I understood that we are offloading the data preparation from coordinator to workers. But, we are still sending data in a serial fashion (one at a time) from coordinator to client - right? How about coordinator share a list of nextUris with the client and the client retrieves them in parallel?

  4. Hope we can include msgpack as an encoding option for data transfer (assuming that we get better compression than other mechanisms).

Below are some ideas that I thought could benefit client/coordinator communication.

  1. Coordinator and client could establish/negotiate the targetResultSize value (and periodically reevaluate during transfer) while complex queries are running in workers (a possible solution is to send some dummy data - similar to iperf3 - from coordinator to client to determine the transmission rate).

  2. Move nextUri, partialCancelUri, infoUri to HTTP Header area. At present, only clients can see these header values and that too after decoding the data chunk. If we move these to the header fields, we can implement performance improvements where we can download the binary data as is (probably by a proxy node) and deliver to the actual client at the rate it can consume. The advantage is that Trino cluster will not have to hold the data in memory and operate at clients retrieval/consumption rate. Instead, we can offload data to the client proxy node (or in client itself if it is capable of storing/caching the binary data somewhere) at a faster rate. We definitely need to consider security and avoid man-in-the-middle attacks. But, a smart caching/data offloading mechanism can improve end user experience.

  3. Trino Cluster identifier mechanism - when there are multiple clusters behind Trino-gateway like arrangement, it will be great if we can add some kind of identifier to the nextUrl (only Trino-gateway/query routing layer/coordinator will understand/decode this value). In my private fork, I implemented this feature and it is working well. It is backward compatible with all client libraries. (This doesn't have to be a v1+ feature as it can be implemented in Trino coordinator)

Thoughts?

nickalexander053 commented 1 month ago

Would be interested at looking at client implementation that utilise this. Specially for distributed ML frameworks that need to load a lot of data into distributed workers. Of interest would be a spark connector and a client for Ray.

wendigo commented 1 month ago

@sajjoseph

  1. This is correct. In order to drive adoption, we've decided to make it easy for client integrations. We've taken into the account a need for query result set caching and this is possible to implement with proposed changes. We do not want to move the data segment URIs to headers though.
  2. This will be a separate change that will allow for disabling columns and/or stats retransmission.
  3. Data is encoded and spooled on the worker nodes. Coordinator instead of getting an actual data, gets the list of spooled locations (basically URIs from the client perspective). Client instead of getting an actual data, gets these URIs that points to spooled data. In a single request to a nextURI, client can get multiple segments from multiple workers (it depends only on how fast we can spool data on the worker node). The result set target doesn't apply here - we've added a separate "target segment size" to a spooling configuration that sets a target size for a spooled segment (right now it's 16 MB). Usually, you will get multiple segment URIs at once, which makes the total amount of data available for reading much higher than current 1 MB target result size. Since the spooled segment is just an URI and you can get multiple of these, you can read them in parallel, you can even buffer all of the segment URIs until the query if finished and then read all of the segments in parallel at once - there is no limitation here.
  4. I don't think that we will do msgpack but I agree that the JSON is not the best option as it costs a lot (in our benchmarks, the 90% of the CPU used on the spooling operator is the actual encoding of the data). We are still considering options for the columnar encoding of the data that we will introduce next.

Other ideas

  1. Target result size doesn't apply here. There is a "target segment size" that controls how big the spooled segment will be, this is set to 16 MB by default. This is only to avoid having too many small segments or too big ones (as you will read them to memory as a whole). When you fetch the nextURI you will usually get multiple segments from multiple workers and it's usually limited by how fast workers can spool. Instead of getting a 1 MB targetResultSize you will get multiplier of 16 MB.
  2. This is out of scope for this change. We don't want to mess with the protocol too much as it will make client implementations harder. Since the data is now "externalized" and you only get a "pointer" to the data (segment URI) the responses to nextURIs will be usually much smaller than today (KBs rather than MBs). You can assume that the proxy will be able to read the whole response to extract required informations like nextURI or cancelURI.
  3. Isn't the X-Forwarded-Path what you are looking for?
wendigo commented 1 month ago

@nickalexander053 this is one of the use-cases we have in mind :)

sajjoseph commented 1 month ago

Thanks @wendigo for the details and I really appreciate the quick response. This makes sense.

For #3 in the first list (Data is encoded and spooled on the worker nodes...), is there an opportunity to offload spooled data to external places than worker nodes? Client shouldn't care where it gets the data from and this could introduce offline execution (clients submit queries and go away and later will be notified of data availability and clients can come and pick data).

For #3 from second list (Trino Cluster identifier mechanism), did you mean X-Forwarded-Prefix? One challenge with X-Forwarded-Prefix is that clients need to be modified to take advantage of this feature. My original suggestion is backward compatible and so it will work for all clients (ODBC, JDBC, CLI, Python etc). I already implemented this in our fork and we are using it for zero downtime deployments.

wendigo commented 1 month ago

@sajjoseph the spooling manager is a pluggable SPI. One of the implementations is based on native filesystem APIs which allow for spooling to s3/gcs/abfs out of the box.

Please file an issue for the second item. We will discuss it with the trino-gateway group.

wendigo commented 1 month ago

@sajjoseph "Encoding and spooling happens on worker nodes" means that worker nodes are spooling to the external storage :)

sajjoseph commented 1 month ago

Thanks @wendigo for the updates. I am super excited about the possibilities and how this feature can improve customer experience. Thank you!

About the cluster identifier feature, I will file an issue later.

martint commented 1 month ago

Good stuff @wendigo!

A couple of questions:

And a comment:

wendigo commented 1 month ago

@martint there is an explicit http DELETE that clients can do in order to remove a segment if it's no longer needed. Beside that, we want to implement a configurable TTL for segments that will render them useless when they are expired (client won't be able to retrieve them anymore) and the GC process that will cleanup expired segments from the storage.

The data is spooled only if the nextUri is called by the client - the difference is that coordinator is not getting an actual page of data but the uri to the data. If client has abandoned calling nextUri - spooling won't progress.

As for the latency part, spooled extension allows both inline and spooled segments so it's up to the engine whether the data should be inlined or spooled. The current spooling operator implementation has a SpoolingController that tries to decide when to switch from streaming of the inlined data to a spooling (it's based on the configurable thresholds - number of initial rows/size of the initial pages) so that initial couple of hundreds of rows will be inlined, but once the threshold is reached, data will be spooled and client will get URIs instead of inline data.

I was thinking that the client implementation could actually hint the engine whether it prefers the throughput of the data transfer over initial latency.

wendigo commented 1 month ago

@martint here's how we plugged it into the execution on the high level:

Canvas 1

martint commented 1 month ago

In the case when the SpoolingOperator is spooling, how does pushback happen? Today, it's based on task output buffers filling up.

wendigo commented 1 month ago

@martint right now the spooling operator holds only a single spooled segment that wasn't consumed by the coordinator (and thus client). It won't request/accept more pages until client retrieves the segment so it's similar to being blocked while having a buffer full but buffer is represented as the identifier of the spooled segment rather than actual data in memory.

martint commented 1 month ago

How does it know it hasn't been consumed by the coordinator? The coordinator interacts with the Task Output Buffer. The Driver will attempt to fetch from the tip of the operator pipeline as long as the buffer is not full. Does this require that the output buffer understand the internals of these special pages produced by the spooling operator?

wendigo commented 1 month ago

@martint I see what you mean now. I think that we would need to improve that area

electrum commented 1 month ago

There is no pushback for spooling. We assume that storage is effectively infinite, or large enough to store the entire result set. This is true for the expected use case of object storage for spooling.

Requiring that clients consume the results in a specific order makes this more difficult to use and can result in deadlocks. This doesn't seem useful given that storage is typically unlimited.

Consider using this with a system like MapReduce or Spark where the URIs are scheduled independently as worker tasks. There are no guarantees as to when those tasks will run, and these systems often need all the URIs up front.

We could implement a best effort limit on the total size (IIRC we have something similar for scan size) if needed. You can think of this as similar to CTAS, which writes the full result up front and doesn't have any limit on size.

sopel39 commented 1 month ago

@wendigo is there any PoC branch with the new approach or it's still in design phase?

wendigo commented 1 month ago

@sopel39 not yet, why are you asking? :)

wendigo commented 5 days ago

The inital work is done. We will now follow-up with better spooling and some experiments around formats