sequinstream / sequin

An open source message stream built on Postgres.
https://sequinstream.com
MIT License

Sequin Elixir: Stream Processing for User-Owned Postgres Tables #94

Closed RTLS closed 1 week ago

RTLS commented 1 month ago

Overview

Problem statement

Currently, Sequin requires a separate messages table to store stream data. This adds complex lifecycle management and forces users to decide read patterns at write time (i.e., to choose which columns are filterable). A separate messages table also adds storage and IOPS load to the Postgres database.

Proposed solution

Develop a Sequin Elixir dependency that enables developers to treat their existing Postgres tables as streams. This library will provide a simple, Oban-like interface for creating consumers that can filter, transform, and react to changes in specified tables.

Consumers will pull directly from user-owned tables for backfill and replays, eliminating the need for a messages table. Consumers will then subscribe to WAL messages for real-time processing of inserts, updates, and deletes.
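To make the backfill-then-WAL ordering concrete, here is a minimal sketch. It assumes WAL changes that arrive during a backfill are buffered and delivered afterwards. The module `HandoffSketch`, the `:backfill` action, and the message shape are hypothetical and are not part of Sequin.

```elixir
defmodule HandoffSketch do
  @moduledoc "Hypothetical illustration only; not part of Sequin."

  # Deliver backfill rows first, then the WAL changes that were buffered
  # while the backfill ran, preserving the order within each source.
  def run(backfill_rows, buffered_wal_changes, handler) when is_function(handler, 1) do
    backfill_rows
    |> Stream.map(fn row -> %{action: :backfill, record: row} end)
    |> Stream.concat(buffered_wal_changes)
    |> Enum.map(handler)
  end
end

# Rows read from the user-owned table by a backfill query (assumed shape).
rows = [%{id: 1, status: "shipped"}, %{id: 2, status: "pending"}]

# A change captured from the WAL while the backfill was in progress.
wal = [%{action: :update, record: %{id: 2, status: "shipped"}}]

HandoffSketch.run(rows, wal, fn msg -> {msg.action, msg.record.id} end)
# returns [{:backfill, 1}, {:backfill, 2}, {:update, 2}]
```

A real implementation would also need to decide how to reconcile a buffered change with a row the backfill has already delivered; the sketch only shows the ordering.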

Goals / Objectives

  1. Simplify stream processing for existing Postgres tables
  2. Eliminate the need for separate stream tables managed by Sequin
  3. Reduce the burden of adoption to installing a mix dependency
  4. Filter, deliver, and backfill from existing user-owned Postgres tables so users stay fully in control
  5. Create a foundation for building more complex features (e.g., audit logs)

Key Concepts

Consumers

Consumers are the core abstraction. A consumer:

Core Features

  1. Table Subscriptions: Subscribe to one or more user-owned Postgres tables.
  2. Flexible Filtering: Define SQL-based filters per table subscription.
  3. Backfills and Replays:
    • Specify SQL queries for initial data loads or replays
    • Option to delay real-time processing until backfill completes
  4. Real-time Processing:
    • Consume changes from Postgres WAL after initial backfill
    • Handle inserts and updates efficiently
  5. Exactly-once Delivery: Consumers track in-flight and processed messages.
  6. Optional Delete Handling:
    • Configurable option to receive delete events
    • Stores delete payloads per consumer for processing
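The optional delete handling described above could be sketched as a simple filter over incoming changes. The option name `:receive_deletes`, the module `DeleteRoutingSketch`, and the change shape are assumptions made for illustration; the proposal does not name them.

```elixir
defmodule DeleteRoutingSketch do
  @moduledoc "Hypothetical illustration only; not part of Sequin."

  # Returns the changes a consumer should receive. Delete events are
  # dropped unless the consumer opted in via `receive_deletes: true`.
  def deliverable(changes, opts \\ []) do
    receive_deletes? = Keyword.get(opts, :receive_deletes, false)

    Enum.filter(changes, fn %{action: action} ->
      action != :delete or receive_deletes?
    end)
  end
end

changes = [
  %{action: :insert, id: 1},
  %{action: :update, id: 1},
  %{action: :delete, id: 1}
]

# Default: the delete is filtered out.
DeleteRoutingSketch.deliverable(changes)

# Opted in: all three changes are delivered.
DeleteRoutingSketch.deliverable(changes, receive_deletes: true)
```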

Example

Configuration

import Config

config :my_app, MyApp.Sequin,
  consumers: [
    orders: [
      tables: [
        table: "orders",
        filter: "processed_at IS NOT NULL",
        backfill_query: "SELECT * FROM orders WHERE created_at >= '2024-01-01'"
      ],
      max_concurrency: 10
    ]
  ],
  repo: MyApp.Repo
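As a rough sketch of how a consumer name such as `:orders` might be resolved against this configuration, the snippet below looks up one consumer's settings in a keyword list shaped like the example. The module `ConfigSketch` and its function are hypothetical; in an application the keyword list would presumably come from `Application.get_env/2` rather than a literal.

```elixir
# A keyword list with the same shape as the example configuration.
config = [
  consumers: [
    orders: [
      tables: [
        table: "orders",
        filter: "processed_at IS NOT NULL",
        backfill_query: "SELECT * FROM orders WHERE created_at >= '2024-01-01'"
      ],
      max_concurrency: 10
    ]
  ],
  repo: MyApp.Repo
]

defmodule ConfigSketch do
  @moduledoc "Hypothetical illustration only; not part of Sequin."

  # Raises KeyError if the :consumers key or the named consumer is missing,
  # so a misconfiguration fails loudly at startup.
  def fetch_consumer!(config, name) when is_atom(name) do
    config
    |> Keyword.fetch!(:consumers)
    |> Keyword.fetch!(name)
  end
end

consumer = ConfigSketch.fetch_consumer!(config, :orders)

# Keyword lists implement Access, so nested lookups work with get_in/2.
get_in(consumer, [:tables, :table])
# returns "orders"

Keyword.fetch!(consumer, :max_concurrency)
# returns 10
```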

Consuming

defmodule MyApp.OrderConsumer do
  use Sequin.Consumer, consumer: :orders

  # Logger.info/1 is a macro, so Logger must be required before use.
  require Logger

  @impl Sequin.Consumer
  def handle_message(%Sequin.Message{table: "orders", data: data}) do
    # Note: struct!/2 raises unless the record's keys are atoms that
    # match the fields of MyApp.Order.
    order = struct!(MyApp.Order, data["record"])

    case order.status do
      "shipped" ->
        send_shipping_notification(order)

      "cancelled" ->
        process_cancellation(order)

      _ ->
        Logger.info("Unhandled order status: #{inspect(order)}")
    end

    :ok
  end

  defp send_shipping_notification(_order) do
    # Implementation
  end

  defp process_cancellation(_order) do
    # Implementation
  end
end
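One way the `max_concurrency` setting could bound how many messages are handled at once is `Task.async_stream/3` from the standard library. This is a sketch under that assumption, not the library's dispatch mechanism; `DispatchSketch` and the message shape are hypothetical.

```elixir
defmodule DispatchSketch do
  @moduledoc "Hypothetical illustration only; not part of Sequin."

  # Runs `handler` on each message with at most `max_concurrency` tasks
  # in flight. Results come back in the same order as the input.
  def dispatch(messages, handler, max_concurrency) when is_function(handler, 1) do
    messages
    |> Task.async_stream(handler, max_concurrency: max_concurrency)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

messages = [%{id: 1}, %{id: 2}, %{id: 3}]

# The handler plays the role of handle_message/1 and returns :ok.
DispatchSketch.dispatch(messages, fn _msg -> :ok end, 2)
# returns [:ok, :ok, :ok]
```

In this sketch a handler that raises takes down the calling process, because the tasks are linked to it; a production dispatcher would need its own failure and retry policy.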

Implementation Details

Consumer Messages

The library manages a consumer_messages table to track processed messages and ensure exactly-once delivery.
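The tracking this table provides can be illustrated with an in-memory stand-in. The sketch below assumes each message has a stable id and moves through `:in_flight` and `:processed` states; those state names, the id format, and the module `ConsumerMessagesSketch` are assumptions, and a real implementation would persist the state in Postgres rather than in a map.

```elixir
defmodule ConsumerMessagesSketch do
  @moduledoc "Hypothetical in-memory stand-in for a consumer_messages table."

  # State is a map of message id => :in_flight | :processed.
  def new, do: %{}

  # A message is delivered only the first time its id is seen.
  def checkout(state, id) do
    case Map.fetch(state, id) do
      :error -> {:deliver, Map.put(state, id, :in_flight)}
      {:ok, _status} -> {:skip, state}
    end
  end

  # Mark an in-flight message as processed after the handler succeeds.
  def ack(state, id) do
    case state do
      %{^id => :in_flight} -> %{state | id => :processed}
      _ -> state
    end
  end

  # Forget an in-flight message after a failure so it can be redelivered.
  def nack(state, id) do
    case state do
      %{^id => :in_flight} -> Map.delete(state, id)
      _ -> state
    end
  end
end

state = ConsumerMessagesSketch.new()
{:deliver, state} = ConsumerMessagesSketch.checkout(state, "orders:1")

# A duplicate arriving while the first copy is in flight is skipped.
{:skip, state} = ConsumerMessagesSketch.checkout(state, "orders:1")

state = ConsumerMessagesSketch.ack(state, "orders:1")

# Once processed, the message is never delivered again.
{:skip, _state} = ConsumerMessagesSketch.checkout(state, "orders:1")
```

Deduplication of this kind prevents redelivery; whether the end result is exactly-once also depends on the handler's side effects being committed together with the acknowledgement.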

acco commented 1 month ago

Notes for discussion:

Interestingly, all the deps I can think of use config to set up vs some imperative flow -- makes sense.