scrapinghub / frontera

A scalable frontier for web crawlers
BSD 3-Clause "New" or "Revised" License

cluster kafka db worker doesn't recognize partitions #360

Closed danmsf closed 5 years ago

danmsf commented 5 years ago

Hi, I'm trying to use the cluster configuration. I've created the topics in Kafka and have it up and running, but I'm running into trouble starting the database worker. I tried:

python -m frontera.worker.db --config config.dbw --no-incoming --partitions 0,1

and got an error that 0,1 was not recognized. Then I tried:

python -m frontera.worker.db --config config.dbw --no-incoming --partitions 0

and was getting the same issue as in #359, but somehow that stopped happening.

Now I'm getting an error that the Kafka partitions are not recognized or iterable, see below. I'm using Python 3.6 and frontera from the repo (FYI, qzm and cachetools still needed to be installed manually). Any ideas?

  File "/usr/lib64/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib64/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/db.py", line 246, in <module>
    args.no_scoring, partitions=args.partitions)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/stats.py", line 22, in __init__
    super(StatsExportMixin, self).__init__(settings, *args, **kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/db.py", line 115, in __init__
    self.slot = Slot(self, settings, **slot_kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/db.py", line 46, in __init__
    self.components = self._load_components(worker, settings, **kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/db.py", line 55, in _load_components
    component = cls(worker, settings, stop_event=self.stop_event, **kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/components/scoring_consumer.py", line 24, in __init__
    self.scoring_log_consumer = scoring_log.consumer()
  File "/usr/lib/python3.6/dist-packages/frontera/contrib/messagebus/kafkabus.py", line 219, in consumer
    return Consumer(self._location, self._enable_ssl, self._cert_path, self._topic, self._group, partition_id=None)
  File "/usr/lib/python3.6/dist-packages/frontera/contrib/messagebus/kafkabus.py", line 60, in __init__
    self._partitions = [TopicPartition(self._topic, pid) for pid in self._consumer.partitions_for_topic(self._topic)]

danmsf commented 5 years ago

I commented out get_stats() just to see if it would work. I can get the database workers to work, but then I get a different error starting the strategy workers. I'm closing this until #359 is resolved; maybe it's connected.

sibiryakov commented 5 years ago

Hey @danmsf, you've put the stack trace without the exception preceding it, so I can't identify what the issue is. The partitions are meant to be space-separated, not comma-separated. What is that different error from the strategy workers?
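The space-separated requirement can be illustrated with a minimal argparse sketch (illustrative only, not Frontera's actual CLI code; the nargs choice and option name here are assumptions): whitespace-separated values are collected into a list, while a single "0,1" token fails integer conversion.

```python
# Minimal sketch (not Frontera's actual CLI code) of why partitions must be
# space-separated: nargs='*' collects whitespace-separated tokens into a
# list, and type=int rejects a single "0,1" token.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--partitions', type=int, nargs='*')

args = parser.parse_args(['--partitions', '0', '1'])
print(args.partitions)  # [0, 1]
```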

danmsf commented 5 years ago

I was getting an error that said:

INFO:manager:--------------------------------------------------------------------------------
INFO:manager:Starting Frontier Manager...
INFO:hbase.backend:Connecting to localhost:9090 thrift server.
Traceback (most recent call last):
  File "/usr/lib64/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib64/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/strategy.py", line 388, in <module>
    worker = StrategyWorker(settings, is_add_seeds_mode)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/stats.py", line 22, in __init__
    super(StatsExportMixin, self).__init__(settings, *args, **kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/worker/strategy.py", line 166, in __init__
    manager = WorkerFrontierManager.from_settings(settings, strategy_worker=True, scoring_stream=self.update_score)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 691, in from_settings
    return WorkerFrontierManager(**kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 657, in __init__
    db_worker=db_worker, strategy_worker=strategy_worker)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 157, in __init__
    super(StrategyComponentsPipelineMixin, self).__init__(backend, **kwargs)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 87, in __init__
    BackendMixin.__init__(self, backend, db_worker, strategy_worker)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 21, in __init__
    self._backend.frontier_start()
  File "/usr/lib/python3.6/dist-packages/frontera/contrib/backends/hbase/__init__.py", line 564, in frontier_start
    if component:
TypeError: 'NoneType' object is not callable
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.

I changed /usr/lib/python3.6/dist-packages/frontera/contrib/backends/hbase/__init__.py to "if component is None:", and that allowed me to start the strategy worker, but now I'm running into an issue feeding the seeds, telling me:

INFO:manager:Frontier Manager Started!
INFO:manager:--------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/lib64/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib64/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/lib/python3.6/dist-packages/frontera/utils/add_seeds.py", line 45, in <module>
    run_add_seeds(settings, args.seeds_file)
  File "/usr/lib/python3.6/dist-packages/frontera/utils/add_seeds.py", line 20, in run_add_seeds
    manager.add_seeds(fh)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 495, in add_seeds
    self.strategy.read_seeds(seeds_file)
  File "/usr/lib/python3.6/dist-packages/frontera/strategy/basic.py", line 10, in read_seeds
    self.schedule(r)
  File "/usr/lib/python3.6/dist-packages/frontera/strategy/__init__.py", line 122, in schedule
    self._scheduled_stream.send(request, score, dont_queue)
  File "/usr/lib/python3.6/dist-packages/frontera/core/manager.py", line 798, in send
    self._queue.schedule([(request.meta[b'fingerprint'], score, request, not dont_queue)])
AttributeError: 'NoneType' object has no attribute 'schedule'

so that probably was not a good fix, and maybe my settings are off? Any ideas?

danmsf commented 5 years ago

Also, I would update the docs to say: 'python -m frontera.worker.db --config [db worker config module] --no-incoming --partitions 0 1'

and in the common module script: SCORING_TOPIC = 'frontier-score' instead of SCORING_TOPIC = 'frontier-scoring'

(I made a 'frontier-scoring' topic and lost some time figuring that out... it is mentioned correctly in the Kafka topic creation instructions.)

sibiryakov commented 5 years ago

hi @danmsf, there is probably an issue with the configuration. Have you specified BACKEND properly?

sibiryakov commented 5 years ago

post your configs here, pls

danmsf commented 5 years ago

I copy-pasted from the docs, so everything is the same as here: https://frontera.readthedocs.io/en/latest/topics/cluster-setup.html

In the spider module:

BACKEND = 'frontera.contrib.backends.remote.messagebus.MessageBusBackend'

In the worker module:

BACKEND = 'frontera.contrib.backends.hbase.HBaseBackend'

and, just for good measure, I added:

HBASE_DROP_ALL_TABLES = True
HBASE_THRIFT_PORT = 9090
HBASE_THRIFT_HOST = 'localhost'
HBASE_METADATA_TABLE = 'metadata'
HBASE_QUEUE_TABLE = 'queue'
MESSAGE_BUS = 'frontera.contrib.messagebus.kafkabus.MessageBus'

In the common module I changed the topic to 'frontier-score'; in the strategy worker module I chose the strategy 'frontera.strategy.basic.BasicCrawlingStrategy'.

Running on: Linux (AWS EMR), Python 3.6, Kafka 2.1.0, HBase 1.4.8, ZooKeeper 3.4.13.

And I get the 'NoneType' object is not callable error (the first error above) when starting the strategy worker.

danmsf commented 5 years ago

If it helps to point me in the right direction, I put a print() in the __init__ script:

def frontier_start(self):
    for component in [self.metadata, self.queue, self.states, self.domain_metadata]:
        print(component)
        if component:
            component.frontier_start()

and the output is:

INFO:manager:--------------------------------------------------------------------------------
INFO:manager:Starting Frontier Manager...
INFO:hbase.backend:Connecting to localhost:9090 thrift server.
None
None
<frontera.contrib.backends.hbase.HBaseState object at 0x7f46801f6550>
DomainCache([], maxsize=1000, currsize=0)
Traceback (most recent call last):
.... error.....

sibiryakov commented 5 years ago

No idea what's happening. That code shouldn't execute any call on component if it evaluates to False in the if statement. There is a small chance the interpreter is somehow modified in EMR. Could you try this in interactive Python:

x = [0, 1, 2, None]
for c in x:
    if c:
        print(c, type(c))

If None is handled correctly there, then please post your shell command.

danmsf commented 5 years ago

That works fine in Python. My shell commands are as follows. Starting the DB worker (this works):

python -m frontera.worker.db --config multicluster.config.dbw --no-incoming --partitions 0 1

And this to start the strategy worker (this is where it gets stuck):

python -m frontera.worker.strategy --config multicluster.config.sw --partition-id 0

I think it's getting stuck trying to start the 'DomainCache([], maxsize=1000, currsize=0)' object. I am using cachetools version 2.1.0. Thanks for the help.

danmsf commented 5 years ago

I also tried instantiating the DomainCache object in the shell:

d = DomainCache(connection=con, table_name='crawler:domain_metadata')
if d:
    print(d)

and I get a NoneType error, even though type(d) gives me the DomainCache class... I also made sure the connection to HBase is working in the code (I was able to print the list of tables). What is the first attribute of DomainCache([], ...) supposed to look like?

sibiryakov commented 5 years ago

https://github.com/scrapinghub/frontera/blob/master/frontera/contrib/backends/hbase/domaincache.py#L88

sibiryakov commented 5 years ago

The problem is probably connected with how Python evaluates an object in an if statement: https://docs.python.org/3.6/reference/datamodel.html#object.__bool__
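For illustration, here is a minimal, self-contained sketch of that evaluation mechanism (not DomainCache's actual code; the class and the clobbered attribute are invented purely to reproduce the same error message): `if obj:` looks up `__bool__` on the class and calls it, so if that attribute somehow ends up being None, the interpreter tries to call None and raises exactly the TypeError seen in the traceback above.

```python
# Illustrative only (not DomainCache's actual code): truthiness testing
# looks up __bool__ on the class and calls it; if that attribute is None,
# the call raises the same TypeError seen in the traceback above.
class BoolIsNone:
    __bool__ = None  # hypothetical: simulates a clobbered __bool__

obj = BoolIsNone()
try:
    if obj:  # Python calls type(obj).__bool__(obj) -> None(...) -> TypeError
        pass
except TypeError as e:
    print(e)  # 'NoneType' object is not callable
```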

danmsf commented 5 years ago

I found this helpful: https://stackoverflow.com/q/20364414 . I didn't have a chance to check it yet, but the gist of it is that the __eq__ method that DomainCache inherited from Cache (which inherits from collections.MutableMapping, ...) implements equality based on some method of the class (I don't know which) without first checking whether the other object has that method. So what's going on is that Python ends up checking something like DomainCache.methodA == None.methodA, and that's why it returns 'NoneType' not callable (None doesn't have methodA).

If this is the case, a local fix could be overloading the DomainCache.__eq__ method to include a hasattr() check... But if I'm the only one getting this, for some reason maybe I'm the exception :) Thanks
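A hypothetical sketch of that kind of hasattr() guard (all names here are invented stand-ins, not the real frontera or cachetools classes):

```python
# Hypothetical illustration of the proposed workaround: guard an inherited
# __eq__ that blindly assumes the other operand has a particular method.
class FragileBase:
    def __eq__(self, other):
        # blows up when `other` lacks .items() (e.g. when other is None)
        return dict(self.items()) == dict(other.items())

class SafeCache(FragileBase):
    def __init__(self):
        self._data = {}

    def items(self):
        return self._data.items()

    def __eq__(self, other):
        if not hasattr(other, 'items'):  # the hasattr() guard described above
            return NotImplemented        # lets Python fall back safely
        return super().__eq__(other)

print(SafeCache() == None)  # comparison against None no longer raises
```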

FrancoAlmonacid commented 5 years ago

Hello,

Did someone find a solution for this problem? I have the exact same error, which I fixed by changing the following line:

if component is not None:

I'm not sure yet what is causing it so any comment would help :)
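For reference, a self-contained sketch of that change applied to the frontier_start loop quoted earlier in this thread (the component objects here are stand-ins for metadata/queue/states/domain_metadata, not the real frontera classes):

```python
# Sketch of the workaround: an explicit `is not None` check sidesteps
# truthiness evaluation (__bool__/__len__) on the component entirely.
class Component:
    def frontier_start(self):
        print('started', self)

def frontier_start(components):
    for component in components:
        if component is not None:  # instead of `if component:`
            component.frontier_start()

# Only the non-None component is started; the Nones are skipped safely.
frontier_start([None, Component(), None])
```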

ghost commented 5 years ago

@danmsf Hi, I have faced a similar Kafka problem when starting the db worker. However, I didn't have any issue with the get_stats() call that you mentioned in #359.

(screenshot of the error attached)

Can you share how you solved this problem? I have opened an issue at #371.

Thanks in advance!