streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License
4.88k stars 621 forks source link

Publish from GoLang, and Subscribe from Javascript (Websockets) #418

Closed KabDeveloper closed 5 years ago

KabDeveloper commented 5 years ago

HI

I am subscribing to a topic from Javascript using this library: https://www.eclipse.org/paho/clients/js/

Here is the code I used:


    <script>
        var has_had_focus = false;
        // pipe looks up the <div>, <input> and <form> found under the element
        // matched by el_name. It returns a function that appends a line to the
        // <div>; when a send callback is given, submitting the form passes the
        // input's current value to it and then clears the input.
        var pipe = function(el_name, send) {
            var output = $(el_name + ' div');
            var field  = $(el_name + ' input');
            var form   = $(el_name + ' form');

            // Appends "m p" as a <code> element, where p is JSON-encoded unless
            // it is undefined, then scrolls the <div> down by a large fixed step.
            function print(m, p) {
                var suffix = '';
                if (p !== undefined) {
                    suffix = JSON.stringify(p);
                }
                var entry = $("<code>").text(m + ' ' + suffix);
                output.append(entry);
                output.scrollTop(output.scrollTop() + 10000);
            }

            // Without a send callback the form is left unwired.
            if (!send) {
                return print;
            }

            form.submit(function() {
                var value = field.val();
                send(value);
                field.val('');
                // Returning false from a jQuery handler cancels the default
                // form submission.
                return false;
            });
            return print;
        };

        // print_first renders into the '#first' panel. Text submitted through
        // that panel's form is wrapped in an MQTT message addressed to "logs",
        // traced through debug, and sent with the shared client.
        var print_first = pipe('#first', function(data) {
            // Declared with var: a bare assignment would create an implicit
            // global (and throw a ReferenceError in strict mode).
            var message = new Paho.MQTT.Message(data);
            message.destinationName = "logs";
            debug("SEND ON " + message.destinationName + " PAYLOAD " + data);
            client.send(message);
        });

        // debug renders into the '#second' panel. No send callback is passed,
        // so that panel's form is not wired up.
        var debug = pipe('#second');

        // The broker is addressed on the same host that served this page.
        var wsbroker = location.hostname;  //mqtt websocket enabled broker
        var wsport = 15675; // port for above

        // Client for path "/ws"; the client id gets a random integer suffix
        // below 100.
        var client = new Paho.MQTT.Client(wsbroker, wsport, "/ws",
            "myclientid_" + parseInt(Math.random() * 100, 10));

        // Trace a dropped connection; no reconnect is attempted here.
        client.onConnectionLost = function (responseObject) {
            debug("CONNECTION LOST - " + responseObject.errorMessage);
        };

        // Trace every incoming message and echo its payload into '#first'.
        client.onMessageArrived = function (message) {
            debug("RECEIVE ON " + message.destinationName + " PAYLOAD " + message.payloadString);
            print_first(message.payloadString);
        };

        // Connection options: subscribe to 'logs' once connected, trace a
        // failed connection attempt otherwise.
        var options = {
            timeout: 3,
            onSuccess: function () {
                debug("CONNECTION SUCCESS");
                client.subscribe('logs', {qos: 1});
            },
            onFailure: function (message) {
                debug("CONNECTION FAILURE - " + message.errorMessage);
            }
        };

        // Request SSL for the connection when the page itself came over https.
        if (location.protocol == "https:") {
            options.useSSL = true;
        }

        debug("CONNECT TO " + wsbroker + ":" + wsport);
        client.connect(options);

        // Clear the '#first' input the first time it receives focus, and only
        // that first time.
        $('#first input').focus(function() {
            if (!has_had_focus) {
                has_had_focus = true;
                $(this).val("");
            }
        });
    </script>

Example taken from here: https://github.com/rabbitmq/rabbitmq-web-mqtt-examples/blob/master/priv/echo.html

And here is the Go code I used to publish messages to the browser (the same pubsub example that is included in your repository):


// Command pubsub is an example of a fanout exchange with dynamic reliable
// membership, reading from stdin, writing to stdout.
//
// This example shows how to implement reconnect logic independent from a
// publish/subscribe loop with bridges to application types.

package main

import (
    "bufio"
    "crypto/sha1"
    "flag"
    "fmt"
    "io"
    "log"
    "os"

    "github.com/streadway/amqp"
    "golang.org/x/net/context"
)

// url is the AMQP URI handed to amqp.Dial; it can be overridden with the
// -url command line flag.
var url = flag.String("url", "amqp://admin:password@127.0.0.1:5672/ws", "AMQP URI")

