czasg / pywss

一个轻量级的 Python Web 框架
https://czasg.github.io/pywss/
MIT License
96 stars 18 forks source link

结合心跳机制的异步操作 #5

Closed doubleLLL3 closed 2 years ago

doubleLLL3 commented 4 years ago

您的异步操作非常及时!不过我最近加了一个心跳机制,就是因为我爬虫的过程有时需要很长时间,而中间前后端没有通信,我发现每过15分钟通信就会自动断开。

然而加的心跳机制在爬虫运行过程中时触发不了的,所以考虑用您的异步操作,我的路径只有一条,我将它设为异步路径,然后在爬虫的函数前加入await,还是没有达到想要的效果,爬虫过程中并不会触发心跳。 代码大致如下,请指教:

@route('/view_tool')
async def start_crawler(request, args):
    if args == "ping":  # 如果传过来的是心跳
        print("peng!")
        return "pong"
    else:  # 否则开始爬虫
        await f(爬虫函数)

还有一个问题,就是您的新版本里没有之前的DaemonMiddleware中间件了,用什么代替呢?如果您的新版本再加一点说明就更好了,线程那部分也希望有更具体的例子,对新手更友好... 支持!

czasg commented 4 years ago

1、心跳这一块我确实没有考虑到。 15min断开连接,谢谢你的提示,我会好好研究下的。 能不能用radio作为一个心跳呢,我需要尝试一下,当然你这种感觉也是可以的。

2、异步模块的,爬虫不会执行 我大概看出来,可能你使用的方法不当。 f(爬虫函数) --- 此处需要一个可等待对象,或者未来对象, 是需要将原来的爬虫进行重构的。 这个等我给你写几个demo

3、DaemonMiddleware 所有的中间件模块,只保留了radio...... SOR! DaemonMiddleware中间件确实很有意义,我想想如何解决

4、补充demo和readme

czasg commented 4 years ago

2、爬虫案列:(可以参考着改,但是不要用aiohttp,目前发现了个bug)

from pywss import AsyncPyws

ws = AsyncPyws(__name__)

def spider_by_requests():  # 此处替换 f(爬虫函数) 
    import requests
    response = requests.get('http://www.baidu.com')
    return response.text

@ws.route('/test')
async def test(request, data):
    future = request._loop.run_in_executor(None, spider_by_requests)
    return await future

if __name__ == '__main__':
    ws.serve_forever()
czasg commented 4 years ago

1、心跳机制+重试机制 我觉得这一块和网络因素有很大的关系,可以加入一些必要的重试和心跳 前端js:

class WebSocketClient{
    constructor(uri=null, host='127.0.0.1', port=8866, path='/test', max_retry_times=5,
                retry_timeout=8000, heartbeatInterval=10000){
        if (!uri){
            this.uri = `ws://${host}:${port}${path}`
        } else {
            this.uri = uri
        }
        this.ws = null
        this.max_retry_times = max_retry_times
        this.retry_timeout = retry_timeout
        this.heartbeatInterval = heartbeatInterval
        this.retry_times = max_retry_times
        window.wsClient = this
    }

    wsState(){
        return this.ws.readyState
    }

    onmessage(data) { console.log(JSON.parse(data)) }
    onclose(ev) { console.log('Connect Closed') }
    onopen() { this.ws.send('hello, pywss') }

    retryConnect(){
        this.ws = new WebSocket(this.uri)
        this.ws.onmessage = (ev) => {
            this.onmessage(ev.data);
        }
        this.ws.onclose = (ev) => {
            if (this.retry_times > 0){
                console.log(`waiting ${this.retry_timeout} ms before retry`)
                this.retry_times -= 1
                setTimeout(retryFunc, this.retry_timeout)
            }
        }
        this.ws.onopen = () => {
            if (this.wsState() === WebSocket.OPEN) {
                this.retry_times = this.max_retry_times
                this.onopen()
            }
        }
    }

    heartbeat(){
        if (this.wsState() === WebSocket.OPEN) {
            this.ws.send('ping')
        }
    }

    startHeartbeat(){
        setInterval(heartbeatFunc, this.heartbeatInterval)
    }

    safe_start(){
        this.retryConnect()
        this.startHeartbeat()
    }
}

function retryFunc(){
    if (window.wsClient) {
        window.wsClient.retryConnect()
    }
}

function heartbeatFunc(){
    if (window.wsClient) {
        window.wsClient.heartbeat()
    }
}

ws = new WebSocketClient('ws://127.0.0.1:8866/test')
ws.safe_start()

后端:

from pywss import AsyncPyws

ws = AsyncPyws(__name__)

def spider_by_requests():
    import requests
    response = requests.get('http://www.baidu.com')
    return response.text

@ws.route('/test')
async def test(request, data):
    if data == "ping": return "pong"
    future = request._loop.run_in_executor(None, spider_by_requests)
    return await future

if __name__ == '__main__':
    ws.serve_forever()
czasg commented 4 years ago

3、DaemonMiddleware 这一块我打算添加一个 before_first_request 装饰器,表示第一次建立连接后发送的数据,会走这里。 每个连接仅走一次这个流程。

具体我需要测试一下。

czasg commented 4 years ago

4、demo和readme 我发现了一个很大的问题,就是一旦我有新的idea,,, 我就会想着删掉以前觉得不舒服的代码,然后重写新的 没有考虑一个迭代的情况,,

demo和readme我得想想怎么写,最近复工期,有点懵

doubleLLL3 commented 4 years ago

谢谢您这么详细的回答。

  1. 关于您写的心跳+重试机制,是否可以在每次接收到消息可以重置一下心跳,这样可以减少不必要的通信,然后我这里前端js推荐一个重连库,如果您需要的话:reconnecting-websocket-可调参数包括重连次数、间隔、调试信息、自动连接等。
  1. 根据您的爬虫结合异步的Demo,修改了我的代码,就是我想要的效果!之前不太明白异步的操作,就是东拼西凑的写法,见谅..这个很神奇,我有空再好好学一遍。

期待您的DaemonMiddleware复工,再次感谢您的热情!祝工作顺利!

czasg commented 4 years ago

你太客气了,大家一起相互学习,我也工作不久,很多都懵逼。

1、心跳机制,其实我很多也不懂,,不过我下版本把默认回复去掉了,return 为空则不会返回数据。

2、js是我遇到过最困难的语言,他的this机制有点搞事情,总指不到正确的对象 O.O 这个js三方库感觉很强

3、radio设计为一个阻塞队列,只有当存在连接时才回启动,是有部分消耗。 我倒是很好奇那些大框架是怎么设计心跳的,我也好模仿学习下,,比如rabbitmq,可惜看不懂erlang...

4、异步操作推荐看下asyncio的官方文档,可以大概了解部分编写方法。

异步的本质,是内部挂载了一个消息循环,每当触发可读或者可写事件时,会进行相应的回调操作。

这里之所以用run_in_executor,是因为我对原生的消息循环loop打了一个websocket的补丁,结果导致不能用aiohttp了,发生冲突(我发现我总喜欢搞些野路子,,)

而你说的run_inexecutor线程消耗,大可不必担心。你可以参考concurrent这个原生包,他在内部维护了一个线程池,所以不会有大量创建与销户线程的开销。 (这个包写的非常漂亮,推荐看线程池部分,进程池的看不懂(==))

5、这两天应该会再发一个版本,总体感觉异步版本确实比线程版本友好一些,没有大量频繁创建与销毁线程的操作。