Closed barrownicholas closed 4 months ago
@filipecabaco this might be related to #866, yet it still seems odd that none of the tables are even being created (versus just seeding the first/default tenant)
If I revert back to v2.28.32, the error no longer occurs, so it must be happening somewhere between 28.32 and 29.9.
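For anyone comparing a working run against a failing one, here is a minimal sketch of pulling the list of migrations out of each log so the two can be diffed. The function names are hypothetical, and the regex assumes the `== Running <version> <module> forward` line format that appears in the logs below; it is not part of Realtime itself.

```python
import re

# Matches Ecto migration start lines as they appear in the logs, e.g.
#   "... [info] == Running 20210706140551 Realtime.Repo.Migrations.CreateTenants.change/0 forward"
# Assumption: versions are 14-digit timestamps and the module/function token has no spaces.
RUNNING = re.compile(r"== Running (\d{14}) (\S+) forward")


def migrations_run(log_text):
    """Return (version, module/function) pairs for every migration the log reports starting, in order."""
    return [(m.group(1), m.group(2)) for m in RUNNING.finditer(log_text)]


def missing_migrations(good_log, bad_log):
    """Return migrations that appear in the working log but not in the failing one."""
    seen = set(migrations_run(bad_log))
    return [m for m in migrations_run(good_log) if m not in seen]
```

Passing the text of the 2.28.32 log and the text of the failing log to `missing_migrations` should show the first migration that never started in the failing run, which may help narrow down where the two versions diverge.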
Here are the raw (good/working) logs from 2.28.32, if that helps to compare against the logs with the error above:
2024-06-20 14:27:37 Setting RLIMIT_NOFILE to 10000
2024-06-20 14:27:40 18:27:40.147 [info] == Running 20210706140551 Realtime.Repo.Migrations.CreateTenants.change/0 forward
2024-06-20 14:27:40 18:27:40.156 [info] create table tenants
2024-06-20 14:27:40 18:27:40.181 [info] create index tenants_external_id_index
2024-06-20 14:27:40 18:27:40.193 [info] == Migrated 20210706140551 in 0.0s
2024-06-20 14:27:40 18:27:40.238 [info] == Running 20220329161857 Realtime.Repo.Migrations.AddExtensionsTable.change/0 forward
2024-06-20 14:27:40 18:27:40.238 [info] create table extensions
2024-06-20 14:27:40 18:27:40.267 [info] create index extensions_tenant_external_id_type_index
2024-06-20 14:27:40 18:27:40.279 [info] == Migrated 20220329161857 in 0.0s
2024-06-20 14:27:40 18:27:40.286 [info] == Running 20220410212326 Realtime.Repo.Migrations.AddTenantMaxEps.up/0 forward
2024-06-20 14:27:40 18:27:40.286 [info] alter table tenants
2024-06-20 14:27:40 18:27:40.290 [info] == Migrated 20220410212326 in 0.0s
2024-06-20 14:27:40 18:27:40.292 [info] == Running 20220506102948 Realtime.Repo.Migrations.RenamePollIntervalToPollIntervalMs.up/0 forward
2024-06-20 14:27:40 18:27:40.298 [warning] Replica region not found, defaulting to Realtime.Repo
2024-06-20 14:27:40 18:27:40.311 [debug] QUERY OK source="extensions" db=1.6ms
2024-06-20 14:27:40 SELECT e0."id", e0."type", e0."settings", e0."tenant_external_id", e0."inserted_at", e0."updated_at" FROM "extensions" AS e0 WHERE (e0."type" = $1) ["postgres_cdc_rls"]
2024-06-20 14:27:40 18:27:40.312 [info] == Migrated 20220506102948 in 0.0s
2024-06-20 14:27:40 18:27:40.318 [info] == Running 20220527210857 Realtime.Repo.Migrations.AddExternalIdUniqIndex.change/0 forward
2024-06-20 14:27:40 18:27:40.318 [info] execute "alter table tenants add constraint uniq_external_id unique (external_id)"
2024-06-20 14:27:40 18:27:40.321 [info] == Migrated 20220527210857 in 0.0s
2024-06-20 14:27:40 18:27:40.327 [info] == Running 20220815211129 Realtime.Repo.Migrations.NewMaxEventsPerSecondDefault.change/0 forward
2024-06-20 14:27:40 18:27:40.328 [info] alter table tenants
2024-06-20 14:27:40 18:27:40.335 [info] == Migrated 20220815211129 in 0.0s
2024-06-20 14:27:40 18:27:40.350 [info] == Running 20220815215024 Realtime.Repo.Migrations.SetCurrentMaxEventsPerSecond.change/0 forward
2024-06-20 14:27:40 18:27:40.350 [info] execute "update tenants set max_events_per_second = 1000"
2024-06-20 14:27:40 18:27:40.355 [info] == Migrated 20220815215024 in 0.0s
2024-06-20 14:27:40 18:27:40.357 [info] == Running 20220818141501 Realtime.Repo.Migrations.ChangeLimitsDefaults.change/0 forward
2024-06-20 14:27:40 18:27:40.357 [info] alter table tenants
2024-06-20 14:27:40 18:27:40.360 [info] == Migrated 20220818141501 in 0.0s
2024-06-20 14:27:40 18:27:40.363 [info] == Running 20221018173709 Realtime.Repo.Migrations.AddCdcDefault.up/0 forward
2024-06-20 14:27:40 18:27:40.364 [info] alter table tenants
2024-06-20 14:27:40 18:27:40.365 [info] == Migrated 20221018173709 in 0.0s
2024-06-20 14:27:40 18:27:40.369 [info] == Running 20221102172703 Realtime.Repo.Migrations.RenamePgType.up/0 forward
2024-06-20 14:27:40 18:27:40.369 [info] execute "update extensions set type = 'postgres_cdc_rls'"
2024-06-20 14:27:40 18:27:40.370 [info] == Migrated 20221102172703 in 0.0s
2024-06-20 14:27:40 18:27:40.376 [info] == Running 20221223010058 Realtime.Repo.Migrations.DropTenantsUniqExternalIdIndex.change/0 forward
2024-06-20 14:27:40 18:27:40.376 [info] execute "ALTER TABLE IF EXISTS tenants DROP CONSTRAINT IF EXISTS uniq_external_id"
2024-06-20 14:27:40 18:27:40.378 [info] == Migrated 20221223010058 in 0.0s
2024-06-20 14:27:40 18:27:40.397 [info] == Running 20230110180046 Realtime.Repo.Migrations.AddLimitsFieldsToTenants.change/0 forward
2024-06-20 14:27:40 18:27:40.397 [info] alter table tenants
2024-06-20 14:27:40 18:27:40.399 [info] == Migrated 20230110180046 in 0.0s
2024-06-20 14:27:40 18:27:40.401 [info] == Running 20230810220907 Realtime.Repo.Migrations.AlterTenantsTableColumnsToText.change/0 forward
2024-06-20 14:27:40 18:27:40.402 [info] alter table tenants
2024-06-20 14:27:40 18:27:40.404 [info] == Migrated 20230810220907 in 0.0s
2024-06-20 14:27:40 18:27:40.406 [info] == Running 20230810220924 Realtime.Repo.Migrations.AlterExtensionsTableColumnsToText.change/0 forward
2024-06-20 14:27:40 18:27:40.406 [info] alter table extensions
2024-06-20 14:27:40 18:27:40.409 [info] == Migrated 20230810220924 in 0.0s
2024-06-20 14:27:40 18:27:40.411 [info] == Running 20231024094642 :"Elixir.Realtime.Repo.Migrations.Add-tenant-suspend-flag".change/0 forward
2024-06-20 14:27:40 18:27:40.411 [info] alter table tenants
2024-06-20 14:27:40 18:27:40.412 [info] == Migrated 20231024094642 in 0.0s
2024-06-20 14:27:40 18:27:40.414 [info] == Running 20240306114423 Realtime.Repo.Migrations.AdddTenantJwtJwksColumn.change/0 forward
2024-06-20 14:27:40 18:27:40.414 [info] alter table tenants
2024-06-20 14:27:40 18:27:40.415 [info] == Migrated 20240306114423 in 0.0s
2024-06-20 14:27:40 18:27:40.417 [info] == Running 20240418082835 Realtime.Repo.Migrations.AddAuthorizationFlag.change/0 forward
2024-06-20 14:27:40 18:27:40.417 [info] alter table tenants
2024-06-20 14:27:40 18:27:40.417 [info] == Migrated 20240418082835 in 0.0s
2024-06-20 14:27:41 18:27:41.094 [debug] QUERY OK db=1.3ms queue=85.8ms idle=0.0ms
2024-06-20 14:27:41 begin []
2024-06-20 14:27:41 18:27:41.111 [debug] QUERY OK source="tenants" db=0.8ms
2024-06-20 14:27:41 SELECT t0."id", t0."name", t0."external_id", t0."jwt_secret", t0."jwt_jwks", t0."postgres_cdc_default", t0."max_concurrent_users", t0."max_events_per_second", t0."max_bytes_per_second", t0."max_channels_per_client", t0."max_joins_per_second", t0."suspend", t0."enable_authorization", t0."inserted_at", t0."updated_at" FROM "tenants" AS t0 WHERE (t0."external_id" = $1) ["realtime-dev"]
2024-06-20 14:27:41 18:27:41.172 [debug] QUERY OK db=16.7ms
2024-06-20 14:27:41 INSERT INTO "tenants" ("enable_authorization","external_id","jwt_secret","max_bytes_per_second","max_channels_per_client","max_concurrent_users","max_events_per_second","max_joins_per_second","name","suspend","inserted_at","updated_at","id") VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) [false, "realtime-dev", "eGxa2ZKVreSn7eWieRQdp60i5H6KJLiST7splFU6MVHylMSAoQ2SjsTrTTQo/+bmYjQcO4hNnGTU+D1wtlXreA==", 100000, 100, 200, 100, 100, "realtime-dev", false, ~N[2024-06-20 18:27:41], ~N[2024-06-20 18:27:41], <<135, 98, 98, 111, 148, 61, 65, 111, 168, 163, 41, 220, 41, 229, 54, 50>>]
2024-06-20 14:27:41 18:27:41.185 [debug] QUERY OK db=10.3ms
2024-06-20 14:27:41 INSERT INTO "extensions" ("settings","tenant_external_id","type","inserted_at","updated_at","id") VALUES ($1,$2,$3,$4,$5,$6) [%{"db_host" => "QhixI0o7PYIABziLUL4f0A==", "db_name" => "sWBpZNdjggEPTQVlI52Zfw==", "db_password" => "eGxa2ZKVreSn7eWieRQdp74vN25K+qFgdnxmDCKe4p20+C0410WXonzXTEj9CgYx", "db_port" => "+enMDFi1J/3IrrquHHwUmA==", "db_user" => "uxbEq/zz8DXVD53TOI1zmw==", "poll_interval_ms" => 100, "poll_max_changes" => 100, "poll_max_record_bytes" => 1048576, "publication" => "supabase_realtime", "region" => "us-east-1", "slot_name" => "supabase_realtime_replication_slot", "ssl_enforced" => false}, "realtime-dev", "postgres_cdc_rls", ~N[2024-06-20 18:27:41], ~N[2024-06-20 18:27:41], <<91, 233, 216, 197, 237, 247, 75, 236, 180, 39, 150, 106, 114, 198, 175, 175>>]
2024-06-20 14:27:41 18:27:41.193 [debug] QUERY OK db=8.1ms
2024-06-20 14:27:41 commit []
2024-06-20 14:27:42 18:27:42.886 [notice] :alarm_handler: {:set, {:system_memory_high_watermark, []}}
2024-06-20 14:27:42 18:27:42.962 [info] Elixir.Realtime.SignalHandler is being initialized...
2024-06-20 14:27:42 18:27:42.969 [notice] SYN[realtime@127.0.0.1] Adding node to scope <Elixir.Realtime.Tenants.Connect>
2024-06-20 14:27:42 18:27:42.969 [notice] SYN[realtime@127.0.0.1] Creating tables for scope <Elixir.Realtime.Tenants.Connect>
2024-06-20 14:27:42 18:27:42.969 [notice] SYN[realtime@127.0.0.1|registry<Elixir.Realtime.Tenants.Connect>] Discovering the cluster
2024-06-20 14:27:42 18:27:42.969 [notice] SYN[realtime@127.0.0.1|pg<Elixir.Realtime.Tenants.Connect>] Discovering the cluster
2024-06-20 14:27:42 18:27:42.969 [notice] SYN[realtime@127.0.0.1] Adding node to scope <users>
2024-06-20 14:27:42 18:27:42.969 [notice] SYN[realtime@127.0.0.1] Creating tables for scope <users>
2024-06-20 14:27:42 18:27:42.970 [notice] SYN[realtime@127.0.0.1|registry<users>] Discovering the cluster
2024-06-20 14:27:42 18:27:42.970 [notice] SYN[realtime@127.0.0.1|pg<users>] Discovering the cluster
2024-06-20 14:27:42 18:27:42.970 [notice] SYN[realtime@127.0.0.1] Adding node to scope <Elixir.RegionNodes>
2024-06-20 14:27:42 18:27:42.970 [notice] SYN[realtime@127.0.0.1] Creating tables for scope <Elixir.RegionNodes>
2024-06-20 14:27:42 18:27:42.970 [notice] SYN[realtime@127.0.0.1|registry<Elixir.RegionNodes>] Discovering the cluster
2024-06-20 14:27:42 18:27:42.970 [notice] SYN[realtime@127.0.0.1|pg<Elixir.RegionNodes>] Discovering the cluster
2024-06-20 14:27:42 18:27:42.972 [warning] Replica region not found, defaulting to Realtime.Repo
2024-06-20 14:27:42 18:27:42.993 [info] Running RealtimeWeb.Endpoint with cowboy 2.10.0 at :::4000 (http)
2024-06-20 14:27:42 18:27:42.994 [info] Access RealtimeWeb.Endpoint at http://realtime.fly.dev
2024-06-20 14:27:43 18:27:43.006 [notice] SYN[realtime@127.0.0.1] Adding node to scope <Elixir.PostgresCdcStream>
2024-06-20 14:27:43 18:27:43.007 [notice] SYN[realtime@127.0.0.1] Creating tables for scope <Elixir.PostgresCdcStream>
2024-06-20 14:27:43 18:27:43.007 [notice] SYN[realtime@127.0.0.1|registry<Elixir.PostgresCdcStream>] Discovering the cluster
2024-06-20 14:27:43 18:27:43.007 [notice] SYN[realtime@127.0.0.1|pg<Elixir.PostgresCdcStream>] Discovering the cluster
2024-06-20 14:27:43 18:27:43.007 [notice] SYN[realtime@127.0.0.1] Adding node to scope <Elixir.Extensions.PostgresCdcRls>
2024-06-20 14:27:43 18:27:43.008 [notice] SYN[realtime@127.0.0.1] Creating tables for scope <Elixir.Extensions.PostgresCdcRls>
2024-06-20 14:27:43 18:27:43.008 [notice] SYN[realtime@127.0.0.1|registry<Elixir.Extensions.PostgresCdcRls>] Discovering the cluster
2024-06-20 14:27:43 18:27:43.008 [notice] SYN[realtime@127.0.0.1|pg<Elixir.Extensions.PostgresCdcRls>] Discovering the cluster
2024-06-20 14:27:45 18:27:45.963 [debug] Tzdata polling for update.
2024-06-20 14:27:46 18:27:46.030 [info] tzdata release in place is from a file last modified Fri, 22 Oct 2021 02:20:47 GMT. Release file on server was last modified Thu, 01 Feb 2024 18:40:48 GMT.
2024-06-20 14:27:46 18:27:46.030 [debug] Tzdata downloading new data from https://data.iana.org/time-zones/tzdata-latest.tar.gz
2024-06-20 14:27:46 18:27:46.105 [debug] Tzdata data downloaded. Release version 2024a.
2024-06-20 14:27:46 18:27:46.480 [info] Tzdata has updated the release from 2021e to 2024a
2024-06-20 14:27:46 18:27:46.480 [debug] Tzdata deleting ETS table for version 2021e
2024-06-20 14:27:46 18:27:46.481 [debug] Tzdata deleting ETS table file for version 2021e
2024-06-20 14:27:48 18:27:48.024 request_id=F9rJx9wgyGFzid4AAAAB [info] HEAD /api/tenants/realtime-dev/health
2024-06-20 14:27:48 18:27:48.024 request_id=F9rJx9wgyGFzid4AAAAB [debug] Processing with RealtimeWeb.TenantController.health/2
2024-06-20 14:27:48 Parameters: %{"tenant_id" => "realtime-dev"}
2024-06-20 14:27:48 Pipelines: [:api]
2024-06-20 14:27:48 18:27:48.024 [warning] Replica region not found, defaulting to Realtime.Repo
2024-06-20 14:27:48 18:27:48.028 [debug] QUERY OK source="tenants" db=1.3ms queue=1.5ms idle=1038.0ms
2024-06-20 14:27:48 SELECT t0."id", t0."name", t0."external_id", t0."jwt_secret", t0."jwt_jwks", t0."postgres_cdc_default", t0."max_concurrent_users", t0."max_events_per_second", t0."max_bytes_per_second", t0."max_channels_per_client", t0."max_joins_per_second", t0."suspend", t0."enable_authorization", t0."inserted_at", t0."updated_at" FROM "tenants" AS t0 WHERE (t0."external_id" = $1) ["realtime-dev"]
2024-06-20 14:27:48 18:27:48.033 [debug] QUERY OK source="extensions" db=4.3ms queue=0.5ms idle=1041.0ms
2024-06-20 14:27:48 SELECT e0."id", e0."type", e0."settings", e0."tenant_external_id", e0."inserted_at", e0."updated_at", e0."tenant_external_id" FROM "extensions" AS e0 WHERE (e0."tenant_external_id" = $1) ORDER BY e0."tenant_external_id" ["realtime-dev"]
2024-06-20 14:27:48 18:27:48.033 request_id=F9rJx9wgyGFzid4AAAAB project=realtime-dev external_id=realtime-dev [warning] Zero region nodes for iad using :"realtime@127.0.0.1"
2024-06-20 14:27:48 18:27:48.033 project=realtime-dev external_id=realtime-dev [warning] Replica region not found, defaulting to Realtime.Repo
2024-06-20 14:27:48 18:27:48.035 project=realtime-dev external_id=realtime-dev [debug] QUERY OK source="tenants" db=1.1ms idle=1046.3ms
2024-06-20 14:27:48 SELECT t0."id", t0."name", t0."external_id", t0."jwt_secret", t0."jwt_jwks", t0."postgres_cdc_default", t0."max_concurrent_users", t0."max_events_per_second", t0."max_bytes_per_second", t0."max_channels_per_client", t0."max_joins_per_second", t0."suspend", t0."enable_authorization", t0."inserted_at", t0."updated_at" FROM "tenants" AS t0 WHERE (t0."external_id" = $1) ["realtime-dev"]
2024-06-20 14:27:48 18:27:48.039 project=realtime-dev external_id=realtime-dev [debug] QUERY OK source="extensions" db=3.9ms idle=1047.6ms
2024-06-20 14:27:48 SELECT e0."id", e0."type", e0."settings", e0."tenant_external_id", e0."inserted_at", e0."updated_at", e0."tenant_external_id" FROM "extensions" AS e0 WHERE (e0."tenant_external_id" = $1) ORDER BY e0."tenant_external_id" ["realtime-dev"]
2024-06-20 14:27:48 18:27:48.078 project=realtime-dev external_id=realtime-dev application_name=realtime_connect [info] Applying migrations to db
2024-06-20 14:27:48 18:27:48.134 [info] == Running 20211116024918 Realtime.Tenants.Migrations.CreateRealtimeSubscriptionTable.change/0 forward
2024-06-20 14:27:48 18:27:48.139 [info] execute "create type realtime.equality_op as enum(\n 'eq', 'neq', 'lt', 'lte', 'gt', 'gte'\n );"
2024-06-20 14:27:48 18:27:48.157 [info] execute "create type realtime.user_defined_filter as (\n column_name text,\n op realtime.equality_op,\n value text\n );"
2024-06-20 14:27:48 18:27:48.158 [info] execute "create table realtime.subscription (\n -- Tracks which users are subscribed to each table\n id bigint not null generated always as identity,\n user_id uuid not null,\n -- Populated automatically by trigger. Required to enable auth.email()\n email varchar(255),\n entity regclass not null,\n filters realtime.user_defined_filter[] not null default '{}',\n created_at timestamp not null default timezone('utc', now()),\n\n constraint pk_subscription primary key (id),\n unique (entity, user_id, filters)\n )"
2024-06-20 14:27:48 18:27:48.181 [info] execute "create index ix_realtime_subscription_entity on realtime.subscription using hash (entity)"
2024-06-20 14:27:48 18:27:48.184 [info] == Migrated 20211116024918 in 0.0s
2024-06-20 14:27:48 18:27:48.190 [info] == Running 20211116045059 Realtime.Tenants.Migrations.CreateRealtimeCheckFiltersTrigger.change/0 forward
2024-06-20 14:27:48 18:27:48.190 [info] execute "create function realtime.subscription_check_filters()\n returns trigger\n language plpgsql\n as $$\n /*\n Validates that the user defined filters for a subscription:\n - refer to valid columns that 'authenticated' may access\n - values are coercable to the correct column type\n */\n declare\n col_names text[] = coalesce(\n array_agg(c.column_name order by c.ordinal_position),\n '{}'::text[]\n )\n from\n information_schema.columns c\n where\n (quote_ident(c.table_schema) || '.' || quote_ident(c.table_name))::regclass = new.entity\n and pg_catalog.has_column_privilege('authenticated', new.entity, c.column_name, 'SELECT');\n filter realtime.user_defined_filter;\n col_type text;\n begin\n for filter in select * from unnest(new.filters) loop\n -- Filtered column is valid\n if not filter.column_name = any(col_names) then\n raise exception 'invalid column for filter %', filter.column_name;\n end if;\n\n -- Type is sanitized and safe for string interpolation\n col_type = (\n select atttypid::regtype\n from pg_catalog.pg_attribute\n where attrelid = new.entity\n and attname = filter.column_name\n )::text;\n if col_type is null then\n raise exception 'failed to lookup type for column %', filter.column_name;\n end if;\n -- raises an exception if value is not coercable to type\n perform format('select %s::%I', filter.value, col_type);\n end loop;\n\n -- Apply consistent order to filters so the unique constraint on\n -- (user_id, entity, filters) can't be tricked by a different filter order\n new.filters = coalesce(\n array_agg(f order by f.column_name, f.op, f.value),\n '{}'\n ) from unnest(new.filters) f;\n\n return new;\n end;\n $$;"
2024-06-20 14:27:48 18:27:48.195 [info] execute "create trigger tr_check_filters\n before insert or update on realtime.subscription\n for each row\n execute function realtime.subscription_check_filters();"
2024-06-20 14:27:48 18:27:48.197 [info] == Migrated 20211116045059 in 0.0s
2024-06-20 14:27:48 18:27:48.210 [info] == Running 20211116050929 Realtime.Tenants.Migrations.CreateRealtimeQuoteWal2jsonFunction.change/0 forward
2024-06-20 14:27:48 18:27:48.210 [info] execute "create function realtime.quote_wal2json(entity regclass)\n returns text\n language sql\n immutable\n strict\n as $$\n select\n (\n select string_agg('' || ch,'')\n from unnest(string_to_array(nsp.nspname::text, null)) with ordinality x(ch, idx)\n where\n not (x.idx = 1 and x.ch = '\"')\n and not (\n x.idx = array_length(string_to_array(nsp.nspname::text, null), 1)\n and x.ch = '\"'\n )\n )\n || '.'\n || (\n select string_agg('' || ch,'')\n from unnest(string_to_array(pc.relname::text, null)) with ordinality x(ch, idx)\n where\n not (x.idx = 1 and x.ch = '\"')\n and not (\n x.idx = array_length(string_to_array(nsp.nspname::text, null), 1)\n and x.ch = '\"'\n )\n )\n from\n pg_class pc\n join pg_namespace nsp\n on pc.relnamespace = nsp.oid\n where\n pc.oid = entity\n $$;"
2024-06-20 14:27:48 18:27:48.214 [info] == Migrated 20211116050929 in 0.0s
2024-06-20 14:27:48 18:27:48.218 [info] == Running 20211116051442 Realtime.Tenants.Migrations.CreateRealtimeCheckEqualityOpFunction.change/0 forward
2024-06-20 14:27:48 18:27:48.218 [info] execute "create function realtime.check_equality_op(\n op realtime.equality_op,\n type_ regtype,\n val_1 text,\n val_2 text\n )\n returns bool\n immutable\n language plpgsql\n as $$\n /*\n Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness\n */\n declare\n op_symbol text = (\n case\n when op = 'eq' then '='\n when op = 'neq' then '!='\n when op = 'lt' then '<'\n when op = 'lte' then '<='\n when op = 'gt' then '>'\n when op = 'gte' then '>='\n else 'UNKNOWN OP'\n end\n );\n res boolean;\n begin\n execute format('select %L::'|| type_::text || ' ' || op_symbol || ' %L::'|| type_::text, val_1, val_2) into res;\n return res;\n end;\n $$;"
2024-06-20 14:27:48 18:27:48.220 [info] == Migrated 20211116051442 in 0.0s
2024-06-20 14:27:48 18:27:48.224 [info] == Running 20211116212300 Realtime.Tenants.Migrations.CreateRealtimeBuildPreparedStatementSqlFunction.change/0 forward
2024-06-20 14:27:48 18:27:48.224 [info] execute "create type realtime.wal_column as (\n name text,\n type text,\n value jsonb,\n is_pkey boolean,\n is_selectable boolean\n );"
2024-06-20 14:27:48 18:27:48.226 [info] execute "create function realtime.build_prepared_statement_sql(\n prepared_statement_name text,\n entity regclass,\n columns realtime.wal_column[]\n )\n returns text\n language sql\n as $$\n /*\n Builds a sql string that, if executed, creates a prepared statement to\n tests retrive a row from *entity* by its primary key columns.\n\n Example\n select realtime.build_prepared_statment_sql('public.notes', '{\"id\"}'::text[], '{\"bigint\"}'::text[])\n */\n select\n 'prepare ' || prepared_statement_name || ' as\n select\n exists(\n select\n 1\n from\n ' || entity || '\n where\n ' || string_agg(quote_ident(pkc.name) || '=' || quote_nullable(pkc.value) , ' and ') || '\n )'\n from\n unnest(columns) pkc\n where\n pkc.is_pkey\n group by\n entity\n $$;"
2024-06-20 14:27:48 18:27:48.228 [info] == Migrated 20211116212300 in 0.0s
2024-06-20 14:27:48 18:27:48.230 [info] == Running 20211116213355 Realtime.Tenants.Migrations.CreateRealtimeCastFunction.change/0 forward
2024-06-20 14:27:48 18:27:48.230 [info] execute "create function realtime.cast(val text, type_ regtype)\n returns jsonb\n immutable\n language plpgsql\n as $$\n declare\n res jsonb;\n begin\n execute format('select to_jsonb(%L::'|| type_::text || ')', val) into res;\n return res;\n end\n $$;"
2024-06-20 14:27:48 18:27:48.232 [info] == Migrated 20211116213355 in 0.0s
2024-06-20 14:27:48 18:27:48.236 [info] == Running 20211116213934 Realtime.Tenants.Migrations.CreateRealtimeIsVisibleThroughFiltersFunction.change/0 forward
2024-06-20 14:27:48 18:27:48.236 [info] execute "create function realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[])\n returns bool\n language sql\n immutable\n as $$\n /*\n Should the record be visible (true) or filtered out (false) after *filters* are applied\n */\n select\n -- Default to allowed when no filters present\n coalesce(\n sum(\n realtime.check_equality_op(\n op:=f.op,\n type_:=col.type::regtype,\n -- cast jsonb to text\n val_1:=col.value #>> '{}',\n val_2:=f.value\n )::int\n ) = count(1),\n true\n )\n from\n unnest(filters) f\n join unnest(columns) col\n on f.column_name = col.name;\n $$;"
2024-06-20 14:27:48 18:27:48.238 [info] == Migrated 20211116213934 in 0.0s
2024-06-20 14:27:48 18:27:48.243 [info] == Running 20211116214523 Realtime.Tenants.Migrations.CreateRealtimeApplyRlsFunction.change/0 forward
2024-06-20 14:27:48 18:27:48.243 [info] execute "create type realtime.action as enum ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'ERROR');"
2024-06-20 14:27:48 18:27:48.245 [info] execute "create type realtime.wal_rls as (\n wal jsonb,\n is_rls_enabled boolean,\n users uuid[],\n errors text[]\n );"
2024-06-20 14:27:48 18:27:48.247 [info] execute "create function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n when 'T' then 'TRUNCATE'\n else 'ERROR'\n end\n );\n\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n\n -- Subscription vars\n user_id uuid;\n email varchar(255);\n user_has_access bool;\n is_visible_to_user boolean;\n visible_to_user_ids uuid[] = '{}';\n\n -- user subscriptions to the wal record's table\n subscriptions realtime.subscription[] =\n array_agg(sub)\n from\n realtime.subscription sub\n where\n sub.entity = entity_;\n\n -- structured info for wal's columns\n columns realtime.wal_column[] =\n array_agg(\n (\n x->>'name',\n x->>'type',\n realtime.cast((x->'value') #>> '{}', (x->>'type')::regtype),\n (pks ->> 'name') is not null,\n pg_catalog.has_column_privilege('authenticated', entity_, x->>'name', 'SELECT')\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n -- previous identity values for update/delete\n old_columns realtime.wal_column[] =\n array_agg(\n (\n x->>'name',\n x->>'type',\n realtime.cast((x->'value') #>> '{}', (x->>'type')::regtype),\n (pks ->> 'name') is not null,\n pg_catalog.has_column_privilege('authenticated', entity_, x->>'name', 'SELECT')\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n output jsonb;\n\n 
-- Error states\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n error_unauthorized boolean = not pg_catalog.has_any_column_privilege('authenticated', entity_, 'SELECT');\n\n errors text[] = case\n when error_record_exceeds_max_size then array['Error 413: Payload Too Large']\n else '{}'::text[]\n end;\n begin\n\n -- The 'authenticated' user does not have SELECT permission on any of the columns for the entity_\n if error_unauthorized is true then\n return (\n null,\n null,\n visible_to_user_ids,\n array['Error 401: Unauthorized']\n )::realtime.wal_rls;\n end if;\n\n -------------------------------\n -- Build Output JSONB Object --\n -------------------------------\n output = jsonb_build_object(\n 'schema', wal ->> 'schema',\n 'table', wal ->> 'table',\n 'type', action,\n 'commit_timestamp', (wal ->> 'timestamp')::text::timestamp with time zone,\n 'columns', (\n select\n jsonb_agg(\n jsonb_build_object(\n 'name', pa.attname,\n 'type', pt.typname\n )\n order by pa.attnum asc\n )\n from\n pg_attribute pa\n join pg_type pt\n on pa.atttypid = pt.oid\n where\n attrelid = entity_\n and attnum > 0\n and pg_catalog.has_column_privilege('authenticated', entity_, pa.attname, 'SELECT')\n )\n )\n -- Add \"record\" key for insert and update\n || case\n when error_record_exceeds_max_size then jsonb_build_object('record', '{}'::jsonb)\n when action in ('INSERT'," <> ...
2024-06-20 14:27:48 18:27:48.254 [info] == Migrated 20211116214523 in 0.0s
2024-06-20 14:27:48 18:27:48.257 [info] == Running 20211122062447 Realtime.Tenants.Migrations.GrantRealtimeUsageToAuthenticatedRole.change/0 forward
2024-06-20 14:27:48 18:27:48.257 [info] execute "grant usage on schema realtime to authenticated;"
2024-06-20 14:27:48 18:27:48.258 [info] == Migrated 20211122062447 in 0.0s
2024-06-20 14:27:48 18:27:48.266 [info] == Running 20211124070109 Realtime.Tenants.Migrations.EnableRealtimeApplyRlsFunctionPostgrest9Compatibility.change/0 forward
2024-06-20 14:27:48 18:27:48.267 [info] execute "create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n when 'T' then 'TRUNCATE'\n else 'ERROR'\n end\n );\n\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n\n -- Subscription vars\n user_id uuid;\n email varchar(255);\n user_has_access bool;\n is_visible_to_user boolean;\n visible_to_user_ids uuid[] = '{}';\n\n -- user subscriptions to the wal record's table\n subscriptions realtime.subscription[] =\n array_agg(sub)\n from\n realtime.subscription sub\n where\n sub.entity = entity_;\n\n -- structured info for wal's columns\n columns realtime.wal_column[] =\n array_agg(\n (\n x->>'name',\n x->>'type',\n realtime.cast((x->'value') #>> '{}', (x->>'type')::regtype),\n (pks ->> 'name') is not null,\n pg_catalog.has_column_privilege('authenticated', entity_, x->>'name', 'SELECT')\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n -- previous identity values for update/delete\n old_columns realtime.wal_column[] =\n array_agg(\n (\n x->>'name',\n x->>'type',\n realtime.cast((x->'value') #>> '{}', (x->>'type')::regtype),\n (pks ->> 'name') is not null,\n pg_catalog.has_column_privilege('authenticated', entity_, x->>'name', 'SELECT')\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n output 
jsonb;\n\n -- Error states\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n error_unauthorized boolean = not pg_catalog.has_any_column_privilege('authenticated', entity_, 'SELECT');\n\n errors text[] = case\n when error_record_exceeds_max_size then array['Error 413: Payload Too Large']\n else '{}'::text[]\n end;\n begin\n\n -- The 'authenticated' user does not have SELECT permission on any of the columns for the entity_\n if error_unauthorized is true then\n return (\n null,\n null,\n visible_to_user_ids,\n array['Error 401: Unauthorized']\n )::realtime.wal_rls;\n end if;\n\n -------------------------------\n -- Build Output JSONB Object --\n -------------------------------\n output = jsonb_build_object(\n 'schema', wal ->> 'schema',\n 'table', wal ->> 'table',\n 'type', action,\n 'commit_timestamp', (wal ->> 'timestamp')::text::timestamp with time zone,\n 'columns', (\n select\n jsonb_agg(\n jsonb_build_object(\n 'name', pa.attname,\n 'type', pt.typname\n )\n order by pa.attnum asc\n )\n from\n pg_attribute pa\n join pg_type pt\n on pa.atttypid = pt.oid\n where\n attrelid = entity_\n and attnum > 0\n and pg_catalog.has_column_privilege('authenticated', entity_, pa.attname, 'SELECT')\n )\n )\n -- Add \"record\" key for insert and update\n || case\n when error_record_exceeds_max_size then jsonb_build_object('record', '{}'::jsonb)\n when action in" <> ...
2024-06-20 14:27:48 18:27:48.269 [info] == Migrated 20211124070109 in 0.0s
2024-06-20 14:27:48 18:27:48.273 [info] == Running 20211202204204 Realtime.Tenants.Migrations.UpdateRealtimeSubscriptionCheckFiltersFunctionSecurity.change/0 forward
2024-06-20 14:27:48 18:27:48.273 [info] execute "create or replace function realtime.subscription_check_filters()\n returns trigger\n language plpgsql\n as $$\n /*\n Validates that the user defined filters for a subscription:\n - refer to valid columns that 'authenticated' may access\n - values are coercable to the correct column type\n */\n declare\n col_names text[] = coalesce(\n array_agg(c.column_name order by c.ordinal_position),\n '{}'::text[]\n )\n from\n information_schema.columns c\n where\n (quote_ident(c.table_schema) || '.' || quote_ident(c.table_name))::regclass = new.entity\n and pg_catalog.has_column_privilege('authenticated', new.entity, c.column_name, 'SELECT');\n filter realtime.user_defined_filter;\n col_type regtype;\n begin\n for filter in select * from unnest(new.filters) loop\n -- Filtered column is valid\n if not filter.column_name = any(col_names) then\n raise exception 'invalid column for filter %', filter.column_name;\n end if;\n\n -- Type is sanitized and safe for string interpolation\n col_type = (\n select atttypid::regtype\n from pg_catalog.pg_attribute\n where attrelid = new.entity\n and attname = filter.column_name\n );\n if col_type is null then\n raise exception 'failed to lookup type for column %', filter.column_name;\n end if;\n -- raises an exception if value is not coercable to type\n perform realtime.cast(filter.value, col_type);\n end loop;\n\n -- Apply consistent order to filters so the unique constraint on\n -- (user_id, entity, filters) can't be tricked by a different filter order\n new.filters = coalesce(\n array_agg(f order by f.column_name, f.op, f.value),\n '{}'\n ) from unnest(new.filters) f;\n\n return new;\n end;\n $$;"
2024-06-20 14:27:48 18:27:48.276 [info] == Migrated 20211202204204 in 0.0s
2024-06-20 14:27:48 18:27:48.277 [info] == Running 20211202204605 Realtime.Tenants.Migrations.UpdateRealtimeBuildPreparedStatementSqlFunctionForCompatibilityWithAllTypes.change/0 forward
2024-06-20 14:27:48 18:27:48.278 [info] execute "create or replace function realtime.build_prepared_statement_sql(\n prepared_statement_name text,\n entity regclass,\n columns realtime.wal_column[]\n )\n returns text\n language sql\n as $$\n /*\n Builds a sql string that, if executed, creates a prepared statement to\n tests retrive a row from *entity* by its primary key columns.\n\n Example\n select realtime.build_prepared_statment_sql('public.notes', '{\"id\"}'::text[], '{\"bigint\"}'::text[])\n */\n select\n 'prepare ' || prepared_statement_name || ' as\n select\n exists(\n select\n 1\n from\n ' || entity || '\n where\n ' || string_agg(quote_ident(pkc.name) || '=' || quote_nullable(pkc.value #>> '{}') , ' and ') || '\n )'\n from\n unnest(columns) pkc\n where\n pkc.is_pkey\n group by\n entity\n $$;"
2024-06-20 14:27:48 18:27:48.279 [info] == Migrated 20211202204605 in 0.0s
2024-06-20 14:27:48 18:27:48.285 [info] == Running 20211210212804 Realtime.Tenants.Migrations.EnableGenericSubscriptionClaims.change/0 forward
2024-06-20 14:27:48 18:27:48.285 [info] execute "truncate table realtime.subscription restart identity"
2024-06-20 14:27:48 18:27:48.382 [info] execute "alter table realtime.subscription\n drop constraint subscription_entity_user_id_filters_key cascade,\n drop column email cascade,\n drop column created_at cascade"
2024-06-20 14:27:48 18:27:48.387 [info] execute "alter table realtime.subscription rename user_id to subscription_id"
2024-06-20 14:27:48 18:27:48.389 [info] execute "create function realtime.to_regrole(role_name text)\n returns regrole\n immutable\n language sql\n -- required to allow use in generated clause\n as $$ select role_name::regrole $$;"
2024-06-20 14:27:48 18:27:48.391 [info] execute "alter table realtime.subscription\n add column claims jsonb not null,\n add column claims_role regrole not null generated always as (realtime.to_regrole(claims ->> 'role')) stored,\n add column created_at timestamp not null default timezone('utc', now())"
2024-06-20 14:27:48 18:27:48.418 [info] execute "create unique index subscription_subscription_id_entity_filters_key on realtime.subscription (subscription_id, entity, filters)"
2024-06-20 14:27:48 18:27:48.429 [info] execute "revoke usage on schema realtime from authenticated;"
2024-06-20 14:27:48 18:27:48.429 [info] execute "revoke all on realtime.subscription from authenticated;"
2024-06-20 14:27:48 18:27:48.430 [info] execute "create or replace function realtime.subscription_check_filters()\n returns trigger\n language plpgsql\n as $$\n /*\n Validates that the user defined filters for a subscription:\n - refer to valid columns that the claimed role may access\n - values are coercable to the correct column type\n */\n declare\n col_names text[] = coalesce(\n array_agg(c.column_name order by c.ordinal_position),\n '{}'::text[]\n )\n from\n information_schema.columns c\n where\n format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity\n and pg_catalog.has_column_privilege((new.claims ->> 'role'), new.entity, c.column_name, 'SELECT');\n filter realtime.user_defined_filter;\n col_type regtype;\n begin\n for filter in select * from unnest(new.filters) loop\n -- Filtered column is valid\n if not filter.column_name = any(col_names) then\n raise exception 'invalid column for filter %', filter.column_name;\n end if;\n\n -- Type is sanitized and safe for string interpolation\n col_type = (\n select atttypid::regtype\n from pg_catalog.pg_attribute\n where attrelid = new.entity\n and attname = filter.column_name\n );\n if col_type is null then\n raise exception 'failed to lookup type for column %', filter.column_name;\n end if;\n -- raises an exception if value is not coercable to type\n perform realtime.cast(filter.value, col_type);\n end loop;\n\n -- Apply consistent order to filters so the unique constraint on\n -- (subscription_id, entity, filters) can't be tricked by a different filter order\n new.filters = coalesce(\n array_agg(f order by f.column_name, f.op, f.value),\n '{}'\n ) from unnest(new.filters) f;\n\n return new;\n end;\n $$;"
2024-06-20 14:27:48 18:27:48.432 [info] execute "alter type realtime.wal_rls rename attribute users to subscription_ids cascade;"
2024-06-20 14:27:48 18:27:48.432 [info] execute "drop function realtime.apply_rls(jsonb, integer);"
2024-06-20 14:27:48 18:27:48.433 [info] execute "create function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns setof realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n else 'ERROR'\n end\n );\n\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n\n subscriptions realtime.subscription[] = array_agg(subs)\n from\n realtime.subscription subs\n where\n subs.entity = entity_;\n\n -- Subscription vars\n roles regrole[] = array_agg(distinct us.claims_role)\n from\n unnest(subscriptions) us;\n\n working_role regrole;\n claimed_role regrole;\n claims jsonb;\n\n subscription_id uuid;\n subscription_has_access bool;\n visible_to_subscription_ids uuid[] = '{}';\n\n -- structured info for wal's columns\n columns realtime.wal_column[];\n -- previous identity values for update/delete\n old_columns realtime.wal_column[];\n\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n\n -- Primary jsonb output for record\n output jsonb;\n\n begin\n perform set_config('role', null, true);\n\n columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n realtime.cast((x->'value') #>> '{}', (x->>'type')::regtype),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n old_columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n realtime.cast((x->'value') #>> '{}', (x->>'type')::regtype),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n 
jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n for working_role in select * from unnest(roles) loop\n\n -- Update `is_selectable` for columns and old_columns\n columns =\n array_agg(\n (\n c.name,\n c.type,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(columns) c;\n\n old_columns =\n array_agg(\n (\n c.name,\n c.type,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(old_columns) c;\n\n if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then\n return next (\n null,\n is_rls_enabled,\n -- subscriptions is already filtered by entity\n (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role),\n array['Error 400: Bad Request, no primary key']\n )::realtime.wal_rls;\n\n -- The claims role does not have SELECT permission to the primary key of entity\n elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey t" <> ...
2024-06-20 14:27:48 18:27:48.437 [info] == Migrated 20211210212804 in 0.1s
2024-06-20 14:27:48 18:27:48.451 [info] == Running 20211228014915 Realtime.Tenants.Migrations.AddWalPayloadOnErrorsInApplyRlsFunction.change/0 forward
2024-06-20 14:27:48 18:27:48.452 [info] execute "create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns setof realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n else 'ERROR'\n end\n );\n\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n\n subscriptions realtime.subscription[] = array_agg(subs)\n from\n realtime.subscription subs\n where\n subs.entity = entity_;\n\n -- Subscription vars\n roles regrole[] = array_agg(distinct us.claims_role)\n from\n unnest(subscriptions) us;\n\n working_role regrole;\n claimed_role regrole;\n claims jsonb;\n\n subscription_id uuid;\n subscription_has_access bool;\n visible_to_subscription_ids uuid[] = '{}';\n\n -- structured info for wal's columns\n columns realtime.wal_column[];\n -- previous identity values for update/delete\n old_columns realtime.wal_column[];\n\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n\n -- Primary jsonb output for record\n output jsonb;\n\n begin\n perform set_config('role', null, true);\n\n columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n realtime.cast((x->'value') #>> '{}', (x->>'type')::regtype),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n old_columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n realtime.cast((x->'value') #>> '{}', (x->>'type')::regtype),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n 
jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n for working_role in select * from unnest(roles) loop\n\n -- Update `is_selectable` for columns and old_columns\n columns =\n array_agg(\n (\n c.name,\n c.type,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(columns) c;\n\n old_columns =\n array_agg(\n (\n c.name,\n c.type,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(old_columns) c;\n\n if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then\n return next (\n jsonb_build_object(\n 'schema', wal ->> 'schema',\n 'table', wal ->> 'table',\n 'type', action\n ),\n is_rls_enabled,\n -- subscriptions is already filtered by entity\n (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role),\n array['Error 400: Bad Request, no primary key']\n )::realtime.wal_rls;\n\n -- The claims role does not have SELECT permission to the primary key of entity\n elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey then\n return next (\n jsonb_build_object(\n 'schema', wal ->> 'schema',\n 'table', wal ->> 'table',\n 'type', action\n ),\n " <> ...
2024-06-20 14:27:48 18:27:48.454 [info] == Migrated 20211228014915 in 0.0s
2024-06-20 14:27:48 18:27:48.457 [info] == Running 20220107221237 Realtime.Tenants.Migrations.UpdateChangeTimestampToIso8601ZuluFormat.change/0 forward
2024-06-20 14:27:48 18:27:48.458 [info] execute "create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns setof realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n else 'ERROR'\n end\n );\n\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n\n subscriptions realtime.subscription[] = array_agg(subs)\n from\n realtime.subscription subs\n where\n subs.entity = entity_;\n\n -- Subscription vars\n roles regrole[] = array_agg(distinct us.claims_role)\n from\n unnest(subscriptions) us;\n\n working_role regrole;\n claimed_role regrole;\n claims jsonb;\n\n subscription_id uuid;\n subscription_has_access bool;\n visible_to_subscription_ids uuid[] = '{}';\n\n -- structured info for wal's columns\n columns realtime.wal_column[];\n -- previous identity values for update/delete\n old_columns realtime.wal_column[];\n\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n\n -- Primary jsonb output for record\n output jsonb;\n\n begin\n perform set_config('role', null, true);\n\n columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n realtime.cast((x->'value') #>> '{}', (x->>'type')::regtype),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n old_columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n realtime.cast((x->'value') #>> '{}', (x->>'type')::regtype),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n 
jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n for working_role in select * from unnest(roles) loop\n\n -- Update `is_selectable` for columns and old_columns\n columns =\n array_agg(\n (\n c.name,\n c.type,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(columns) c;\n\n old_columns =\n array_agg(\n (\n c.name,\n c.type,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(old_columns) c;\n\n if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then\n return next (\n null,\n is_rls_enabled,\n -- subscriptions is already filtered by entity\n (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role),\n array['Error 400: Bad Request, no primary key']\n )::realtime.wal_rls;\n\n -- The claims role does not have SELECT permission to the primary key of entity\n elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where " <> ...
2024-06-20 14:27:48 18:27:48.460 [info] == Migrated 20220107221237 in 0.0s
2024-06-20 14:27:48 18:27:48.463 [info] == Running 20220228202821 Realtime.Tenants.Migrations.UpdateSubscriptionCheckFiltersFunctionDynamicTableName.change/0 forward
2024-06-20 14:27:48 18:27:48.463 [info] execute "create or replace function realtime.subscription_check_filters()\n returns trigger\n language plpgsql\n as $$\n /*\n Validates that the user defined filters for a subscription:\n - refer to valid columns that the claimed role may access\n - values are coercable to the correct column type\n */\n declare\n col_names text[] = coalesce(\n array_agg(c.column_name order by c.ordinal_position),\n '{}'::text[]\n )\n from\n information_schema.columns c\n where\n format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity\n and pg_catalog.has_column_privilege(\n (new.claims ->> 'role'),\n format('%I.%I', c.table_schema, c.table_name)::regclass,\n c.column_name,\n 'SELECT'\n );\n filter realtime.user_defined_filter;\n col_type regtype;\n begin\n for filter in select * from unnest(new.filters) loop\n -- Filtered column is valid\n if not filter.column_name = any(col_names) then\n raise exception 'invalid column for filter %', filter.column_name;\n end if;\n\n -- Type is sanitized and safe for string interpolation\n col_type = (\n select atttypid::regtype\n from pg_catalog.pg_attribute\n where attrelid = new.entity\n and attname = filter.column_name\n );\n if col_type is null then\n raise exception 'failed to lookup type for column %', filter.column_name;\n end if;\n -- raises an exception if value is not coercable to type\n perform realtime.cast(filter.value, col_type);\n end loop;\n\n -- Apply consistent order to filters so the unique constraint on\n -- (subscription_id, entity, filters) can't be tricked by a different filter order\n new.filters = coalesce(\n array_agg(f order by f.column_name, f.op, f.value),\n '{}'\n ) from unnest(new.filters) f;\n\n return new;\n end;\n $$;"
2024-06-20 14:27:48 18:27:48.464 [info] == Migrated 20220228202821 in 0.0s
2024-06-20 14:27:48 18:27:48.466 [info] == Running 20220312004840 Realtime.Tenants.Migrations.UpdateApplyRlsFunctionToApplyIso8601.change/0 forward
2024-06-20 14:27:48 18:27:48.467 [info] execute "create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns setof realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n else 'ERROR'\n end\n );\n\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n\n subscriptions realtime.subscription[] = array_agg(subs)\n from\n realtime.subscription subs\n where\n subs.entity = entity_;\n\n -- Subscription vars\n roles regrole[] = array_agg(distinct us.claims_role)\n from\n unnest(subscriptions) us;\n\n working_role regrole;\n claimed_role regrole;\n claims jsonb;\n\n subscription_id uuid;\n subscription_has_access bool;\n visible_to_subscription_ids uuid[] = '{}';\n\n -- structured info for wal's columns\n columns realtime.wal_column[];\n -- previous identity values for update/delete\n old_columns realtime.wal_column[];\n\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n\n -- Primary jsonb output for record\n output jsonb;\n\n begin\n perform set_config('role', null, true);\n\n columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n realtime.cast((x->'value') #>> '{}', (x->>'type')::regtype),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n old_columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n realtime.cast((x->'value') #>> '{}', (x->>'type')::regtype),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n 
jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n for working_role in select * from unnest(roles) loop\n\n -- Update `is_selectable` for columns and old_columns\n columns =\n array_agg(\n (\n c.name,\n c.type,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(columns) c;\n\n old_columns =\n array_agg(\n (\n c.name,\n c.type,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(old_columns) c;\n\n if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then\n return next (\n null,\n is_rls_enabled,\n -- subscriptions is already filtered by entity\n (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role),\n array['Error 400: Bad Request, no primary key']\n )::realtime.wal_rls;\n\n -- The claims role does not have SELECT permission to the primary key of entity\n elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where " <> ...
2024-06-20 14:27:48 18:27:48.468 [info] == Migrated 20220312004840 in 0.0s
2024-06-20 14:27:48 18:27:48.471 [info] == Running 20220603231003 Realtime.Tenants.Migrations.AddQuotedRegtypesSupport.change/0 forward
2024-06-20 14:27:48 18:27:48.471 [info] execute "drop type if exists realtime.wal_column cascade;"
2024-06-20 14:27:48 18:27:48.472 [info] drop cascades to 2 other objects
2024-06-20 14:27:48 18:27:48.472 [info] execute "\n create type realtime.wal_column as (\n name text,\n type_name text,\n type_oid oid,\n value jsonb,\n is_pkey boolean,\n is_selectable boolean\n );\n "
2024-06-20 14:27:48 18:27:48.473 [info] execute "\n create or replace function realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[])\n returns bool\n language sql\n immutable\n as $$\n /*\n Should the record be visible (true) or filtered out (false) after *filters* are applied\n */\n select\n -- Default to allowed when no filters present\n coalesce(\n sum(\n realtime.check_equality_op(\n op:=f.op,\n type_:=col.type_oid::regtype,\n -- cast jsonb to text\n val_1:=col.value #>> '{}',\n val_2:=f.value\n )::int\n ) = count(1),\n true\n )\n from\n unnest(filters) f\n join unnest(columns) col\n on f.column_name = col.name;\n $$;"
2024-06-20 14:27:48 18:27:48.484 [info] execute "\n create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns setof realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n else 'ERROR'\n end\n );\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n subscriptions realtime.subscription[] = array_agg(subs)\n from\n realtime.subscription subs\n where\n subs.entity = entity_;\n -- Subscription vars\n roles regrole[] = array_agg(distinct us.claims_role)\n from\n unnest(subscriptions) us;\n working_role regrole;\n claimed_role regrole;\n claims jsonb;\n subscription_id uuid;\n subscription_has_access bool;\n visible_to_subscription_ids uuid[] = '{}';\n -- structured info for wal's columns\n columns realtime.wal_column[];\n -- previous identity values for update/delete\n old_columns realtime.wal_column[];\n\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n\n -- Primary jsonb output for record\n output jsonb;\n\n begin\n perform set_config('role', null, true);\n\n columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n (x->'value') #>> '{}',\n (x->>'typeoid')::regtype\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n old_columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n (x->'value') #>> '{}',\n (x->>'typeoid')::regtype\n ),\n (pks ->> 'name') is not null,\n true\n 
)::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n for working_role in select * from unnest(roles) loop\n\n -- Update `is_selectable` for columns and old_columns\n columns =\n array_agg(\n (\n c.name,\n c.type_name,\n c.type_oid,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(columns) c;\n\n old_columns =\n array_agg(\n (\n c.name,\n " <> ...
2024-06-20 14:27:48 18:27:48.486 [info] == Migrated 20220603231003 in 0.0s
2024-06-20 14:27:48 18:27:48.489 [info] == Running 20220603232444 Realtime.Tenants.Migrations.AddOutputForDataLessThanEqual64BytesWhenPayloadTooLarge.change/0 forward
2024-06-20 14:27:48 18:27:48.490 [info] execute "\n create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns setof realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n else 'ERROR'\n end\n );\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n subscriptions realtime.subscription[] = array_agg(subs)\n from\n realtime.subscription subs\n where\n subs.entity = entity_;\n -- Subscription vars\n roles regrole[] = array_agg(distinct us.claims_role)\n from\n unnest(subscriptions) us;\n working_role regrole;\n claimed_role regrole;\n claims jsonb;\n subscription_id uuid;\n subscription_has_access bool;\n visible_to_subscription_ids uuid[] = '{}';\n -- structured info for wal's columns\n columns realtime.wal_column[];\n -- previous identity values for update/delete\n old_columns realtime.wal_column[];\n\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n\n -- Primary jsonb output for record\n output jsonb;\n\n begin\n perform set_config('role', null, true);\n\n columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n (x->'value') #>> '{}',\n (x->>'typeoid')::regtype\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n old_columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n (x->'value') #>> '{}',\n (x->>'typeoid')::regtype\n ),\n (pks ->> 'name') is not null,\n true\n 
)::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n for working_role in select * from unnest(roles) loop\n\n -- Update `is_selectable` for columns and old_columns\n columns =\n array_agg(\n (\n c.name,\n c.type_name,\n c.type_oid,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(columns) c;\n\n old_columns =\n array_agg(\n (\n c.name,\n c.type_name,\n c.type_oid,\n c.value,\n c.is_pkey,\n pg_ca" <> ...
2024-06-20 14:27:48 18:27:48.495 [info] == Migrated 20220603232444 in 0.0s
2024-06-20 14:27:48 18:27:48.499 [info] == Running 20220615214548 Realtime.Tenants.Migrations.AddQuotedRegtypesBackwardCompatibilitySupport.change/0 forward
2024-06-20 14:27:48 18:27:48.499 [info] execute "\n create or replace function realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[])\n returns bool\n language sql\n immutable\n as $$\n /*\n Should the record be visible (true) or filtered out (false) after *filters* are applied\n */\n select\n -- Default to allowed when no filters present\n coalesce(\n sum(\n realtime.check_equality_op(\n op:=f.op,\n type_:=coalesce(\n col.type_oid::regtype, -- null when wal2json version <= 2.4\n col.type_name::regtype\n ),\n -- cast jsonb to text\n val_1:=col.value #>> '{}',\n val_2:=f.value\n )::int\n ) = count(1),\n true\n )\n from\n unnest(filters) f\n join unnest(columns) col\n on f.column_name = col.name;\n $$;\n "
2024-06-20 14:27:48 18:27:48.504 [info] execute "\n create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns setof realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n else 'ERROR'\n end\n );\n\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n\n subscriptions realtime.subscription[] = array_agg(subs)\n from\n realtime.subscription subs\n where\n subs.entity = entity_;\n\n -- Subscription vars\n roles regrole[] = array_agg(distinct us.claims_role)\n from\n unnest(subscriptions) us;\n\n working_role regrole;\n claimed_role regrole;\n claims jsonb;\n\n subscription_id uuid;\n subscription_has_access bool;\n visible_to_subscription_ids uuid[] = '{}';\n\n -- structured info for wal's columns\n columns realtime.wal_column[];\n -- previous identity values for update/delete\n old_columns realtime.wal_column[];\n\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n\n -- Primary jsonb output for record\n output jsonb;\n\n begin\n perform set_config('role', null, true);\n\n columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n (x->'value') #>> '{}',\n coalesce(\n (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4\n (x->>'type')::regtype\n )\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n old_columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n 
(x->'value') #>> '{}',\n coalesce(\n (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4\n (x->>'type')::regtype\n )\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n for working_role in select * from unnest(roles) loop\n\n -- Update `is_selectable` for columns and old_columns\n columns =\n array_agg(\n (\n c.name,\n c.type_name,\n c.type_oid,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(columns) c;\n\n " <> ...
2024-06-20 14:27:48 18:27:48.506 [info] == Migrated 20220615214548 in 0.0s
2024-06-20 14:27:48 18:27:48.508 [info] == Running 20220712093339 Realtime.Tenants.Migrations.RecreateRealtimeBuildPreparedStatementSqlFunction.change/0 forward
2024-06-20 14:27:48 18:27:48.509 [info] execute "\n create or replace function realtime.build_prepared_statement_sql(\n prepared_statement_name text,\n entity regclass,\n columns realtime.wal_column[]\n )\n returns text\n language sql\n as $$\n /*\n Builds a sql string that, if executed, creates a prepared statement to\n tests retrive a row from *entity* by its primary key columns.\n Example\n select realtime.build_prepared_statement_sql('public.notes', '{\"id\"}'::text[], '{\"bigint\"}'::text[])\n */\n select\n 'prepare ' || prepared_statement_name || ' as\n select\n exists(\n select\n 1\n from\n ' || entity || '\n where\n ' || string_agg(quote_ident(pkc.name) || '=' || quote_nullable(pkc.value #>> '{}') , ' and ') || '\n )'\n from\n unnest(columns) pkc\n where\n pkc.is_pkey\n group by\n entity\n $$;\n "
2024-06-20 14:27:48 18:27:48.510 [info] == Migrated 20220712093339 in 0.0s
2024-06-20 14:27:48 18:27:48.512 [info] == Running 20220908172859 Realtime.Tenants.Migrations.NullPassesFiltersRecreateIsVisibleThroughFilters.change/0 forward
2024-06-20 14:27:48 18:27:48.512 [info] execute "\n create or replace function realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[])\n returns bool\n language sql\n immutable\n as $$\n /*\n Should the record be visible (true) or filtered out (false) after *filters* are applied\n */\n select\n -- Default to allowed when no filters present\n $2 is null -- no filters. this should not happen because subscriptions has a default\n or array_length($2, 1) is null -- array length of an empty array is null\n or bool_and(\n coalesce(\n realtime.check_equality_op(\n op:=f.op,\n type_:=coalesce(\n col.type_oid::regtype, -- null when wal2json version <= 2.4\n col.type_name::regtype\n ),\n -- cast jsonb to text\n val_1:=col.value #>> '{}',\n val_2:=f.value\n ),\n false -- if null, filter does not match\n )\n )\n from\n unnest(filters) f\n join unnest(columns) col\n on f.column_name = col.name;\n $$;\n "
2024-06-20 14:27:48 18:27:48.514 [info] == Migrated 20220908172859 in 0.0s
2024-06-20 14:27:48 18:27:48.519 [info] == Running 20220916233421 Realtime.Tenants.Migrations.UpdateApplyRlsFunctionToPassThroughDeleteEventsOnFilter.change/0 forward
2024-06-20 14:27:48 18:27:48.519 [info] execute "\n create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns setof realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n else 'ERROR'\n end\n );\n\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n\n subscriptions realtime.subscription[] = array_agg(subs)\n from\n realtime.subscription subs\n where\n subs.entity = entity_;\n\n -- Subscription vars\n roles regrole[] = array_agg(distinct us.claims_role)\n from\n unnest(subscriptions) us;\n\n working_role regrole;\n claimed_role regrole;\n claims jsonb;\n\n subscription_id uuid;\n subscription_has_access bool;\n visible_to_subscription_ids uuid[] = '{}';\n\n -- structured info for wal's columns\n columns realtime.wal_column[];\n -- previous identity values for update/delete\n old_columns realtime.wal_column[];\n\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n\n -- Primary jsonb output for record\n output jsonb;\n\n begin\n perform set_config('role', null, true);\n\n columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n (x->'value') #>> '{}',\n coalesce(\n (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4\n (x->>'type')::regtype\n )\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n old_columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n 
(x->'value') #>> '{}',\n coalesce(\n (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4\n (x->>'type')::regtype\n )\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n for working_role in select * from unnest(roles) loop\n\n -- Update `is_selectable` for columns and old_columns\n columns =\n array_agg(\n (\n c.name,\n c.type_name,\n c.type_oid,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(columns) c;\n\n " <> ...
2024-06-20 14:27:48 18:27:48.524 [info] == Migrated 20220916233421 in 0.0s
2024-06-20 14:27:48 18:27:48.529 [info] == Running 20230119133233 Realtime.Tenants.Migrations.MillisecondPrecisionForWalrus.change/0 forward
2024-06-20 14:27:48 18:27:48.530 [info] execute "\n create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns setof realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n else 'ERROR'\n end\n );\n\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n\n subscriptions realtime.subscription[] = array_agg(subs)\n from\n realtime.subscription subs\n where\n subs.entity = entity_;\n\n -- Subscription vars\n roles regrole[] = array_agg(distinct us.claims_role)\n from\n unnest(subscriptions) us;\n\n working_role regrole;\n claimed_role regrole;\n claims jsonb;\n\n subscription_id uuid;\n subscription_has_access bool;\n visible_to_subscription_ids uuid[] = '{}';\n\n -- structured info for wal's columns\n columns realtime.wal_column[];\n -- previous identity values for update/delete\n old_columns realtime.wal_column[];\n\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n\n -- Primary jsonb output for record\n output jsonb;\n\n begin\n perform set_config('role', null, true);\n\n columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n (x->'value') #>> '{}',\n coalesce(\n (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4\n (x->>'type')::regtype\n )\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n old_columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n 
(x->'value') #>> '{}',\n coalesce(\n (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4\n (x->>'type')::regtype\n )\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n for working_role in select * from unnest(roles) loop\n\n -- Update `is_selectable` for columns and old_columns\n columns =\n array_agg(\n (\n c.name,\n c.type_name,\n c.type_oid,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(columns) c;\n\n " <> ...
2024-06-20 14:27:48 18:27:48.538 [info] == Migrated 20230119133233 in 0.0s
2024-06-20 14:27:48 18:27:48.549 [info] == Running 20230128025114 Realtime.Tenants.Migrations.AddInOpToFilters.change/0 forward
2024-06-20 14:27:48 18:27:48.549 [info] execute "alter type realtime.equality_op add value 'in';"
2024-06-20 14:27:48 18:27:48.553 [info] execute "\n create or replace function realtime.check_equality_op(\n op realtime.equality_op,\n type_ regtype,\n val_1 text,\n val_2 text\n )\n returns bool\n immutable\n language plpgsql\n as $$\n /*\n Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness\n */\n declare\n op_symbol text = (\n case\n when op = 'eq' then '='\n when op = 'neq' then '!='\n when op = 'lt' then '<'\n when op = 'lte' then '<='\n when op = 'gt' then '>'\n when op = 'gte' then '>='\n when op = 'in' then '= any'\n else 'UNKNOWN OP'\n end\n );\n res boolean;\n begin\n execute format(\n 'select %L::'|| type_::text || ' ' || op_symbol\n || ' ( %L::'\n || (\n case\n when op = 'in' then type_::text || '[]'\n else type_::text end\n )\n || ')', val_1, val_2) into res;\n return res;\n end;\n $$;\n "
2024-06-20 14:27:48 18:27:48.558 [info] execute "\n create or replace function realtime.subscription_check_filters()\n returns trigger\n language plpgsql\n as $$\n /*\n Validates that the user defined filters for a subscription:\n - refer to valid columns that the claimed role may access\n - values are coercable to the correct column type\n */\n declare\n col_names text[] = coalesce(\n array_agg(c.column_name order by c.ordinal_position),\n '{}'::text[]\n )\n from\n information_schema.columns c\n where\n format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity\n and pg_catalog.has_column_privilege(\n (new.claims ->> 'role'),\n format('%I.%I', c.table_schema, c.table_name)::regclass,\n c.column_name,\n 'SELECT'\n );\n filter realtime.user_defined_filter;\n col_type regtype;\n\n in_val jsonb;\n begin\n for filter in select * from unnest(new.filters) loop\n -- Filtered column is valid\n if not filter.column_name = any(col_names) then\n raise exception 'invalid column for filter %', filter.column_name;\n end if;\n\n -- Type is sanitized and safe for string interpolation\n col_type = (\n select atttypid::regtype\n from pg_catalog.pg_attribute\n where attrelid = new.entity\n and attname = filter.column_name\n );\n if col_type is null then\n raise exception 'failed to lookup type for column %', filter.column_name;\n end if;\n\n -- Set maximum number of entries for in filter\n if filter.op = 'in'::realtime.equality_op then\n in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype);\n if coalesce(jsonb_array_length(in_val), 0) > 100 then\n raise exception 'too many values for `in` filter. 
Maximum 100';\n end if;\n end if;\n\n -- raises an exception if value is not coercable to type\n perform realtime.cast(filter.value, col_type);\n end loop;\n\n -- Apply consistent order to filters so the unique constraint on\n -- (subscription_id, entity, filters) can't be tricked by a different filter order\n new.filters = coalesce(\n array_agg(f order by f.column_name, f.op, f.value),\n '{}'\n ) from unnest(new.filters) f;\n\n return new;\n end;\n $$;\n "
2024-06-20 14:27:48 18:27:48.561 [info] == Migrated 20230128025114 in 0.0s
2024-06-20 14:27:48 18:27:48.564 [info] == Running 20230128025212 Realtime.Tenants.Migrations.EnableFilteringOnDeleteRecord.change/0 forward
2024-06-20 14:27:48 18:27:48.565 [info] execute "\n create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns setof realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n else 'ERROR'\n end\n );\n\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n\n subscriptions realtime.subscription[] = array_agg(subs)\n from\n realtime.subscription subs\n where\n subs.entity = entity_;\n\n -- Subscription vars\n roles regrole[] = array_agg(distinct us.claims_role)\n from\n unnest(subscriptions) us;\n\n working_role regrole;\n claimed_role regrole;\n claims jsonb;\n\n subscription_id uuid;\n subscription_has_access bool;\n visible_to_subscription_ids uuid[] = '{}';\n\n -- structured info for wal's columns\n columns realtime.wal_column[];\n -- previous identity values for update/delete\n old_columns realtime.wal_column[];\n\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n\n -- Primary jsonb output for record\n output jsonb;\n\n begin\n perform set_config('role', null, true);\n\n columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n (x->'value') #>> '{}',\n coalesce(\n (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4\n (x->>'type')::regtype\n )\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n old_columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n 
(x->'value') #>> '{}',\n coalesce(\n (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4\n (x->>'type')::regtype\n )\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n for working_role in select * from unnest(roles) loop\n\n -- Update `is_selectable` for columns and old_columns\n columns =\n array_agg(\n (\n c.name,\n c.type_name,\n c.type_oid,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(columns) c;\n\n " <> ...
2024-06-20 14:27:48 18:27:48.566 [info] == Migrated 20230128025212 in 0.0s
2024-06-20 14:27:48 18:27:48.569 [info] == Running 20230227211149 Realtime.Tenants.Migrations.UpdateSubscriptionCheckFiltersForInFilterNonTextTypes.change/0 forward
2024-06-20 14:27:48 18:27:48.570 [info] execute "\n create or replace function realtime.subscription_check_filters()\n returns trigger\n language plpgsql\n as $$\n /*\n Validates that the user defined filters for a subscription:\n - refer to valid columns that the claimed role may access\n - values are coercable to the correct column type\n */\n declare\n col_names text[] = coalesce(\n array_agg(c.column_name order by c.ordinal_position),\n '{}'::text[]\n )\n from\n information_schema.columns c\n where\n format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity\n and pg_catalog.has_column_privilege(\n (new.claims ->> 'role'),\n format('%I.%I', c.table_schema, c.table_name)::regclass,\n c.column_name,\n 'SELECT'\n );\n filter realtime.user_defined_filter;\n col_type regtype;\n\n in_val jsonb;\n begin\n for filter in select * from unnest(new.filters) loop\n -- Filtered column is valid\n if not filter.column_name = any(col_names) then\n raise exception 'invalid column for filter %', filter.column_name;\n end if;\n\n -- Type is sanitized and safe for string interpolation\n col_type = (\n select atttypid::regtype\n from pg_catalog.pg_attribute\n where attrelid = new.entity\n and attname = filter.column_name\n );\n if col_type is null then\n raise exception 'failed to lookup type for column %', filter.column_name;\n end if;\n\n -- Set maximum number of entries for in filter\n if filter.op = 'in'::realtime.equality_op then\n in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype);\n if coalesce(jsonb_array_length(in_val), 0) > 100 then\n raise exception 'too many values for `in` filter. 
Maximum 100';\n end if;\n else\n -- raises an exception if value is not coercable to type\n perform realtime.cast(filter.value, col_type);\n end if;\n\n end loop;\n\n -- Apply consistent order to filters so the unique constraint on\n -- (subscription_id, entity, filters) can't be tricked by a different filter order\n new.filters = coalesce(\n array_agg(f order by f.column_name, f.op, f.value),\n '{}'\n ) from unnest(new.filters) f;\n\n return new;\n end;\n $$;\n "
2024-06-20 14:27:48 18:27:48.570 [info] == Migrated 20230227211149 in 0.0s
2024-06-20 14:27:48 18:27:48.573 [info] == Running 20230228184745 Realtime.Tenants.Migrations.ConvertCommitTimestampToUtc.change/0 forward
2024-06-20 14:27:48 18:27:48.573 [info] execute "\n create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns setof realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n else 'ERROR'\n end\n );\n\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n\n subscriptions realtime.subscription[] = array_agg(subs)\n from\n realtime.subscription subs\n where\n subs.entity = entity_;\n\n -- Subscription vars\n roles regrole[] = array_agg(distinct us.claims_role)\n from\n unnest(subscriptions) us;\n\n working_role regrole;\n claimed_role regrole;\n claims jsonb;\n\n subscription_id uuid;\n subscription_has_access bool;\n visible_to_subscription_ids uuid[] = '{}';\n\n -- structured info for wal's columns\n columns realtime.wal_column[];\n -- previous identity values for update/delete\n old_columns realtime.wal_column[];\n\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n\n -- Primary jsonb output for record\n output jsonb;\n\n begin\n perform set_config('role', null, true);\n\n columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n (x->'value') #>> '{}',\n coalesce(\n (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4\n (x->>'type')::regtype\n )\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n old_columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n 
(x->'value') #>> '{}',\n coalesce(\n (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4\n (x->>'type')::regtype\n )\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n for working_role in select * from unnest(roles) loop\n\n -- Update `is_selectable` for columns and old_columns\n columns =\n array_agg(\n (\n c.name,\n c.type_name,\n c.type_oid,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(columns) c;\n\n old_columns =\n array_agg(\n (\n c.name,\n c.type_name,\n c.type_oid,\n " <> ...
2024-06-20 14:27:48 18:27:48.575 [info] == Migrated 20230228184745 in 0.0s
2024-06-20 14:27:48 18:27:48.577 [info] == Running 20230308225145 Realtime.Tenants.Migrations.OutputFullRecordWhenUnchangedToast.change/0 forward
2024-06-20 14:27:48 18:27:48.577 [info] execute "\n create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)\n returns setof realtime.wal_rls\n language plpgsql\n volatile\n as $$\n declare\n -- Regclass of the table e.g. public.notes\n entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;\n\n -- I, U, D, T: insert, update ...\n action realtime.action = (\n case wal ->> 'action'\n when 'I' then 'INSERT'\n when 'U' then 'UPDATE'\n when 'D' then 'DELETE'\n else 'ERROR'\n end\n );\n\n -- Is row level security enabled for the table\n is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;\n\n subscriptions realtime.subscription[] = array_agg(subs)\n from\n realtime.subscription subs\n where\n subs.entity = entity_;\n\n -- Subscription vars\n roles regrole[] = array_agg(distinct us.claims_role)\n from\n unnest(subscriptions) us;\n\n working_role regrole;\n claimed_role regrole;\n claims jsonb;\n\n subscription_id uuid;\n subscription_has_access bool;\n visible_to_subscription_ids uuid[] = '{}';\n\n -- structured info for wal's columns\n columns realtime.wal_column[];\n -- previous identity values for update/delete\n old_columns realtime.wal_column[];\n\n error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;\n\n -- Primary jsonb output for record\n output jsonb;\n\n begin\n perform set_config('role', null, true);\n\n columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n (x->'value') #>> '{}',\n coalesce(\n (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4\n (x->>'type')::regtype\n )\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'columns') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n old_columns =\n array_agg(\n (\n x->>'name',\n x->>'type',\n x->>'typeoid',\n realtime.cast(\n 
(x->'value') #>> '{}',\n coalesce(\n (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4\n (x->>'type')::regtype\n )\n ),\n (pks ->> 'name') is not null,\n true\n )::realtime.wal_column\n )\n from\n jsonb_array_elements(wal -> 'identity') x\n left join jsonb_array_elements(wal -> 'pk') pks\n on (x ->> 'name') = (pks ->> 'name');\n\n for working_role in select * from unnest(roles) loop\n\n -- Update `is_selectable` for columns and old_columns\n columns =\n array_agg(\n (\n c.name,\n c.type_name,\n c.type_oid,\n c.value,\n c.is_pkey,\n pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')\n )::realtime.wal_column\n )\n from\n unnest(columns) c;\n\n old_columns =\n array_agg(\n (\n c.name,\n c.type_name,\n c.type_oid,\n " <> ...
2024-06-20 14:27:48 18:27:48.580 [info] == Migrated 20230308225145 in 0.0s
2024-06-20 14:27:48 18:27:48.582 [info] == Running 20230328144023 Realtime.Tenants.Migrations.CreateListChangesFunction.change/0 forward
2024-06-20 14:27:48 18:27:48.582 [info] execute "create or replace function realtime.list_changes(publication name, slot_name name, max_changes int, max_record_bytes int)\n returns setof realtime.wal_rls\n language sql\n set log_min_messages to 'fatal'\n as $$\n with pub as (\n select\n concat_ws(\n ',',\n case when bool_or(pubinsert) then 'insert' else null end,\n case when bool_or(pubupdate) then 'update' else null end,\n case when bool_or(pubdelete) then 'delete' else null end\n ) as w2j_actions,\n coalesce(\n string_agg(\n realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),\n ','\n ) filter (where ppt.tablename is not null and ppt.tablename not like '% %'),\n ''\n ) w2j_add_tables\n from\n pg_publication pp\n left join pg_publication_tables ppt\n on pp.pubname = ppt.pubname\n where\n pp.pubname = publication\n group by\n pp.pubname\n limit 1\n ),\n w2j as (\n select\n x.*, pub.w2j_add_tables\n from\n pub,\n pg_logical_slot_get_changes(\n slot_name, null, max_changes,\n 'include-pk', 'true',\n 'include-transaction', 'false',\n 'include-timestamp', 'true',\n 'include-type-oids', 'true',\n 'format-version', '2',\n 'actions', pub.w2j_actions,\n 'add-tables', pub.w2j_add_tables\n ) x\n )\n select\n xyz.wal,\n xyz.is_rls_enabled,\n xyz.subscription_ids,\n xyz.errors\n from\n w2j,\n realtime.apply_rls(\n wal := w2j.data::jsonb,\n max_record_bytes := max_record_bytes\n ) xyz(wal, is_rls_enabled, subscription_ids, errors)\n where\n w2j.w2j_add_tables <> ''\n and xyz.subscription_ids[1] is not null\n $$;"
2024-06-20 14:27:48 18:27:48.585 [info] == Migrated 20230328144023 in 0.0s
2024-06-20 14:27:48 18:27:48.592 [info] == Running 20231018144023 Realtime.Tenants.Migrations.CreateChannels.change/0 forward
2024-06-20 14:27:48 18:27:48.592 [info] create table realtime.channels
2024-06-20 14:27:48 18:27:48.595 [info] create index realtime.channels_name_index
2024-06-20 14:27:48 18:27:48.598 [info] == Migrated 20231018144023 in 0.0s
2024-06-20 14:27:48 18:27:48.604 [info] == Running 20231204144023 Realtime.Tenants.Migrations.SetRequiredGrants.change/0 forward
2024-06-20 14:27:48 18:27:48.604 [info] execute "GRANT USAGE ON SCHEMA realtime TO postgres, anon, authenticated, service_role\n"
2024-06-20 14:27:48 18:27:48.608 [info] execute "GRANT SELECT ON ALL TABLES IN SCHEMA realtime TO postgres, anon, authenticated, service_role\n"
2024-06-20 14:27:48 18:27:48.611 [info] execute "GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA realtime TO postgres, anon, authenticated, service_role\n"
2024-06-20 14:27:48 18:27:48.615 [info] execute "GRANT USAGE ON ALL SEQUENCES IN SCHEMA realtime TO postgres, anon, authenticated, service_role\n"
2024-06-20 14:27:48 18:27:48.617 [info] == Migrated 20231204144023 in 0.0s
2024-06-20 14:27:48 18:27:48.623 [info] == Running 20231204144024 Realtime.Tenants.Migrations.CreateRlsHelperFunctions.change/0 forward
2024-06-20 14:27:48 18:27:48.623 [info] execute "create or replace function realtime.channel_name() returns text as $$\nselect nullif(current_setting('realtime.channel_name', true), '')::text;\n$$ language sql stable;\n"
2024-06-20 14:27:48 18:27:48.625 [info] == Migrated 20231204144024 in 0.0s
2024-06-20 14:27:48 18:27:48.628 [info] == Running 20231204144025 Realtime.Tenants.Migrations.EnableChannelsRls.change/0 forward
2024-06-20 14:27:48 18:27:48.628 [info] execute "ALTER TABLE realtime.channels ENABLE row level security"
2024-06-20 14:27:48 18:27:48.631 [info] == Migrated 20231204144025 in 0.0s
2024-06-20 14:27:48 18:27:48.635 [info] == Running 20240108234812 Realtime.Tenants.Migrations.AddChannelsColumnForWriteCheck.change/0 forward
2024-06-20 14:27:48 18:27:48.635 [info] alter table realtime.channels
2024-06-20 14:27:48 18:27:48.636 [info] == Migrated 20240108234812 in 0.0s
2024-06-20 14:27:48 18:27:48.637 [info] == Running 20240109165339 Realtime.Tenants.Migrations.AddUpdateGrantToChannels.change/0 forward
2024-06-20 14:27:48 18:27:48.637 [info] execute "GRANT UPDATE ON realtime.channels TO postgres, anon, authenticated, service_role\n"
2024-06-20 14:27:48 18:27:48.638 [info] == Migrated 20240109165339 in 0.0s
2024-06-20 14:27:48 18:27:48.642 [info] == Running 20240227174441 Realtime.Tenants.Migrations.AddBroadcastsPoliciesTable.change/0 forward
2024-06-20 14:27:48 18:27:48.642 [info] create table realtime.broadcasts
2024-06-20 14:27:48 18:27:48.647 [info] create index realtime.broadcasts_channel_id_index
2024-06-20 14:27:48 18:27:48.649 [info] execute "ALTER TABLE realtime.broadcasts ENABLE row level security"
2024-06-20 14:27:48 18:27:48.651 [info] execute "GRANT SELECT ON realtime.broadcasts TO postgres, anon, authenticated, service_role"
2024-06-20 14:27:48 18:27:48.651 [info] execute "GRANT UPDATE ON realtime.broadcasts TO postgres, anon, authenticated, service_role"
2024-06-20 14:27:48 18:27:48.652 [info] == Migrated 20240227174441 in 0.0s
2024-06-20 14:27:48 18:27:48.655 [info] == Running 20240311171622 Realtime.Tenants.Migrations.AddInsertAndDeleteGrantToChannels.change/0 forward
2024-06-20 14:27:48 18:27:48.655 [info] execute "GRANT INSERT, DELETE ON realtime.channels TO postgres, anon, authenticated, service_role\n"
2024-06-20 14:27:48 18:27:48.656 [info] execute "GRANT INSERT ON realtime.broadcasts TO postgres, anon, authenticated, service_role\n"
2024-06-20 14:27:48 18:27:48.657 [info] execute "GRANT USAGE ON SEQUENCE realtime.broadcasts_id_seq TO postgres, anon, authenticated, service_role\n"
2024-06-20 14:27:48 18:27:48.658 [info] == Migrated 20240311171622 in 0.0s
2024-06-20 14:27:48 18:27:48.660 [info] == Running 20240321100241 Realtime.Tenants.Migrations.AddPresencesPoliciesTable.change/0 forward
2024-06-20 14:27:48 18:27:48.660 [info] create table realtime.presences
2024-06-20 14:27:48 18:27:48.665 [info] create index realtime.presences_channel_id_index
2024-06-20 14:27:48 18:27:48.667 [info] execute "ALTER TABLE realtime.presences ENABLE row level security"
2024-06-20 14:27:48 18:27:48.668 [info] execute "GRANT SELECT ON realtime.presences TO postgres, anon, authenticated, service_role"
2024-06-20 14:27:48 18:27:48.668 [info] execute "GRANT UPDATE ON realtime.presences TO postgres, anon, authenticated, service_role"
2024-06-20 14:27:48 18:27:48.669 [info] execute "GRANT INSERT ON realtime.presences TO postgres, anon, authenticated, service_role\n"
2024-06-20 14:27:48 18:27:48.670 [info] execute "GRANT USAGE ON SEQUENCE realtime.presences_id_seq TO postgres, anon, authenticated, service_role\n"
2024-06-20 14:27:48 18:27:48.670 [info] == Migrated 20240321100241 in 0.0s
2024-06-20 14:27:48 18:27:48.673 [info] == Running 20240401105812 Realtime.Tenants.Migrations.CreateRealtimeAdminAndMoveOwnership.change/0 forward
2024-06-20 14:27:48 18:27:48.673 [info] execute "DO\n$do$\nBEGIN\n IF EXISTS (\n SELECT FROM pg_catalog.pg_roles\n WHERE rolname = 'supabase_realtime_admin') THEN\n\n RAISE NOTICE 'Role \"supabase_realtime_admin\" already exists. Skipping.';\n ELSE\n CREATE ROLE supabase_realtime_admin WITH NOINHERIT NOLOGIN NOREPLICATION;\n END IF;\nEND\n$do$;\n"
2024-06-20 14:27:48 18:27:48.675 [info] execute "GRANT ALL PRIVILEGES ON SCHEMA realtime TO supabase_realtime_admin"
2024-06-20 14:27:48 18:27:48.676 [info] execute "GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA realtime TO supabase_realtime_admin"
2024-06-20 14:27:48 18:27:48.676 [info] execute "GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA realtime TO supabase_realtime_admin"
2024-06-20 14:27:48 18:27:48.677 [info] execute "GRANT ALL PRIVILEGES ON ALL FUNCTIONS IN SCHEMA realtime TO supabase_realtime_admin"
2024-06-20 14:27:48 18:27:48.680 [info] execute "ALTER table realtime.channels OWNER to supabase_realtime_admin"
2024-06-20 14:27:48 18:27:48.681 [info] execute "ALTER table realtime.broadcasts OWNER to supabase_realtime_admin"
2024-06-20 14:27:48 18:27:48.682 [info] execute "ALTER table realtime.presences OWNER TO supabase_realtime_admin"
2024-06-20 14:27:48 18:27:48.682 [info] execute "ALTER function realtime.channel_name() owner to supabase_realtime_admin"
2024-06-20 14:27:48 18:27:48.683 [info] execute "GRANT supabase_realtime_admin TO postgres"
2024-06-20 14:27:48 18:27:48.685 [info] == Migrated 20240401105812 in 0.0s
2024-06-20 14:27:48 18:27:48.688 [info] == Running 20240418121054 Realtime.Tenants.Migrations.RemoveCheckColumns.change/0 forward
2024-06-20 14:27:48 18:27:48.688 [info] alter table realtime.channels
2024-06-20 14:27:48 18:27:48.689 [info] alter table realtime.broadcasts
2024-06-20 14:27:48 18:27:48.690 [info] alter table realtime.presences
2024-06-20 14:27:48 18:27:48.691 [info] == Migrated 20240418121054 in 0.0s
2024-06-20 14:27:48 18:27:48.705 request_id=F9rJx9wgyGFzid4AAAAB project=realtime-dev external_id=realtime-dev [info] Sent 200 in 680ms
2024-06-20 14:27:53 18:27:53.751 request_id=F9rJyTF6rhqmv-sAAACh [info] HEAD /api/tenants/realtime-dev/health
2024-06-20 14:27:53 18:27:53.751 request_id=F9rJyTF6rhqmv-sAAACh [debug] Processing with RealtimeWeb.TenantController.health/2
2024-06-20 14:27:53 Parameters: %{"tenant_id" => "realtime-dev"}
2024-06-20 14:27:53 Pipelines: [:api]
2024-06-20 14:27:53 18:27:53.758 request_id=F9rJyTF6rhqmv-sAAACh project=realtime-dev external_id=realtime-dev [info] Sent 200 in 6ms
2024-06-20 14:27:58 18:27:58.798 request_id=F9rJyl5IPfDtahUAAADB [info] HEAD /api/tenants/realtime-dev/health
2024-06-20 14:27:58 18:27:58.798 request_id=F9rJyl5IPfDtahUAAADB [debug] Processing with RealtimeWeb.TenantController.health/2
2024-06-20 14:27:58 Parameters: %{"tenant_id" => "realtime-dev"}
2024-06-20 14:27:58 Pipelines: [:api]
2024-06-20 14:27:58 18:27:58.802 request_id=F9rJyl5IPfDtahUAAADB project=realtime-dev external_id=realtime-dev [info] Sent 200 in 4ms
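For anyone comparing this working log against the failing one, here is a minimal sketch that pulls the migration versions out of a log dump, so the two runs can be diffed by which migrations started and which finished. It assumes the `== Running <version> <module>.change/0 forward` and `== Migrated <version> in ...` line shapes seen above. The function name `summarize_migrations` is hypothetical and is not part of Realtime or Ecto.

```python
import re

# Line shapes assumed from the log dump above, e.g.
#   "... [info] == Running 20240418121054 Realtime.Tenants.Migrations.RemoveCheckColumns.change/0 forward"
#   "... [info] == Migrated 20240418121054 in 0.0s"
RUNNING = re.compile(r"== Running (\d{14}) (\S+?)\.(?:change|up|down)/0 forward")
MIGRATED = re.compile(r"== Migrated (\d{14}) in ")


def summarize_migrations(log_text):
    """Return (completed, unfinished) lists of (version, module) tuples.

    Hypothetical helper for illustration only.
    """
    started = {}      # version -> migration module name, kept in log order
    finished = set()  # versions that logged a matching "== Migrated" line
    for line in log_text.splitlines():
        match = RUNNING.search(line)
        if match:
            started[match.group(1)] = match.group(2)
            continue
        match = MIGRATED.search(line)
        if match:
            finished.add(match.group(1))
    completed = [(v, name) for v, name in started.items() if v in finished]
    unfinished = [(v, name) for v, name in started.items() if v not in finished]
    return completed, unfinished


if __name__ == "__main__":
    import sys

    # Usage: python summarize.py < realtime.log
    done, pending = summarize_migrations(sys.stdin.read())
    for version, name in done:
        print("migrated  ", version, name)
    for version, name in pending:
        print("unfinished", version, name)
```

Running it over the logs from both 2.28.32 and the failing version and diffing the two outputs should show the first migration that is missing or never completes, if the failure is in the migration step at all.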
I actually think this is related to the way we're starting up the Docker image, since we're seeing the same error on the CLI when we bump the image 🤔
The changes required to accept RLIMIT_NOFILE seem to have broken the way we migrate.
Closing this, as it should be fixed in https://github.com/supabase/realtime/pull/1081
Bug report
Describe the bug
After attempting to use the Docker self-hosted example in https://github.com/supabase/supabase/pull/27398, seeding of the default realtime tenant seems to fail. None of the tables (realtime, etc.) are being created. This is causing realtime connections to fail.
To Reproduce
docker compose up
Expected behavior
Seeding succeeds.
Screenshots
The error appears in the logs here:
System information
Additional context
Log Dump: