scrapinghub / frontera

A scalable frontier for web crawlers
BSD 3-Clause "New" or "Revised" License

Gracefully restarting frontera with SQL backend #269

Open ShT3ch opened 7 years ago

ShT3ch commented 7 years ago

I use the SQLAlchemy backend with db.worker in a distributed spiders setup.

BACKEND = 'frontera.contrib.backends.sqlalchemy.BFS'
SQLALCHEMYBACKEND_ENGINE = 'sqlite:///../state/bfs.db'

I expected that if I stopped the worker gracefully, I would be able to start it again with the current queue as the "seed". But I wasn't able to. Actually, after stopping, the queue in the DB just gets erased. I checked this by connecting with the sqlite3 utility. I couldn't figure out when and why it happens.

Please point me to the code where this happens, or explain why.

ShT3ch commented 7 years ago

Some results of my investigation.

I found that I was missing two parameters:

SQLALCHEMYBACKEND_DROP_ALL_TABLES = False
SQLALCHEMYBACKEND_CLEAR_CONTENT = False
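
For completeness, here is the whole set of settings I ended up with (the SQLite path is just my local example; as far as I can tell both of the new flags default to True, which is why the tables were wiped on every start):

BACKEND = 'frontera.contrib.backends.sqlalchemy.BFS'
SQLALCHEMYBACKEND_ENGINE = 'sqlite:///../state/bfs.db'

# keep tables and their content between runs
SQLALCHEMYBACKEND_DROP_ALL_TABLES = False
SQLALCHEMYBACKEND_CLEAR_CONTENT = False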

Now I see strange behavior: if I start the db worker, it extracts (and removes) batches from the queue (MAX_NEXT_REQUESTS at a time) every 5 seconds. I am trying to understand why it does this in a loop. It works fine when the crawler (consumer) is up.

ShT3ch commented 7 years ago

Now I see.

When the MessageBus starts, it does self.spider_feed_partitions = [i for i in range(settings.get('SPIDER_FEED_PARTITIONS'))]

Then in SpiderFeedStream

self.partitions = messagebus.spider_feed_partitions
self.ready_partitions = set(self.partitions)

So at start the worker believes that all spiders are available, and therefore it produces batches.

What's more, there is code in DBWorker.consume_incoming(), in the if type == 'offset' section, which could resolve this situation:

producer_offset = self.spider_feed_producer.get_offset(partition_id)
if producer_offset is None:
    continue

But the default implementation of self.spider_feed_producer.get_offset (Producer.get_offset) is

def get_offset(self, partition_id):
    return self.counters[partition_id]

which raises a KeyError at start (is this a bug?). I fixed it by replacing it with self.counters.get(partition_id).
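
For reference, the patched method as I applied it locally (a minimal sketch against the ZeroMQ message bus Producer, assuming self.counters is a plain dict keyed by partition id):

def get_offset(self, partition_id):
    # Return None instead of raising KeyError when nothing has been
    # produced for this partition yet; the 'offset' branch in
    # DBWorker.consume_incoming() already skips partitions whose
    # producer offset is None.
    return self.counters.get(partition_id)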

So I have a question, @sibiryakov: can we fix all of this by changing the SpiderFeedStream default state (currently: all workers are available)?
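
To illustrate the idea (just a sketch, not a tested patch; the method names follow my reading of the ZeroMQ message bus, and only the initial value of ready_partitions actually changes):

class SpiderFeedStream(object):
    def __init__(self, messagebus):
        self.partitions = messagebus.spider_feed_partitions
        # instead of set(self.partitions): assume no partition is ready
        # until its spider actually reports an offset
        self.ready_partitions = set()

    def mark_ready(self, partition_id):
        self.ready_partitions.add(partition_id)

    def mark_busy(self, partition_id):
        self.ready_partitions.discard(partition_id)

    def available_partitions(self):
        return self.ready_partitions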

sibiryakov commented 7 years ago

Hi @ShT3ch

> But default implementation of self.spider_feed_producer.get_offset (Producer.get_offset) is

There is no default message bus; I assume you got this from the ZeroMQ message bus.

It's not clear from the issue what problem you want to solve, but I'll assume you want to get rid of the KeyError exceptions.

It throws these exceptions because there are no offsets available, meaning there is no information about whether the partition is available or not. Nothing bad happens; it's just a confusing message, as I see it. We could add a clearer warning message explaining that and recommending what to do (start spiders, check connectivity, etc.). PRs are welcome, as usual ;)
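
Something along these lines, for example (a rough illustration of the kind of message I mean, not actual frontera code; the handle_offset helper and logger name are made up for the example):

import logging

logger = logging.getLogger("db-worker")

def handle_offset(spider_feed_producer, partition_id):
    # Hypothetical helper showing a friendlier message when no offset
    # has been recorded for the partition yet.
    producer_offset = spider_feed_producer.get_offset(partition_id)
    if producer_offset is None:
        logger.warning(
            "No producer offset for spider feed partition %d yet, so it is "
            "unknown whether this partition is available. Check that the "
            "spiders are running and can reach the message bus.",
            partition_id)
        return None
    return producer_offset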

ShT3ch commented 7 years ago

No, the problem is to understand what I should do to be sure that my crawling state is preserved between runs.

At first I found the two parameters that prevent the state from being wiped on application start. Then I ran into some strange behavior when starting the db worker without any spider. Then I found a bug: sometimes the db worker emits more than one batch and drains the whole queue into the MQ.

The solution that works for me can be seen there (masters diff). But it is too dirty for me to turn into a PR. Maybe we should discuss the main idea of my solution and improve it until it is PR-worthy. The main idea is to not rely on a proper start order and on the network, and to control the state more precisely.