ByConity / ByConity

ByConity is an open source cloud data warehouse
https://byconity.github.io/
Apache License 2.0

Multi-tenancy Support #731

Open lazywei opened 1 year ago

lazywei commented 1 year ago

Question

ClickHouse's guidance on the number of tables (ReplicatedMergeTree) inside a single database is a few hundred: https://kb.altinity.com/altinity-kb-schema-design/how-much-is-too-much/#number-of-tables-system-wide-across-all-databases

That's why we avoid creating a table for each tenant when considering multi-tenancy on ClickHouse; instead, we put all tenants' data into the same table.

We are curious whether ByConity has a similar limitation, or whether it is fine to have a few thousand tables with similar structure, with concurrent inserts into many of them at the same time. If so, having each tenant use their own table seems much easier to maintain, and makes resource isolation easier to implement.

I tried looking through the docs but couldn't find anything referring to this. Would appreciate any help. Thanks!

dmthuc commented 1 year ago

Hi @lazywei , we do not have that limitation. Writes are usually forwarded to the write workers, so there is no limit on the number of concurrent inserts as long as you have enough workers.

lazywei commented 1 year ago

@dmthuc appreciate the answer (sorry i've been asking a lot πŸ˜„ )

So that means if we have enough workers we can create as many tables as we want, because they won't affect each other except for sharing the workers' resources? Are there other bottlenecks, like FDB, TSO, or the byconity-server?

One thing we noticed is that when we insert at a rate of 60M rows/hr, ingestion and query performance are OK, and the number of parts in a partition stays around 150 (with daily partitions). However, if we ramp ingestion up to around 2B rows/hr we start to see degradation: parts in a partition rise to 3000 and the byconity-server restarts continuously. When this happens, is it because the server doesn't have enough resources, or because the workers can't keep up? (I'm asking because the workers don't seem to use much of their resources and they aren't OOMing either, yet based on the documentation it seems the workers are the ones that actually do all the work.)

We are trying to understand the right way to scale up each component. Currently we have 2 servers, each with 8 CPU / 16 GB RAM (c6i.2xlarge on AWS), plus 3 write workers and 3 read workers, also 8 CPU / 16 GB RAM each. Does that sound reasonable for a 2B rows/hr volume? If not, should we increase the worker count or the server count?

Thank you!

dmthuc commented 1 year ago

Hi @lazywei , thank you for trying ByConity. FDB can scale independently, so it won't be a bottleneck, and TSO won't be a bottleneck either. May I know which method you use to perform inserts? Some insert methods, such as inserts from the client or INSERT ... VALUES, are executed on the servers.

lazywei commented 1 year ago

Thanks! Our ingestion method is the HTTP interface, as described here: https://github.com/ByConity/ByConity/issues/722#issuecomment-1732676814.

We don't necessarily need to stick with this approach, though. If there is a more performant way to do ingestion, or one that better utilizes all the workers, we are happy to adjust our ingestion method.

Thanks!

dmthuc commented 1 year ago

Hi @lazywei , aside from INSERT ... VALUES and inserts from the client, other insert queries (INSERT SELECT, and INSERT from a file on a remote disk) are executed on the workers. INSERT ... VALUES and client inserts can also be sent directly to a worker.
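
For concreteness, a hedged illustration of that distinction; the database/table/column names are placeholders borrowed from later in this thread (ed.ed_logs_staging is hypothetical), not part of the original answer:

-- Executed on the workers (the server only coordinates):
INSERT INTO ed.ed_logs SELECT * FROM ed.ed_logs_staging;

-- Executed on the server by default, unless the client sends it directly to a write worker:
INSERT INTO ed.ed_logs (timestamp, tag, body) VALUES (now(), 'tenant-a', 'hello world');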

dmthuc commented 1 year ago

In the near future, INSERT SELECT via the remote() function pointing at a community ClickHouse address should work too.

dmthuc commented 1 year ago

Hi @lazywei , may I know which data-import use case you have? Are you trying to import data from community ClickHouse into ByConity?

lazywei commented 1 year ago

Got it, thanks @dmthuc. Our use cases are like this:

So the "insert select" and "insert in file" probably won't work for us. Do we need something like kafka table to make the ingestion more performant? Is there a difference between inserting on worker and inserting on servers?

When we insert files at a rate of 500M-1B rows per hour, the part count stays around 200-500 and we're not sure whether that's healthy; if we increase the ingestion rate further, the part count grows and we will likely hit OOM.

However, the workers at the same time don't seem to use much memory or CPU, so we're not sure whether this is due to under-utilized workers. We've enabled the following on our table:

enable_addition_bg_task = 1, cnch_merge_enable_batch_select = 1, cnch_merge_pick_worker_algo = 'RoundRobin'

But we're not sure whether there are other settings we can tune, or whether we should switch to a different ingestion method, like Kafka.


As shown here, our workers and servers are not using a lot of CPU / RAM, but the part count is still quite high:

(image attached)

β”Œβ”€table─┬─partition_id─┬─partition─┬─first_partition─┬─total_parts_number─┬─total_parts_size─┬─total_rows_count─┬─last_update_time─┐
β”‚ logs  β”‚ 20230924     β”‚ 20230924  β”‚ 20230924        β”‚                494 β”‚     210777539531 β”‚       7126679732 β”‚       1695622474 β”‚
β”‚ logs  β”‚ 20230925     β”‚ 20230925  β”‚ 20230925        β”‚                248 β”‚      76835314011 β”‚       2613329885 β”‚       1695622474 β”‚

lazywei commented 1 year ago

Hi @dmthuc,

I wonder if there is a way to tell whether the cluster is healthy, so we know whether we need to slow down ingestion, etc. For example, in ClickHouse the number of parts in a partition is a good indicator: when it is too high, both READ and WRITE performance degrade.

I wonder whether that also applies to ByConity, and if so, is there any guidance on what a reasonable number of parts in a table is? Something like 500, 1000, or fewer, like 200?

We've started to ingest a larger volume into ByConity, and one thing we found is that the number of parts increases quite quickly:

β”Œβ”€table───┬─partition─┬─total_parts_number─┬─Size───────┬─Rows─────────┬─avgSize────┬─toDateTime(last_update_time)─┐
β”‚ ed_logs β”‚ 20230928  β”‚                206 β”‚ 78.76 GiB  β”‚ 1.70 billion β”‚ 391.50 MiB β”‚          2023-09-29 05:21:51 β”‚
β”‚ ed_logs β”‚ 20230929  β”‚               1090 β”‚ 390.55 GiB β”‚ 8.03 billion β”‚ 366.91 MiB β”‚          2023-09-29 05:21:51 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
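
For reference, a hedged sketch of the kind of query that can produce a per-partition summary like the one above; the system table name (system.cnch_parts_info) is an assumption inferred from the columns in these outputs, so adjust it to whatever your deployment actually exposes:

SELECT
    table,
    partition,
    total_parts_number,
    formatReadableSize(total_parts_size) AS Size,
    formatReadableQuantity(total_rows_count) AS Rows,
    formatReadableSize(total_parts_size / total_parts_number) AS avgSize,
    toDateTime(last_update_time)
FROM system.cnch_parts_info
WHERE table = 'ed_logs'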

Given our data volume (~2B rows/hr), is this part count OK, or do we need to slow down or provision more workers?

The reason I'm asking is that even though the part count is high, the workers' CPU and RAM usage is relatively low; it doesn't look like ByConity needs more provisioning.

Also, when I try to run an ad-hoc merge, it seems ByConity doesn't think it needs to merge either:

There is no need to merge parts according to merge selector algorithm: dance

Full trace:

OPTIMIZE TABLE ed.ed_logs

Query id: 4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35

[byconity-server-0] 2023.09.29 05:28:14.549686 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Debug> executeQuery: target server is :0 and local rpc port is 8124
[byconity-server-0] 2023.09.29 05:28:14.550902 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Debug> TransactionCoordinator: Created txn txn_id: 444587126162980865 commit_ts: 0 status: Running priority: high location: "10.64.232.185:8124" initiator: "Server" type: Implicit

[byconity-server-0] 2023.09.29 05:28:14.550950 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Debug> executeQuery: (from [::1]:37572) optimize table ed.ed_logs
[byconity-server-0] 2023.09.29 05:28:14.551072 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Trace> ContextAccess (default): Access granted: OPTIMIZE ON ed.ed_logs
[byconity-server-0] 2023.09.29 05:28:14.551630 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Debug> DatabaseCnch (ed): Create database ed in query 
[byconity-server-0] 2023.09.29 05:28:14.567766 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Trace> Catalog: ed.ed_logs (ed20b56d-fb08-4eca-9d68-9c8a1f36f524) Start handle intermediate parts. Total number of parts is 34491, timestamp: 444587126162980866
[byconity-server-0] 2023.09.29 05:28:14.576196 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Trace> Catalog: ed.ed_logs (ed20b56d-fb08-4eca-9d68-9c8a1f36f524) Finish handle intermediate parts. Total number of parts is 34390, timestamp: 444587126162980866
[byconity-server-0] 2023.09.29 05:28:14.576225 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Debug> Catalog: Elapsed 22ms to get 34390 parts in 1 partitions for table : ed.ed_logs (ed20b56d-fb08-4eca-9d68-9c8a1f36f524) , source : PartCache, ts : 444587126162980866
[byconity-server-0] 2023.09.29 05:28:14.599678 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Debug> ed.ed_logs (ed20b56d-fb08-4eca-9d68-9c8a1f36f524)(MergeMutateThread): There is no need to merge parts according to merge selector algorithm: dance
[byconity-server-0] 2023.09.29 05:28:14.599756 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Debug> ed.ed_logs (ed20b56d-fb08-4eca-9d68-9c8a1f36f524)(MergeMutateThread): triggerPartMerge(): Selected 0 groups from 1050 parts.
[byconity-server-0] 2023.09.29 05:28:14.607480 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Trace> Catalog: ed.ed_logs (ed20b56d-fb08-4eca-9d68-9c8a1f36f524) Start handle intermediate parts. Total number of parts is 7084, timestamp: 444587126176088064
[byconity-server-0] 2023.09.29 05:28:14.609181 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Trace> Catalog: ed.ed_logs (ed20b56d-fb08-4eca-9d68-9c8a1f36f524) Finish handle intermediate parts. Total number of parts is 7033, timestamp: 444587126176088064
[byconity-server-0] 2023.09.29 05:28:14.609195 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Debug> Catalog: Elapsed 5ms to get 7033 parts in 1 partitions for table : ed.ed_logs (ed20b56d-fb08-4eca-9d68-9c8a1f36f524) , source : PartCache, ts : 444587126176088064
[byconity-server-0] 2023.09.29 05:28:14.612241 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Debug> ed.ed_logs (ed20b56d-fb08-4eca-9d68-9c8a1f36f524)(MergeMutateThread): There is no need to merge parts according to merge selector algorithm: dance
[byconity-server-0] 2023.09.29 05:28:14.612282 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Debug> ed.ed_logs (ed20b56d-fb08-4eca-9d68-9c8a1f36f524)(MergeMutateThread): triggerPartMerge(): Selected 0 groups from 155 parts.
[byconity-server-0] 2023.09.29 05:28:14.612726 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Debug> executeQuery: pipeline has all used StorageIDs: true
[byconity-server-0] 2023.09.29 05:28:14.612786 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Debug> TransactionCoordinator: Deleted txn 444587126162980865
[byconity-server-0] 2023.09.29 05:28:14.612799 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Trace> TransactionCoordinator: No need to erase active timestamp for txn 444587126162980865

[byconity-server-0] 2023.09.29 05:28:14.612845 [ 1326 ] {4a26c2b2-f1d3-4635-964e-fd8e1c6d8f35} <Debug> MemoryTracker: Peak memory usage (for query): 4.00 MiB.
Ok.

Does this mean the # of parts is OK and we can continue to ingest at this rate?

Thanks

dmthuc commented 1 year ago

Hi @lazywei , sorry for the late answer. We are about to release a new version of ByConity, and that release is supposed to fix community ClickHouse client compatibility with ByConity. You can compare the number of ingested parts to the number of merged parts over time by querying the system table system.server_part_log; the number of merged parts should be higher than or equal to the number of ingested parts.
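
For example, a hedged sketch of such a comparison; system.server_part_log is named above, but the column and event-type names used here (event_time, event_type, 'InsertPart', 'MergeParts') are assumptions: 'MergeParts' appears later in this thread, while the insert event name may differ in your version.

-- Compare ingested vs. merged part counts per hour (column/event names are assumptions):
SELECT
    toStartOfHour(event_time) AS hour,
    countIf(event_type = 'InsertPart') AS inserted_parts,
    countIf(event_type = 'MergeParts') AS merged_parts
FROM system.server_part_log
WHERE event_time > now() - INTERVAL 1 DAY
GROUP BY hour
ORDER BY hour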

dmthuc commented 1 year ago

Hi @lazywei , yeah, the part counts you show are not optimal. Let us see if there is a way to optimize the merge.

dmthuc commented 1 year ago

Hi @lazywei, please use the following settings to increase merge performance, applied via an ALTER query:
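
A hedged sketch of applying merge-related table settings via ALTER; the setting names and values below are the ones lazywei reports further down in this thread, used here only as an illustration rather than as the original recommendation:

ALTER TABLE ed.ed_logs MODIFY SETTING
    max_addition_bg_task_num = 200,
    cnch_merge_max_total_rows_to_merge = 500000000,
    cnch_merge_max_total_bytes_to_merge = 322122547200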

dmthuc commented 1 year ago

And we have these settings as well:

The merge is supposed to happen on the write workers.

dmthuc commented 1 year ago

Currently our colleague is on holiday. We will try to write a guideline for optimizing the merge job, as this is a common issue. cc @zeromem

dmthuc commented 1 year ago

And looking at your use case, you do not need Kafka at all, as long as the current ingest flow guarantees that no file is missed. In the future, after upgrading to the new version, I think you can also use INSERT SELECT from remote() to insert from community ClickHouse, and the cnchS3() function to insert from S3.
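
As a rough, hedged sketch of those two future paths (the remote() call follows the community ClickHouse signature, while the cnchS3() argument list is assumed to mirror the s3() table function; hosts, credentials, and paths are placeholders):

-- Pull data from a community ClickHouse cluster:
INSERT INTO ed.ed_logs
SELECT * FROM remote('clickhouse-host:9000', 'source_db', 'logs', 'default', 'password');

-- Pull data from S3 (argument list assumed to mirror s3()):
INSERT INTO ed.ed_logs
SELECT * FROM cnchS3('https://bucket.s3.amazonaws.com/logs/*.parquet', 'aws_key', 'aws_secret', 'Parquet');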

lazywei commented 1 year ago

@dmthuc appreciate the help.

We tried tuning the parameters, and cnch_merge_max_total_rows_to_merge seems especially helpful. The default value appears to be 50M, and we increased it to 500M.

Now in each daily partition the largest part is around 27-30 GB, containing roughly 500M rows.

We're indeed seeing far fewer parts; the count is stable around 300-500 (and we also see more MergeParts operations in the system.server_part_log table).

β”Œβ”€partition─┬─total_parts_number─┬─Size───────┬─toDateTime(last_update_time)─┐
β”‚ 20230928  β”‚                 16 β”‚ 83.29 GiB  β”‚          2023-10-02 05:25:47 β”‚
β”‚ 20230930  β”‚                463 β”‚ 1.87 TiB   β”‚          2023-10-02 05:25:47 β”‚
β”‚ 20230929  β”‚                321 β”‚ 1.01 TiB   β”‚          2023-10-02 05:25:47 β”‚
β”‚ 20231001  β”‚                423 β”‚ 1.74 TiB   β”‚          2023-10-02 05:25:47 β”‚
β”‚ 20231002  β”‚                395 β”‚ 486.75 GiB β”‚          2023-10-02 05:25:47 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

However, our question is: is 500M rows in one part OK? Will a part that is too large slow the workers down, for example because they need to download more bytes before they can start a query? Is there a sweet spot or optimal part size that is most performant for the workers?

Our other settings are as follows; do they look OK? We increased all of the above-mentioned parameters quite a lot.

SETTINGS storage_policy = 'cnch_default_s3', enable_addition_bg_task = 1, cnch_merge_enable_batch_select = 1, cnch_merge_pick_worker_algo = 'RoundRobin',
index_granularity = 8192, 
max_addition_bg_task_num = 200,
cnch_merge_max_total_rows_to_merge = 500000000,
cnch_merge_max_total_bytes_to_merge = 322122547200 

Or should we try to reduce the number of parts further by increasing these parameters even more?

dmthuc commented 1 year ago


Hi @lazywei , in general the parts are cached on disk, so once a part has been fetched to local disk that part size is OK. Generally, the fewer parts the better, but the count should not be smaller than the number of read workers, because we distribute the parts among the workers. In general I think fewer than 100 parts per partition is OK.

lazywei commented 1 year ago

I see, thank you. So do you think we should further increase the maximum number of rows in a part to reduce the number of parts? Or should we reduce the partition granularity, e.g. to hourly partitions, so each partition has fewer parts?

Given the caching behavior, does it mean we should also ensure the local disk is large enough if the parts are large? Is there a way to check which parts are already cached on disk, and their size?

We will try to further reduce the number of parts and see how it affects the READ query performance.

Really appreciate your help!!

dmthuc commented 1 year ago

Hi @lazywei , no, I think partitioning by day is OK. Another colleague with more knowledge will answer this question.

hustnn commented 1 year ago

I see, thank you. So do you think we should further increase the maximum number of rows in a part to reduce the number of parts? Or should we reduce the partition granularity, e.g. to hourly partitions, so each partition has fewer parts?

Hi @lazywei , what is your query pattern: queries over an hour, a day, a month, or more? The partition design depends on your use case. If the total number of parts involved in one query is too high, allocation takes longer; fewer than 1-10k parts per query is recommended. Basically, once the number of parts is larger than your cluster size, the fewer the better.
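
For illustration, a hedged sketch of how the partitioning key controls that trade-off; the column names come from the query shown later in this thread, while the CnchMergeTree engine and the ORDER BY key are assumptions rather than the actual table definition:

-- Daily partitions, as used in this thread:
CREATE TABLE ed.ed_logs_daily (timestamp DateTime, tag String, body String)
ENGINE = CnchMergeTree
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (tag, timestamp);

-- Hourly partitions: more, smaller partitions, so a short-lookback query touches fewer parts:
CREATE TABLE ed.ed_logs_hourly (timestamp DateTime, tag String, body String)
ENGINE = CnchMergeTree
PARTITION BY toStartOfHour(timestamp)
ORDER BY (tag, timestamp);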

Given the caching behavior, does it mean we should also ensure the local disk is large enough if the parts are large? Is there a way to check which parts are already cached on disk, and their size?

I think there is currently no way to check which parts are cached. Caching is not at the part level; it is at a more fine-grained level.

lazywei commented 1 year ago

@hustnn

Our query pattern is like this:

SELECT ... FROM logs
WHERE timestamp > now() - 900  -- common lookback range is 15m, 1h, 3h, or 24h; we very rarely do a 7d lookback
  AND tag = '...'
  AND body LIKE '%some_full_text_search%'

Our QPS is not high at the moment; we are more interested in speeding up each of these slow queries. Our initial QPS target is probably just 1-10 or even below 1, but we would definitely try to scale it up once we understand all the performance levers (part count, part size, granularity, etc.).

For workers, is it better to have a few powerful/beefy nodes (more CPUs, etc.) or many smaller nodes (MapReduce style)?

If the total number of parts involved in one query is too high, allocation takes longer; fewer than 1-10k parts per query is recommended. Basically, once the number of parts is larger than your cluster size, the fewer the better.

I see. So when it comes to allocation time or latency, the number of parts matters more than each part's size? Is it fine for each part to contain 1 billion rows or even more?

Caching is not at the part level; it is at a more fine-grained level.

Got it. Does this also mean the files stored on S3 (or on the filesystem) have finer granularity than a "part"? And does it mean that downloading a part from S3 can also be parallelized, even if the part is very large?

Thanks a lot!