polarsignals / frostdb

❄️ Coolest database around 🧊 Embeddable column database written in Go.
Apache License 2.0

The problem of combining aggregation function with custom runtime.GOMAXPROCS(1) #769

Open jicanghaixb opened 7 months ago

jicanghaixb commented 7 months ago

I found a problem: if I set runtime.GOMAXPROCS(1), the code below has a small problem and aggregation functions do not produce accurate results.

The aggregation function goes directly into the final state. From physicalplan.go:

func Build(
    ctx context.Context,
    pool memory.Allocator,
    tracer trace.Tracer,
    s *dynparquet.Schema,
    plan *logicalplan.LogicalPlan,
    options ...Option,
) (*OutputPlan, error) {
......
case plan.Aggregation != nil:
            ordered, err := shouldPlanOrderedAggregate(execOpts, oInfo, plan.Aggregation)
            if err != nil {
                // TODO(asubiotto): Log the error.
                ordered = false
            }
            var sync PhysicalPlan
            if len(prev) > 1 { // reporter's note: here len(prev) == 1, so this branch is never executed
                // These aggregate operators need to be synchronized.
                if ordered && len(plan.Aggregation.GroupExprs) > 0 {
                    sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs)
                } else {
                    sync = Synchronize(len(prev))
                }
            }
            seed := maphash.MakeSeed()
            for i := 0; i < len(prev); i++ {
                a, err := Aggregate(pool, tracer, plan.Aggregation, sync == nil, ordered, seed)
                // reporter's note: sync is nil, so the aggregation goes directly into the final state and cannot work correctly
                if err != nil {
                    visitErr = err
                    return false
                }
                prev[i].SetNext(a)
                prev[i] = a
                if sync != nil {
                    a.SetNext(sync)
                }
            }
            if sync != nil {
                // Plan an aggregate operator to run an aggregation on all the
                // aggregations.
                a, err := Aggregate(pool, tracer, plan.Aggregation, true, ordered, seed)
                if err != nil {
                    visitErr = err
                    return false
                }
                sync.SetNext(a)
                prev = prev[0:1]
                prev[0] = a
            }
            if ordered {
                oInfo.nodeMaintainsOrdering()
            }
        default:
            panic("Unsupported plan")
        }
}
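
The branching in the excerpt can be reduced to a small standalone model to make the two cases easier to compare. This is only a sketch: `stage` and `planAggregates` are hypothetical names that do not exist in frostdb, and the model captures only which aggregate operators get the final flag, not what the operators compute.

```go
package main

import "fmt"

// stage is a hypothetical stand-in for one aggregate operator in the plan.
// It records only whether the operator was created with the final flag set.
type stage struct {
	final bool
}

// planAggregates mirrors the branching in the excerpt above (not the real
// frostdb code): one aggregate per upstream plan, plus a merging aggregate
// only when more than one upstream plan needs to be synchronized.
func planAggregates(numPrev int) (perStream []stage, merge *stage) {
	needsSync := numPrev > 1 // corresponds to `sync != nil` in the excerpt
	for i := 0; i < numPrev; i++ {
		// Corresponds to passing `sync == nil` as the final flag.
		perStream = append(perStream, stage{final: !needsSync})
	}
	if needsSync {
		// The aggregation over all per-stream aggregations is always final.
		merge = &stage{final: true}
	}
	return perStream, merge
}

func main() {
	for _, n := range []int{1, 4} {
		per, merge := planAggregates(n)
		fmt.Printf("streams=%d perStreamFinal=%v mergeStage=%v\n",
			n, per[0].final, merge != nil)
	}
}
```

With a single upstream plan, which is the case described in this report, the only aggregate operator is created as final and no merging stage is planned. With several upstream plans, the per-stream operators are non-final and a separate final operator sits behind the synchronizer.
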
thorfour commented 7 months ago

Thanks! Yes, this has been a bug for a while, but I think the stale bot must have auto-closed the previous issue. Thanks for reopening.

thorfour commented 4 months ago

This was reverted in https://github.com/polarsignals/frostdb/pull/885 without a fix. So this bug is valid again.