SkyTruth / cerulean-cloud

All cloud services including inference and database structure
Apache License 2.0
7 stars 2 forks source link

SQL Updates for slick, slick_plus #116

Open bitner opened 1 month ago

bitner commented 1 month ago
-- psql settings for this migration:
--   ON_ERROR_STOP     : stop processing the script as soon as a statement fails.
--   ON_ERROR_ROLLBACK : have psql wrap each statement in an implicit savepoint while
--                       inside a transaction block.
-- Everything from BEGIN down to the COMMIT at the end of the script runs as a single
-- transaction.
\set ON_ERROR_STOP on
\set ON_ERROR_ROLLBACK on
BEGIN;

-- Remove the named slick indexes if they are present; a missing index only
-- produces a notice.
DROP INDEX IF EXISTS
    idx_slick_centroid,
    idx_slick_cls,
    idx_slick_fill_factor,
    idx_slick_hitl,
    idx_slick_length,
    idx_slick_polsby_popper;

-- empty(a): TRUE when the array is NULL or holds no elements, FALSE otherwise.
-- cardinality() yields NULL for a NULL array, which COALESCE folds into the
-- "zero elements" case.
CREATE OR REPLACE FUNCTION empty(a ANYARRAY) RETURNS bool AS $$
    SELECT COALESCE(cardinality(a), 0) = 0;
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;

-- nonempty(a): TRUE when the array is not NULL and holds at least one element,
-- FALSE otherwise (including for a NULL array).
CREATE OR REPLACE FUNCTION nonempty(a ANYARRAY) RETURNS bool AS $$
    SELECT COALESCE(cardinality(a), 0) > 0;
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;

-- polsby_popper(geog): 4 * pi * area / perimeter^2 of the given geography.
-- STRICT: a NULL input yields NULL without evaluating the body.
CREATE OR REPLACE FUNCTION polsby_popper(geog geography) RETURNS FLOAT AS $$
    SELECT
        (4.0 * pi() * st_area(geog))
        / power(st_perimeter(geog), 2.0);
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE STRICT;

-- fill_factor(geog): ratio of the geography's area to the area of its oriented
-- envelope (as computed by ST_OrientedEnvelope on the geometry cast).
-- STRICT: a NULL input yields NULL without evaluating the body.
CREATE OR REPLACE FUNCTION fill_factor(geog geography) RETURNS FLOAT AS $$
DECLARE
    area float := ST_Area(geog);
    -- ST_OrientedEnvelope is called on the geometry cast of the input.
    geom geometry := geog::geometry;
    oriented_envelope geometry := st_orientedenvelope(geom);
BEGIN
    -- The envelope is cast back to geography so both areas are measured the same way.
    RETURN area / ST_Area(oriented_envelope::geography);
END;
$$ LANGUAGE PLPGSQL IMMUTABLE PARALLEL SAFE STRICT;

-- length(geog): the longer of two adjacent sides of the geography's oriented
-- envelope, each side measured as a geography distance between consecutive
-- corners of the envelope's exterior ring.
-- STRICT: a NULL input yields NULL without evaluating the body.
CREATE OR REPLACE FUNCTION length(geog geography) RETURNS FLOAT AS $$
DECLARE
    ring geometry := st_exteriorring(st_orientedenvelope(geog::geometry));
    -- First three vertices of the ring: corner_1-corner_2 and corner_2-corner_3
    -- are two sides that share corner_2.
    corner_1 geography := st_pointn(ring, 1)::geography;
    corner_2 geography := st_pointn(ring, 2)::geography;
    corner_3 geography := st_pointn(ring, 3)::geography;
BEGIN
    RETURN GREATEST(
        st_distance(corner_1, corner_2),
        st_distance(corner_2, corner_3)
    );
END;
$$ LANGUAGE PLPGSQL IMMUTABLE PARALLEL SAFE STRICT;

-- linearity(geog): length(geog)^2 divided by the area, then divided by the
-- Polsby-Popper score of the same geography.
-- STRICT: a NULL input yields NULL without evaluating the body.
CREATE OR REPLACE FUNCTION linearity(geog geography) RETURNS FLOAT AS $$
    SELECT
        power(length(geog), 2::double precision)
        / st_area(geog)
        / polsby_popper(geog);
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE STRICT;

