supabase / realtime

Broadcast, Presence, and Postgres Changes via WebSockets
https://supabase.com/realtime
Apache License 2.0

fix: Allow Overriding of `max_channels_per_client` via Environment Variable #866

Closed · barrownicholas closed this 3 months ago

barrownicholas commented 4 months ago

What kind of change does this PR introduce?

Bug fix: allows overriding the default of 100 for max_channels_per_client on a realtime tenant.

What is the current behavior?

The default is locked at 100 without any ready way to override it.

What is the new behavior?

Allows a user to specify MAX_CHANNELS_PER_CLIENT in the runtime environment to override this value.
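
For illustration, a minimal config/runtime.exs-style sketch of the override (variable and key names follow this description and are illustrative only; the proposal later in this thread settles on TENANT_MAX_CHANNELS_PER_CLIENT and a :tenant_max_channels_per_client config key):

# Sketch only (key and variable names illustrative): read the per-tenant
# channel limit from the environment, falling back to the existing default of 100.
config :realtime,
  max_channels_per_client:
    System.get_env("MAX_CHANNELS_PER_CLIENT", "100") |> String.to_integer()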

Additional context

Fixes https://github.com/supabase/realtime/issues/843

vercel[bot] commented 4 months ago

@barrownicholas is attempting to deploy a commit to the Supabase Team on Vercel.

A member of the Team first needs to authorize it.

barrownicholas commented 4 months ago

@filipecabaco when you have time could you take a peek at this? You've been a huge help in the past, sorry to bother, you're just the best point-of-contact I have!!

barrownicholas commented 4 months ago

Also, can confirm that this built successfully in Docker on my local machine

filipecabaco commented 4 months ago

👋 will check today, sorry for the delay

barrownicholas commented 3 months ago

@filipecabaco good idea, just pushed those in 11197b4

barrownicholas commented 3 months ago

@filipecabaco good idea about updating the defaults... I may undertake a larger project and set TENANT_... variables for all the defaults after we wrap this up. Changes should be good for you to review again when you have a sec.

filipecabaco commented 3 months ago

> @filipecabaco good idea about updating the defaults... I may undertake a larger project and set TENANT_... variables for all the defaults after we wrap this up. Changes should be good for you to review again when you have a sec.

that would be awesome! and if you have any other feedback on self hosted issues you found please do ping

barrownicholas commented 3 months ago

> @filipecabaco good idea about updating the defaults... I may undertake a larger project and set TENANT_... variables for all the defaults after we wrap this up. Changes should be good for you to review again when you have a sec.

> that would be awesome! and if you have any other feedback on self hosted issues you found please do ping

Will do then! As soon as this ships, I'll open up a new fork + PR and start that

filipecabaco commented 3 months ago

Small CI error, could you run `mix format` and commit?

barrownicholas commented 3 months ago

sorry about that, should be good now @filipecabaco

barrownicholas commented 3 months ago

@filipecabaco looks like I accidentally duplicated lines 24-26 in tenant.ex while trying to resolve merge conflicts, which produced the failure in the CI workflow. I think it should be good to go now.

filipecabaco commented 3 months ago

🤦‍♂️

my bad, I forgot that the schema is a compiled thing, so `tenant.ex` should actually do this with a changeset change. Here's a proposal for the change that can easily be adapted to the other fields when we need it: `lib/realtime/api/tenant.ex`

defmodule Realtime.Api.Tenant do
  @moduledoc """
  Describes a database/tenant which makes use of the realtime service.
  """
  use Ecto.Schema
  import Ecto.Changeset
  alias Realtime.Api.Extensions
  alias Realtime.Crypto

  @type t :: %__MODULE__{}

  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id
  schema "tenants" do
    field(:name, :string)
    field(:external_id, :string)
    field(:jwt_secret, :string)
    field(:jwt_jwks, :map)
    field(:postgres_cdc_default, :string)
    field(:max_concurrent_users, :integer, default: 200)
    field(:max_events_per_second, :integer, default: 100)
    field(:max_bytes_per_second, :integer, default: 100_000)
    field(:max_channels_per_client, :integer)
    field(:max_joins_per_second, :integer, default: 100)
    field(:suspend, :boolean, default: false)
    field(:events_per_second_rolling, :float, virtual: true)
    field(:events_per_second_now, :integer, virtual: true)
    field(:enable_authorization, :boolean, default: false)

    has_many(:extensions, Realtime.Api.Extensions,
      foreign_key: :tenant_external_id,
      references: :external_id,
      on_delete: :delete_all,
      on_replace: :delete
    )

    timestamps()
  end

  @doc false
  def changeset(tenant, attrs) do
    # TODO: remove after infra update
    extension_key =
      if attrs[:extensions] do
        :extensions
      else
        "extensions"
      end

    attrs =
      if attrs[extension_key] do
        ext =
          Enum.map(attrs[extension_key], fn
            %{"type" => "postgres"} = e -> %{e | "type" => "postgres_cdc_rls"}
            e -> e
          end)

        %{attrs | extension_key => ext}
      else
        attrs
      end

    tenant
    |> cast(attrs, [
      :name,
      :external_id,
      :jwt_secret,
      :jwt_jwks,
      :max_concurrent_users,
      :max_events_per_second,
      :postgres_cdc_default,
      :max_bytes_per_second,
      :max_channels_per_client,
      :max_joins_per_second,
      :suspend,
      :enable_authorization
    ])
    |> validate_required([
      :external_id,
      :jwt_secret
    ])
    |> unique_constraint([:external_id])
    |> encrypt_jwt_secret()
    |> maybe_set_default(:max_channels_per_client, :tenant_max_channels_per_client)
    |> cast_assoc(:extensions, with: &Extensions.changeset/2)
  end

  # Falls back to the application-config value under `config_key` when neither
  # the stored record nor the incoming changes set the given property.
  def maybe_set_default(changeset, property, config_key) do
    has_key? = Map.get(changeset.data, property) || Map.get(changeset.changes, property)

    if has_key? do
      changeset
    else
      put_change(changeset, property, Application.fetch_env!(:realtime, config_key))
    end
  end

  def encrypt_jwt_secret(changeset) do
    update_change(changeset, :jwt_secret, &Crypto.encrypt!/1)
  end
end
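
A quick sketch of how the default-filling behaves (attrs and values are illustrative only; encrypt_jwt_secret assumes the encryption key, e.g. DB_ENC_KEY, is configured):

# With no max_channels_per_client in the attrs, the changeset falls back to the
# value configured under :tenant_max_channels_per_client.
Application.put_env(:realtime, :tenant_max_channels_per_client, 100)

changeset =
  Realtime.Api.Tenant.changeset(%Realtime.Api.Tenant{}, %{
    "external_id" => "tenant_abc",
    "jwt_secret" => "super-secret"
  })

Ecto.Changeset.get_field(changeset, :max_channels_per_client)
#=> 100

# An explicit value in the attrs wins over the configured default.
%Realtime.Api.Tenant{}
|> Realtime.Api.Tenant.changeset(%{
  "external_id" => "tenant_abc",
  "jwt_secret" => "super-secret",
  "max_channels_per_client" => 500
})
|> Ecto.Changeset.get_field(:max_channels_per_client)
#=> 500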

and we also need to change `runtime.exs`; the one we have at the moment ignores tests:

import Config

config :logflare_logger_backend,
  url: System.get_env("LOGFLARE_LOGGER_BACKEND_URL", "https://api.logflare.app")

app_name = System.get_env("FLY_APP_NAME", "")
default_db_host = System.get_env("DB_HOST", "localhost")
username = System.get_env("DB_USER", "postgres")
password = System.get_env("DB_PASSWORD", "postgres")
database = System.get_env("DB_NAME", "postgres")
port = System.get_env("DB_PORT", "5432")
slot_name_suffix = System.get_env("SLOT_NAME_SUFFIX")

config :realtime,
  tenant_max_channels_per_client:
    System.get_env("TENANT_MAX_CHANNELS_PER_CLIENT", "100") |> String.to_integer()

if config_env() == :prod do
  secret_key_base =
    System.get_env("SECRET_KEY_BASE") ||
      raise """
      environment variable SECRET_KEY_BASE is missing.
      You can generate one by calling: mix phx.gen.secret
      """

  if app_name == "" do
    raise "APP_NAME not available"
  end

  config :realtime, RealtimeWeb.Endpoint,
    server: true,
    url: [host: "#{app_name}.fly.dev", port: 80],
    http: [
      port: String.to_integer(System.get_env("PORT") || "4000"),
      protocol_options: [
        max_header_value_length: String.to_integer(System.get_env("MAX_HEADER_LENGTH") || "4096")
      ],
      transport_options: [
        # max_connection is per connection supervisor
        # num_conns_sups defaults to num_acceptors
        # total conns accepted here is max_connections * num_acceptors
        # ref: https://ninenines.eu/docs/en/ranch/2.0/manual/ranch/
        max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "1000"),
        num_acceptors: String.to_integer(System.get_env("NUM_ACCEPTORS") || "100"),
        # IMPORTANT: support IPv6 addresses
        socket_opts: [:inet6]
      ]
    ],
    check_origin: false,
    secret_key_base: secret_key_base
end

if config_env() != :test do
  platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly

  config :realtime,
    secure_channels: System.get_env("SECURE_CHANNELS", "true") == "true",
    jwt_claim_validators: System.get_env("JWT_CLAIM_VALIDATORS", "{}"),
    api_jwt_secret: System.get_env("API_JWT_SECRET"),
    api_blocklist: System.get_env("API_TOKEN_BLOCKLIST", "") |> String.split(","),
    metrics_blocklist: System.get_env("METRICS_TOKEN_BLOCKLIST", "") |> String.split(","),
    metrics_jwt_secret: System.get_env("METRICS_JWT_SECRET"),
    db_enc_key: System.get_env("DB_ENC_KEY"),
    region: System.get_env("FLY_REGION") || System.get_env("REGION"),
    fly_alloc_id: System.get_env("FLY_ALLOC_ID", ""),
    prom_poll_rate: System.get_env("PROM_POLL_RATE", "5000") |> String.to_integer(),
    platform: platform,
    slot_name_suffix: slot_name_suffix

  queue_target = System.get_env("DB_QUEUE_TARGET", "5000") |> String.to_integer()
  queue_interval = System.get_env("DB_QUEUE_INTERVAL", "5000") |> String.to_integer()

  after_connect_query_args =
    case System.get_env("DB_AFTER_CONNECT_QUERY") do
      nil -> nil
      query -> {Postgrex, :query!, [query, []]}
    end

  config :realtime, Realtime.Repo,
    hostname: default_db_host,
    username: username,
    password: password,
    database: database,
    port: port,
    pool_size: System.get_env("DB_POOL_SIZE", "5") |> String.to_integer(),
    queue_target: queue_target,
    queue_interval: queue_interval,
    parameters: [
      application_name: "supabase_mt_realtime"
    ],
    after_connect: after_connect_query_args

  replica_repos = %{
    Realtime.Repo.Replica.FRA => System.get_env("DB_HOST_REPLICA_FRA", default_db_host),
    Realtime.Repo.Replica.IAD => System.get_env("DB_HOST_REPLICA_IAD", default_db_host),
    Realtime.Repo.Replica.SIN => System.get_env("DB_HOST_REPLICA_SIN", default_db_host),
    Realtime.Repo.Replica.SJC => System.get_env("DB_HOST_REPLICA_SJC", default_db_host),
    Realtime.Repo.Replica.Singapore => System.get_env("DB_HOST_REPLICA_SIN", default_db_host),
    Realtime.Repo.Replica.London => System.get_env("DB_HOST_REPLICA_FRA", default_db_host),
    Realtime.Repo.Replica.NorthVirginia => System.get_env("DB_HOST_REPLICA_IAD", default_db_host),
    Realtime.Repo.Replica.Oregon => System.get_env("DB_HOST_REPLICA_SJC", default_db_host),
    Realtime.Repo.Replica.SanJose => System.get_env("DB_HOST_REPLICA_SJC", default_db_host),
    Realtime.Repo.Replica.Local => default_db_host
  }

  # username, password, database, and port must match primary credentials
  for {replica_repo, hostname} <- replica_repos do
    config :realtime, replica_repo,
      hostname: hostname,
      username: username,
      password: password,
      database: database,
      port: port,
      pool_size: System.get_env("DB_REPLICA_POOL_SIZE", "5") |> String.to_integer(),
      queue_target: queue_target,
      queue_interval: queue_interval,
      parameters: [
        application_name: "supabase_mt_realtime_ro"
      ]
  end
end

default_cluster_strategy =
  config_env()
  |> case do
    :prod -> "DNS"
    _ -> "EPMD"
  end

cluster_topologies =
  System.get_env("CLUSTER_STRATEGIES", default_cluster_strategy)
  |> String.upcase()
  |> String.split(",")
  |> Enum.reduce([], fn strategy, acc ->
    strategy
    |> String.trim()
    |> case do
      "DNS" ->
        [
          fly6pn: [
            strategy: Cluster.Strategy.DNSPoll,
            config: [
              polling_interval: 5_000,
              query: System.get_env("DNS_NODES"),
              node_basename: app_name
            ]
          ]
        ] ++ acc

      "POSTGRES" ->
        version = "#{Application.spec(:realtime)[:vsn]}" |> String.replace(".", "_")

        [
          postgres: [
            strategy: Realtime.Cluster.Strategy.Postgres,
            config: [
              hostname: default_db_host,
              username: username,
              password: password,
              database: database,
              port: port,
              parameters: [
                application_name: "cluster_node_#{node()}"
              ],
              heartbeat_interval: 5_000,
              node_timeout: 15_000,
              channel_name:
                System.get_env("POSTGRES_CLUSTER_CHANNEL_NAME", "realtime_cluster_#{version}")
            ]
          ]
        ] ++ acc

      "EPMD" ->
        [
          dev: [
            strategy: Cluster.Strategy.Epmd,
            config: [
              hosts: [:"orange@127.0.0.1", :"pink@127.0.0.1"]
            ],
            connect: {:net_kernel, :connect_node, []},
            disconnect: {:net_kernel, :disconnect_node, []}
          ]
        ] ++ acc

      _ ->
        acc
    end
  end)

config :libcluster,
  debug: false,
  topologies: cluster_topologies

if System.get_env("LOGS_ENGINE") == "logflare" do
  if !System.get_env("LOGFLARE_API_KEY") or !System.get_env("LOGFLARE_SOURCE_ID") do
    raise """
    Environment variable LOGFLARE_API_KEY or LOGFLARE_SOURCE_ID is missing.
    Check those variables or choose another LOGS_ENGINE.
    """
  end

  config :logger,
    backends: [LogflareLogger.HttpBackend]
end
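
As a quick sanity check (illustrative only; the result depends on how TENANT_MAX_CHANNELS_PER_CLIENT is set at boot), the resolved value can be inspected from an attached iex session:

# Returns the integer read from TENANT_MAX_CHANNELS_PER_CLIENT,
# or the fallback of 100 when the variable is unset.
Application.fetch_env!(:realtime, :tenant_max_channels_per_client)
#=> 100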

Tested locally and looks good

[Screenshot 2024-06-18 at 19:05:20]

barrownicholas commented 3 months ago

> should actually do this with a changeset change

@filipecabaco just refactored those two files based on those changes

filipecabaco commented 3 months ago

merged 🙏 thank you so much for all the patience!

kiwicopple commented 3 months ago

🎉 This PR is included in version 2.29.10 🎉

The release is available on GitHub release

Your semantic-release bot 📦🚀