gocolly / colly

Elegant Scraper and Crawler Framework for Golang
https://go-colly.org/
Apache License 2.0

Add rabbitmq storage as available storage option #281

Open jredl-va opened 5 years ago

jredl-va commented 5 years ago

@asciimoo for your consideration. I've been somewhat frustrated with using Redis as a queue: even though it has queue-like capabilities, I've found it lacking. Here is a RabbitMQ-based storage that I would like to propose as another storage option.

On the RabbitMQ front, have you considered adding a hook to the queue or to the colly callbacks so that a message envelope (a task or a pubsub message, for example) can be acked or nacked?

I've been considering a pull request that makes it possible to unmarshal a request within a queue's GetRequest method: https://github.com/gocolly/colly/blob/master/queue/queue.go#L142

This would allow me to attach a message envelope (for example, a RabbitMQ task) to a request, which I could then ack on success or nack on error from within the colly callbacks. Currently I'm auto-acking tasks, but this can cause tasks to be lost if a collector shuts down non-gracefully.
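
The difference between auto-acking and explicit acking described above can be illustrated with a small in-memory model. This is only a sketch: `toyQueue`, its methods, and `Recover` are hypothetical names that imitate the bookkeeping a broker performs, and nothing here talks to RabbitMQ or uses the amqp package.

```go
package main

import (
	"errors"
	"fmt"
)

// toyQueue is a hypothetical in-memory stand-in for a broker queue.
// It only models ack bookkeeping; it is not RabbitMQ.
type toyQueue struct {
	ready   [][]byte          // messages waiting to be delivered
	unacked map[uint64][]byte // delivered but not yet acknowledged
	nextTag uint64
}

func newToyQueue() *toyQueue {
	return &toyQueue{unacked: make(map[uint64][]byte)}
}

// Publish appends a message to the ready list.
func (q *toyQueue) Publish(body []byte) { q.ready = append(q.ready, body) }

// Get delivers the oldest ready message. With autoAck the message is
// forgotten immediately; otherwise it is parked under a delivery tag
// until Ack or Nack is called.
func (q *toyQueue) Get(autoAck bool) (tag uint64, body []byte, ok bool) {
	if len(q.ready) == 0 {
		return 0, nil, false
	}
	body, q.ready = q.ready[0], q.ready[1:]
	if autoAck {
		return 0, body, true
	}
	q.nextTag++
	q.unacked[q.nextTag] = body
	return q.nextTag, body, true
}

// Ack confirms that the message with the given tag was processed.
func (q *toyQueue) Ack(tag uint64) error {
	if _, found := q.unacked[tag]; !found {
		return errors.New("unknown delivery tag")
	}
	delete(q.unacked, tag)
	return nil
}

// Nack rejects the message and puts it back on the ready list.
func (q *toyQueue) Nack(tag uint64) error {
	body, found := q.unacked[tag]
	if !found {
		return errors.New("unknown delivery tag")
	}
	delete(q.unacked, tag)
	q.ready = append(q.ready, body)
	return nil
}

// Recover models a consumer connection dropping: every message that
// was delivered but never acknowledged becomes ready again.
func (q *toyQueue) Recover() {
	for tag, body := range q.unacked {
		q.ready = append(q.ready, body)
		delete(q.unacked, tag)
	}
}

// Len reports how many messages are ready for delivery.
func (q *toyQueue) Len() int { return len(q.ready) }

func main() {
	// Auto-ack: a crash right after Get loses the task.
	auto := newToyQueue()
	auto.Publish([]byte("https://example.com/a"))
	auto.Get(true)
	auto.Recover()
	fmt.Println("ready after auto-ack and crash:", auto.Len()) // 0

	// Manual ack: a crash before Ack hands the task back.
	manual := newToyQueue()
	manual.Publish([]byte("https://example.com/a"))
	manual.Get(false)
	manual.Recover()
	fmt.Println("ready after manual ack and crash:", manual.Len()) // 1
}
```

In this model the auto-acked task is gone after the simulated crash, while the manually acked one returns to the ready list and can be picked up again.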

package rabbitmq

import (
    "fmt"

    "github.com/streadway/amqp"
)

// Storage implements the rabbitmq storage backend for Colly
type Storage struct {
    // Address is the rabbitmq server address
    Address string
    // Username is the username for the rabbitmq server
    Username string
    // Password is the password for the rabbitmq server
    Password string
    // QueueName is the queue name to register with rabbitmq
    QueueName string
    // Connection is the connection to the rabbitmq server
    Connection *amqp.Connection
    // Channel is the channel to be used
    Channel *amqp.Channel
    // Queue is the queue to be used
    Queue amqp.Queue
}

// Init initializes the rabbitmq storage
func (s *Storage) Init() error {
    if s.Connection == nil {
        connection, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s/", s.Username, s.Password, s.Address))
        if err != nil {
            return fmt.Errorf("rabbitmq connection error: %w", err)
        }
        s.Connection = connection
    }

    // Create a channel
    ch, err := s.Connection.Channel()
    if err != nil {
        return fmt.Errorf("rabbitmq channel creation error: %w", err)
    }
    s.Channel = ch

    q, err := ch.QueueDeclare(
        s.QueueName, // name of the queue
        true,        // durable: the queue definition survives a broker restart
        false,       // autoDelete: do not delete the queue when the last consumer goes away
        false,       // exclusive: do not restrict the queue to this connection
        false,       // noWait: wait for the server to confirm the declaration
        nil,         // arguments for more advanced configuration
    )
    if err != nil {
        return fmt.Errorf("rabbitmq queue declaration error: %w", err)
    }
    s.Queue = q
    return nil
}

// AddRequest implements queue.Storage.AddRequest() function
func (s *Storage) AddRequest(r []byte) error {
    return s.Channel.Publish("", s.QueueName, false, false, amqp.Publishing{
        ContentType: "text/plain",
        Body:        r,
    })
}

// GetRequest implements queue.Storage.GetRequest() function
func (s *Storage) GetRequest() ([]byte, error) {
    msg, ok, err := s.Channel.Get(s.QueueName, true)
    if err != nil {
        return nil, fmt.Errorf("rabbitmq get request error: %w", err)
    }
    if !ok {
        return nil, fmt.Errorf("unable to get request from rabbitmq")
    }
    return msg.Body, nil
}

// QueueSize implements queue.Storage.QueueSize() function
func (s *Storage) QueueSize() (int, error) {
    q, err := s.Channel.QueueInspect(s.QueueName)
    if err != nil {
        return 0, fmt.Errorf("rabbitmq queue inspect error: %w", err)
    }
    return q.Messages, nil
}

asciimoo commented 5 years ago

@jredl-va this is a great idea. Ack/Nack methods can be added to your RabbitMQ Storage type. What do you think?

jredl-va commented 5 years ago

@asciimoo my apologies for the delayed response; I missed your comment somewhere along the way.

That feels like something that would work, and would allow configurability against different storage implementations.

betazk commented 5 years ago

@jredl-va Cool, I also want to implement the colly.queue with RabbitMQ, but the Ack/Nack cannot be done when OnHTML finishes. Do you have any good ideas?

LamCiuLoeng commented 4 years ago

I suggest adding an Ack method to the storage interface. Both the RabbitMQ and Redis storage backends need to confirm that a job completed successfully; otherwise the issued URL needs to be pushed back onto the queue and tried again.

asciimoo commented 4 years ago

> I suggest adding an Ack method to the storage interface. Both the RabbitMQ and Redis storage backends need to confirm that a job completed successfully; otherwise the issued URL needs to be pushed back onto the queue and tried again.

I'd keep these functions outside of the interface, because not all backends need them.