Yelp / pyleus

Pyleus is a Python framework for developing and launching Storm topologies.
Apache License 2.0

Workers connections / b.s.util [ERROR] Async loop died! #167

Open Ulitochka opened 8 years ago

Ulitochka commented 8 years ago

Hello. Thanks for your work. I have a few issues, though:

Building topology:
New python executable in /tmp/tmpznmQCP/resources/pyleus_venv/bin/python
Installing setuptools, pip, wheel...done.
Collecting pyleus==0.3.0
Collecting msgpack-python (from pyleus==0.3.0)
Collecting six (from pyleus==0.3.0)
Using cached six-1.10.0-py2.py3-none-any.whl
Collecting virtualenv (from pyleus==0.3.0)
Using cached virtualenv-13.1.2-py2.py3-none-any.whl
Collecting PyYAML (from pyleus==0.3.0)
Installing collected packages: msgpack-python, six, virtualenv, PyYAML, pyleus
Successfully installed PyYAML-3.11 msgpack-python-0.4.6 pyleus-0.3.0 six-1.10.0 virtualenv-13.1.2
Collecting psycopg2 (from -r /home/nutch/topologies/test_db/requirements.txt (line 1))
Installing collected packages: psycopg2
Successfully installed psycopg2-2.6.1
DEBUG:pyleus.cli.build:Assemble component module: test_db.test_spout
DEBUG:pyleus.cli.build:Assemble component module: test_db.test_bolt

First, I am hitting a problem like this one: http://stackoverflow.com/questions/30732178/storm-error-async-loop-died It is fixed in Storm 0.9.5. However, the Pyleus install guide (https://yelp.github.io/pyleus/install.html) says: "you will need to download and extract Storm 0.9.4".

Second, I have a problem like this one: https://mail-archives.apache.org/mod_mbox/storm-user/201507.mbox/%3CCAF5108hAeJuCe5s7JmvQ-KbHz+Fw2FtnA7nNq4kdXfjwrZSFxQ@mail.gmail.com%3E My question is: how can I set up connections between workers without SSH? I don't see this topic covered in the tutorial (https://yelp.github.io/pyleus).

poros commented 8 years ago

For the first issue, I believe that we could upgrade the Storm version in Pyleus; it should be kinda safe. Pinging @ecanzonieri. In the meantime, I guess you need to patch Pyleus to use Storm 0.9.5 yourself, if you want a quick fix.

Regarding the second issue, it seems to me more of a Storm problem than a Pyleus one... Is there any option in Storm that could help you and that we are missing in Pyleus?

Ulitochka commented 8 years ago

Thanks for your answer. I have another question, about the integration of Python and Java. How is the Python code executed on separate machines? Does a Python interpreter run for each bolt and spout on each machine?

poros commented 8 years ago

Yes, pretty much. You can see how the interpreter is called at https://github.com/Yelp/pyleus/blob/develop/topology_builder/src/main/java/com/yelp/pyleus/PythonComponentsFactory.java

The build of the virtualenv is done here https://github.com/Yelp/pyleus/blob/develop/pyleus/cli/build.py instead.
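To make the "one interpreter per component" idea concrete, here is a minimal sketch of a parent process launching a Python child and talking to it over stdin/stdout, which is the general shape of how Storm runs non-JVM components. Everything in it is an illustrative assumption: the names `launch_component`, `exchange`, and `CHILD_SOURCE` are hypothetical, and the newline-delimited JSON is a stand-in, not the wire format that Storm or Pyleus actually use.

```python
import json
import subprocess
import sys

# Hypothetical stand-in for a component script (a real one would be a spout or bolt).
CHILD_SOURCE = r"""
import json, sys
for line in sys.stdin:
    message = json.loads(line)
    reply = {"component": message["component"], "echo": message["payload"].upper()}
    sys.stdout.write(json.dumps(reply) + "\n")
    sys.stdout.flush()
"""


def launch_component():
    """Start a separate Python interpreter for one component."""
    return subprocess.Popen(
        [sys.executable, "-u", "-c", CHILD_SOURCE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        universal_newlines=True,  # text mode pipes
    )


def exchange(process, message):
    """Send one message to the child and read back one reply."""
    process.stdin.write(json.dumps(message) + "\n")
    process.stdin.flush()
    return json.loads(process.stdout.readline())


def demo():
    process = launch_component()
    try:
        return exchange(process, {"component": "test_bolt", "payload": "hello"})
    finally:
        process.stdin.close()  # the child's read loop ends on EOF
        process.wait()
```

In this picture, each spout or bolt instance gets its own child interpreter on whichever machine the worker is running, so any library the component imports has to be available to that interpreter on that machine.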

Ulitochka commented 8 years ago

Thanks for the links. I think I found my problem (importing an external Python library). Can you please tell me whether this spout code is correct?

from __future__ import absolute_import
import logging
from collections import namedtuple
from random import choice
import time
from pyleus.storm import Spout

import psycopg2
from psycopg2 import connect
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

log = logging.getLogger('test-spout')
Request = namedtuple("new_token", "id word")

class TestSpout(Spout):
     OPTIONS = ["host", "port", "password", "user", "dbname"]
     OUTPUT_FIELDS = Request

     def initialize(self):
         self.id_number = list()
         self.word = list()

         conn = connect(user=self.options["user"], host=self.options["host"],
         password=self.options["password"], port=self.options["port"])
         dbname = self.options["dbname"]
         cur = conn.cursor()

         cur.execute("SELECT id, word FROM public.test_table")
         massive = cur.fetchall()

         conn.commit()
         conn.close()

         self.id_number.append(massive[0][0])
         self.word.append(massive[0][1])

     def next_tuple(self):
         time.sleep(0.001)
         request = Request(choise(self.id_number), choise(self.word))
         log.debug(request)
         self.emit(request)

if __name__ == '__main__':
   logging.basicConfig(level=logging.DEBUG, filename='/tmp/test_spout.log', filemode='w',)
   TestSpout().run()

poros commented 8 years ago

I can't guarantee that the code or the logic is correct (e.g. I see a couple of typos), but I can't see any major problem from a quick look...
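Reading the posted spout, the likely candidates are `choise` (the import is `choice`), the `dbname` option that is read but never passed to `connect`, and the fact that only the first fetched row is kept. Whether these are what the reply had in mind is an assumption. The sketch below isolates those three points as plain functions so they can be checked without Storm or a database; the helper names are hypothetical, and the namedtuple is renamed to `Request` only for readability.

```python
from collections import namedtuple
from random import choice

Request = namedtuple("Request", "id word")


def connection_kwargs(options):
    """Build keyword arguments for psycopg2.connect from the spout options."""
    return {
        "user": options["user"],
        "host": options["host"],
        "password": options["password"],
        "port": options["port"],
        # The posted code read this option but never handed it to connect().
        "dbname": options["dbname"],
    }


def split_rows(rows):
    """Keep every (id, word) row from fetchall(), not just the first one."""
    ids = [row[0] for row in rows]
    words = [row[1] for row in rows]
    return ids, words


def make_request(ids, words):
    """Pick an id and a word independently, as the posted next_tuple does."""
    return Request(choice(ids), choice(words))
```

Inside the spout, `initialize` could then call `connect(**connection_kwargs(self.options))` and store the result of `split_rows(cur.fetchall())`, while `next_tuple` emits `make_request(self.id_number, self.word)`.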

Ulitochka commented 8 years ago

Maybe the problem is in the integration of pyleus and psycopg2, a library for connecting to a PostgreSQL database. Has anybody tested this library with pyleus?
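One way to narrow this down, independent of Storm, is to check whether the module imports at all under the interpreter the component runs with. The helper below is a hypothetical diagnostic, not part of Pyleus; it only reports whether an import succeeds and, if not, what the error was.

```python
import importlib


def try_import(module_name):
    """Return (True, None) if the module imports, else (False, error text)."""
    try:
        importlib.import_module(module_name)
    except Exception as exc:  # ImportError, or e.g. an error from a missing shared library
        return False, "{}: {}".format(type(exc).__name__, exc)
    return True, None
```

Calling `try_import("psycopg2")` with the virtualenv's interpreter on a worker machine (the build log above shows one at `resources/pyleus_venv/bin/python`) would show whether the failure is an import problem on that machine or something later in the spout.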