databendlabs / databend

Data, Analytics & AI. Modern alternative to Snowflake. Cost-effective and simple for massive-scale analytics. https://databend.com
https://docs.databend.com

Feature: Auto compact(re-cluster) for multiple table insertion statement #16437

Closed: dantengsky closed this issue 1 month ago

dantengsky commented 2 months ago

Summary

Auto compact (re-cluster) for multiple-table insertion.

dosubot[bot] commented 2 months ago

To implement auto compact (re-cluster) for multi-table insert statements, you can build on the hook_compact function in compact_hook.rs, which registers a hook that runs compaction or re-clustering after the insert pipeline finishes. Here is the relevant code snippet:

pub async fn hook_compact(
    ctx: Arc<QueryContext>,
    pipeline: &mut Pipeline,
    compact_target: CompactTargetTableDescription,
    trace_ctx: CompactHookTraceCtx,
    lock_opt: LockTableOption,
) {
    let op_name = trace_ctx.operation_name.clone();
    if let Err(e) = do_hook_compact(ctx, pipeline, compact_target, trace_ctx, lock_opt).await {
        info!("compact hook ({}) with error (ignored): {}", op_name, e);
    }
}

async fn do_hook_compact(
    ctx: Arc<QueryContext>,
    pipeline: &mut Pipeline,
    compact_target: CompactTargetTableDescription,
    trace_ctx: CompactHookTraceCtx,
    lock_opt: LockTableOption,
) -> Result<()> {
    if pipeline.is_empty() {
        return Ok(());
    }

    // Register a callback that runs once the main insert pipeline has finished;
    // compaction / re-clustering is deferred until the insert completes.
    pipeline.set_on_finished(move |info: &ExecutionInfo| {
        let compaction_limits = match compact_target.mutation_kind {
            MutationKind::Insert => {
                let compaction_num_block_hint = ctx.get_compaction_num_block_hint();
                info!("hint number of blocks need to be compacted {}", compaction_num_block_hint);
                if compaction_num_block_hint == 0 {
                    return Ok(());
                }
                CompactionLimits {
                    segment_limit: None,
                    block_limit: Some(compaction_num_block_hint as usize),
                }
            }
            _ => {
                let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
                CompactionLimits {
                    segment_limit: Some(auto_compaction_segments_limit as usize),
                    block_limit: None,
                }
            }
        };

        metrics_inc_compact_hook_main_operation_time_ms(
            &trace_ctx.operation_name,
            trace_ctx.start.elapsed().as_millis() as u64,
        );

        let compact_start_at = Instant::now();
        if info.res.is_ok() {
            info!(
                "execute {} finished successfully. running table optimization job.",
                &trace_ctx.operation_name
            );
            match GlobalIORuntime::instance().block_on({
                compact_table(ctx, compact_target, compaction_limits, lock_opt)
            }) {
                Ok(_) => {
                    info!(
                        "execute {} finished successfully. table optimization job finished.",
                        &trace_ctx.operation_name
                    );
                }
                Err(e) => {
                    info!(
                        "execute {} finished successfully. table optimization job failed. {:?}",
                        &trace_ctx.operation_name, e
                    );
                }
            }
        }
        metrics_inc_compact_hook_compact_time_ms(
            &trace_ctx.operation_name,
            compact_start_at.elapsed().as_millis() as u64,
        );

        Ok(())
    });

    Ok(())
}

async fn compact_table(
    ctx: Arc<QueryContext>,
    compact_target: CompactTargetTableDescription,
    compaction_limits: CompactionLimits,
    lock_opt: LockTableOption,
) -> Result<()> {
    let table = ctx
        .get_table(
            &compact_target.catalog,
            &compact_target.database,
            &compact_target.table,
        )
        .await?;
    // Re-cluster only when the table defines cluster keys; otherwise fall back to block compaction.
    let do_recluster = !table.cluster_keys(ctx.clone()).is_empty();

    ctx.evict_table_from_cache(
        &compact_target.catalog,
        &compact_target.database,
        &compact_target.table,
    )?;

    let mut build_res = if do_recluster {
        let recluster = RelOperator::Recluster(Recluster {
            catalog: compact_target.catalog,
            database: compact_target.database,
            table: compact_target.table,
            filters: None,
            limit: compaction_limits.segment_limit,
        });
        let s_expr = SExpr::create_leaf(Arc::new(recluster));
        let recluster_interpreter =
            ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
        recluster_interpreter.execute2().await?
    } else {
        let compact_block = RelOperator::CompactBlock(OptimizeCompactBlock {
            catalog: compact_target.catalog,
            database: compact_target.database,
            table: compact_target.table,
            limit: compaction_limits,
        });
        let s_expr = SExpr::create_leaf(Arc::new(compact_block));
        let compact_interpreter =
            OptimizeCompactBlockInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
        compact_interpreter.execute2().await?
    };

    if build_res.main_pipeline.is_empty() {
        return Ok(());
    }

    let settings = ctx.get_settings();
    build_res.set_max_threads(settings.get_max_threads()? as usize);
    let settings = ExecutorSettings::try_create(ctx.clone())?;

    if build_res.main_pipeline.is_complete_pipeline()? {
        let mut pipelines = build_res.sources_pipelines;
        pipelines.push(build_res.main_pipeline);

        let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
        let progress_value = ctx.get_write_progress_value();
        ctx.clear_segment_locations()?;
        ctx.set_executor(complete_executor.get_inner())?;
        complete_executor.execute()?;
        drop(complete_executor);

        ctx.get_write_progress().set(&progress_value);
    }
    Ok(())
}

This code registers a hook that runs compaction or re-clustering once the insert pipeline finishes. hook_compact is called with the query context, the pipeline, and a description of the target table; do_hook_compact attaches the on-finished callback; and compact_table then runs either a re-cluster (when the table defines cluster keys) or a block compaction [1].
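
For a multi-table insert, one straightforward extension is to attach this hook once per target table. The sketch below is only a minimal illustration under stated assumptions, not Databend's actual implementation: hook_compact_for_targets and the targets vector are hypothetical, and it assumes the plan can yield one CompactTargetTableDescription per INTO clause, that CompactHookTraceCtx and LockTableOption can be cloned, and that Pipeline::set_on_finished composes callbacks rather than replacing them.

async fn hook_compact_for_targets(
    ctx: Arc<QueryContext>,
    pipeline: &mut Pipeline,
    // hypothetical: one entry per INTO clause of the multi-table insert
    targets: Vec<CompactTargetTableDescription>,
    trace_ctx: CompactHookTraceCtx,
    lock_opt: LockTableOption,
) {
    for target in targets {
        // Reuse the existing single-table hook; each call registers its own
        // on-finished compaction callback for one target table.
        hook_compact(
            ctx.clone(),
            pipeline,
            target,
            trace_ctx.clone(), // assumes CompactHookTraceCtx: Clone
            lock_opt.clone(),  // assumes LockTableOption: Clone
        )
        .await;
    }
}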

Additionally, the distributed_compact.sql.test file shows how compaction and re-clustering are set up and verified in tests. Here is an excerpt:

statement ok
drop table if exists t_compact_0

statement ok
set enable_distributed_compact = 1

statement ok
create table t_compact_0 (a int not null) row_per_block=5 block_per_segment=5

statement ok
insert into t_compact_0 select 50 - number from numbers(100)

statement ok
insert into t_compact_0 select 50 - number from numbers(100)

statement ok
insert into t_compact_0 select 50 - number from numbers(100)

query II
select count(),sum(a) from t_compact_0
----
300 150

statement ok
alter table t_compact_0 set options(row_per_block=10,block_per_segment=10)

statement ok
optimize table t_compact_0 compact

query I
select count() from fuse_snapshot('default', 't_compact_0')
----
4

query II
select count(),sum(a) from t_compact_0
----
300 150

statement ok
alter table t_compact_0 cluster by(abs(a))

statement ok
optimize table t_compact_0 compact

query I
select count() from fuse_snapshot('default', 't_compact_0')
----
6

query II
select count(),sum(a) from t_compact_0
----
300 150

This test script demonstrates creating a table, inserting data, altering table options, and performing compaction and re-clustering operations [2].
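
If Databend's Snowflake-style multi-table insert syntax (INSERT ALL ... INTO ... INTO ... SELECT ...) is used, an analogous test could exercise auto compaction on every target. The following is only a sketch in the same sqllogictest style: the table names are hypothetical, the exact INSERT ALL syntax should be checked against the current documentation, and the fuse_snapshot probes use statement ok so no exact snapshot counts are asserted here.

statement ok
create table t_multi_1 (a int not null) row_per_block=5 block_per_segment=5

statement ok
create table t_multi_2 (a int not null) row_per_block=5 block_per_segment=5

statement ok
insert all into t_multi_1 into t_multi_2 select 50 - number as a from numbers(100)

statement ok
select count() from fuse_snapshot('default', 't_multi_1')

statement ok
select count() from fuse_snapshot('default', 't_multi_2')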

Furthermore, PipelineBuilder::build_commit_sink in builder_commit.rs shows how the commit sink is built for the pipeline; this is the step that actually commits mutations such as compaction and re-clustering:

impl PipelineBuilder {
    pub(crate) fn build_commit_sink(&mut self, plan: &PhysicalCommitSink) -> Result<()> {
        self.build_pipeline(&plan.input)?;
        let table = self.ctx.build_table_by_table_info(&plan.table_info, None)?;
        let table = FuseTable::try_from_table(table.as_ref())?;
        let cluster_key_id = table.cluster_key_id();
        self.main_pipeline.try_resize(1)?;
        if plan.merge_meta {
            self.main_pipeline
                .add_accumulating_transformer(|| TransformMergeCommitMeta::create(cluster_key_id));
        } else {
            self.main_pipeline.add_async_accumulating_transformer(|| {
                let base_segments = if matches!(
                    plan.mutation_kind,
                    MutationKind::Compact | MutationKind::Insert | MutationKind::Recluster
                ) {
                    vec![]
                } else {
                    plan.snapshot.segments().to_vec()
                };

                // extract re-cluster related mutations from physical plan
                let recluster_info = plan.recluster_info.clone().unwrap_or_default();

                TableMutationAggregator::create(
                    table,
                    self.ctx.clone(),
                    base_segments,
                    recluster_info.merged_blocks,
                    recluster_info.removed_segment_indexes,
                    recluster_info.removed_statistics,
                    plan.mutation_kind,
                )
            });
        }

        // Generate the new snapshot for this mutation kind and commit it through the CommitSink below.
        let snapshot_gen = MutationGenerator::new(plan.snapshot.clone(), plan.mutation_kind);
        self.main_pipeline.add_sink(|input| {
            CommitSink::try_create(
                table,
                self.ctx.clone(),
                None,
                plan.update_stream_meta.clone(),
                snapshot_gen.clone(),
                input,
                None,
                None,
                plan.deduplicated_label.clone(),
            )
        })
    }
}

The builder resizes the pipeline to a single output, adds either a TransformMergeCommitMeta or a TableMutationAggregator to merge the mutation results (starting from an empty segment base for Insert, Compact, and Recluster), and finishes with a CommitSink that commits the new snapshot, so compaction and re-clustering results are applied like any other mutation [3].
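
One detail worth noting for the multi-table case: the Insert branch of do_hook_compact reads a single per-query hint via ctx.get_compaction_num_block_hint(), which implicitly assumes one target table. Below is a minimal, hypothetical sketch (not Databend's API) of how such a hint could instead be tracked per target table, keyed by (catalog, database, table); the commit path would fill it and each per-table compact hook would consume its own entry.

use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical per-table compaction-hint registry; not part of Databend's codebase.
#[derive(Default)]
struct CompactionHintRegistry {
    // (catalog, database, table) -> number of blocks written by the current statement
    hints: Mutex<HashMap<(String, String, String), u64>>,
}

impl CompactionHintRegistry {
    // Record blocks written for one target table (e.g. from the per-table commit path).
    fn add_hint(&self, catalog: &str, database: &str, table: &str, blocks: u64) {
        let key = (catalog.to_string(), database.to_string(), table.to_string());
        *self.hints.lock().unwrap().entry(key).or_insert(0) += blocks;
    }

    // Consume the hint for one target table (e.g. from its on-finished compact hook).
    fn take_hint(&self, catalog: &str, database: &str, table: &str) -> u64 {
        let key = (catalog.to_string(), database.to_string(), table.to_string());
        self.hints.lock().unwrap().remove(&key).unwrap_or(0)
    }
}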

To continue talking to Dosu, mention @dosu.