-- The slick_plus view selects slick.*, so it is dropped before the columns it
-- exposes are dropped and re-added; it is recreated near the end of the script.
DROP VIEW IF EXISTS slick_plus;

-- Rebuild the derived columns of slick as stored generated columns and add the
-- AOI / source id arrays.
-- Every generated column is dropped (if present) before being added, and the array
-- columns use IF NOT EXISTS, so the statement can be re-run without failing on
-- "column already exists" and without discarding already-populated arrays.
ALTER TABLE slick
    DROP COLUMN IF EXISTS area,
    ADD COLUMN area float GENERATED ALWAYS AS (ST_Area(geometry)) STORED NOT NULL,
    DROP COLUMN IF EXISTS perimeter,
    ADD COLUMN perimeter float GENERATED ALWAYS AS (ST_Perimeter(geometry)) STORED NOT NULL,
    DROP COLUMN IF EXISTS centroid,
    ADD COLUMN centroid geography(Point,4326) GENERATED ALWAYS AS (ST_Centroid(geometry)) STORED NOT NULL,
    DROP COLUMN IF EXISTS length,
    ADD COLUMN length float GENERATED ALWAYS AS (length(geometry)) STORED NOT NULL,
    DROP COLUMN IF EXISTS polsby_popper,
    ADD COLUMN polsby_popper float GENERATED ALWAYS AS (polsby_popper(geometry)) STORED NOT NULL,
    DROP COLUMN IF EXISTS fill_factor,
    ADD COLUMN fill_factor float GENERATED ALWAYS AS (fill_factor(geometry)) STORED NOT NULL,
    DROP COLUMN IF EXISTS linearity,
    ADD COLUMN linearity float GENERATED ALWAYS AS (linearity(geometry)) STORED NOT NULL,
    -- Geometry-typed copies of the slick shape in two SRIDs, used by the spatial
    -- indexes created after this statement.
    DROP COLUMN IF EXISTS geom3857,
    ADD COLUMN geom3857 geometry(MultiPolygon, 3857) GENERATED ALWAYS AS (ST_Transform(geometry::geometry, 3857)) STORED NOT NULL,
    DROP COLUMN IF EXISTS geom4326,
    ADD COLUMN geom4326 geometry(MultiPolygon, 4326) GENERATED ALWAYS AS (ST_Transform(geometry::geometry, 4326)) STORED NOT NULL,
    -- Id arrays, one per aoi.type / source.type value; they start out NULL and are
    -- filled by update_null_aoi_types() / update_null_source_types() and by triggers.
    ADD COLUMN IF NOT EXISTS aoi_type_1_ids bigint[],
    ADD COLUMN IF NOT EXISTS aoi_type_2_ids bigint[],
    ADD COLUMN IF NOT EXISTS aoi_type_3_ids bigint[],
    ADD COLUMN IF NOT EXISTS source_type_1_ids bigint[],
    ADD COLUMN IF NOT EXISTS source_type_2_ids bigint[]
;

-- Spatial and filter indexes on slick. The indexes are unnamed, so PostgreSQL picks
-- the names.
-- NOTE(review): without names or IF NOT EXISTS, re-running these statements creates
-- additional indexes whenever the previous ones still exist — confirm whether this
-- script is meant to be re-runnable.
CREATE INDEX ON slick USING GIST(geom4326);
CREATE INDEX ON slick USING GIST(geom3857) WHERE active;
CREATE INDEX ON slick (slick_timestamp, machine_confidence, area) WHERE active;
-- NOTE(review): this expression is the same one that defines the geom3857 generated
-- column, and the predicate matches the geom3857 index above; it looks redundant
-- unless some queries still spell out the ST_Transform expression — confirm.
CREATE INDEX ON slick USING GIST((ST_Transform(geometry::geometry, 3857))) WHERE active;

