fcfangcc / pyxxl

XXL-JOB的Python执行器实现,可以方便的将Python方法注册到XXL-JOB的调度中心上进行管理
https://fcfangcc.github.io/pyxxl/
GNU General Public License v3.0
60 stars 16 forks source link

执行完任务后,就掉线了,然后后续就不会再重试链接上服务端,导致服务端调度失败 #36

Closed dislazy closed 1 year ago

dislazy commented 1 year ago

2023-11-02 13:37:08.362 [MainThread] [170428] WARNING /usr/local/lib/python3.9/site-packages/pyxxl/executor.py(_run:237) - Traceback (most recent call last): File "/app/main.py", line 21, in test_task await asyncio.sleep(1) File "/usr/local/lib/python3.9/asyncio/tasks.py", line 652, in sleep return await future asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "/usr/local/lib/python3.9/asyncio/tasks.py", line 490, in wait_for return fut.result() asyncio.exceptions.CancelledError

fcfangcc commented 1 year ago

完整或者相对完整的代码发一下? File "/app/main.py", line 21, in test_task await asyncio.sleep(1)

dislazy commented 1 year ago

main

import asyncio from pyxxl import ExecutorConfig, PyxxlRunner from config import config

xxl_config = ExecutorConfig( xxl_admin_baseurl=config.XXL_JOB_ADDRESS, executor_app_name=config.XXL_JOB_APP_NAME, )

app = PyxxlRunner(xxl_config)

@app.register(name="abc") async def test_task(): xxxx

模拟异步操作

await asyncio.sleep(1)
return "成功..."

app.run_executor()

def main():

进入无限循环,保持持续运行的状态

while True:
    pass

if name == 'main': main()

fcfangcc commented 1 year ago

我自己无法复线你的问题,这个代码用代码格式输入。这样纯文本看不清楚

fcfangcc commented 1 year ago

xxl_config = ExecutorConfig(
xxl_admin_baseurl=config.XXL_JOB_ADDRESS,
executor_app_name=config.XXL_JOB_APP_NAME,
debug=True
)```
看下输出的日志
dislazy commented 1 year ago

好的 我晚点再试试 这个正好在跑任务

dislazy commented 1 year ago

image 现在大概是这样的情况: 1、我的任务是1分钟执行一次 2、任务本身执行耗时挺长的 3、当第一次执行成功了,耗时挺长的任务在执行,然后xxl-job调度的时候就直接失败了 4、执行完成功的那个方法后,紧跟着自动执行第二个方法报timeout image 5、再往后就全部提示这个错误了 image

结论:应该是我的异步调用没有生效导致的,变成了同步调用

dislazy commented 1 year ago

再补充一点,我的任务是参照:

@app.register(name="demoJobHandler")
async def test_task():
    # you can get task params with "g"
    g.logger.info("get executor params: %s" % g.xxl_run_data.executorParams)
    for i in range(10):
        g.logger.warning("test logger %s" % i)
    await asyncio.sleep(5)
    return "成功..."

对应文档链接:https://fcfangcc.github.io/pyxxl/example/

fcfangcc commented 1 year ago

有没有其他更多日志,看不出问题。代码能不能给个完整的,我长期在用的,没发现过你说的问题。我感觉你上面发的代码有点不对

dislazy commented 1 year ago
# main
import asyncio

from bill_utils import process_bill_data
from pyxxl import ExecutorConfig, PyxxlRunner
from config import config

xxl_config = ExecutorConfig(
    xxl_admin_baseurl=config.XXL_JOB_ADDRESS,
    executor_app_name=config.XXL_JOB_APP_NAME,
)

app = PyxxlRunner(xxl_config)

def process():
      return "data"

@app.register(name="processData")
async def test_task():
    process()
    # 模拟异步操作
    await asyncio.sleep(1)
    return "成功..."

app.run_executor()

def main():
    # 进入无限循环,保持持续运行的状态
    while True:
        pass

if __name__ == '__main__':
    main()

这就是main的完整代码,应该是异步方法不对 processData 这个bean

fcfangcc commented 1 year ago

你 process()里面的逻辑是不是没用异步写?里面是同步的?是的话就是这里把整个进程卡住了。你把方法改成

@app.register(name="processData")
def test_task():
    process()
    return "成功..."
dislazy commented 1 year ago

是的,process这个里面的方法没有用异步写,实际上我的目的也是这样,正常调度,让process 这个方法异步执行即可,这个如何修改呢

fcfangcc commented 1 year ago

你 process()里面的逻辑是不是没用异步写?里面是同步的?是的话就是这里把整个进程卡住了。你把方法改成

@app.register(name="processData")
def test_task():
    process()
    return "成功..."

@dislazy 按这里说的,任务用同步方法。检测到同步函数会自动放入线程池中运行,不会阻塞调度器

dislazy commented 1 year ago

好的好的 我试试 谢啦