ydf0509 / funboost

pip install funboost,python全功能分布式函数调度框架,funboost的功能是全面性重量级,用户能想得到的功能99%全都有;funboost的使用方式是轻量级,只有@boost一行代码需要写。支持python所有类型的并发模式和一切知名消息队列中间件,支持如 celery dramatiq等框架整体作为funboost中间件,python函数加速器,框架包罗万象,用户能想到的控制功能全都有。一统编程思维,兼容50% python业务场景,适用范围广。只需要一行代码即可分布式执行python一切函数,99%用过funboost的pythoner 感受是 简易 方便 强劲 强大,相见恨晚 。
Apache License 2.0
682 stars 135 forks source link

远程的问题 #92

Closed LeeC20 closed 10 months ago

LeeC20 commented 10 months ago

我远程好像跑不通了。 `

  import json
  import time

  from funboost import boost, BrokerEnum, PriorityConsumingControlConfig

  @boost('queue1', broker_kind=BrokerEnum.SASlPlAIN_KAFKA, qps=10)
  def f(x, y):
      return x + y

  @boost('queue2', broker_kind=BrokerEnum.SASlPlAIN_KAFKA, qps=7)
  def f2(a, b):
      return a - b

  if __name__ == '__main__':
      f.clear()  # 清空f函数对应的queue1所有消息
      for i in range(100):
          f.push(i, i * 2)  # 使用push发布消息到queue1,push的入参和正常调用函数一样
          f2.publish({'a': i, 'b': i * 2}, priority_control_config=PriorityConsumingControlConfig(
              msg_expire_senconds=30))  # # 使用publish发布消息到queue2,publish的入参第一个参数是一个字典,把所有参数组成一个字典,还可以传入其他参数。publish更强大。

      # print(f.get_message_count())  # 获取消息队列中的消息数量
      # f.consume()  # 在当前进程启动多线程/协程消费
      # f2.multi_process_consume(5)  # 启动3个进程,每个进程内部都启动多线程/协程消费,性能炸裂。

      f2.fabric_deploy(host="192.168.87.129", port=22, user="lee", password="123",
                      python_interpreter="/home/lee/miniconda3/envs/test/bin/python3.8", process_num=3,
                      file_volume_limit=100*1000, only_upload_within_the_last_modify_time=10 * 24 * 3600)
`

`

  Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>
  BrokenPipeError: [Errno 32] Broken pipe
  Exception in thread Thread-1 (_inner):
  Traceback (most recent call last):
    File "/home/lee/miniconda3/envs/celery/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
      self.run()
    File "/home/lee/miniconda3/envs/celery/lib/python3.10/threading.py", line 953, in run
      self._target(*self._args, **self._kwargs)
    File "/home/lee/miniconda3/envs/celery/lib/python3.10/site-packages/funboost-30.4-py3.10.egg/funboost/core/fabric_deploy_helper.py", line 108, in _inner
      conn.run(shell_str)
    File "<decorator-gen-3>", line 2, in run
    File "/home/lee/miniconda3/envs/celery/lib/python3.10/site-packages/fabric2/connection.py", line 30, in opens
      return method(self, *args, **kwargs)
    File "/home/lee/miniconda3/envs/celery/lib/python3.10/site-packages/fabric2/connection.py", line 723, in run
      return self._run(self._remote_runner(), command, **kwargs)
    File "/home/lee/miniconda3/envs/celery/lib/python3.10/site-packages/invoke/context.py", line 102, in _run
      return runner.run(command, **kwargs)
    File "/home/lee/miniconda3/envs/celery/lib/python3.10/site-packages/invoke/runners.py", line 380, in run
      return self._run_body(command, **kwargs)
    File "/home/lee/miniconda3/envs/celery/lib/python3.10/site-packages/invoke/runners.py", line 442, in _run_body
      return self.make_promise() if self._asynchronous else self._finish()
    File "/home/lee/miniconda3/envs/celery/lib/python3.10/site-packages/invoke/runners.py", line 509, in _finish
      raise UnexpectedExit(result)
  invoke.exceptions.UnexpectedExit: Encountered a bad command exit code!

  Command: 'cd /home/lee/codes/kafka_test; export is_funboost_remote_run=1;export PYTHONPATH=/home/lee/codes/kafka_test:$PYTHONPATH ;/home/lee/miniconda3/envs/test/bin/python3.8 -c "from main import f2;f2.multi_process_consume(3)"  -funboostmark funboost_fabric_mark__queue2__f2 '

  Exit code: -1

  Stdout: already printed

  Stderr: already printed

`

然后我试了

  from fabric2 import Connection

  cmd = 'cd /home/lee/codes/kafka_test; export is_funboost_remote_run=1;export PYTHONPATH=/home/lee/codes/kafka_test:$PYTHONPATH ;/home/lee/miniconda3/envs/test/bin/python3.8 -c "from main import f2;f2.multi_process_consume(3)"  -funboostmark funboost_fabric_mark__queue2__f2 '
  con = Connection("192.168.87.129", "lee", 22, connect_kwargs={"password": "123"})

  con.run(cmd)

’ 却可以运行。这是为什么

ydf0509 commented 10 months ago

远程也要升级到30.6版本吧

LeeC20 commented 10 months ago

我之前用的30.4, 刚刚从这里下载了,好像还是30.5,不是30.6.... , 远程依然不行 因为我要改文件,所以只能手动安装。 这个repo什么时候更新到30.6

LeeC20 commented 10 months ago

远程也要升级到30.6版本吧

我找到问题了,

当我注释掉 fabric_deploy_helper.pyline 95 -97后, fabric_deploy能够正常使用了。 估计是这个kill进程有问题。

`

    # kill_shell = f'''ps -aux|grep {process_mark}|grep -v grep|awk '{{print $2}}' |xargs kill -9'''
    # logger.warning(f'{kill_shell} 命令杀死 {process_mark} 标识的进程')
    # uploader.ssh.exec_command(kill_shell)

`

LeeC20 commented 10 months ago

当我改成这样, 也可以运行。 不知道什么问题,单独运行paramiko,也是可以。 合起来就报错

`

    # uploader.ssh.exec_command(kill_shell)
    conn.run(kill_shell, encoding='utf-8',warn=True)  # 不想提示,免得烦扰用户以为有什么异常了。所以用上面的paramiko包的ssh.exec_command

`

ydf0509 commented 10 months ago

pip 升级30.6是要使用官方源的,国内源是异步pypi官网的,又不是立即同步,是定时同步,你想第一时间安装最新的python三方包版本,就要 pip install funboost --upgrade -i https://pypi.org/simple ,

估计是你为了下载python包速度快,替换成了国内源

LeeC20 commented 10 months ago

我也试了一下30.6, 还是不能远程。 我自己改了源码就通了。。 就很奇怪

fabric_deploy_helper.py

` line97 - line 98

# uploader.ssh.exec_command(kill_shell)
 conn.run(kill_shell, encoding='utf-8',warn=True)  # 不想提示,免得烦扰用户以为有什么异常了。所以用上面的paramiko包的ssh.exec_command

`

ydf0509 commented 10 months ago

我是可以的,kill命令就是正常的linux命令,你自己可以打印出kill的linux命令

ydf0509 commented 10 months ago

估计你系统不一样,那就用conn操作呗

ydf0509 commented 10 months ago

image image

下面这个可能适应性强一些吧,我不知道你系统设置了什么类型的鉴权