-- update_null_aoi_types(): backfill the aoi_type_N_ids arrays of every slick row
-- where at least one of the three arrays is NULL.
-- Each array receives the distinct ids of linked AOIs (via slick_to_aoi) of the
-- matching aoi.type, or '{}' when there are none.
-- Returns the number of slick rows updated.
CREATE OR REPLACE FUNCTION update_null_aoi_types() RETURNS bigint AS $$
    WITH recomputed AS (
        -- LEFT JOINs keep slicks that have no AOI links so they get empty arrays.
        SELECT
            slick.id,
            COALESCE(array_agg(DISTINCT aoi.id) FILTER (WHERE aoi.type = 1), '{}') AS type_1,
            COALESCE(array_agg(DISTINCT aoi.id) FILTER (WHERE aoi.type = 2), '{}') AS type_2,
            COALESCE(array_agg(DISTINCT aoi.id) FILTER (WHERE aoi.type = 3), '{}') AS type_3
        FROM slick
        LEFT JOIN slick_to_aoi
            ON slick_to_aoi.slick = slick.id
        LEFT JOIN aoi
            ON aoi.id = slick_to_aoi.aoi
        WHERE slick.aoi_type_1_ids IS NULL
            OR slick.aoi_type_2_ids IS NULL
            OR slick.aoi_type_3_ids IS NULL
        GROUP BY slick.id
    ),
    touched AS (
        UPDATE slick
        SET
            aoi_type_1_ids = recomputed.type_1,
            aoi_type_2_ids = recomputed.type_2,
            aoi_type_3_ids = recomputed.type_3
        FROM recomputed
        WHERE slick.id = recomputed.id
        RETURNING slick.id
    )
    SELECT count(*) FROM touched;
$$ LANGUAGE SQL;

-- Detach any existing slick_before_trigger before the backfill UPDATEs below run;
-- the trigger is created again further down, after its function is (re)defined.
DROP TRIGGER IF EXISTS slick_before_trigger ON slick;

-- update_null_source_types(): backfill the source_type_N_ids arrays of every slick
-- row where at least one of the two arrays is NULL.
-- Each array receives the distinct ids of linked sources (via slick_to_source) of
-- the matching source.type, or '{}' when there are none.
-- Returns the number of slick rows updated.
CREATE OR REPLACE FUNCTION update_null_source_types() RETURNS bigint AS $$
    WITH recomputed AS (
        -- LEFT JOINs keep slicks that have no source links so they get empty arrays.
        SELECT
            slick.id,
            COALESCE(array_agg(DISTINCT source.id) FILTER (WHERE source.type = 1), '{}') AS type_1,
            COALESCE(array_agg(DISTINCT source.id) FILTER (WHERE source.type = 2), '{}') AS type_2
        FROM slick
        LEFT JOIN slick_to_source
            ON slick_to_source.slick = slick.id
        LEFT JOIN source
            ON source.id = slick_to_source.source
        WHERE slick.source_type_1_ids IS NULL
            OR slick.source_type_2_ids IS NULL
        GROUP BY slick.id
    ),
    touched AS (
        UPDATE slick
        SET
            source_type_1_ids = recomputed.type_1,
            source_type_2_ids = recomputed.type_2
        FROM recomputed
        WHERE slick.id = recomputed.id
        RETURNING slick.id
    )
    SELECT count(*) FROM touched;
$$ LANGUAGE SQL;

-- Run both backfills once; the result row shows how many slick rows each one updated.
SELECT update_null_aoi_types(), update_null_source_types();

-- slick_cls(_orchestrator_run, _inference_idx): id of the cls row whose short_name
-- equals the entry stored under key _inference_idx (as text) in the cls_map of the
-- model used by the given orchestrator run.
-- Returns NULL when nothing matches; if several rows match, one of them is returned
-- (LIMIT 1 with no ORDER BY).
CREATE OR REPLACE FUNCTION slick_cls(_orchestrator_run bigint, _inference_idx int) RETURNS int AS $$
    SELECT cls.id
    FROM orchestrator_run AS run
    INNER JOIN model
        ON model.id = run.model
    INNER JOIN cls
        ON cls.short_name = model.cls_map->>(_inference_idx::text)
    WHERE run.id = _orchestrator_run
    LIMIT 1;
$$ LANGUAGE SQL STABLE;

