PretendoNetwork / nex-go

Barebones PRUDP/NEX server library written in Go
GNU Affero General Public License v3.0

[Enhancement]: Rework background tasks/timers #49

Closed jonbarrow closed 2 days ago

jonbarrow commented 1 month ago

Checked Existing

What enhancement would you like to see?

(Using the new issue templates flow for this one)

Tagging @DaniElectra for thoughts.

@PabloMK7 has reported that there is another memory leak potentially somewhere in here (the CTGP-7 server apparently crashed from an OOM error). While we have not been able to duplicate the issue, it's still something to look into.

The only real place I can think of where this would happen is our usage of timers and tickers in PRUDPConnection and PendingPacket in the resend scheduler. Timers and tickers can leave resources unfreed and are somewhat annoying to manage between structs and goroutines, making them prone to memory leaks. https://github.com/PretendoNetwork/nex-go/pull/44 was made to address this and seemed to work, but it's possible it didn't catch everything.

We can remove this reliance on tickers/timers entirely by moving to a context-based model. This would also be the more "Go" way of doing things (the timer/ticker pattern was carried over from v1 of this module). The context package provides ways to handle tasks that can be canceled or timed out.

Some good reading on this pattern:

A basic implementation using this pattern could look something like:

package main

import (
    "context"
    "fmt"
    "time"
)

type Packet struct {
    data string
}

type ResendTask struct {
    ctx            context.Context
    cancel         context.CancelFunc
    resendCount    int
    maxResends     int
    timeoutSeconds time.Duration
    packet         *Packet
}

func (rt *ResendTask) Begin() {
    if rt.resendCount <= rt.maxResends {
        ctx, cancel := context.WithTimeout(context.Background(), rt.timeoutSeconds*time.Second)

        rt.ctx = ctx
        rt.cancel = cancel

        go rt.start()
    } else {
        fmt.Println("Resent too many times")
    }
}

func (rt *ResendTask) Stop() {
    rt.cancel()
}

func (rt *ResendTask) start() {
    for {
        select {
        case <-rt.ctx.Done():
            fmt.Printf("Timeout. Resending %+v\n", rt.packet)
            rt.resendCount++
            rt.Begin() // * Start again
            return
        }
    }
}

func NewResendTask() *ResendTask {
    return &ResendTask{
        maxResends:     5,
        timeoutSeconds: 5,
        packet: &Packet{
            data: "Payload",
        },
    }
}

func main() {
    task := NewResendTask()

    task.Begin()

    for {
    }
}

In this example:

This pattern makes managing these timeouts/resends easier and less error-prone thanks to 2 key properties of contexts created with WithTimeout:

  1. WithTimeout has a cancel function, but unlike WithCancel the Done channel closes and resources are freed once the timeout elapses.
  2. Calling the cancel function from a WithTimeout context will immediately free the resources associated with the context and stop all operations using the context (stopping the timeout check).

Because of these 2 properties we essentially do not need to worry about memory usage for packet retransmission. Whenever a timeout occurs, the context frees itself. If a packet is acknowledged and the context canceled then the resources are also freed.
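
The two properties above can be checked with a small sketch. The helper names (doneReason, reasonAfterTimeout, reasonAfterCancel) are made up for illustration and are not part of nex-go. It assumes only the standard context package, and shows that Done closes both when the timeout elapses and when cancel is called, with ctx.Err() telling the two cases apart:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doneReason blocks until ctx is done and reports why it finished.
func doneReason(ctx context.Context) string {
	<-ctx.Done()

	switch {
	case errors.Is(ctx.Err(), context.DeadlineExceeded):
		return "timeout"
	case errors.Is(ctx.Err(), context.Canceled):
		return "canceled"
	}

	return "unknown"
}

// reasonAfterTimeout never cancels early, so Done closes once the timeout elapses.
func reasonAfterTimeout() string {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel() // still called, so the context's timer is always released

	return doneReason(ctx)
}

// reasonAfterCancel cancels long before the timeout, which also closes Done.
func reasonAfterCancel() string {
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	cancel()

	return doneReason(ctx)
}

func main() {
	fmt.Println(reasonAfterTimeout()) // prints "timeout"
	fmt.Println(reasonAfterCancel())  // prints "canceled"
}
```

Note that a goroutine waiting on Done wakes up in both cases, so code that should only act on a real timeout needs to inspect ctx.Err() (or track the acknowledgement some other way).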


DaniElectra commented 1 month ago

I agree with using contexts as it would be the more "Go" way of doing things. Though I think most of our issues are related to race conditions, because the logic of resending a packet isn't atomic. See also https://github.com/PretendoNetwork/nex-go/blob/5f38bf34bfabacd03b586226350edc5706394b15/resend_scheduler.go#L99

https://github.com/PretendoNetwork/nex-go/pull/47 could help with this issue
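
To illustrate what "atomic" resend logic could mean here, below is a minimal sketch that guards the check-and-increment with a mutex. The pendingPacket type and its methods are hypothetical stand-ins, not the nex-go PendingPacket type, and this is not necessarily the approach taken in #47:

```go
package main

import (
	"fmt"
	"sync"
)

// pendingPacket is a hypothetical stand-in for a packet awaiting acknowledgement.
type pendingPacket struct {
	mu           sync.Mutex
	acknowledged bool
	resendCount  int
	maxResends   int
}

// acknowledge marks the packet as acknowledged.
func (p *pendingPacket) acknowledge() {
	p.mu.Lock()
	defer p.mu.Unlock()

	p.acknowledged = true
}

// tryResend checks and updates the resend state inside one critical section,
// so an acknowledgement cannot slip in between the check and the increment.
// It reports whether the caller should actually resend the packet.
func (p *pendingPacket) tryResend() bool {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.acknowledged || p.resendCount >= p.maxResends {
		return false
	}

	p.resendCount++

	return true
}

func main() {
	p := &pendingPacket{maxResends: 5}

	// Ten goroutines race to resend; the mutex keeps the count within the limit.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.tryResend()
		}()
	}
	wg.Wait()

	fmt.Println(p.resendCount) // prints 5
}
```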

Calling the cancel function from a WithTimeout context will immediately free the resources associated with the context and stop all operations using the context (stopping the timeout check)

I'm not sure I understand what this means? In the example you give the goroutine doesn't stop, even if using Stop. Adding the following code on main:

func main() {
    task := NewResendTask()

    task.Begin()
    fmt.Println("test1")
    time.Sleep(9 * time.Second)
    fmt.Println("test2")
    task.Stop()
    time.Sleep(10 * time.Second)
    fmt.Println("test3")
}

Doesn't stop the task, at least on Go Playground:

test1
Timeout. Resending &{data:Payload}
test2
Timeout. Resending &{data:Payload}
Timeout. Resending &{data:Payload}
test3

jonbarrow commented 1 month ago

In the example you give the goroutine doesn't stop at least on Go Playground

This may be an issue with the playground, then? Run the following normally and you'll see the context cancels all operations and frees itself, removing the goroutine:

package main

import (
    "context"
    "fmt"
    "runtime"
    "time"
)

type Packet struct {
    data string
}

type ResendTask struct {
    ctx            context.Context
    cancel         context.CancelFunc
    resendCount    int
    maxResends     int
    timeoutSeconds time.Duration
    packet         *Packet
}

func (rt *ResendTask) Begin() {
    if rt.resendCount == 3 {
        rt.Stop()
    } else if rt.resendCount <= rt.maxResends {
        ctx, cancel := context.WithTimeout(context.Background(), rt.timeoutSeconds*time.Second)

        rt.ctx = ctx
        rt.cancel = cancel

        go rt.start()
    } else {
        fmt.Println("Resent too many times")
    }
}

func (rt *ResendTask) Stop() {
    rt.cancel()
}

func (rt *ResendTask) start() {
    for {
        select {
        case <-rt.ctx.Done():
            fmt.Printf("Timeout. Resending %+v\n", rt.packet)
            rt.resendCount++
            rt.Begin() // * Start again
            return
        }
    }
}

func NewResendTask() *ResendTask {
    return &ResendTask{
        maxResends:     5,
        timeoutSeconds: 5,
        packet: &Packet{
            data: "Payload",
        },
    }
}

func main() {
    task := NewResendTask()

    task.Begin()

    for {
        logGoroutines()
    }
}

func logGoroutines() {
    time.Sleep(1 * time.Second)
    fmt.Printf("goroutines: %d\n", runtime.NumGoroutine())
}

This will print goroutines: 2 up until rt.Stop() is called, at which point it will print goroutines: 1 forever (showing the goroutine has indeed exited), and the inner prints no longer happen.

Though I think most of our issues are related to race conditions

It addresses the same issue. The underlying issue resulting in memory leaks here is likely down to "bad management of tickers/timers". #47 addresses this as well, but this proposal aims to remove the issue entirely.

jonbarrow commented 1 month ago

Example output from the above example:

goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
Timeout. Resending &{data:Payload}
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
Timeout. Resending &{data:Payload}
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
Timeout. Resending &{data:Payload}
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1

jonbarrow commented 1 month ago

@DaniElectra No, you were correct. My results were from a faulty test. Calling cancel here still closes Done, as expected. However, this can easily be worked around with an acknowledged field on the struct:

type ResendTask struct {
    ctx            context.Context
    cancel         context.CancelFunc
    resendCount    int
    maxResends     int
    timeoutSeconds time.Duration
    packet         *Packet
    acknowledged   bool
}

func (rt *ResendTask) Stop() {
    rt.acknowledged = true
    rt.cancel()
}

func (rt *ResendTask) start() {
    for {
        select {
        case <-rt.ctx.Done():
            if !rt.acknowledged {
                fmt.Printf("Timeout. Resending %+v\n", rt.packet)
                rt.resendCount++
                rt.Begin() // * Start again
            }
            return
        }
    }
}

Now if you move the resendCount check and the Stop call outside of that function, say into logGoroutines, it stops as expected:

// Assumes task is now a package-level variable rather than a local in main.
func logGoroutines() {
    if task.resendCount == 3 {
        task.Stop()
    }

    time.Sleep(1 * time.Second)
    fmt.Printf("goroutines: %d\n", runtime.NumGoroutine())
}

goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
Timeout. Resending &{data:Payload}
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
Timeout. Resending &{data:Payload}
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
Timeout. Resending &{data:Payload}
goroutines: 2
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
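
As a possible alternative to the acknowledged field, the reason a context finished can be read from ctx.Err(). The sketch below is only an illustration under that assumption: resendUntilAcked and the two demo helpers are hypothetical names, and the acknowledgement is modeled as a parent context that gets canceled. Each attempt derives a WithTimeout child from it, so a timeout yields context.DeadlineExceeded while an acknowledgement yields context.Canceled:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// resendUntilAcked calls resend each time an attempt times out, and stops as
// soon as ack is canceled or maxResends is reached. It returns the number of
// resends performed.
func resendUntilAcked(ack context.Context, timeout time.Duration, maxResends int, resend func()) int {
	resends := 0

	for resends < maxResends {
		attempt, cancel := context.WithTimeout(ack, timeout)
		<-attempt.Done()
		err := attempt.Err()
		cancel() // release the timer held by this attempt

		if errors.Is(err, context.Canceled) {
			return resends // acknowledged: stop without resending
		}

		// context.DeadlineExceeded: no acknowledgement arrived in time
		resend()
		resends++
	}

	return resends
}

// resendsWhenAckedFirst acknowledges before the task starts, so nothing is resent.
func resendsWhenAckedFirst() int {
	ack, acknowledge := context.WithCancel(context.Background())
	acknowledge()

	return resendUntilAcked(ack, time.Hour, 5, func() {})
}

// resendsWhenNeverAcked never acknowledges, so every attempt times out.
func resendsWhenNeverAcked() int {
	ack, acknowledge := context.WithCancel(context.Background())
	defer acknowledge()

	return resendUntilAcked(ack, time.Millisecond, 3, func() {})
}

func main() {
	fmt.Println(resendsWhenAckedFirst()) // prints 0
	fmt.Println(resendsWhenNeverAcked()) // prints 3
}
```

Because the loop runs in a single goroutine and keeps its counter local, no struct fields are shared between goroutines, which sidesteps the question of synchronizing acknowledged, cancel, and resendCount.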

jonbarrow commented 1 month ago

@DaniElectra Do you think this kind of task manager could be useful outside of nex-go? I wonder if this is something we should make as its own module.