(Thanks to Erlang Solutions for sponsoring this project)
EctoWatch allows you to easily setup notifications of database changes directly from PostgreSQL.
Often in Elixir applications a Phoenix.PubSub.broadcast
is inserted into the application code to notify the rest of the application about inserts, updates, or deletions (e.g. Accounts.insert_user
/Accounts.update_user
/Accounts.delete_user
). This has a few potential problems:
MyApp.Repo.*
.By getting updates directly from PostgreSQL, EctoWatch ensures that messages are sent for every change (even changes from other clients of the database). EctoWatch also establishes a simple, standardized set of messages for inserts, updates, and deletes so that there can be consistency across your application. By default only the id of the record is sent (makeing for smaller messages).
EctoWatch
To use EctoWatch, you need to add it to your supervision tree and specify watchers for Ecto schemas and update types. It would look something like this in your application.ex
file (after MyApp.Repo
and MyApp.PubSub
):
alias MyApp.Accounts.User
alias MyApp.Accounts.Package
{EctoWatch,
repo: MyApp.Repo,
pub_sub: MyApp.PubSub,
watchers: [
{User, :inserted},
{User, :updated},
{User, :deleted},
{Package, :inserted},
{Package, :updated}
]}
This will setup:
Phoenix.PubSub
Then any process (e.g. a GenServer, a LiveView, a Phoenix channel, etc...) can subscribe to messages like so:
EctoWatch.subscribe({User, :inserted})
EctoWatch.subscribe({User, :updated})
EctoWatch.subscribe({User, :deleted})
EctoWatch.subscribe({Package, :inserted})
EctoWatch.subscribe({Package, :updated})
(note that if you are subscribing in a LiveView mount
callback you should subscribe inside of a if connected?(socket) do
to avoid subscribing twice).
You can also subscribe to individual records:
EctoWatch.subscribe({User, :updated}, user.id)
EctoWatch.subscribe({User, :deleted}, user.id)
... OR you can subscribe to records by an association column (but the given column must be in the extra_columns
list for the watcher! See below for more info on the extra_columns
option):
EctoWatch.subscribe({User, :updated}, {:role_id, role.id})
EctoWatch.subscribe({User, :deleted}, {:role_id, role.id})
Once subscribed, messages can be handled like so (LiveView example are given here but handle_info
callbacks can be used elsewhere as well):
def handle_info({{User, :inserted}, %{id: id}}, socket) do
user = Accounts.get_user(id)
socket = stream_insert(socket, :users, user)
{:noreply, socket}
end
def handle_info({{User, :updated}, %{id: id}}, socket) do
user = Accounts.get_user(id)
socket = stream_insert(socket, :users, user)
{:noreply, socket}
end
def handle_info({{User, :deleted}, %{id: id}}, socket) do
socket = stream_delete_by_dom_id(socket, :songs, "users-#{id}")
{:noreply, socket}
end
There are a lot of features to check out! Check out the HexDocs documentation for all of the details!
The package can be installed from hex by adding ecto_watch
to your list of dependencies in mix.exs
:
def deps do
[
{:ecto_watch, "~> 0.11.1"}
]
end
Documentation can be generated with ExDoc and published on HexDocs. Once published, the docs can be found at https://hexdocs.pm/ecto_watch.
[^1]: more info about PubSub message standards:
users
) means that all messages are sent to all subscribers, which can be inefficient.users:1
, users:2
, etc.) means that a subscriber needs to subscribe to every record, which can be inefficient.user
vs. packages
topics) which can be confusing and lead to bugs.{:updated, id}
doesn't make it clear which schema was updated, using {schema, id}
doesn't make it clear which operation happened.