kataras / neffos

A modern, fast and scalable websocket framework with an elegant API, written in Go
http://bit.ly/neffos-wiki
MIT License

Problem with Broadcasting #12

Closed: majidbigdeli closed this issue 5 years ago

majidbigdeli commented 5 years ago

@kataras I have an issue with this call:

server.Broadcast(nil, neffos.Message{
    To:        userID,
    Namespace: variable.Agent,
    Event:     "notif",
    Body:      neffos.Marshal(nf),
})

I use gobwas.DefaultUpgrader.

When I call server.Broadcast in a loop it works, but not reliably. You can test it yourself: I open multiple browser tabs with the same userId (socket ID).

server.Broadcast does not send all the messages in the loop.

You can see this in the cron example:

c.Write(neffos.Message{
    To:        userID,
    Namespace: variable.Agent,
    Event:     "notif",
    Body:      neffos.Marshal(nf),
})

The Write() method works very well, but it sends the message to only one tab (the last one) with the same userId.

I want to send it to all the tabs with the same userId.

kataras commented 5 years ago

Hello @majidbigdeli, I am not sure what's going on yet; I am checking it as we speak. Meanwhile, could you please transfer this issue to the correct repository? https://github.com/kataras/neffos/issues/new

Also, to understand it:

You have many clients with the same ID and you want to send a message to all of them through the server's Broadcast (from a cron job), right? The design of neffos allows that: each connection is responsible for "catching" the message on the server's broadcast, so delivery doesn't "stop at the first match", and it should work without problems. Can you come to the neffos chat? I also need an example to test it.
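To make the "each connection catches the message" idea concrete, here is a minimal, stdlib-only model. It is not neffos's implementation: the `hub`, `conn` and `message` types and the `catch` method are hypothetical, and the rule that an empty `To` means "everyone" is an assumption of this sketch. The point it illustrates is that the hub hands every broadcast to every connection and each connection filters by its own ID, so several tabs sharing one ID each get a copy.

```go
package main

import "fmt"

// message is a hypothetical, stripped-down stand-in for a broadcast
// message: To holds the target connection ID.
type message struct {
	To   string
	Body string
}

// conn is a hypothetical connection with an ID and a received-messages inbox.
type conn struct {
	id    string
	inbox []message
}

// catch is called for every broadcast; the connection itself decides whether
// the message is addressed to it. In this sketch an empty To means "everyone".
func (c *conn) catch(msg message) {
	if msg.To == "" || msg.To == c.id {
		c.inbox = append(c.inbox, msg)
	}
}

// hub fans each broadcast out to every connection. It never stops at the
// first match, so all connections that share an ID receive the message.
type hub struct {
	conns []*conn
}

func (h *hub) broadcast(msg message) {
	for _, c := range h.conns {
		c.catch(msg)
	}
}

func main() {
	// Three browser tabs for user-1 and one tab for user-2.
	tabs := []*conn{{id: "user-1"}, {id: "user-1"}, {id: "user-1"}, {id: "user-2"}}
	h := &hub{conns: tabs}
	h.broadcast(message{To: "user-1", Body: "notif"})

	for i, c := range tabs {
		fmt.Printf("conn %d (%s): %d message(s)\n", i, c.id, len(c.inbox))
	}
}
```

Running it prints one message for each of the three `user-1` connections and zero for `user-2`.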

Thanks!

majidbigdeli commented 5 years ago

@kataras Hello, Kataras, I'm sure. I call server.Broadcast in a loop. I have 58 messages (58 iterations), but my client receives only 30 of the 58 messages.

majidbigdeli commented 5 years ago

@kataras I sent an example in the neffos chat.

kataras commented 5 years ago

@majidbigdeli I tested that case and it seems to work here; I need more details about your app.

Here is the code I used. It simulates your case with 1000 clients (you have 57) and 1000 messages every 2 seconds, for a total of 10000 messages:

package main

import (
    "log"
    "net/http"
    "sync"
    "sync/atomic"
    "time"

    "github.com/kataras/neffos"
    "github.com/kataras/neffos/gorilla"

    // v3 is required: its AddFunc returns (EntryID, error), as used in startCron.
    "github.com/robfig/cron/v3"
)

/*
    stress-test broadcasting every 2 seconds (10 rounds, 10000 messages in total)
    to 1000 clients with the same ID.
*/

const (
    namespace    = "agent"
    clientEvent  = "notify"
    serverEvent  = "ackNotify"
    clientConnID = "client"
    every        = "2s"

    clientsCount  = 1000
    messagesCount = clientsCount * 10
)

type notification struct {
    Title   string `json:"title"`
    Subject string `json:"subject"`
    Message string `json:"message"`
}

var message = neffos.Message{
    To:        clientConnID,
    Namespace: namespace,
    Event:     clientEvent,
    Body: neffos.Marshal(notification{
        Title:   "master notification",
        Subject: "master",
        Message: "a notification message",
    }),
}

var (
    serverEvents = neffos.WithTimeout{
        Namespaces: neffos.Namespaces{
            namespace: neffos.Events{
                serverEvent: func(ns *neffos.NSConn, msg neffos.Message) error {
                    return nil
                },
            },
        },
    }

    received uint32

    clientEvents = neffos.Namespaces{
        namespace: neffos.Events{
            clientEvent: func(ns *neffos.NSConn, msg neffos.Message) error {
                n := atomic.AddUint32(&received, 1)
                log.Printf("Total Received: %d", n)
                return nil
            },
        },
    }
)

func main() {

    // connect all and then start cron.
    server := startServer()
    time.Sleep(200 * time.Millisecond)

    wg := new(sync.WaitGroup)
    wg.Add(clientsCount)
    wgClose := new(sync.WaitGroup)
    wgClose.Add(clientsCount)

    now := time.Now()

    for i := 0; i < clientsCount; i++ {
        go startClient(wg, wgClose)
    }

    wg.Wait()
    startCron(server)

    wgClose.Wait()
    log.Printf("Done in: %s", time.Since(now))
}

func startServer() *neffos.Server {
    server := neffos.New(gorilla.DefaultUpgrader, serverEvents)
    server.IDGenerator = func(w http.ResponseWriter, r *http.Request) string {
        username := r.Header.Get("X-Username")
        if username == "" {
            log.Fatalf("expected a username but got nothing")
        }

        return username
    }

    go func() {
        log.Fatal(http.ListenAndServe(":8080", server))
    }()

    return server
}

func startCron(server *neffos.Server) {
    c := cron.New()
    _, err := c.AddFunc("@every "+every, func() {
        if atomic.LoadUint32(&received) == messagesCount {
            c.Stop()
            server.Close()
            return
        }

        pushNotifications(server)
    })

    if err != nil {
        log.Fatal(err)
    }

    c.Start()
}

func pushNotifications(server *neffos.Server) {
    server.Broadcast(nil, message)
}

var dialer = gorilla.Dialer(&gorilla.Options{}, http.Header{"X-Username": []string{clientConnID}})

var connectedN uint32

func startClient(wg *sync.WaitGroup, wgClose *sync.WaitGroup) {
    client, err := neffos.Dial(nil, dialer, "ws://localhost:8080", clientEvents)
    if err != nil {
        log.Fatal(err)
    }

    _, err = client.Connect(nil, namespace)
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Total Connected: %d", atomic.AddUint32(&connectedN, 1))
    wg.Done()

    <-client.NotifyClose

    wgClose.Done()
}
majidbigdeli commented 5 years ago

@kataras That is not my case: you send a message every 2 seconds.

Please test this, sending messages non-stop:

var message = neffos.Message{
    To:        userId,
    Namespace: namespace,
    Event:     clientEvent,
    Body: neffos.Marshal(notification{
        Title:   "master notification",
        Subject: "master",
        Message: "a notification message",
    }),
}

for i := 0; i < userCount; i++ {
    server.Broadcast(nil, message)
}

You will see that user 1 receives messages while the other users receive none, and there are other issues as well.

kastmgnru commented 5 years ago

I have the same problem. If you send two messages at once, the client will receive only the last one:

server.Broadcast(nil, websocket.Message{To: connId, IsNative: true, Body: []byte("test;huest")})
server.Broadcast(nil, websocket.Message{To: connId, IsNative: true, Body: []byte("test2;huest2")})

Only one of these messages will be received.

kataras commented 5 years ago

Now I see, @majidbigdeli @kastmgnru: you mean sending simultaneously on the same connection. I thought the problem was sending to the same ID over different sockets over a period of time; that's why I tested that case. Now it's much clearer (although the writer has a mutex). Expect a fix soon at https://github.com/kataras/neffos.

kataras commented 5 years ago

OK, the problem does not appear when broadcasting through Redis or NATS, so the issue is inside the local, single-machine broadcaster channel. I found something but need to investigate it further; expect a fix by tomorrow.

kataras commented 5 years ago

@majidbigdeli you have three ways to fix that now:

Update to neffos v0.0.10 and set server.SyncBroadcaster = true before the server starts, and you should be fine. If not, please comment below.

Thanks a lot!

majidbigdeli commented 5 years ago

@kataras Thank you. I'll test it tonight.

majidbigdeli commented 5 years ago

@kataras Thanks a lot! Yes, server.SyncBroadcaster = true works fine in my test.

I have a question: why does server.Broadcast work correctly with server.UseStackExchange but not without it? I've read your code but didn't find the problem. Did you find it?

@kataras Thank you again.

kataras commented 5 years ago

You are welcome @majidbigdeli !!

Yes, I found the problem and fixed it. The message was being changed before the publisher fired on the next Broadcast call, so it always got the last message (or almost the last one, if many calls happened at the same time). As for why this does not happen with Redis or NATS: I designed the StackExchange API so that a StackExchange manages Broadcast messages on its own for better performance. These messages are handled directly by Redis or NATS, and they already behave like our local SyncBroadcaster = true.
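The failure mode described here can be reproduced in a small, deterministic model. This is a simplified sketch under stated assumptions, not neffos's code: `latestOnly` stands in for the old local broadcaster, where each Broadcast call overwrites one shared message slot before the publisher has run, and `syncBroadcaster` stands in for `SyncBroadcaster = true`, where every call delivers its own message before returning. All type and method names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// latestOnly models the lossy behavior: Broadcast only overwrites a single
// shared slot, and a separate publisher step later delivers whatever is in it.
// If several Broadcast calls happen before publish runs, only the last survives.
type latestOnly struct {
	mu      sync.Mutex
	pending *string
}

func (b *latestOnly) Broadcast(msg string) {
	b.mu.Lock()
	b.pending = &msg // overwrites any message the publisher has not sent yet
	b.mu.Unlock()
}

// publish delivers the message currently in the slot (if any) and clears it.
func (b *latestOnly) publish(deliver func(string)) {
	b.mu.Lock()
	msg := b.pending
	b.pending = nil
	b.mu.Unlock()
	if msg != nil {
		deliver(*msg)
	}
}

// syncBroadcaster models synchronous broadcasting: each call delivers its
// own message under a mutex before returning, so nothing is overwritten.
type syncBroadcaster struct {
	mu      sync.Mutex
	deliver func(string)
}

func (b *syncBroadcaster) Broadcast(msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.deliver(msg)
}

func main() {
	var got []string
	collect := func(m string) { got = append(got, m) }

	lossy := &latestOnly{}
	lossy.Broadcast("test;huest")
	lossy.Broadcast("test2;huest2")
	lossy.publish(collect) // the publisher runs only after both calls
	fmt.Println(got)       // [test2;huest2]

	got = nil
	s := &syncBroadcaster{deliver: collect}
	s.Broadcast("test;huest")
	s.Broadcast("test2;huest2")
	fmt.Println(got) // [test;huest test2;huest2]
}
```

With the lossy model, two back-to-back calls (like the two server.Broadcast calls in the report above) yield only the second message; the synchronous model delivers both, at the cost of serializing concurrent callers on a mutex.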

There are cases (the most common ones, I think) where you don't need to synchronize broadcasters when Broadcast is called at the same time, and broadcasting just the last valid message is enough; that's the default behavior.