ukwa / ukwa-manage

Shepherding our web archives from crawl to access.
Apache License 2.0

Close KafkaProducer explicitly to ensure errors are caught. #63

Open: anjackson opened this issue 4 years ago

anjackson commented 4 years ago

After noticing a small crawl gap, I found the following at the end of the crawl launch log:

2019-12-05 10:00:12,826 INFO: Worker Worker(salt=769972995, workers=1, host=ingest, username=root, pid=28327) was stopped. Shutting down Keep-Alive thread
2019-12-05 10:00:12,835 INFO:
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 w3act.CrawlFeed(frequency=all, feed=npld, date=2019-12-05T10)
* 1 ran successfully:
    - 1 crawl.LaunchCrawls(frequency=all, date=2019-12-05T10, kafka_server=crawler05.n45.bl.uk:9094, queue=fc.tocrawl.npld)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

2019-12-05 10:00:12,966 INFO: Closing the Kafka producer with 0 secs timeout.
2019-12-05 10:00:12,966 INFO: Proceeding to force close the producer since pending requests could not be completed within timeout 0.
2019-12-05 10:00:12,968 WARNING: Produced messages to topic-partition TopicPartition(topic='fc.tocrawl.npld', partition=9) with base offset None and error IllegalStateError: Producer is closed forcefully..
2019-12-05 10:00:12,969 WARNING: Produced messages to topic-partition TopicPartition(topic='fc.tocrawl.npld', partition=13) with base offset None and error IllegalStateError: Producer is closed forcefully..
2019-12-05 10:00:12,969 WARNING: Produced messages to topic-partition TopicPartition(topic='fc.tocrawl.npld', partition=1) with base offset None and error IllegalStateError: Producer is closed forcefully..
2019-12-05 10:00:12,970 WARNING: Produced messages to topic-partition TopicPartition(topic='fc.tocrawl.npld', partition=10) with base offset None and error IllegalStateError: Producer is closed forcefully..
2019-12-05 10:00:12,970 WARNING: Produced messages to topic-partition TopicPartition(topic='fc.tocrawl.npld', partition=5) with base offset None and error IllegalStateError: Producer is closed forcefully..
2019-12-05 10:00:12,972 WARNING: Produced messages to topic-partition TopicPartition(topic='fc.tocrawl.npld', partition=8) with base offset None and error IllegalStateError: Producer is closed forcefully..
2019-12-05 10:00:12,973 WARNING: Produced messages to topic-partition TopicPartition(topic='fc.tocrawl.npld', partition=2) with base offset None and error IllegalStateError: Producer is closed forcefully..
2019-12-05 10:00:12,974 WARNING: Produced messages to topic-partition TopicPartition(topic='fc.tocrawl.npld', partition=14) with base offset None and error IllegalStateError: Producer is closed forcefully..
2019-12-05 10:00:12,974 WARNING: Produced messages to topic-partition TopicPartition(topic='fc.tocrawl.npld', partition=9) with base offset None and error IllegalStateError: Producer is closed forcefully..
2019-12-05 10:00:12,977 WARNING: Produced messages to topic-partition TopicPartition(topic='fc.tocrawl.npld', partition=10) with base offset None and error IllegalStateError: Producer is closed forcefully..
....
2019-12-05 10:00:13,009 INFO: <BrokerConnection node_id=1 host=192.168.45.15:9094 <connected> [IPv4 ('192.168.45.15', 9094)]>: Closing connection.
2019-12-05 10:00:13,010 WARNING: Produced messages to topic-partition TopicPartition(topic='fc.tocrawl.npld', partition=8) with base offset -1 and error Cancelled: <BrokerConnection node_id=1 host=192.168.45.15:9094 <connected> [IPv4 ('192.168.45.15', 9094)]>.
2019-12-05 10:00:13,010 WARNING: Batch is already closed -- ignoring batch.done()
2019-12-05 10:00:13,010 ERROR: Error processing errback
Traceback (most recent call last):
  File "/root/github/ukwa-manage-p3/venv/lib/python3.6/site-packages/kafka/future.py", line 79, in _call_backs
    f(value)
  File "/root/github/ukwa-manage-p3/venv/lib/python3.6/site-packages/kafka/producer/sender.py", line 186, in _failed_produce
    self._complete_batch(batch, error, -1, None)
  File "/root/github/ukwa-manage-p3/venv/lib/python3.6/site-packages/kafka/producer/sender.py", line 244, in _complete_batch
    self._accumulator.deallocate(batch)
  File "/root/github/ukwa-manage-p3/venv/lib/python3.6/site-packages/kafka/producer/record_accumulator.py", line 506, in deallocate
    self._incomplete.remove(batch)
  File "/root/github/ukwa-manage-p3/venv/lib/python3.6/site-packages/kafka/producer/record_accumulator.py", line 586, in remove
    return self._incomplete.remove(batch)
KeyError: <kafka.producer.record_accumulator.ProducerBatch object at 0x7f116f7a67f0>
2019-12-05 10:00:13,016 WARNING: Produced messages to topic-partition TopicPartition(topic='fc.tocrawl.npld', partition=14) with base offset -1 and error Cancelled: <BrokerConnection node_id=1 host=192.168.45.15:9094 <connected> [IPv4 ('192.168.45.15', 9094)]>.
2019-12-05 10:00:13,016 WARNING: Batch is already closed -- ignoring batch.done()
....
2019-12-05 10:00:13,098 INFO: Kafka producer closed

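i.e. the KafkaProducer is only being closed after the task has completed, and with a 0 secs timeout, so it never flushed. The pending messages for fc.tocrawl.npld were dropped, even though Luigi had already reported crawl.LaunchCrawls as having run successfully.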
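A minimal sketch of how the failures could be surfaced inside the task itself, assuming kafka-python's producer API (`send()` returning a future with `get(timeout=...)`, plus `flush(timeout=...)`). `send_and_confirm` is a hypothetical helper name, not code from ukwa-manage or crawl-streams. Flushing before the task returns, then calling `get()` on each future, turns a delivery failure like the IllegalStateError above into an exception in the task, rather than a warning logged after Luigi has already reported success.

```python
from typing import Iterable, List


def send_and_confirm(producer, topic: str, messages: Iterable[bytes],
                     timeout: float = 30.0) -> int:
    """Send messages, flush, then surface any per-record delivery error.

    Hypothetical helper (not ukwa-manage code). Assumes a kafka-python style
    producer whose send() returns a future with get(timeout=...), which
    returns record metadata or raises the error that failed that record.
    """
    futures: List = [producer.send(topic, value=m) for m in messages]
    # Wait for every buffered record to be sent (or to fail) while the
    # task is still running, rather than at interpreter shutdown.
    producer.flush(timeout=timeout)
    # get() re-raises a delivery failure, so it fails the task instead of
    # only appearing as a logged warning after the task has "succeeded".
    for future in futures:
        future.get(timeout=timeout)
    return len(futures)
```

Checking every future costs one extra pass over the list, but it means a LaunchCrawls-style task can only be marked complete once Kafka has acknowledged all of its messages.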

anjackson commented 4 years ago

Implemented an explicit flush() and added support for an explicit close() to crawl-streams. Will update and trial it.
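One way to make the flush and close explicit is a small context manager around the producer. This is a sketch under assumptions: `flushing_producer` is a hypothetical name, not the crawl-streams implementation, and it only relies on the kafka-python style `flush(timeout=...)` and `close(timeout=...)` methods.

```python
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def flushing_producer(producer, timeout: float = 30.0) -> Iterator:
    """Yield a producer, then flush and close it explicitly on exit.

    Hypothetical helper: assumes a kafka-python style producer with
    flush(timeout=...) and close(timeout=...).
    """
    try:
        yield producer
        # Only reached if the body succeeded: push out anything still
        # buffered while the calling task is still running.
        producer.flush(timeout=timeout)
    finally:
        # Always close with a real timeout, instead of leaving it to the
        # exit-time close that the log above shows using a 0 secs timeout.
        producer.close(timeout=timeout)
```

Flushing only on success is a deliberate choice: if the body has already raised, a flush that also failed would mask the original exception, so the helper just closes the producer and lets the first error propagate.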

anjackson commented 4 years ago

Hmm, the crawl-streams code includes a refreshDepth in the launcher. I need to check whether Heritrix is ready to handle that, and then review the launcher/enqueue implementation.