istresearch / scrapy-cluster

This Scrapy project uses Redis and Kafka to create a distributed on demand scraping cluster.
http://scrapy-cluster.readthedocs.io/
MIT License

Migrate to using kafka-python futures callback #93

Closed by madisonb 7 years ago

madisonb commented 7 years ago

Our current setup for sending data to Kafka does not let us determine whether a record was successfully sent or failed to send. This was an oversight from when we upgraded kafka-python: the new .send() method no longer returns a result (or raises) when a record cannot be sent (docs). Instead, it returns a future object that you can poll or attach callbacks to in order to determine whether the data was sent successfully.
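For reference, a rough sketch of the two options kafka-python gives us; the producer, topic, and payload names below are illustrative, not from our codebase:

    from kafka import KafkaProducer
    from kafka.errors import KafkaError

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    future = producer.send('demo.topic', b'some payload')

    # Option 1: poll/block on the future; raises a KafkaError if the send failed
    try:
        metadata = future.get(timeout=10)
    except KafkaError:
        pass  # the record was not sent; log or retry here

    # Option 2: attach non-blocking callbacks that fire when the send resolves
    future.add_callback(lambda metadata: None)  # receives the RecordMetadata
    future.add_errback(lambda exc: None)        # receives the exception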

We should use the callback functionality provided and make sure we properly log successes and failures. This specifically impacts the Redis Monitor, Rest, and Item Pipeline components (anything that sends data to Kafka).

sample code:

    def _gen_kafka_success(self):
        def kafka_success(item, response):
            # invoked by kafka-python with the RecordMetadata once the send succeeds
            self.logger.info("Kafka Success")
        return kafka_success

    def _gen_kafka_failure(self):
        def kafka_failure(item, exception):
            # invoked by kafka-python with the exception if the send fails
            self.logger.error("Kafka Failure")
        return kafka_failure

    # anywhere we send to kafka
    future = self.kafka_conn.send(self.kafka_topic, enriched_data)
    future.add_callback(self._gen_kafka_success(), item)
    future.add_errback(self._gen_kafka_failure(), item)

In fact, this may eliminate one of the items in the pipeline entirely.
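For the Item Pipeline case, the wiring could look roughly like the sketch below; the class name, producer, topic, and message encoding are assumptions for illustration, not the actual scrapy-cluster pipeline code:

    import json

    class KafkaReportingPipeline(object):
        """Sketch of a pipeline stage that reports Kafka send results via callbacks."""

        def __init__(self, producer, topic, logger):
            self.producer = producer  # a kafka-python KafkaProducer
            self.topic = topic
            self.logger = logger

        def _gen_kafka_success(self, item):
            # returned closure is invoked by kafka-python with the RecordMetadata
            def kafka_success(metadata):
                self.logger.info("Kafka Success")
            return kafka_success

        def _gen_kafka_failure(self, item):
            # returned closure is invoked by kafka-python with the exception
            def kafka_failure(exception):
                self.logger.error("Kafka Failure: %s" % exception)
            return kafka_failure

        def process_item(self, item, spider):
            message = json.dumps(dict(item)).encode('utf-8')
            future = self.producer.send(self.topic, message)
            future.add_callback(self._gen_kafka_success(item))
            future.add_errback(self._gen_kafka_failure(item))
            return item

Here the item is captured in the closure rather than passed as an extra argument to add_callback; either approach works, since kafka-python partially applies any extra arguments before appending the send result.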

madisonb commented 7 years ago

Latest commit fixes this. Closing.