// exchange binds the publishers to the subscribers
const exchange = "logs"

// message is the application type for a message.  This can contain identity,
// or a reference to the receiver chan for further demuxing.
type message []byte

// session composes an amqp.Connection with an amqp.Channel. Both are embedded
// as pointers, so a session value is a cheap handle onto the pair created by
// one dial.
type session struct {
    *amqp.Connection
    *amqp.Channel
}

// Close shuts down the underlying connection, which takes the channel down
// with it. A session without a connection is a no-op and reports nil.
func (s session) Close() error {
    if conn := s.Connection; conn != nil {
        return conn.Close()
    }
    return nil
}

// redial continually connects to the URL, exiting the program when no longer
// possible.
//
// It returns a channel of session channels. Each receive from the outer
// channel asks for a new session: the goroutine then dials url, opens a
// channel, declares the fanout exchange and delivers the resulting session on
// the inner channel (the same inner channel is offered every time). Any
// failure to dial, open the channel or declare the exchange terminates the
// process through log.Fatalf. The outer channel is closed once ctx is done.
func redial(ctx context.Context, url string) chan chan session {
    sessions := make(chan chan session)

    go func() {
        sess := make(chan session)
        defer close(sessions)

        for {
            // Wait until a consumer asks for a session, or give up on cancel.
            select {
            case sessions <- sess:
            case <-ctx.Done():
                log.Println("shutting down session factory")
                return
            }

            conn, err := amqp.Dial(url)
            if err != nil {
                log.Fatalf("cannot (re)dial: %v: %q", err, url)
            }

            ch, err := conn.Channel()
            if err != nil {
                log.Fatalf("cannot create channel: %v", err)
            }

            if err := ch.ExchangeDeclare(exchange, "fanout", false, true, false, false, nil); err != nil {
                log.Fatalf("cannot declare fanout exchange: %v", err)
            }

            select {
            case sess <- session{conn, ch}:
            case <-ctx.Done():
                log.Println("shutting down new session")
                // Nobody took ownership of this connection, so it has to be
                // closed here or it would leak.
                if err := conn.Close(); err != nil {
                    log.Printf("cannot close unused connection: %v", err)
                }
                return
            }
        }
    }()

    return sessions
}

// publish publishes messages to a reconnecting session to a fanout exchange.
// It receives from the application specific source of messages.
//
// Only one message is in flight at a time: once a message has been taken from
// messages, nothing further is read until a confirmation arrives on the
// current session. A message whose Publish call fails is kept and retried on
// the next session. publish returns when messages is closed or when sessions
// is closed.
func publish(sessions chan chan session, messages <-chan message) {
    // Ignored for fanout exchanges, application dependent for other exchanges.
    const routingKey = "topic"

    // pending holds the single message waiting to be published. It is created
    // once, outside the session loop, so that a message put back after a
    // failed Publish is still there when the next session starts.
    pending := make(chan message, 1)

    for session := range sessions {
        var (
            running bool
            reading = messages
            confirm = make(chan amqp.Confirmation, 1)

            // body is the message most recently taken for delivery. It is
            // kept across loop iterations so that a nack can report which
            // message was rejected.
            body message
        )

        // A message carried over from the previous session must go out before
        // anything new is read: reading while pending is full would block on
        // the send into pending.
        if len(pending) > 0 {
            reading = nil
        }

        pub := <-session

        // publisher confirms for this channel/connection
        if err := pub.Confirm(false); err != nil {
            log.Printf("publisher confirms not supported")
            close(confirm) // the closed channel makes the loop below abandon this session
        } else {
            pub.NotifyPublish(confirm)
        }

        log.Printf("publishing...")

    Publish:
        for {
            select {
            case confirmed, ok := <-confirm:
                if !ok {
                    break Publish
                }
                if !confirmed.Ack {
                    log.Printf("nack message %d, body: %q", confirmed.DeliveryTag, string(body))
                }
                // The in-flight message is settled; accept the next one.
                reading = messages

            case body = <-pending:
                err := pub.Publish(exchange, routingKey, false, false, amqp.Publishing{
                    Body: body,
                })
                // Retry failed delivery on the next session
                if err != nil {
                    // pending was just drained, so this send cannot block.
                    pending <- body
                    pub.Close() // best effort; the session is abandoned either way
                    break Publish
                }

            case body, running = <-reading:
                // all messages consumed
                if !running {
                    return
                }
                // work on pending delivery until ack'd
                pending <- body
                reading = nil
            }
        }
    }
}