-- slick_before_trigger_func(): BEFORE ROW trigger function for slick.
--
--   DELETE : removes the slick's rows from slick_to_aoi and returns OLD so the
--            delete proceeds. (A BEFORE ROW trigger that returns NULL makes
--            PostgreSQL skip the row operation, and NEW is NULL for DELETE, so the
--            DELETE path must work from OLD.)
--   INSERT / UPDATE :
--     * always recomputes NEW.cls through slick_cls();
--     * when the geometry changed (UPDATE), the row is new (INSERT), or
--       NEW.aoi_type_1_ids equals '{-1}', re-links the slick to the intersecting
--       aoi_chunks and rebuilds the aoi_type_N_ids and source_type_N_ids arrays;
--     * an UPDATE that ends up changing nothing returns NULL, which skips the
--       row update.
--
-- Progress is reported with RAISE NOTICE, each message followed by the time elapsed
-- since the function started.
--
-- NOTE(review): on INSERT the slick_to_aoi rows are written before the slick row
-- itself exists; this presumably relies on slick_to_aoi having no immediate foreign
-- key to slick — confirm.
CREATE OR REPLACE FUNCTION slick_before_trigger_func() RETURNS trigger
AS $$
    DECLARE
        timer timestamptz := clock_timestamp();
        geom geometry;
    BEGIN
        RAISE NOTICE '---------------------------------------------------------';
        RAISE NOTICE 'In slick_before_trigger_func. %', (clock_timestamp() - timer)::interval;

        -- DELETE has no NEW row: clean up the link table using OLD and let the
        -- delete go ahead.
        IF TG_OP = 'DELETE' THEN
            DELETE FROM slick_to_aoi WHERE slick = OLD.id;
            RAISE NOTICE 'Removed slick from slick_to_aoi. %', (clock_timestamp() - timer)::interval;
            RAISE NOTICE 'Returning. %', (clock_timestamp() - timer)::interval;
            RETURN OLD;
        END IF;

        geom := NEW.geometry::geometry;

        NEW.cls := slick_cls(NEW.orchestrator_run, NEW.inference_idx);
        RAISE NOTICE 'Calculated NEW.cls. %', (clock_timestamp() - timer)::interval;

        -- Existing links are stale when the geometry changed or a recalculation was
        -- requested with the '{-1}' marker value.
        IF (TG_OP = 'UPDATE' AND NEW.geometry IS DISTINCT FROM OLD.geometry) OR NEW.aoi_type_1_ids = '{-1}'::bigint[] THEN
            DELETE FROM slick_to_aoi WHERE slick = NEW.id;
            RAISE NOTICE 'Removed slick from slick_to_aoi. %', (clock_timestamp() - timer)::interval;
        END IF;

        IF TG_OP = 'INSERT' OR (TG_OP = 'UPDATE' AND NEW.geometry IS DISTINCT FROM OLD.geometry) OR NEW.aoi_type_1_ids = '{-1}'::bigint[] THEN
            -- One link per AOI, even when several chunks of that AOI intersect.
            INSERT INTO slick_to_aoi (slick, aoi)
                SELECT DISTINCT NEW.id, aoi_chunks.id
                FROM aoi_chunks
                WHERE ST_Intersects(geom, aoi_chunks.geometry)
            ON CONFLICT DO NOTHING
            ;

            RAISE NOTICE 'Added slick to slick_to_aoi. %', (clock_timestamp() - timer)::interval;

            SELECT INTO NEW.aoi_type_1_ids, NEW.aoi_type_2_ids, NEW.aoi_type_3_ids
                COALESCE(array_agg(DISTINCT aoi.id) FILTER (WHERE aoi.type = 1),'{}'),
                COALESCE(array_agg(DISTINCT aoi.id) FILTER (WHERE aoi.type = 2),'{}'),
                COALESCE(array_agg(DISTINCT aoi.id) FILTER (WHERE aoi.type = 3),'{}')
            FROM slick_to_aoi JOIN aoi ON (slick_to_aoi.aoi = aoi.id)
            WHERE slick_to_aoi.slick = NEW.id;

            RAISE NOTICE 'Populated AOI Arrays. %', (clock_timestamp() - timer)::interval;

            -- The source arrays are aggregated from the source table joined here.
            SELECT INTO NEW.source_type_1_ids, NEW.source_type_2_ids
                COALESCE(array_agg(DISTINCT source.id) FILTER (WHERE source.type = 1),'{}'),
                COALESCE(array_agg(DISTINCT source.id) FILTER (WHERE source.type = 2),'{}')
            FROM slick_to_source JOIN source ON (slick_to_source.source = source.id)
            WHERE slick_to_source.slick = NEW.id;

            RAISE NOTICE 'Populated Source Arrays. %', (clock_timestamp() - timer)::interval;
        END IF;

        RAISE NOTICE 'Returning. %', (clock_timestamp() - timer)::interval;

        -- Skip the row update entirely when nothing would change.
        IF TG_OP = 'UPDATE' AND NEW IS NOT DISTINCT FROM OLD THEN
            RETURN NULL;
        END IF;
        RETURN NEW;
    END;
