crossbario / autobahn-js

WAMP in JavaScript for Browsers and NodeJS
http://crossbar.io/autobahn
MIT License
1.43k stars 228 forks source link

Process responses asynchronously #542

Closed ondrej-vipo closed 3 years ago

ondrej-vipo commented 3 years ago

Hello,

I have a use case when I need to 1 - subscribe realtime data and 2 - occausanally call function that returns large amount of data (few MBs) in web browser.

The problem is that while waiting for the large data, the realtime data are not received, as this seems to be happening synchronously. Is there any way to get this asynchronously? (Is this a bug?)

There seems to be another problem that the large data is received very slowly (and it keeps getting slower - I have opened issue #543 for this).

I have created minimal example. I have the thruway server that in the loop returns counter (1 response per second) and provides function that return big string (the server also responds with current time).

The client is using autobahn-browser. It is subscribed to the realtime data and every 10s calls function that return large data (in setTimeout). The client then prints the time (offset) how long took the realtime data (couter) to be received. You can see in the output that after calling getlargedata function the realtime data are received after a delay (because it is waiting for getlargedata data)

Output:

(index):17 Received counter at  1616504823  with offset  0  data =  {time: 1616504823, data: 1}
(index):17 Received counter at  1616504824  with offset  0  data =  {time: 1616504824, data: 2}
(index):17 Received counter at  1616504825  with offset  0  data =  {time: 1616504825, data: 3}
(index):17 Received counter at  1616504826  with offset  0  data =  {time: 1616504826, data: 4}

[ ... ]

(index):17 Received counter at  1616504832  with offset  0  data =  {time: 1616504832, data: 10}
(index):27 calling getlargedata ...
(index):17 Received counter at  1616504834  with offset  1  data =  {time: 1616504833, data: 11}
(index):17 Received counter at  1616504834  with offset  0  data =  {time: 1616504834, data: 12}

[ ... ]

(index):27 calling getlargedata ...
(index):17 Received counter at  1616504849  with offset  4  data =  {time: 1616504845, data: 22}
(index):17 Received counter at  1616504849  with offset  3  data =  {time: 1616504846, data: 23}

[ ... ]

(index):17 Received counter at  1616504859  with offset  0  data =  {time: 1616504859, data: 36}
(index):27 calling getlargedata ...
(index):17 Received counter at  1616504868  with offset  8  data =  {time: 1616504860, data: 37}
(index):17 Received counter at  1616504868  with offset  7  data =  {time: 1616504861, data: 38}
(index):17 Received counter at  1616504868  with offset  6  data =  {time: 1616504862, data: 39}
(index):17 Received counter at  1616504868  with offset  5  data =  {time: 1616504863, data: 40}

Please, let me know if more info is needed.

Client source:

<!DOCTYPE html>
<html lang="en">
    <head>
        <title>Autobahn test</title>
        <meta charset="utf-8">
        <script src="autobahn.js"></script> <!-- autobahn-browser@20.9.2 -->
    </head>
    <body>
        <script>
            document.write('Open console!');
            (function(){
                var timer = null;
                var session = null;

                var on_counter = function(ev, data){
                    var now = Math.trunc((new Date).getTime()/1000);
                    console.info('Received counter at ', now, ' with offset ', now - data.time, ' data = ', data);
                };

                var connection = new autobahn.Connection({
                    url: "ws://127.0.0.1:9990",
                    realm: 'realm1'
                }); 

                var getLargeData = function(){
                    timer = setTimeout(function(){
                        console.info('calling getlargedata ...');
                        session.call('com.example.getlargedata', []).then(function(data){
                            // console output commeted out - to make sure that printing large data do not affect this test
                            //var now = Math.trunc((new Date).getTime()/1000);
                            //console.info('Received lrgdata at ', now, ' with offset ', now - data.time, ' data = ', data);
                            getLargeData(); // lets get large data again in 10s;
                        });
                    }, 10000);
                }

                connection.onclose = function(){
                    console.error('connection closed');
                    session = null;
                    if (timer !== null){
                        clearTimeout(timer);
                        timer = null;
                    }
                }

                connection.onopen = function (sess, details) {
                    console.log("Connected");

                    session = sess;
                    session.subscribe('com.example.oncounter', on_counter).then(
                        function (sub) {
                            console.log('subscribed to counter');
                        },
                        function (err) {
                            console.log('failed to subscribe to counter', err);
                        }
                    );
                    getLargeData();
                }
                connection.open();
            }());
        </script>
    </body>
</html>

I can provide server source code if needed.

oberstet commented 3 years ago

not related to autobahn. if you block the single thread running the event loop, you have a problem. use threads or non-blockcing code

ondrej-vipo commented 3 years ago

@oberstet I can confirm in wireshark that server responds quickly in non-blocking manner. You can also see it in the output - the server adds the current time when the response is sent - and that do not match the time when the reponse is received.

Also it is not network related problem as both server and the client are on same machine.

ondrej-vipo commented 3 years ago

@oberstet I have also managed to replicate the problem in crossbar.io.

Server:

from autobahn.twisted.component import Component, run
from autobahn.twisted.util import sleep
from twisted.internet.defer import inlineCallbacks
import os
import argparse
from time import time 

url = os.environ.get('CBURL', 'ws://localhost:8080/ws')
realmv = os.environ.get('CBREALM', 'realm1')
topic = os.environ.get('CBTOPIC', 'com.example.oncounter')
print(url, realmv)
component = Component(transports=url, realm=realmv)

@component.on_join
@inlineCallbacks
def joined(session, details):
    print("session ready")
    counter = 0
    largedata = "START" + "This is a test string"*300000 + "END";

    def getlargedata():
        print("Sending large data...");
        return {"time":(int)(time()), "data": largedata}

    try:
        yield session.register(getlargedata, 'com.example.getlargedata')
        print("procedure registered")
    except Exception as e:
        print("could not register procedure: {0}".format(e)) 

    while True:
        # publish() only returns a Deferred if we asked for an acknowledgement
        print("Puslishing ...");
        session.publish(topic, {"time":(int)(time()),"data":counter})
        counter += 1
        yield sleep(1)

if __name__ == "__main__":
    run([component])       

Client:

<!DOCTYPE html>
<html lang="en">
    <head>
        <title>Autobahn test</title>
        <meta charset="utf-8">
        <script src="autobahn.js"></script> <!-- autobahn-browser@20.9.2 -->
    </head>
    <body>
        <script>
            document.write('Open console!');
            (function(){
                var timer = null;
                var session = null;

                var on_counter = function(data){
                    var now = Math.trunc((new Date).getTime()/1000);
                    console.info('Received counter at ', now, ' with offset ', now - data[0].time, ' data = ', data[0]);
                };

                var connection = new autobahn.Connection({
                    url: "ws://127.0.0.1:8080/ws",
                    realm: 'realm1'
                }); 

                var getLargeData = function(){
                    timer = setTimeout(function(){
                        console.info('calling getlargedata ...');
                        session.call('com.example.getlargedata', []).then(function(data){
                            // console output commeted out - to make sure that printing large data do not affect this test
                            //var now = Math.trunc((new Date).getTime()/1000);
                            //console.info('Received lrgdata at ', now, ' with offset ', now - data.time, ' data = ', data);
                            getLargeData(); // lets get large data again in 10s;
                        });
                    }, 10000);
                }

                connection.onclose = function(){
                    console.error('connection closed');
                    session = null;
                    if (timer !== null){
                        clearTimeout(timer);
                        timer = null;
                    }
                }

                connection.onopen = function (sess, details) {
                    console.log("Connected");

                    session = sess;
                    session.subscribe('com.example.oncounter', on_counter).then(
                        function (sub) {
                            console.log('subscribed to counter');
                        },
                        function (err) {
                            console.log('failed to subscribe to counter', err);
                        }
                    );
                    getLargeData();
                }
                connection.open();
            }());
        </script>
    </body>
</html>