plgd-dev / go-coap

Implementation of CoAP Server & Client in Go
https://coap.technology
Apache License 2.0
447 stars 116 forks source link

Observe timeout #573

Open palsivertsen opened 2 months ago

palsivertsen commented 2 months ago

I'm playing around with observe and I'm experiencing context cancellation when sending replies to the client. Since I'm running locally I did not expect there to be a deadline for observe notifications. I this the intended behavior and is there a way to configure this?

To reproduce this one can change the sleep time in the example code to something like 30 seconds. The server will then report an error on the second transmission.

Server log:

2024/08/09 11:47:17 Got message path=/some/path: Code: GET, Token: e81949f7e5f85be4, Path: /some/path, Type: Confirmable, MessageID: 62273 from [::1]:53810
2024/08/09 11:47:47 Error on transmitter, stopping: cannot write request: context canceled

Client log:

2024/08/09 11:47:17 Got Code: Content, Token: e81949f7e5f85be4, ContentFormat: text/plain; charset=utf-8, Type: Confirmable, MessageID: 25594, PayloadLen: 22

EDIT 1

server.go:

package main

import (
    "bytes"
    "fmt"
    "log"
    "time"

    coap "github.com/plgd-dev/go-coap/v3"
    "github.com/plgd-dev/go-coap/v3/message"
    "github.com/plgd-dev/go-coap/v3/message/codes"
    "github.com/plgd-dev/go-coap/v3/mux"
)

func getPath(opts message.Options) string {
    path, err := opts.Path()
    if err != nil {
        log.Printf("cannot get path: %v", err)
        return ""
    }
    return path
}

func sendResponse(cc mux.Conn, token []byte, subded time.Time, obs int64) error {
    m := cc.AcquireMessage(cc.Context())
    defer cc.ReleaseMessage(m)
    m.SetCode(codes.Content)
    m.SetToken(token)
    m.SetBody(bytes.NewReader([]byte(fmt.Sprintf("Been running for %v", time.Since(subded)))))
    m.SetContentFormat(message.TextPlain)
    if obs >= 0 {
        m.SetObserve(uint32(obs))
    }
    return cc.WriteMessage(m)
}

func periodicTransmitter(cc mux.Conn, token []byte) {
    subded := time.Now()

    for obs := int64(2); ; obs++ {
        err := sendResponse(cc, token, subded, obs)
        if err != nil {
            log.Printf("Error on transmitter, stopping: %v", err)
            return
        }
        time.Sleep(time.Second * 30)
    }
}

func main() {
    log.Fatal(coap.ListenAndServe("udp", ":5688",
        mux.HandlerFunc(func(w mux.ResponseWriter, r *mux.Message) {
            log.Printf("Got message path=%v: %+v from %v", getPath(r.Options()), r, w.Conn().RemoteAddr())
            obs, err := r.Options().Observe()
            switch {
            case r.Code() == codes.GET && err == nil && obs == 0:
                go periodicTransmitter(w.Conn(), r.Token())
            case r.Code() == codes.GET:
                err := sendResponse(w.Conn(), r.Token(), time.Now(), -1)
                if err != nil {
                    log.Printf("Error on transmitter: %v", err)
                }
            }
        })))
}

client.go:

package main

import (
    "bytes"
    "fmt"
    "log"
    "time"

    coap "github.com/plgd-dev/go-coap/v3"
    "github.com/plgd-dev/go-coap/v3/message"
    "github.com/plgd-dev/go-coap/v3/message/codes"
    "github.com/plgd-dev/go-coap/v3/mux"
)

func getPath(opts message.Options) string {
    path, err := opts.Path()
    if err != nil {
        log.Printf("cannot get path: %v", err)
        return ""
    }
    return path
}

func sendResponse(cc mux.Conn, token []byte, subded time.Time, obs int64) error {
    m := cc.AcquireMessage(cc.Context())
    defer cc.ReleaseMessage(m)
    m.SetCode(codes.Content)
    m.SetToken(token)
    m.SetBody(bytes.NewReader([]byte(fmt.Sprintf("Been running for %v", time.Since(subded)))))
    m.SetContentFormat(message.TextPlain)
    if obs >= 0 {
        m.SetObserve(uint32(obs))
    }
    return cc.WriteMessage(m)
}

