schollz / peerdiscovery

Pure-Go library for cross-platform local peer discovery using UDP multicast :woman: :repeat: :woman:
MIT License
644 stars, 56 forks

Make sure listening Go routine exits when done #30

Closed: fzwoch closed this 1 year ago

fzwoch commented 2 years ago

A proposal: use a deadline for the read operation in the goroutine. If the routine is supposed to shut down but no traffic arrives on the multicast address, the read blocks indefinitely and the routine never gets a chance to quit.

Instead, time out the read after 5 seconds without data and check whether shutdown has been requested.

Note that this is based on the master branch. If #29 is accepted in some form, the shutdown check should be extended here as well.

fzwoch commented 2 years ago

Here is another approach to fixing it.

Let the main loop close the socket used by the listen() goroutine. This example lacks proper locking around the connection variable, but it should be fine for demonstration purposes.

diff --git a/peerdiscovery.go b/peerdiscovery.go
index e41526f..ce5c291 100644
--- a/peerdiscovery.go
+++ b/peerdiscovery.go
@@ -81,6 +81,7 @@ type peerDiscovery struct {
        received map[string][]byte
        sync.RWMutex
        exit bool
+       c    net.PacketConn
 }

 // initialize returns a new peerDiscovery object which can be used to discover peers.
@@ -259,6 +260,10 @@ func Discover(settings ...Settings) (discoveries []Discovered, err error) {
                }
        }

+       if p.c != nil {
+               p.c.Close()
+       }
+
        if !s.DisableBroadcast {
                payload := p.settings.Payload
                if p.settings.PayloadFunc != nil {
@@ -323,7 +328,7 @@ func (p *peerDiscovery) listen() (recievedBytes []byte, err error) {
        if err != nil {
                return
        }
-       defer c.Close()
+       p.c = c

        group := p.settings.multicastAddressNumbers
        var p2 NetPacketConn
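
A sketch of the same idea with the missing locking added, assuming a loopback UDP socket in place of the multicast one. The `discoverer` type and its methods are hypothetical stand-ins for `peerDiscovery`, not the library's code. The point is that closing a `net.PacketConn` makes a `ReadFrom` blocked in another goroutine return with `net.ErrClosed`:

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"sync"
	"time"
)

// discoverer (hypothetical) keeps only the listener's connection,
// guarded by a mutex instead of being accessed unsynchronized.
type discoverer struct {
	mu   sync.Mutex
	conn net.PacketConn
}

// closeConn is what the main loop would call on shutdown.
func (d *discoverer) closeConn() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.conn != nil {
		d.conn.Close() // unblocks a pending ReadFrom in listen
		d.conn = nil
	}
}

// listen blocks in ReadFrom with no deadline and returns only when the
// connection is closed (or a real error occurs).
func (d *discoverer) listen() error {
	d.mu.Lock()
	c := d.conn
	d.mu.Unlock()
	if c == nil {
		return nil // already shut down
	}
	buf := make([]byte, 1500)
	for {
		if _, _, err := c.ReadFrom(buf); err != nil {
			if errors.Is(err, net.ErrClosed) {
				return nil // closed on purpose: normal shutdown
			}
			return err
		}
	}
}

func runCloseDemo() error {
	c, err := net.ListenPacket("udp4", "127.0.0.1:0")
	if err != nil {
		return err
	}
	// Store the connection before the goroutine starts so closeConn
	// can never miss it.
	d := &discoverer{conn: c}
	done := make(chan error, 1)
	go func() { done <- d.listen() }()

	time.Sleep(20 * time.Millisecond)
	d.closeConn()
	select {
	case err := <-done:
		return err
	case <-time.After(time.Second):
		return errors.New("listener still blocked after Close")
	}
}

func main() {
	fmt.Println("listener exit error:", runCloseDemo())
}
```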

Here is some example code that triggers the issue (provided no other process is sending broadcasts that would shut down the listen() routine).

package main

import (
    "log"
    "time"

    "github.com/schollz/peerdiscovery"
)

func main() {
    count := 0

    for {
        log.Println("loop:", count)
        count++

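        discover := make(chan struct{})

        // Run discovery indefinitely (TimeLimit: -1) until StopChan is closed.
        // With DisableBroadcast set, no final packet is sent on shutdown,
        // so the listen() goroutine and its socket are never released.
        go peerdiscovery.Discover(peerdiscovery.Settings{
            TimeLimit:        -1,
            StopChan:         discover,
            AllowSelf:        true,
            DisableBroadcast: true,
        })

        // Stop discovery shortly after starting it.
        time.Sleep(time.Millisecond * 10)
        close(discover)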
    }
}

In my Linux test, socket file handles and memory grow until roughly the 1000th loop, after which it silently fails at:

    c, err := net.ListenPacket(fmt.Sprintf("udp%d", p.settings.IPVersion), address)
    if err != nil {
        return
    }

I'm fairly sure this is closely related to #11.

fzwoch commented 2 years ago

I assume this block is intended to shut down the routine:

    if !s.DisableBroadcast {
        payload := p.settings.Payload
        if p.settings.PayloadFunc != nil {
            payload = p.settings.PayloadFunc()
        }
        // send out broadcast that is finished
        broadcast(p2, payload, ifaces, &net.UDPAddr{IP: group, Port: portNum})
    }

This obviously never runs when DisableBroadcast: true is set. Since it appears to send the same payload as the regular broadcast (rather than a dedicated BYE message?), it seems redundant once the routine's socket is closed manually.

fzwoch commented 2 years ago

Or, perhaps the simplest idea: reuse the existing connection for listening as well. I'm not sure whether that has any side effects.

--- peerdiscovery.go    2022-02-17 11:00:41.608374260 +0100
+++ peerdiscovery.go    2022-02-17 11:19:40.510144732 +0100
@@ -227,7 +227,7 @@
        p2.JoinGroup(&ifaces[i], &net.UDPAddr{IP: group, Port: portNum})
    }

-   go p.listen()
+   go p.listen(c)
    ticker := time.NewTicker(tickerDuration)
    defer ticker.Stop()
    start := time.Now()
@@ -301,9 +301,8 @@

 // Listen binds to the UDP address and port given and writes packets received
 // from that address to a buffer which is passed to a hander
-func (p *peerDiscovery) listen() (recievedBytes []byte, err error) {
+func (p *peerDiscovery) listen(c net.PacketConn) (recievedBytes []byte, err error) {
    p.RLock()
-   address := net.JoinHostPort(p.settings.MulticastAddress, p.settings.Port)
    portNum := p.settings.portNum
    allowSelf := p.settings.AllowSelf
    timeLimit := p.settings.TimeLimit
@@ -318,13 +317,6 @@
    }
    // log.Println(ifaces)

-   // Open up a connection
-   c, err := net.ListenPacket(fmt.Sprintf("udp%d", p.settings.IPVersion), address)
-   if err != nil {
-       return
-   }
-   defer c.Close()
-
    group := p.settings.multicastAddressNumbers
    var p2 NetPacketConn
    if p.settings.IPVersion == IPv4 {
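
A sketch of the shared-connection approach, assuming the caller owns the socket and closes it (here via `defer`) when it returns; the diff context does not show how Discover closes `c`, so that part is an assumption. `listen`, `discoverOnce` and `runShared` are hypothetical stand-ins using a loopback UDP socket rather than multicast. The loop mirrors the reproduction program earlier in this thread and checks that every listener exits:

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// listen reads from a connection owned by the caller. It never closes
// conn itself; it returns as soon as the owner closes it.
func listen(conn net.PacketConn, handle func([]byte)) error {
	buf := make([]byte, 1500)
	for {
		n, _, err := conn.ReadFrom(buf)
		if err != nil {
			if errors.Is(err, net.ErrClosed) {
				return nil // owner closed the socket: normal shutdown
			}
			return err
		}
		handle(buf[:n])
	}
}

// discoverOnce (hypothetical) opens a single socket, shares it with the
// listener goroutine and closes it on return.
func discoverOnce(stop <-chan struct{}, listenerDone chan<- error) error {
	conn, err := net.ListenPacket("udp4", "127.0.0.1:0")
	if err != nil {
		return err
	}
	defer conn.Close() // also unblocks the listener's ReadFrom

	go func() { listenerDone <- listen(conn, func([]byte) {}) }()
	<-stop
	return nil
}

// runShared starts and stops discovery repeatedly and verifies that each
// listener goroutine exits, so no sockets accumulate.
func runShared(iterations int) error {
	for i := 0; i < iterations; i++ {
		stop := make(chan struct{})
		listenerDone := make(chan error, 1)
		time.AfterFunc(time.Millisecond, func() { close(stop) })

		if err := discoverOnce(stop, listenerDone); err != nil {
			return err
		}
		select {
		case err := <-listenerDone:
			if err != nil {
				return fmt.Errorf("iteration %d: %w", i, err)
			}
		case <-time.After(time.Second):
			return fmt.Errorf("iteration %d: listener still running", i)
		}
	}
	return nil
}

func main() {
	if err := runShared(1000); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("all listeners exited")
}
```

On the side-effects question: the `net` package documents that multiple goroutines may invoke methods on a `PacketConn` simultaneously, so broadcasting and listening on the same connection from different goroutines is allowed in itself.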

fzwoch commented 2 years ago

I have updated the PR to use the latest proposal. It works fine for my use case, but that may not cover everyone's.

@schollz do you have an opinion on that one?

schollz commented 2 years ago

Yeah, it looks okay. I will have to run it against some tests.

maddie commented 1 year ago

Is there any chance of this getting merged? I've seen Discover gradually consume more memory over months of running (very similar to #11).


Hopefully, this will also resolve this problem.

schollz commented 1 year ago

Looks good, thanks!!!