gresrun / jesque

An implementation of Resque in Java.
http://gresrun.github.io/jesque
Apache License 2.0
630 stars 131 forks source link

Error queues per job type #76

Closed rblazquez closed 9 years ago

rblazquez commented 9 years ago

I'm using jesque and it's very good but i am not happy to have all failed jobs in the same queue. Would it be possible to configure a failed queue per job?

gresrun commented 9 years ago

The failed jobs mechanism behaves the way that all the other Resque-combatible libraries behave. I'm really hesitant to change this behavior because it is core to the Resque design and cross-language compatibility is an important facet of this project.

What issues are you encountering regarding having failed jobs registered in a single key?

gresrun commented 9 years ago

@rblazquez What issues are you encountering regarding having failed jobs registered in a single key?

gresrun commented 9 years ago

@rblazquez I'm going to close this issue if I don't hear back from you in 48 hours.

rblazquez commented 9 years ago

In a enviroment with high load of jobs and potential high load of failures related to tasks that does not have the same criticity it is very helpful to have different error queues so i can address the critical problems first. Having everything mixed causes time consuming task ot reviewing and decide.

In other solutions (like e.g. camel + JMS) it would be very easy to reroute the failures to dedicated queues. I understand this is a totally different system, but it would be very nice to have a way to do it.

argvk commented 9 years ago

Regarding failed jobs, can we allow setting an optional custom redis key at the WorkerImpl ? So the failed jobs for the worker would all go into the key specified and not the global failed queue.

/cc @rblazquez @gresrun

gresrun commented 9 years ago

If I understood @rblazquez correctly, his use-case was a per-job-type decision not a per-worker decision. I'm open to discussion about how to best proceed on this. I really don't want to break compatibility with other Rescue clients and workers.

rblazquez commented 9 years ago

Thanks guys,

In fact i want a "per queue" decision ... not sure what "per-job-type" means (is job type a jesque concept?)

It is usual to place in the same queue same kind of tasks.

/cc @argvk @gresrun

gresrun commented 9 years ago

@rblazquez @argvk It sounds like there could be a variety of ways to decide which failure queue to use. Perhaps a we can have an interface that decides the appropriate failure queue name (strategy pattern).

First pass:

public interface FailQueueStrategy {
    String getFailQueueKey(Throwable t, Job job, String curQueue);
}

public class DefaultFailQueueStrategy implements FailQueueStrategy {
    private final String namespace;

    public DefaultFailQueueStrategy(final String namespace) {
        this.namespace = namespace;
    }

    public String getFailQueueKey(final Throwable t, final Job job, final String curQueue) {
         return JesqueUtils.createKey(this.namespace, ResqueConstants.FAILED);
    }
}

public class WorkerImpl implements Worker, ... {
    // ...
    private FailQueueStrategy failQueueStrategy = new DefaultFailQueueStrategy(this.namespace);
    // ...
    public FailQueueStrategy getFailQueueStrategy() {
         return this.failQueueStrategy;
    }

    public void setFailQueueStrategy(final FailQueueStrategy failQueueStrategy) {
        this.failQueueStrategy = failQueueStrategy;
    }
    // ...
    protected void failure(final Throwable t, final Job job, final String curQueue) {
        // The job may have taken a long time; make an effort to ensure the connection is OK
        JedisUtils.ensureJedisConnection(this.jedis);
        try {
            this.jedis.incr(key(STAT, FAILED));
            this.jedis.incr(key(STAT, FAILED, this.name));
            this.jedis.rpush(this.failQueueStrategy.getFailQueueKey(t, job, curQueue), 
                    failMsg(t, curQueue, job));
        } catch (JedisException je) {
            LOG.warn("Error updating failure stats for throwable=" + t + " job=" + job, je);
        } catch (IOException ioe) {
            LOG.warn("Error serializing failure payload for throwable=" + t + " job=" + job, ioe);
        }
        this.listenerDelegate.fireEvent(JOB_FAILURE, this, curQueue, job, null, null, t);
    }
    // ...
}

To modify the behavior, create your own FailQueueStrategy implementation and set it on the Workers. Does this address both of your use-cases?

gresrun commented 9 years ago

@rblazquez @argvk Any thoughts on the proposed implementation?

argvk commented 9 years ago

@gresrun this looks per-worker rather than per-queue implementation, which I'm OK with. :+1:

gresrun commented 9 years ago

Well, the worker is the one that performs the operation but the strategy for determining the name of the failure queue could be based upon anything, including the name of the queue that the job was taken from, the classname of the Job or whether or not it is a blue moon; it's up to you.

gresrun commented 9 years ago

@rblazquez Any thoughts?

gresrun commented 9 years ago

@rblazquez You can now implement a per-queue strategy like so:

public class PerQueueFailQueueStrategy implements FailQueueStrategy {
    private final String namespace;

    public PerQueueFailQueueStrategy(final String namespace) {
        this.namespace = namespace;
    }

    public String getFailQueueKey(final Throwable t, final Job job, final String curQueue) {
         return null; // TODO: Return a value based on curQueue here...
    }
}