$$ LANGUAGE plpgsql;

-- (Re)attach the row-level trigger: slick_before_trigger_func() runs once per row
-- before every INSERT, UPDATE, or DELETE on slick.
DROP TRIGGER IF EXISTS slick_before_trigger ON slick;
CREATE TRIGGER slick_before_trigger
BEFORE INSERT OR UPDATE OR DELETE ON slick
FOR EACH ROW EXECUTE FUNCTION slick_before_trigger_func();

-- chunks(_geom): break a geometry into smaller pieces.
-- The input is buffered by 0, passed through ST_MakeValid, and dumped into its
-- component parts; parts with more than 255 points are split further with
-- ST_Subdivide, the remaining parts are returned unchanged.
-- STRICT: a NULL input yields no rows.
CREATE OR REPLACE FUNCTION chunks(_geom geometry) RETURNS SETOF geometry AS $$
    WITH parts AS (
        SELECT dumped.geom AS part
        FROM ST_Dump(ST_MakeValid(ST_Buffer(_geom, 0))) AS dumped
    )
    -- Large parts: subdivide.
    SELECT ST_Subdivide(parts.part)
    FROM parts
    WHERE ST_Npoints(parts.part) > 255
    UNION ALL
    -- Small parts: keep as they are.
    SELECT parts.part
    FROM parts
    WHERE ST_Npoints(parts.part) <= 255;
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE STRICT;

-- chunks(geog): geography overload; casts the input to geometry and returns the
-- pieces produced by chunks(geometry).
CREATE OR REPLACE FUNCTION chunks(geog geography) RETURNS SETOF geometry AS $$
    SELECT chunks(geog::geometry);
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE STRICT;

-- aoi_after_statement_trigger_func(): AFTER STATEMENT trigger function for aoi.
-- Reads the transition tables OLDTABLE (UPDATE/DELETE) and NEWTABLE (INSERT/UPDATE),
-- which the triggers using this function must declare under exactly those names.
--
--   1. UPDATE/DELETE: removes the aoi_chunks and slick_to_aoi rows of the old AOIs.
--   2. INSERT/UPDATE: stores the chunked geometry of the new AOIs in aoi_chunks and
--      links every intersecting slick in slick_to_aoi.
--   3. Rebuilds aoi_type_1_ids / _2_ids / _3_ids for every slick that lost or gained
--      a link in steps 1-2. The arrays are rebuilt from the complete slick_to_aoi
--      table so links to AOIs that were not part of this statement are preserved,
--      and slicks that no longer touch a changed AOI have its id removed.
--
-- The return value of an AFTER trigger is ignored, so NULL is returned.
CREATE OR REPLACE FUNCTION aoi_after_statement_trigger_func() RETURNS TRIGGER AS $$
DECLARE
    -- ids of the slicks whose links changed during this statement
    affected bigint[] := '{}';
