apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

How to merge multiple data sources and deduplicate based on certain fields? #12532

Open wj-stack opened 2 months ago

wj-stack commented 2 months ago

Is your feature request related to a problem or challenge?

I currently have two data sources: one stored in Parquet format and the other in memory. I need to implement a scan function that combines them. I tried using UnionExec, but it clearly doesn't work correctly, especially with aggregate functions like count. Maybe I should use SortPreservingMergeExec instead, but there are very few examples of it. I would appreciate it if you could add an example that combines multiple data sources. Since these sources may contain duplicate data, I would especially like to see an example of deduplication based on multiple fields.

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2`
        let predicate = self.filters_to_predicate(state, filters)?;

        // Now, we invoke the analysis code to perform the range analysis
        let df_schema = DFSchema::try_from(self.schema())?;

        let boundaries = ExprBoundaries::try_new_unbounded(&self.schema())?;

        let analysis_result = analyze(
            &predicate,
            AnalysisContext::new(boundaries),
            df_schema.as_ref(),
        )?;

        // In this example, we use the PruningPredicate's literal guarantees to
        // analyze the predicate. In a real system, using
        // `PruningPredicate::prune` would likely be easier to do.
        let pruning_predicate =
            PruningPredicate::try_new(Arc::clone(&predicate), self.schema().clone())?;

        debug!("pruning_predicate:{:?}", pruning_predicate);

        // The PruningPredicate's guarantees must all be satisfied in order for
        // the predicate to possibly evaluate to true.
        let guarantees = pruning_predicate.literal_guarantees();

        debug!("guarantees:{:?}", guarantees);

        let object_store_url = ObjectStoreUrl::parse("file://")?;
        let mut file_scan_config = FileScanConfig::new(object_store_url, self.schema())
            .with_projection(projection.cloned())
            .with_limit(limit);

        let mut points = vec![];

        // Use the inferred bounds on the `show_time` column to decide which
        // in-memory points and which Parquet files need to be scanned.
        for expr in &analysis_result.boundaries {
            if expr.column.name() == "show_time" {
                let lower = expr.interval.lower().clone().to_array().unwrap();
                let lower = lower
                    .as_any()
                    .downcast_ref::<TimestampMillisecondArray>()
                    .unwrap();

                let upper = expr.interval.upper().clone().to_array().unwrap();
                let upper = upper
                    .as_any()
                    .downcast_ref::<TimestampMillisecondArray>()
                    .unwrap();

                debug!(
                    "{:?} lower:{:?} upper:{:?}",
                    expr.column,
                    lower.value(0),
                    upper.value(0)
                );

                let start = DateTime::<Utc>::from_timestamp_millis(lower.value(0)).unwrap();
                let end = DateTime::<Utc>::from_timestamp_millis(upper.value(0)).unwrap();

                // Fetch the in-memory points that fall inside the time range.
                let p = self.query_with_time(start, end).await;

                if !p.is_empty() {
                    let addr = { self.fields.read().await.clone() };
                    let additional = { self.additional.read().await.clone() };

                    let batch = create_record_batch(&p, &addr, &additional).unwrap();
                    points.push(batch);
                }

                // Find the directories that may contain Parquet files for this range.
                let dirs = self.get_dir_by_time(start, end).unwrap_or_default();

                info!("dirs: {:?}  {:?} {:?}", start, end, dirs);

                for s in dirs {
                    let mut files = list_files_in_directory(&s).unwrap();
                    files.reverse();

                    let mut v = vec![];

                    for s in files {
                        // Only pick up Parquet files; avoid panicking on paths
                        // that have no extension at all.
                        if s.extension().map_or(false, |ext| ext == "parquet") {
                            let f = self
                                .files
                                .optionally_get_with(format!("{:?}", s), async {
                                    let f = add_file(Path::new(&s)).unwrap();
                                    Some((String::from(f.0), f.1))
                                })
                                .await
                                .unwrap();

                            v.push(PartitionedFile::new(f.0.clone(), f.1));
                        }
                    }
                    file_scan_config = file_scan_config.with_file_group(v);
                }

                break;
            }
        }

        let exec: Arc<ParquetExec> = ParquetExec::builder(file_scan_config)
            .with_predicate(predicate)
            .build_arc();

        // BUG: aggregates such as count over this plain union return wrong
        // results, because rows present in both sources are counted twice.

        let memory = MemoryExec::try_new(&[points], self.schema(), projection.cloned())?;

        let union = Arc::new(UnionExec::new(vec![Arc::new(memory), exec]));

        Ok(union)
    }
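
For reference, the end result I am after is roughly a SQL-level SELECT DISTINCT ON over the combined data. A minimal sketch of that, assuming the provider above is registered as a table (MyProvider, the table name points, and the key column device_id are placeholders I made up; only show_time comes from my code):

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::prelude::*;

    // Sketch only: `MyProvider` stands in for the custom TableProvider above,
    // and `device_id` is a placeholder key column alongside `show_time`.
    async fn query_deduped(provider: Arc<MyProvider>) -> Result<()> {
        let ctx = SessionContext::new();
        ctx.register_table("points", provider)?;

        // Keep one row per (device_id, show_time); the ORDER BY has to start
        // with the DISTINCT ON expressions and decides which duplicate wins.
        let df = ctx
            .sql(
                "SELECT DISTINCT ON (device_id, show_time) * \
                 FROM points \
                 ORDER BY device_id, show_time",
            )
            .await?;
        df.show().await?;
        Ok(())
    }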

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

Omega359 commented 1 month ago

I agree that an example for merging dataframes from different table providers and finding distinct values from them would be a nice addition. Currently there is union, but the schemas must be identical for that to work, IIRC. (See https://github.com/apache/datafusion/issues/12650 for an enhancement request to address that.)

There is distinct_on available in the DataFrame API, which is just a thin wrapper around the LogicalPlanBuilder::distinct_on function.
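
Something along those lines might look like this (untested sketch; the table names mem_points / file_points are placeholders for the two sources in this issue, and the key columns are assumed):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    // Untested sketch: `mem_points` / `file_points` are placeholder table
    // names for the in-memory and Parquet-backed sources described above.
    async fn union_then_distinct(ctx: &SessionContext) -> Result<DataFrame> {
        let mem = ctx.table("mem_points").await?;
        let files = ctx.table("file_points").await?;

        // union requires both inputs to have identical schemas.
        let both = mem.union(files)?;

        // Deduplicate on (device_id, show_time); pass Some(sort_exprs) instead
        // of None to control which of the duplicate rows survives.
        both.distinct_on(
            vec![col("device_id"), col("show_time")],
            vec![col("device_id"), col("show_time")],
            None,
        )
    }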