hibiken / asynq

Simple, reliable, and efficient distributed task queue in Go
MIT License

[FEATURE REQUEST] Support BatchEnqueue for client #535

Open Percivalll opened 2 years ago

Percivalll commented 2 years ago

When many tasks need to be enqueued, the current approach is slow because every Redis operation costs at least one round trip (RTT). For example: if I want to enqueue 1,000,000 tasks and each client.Enqueue takes 13ms in my environment, then running them sequentially takes 1,000,000 × 13ms = 13,000,000ms, almost 3.6 hours. I could of course use many goroutines to shorten the time, but that costs a lot of CPU and opens many Redis connections.

I think we should provide a BatchEnqueue method so users can enqueue many tasks at once. For the Redis broker, we can use pipelining to reduce network and CPU overhead.

Percivalll commented 2 years ago

Related discussions: https://github.com/hibiken/asynq/issues/339#issuecomment-985507125 https://github.com/hibiken/asynq/issues/352

KillianH commented 2 years ago

I need this too :) I have several million tasks to enqueue in my workflow. Overall I really love the lib: I can handle 7 million tasks in 44 minutes (including some computation and database requests).

hibiken commented 2 years ago

Thank you @Serinalice for creating this feature request!

This feature makes a lot of sense and the package should support this use case. We should probably discuss the API first (What should it look like? How should we handle partial errors?)

Percivalll commented 2 years ago

No problem, I'll describe my preliminary ideas!


Percivalll commented 2 years ago

How about this:

func (c *Client) EnqueueBatch(tasks []*Task, opts ...Option) ([]*TaskInfo, error)
func (c *Client) EnqueueBatchContext(ctx context.Context, tasks []*Task, opts ...Option) ([]*TaskInfo, error)

If the error is nil, all tasks were enqueued successfully. If not, the returned slice contains the TaskInfo of the tasks that were successfully enqueued before the failure.

xuyang2 commented 2 years ago

How to configure/default batch size?

https://redis.io/docs/manual/pipelining/

> IMPORTANT NOTE: While the client sends commands using pipelining, the server will be forced to queue the replies, using memory. So if you need to send a lot of commands with pipelining, it is better to send them as batches each containing a reasonable number, for instance 10k commands, read the replies, and then send another 10k commands again, and so forth. The speed will be nearly the same, but the additional memory used will be at most the amount needed to queue the replies for these 10k commands.

Percivalll commented 2 years ago

> How to configure/default batch size?
>
> https://redis.io/docs/manual/pipelining/
>
> IMPORTANT NOTE: While the client sends commands using pipelining, the server will be forced to queue the replies, using memory. So if you need to send a lot of commands with pipelining, it is better to send them as batches each containing a reasonable number, for instance 10k commands, read the replies, and then send another 10k commands again, and so forth. The speed will be nearly the same, but the additional memory used will be at most the amount needed to queue the replies for these 10k commands.

The batch size could simply follow the length of the tasks slice the caller passes in.

yousifh commented 2 years ago

Would this new batch API pipeline the existing EVALSHA enqueue scripts, or would it use a new Lua script that takes the batch of tasks and enqueues them all at once?

5idu commented 1 year ago

Are there any new developments on this issue?

developersam1995 commented 1 year ago

Should this API support a combination of initial task states between "aggregating", "pending", and "scheduled"?

thanhps42 commented 4 months ago

any update?