// identity yields a string that stays the same for the lifetime of this
// process, so that a reconnecting subscriber reuses its queue name. The value
// is the hex-encoded SHA-1 digest of the hostname, the error returned by the
// hostname lookup, and the process ID, concatenated in that order.
func identity() string {
    host, lookupErr := os.Hostname()
    pid := os.Getpid()

    digest := sha1.New()
    // %v renders each operand exactly as fmt.Fprint would, with no separators.
    fmt.Fprintf(digest, "%v%v%v", host, lookupErr, pid)

    return fmt.Sprintf("%x", digest.Sum(nil))
}

// subscribe consumes deliveries from an exclusive queue from a fanout exchange
// and sends to the application specific messages chan.
//
// For every session it declares the queue named by identity(), binds it to
// the exchange and consumes from it, forwarding each delivery body to messages
// before acknowledging it. When the delivery stream of a session ends it waits
// for the next session. It returns when sessions is closed, or as soon as
// declaring, binding or consuming fails.
func subscribe(sessions chan chan session, messages chan<- message) {
    queue := identity()

    for session := range sessions {
        sub := <-session

        if _, err := sub.QueueDeclare(queue, false, true, true, false, nil); err != nil {
            log.Printf("cannot consume from exclusive queue: %q, %v", queue, err)
            return
        }

        routingKey := "application specific routing key for fancy toplogies"
        if err := sub.QueueBind(queue, routingKey, exchange, false, nil); err != nil {
            log.Printf("cannot consume without a binding to exchange: %q, %v", exchange, err)
            return
        }

        deliveries, err := sub.Consume(queue, "", false, true, false, false, nil)
        if err != nil {
            log.Printf("cannot consume from: %q, %v", queue, err)
            return
        }

        log.Printf("subscribed...")

        for msg := range deliveries {
            // Hand the body to the application first, then acknowledge.
            messages <- message(msg.Body)
            // A failed ack is reported instead of being dropped silently; the
            // loop carries on and ends when the delivery stream closes.
            if err := sub.Ack(msg.DeliveryTag, false); err != nil {
                log.Printf("cannot ack delivery %d: %v", msg.DeliveryTag, err)
            }
        }
    }
}

// read is this application's translation to the message format, scanning from
// stdin.
//
// Every line of r, without its line terminator, is sent as one message on the
// returned channel. The channel is closed once r is exhausted or scanning
// stops on an error; such an error is logged.
func read(r io.Reader) <-chan message {
    lines := make(chan message)
    go func() {
        defer close(lines)
        scan := bufio.NewScanner(r)
        for scan.Scan() {
            // The scanner may overwrite its buffer on the next Scan while the
            // receiver still holds the message, so send a private copy.
            raw := scan.Bytes()
            line := make(message, len(raw))
            copy(line, raw)
            lines <- line
        }
        if err := scan.Err(); err != nil {
            log.Printf("cannot read messages: %v", err)
        }
    }()
    return lines
}

// write is the application-side sink for received messages. It returns a
// channel; every message sent on it is printed to w as one line. The printing
// goroutine ends when the returned channel is closed.
func write(w io.Writer) chan<- message {
    sink := make(chan message)
    go func() {
        for {
            line, open := <-sink
            if !open {
                return
            }
            fmt.Fprintln(w, string(line))
        }
    }()
    return sink
}

func main() {
    flag.Parse()

    ctx, done := context.WithCancel(context.Background())

    go func() {
        publish(redial(ctx, *url), read(os.Stdin))
        done()
    }()

    go func() {
        subscribe(redial(ctx, *url), write(os.Stdout))
        done()
    }()

    <-ctx.Done()
}

You can notice that when I use: var url = flag.String("url", "amqp://admin:password@127.0.0.1:15675/ws", "AMQP URI")

Instead of: var url = flag.String("url", "amqp://admin:password@127.0.0.1:5672/ws", "AMQP URI")

This error is returned:


2019/09/30 13:18:37 cannot (re)dial: Exception (501) Reason: "EOF": "amqp://admin:password@127.0.0.1:15675/ws"
exit status 1

The message is received in golang application, but not in the browser from the Javascript example.

Do you have any idea how to solve this problem, or could you suggest another approach if I am doing this wrong?

Thank you

michaelklishin commented 5 years ago

You cannot consume from a Web MQTT endpoint using this client as it implements a different protocol. I'd recommend putting together an AMQP 0-9-1 and MQTT interop example that does not involve WebSockets first.

RabbitMQ MQTT plugin uses a predeclared topic exchange by default. So can your AMQP 0-9-1 clients. Beware of the topic segment separator differences between the protocols.