The current implementation of ESQL may transfer more data between nodes within the cluster (intra-transfers) than the _search API. Higher intra-transfers can occur in the following scenarios:
TopN: The _search API fetches and transfers up to N documents between nodes. In ESQL, however, each pipeline on each data node can fetch N documents and put them into the exchange sink. As a result, the total number of documents transferred between nodes can reach D × 3/2 × CPUs × N, where D is the number of data nodes participating in the computation and 3/2 × CPUs is the number of pipelines per data node. To mitigate this, we can place a TopN on each data node before pushing documents to the exchange sink, which limits the number of documents transferred to D × N. To fully resolve the problem, we need to introduce remote field extractors and fetch documents only after applying a global TopN, as the _search API does.
Limit: The same issue applies as with TopN, except that the Limit operator can terminate the computation on a data node as soon as the limit is reached.
Aggregation: Currently, we transfer the intermediate states of partial aggregation between nodes once per Lucene slice, whereas the _search API does this once per shard. With doc or segment data partitioning, a shard can be divided into many Lucene slices. To reduce intra-transfers, we can introduce a partial-partial aggregation step before pushing pages to the exchange sink. This step would combine the partial states per node, potentially resulting in lower intra-transfers than the _search API. Additionally, we should consider capping the number of groups in grouping aggregations, as the _search API does. This is especially important when the state of each group is large (e.g., count_distinct) or the keys are large (e.g., composite keyword keys).
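The partial-partial aggregation step described above can be illustrated with a minimal sketch. It is not ESQL code: the function name merge_partial_states and the representation of a partial state as a (count, sum) pair per group key are assumptions made only for illustration. The sketch merges the per-slice partial states on a node into one per-node state, so that each group is sent to the exchange sink once per node rather than once per slice.

```python
from collections import defaultdict


def merge_partial_states(slice_states):
    """Combine per-slice partial states into a single per-node state.

    Hypothetical model: each partial state maps a group key to a
    (count, sum) pair, which is enough to finish an average later.
    """
    merged = defaultdict(lambda: [0, 0.0])
    for state in slice_states:
        for key, (count, total) in state.items():
            merged[key][0] += count
            merged[key][1] += total
    return {key: (count, total) for key, (count, total) in merged.items()}


# Three Lucene slices on the same data node (made-up values).
slices = [
    {"a": (2, 10.0), "b": (1, 4.0)},
    {"a": (3, 5.0)},
    {"b": (2, 6.0), "c": (1, 1.0)},
]

node_state = merge_partial_states(slices)

# Group entries that would cross the network without and with the merge.
entries_before = sum(len(state) for state in slices)
entries_after = len(node_state)
```

In this example, five group entries across three slices collapse into three entries for the node. The saving grows with the number of slices per node and with the overlap of group keys between slices.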
In addition to these, we should explore improving the serialization of Blocks.
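To make the TopN bounds above concrete, the following sketch computes the worst-case number of documents for the three strategies mentioned: a TopN per pipeline (current behavior), a TopN per data node, and a global TopN followed by a remote fetch. The function name and the example cluster size are assumptions for illustration only; the 3/2 × CPUs pipelines-per-node factor is taken from the bound stated in the TopN scenario.

```python
def topn_docs_transferred(data_nodes: int, cpus_per_node: int, n: int) -> dict:
    """Worst-case document counts for each TopN strategy (illustrative only)."""
    # Assumption from the text: up to 3/2 * CPUs pipelines run on each data node.
    pipelines_per_node = (3 * cpus_per_node) // 2
    return {
        # Every pipeline on every node pushes up to N documents.
        "per_pipeline_topn": data_nodes * pipelines_per_node * n,
        # A node-level TopN caps each node at N documents.
        "per_node_topn": data_nodes * n,
        # With remote field extraction, only the global top N are fetched.
        "global_topn_then_fetch": n,
    }


# Hypothetical cluster: 3 data nodes with 8 CPUs each, N = 1000.
bounds = topn_docs_transferred(data_nodes=3, cpus_per_node=8, n=1000)
```

For this hypothetical cluster the bounds are 36,000 documents with a per-pipeline TopN, 3,000 with a per-node TopN, and 1,000 when documents are fetched only after the global TopN.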