fredwu / opq

Elixir queue! A simple, in-memory queue with worker pooling and rate limiting in Elixir.
https://hex.pm/packages/opq

OPQ: One Pooled Queue


Elixir Queue!

A simple, in-memory queue with worker pooling and rate limiting in Elixir. OPQ leverages Erlang's queue module and Elixir's GenStage.
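For readers unfamiliar with Erlang's queue module, here is a minimal sketch of the first-in, first-out behaviour it provides. It does not use OPQ itself, and the module name FifoSketch and its functions are made up for illustration.

```elixir
defmodule FifoSketch do
  # Hypothetical helper module for illustration; not part of OPQ.

  # Build an Erlang queue from a list, adding each item at the rear in order.
  def from_list(items) do
    Enum.reduce(items, :queue.new(), fn item, queue -> :queue.in(item, queue) end)
  end

  # Take items from the front until the queue is empty,
  # returning them in the order they came out.
  def drain(queue, acc \\ []) do
    case :queue.out(queue) do
      {{:value, item}, rest} -> drain(rest, [item | acc])
      {:empty, _queue} -> Enum.reverse(acc)
    end
  end
end

queue = FifoSketch.from_list(["hello", "world"])

:queue.len(queue)       # number of items currently in the queue
FifoSketch.drain(queue) # items come out in insertion order
```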

Originally built to support Crawler.

Features

See Hex documentation.

Installation

def deps do
  [{:opq, "~> 4.0"}]
end

Usage

A simple example:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
OPQ.enqueue(opq, fn -> IO.inspect("world") end)

Specify module, function and arguments:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, IO, :inspect, ["hello"])
OPQ.enqueue(opq, IO, :inspect, ["world"])

Specify a custom name for the queue:

OPQ.init(name: :items)

OPQ.enqueue(:items, fn -> IO.inspect("hello") end)
OPQ.enqueue(:items, fn -> IO.inspect("world") end)

Start as part of a supervision tree:

Note: when starting as part of a supervision tree, the :name option must be provided.

children = [
  {OPQ, name: :items}
]
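As a rough sketch, the children list above might be wired into an application's supervisor like this. The names MyApp.Application and MyApp.Supervisor, and the children/0 helper, are hypothetical; only the {OPQ, name: :items} child spec comes from this README.

```elixir
defmodule MyApp.Application do
  # Hypothetical application module for illustration.
  use Application

  # Child specs for the supervisor. The queue is registered as :items.
  def children do
    [
      {OPQ, name: :items}
    ]
  end

  @impl true
  def start(_type, _args) do
    # Standard one_for_one supervisor; a crashed child is restarted on its own.
    Supervisor.start_link(children(), strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```

Once the application has started, other processes can refer to the queue by its name, for example OPQ.enqueue(:items, fn -> IO.inspect("hello") end).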

Specify a custom worker to process items in the queue:

defmodule CustomWorker do
  def start_link(item) do
    Task.start_link(fn ->
      Agent.update(:bucket, &[item | &1])
    end)
  end
end

Agent.start_link(fn -> [] end, name: :bucket)

{:ok, opq} = OPQ.init(worker: CustomWorker)

OPQ.enqueue(opq, "hello")
OPQ.enqueue(opq, "world")

Agent.get(:bucket, & &1) # => ["world", "hello"]

Rate limit:

{:ok, opq} = OPQ.init(workers: 1, interval: 1000)

Task.async(fn ->
  OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
  OPQ.enqueue(opq, fn -> IO.inspect("world") end)
end)

If no interval is supplied, the rate limiter is bypassed.

Check the queue and number of available workers:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> Process.sleep(1000) end)

{status, queue, available_workers} = OPQ.info(opq) # => {:normal, #OPQ.Queue<[]>, 9}

Process.sleep(1200)

{status, queue, available_workers} = OPQ.info(opq) # => {:normal, #OPQ.Queue<[]>, 10}

If you just need to get the queue itself:

OPQ.queue(opq) # => #OPQ.Queue<[]>

Queue

OPQ.Queue implements Enumerable, so you can call Enum functions on the queue:

{:ok, opq} = OPQ.init()

queue = OPQ.queue(opq)

Enum.count(queue) # => 0
Enum.empty?(queue) # => true
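To illustrate what implementing Enumerable involves, here is a minimal sketch of a struct that wraps an Erlang queue and supports Enum functions. DemoQueue and its push/2 function are hypothetical and are not OPQ's actual implementation.

```elixir
defmodule DemoQueue do
  # Hypothetical struct for illustration; `data` holds an Erlang queue.
  defstruct data: :queue.new()

  # Return a new DemoQueue with `item` added at the rear.
  def push(%__MODULE__{data: data} = queue, item) do
    %{queue | data: :queue.in(item, data)}
  end
end

defimpl Enumerable, for: DemoQueue do
  # Count and membership are answered directly by :queue.
  def count(%DemoQueue{data: data}), do: {:ok, :queue.len(data)}
  def member?(%DemoQueue{data: data}, item), do: {:ok, :queue.member(item, data)}

  # Convert to a list (front item first) and reuse the list implementation.
  def reduce(%DemoQueue{data: data}, acc, fun) do
    Enumerable.List.reduce(:queue.to_list(data), acc, fun)
  end

  # No efficient slicing here, so Enum falls back to reduce/3.
  def slice(_queue), do: {:error, __MODULE__}
end

queue =
  %DemoQueue{}
  |> DemoQueue.push("hello")
  |> DemoQueue.push("world")

Enum.count(queue)   # uses count/1
Enum.to_list(queue) # uses reduce/3, front item first
```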

Stop the queue:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
OPQ.stop(opq)
OPQ.enqueue(opq, fn -> IO.inspect("world") end) # => (EXIT) no process...

Pause and resume the queue:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> IO.inspect("hello") end) # => "hello"
OPQ.pause(opq)
OPQ.info(opq) # => {:paused, #OPQ.Queue<[]>, 10}
OPQ.enqueue(opq, fn -> IO.inspect("world") end)
OPQ.resume(opq) # => "world"
OPQ.info(opq) # => {:normal, #OPQ.Queue<[]>, 10}

Configurations

| Option | Type | Default Value | Description |
| --- | --- | --- | --- |
| :name | atom/module | pid | The name of the queue. |
| :worker | module | OPQ.Worker | The worker that processes each item from the queue. |
| :workers | integer | 10 | Maximum number of workers. |
| :interval | integer | 0 | Rate limit control: the number of milliseconds to wait before asking for more items to process. Defaults to 0, which is effectively no rate limit. |
| :timeout | integer | 5000 | Number of milliseconds allowed to perform the work. It should always be set higher than :interval. |
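As a quick sketch, several of these options can be combined in one call. The queue name :downloads and the specific values are hypothetical; the option keys are the ones listed above.

```elixir
# Hypothetical queue name and values, chosen only for illustration.
{:ok, _pid} =
  OPQ.init(
    name: :downloads, # refer to the queue by name instead of by pid
    workers: 2,       # at most 2 workers
    interval: 500,    # wait 500 ms before asking for more items
    timeout: 5_000    # allow 5000 ms per item, higher than :interval
  )

OPQ.enqueue(:downloads, fn -> IO.inspect("hello") end)
```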

Changelog

Please see CHANGELOG.md.

License

Licensed under MIT.