maxim-ge opened 5 days ago
Your code generally looks well-structured, but some areas could be improved in terms of readability, safety, and maintainability. Below is a detailed review with comments on what could be enhanced.

### Review of Key Sections:

1. **General Design**:
   - The use of `PartitionActualizers` to manage multiple concurrent actualizers looks good, especially the synchronization logic built on mutexes and atomic operations. However, there are opportunities to make the synchronization clearer and to avoid some of the complexity that comes from multiple goroutines.

2. **Synchronization with Mutex and WaitGroups**:
   - ⚡**Readability & Complexity**: There is a lot of locking and unlocking of `mx` (the mutex), which makes the control flow hard to follow.
   - ⚡**Double Goroutine Launching**: In the `start` function, one goroutine starts an actualizer and another manages its lifecycle (`run(ctx, ...)`). This is valid but adds complexity; consider simplifying it if possible.

3. **Goroutine Lifecycle Management**:
   - ⚡**Busy Waiting**: The code uses `time.Sleep(time.Millisecond)` in several places to poll the state of goroutines (`for rt.state.Load() == 0`). Busy waiting is inefficient and wastes CPU cycles; a synchronization mechanism such as `sync.Cond` or channels would coordinate the goroutines better.
   - **State Representation**: The states (`0: newly; +1: started; -1: finished`) are not self-explanatory and invite misinterpretation. Constants or an enumerated type would improve readability.

4. **Mutex Management**:
   - ⚡Mixing `RLock/RUnlock` and `Lock/Unlock` is prone to deadlock if not handled carefully. Consider minimizing the locking scope or refactoring some parts to reduce the dependency on the mutex.
   - ⚡When starting and stopping actualizers, there is a risk of race conditions on the `pa.rt` map because multiple goroutines update it. Consider consolidating this part of the code to ensure consistency.

5. **Error Handling**:
   - Currently there is no error handling around the actualizer's `run` function (`context.WithCancel` itself never returns an error). Consider adding error checks and handling to improve robustness.
   - ⚡For instance, what happens if `run` fails? It is good practice to at least log errors so that failures are visible for diagnosis.

6. **Deploy Function**:
   - In `Deploy`, the following part has an issue:

     ```go
     if appdef.Projector(appDef.Type, name) == nil {
     ```

     `appDef.Type` may not be the right call here. Should it be something like `appDef.Types`? Make sure it provides the value expected by the `Projector()` function.
   - **Compare Projector Properties**: There is a `TODO` comment about comparing projector properties. This is an important part of the deployment logic and may cause unwanted behavior if not handled correctly. When you implement it, deeply compare the configuration to decide whether the actualizer must be restarted.

7. **Enum Method**:
   - The `Enum` function converts the `pa.rt` map into `appdef.QNames`. This looks fine, but if `QNamesFromMap` is an expensive conversion, it might be worth caching the result until `pa.rt` changes (a rough sketch of this is given after the code listing below).

8. **Wait Method**:
   - ⚡The `Wait` method uses busy waiting (`time.Sleep(time.Millisecond)`) to check whether all actualizers have stopped.
     This can be inefficient. Instead, consider synchronization primitives such as `sync.WaitGroup` or channels, which are more CPU-efficient.

### Suggested Improvements:

1. **Refactor State Handling**: Define constants for the states of `runtime`:

   ```go
   const (
       StateNewly    int32 = 0
       StateStarted  int32 = 1
       StateFinished int32 = -1
   )
   ```

2. **Avoid Busy Waiting**: Replace busy waiting with synchronization primitives (a minimal `sync.Cond` sketch is shown right after this comment). For example:
   - Use `sync.Cond` to wait for changes to `runtime.state` instead of polling with `time.Sleep(time.Millisecond)`.
   - Alternatively, channels could be used to signal the start and completion of goroutines.

3. **Use a More Structured Approach for Lifecycle Management**: Lifecycle handling could be improved by using `context.WithTimeout` or `context.WithDeadline` instead of hand-rolled `time.Sleep` loops, which makes the code more readable and more robust where cancellation or failure handling is needed.

4. **Improve the Deploy Function**: To reduce locking complexity and improve clarity, you could:
   - Split the deployment into smaller functions, making sure each function holds the lock for the shortest possible time.
   - Use a separate method to clean up the map (`pa.rt`) once all old actualizers are done, keeping `Deploy` focused only on starting and stopping.

5. **Logging and Error Handling**: Add logging at key points, such as:
   - when an actualizer is started or stopped;
   - when an actualizer encounters an error.

   This will help trace the lifecycle of actualizers and make debugging easier.

### Example Code Changes:

```go
// Use constants for runtime states
const (
	StateNewly    int32 = 0
	StateStarted  int32 = 1
	StateFinished int32 = -1
)

// In runtime
type runtime struct {
	state   atomic.Int32
	cancel  context.CancelFunc
	started chan struct{} // closed when the actualizer goroutine has started
	done    chan struct{} // closed when the actualizer goroutine has finished
}

func newRuntime(cancel context.CancelFunc) *runtime {
	return &runtime{
		cancel:  cancel,
		started: make(chan struct{}),
		done:    make(chan struct{}),
	}
}

// Update goroutine lifecycle management in `start`
start := func(name appdef.QName) {
	startWG.Add(1)
	go func() {
		ctx, cancel := context.WithCancel(vvmCtx)
		rt := newRuntime(cancel)

		pa.mx.Lock()
		pa.rt[name] = rt
		pa.mx.Unlock()

		go func() {
			rt.state.Store(StateStarted)
			close(rt.started) // Signal that the actualizer has started
			run(ctx, pa.app, pa.part, name)

			pa.mx.Lock()
			delete(pa.rt, name)
			pa.mx.Unlock()

			rt.state.Store(StateFinished)
			close(rt.done) // Signal that the actualizer has finished
		}()

		<-rt.started // Wait until the actualizer has started
		startWG.Done()
	}()
}
```

This makes the code more efficient and reduces the chance of race conditions or wasted CPU cycles from busy waiting.

Would you like me to help with more specific code refactoring or answer any questions regarding these suggestions?
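Suggestion 2 above names `sync.Cond` but does not show it, so here is a minimal, self-contained sketch of that idea. The `tracker` type and its method names are invented for illustration and are not part of the reviewed code; in `PartitionActualizers` the counter would correspond to `len(pa.rt)` guarded by `pa.mx`.

```go
package actualizers

import "sync"

// tracker is a hypothetical helper showing how sync.Cond can replace the
// time.Sleep polling loops: waiters block on the condition variable and are
// woken only when the running-actualizer count actually changes.
type tracker struct {
	mu      sync.Mutex
	cond    *sync.Cond
	running int // number of currently running actualizers
}

func newTracker() *tracker {
	t := &tracker{}
	t.cond = sync.NewCond(&t.mu)
	return t
}

// add registers a started actualizer.
func (t *tracker) add() {
	t.mu.Lock()
	t.running++
	t.mu.Unlock()
}

// done unregisters a finished actualizer and wakes any waiters.
func (t *tracker) done() {
	t.mu.Lock()
	t.running--
	t.mu.Unlock()
	t.cond.Broadcast()
}

// waitAll blocks until every registered actualizer has finished, without polling.
func (t *tracker) waitAll() {
	t.mu.Lock()
	for t.running > 0 {
		t.cond.Wait()
	}
	t.mu.Unlock()
}
```

With such a helper, `Wait` would reduce to a single `waitAll` call and the `time.Sleep(time.Millisecond)` loops could be dropped.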
```go
/*
 * Copyright (c) 2024-present Sigma-Soft, Ltd.
 * @author: Nikolay Nikitin
 */

package actualizers

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"github.com/voedger/voedger/pkg/appdef"
	"github.com/voedger/voedger/pkg/istructs"
)

// Run is a function that runs actualizer for the specified projector.
type Run func(context.Context, appdef.AppQName, istructs.PartitionID, appdef.QName)

// PartitionActualizers manages actualizers deployment for the specified application partition.
type PartitionActualizers struct {
	mx   sync.RWMutex
	app  appdef.AppQName
	part istructs.PartitionID
	rt   map[appdef.QName]*runtime
}

func newActualizers(app appdef.AppQName, part istructs.PartitionID) *PartitionActualizers {
	return &PartitionActualizers{
		app:  app,
		part: part,
		rt:   make(map[appdef.QName]*runtime),
	}
}

// Deploy deploys partition actualizers: it stops actualizers for removed projectors and
// starts actualizers for new projectors using the specified run function.
func (pa *PartitionActualizers) Deploy(vvmCtx context.Context, appDef appdef.IAppDef, run Run) {
	stopWG := &sync.WaitGroup{}
	pa.stopOldActualizers(appDef, stopWG)
	stopWG.Wait() // Wait for all old actualizers to stop

	startWG := &sync.WaitGroup{}
	pa.startNewActualizers(vvmCtx, appDef, run, startWG)
	startWG.Wait() // Wait for all new actualizers to start
}

// stopOldActualizers stops actualizers whose projectors are no longer present in the application definition.
func (pa *PartitionActualizers) stopOldActualizers(appDef appdef.IAppDef, wg *sync.WaitGroup) {
	pa.mx.RLock()
	defer pa.mx.RUnlock()

	for name, rt := range pa.rt {
		if appdef.Projector(appDef.Types(), name) == nil {
			wg.Add(1)
			go func(rt *runtime) {
				defer wg.Done()
				rt.cancel()
				<-rt.done // Wait until the actualizer has finished
			}(rt)
		}
	}
}

// startNewActualizers starts actualizers for new (asynchronous) projectors.
func (pa *PartitionActualizers) startNewActualizers(vvmCtx context.Context, appDef appdef.IAppDef, run Run, wg *sync.WaitGroup) {
	pa.mx.RLock()
	defer pa.mx.RUnlock()

	for prj := range appdef.Projectors(appDef.Types()) {
		if !prj.Sync() {
			name := prj.QName()
			if _, exists := pa.rt[name]; !exists {
				wg.Add(1)
				go pa.startActualizer(vvmCtx, name, run, wg)
			}
		}
	}
}

// startActualizer starts an actualizer for the given projector.
func (pa *PartitionActualizers) startActualizer(vvmCtx context.Context, name appdef.QName, run Run, wg *sync.WaitGroup) {
	go func() {
		defer wg.Done()

		ctx, cancel := context.WithCancel(vvmCtx)
		rt := newRuntime(cancel)

		pa.mx.Lock()
		pa.rt[name] = rt
		pa.mx.Unlock()

		go func() {
			rt.state.Store(StateStarted)
			close(rt.started) // Signal that the actualizer has started
			run(ctx, pa.app, pa.part, name)

			pa.mx.Lock()
			delete(pa.rt, name)
			pa.mx.Unlock()

			rt.state.Store(StateFinished)
			close(rt.done) // Signal that the actualizer has finished
		}()

		<-rt.started // Wait until the actualizer goroutine has started
	}()
}

// Enum returns the names of all deployed actualizers.
func (pa *PartitionActualizers) Enum() appdef.QNames {
	pa.mx.RLock()
	defer pa.mx.RUnlock()

	return appdef.QNamesFromMap(pa.rt)
}

// Wait waits for all actualizers to finish.
// The context should be stopped before calling this method.
func (pa *PartitionActualizers) Wait() {
	for {
		pa.mx.RLock()
		cnt := len(pa.rt)
		pa.mx.RUnlock()
		if cnt == 0 {
			break
		}
		time.Sleep(time.Millisecond)
	}
}

// Actualizer runtime states.
const (
	StateNewly    int32 = 0  // newly created, not yet started
	StateStarted  int32 = 1  // actualizer goroutine is running
	StateFinished int32 = -1 // actualizer goroutine has finished
)

// runtime holds the lifecycle state of a single deployed actualizer.
type runtime struct {
	state   atomic.Int32
	cancel  context.CancelFunc
	started chan struct{} // closed when the actualizer goroutine has started
	done    chan struct{} // closed when the actualizer goroutine has finished
}

func newRuntime(cancel context.CancelFunc) *runtime {
	return &runtime{
		cancel:  cancel,
		started: make(chan struct{}),
		done:    make(chan struct{}),
	}
}
```
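As a follow-up to item 7 of the review (caching the `Enum` result), here is a rough sketch of one way to do it. The `cached` field, the `set` helper, and the invalidation points are assumptions rather than part of the reviewed code, and the sketch presumes `appdef.QNamesFromMap` accepts any map keyed by `appdef.QName`, as its use in the listing above suggests.

```go
package actualizers

import (
	"sync"

	"github.com/voedger/voedger/pkg/appdef"
)

// enumCache is a hypothetical illustration of caching the Enum result until
// the underlying map changes, so QNamesFromMap is not rebuilt on every call.
type enumCache struct {
	mx     sync.RWMutex
	rt     map[appdef.QName]struct{} // stands in for the real pa.rt map
	cached appdef.QNames             // nil means the cache is stale
}

// set adds or removes a name and marks the cache stale; in the reviewed code
// this would happen wherever pa.rt is modified (actualizer start/finish).
func (c *enumCache) set(name appdef.QName, present bool) {
	c.mx.Lock()
	defer c.mx.Unlock()
	if present {
		c.rt[name] = struct{}{}
	} else {
		delete(c.rt, name)
	}
	c.cached = nil
}

// Enum rebuilds the QNames slice only when the map has changed since the last call.
func (c *enumCache) Enum() appdef.QNames {
	c.mx.Lock()
	defer c.mx.Unlock()
	if c.cached == nil {
		c.cached = appdef.QNamesFromMap(c.rt)
	}
	return c.cached
}
```

Whether this is worth the extra bookkeeping depends on how often `Enum` is called relative to how often the set of deployed actualizers changes.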
Motivation