miguelgrinberg / Flask-SocketIO

Socket.IO integration for Flask applications.
MIT License
5.37k stars 892 forks source link

Trying to implement an atmosphere server #654

Closed evh3rd closed 6 years ago

evh3rd commented 6 years ago

Hi,

I am writing a simulator for a server to support load testing of a client that consumes data from the real server. The actual system uses https to poll for data (this works fine with the @app.route decorator from Flask) but I have not been able to get the Atmosphere async.io notification simulator to work.

I have an @app.route endpoint to perform the upgrade handshake as in RFC6455 (with Atmosphere-specific query parameters and that seems to work but Wireshark is telling me that after the response (which the client accepts), my sim is sending TCP [ACK/FIN] to close the socket, I suspect that I am missing the point -- that something built in is suppose to handle the upgrade, but I have not been able to find the documentation for it.

Here is some relevant code:

import eventlet
eventlet.monkey_patch()

from flask import Flask, request, render_template, session, Response
from flask_socketio import SocketIO, emit

app = Flask(__name__)
app.secret_key = "Shh! It's a secret"
app.config['SECRET_KEY'] = "Shh! It's a secret"
socketio = SocketIO(app, async_mode='eventlet', ping_timeout=5, logger=True, engineio_logger=True)

@app.route("/restconf/streams/v1/inventory.xml", methods=[ 'GET' ])
def inventory_subscription_request():
    """
    Handle a client notification subscription request
    """
    global client_connected, key_digest, atmos_ver, atmos_id

    print "GET from steam inventory.xml"
    print "headers: ", request.headers
    print "args: ", request.args
    print "data:", request.data
    resp = Response('\r\n')
    resp.status = "Switching Protocols"
    resp.status_code = 101
    resp.headers['Connection'] = 'Upgrade'
    resp.headers['Upgrade'] = 'websocket'
    resp.headers['Set-Cookie'] = 'JSESSIONID=BE05E3E11372EEDFF007FD7597D5C770; Path=/restconf/; HttpOnly'
    if request.headers.has_key('Sec-Websocket-Key'):
        key = request.headers['Sec-Websocket-Key'].encode('ascii') + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'.encode('ascii')
        key_digest = base64.b64encode(hashlib.sha1(key).digest()).decode('ascii')
    resp.headers['Sec-WebSocket-Accept'] = key_digest

    return resp

def notify():
    """
    Simulate notifications from captured data
    """
    data_dir =(os.getenv("DATA_DIR", "/app/data"))
    notify_interval=int(os.getenv("NOTIFY_INTERVAL", "5"))
    print "Starting notify thread: data at {}, interval = {} seconds".format(data_dir, notify_interval)
    while True:
        print "Sending notifications"
        for type in notify_types:
            for obj in notify_objs:
                fname = "{}/notifications/{}-{}-notif.json".format(data_dir, obj, type)
                if fname != "{}/notifications/{}-{}-notif.json".format(data_dir, 'vc-mlt', 'delete'):
                    try:
                        print 'notification from {}'.format(fname)
                        f = open(abspath(fname), "r")
                        j = json.load(f)
                        print 'notification: {}'.format(json.dumps(j))
                        socketio.emit('message', j)
                        eventlet.sleep(notify_interval)
                    except IOError as e:
                        print e.message

eventlet.spawn(notify)

The notify task runs and reports that it emitting the simulated notifications, but the client is not listening because the socket disconnects.

I run the app using eventlet in gunicorn, as your excellent post on "Running your Flask application over HTTPS" suggests.

Please tell me what I am missing.

Regards, Ed

miguelgrinberg commented 6 years ago

What is the purpose of the websocket handshake in your /restconf/streams/v1/inventory.xml route?

Do you want to use this extension, or do you want to use your own websocket server? If you want to use SocketIO, then you are going at it the wrong way, I suggest you take a look at the examples in this repository. In particular, the Websocket handshake does not need to be done by the application.

evh3rd commented 6 years ago

Hi,

Thanks for your quick response. After reading your suggestion, I tried letting SocketIO handle the connection. The server code now looks like this:



import eventlet
eventlet.monkey_patch()

from flask import Flask, request, render_template, session, Response
from flask_socketio import SocketIO, emit

app = Flask(__name__)
app.secret_key = "Shh! It's a secret"
app.config['SECRET_KEY'] = "Shh! It's a secret"
sio = SocketIO(app, resource='/restconf/streams/v1/inventory.xml', async_mode='eventlet')
#
# tried this, too
#
# sio = SocketIO(app, path='/restconf/streams/v1/inventory.xml', async_mode='eventlet')

# @app.route stuff that works fine removed...

def notify():
    """
    Simulate notifications from captured data
    """
    data_dir =(os.getenv("DATA_DIR", "/app/data"))
    notify_interval=int(os.getenv("NOTIFY_INTERVAL", "5"))
    print "Starting notify thread: data at {}, interval = {} seconds".format(data_dir, notify_interval)
    while True:
        print "Sending notifications"
        for type in notify_types:
            for obj in notify_objs:
                fname = "{}/notifications/{}-{}-notif.json".format(data_dir, obj, type)
                if fname != "{}/notifications/{}-{}-notif.json".format(data_dir, 'vc-mlt', 'delete'):
                    try:
                        print 'notification from {}'.format(fname)
                        f = open(abspath(fname), "r")
                        j = json.load(f)
                        print 'notification: {}'.format(json.dumps(j))
                        sio.emit('message', j)
                        eventlet.sleep(notify_interval)
                    except IOError as e:
                        print e.message

@sio.on('connect')
def on_connect():
    """
    Handle connection from client (registration)
    Framework handles the RFC 6455 Websocket handshake(?)
    """
    print "Connection received"
    print "arguments: {}".format(request.args)
    print "headers: {}".format(request.headers)
    return True

# @app.route("/restconf/streams/v1/inventory.xml", methods=[ 'GET' ])
def inventory_subscription_request():
    """
    DEAD CODE: Handle a client notification subscription request
    """
    global client_connected, key_digest, atmos_ver, atmos_id

    print "GET from steam inventory.xml"
    print "headers: ", request.headers
    print "args: ", request.args
    print "data:", request.data
    resp = Response('\r\n')
    resp.status = "Switching Protocols"
    resp.status_code = 101
    resp.headers['Connection'] = 'Upgrade'
    resp.headers['Upgrade'] = 'websocket'
    resp.headers['Set-Cookie'] = 'JSESSIONID=BE05E3E11372EEDFF007FD7597D5C770; Path=/restconf/; HttpOnly'
    if request.headers.has_key('Sec-Websocket-Key'):
        key = request.headers['Sec-Websocket-Key'].encode('ascii') + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'.encode('ascii')
        key_digest = base64.b64encode(hashlib.sha1(key).digest()).decode('ascii')
    resp.headers['Sec-WebSocket-Accept'] = key_digest

    return resp

eventlet.spawn(notify)

```python
evh3rd commented 6 years ago

I ran this will gunicorn as before. All my @app.endpoints work, but the sio.on('connect') decorator never triggers the on_connect method below it.

From the (Java) application trying to connect:

Connecting to http://10.156.20.241/restconf/streams/v1/inventory ... 2018-03-09 00:15:25 TRACE DefaultSocket:157 - WebSocket Connect Timeout 9223372036854775807 2018-03-09 00:15:25 DEBUG NettyConnectListener:68 - Using non-cached Channel [id: 0xfb0f2385, /172.20.117.143:57200 => /10.156.20.241:80] for GET '/restconf/streams/v1/inventory.xml?X-Atmosphere-Transport=websocket&X-Atmosphere-Framework=2.3.0&X-atmo-protocol=true&X-Atmosphere-tracking-id=0' 2018-03-09 00:15:25 TRACE WebSocketTransport:202 - Status received com.ning.http.client.providers.netty.response.NettyResponseStatus@27717259 2018-03-09 00:15:25 DEBUG WebSocketTransport:207 - Invalid status code 404 for WebSocket Handshake 2018-03-09 00:15:25 DEBUG Processor:143 - Unexpected I/O exception on channel [id: 0xfb0f2385, /172.20.117.143:57200 => /10.156.20.241:80] Connection Error 404 : NOT FOUND

Wireshark confirms: image

What am I doing wrong? Ed

miguelgrinberg commented 6 years ago

Are you using the Java SocketIO client to connect?

evh3rd commented 6 years ago

No, I am using code derived from the Atmosphere wasync client (https://github.com/Atmosphere/wasync). This in turn runs on top of Async HTTP client (https://asynchttpclient.github.io/async-http-client/) The server that I am simulating uses Jeanfrancois Arcand's Atmosphere server (https://github.com/Atmosphere/atmosphere)

The packet I showed in the Wireshark capture is the one that is rejected (404) by my flask_SocketIO() based simulation of the Atmosphere server instead of returning a 101 upgrade code. The @app.route code I provided earlier makes the handshake correctly, but does not actually upgrade to a websocket because Flask doesn't do that (of course). My problem is that the Flask_SocketIO version does not do the handshake. I saw information about the path (earlier resource) attribute to python-socketio and I was hoping that if I provided the URL to SocketIO in the attribute it would receive the connection request at that URL, but apparently that is not happening.

Thanks, Ed

miguelgrinberg commented 6 years ago

Ed, you are confusing WebSocket with SocketIO. These are not compatible. Your server is SocketIO, you need to use a SocketIO client to connect to it.

evh3rd commented 6 years ago

Hi Miguel, Thanks for your feedback. I was confused about WebSocket and SocketIO, which I thought was a name for library (as in Unix socket library) and not the name of a protocol. I used https://github.com/kennethreitz/flask-sockets, but was not able to find a way to configure the maximum frame length. (WebSocket support breaking large messages up into segments with continuation frames and a FIN on the last packet.) I wound up writing my own fragmentation code and now my python-based simulator is emulating our production WebSocket server. Thanks for your help, all the excellent code, and instructive blogs, Ed