func periodicTransmitter(cc mux.Conn, token []byte) {
    subded := time.Now()

    for obs := int64(2); ; obs++ {
        err := sendResponse(cc, token, subded, obs)
        if err != nil {
            log.Printf("Error on transmitter, stopping: %v", err)
            return
        }
        time.Sleep(time.Second * 30)
    }
}

func main() {
    log.Fatal(coap.ListenAndServe("udp", ":5688",
        mux.HandlerFunc(func(w mux.ResponseWriter, r *mux.Message) {
            log.Printf("Got message path=%v: %+v from %v", getPath(r.Options()), r, w.Conn().RemoteAddr())
            obs, err := r.Options().Observe()
            switch {
            case r.Code() == codes.GET && err == nil && obs == 0:
                go periodicTransmitter(w.Conn(), r.Token())
            case r.Code() == codes.GET:
                err := sendResponse(w.Conn(), r.Token(), time.Now(), -1)
                if err != nil {
                    log.Printf("Error on transmitter: %v", err)
                }
            }
        })))
}

Setting context.Background() in AquireMessage yields similar results, except that the second message is sent before the error is returned:

server.go:

package main

import (
    "bytes"
    "context"
    "fmt"
    "log"
    "time"

    coap "github.com/plgd-dev/go-coap/v3"
    "github.com/plgd-dev/go-coap/v3/message"
    "github.com/plgd-dev/go-coap/v3/message/codes"
    "github.com/plgd-dev/go-coap/v3/mux"
)

func getPath(opts message.Options) string {
    path, err := opts.Path()
    if err != nil {
        log.Printf("cannot get path: %v", err)
        return ""
    }
    return path
}

func sendResponse(cc mux.Conn, token []byte, subded time.Time, obs int64) error {
    m := cc.AcquireMessage(context.Background()) // <-- I changed this context
    defer cc.ReleaseMessage(m)
    m.SetCode(codes.Content)
    m.SetToken(token)
    m.SetBody(bytes.NewReader([]byte(fmt.Sprintf("Been running for %v", time.Since(subded)))))
    m.SetContentFormat(message.TextPlain)
    if obs >= 0 {
        m.SetObserve(uint32(obs))
    }
    return cc.WriteMessage(m)
}

func periodicTransmitter(cc mux.Conn, token []byte) {
    subded := time.Now()

    for obs := int64(2); ; obs++ {
        err := sendResponse(cc, token, subded, obs)
        if err != nil {
            log.Printf("Error on transmitter, stopping: %v", err)
            return
        }
        time.Sleep(time.Second * 30)
    }
}

func main() {
    log.Fatal(coap.ListenAndServe("udp", ":5688",
        mux.HandlerFunc(func(w mux.ResponseWriter, r *mux.Message) {
            log.Printf("Got message path=%v: %+v from %v", getPath(r.Options()), r, w.Conn().RemoteAddr())
            obs, err := r.Options().Observe()
            switch {
            case r.Code() == codes.GET && err == nil && obs == 0:
                go periodicTransmitter(w.Conn(), r.Token())
            case r.Code() == codes.GET:
                err := sendResponse(w.Conn(), r.Token(), time.Now(), -1)
                if err != nil {
                    log.Printf("Error on transmitter: %v", err)
                }
            }
        })))
}

server log:

2024/08/09 12:51:28 Got message path=/some/path: Code: GET, Token: f40f11161a825088, Path: /some/path, Type: Confirmable, MessageID: 23914 from [::1]:54400
2024/08/09 12:51:58 Error on transmitter, stopping: cannot write request: connection was closed: context canceled

client log:

2024/08/09 12:51:28 Got Code: Content, Token: f40f11161a825088, ContentFormat: text/plain; charset=utf-8, Type: Confirmable, MessageID: 28440, PayloadLen: 26
2024/08/09 12:51:58 Got Code: Content, Token: f40f11161a825088, ContentFormat: text/plain; charset=utf-8, Type: Confirmable, MessageID: 28441, PayloadLen: 30

Packet capture: image