pingcap / tiflash

The analytical engine for TiDB and TiDB Cloud. Try free: https://tidbcloud.com/free-trial
https://docs.pingcap.com/tidb/stable/tiflash-overview
Apache License 2.0

Exchange partition sender does not respect `records_per_chunk` #2810

Open riteme opened 3 years ago

riteme commented 3 years ago

In StreamingDAGResponseWriter, the broadcast sender splits blocks by `records_per_chunk`, but the partition sender does not. This may result in sending a very large chunk in one DAG response. See StreamingDAGResponseWriter::getEncodePartitionTask.

fuzhe1989 commented 3 years ago

cc @windtalker @fzhedu @zanmato1984

windtalker commented 3 years ago

`records_per_chunk` is used when returning data to TiDB; it only takes effect when the encode type is TypeChunk or TypeDefault. For the exchange partition sender, the encode type is always TypeCHBlock, so we can just ignore `records_per_chunk`.

fuzhe1989 commented 3 years ago

@windtalker do you think `records_per_chunk` can be applied to TypeCHBlock? Is there any potential blocking issue?

riteme commented 3 years ago

https://github.com/pingcap/tics/blob/886d7bc4789d3277770177a6458b174b31090443/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp#L57-L58

@fuzhe1989 The current CHBlockChunkCodec has some limitations. If we want to apply `records_per_chunk` to TypeCHBlock, we may need to enhance CHBlockChunkCodec first.

windtalker commented 3 years ago

> @windtalker do you think `records_per_chunk` can be applied to TypeCHBlock? Is there any potential blocking issue?

Any potential benefit if we enable record_per_chunk for TypeCHBlock?

riteme commented 3 years ago

@windtalker To my understanding, the partition sender shuffles only one block at a time into several partitioned blocks. Each partitioned block is then encoded as one chunk, so if the original block is not too large, the resulting partitioned blocks will not be too large either.

I'm not sure whether merging small chunks (not blocks) before writing the response is beneficial, given that we're using gRPC. If it is, we can refactor the current code and use `records_per_chunk` as a threshold.

riteme commented 3 years ago

Related PR: #2474

windtalker commented 3 years ago

> @windtalker To my understanding, the partition sender shuffles only one block at a time into several partitioned blocks. Each partitioned block is then encoded as one chunk, so if the original block is not too large, the resulting partitioned blocks will not be too large either.
>
> I'm not sure whether merging small chunks (not blocks) before writing the response is beneficial, given that we're using gRPC. If it is, we can refactor the current code and use `records_per_chunk` as a threshold.

For small chunks, we do not need to merge them before sending because we already have a merge stage on the receiver side. For large blocks, an error is thrown if a block is too large; generating such a large block is not expected in the first place, so we need to find the root cause and avoid generating it.

riteme commented 3 years ago

@windtalker

https://github.com/pingcap/tics/blob/886d7bc4789d3277770177a6458b174b31090443/dbms/src/Flash/Mpp/MPPTunnelSet.cpp#L10-L16

I realized that there are three different concepts: chunk, response (tipb::SelectResponse), and packet (mpp::MPPDataPacket). Only serializing a response into a packet can fail. Given that all chunks are put into one response, is splitting chunks pointless?

windtalker commented 3 years ago

> @windtalker
>
> https://github.com/pingcap/tics/blob/886d7bc4789d3277770177a6458b174b31090443/dbms/src/Flash/Mpp/MPPTunnelSet.cpp#L10-L16
>
> I realized that there are three different concepts: chunk, response (tipb::SelectResponse), and packet (mpp::MPPDataPacket). Only serializing a response into a packet can fail. Given that all chunks are put into one response, is splitting chunks pointless?

#2474 already makes sure that the tipb::SelectResponse is small enough to serialize into a packet.

riteme commented 3 years ago

Related PR: #2405

windtalker commented 3 years ago

It is an outdated PR; at that time we did not know there was a size restriction when serializing a response into a packet. We need to make sure the updated PR takes this restriction into account.