NASA-IMPACT / veda-backend

Backend services for VEDA

Summarizing STAC Collections: Take One #31

Open anayeaye opened 2 years ago

anayeaye commented 2 years ago

The problem

For MVP dashboard visualization, we need to provide some basic information summarizing all Items in a Collection:

  1. Collection-specific time picker: requires either the current temporal range of the dataset or, for a dataset that does not have periodically spaced captures, a list of unique dates.
  2. Collection-specific rescale values from raster band statistics aggregated over all items (the average of the means, the minimum of the minimums, and the maximum of the maximums).
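The two requirements above can be sketched in Python over a list of STAC item dicts. This is a minimal, hypothetical illustration (the names summarize_items and _band0_stats are not from the repo), assuming each item has a cog_default asset whose first raster:bands entry carries mean, minimum, and maximum statistics, as in the SQL later in this thread.

```python
from datetime import datetime, timezone
from statistics import mean
from typing import Any

# strftime equivalent of 'YYYY-MM-DD"T"HH24:MI:SS"Z"'
FMT = "%Y-%m-%dT%H:%M:%SZ"


def _band0_stats(item: dict[str, Any], asset: str = "cog_default") -> dict[str, float]:
    """Statistics of the first raster band of the given asset (assumed layout)."""
    return item["assets"][asset]["raster:bands"][0]["statistics"]


def summarize_items(items: list[dict[str, Any]], is_periodic: bool) -> dict[str, Any]:
    """Hypothetical dashboard summary: datetime info plus rescale statistics."""
    datetimes = sorted(
        datetime.fromisoformat(i["properties"]["datetime"].replace("Z", "+00:00")).astimezone(timezone.utc)
        for i in items
    )
    if is_periodic:
        # Periodic captures: the temporal range [first, last] is enough for the time picker
        dt_summary = [datetimes[0].strftime(FMT), datetimes[-1].strftime(FMT)]
    else:
        # Irregular captures: every unique date, sorted
        dt_summary = sorted({d.strftime(FMT) for d in datetimes})
    stats = [_band0_stats(i) for i in items]
    return {
        "datetime": dt_summary,
        "cog_default": {
            "avg": mean(s["mean"] for s in stats),  # average of the means
            "min": min(s["minimum"] for s in stats),  # minimum of the minimums
            "max": max(s["maximum"] for s in stats),  # maximum of the maximums
        },
    }
```

Note that the average of per-item means equals the true pixel mean only when every item has the same pixel count; it is a cheap approximation for choosing rescale values.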

We intend to leverage the pgstac database to provide custom summary information in the future, such as summary statistics for items within a bounding box and datetime range.

Iterative solution

Commit to implementing a use-case-specific version of the core STAC spec summaries collection property. Our implementation is intended to support the dashboard and will supply datetime and raster statistics for a single default map layer asset across the entire collection. Multi-asset spectral collections do not map well to this pattern and need further consideration. This may not be the best solution, but it captures where we are starting (hopefully in time to change course if there is a better solution).

Insert snapshot summary in collection record

As a first step, we are querying pgstac and updating collection records so that the dashboard can begin relying on the /collections/{collection-id} endpoint to request a collection's datetime summary and rescale values.
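On the consuming side, a client could read the snapshot summaries straight from the collection record. The sketch below is hypothetical: the function name and the shape of the returned config are illustrative assumptions, not how the VEDA dashboard actually consumes these fields. It assumes the summaries layout used in the SQL in this thread (summaries.datetime and summaries.cog_default with min and max) and a top-level dashboard:is_periodic flag.

```python
import json
from typing import Any


def dashboard_layer_config(collection_json: str) -> dict[str, Any]:
    """Hypothetical: derive time-picker and rescale settings from a collection record."""
    collection = json.loads(collection_json)
    summaries = collection.get("summaries", {})
    stats = summaries.get("cog_default")
    is_periodic = bool(collection.get("dashboard:is_periodic", False))
    dates = summaries.get("datetime", [])
    return {
        # Periodic datasets get a [start, end] range; others get an explicit list of dates
        "time_picker": {"range": dates} if is_periodic else {"dates": dates},
        # Rescale from the collection-wide minimum of minimums and maximum of maximums
        "rescale": (stats["min"], stats["max"]) if stats else None,
    }
```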

Some corners were cut, via assumptions and deferred work, to implement a POC for review.

Assumptions

Deferred work

If the functions take selected items rather than a collection id as parameters, the same functions could support both full-collection summaries and selected-item summaries (for future API development).
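To illustrate why item-based parameters generalize, here is a minimal sketch in which one summary function serves both a whole collection and a filtered selection. All names are hypothetical; the selection assumes 2D, four-element item bboxes that do not cross the antimeridian, and the statistics layout matches the cog_default band 0 used elsewhere in this thread.

```python
from datetime import datetime
from typing import Any, Optional, Sequence


def _parse(dt: str) -> datetime:
    # Accept the trailing "Z" used by STAC datetimes on Python versions before 3.11
    return datetime.fromisoformat(dt.replace("Z", "+00:00"))


def select_items(
    items: list[dict[str, Any]], bbox: Sequence[float], start: str, end: str
) -> list[dict[str, Any]]:
    """Keep items whose bbox intersects `bbox` and whose datetime falls in [start, end]."""
    xmin, ymin, xmax, ymax = bbox
    lo, hi = _parse(start), _parse(end)
    selected = []
    for item in items:
        ixmin, iymin, ixmax, iymax = item["bbox"]
        # Axis-aligned overlap test (ignores antimeridian-crossing boxes)
        overlaps = ixmin <= xmax and ixmax >= xmin and iymin <= ymax and iymax >= ymin
        if overlaps and lo <= _parse(item["properties"]["datetime"]) <= hi:
            selected.append(item)
    return selected


def rescale_summary(items: list[dict[str, Any]]) -> Optional[dict[str, float]]:
    """Minimum of minimums and maximum of maximums for cog_default band 0; None if empty."""
    stats = [i["assets"]["cog_default"]["raster:bands"][0]["statistics"] for i in items]
    if not stats:
        return None
    return {"min": min(s["minimum"] for s in stats), "max": max(s["maximum"] for s in stats)}
```

rescale_summary(all_items) gives the collection summary, while rescale_summary(select_items(all_items, bbox, start, end)) gives the selected-items summary with no change to the summary logic.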

Triggered summary update in collection record

If the above user-defined function (UDF) concept pans out and we get a better idea of ingest patterns, we might decide to add triggers to keep collection summaries up to date.

Dynamic summary of search results and custom APIs

Pgstac has a nifty request customization parameter (conf) that can be used to turn context on/off (i.e. the number of items matched). We might be able to rig up something similar to return summary statistics for a STAC item search without creating a new endpoint.

{
    "filter": {
        </SNIP>
    },
    "conf": {
        "context": "on/off/auto",
        "summaries": "on/off"
    }
}
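A rough sketch of how a "summaries" conf flag could be honored when assembling a search response. This is hypothetical: pgstac does not provide such a flag, the function name is illustrative, and context handling is deliberately left out. It assumes all item datetimes share the "YYYY-MM-DDTHH:MM:SSZ" form so that string order equals time order.

```python
from typing import Any


def build_search_response(items: list[dict[str, Any]], conf: dict[str, str]) -> dict[str, Any]:
    """Return a FeatureCollection, adding summaries only when conf["summaries"] == "on"."""
    response: dict[str, Any] = {"type": "FeatureCollection", "features": items}
    # Summaries are opt-in, so ordinary searches do no extra aggregation work
    if conf.get("summaries", "off") != "on" or not items:
        return response
    # Assumes uniform "YYYY-MM-DDTHH:MM:SSZ" strings, so lexical sort is chronological
    datetimes = sorted(i["properties"]["datetime"] for i in items)
    stats = [i["assets"]["cog_default"]["raster:bands"][0]["statistics"] for i in items]
    response["summaries"] = {
        "datetime": [datetimes[0], datetimes[-1]],
        "cog_default": {
            "min": min(s["minimum"] for s in stats),
            "max": max(s["maximum"] for s in stats),
        },
    }
    return response
```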
anayeaye commented 2 years ago

Slightly improved SQL that handles set vs. insert dynamically and implements common collection selection:

WITH coll_agg AS (
    SELECT jsonb_build_object(
        'datetime', (
            CASE
                WHEN (c."content"->>'dashboard:is_periodic')::boolean
                THEN (to_jsonb(array[
                    to_char(min(i.datetime) at time zone 'Z', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'),
                    to_char(max(i.datetime) at time zone 'Z', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')]))
                ELSE jsonb_agg(DISTINCT i."content"->'properties'->>'datetime')
            END
        ),
        'cog_default', jsonb_build_object(
            'avg', avg((i."content"->'assets'->'cog_default'->'raster:bands'-> 0 ->'statistics'->>'mean')::float),
            'min', min((i."content"->'assets'->'cog_default'->'raster:bands'-> 0 ->'statistics'->>'minimum')::float),
            'max', max((i."content"->'assets'->'cog_default'->'raster:bands'-> 0 ->'statistics'->>'maximum')::float)
        )
    ) summaries,
    c.id summary_collection_id
    FROM collections c
    JOIN items i ON i.collection_id = c.id
    -- :c_id is a placeholder for the id of the collection to summarize
    WHERE c.id = :c_id
    GROUP BY c."content", c.id
)
UPDATE collections c
SET "content" = (
    CASE
        -- jsonb_insert raises an error if the key already exists, so replace existing summaries with jsonb_set
        WHEN c."content"::jsonb ? 'summaries'
        THEN jsonb_set(
            c."content"::jsonb,
            '{summaries}',
            coll_agg.summaries
        )
        ELSE jsonb_insert(
            c."content"::jsonb,
            '{summaries}',
            coll_agg.summaries
        )
    END
)
FROM coll_agg
WHERE c.id = coll_agg.summary_collection_id;
anayeaye commented 2 years ago

Incremental improvements; the next step defines a custom schema and function. Changed:

-- Custom dashboard schema extends pgstac with functions supporting dashboard-specific api metadata
CREATE SCHEMA IF NOT EXISTS dashboard;

-- Add that schema to your path
SET SEARCH_PATH TO dashboard, pgstac, public;

-- This change is ONLY going to stick around for the current session
-- You can change the default search_path for a database role by altering the role
-- ALTER ROLE delta SET SEARCH_PATH TO dashboard, pgstac, public;

-- SQL Functions documentation - https://www.postgresql.org/docs/13/xfunc-sql.html

CREATE OR REPLACE FUNCTION update_default_summary(_collection_id text) RETURNS VOID AS $$
WITH coll_item_cte AS (
    SELECT jsonb_build_object(
        'summaries',
        jsonb_build_object(
            'datetime', (
                CASE
                WHEN (collections."content"->>'dashboard:is_periodic')::boolean
                THEN (to_jsonb(array[
                    to_char(min(datetime) at time zone 'Z', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'),
                    to_char(max(datetime) at time zone 'Z', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')]))
                ELSE jsonb_agg(distinct to_char(datetime at time zone 'Z', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'))
                END
            ),
            'cog_default', (
                CASE
                WHEN collections."content"->'item_assets' ? 'cog_default'
                THEN jsonb_build_object(
                    'min', min((items."content"->'assets'->'cog_default'->'raster:bands'-> 0 ->'statistics'->>'minimum')::float),
                    'max', max((items."content"->'assets'->'cog_default'->'raster:bands'-> 0 ->'statistics'->>'maximum')::float)
                    )
                ELSE NULL
                END
            )
        )
    ) summaries,
    collections.id coll_id
    FROM items
    JOIN collections on items.collection_id = collections.id
    WHERE collections.id = _collection_id
    GROUP BY collections."content" , collections.id
)
UPDATE collections SET "content" = "content" || coll_item_cte.summaries
FROM coll_item_cte 
WHERE collections.id = coll_item_cte.coll_id;
$$ LANGUAGE SQL SET SEARCH_PATH TO dashboard, pgstac, public;

CREATE OR REPLACE FUNCTION update_all_default_summaries() RETURNS VOID AS $$
SELECT 
    update_default_summary(id)
FROM collections
WHERE collections."content" ?| array['item_assets', 'dashboard:is_periodic'];
$$ LANGUAGE SQL SET SEARCH_PATH TO dashboard, pgstac, public;
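For readers more at home in Python, the two less obvious pieces of these functions can be mirrored as below. This is an illustrative sketch with hypothetical names, not part of the deployed code: the to_char format with at time zone 'Z' corresponds to converting an aware datetime to UTC and formatting without fractional seconds, and the ?| operator checks whether any of the listed strings is a top-level key of the collection JSON.

```python
from datetime import datetime, timezone
from typing import Any

# strftime equivalent of the SQL format 'YYYY-MM-DD"T"HH24:MI:SS"Z"' (no fractional seconds)
PG_TO_CHAR_FMT = "%Y-%m-%dT%H:%M:%SZ"


def to_utc_string(dt: datetime) -> str:
    """Mirror to_char(dt AT TIME ZONE 'Z', ...) for a timezone-aware datetime."""
    return dt.astimezone(timezone.utc).strftime(PG_TO_CHAR_FMT)


def needs_default_summary(collection: dict[str, Any]) -> bool:
    """Mirror content ?| array['item_assets', 'dashboard:is_periodic'] (any top-level key)."""
    return any(key in collection for key in ("item_assets", "dashboard:is_periodic"))
```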
anayeaye commented 2 years ago

Resolved by PR #43

Boilerplate Python: this expects the database connection info to be stored in a variable named con_str, so the code to retrieve and populate that variable is also included.

import base64
import json
from typing import Optional

import boto3
import psycopg
from botocore.exceptions import ClientError
from psycopg import sql
from psycopg.conninfo import make_conninfo


def update_collection_summaries(cursor, collection_id: str) -> None:
    """Update summaries object in pgstac for all items in collection"""
    cursor.execute(
        sql.SQL("""
            SELECT update_default_summaries(id)
            FROM collections
            WHERE collections.id = (%s);
            """), (collection_id,)
    )


def get_secret(secret_name: str, profile_name: Optional[str] = None) -> dict:
    """Retrieve secrets from AWS Secrets Manager

    Args:
        secret_name (str): name of aws secrets manager secret containing database connection secrets
        profile_name (str, optional): optional name of aws profile, default is used if not defined

    Returns:
        secrets (dict): decrypted secrets in dict
    """

    # Create a Secrets Manager client
    if profile_name:
        session = boto3.session.Session(profile_name=profile_name)
    else:
        session = boto3.session.Session()
    client = session.client(service_name="secretsmanager")

    # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except ClientError:
        # AccessDeniedException, DecryptionFailureException (KMS can't decrypt the secret),
        # InternalServiceErrorException, InvalidParameterException, InvalidRequestException,
        # ResourceNotFoundException, and any other error code are all re-raised to the caller.
        raise

    # Decrypts secret using the associated KMS key.
    # Depending on whether the secret is a string or binary, one of these fields will be populated.
    if "SecretString" in get_secret_value_response:
        return json.loads(get_secret_value_response["SecretString"])
    return json.loads(base64.b64decode(get_secret_value_response["SecretBinary"]))

# String id of STAC collection to summarize
collection_id = "collection-id"

# Get connection info
secret_name = "name or arn of delta/pgstac (not admin) connection secret in target env"
con_secrets = get_secret(secret_name)

# Connect and execute update summaries
con_str = make_conninfo(
    dbname=con_secrets["dbname"],
    user=con_secrets["username"],
    password=con_secrets["password"],
    host=con_secrets["host"],
    port=con_secrets["port"],
)
with psycopg.connect(con_str, autocommit=True) as conn:
    with conn.cursor() as cur:
        print("Adding default collection summaries")
        update_collection_summaries(cur, collection_id)

cc: @abarciauskas-bgse @leothomas this function is now deployed to delta staging and can be executed using the Python boilerplate code above, or with just a psql connection and SELECT update_default_summaries('nightlights-hd-monthly');.