BEGIN
    RAISE NOTICE 'IN aoi_after_statement_trigger_func %', TG_OP;
    -- clear out aoi_chunks for any deleted or modified AOIs
    IF TG_OP = 'DELETE' OR TG_OP = 'UPDATE' THEN
        DELETE FROM aoi_chunks WHERE id IN (SELECT DISTINCT id FROM OLDTABLE);
        RAISE NOTICE 'Deleted from aoi_chunks';

        WITH removed AS (
            DELETE FROM slick_to_aoi WHERE aoi IN (SELECT DISTINCT id FROM OLDTABLE)
            RETURNING slick
        )
        SELECT COALESCE(array_agg(DISTINCT removed.slick::bigint), '{}')
        INTO affected
        FROM removed;
        RAISE NOTICE 'Deleted from slick_to_aoi';
    END IF;

    -- Populate aoi_chunks, slick_to_aoi, and slick aoi arrays
    IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
        RAISE NOTICE 'Creating Chunked Geometry for AOI and populating slick_to_aoi and slick arrays';

        WITH chunks AS (
            INSERT INTO aoi_chunks (id, geometry)
                SELECT id, chunks(geometry)
                FROM NEWTABLE
            RETURNING *
        ), matches AS (
            -- DISTINCT: one link per (slick, AOI) even if several chunks intersect
            INSERT INTO slick_to_aoi (slick, aoi)
            SELECT
                DISTINCT slick.id, chunks.id
            FROM slick JOIN chunks ON (ST_Intersects(slick.geom4326,chunks.geometry))
            RETURNING slick
        )
        SELECT affected || COALESCE(array_agg(DISTINCT matches.slick::bigint), '{}')
        INTO affected
        FROM matches;
    END IF;

    -- Runs as its own statement so it sees the link rows deleted/inserted above.
    -- LEFT JOINs keep slicks that are left with no AOI links (they get '{}').
    WITH arr AS (
        SELECT
            slick.id,
            COALESCE(array_agg(DISTINCT aoi.id) FILTER (WHERE aoi.type = 1),'{}') as t1,
            COALESCE(array_agg(DISTINCT aoi.id) FILTER (WHERE aoi.type = 2),'{}') as t2,
            COALESCE(array_agg(DISTINCT aoi.id) FILTER (WHERE aoi.type = 3),'{}') as t3
        FROM slick
            LEFT JOIN slick_to_aoi ON (slick_to_aoi.slick = slick.id)
            LEFT JOIN aoi ON (slick_to_aoi.aoi = aoi.id)
        WHERE slick.id = ANY (affected)
        GROUP BY slick.id
    )
    UPDATE slick SET
        aoi_type_1_ids = arr.t1,
        aoi_type_2_ids = arr.t2,
        aoi_type_3_ids = arr.t3
    FROM arr
    WHERE
        slick.id = arr.id
    ;
    RAISE NOTICE 'Updated aoi tables.';

    RETURN NULL;
END;
$$ LANGUAGE PLPGSQL;

-- Statement-level triggers on aoi. There is one trigger per operation because each
-- declares only the transition tables that exist for that operation; the names
-- OLDTABLE / NEWTABLE are the ones aoi_after_statement_trigger_func() queries.
CREATE OR REPLACE TRIGGER aoi_after_statement_insert_trigger
AFTER INSERT ON aoi
REFERENCING NEW TABLE AS NEWTABLE
FOR EACH STATEMENT EXECUTE FUNCTION aoi_after_statement_trigger_func();

CREATE OR REPLACE TRIGGER aoi_after_statement_update_trigger
AFTER UPDATE ON aoi
REFERENCING OLD TABLE AS OLDTABLE NEW TABLE AS NEWTABLE
FOR EACH STATEMENT EXECUTE FUNCTION aoi_after_statement_trigger_func();

CREATE OR REPLACE TRIGGER aoi_after_statement_delete_trigger
AFTER DELETE ON aoi
REFERENCING OLD TABLE AS OLDTABLE
FOR EACH STATEMENT EXECUTE FUNCTION aoi_after_statement_trigger_func();

-- slick_to_source_after_trigger_func(): AFTER STATEMENT trigger function for
-- slick_to_source.
-- Reads the transition tables OLDTABLE (UPDATE/DELETE) and NEWTABLE (INSERT/UPDATE),
-- which the triggers using this function must declare under exactly those names.
--
-- Collects the ids of every slick referenced by a removed, changed, or added link
-- and rebuilds source_type_1_ids / source_type_2_ids for those slicks from the
-- current contents of slick_to_source. A slick left without any source of a given
-- type gets '{}' for that array.
--
-- The return value of an AFTER trigger is ignored, so NULL is returned.
CREATE OR REPLACE FUNCTION slick_to_source_after_trigger_func() RETURNS TRIGGER AS $$
DECLARE
    -- ids of the slicks whose source links changed during this statement
    affected bigint[] := '{}';
BEGIN
    IF TG_OP IN ('DELETE','UPDATE') THEN
        SELECT affected || COALESCE(array_agg(DISTINCT o.slick::bigint), '{}')
        INTO affected
        FROM OLDTABLE AS o;
    END IF;
    IF TG_OP IN ('INSERT', 'UPDATE') THEN
        SELECT affected || COALESCE(array_agg(DISTINCT n.slick::bigint), '{}')
        INTO affected
        FROM NEWTABLE AS n;
    END IF;

    -- LEFT JOINs keep slicks whose last source link was just removed, so their
    -- arrays are reset to '{}' instead of being left stale.
    WITH arr AS (
        SELECT
            slick.id,
            COALESCE(array_agg(DISTINCT source.id) FILTER (WHERE source.type = 1),'{}') as t1,
            COALESCE(array_agg(DISTINCT source.id) FILTER (WHERE source.type = 2),'{}') as t2
        FROM slick
            LEFT JOIN slick_to_source ON (slick_to_source.slick = slick.id)
            LEFT JOIN source ON (slick_to_source.source = source.id)
        WHERE slick.id = ANY (affected)
        GROUP BY slick.id
    )
    UPDATE slick SET
        source_type_1_ids = arr.t1,
        source_type_2_ids = arr.t2
    FROM arr WHERE slick.id = arr.id
    ;

    RETURN NULL;
END;
$$ LANGUAGE PLPGSQL;

-- Statement-level triggers on slick_to_source. There is one trigger per operation
-- because each declares only the transition tables that exist for that operation.
-- The transition tables must be named OLDTABLE / NEWTABLE: those are the names
-- slick_to_source_after_trigger_func() queries.
CREATE OR REPLACE TRIGGER slick_to_source_after_statement_insert_trigger
AFTER INSERT ON slick_to_source
REFERENCING NEW TABLE AS NEWTABLE
FOR EACH STATEMENT EXECUTE FUNCTION slick_to_source_after_trigger_func();

CREATE OR REPLACE TRIGGER slick_to_source_after_statement_update_trigger
AFTER UPDATE ON slick_to_source
REFERENCING OLD TABLE AS OLDTABLE NEW TABLE AS NEWTABLE
FOR EACH STATEMENT EXECUTE FUNCTION slick_to_source_after_trigger_func();

CREATE OR REPLACE TRIGGER slick_to_source_after_statement_delete_trigger
AFTER DELETE ON slick_to_source
REFERENCING OLD TABLE AS OLDTABLE
FOR EACH STATEMENT EXECUTE FUNCTION slick_to_source_after_trigger_func();

DROP VIEW IF EXISTS slick_plus;

-- slick_plus: every column of each active slick, plus
--   * one boolean per AOI / source type telling whether the matching id array has
--     at least one element,
--   * the concatenated AOI id arrays and the concatenated source id arrays,
--   * source_nosource, true when both source id arrays are NULL or empty,
--   * scene id and geometry of the sentinel1_grd row of the slick's orchestrator run,
--   * short and long name of the slick's cls row.
-- Inner joins: a slick without a matching orchestrator_run, sentinel1_grd, or cls
-- row does not appear in the view.
CREATE VIEW slick_plus AS
SELECT
    slick.*,
    nonempty(slick.aoi_type_1_ids) AS aoi_type_1,
    nonempty(slick.aoi_type_2_ids) AS aoi_type_2,
    nonempty(slick.aoi_type_3_ids) AS aoi_type_3,
    slick.aoi_type_1_ids || slick.aoi_type_2_ids || slick.aoi_type_3_ids AS aoi_ids,
    nonempty(slick.source_type_1_ids) AS source_type_1,
    nonempty(slick.source_type_2_ids) AS source_type_2,
    slick.source_type_1_ids || slick.source_type_2_ids AS source_ids,
    empty(slick.source_type_1_ids) AND empty(slick.source_type_2_ids) AS source_nosource,
    s1.scene_id AS s1_scene_id,
    s1.geometry AS s1_geometry,
    cls.short_name AS cls_short_name,
    cls.long_name AS cls_long_name
FROM slick
INNER JOIN orchestrator_run AS run
    ON run.id = slick.orchestrator_run
INNER JOIN sentinel1_grd AS s1
    ON s1.id = run.sentinel1_grd
INNER JOIN cls
    ON cls.id = slick.cls
WHERE slick.active;

-- Refresh planner statistics for slick after the column, index, and data changes,
-- then commit the transaction opened by BEGIN at the top of the script.
ANALYZE slick;

COMMIT;
bitner commented 1 month ago

@aemonm @jonaraphael

I've revised the SQL updates to add triggers that keep aoi_chunks and slick_to_aoi in sync in both directions whenever either the slick or the aoi table changes. There are also triggers that keep the source and AOI id arrays on slick up to date after any change to slick, aoi, or slick_to_source.

I've also tried to streamline and clean up a few other things in the code I gave you earlier.