unity-sds / unity-on-demand

Unity MGSS On-Demand
Apache License 2.0

Create Schema that Validates Initiators Configuration #38

Open galenatjpl opened 2 months ago

galenatjpl commented 2 months ago

Create a schema for the Initiators config, based on this initial use-case example:

initiator_config:

  payload_type:

    # The url payload type supports these trigger use cases:
    # - S3 event (s3://)
    # - file system event (HECC or on-prem) (file://)
    # - CMR based (https:// or s3://)
    # Other potential payload types: JSON, SQL query result, XML, etc.
    url:

      # SBG example: L1B data staged to S3 bucket and payload is S3 url
      - regexes:
            - !!python/regexp '/(?P<id>SISTER_EMIT_L1B_RDN_(?P<observation_date_time>\d{8}T\d{6})_(?P<product_counter>\d{3})_OBS\.bin)$'
        evaluators:

          # If the regex matches, the router submits a JSON payload to the eval_sbg_l2_readiness SNS topic that contains 
          # the payload and the regex match groupdict() as JSON. The groupdict JSON can further provide fields to evaluate
          # criteria for go/no-go but should remain lightweight. For example, if the payload was:
          #
          # s3://sps-dev-ds-storage/urn:nasa:unity:unity:dev:SBG-L1B_PRE___1/urn:nasa:unity:unity:dev:SBG-L1B_PRE___1:SISTER_EMIT_L1B_RDN_20240103T131936_001/SISTER_EMIT_L1B_RDN_20240103T131936_001_OBS.bin
          #
          # The payload submitted to the SNS topic would be:
          # {
          #   "payload": "s3://sps-dev-ds-storage/urn:nasa:unity:unity:dev:SBG-L1B_PRE___1/urn:nasa:unity:unity:dev:SBG-L1B_PRE___1:SISTER_EMIT_L1B_RDN_20240103T131936_001/SISTER_EMIT_L1B_RDN_20240103T131936_001_OBS.bin",
          #   "groupdict": {
          #     "id": "SISTER_EMIT_L1B_RDN_20240103T131936_001_OBS.bin",
          #     "observation_date_time": "20240103T131936",
          #     "product_counter": "001",
          #     "actions": {}  TODO: fill out dag_id, airflow_base_api_endpoint, etc.
          #   }
          # }
          - name: eval_sbg_l2_readiness
            actions:
              - name: submit_to_sns_topic
                params:
                  # topic_arn is optional and allows routing to a specific SNS topic;
                  # if it is null, empty, or absent, the router assumes an SNS topic
                  # in the local AWS account named after the evaluator
                  #topic_arn: arn:aws:sns:us-west-2:123456789012:eval_sbg_l2_readiness
                  on_success:
                    actions:
                      - name: submit_dag_by_id
                        params:
                          dag_id: submit_sbg_l2_dag
                          airflow_base_api_endpoint: xxx
                          airflow_username: <SSM parameter, e.g. /unity/airflow/username> or <ARN to username entry in AWS Secrets Manager>
                          airflow_password: <SSM parameter, e.g. /unity/airflow/password> or <ARN to password entry in AWS Secrets Manager>

      # M2020 example: xyz left finder; example of matching any one of a set of regexes
      - regexes:
            - !!python/regexp 'ids-pipeline/pipes/nonlin_xyz_left/inputque/.L.{17}_.{3}RAS_N.{26}\.VIC-link'
            - !!python/regexp 'ids-pipeline/pipes/nonlin_xyz_left/inputque/.R.{17}_.{3}RAS_N.{26}\.VIC-link'
            - !!python/regexp 'ids-pipeline/pipes/nonlin_xyz_left/inputque/.L.{17}_.{3}DSP_N.{26}\.VIC-link'
        evaluators:

          # If any of the regexes match, the router submits a JSON payload to the eval_m2020_xyz_left_finder SNS topic that contains 
          # the payload and the regex match groupdict() as JSON. The groupdict JSON can further provide fields to evaluate criteria
          # for go/no-go but should remain lightweight.
          - name: eval_m2020_xyz_left_finder
            actions:
              - name: submit_to_sns_topic
                params:
                  # topic_arn is optional and allows routing to a specific SNS topic;
                  # if it is null, empty, or absent, the router assumes an SNS topic
                  # in the local AWS account named after the evaluator
                  #topic_arn: arn:aws:sns:us-west-2:123456789012:eval_m2020_xyz_left_finder
                  on_success:
                    actions:
                      - name: submit_dag_by_id
                        params:
                          dag_id: submit_nonlin_xyz_left
                          airflow_base_api_endpoint: xxx
                          airflow_username: <SSM parameter, e.g. /unity/airflow/username> or <ARN to username entry in AWS Secrets Manager>
                          airflow_password: <SSM parameter, e.g. /unity/airflow/password> or <ARN to password entry in AWS Secrets Manager>

      # NISAR example: GDS stages satellite telemetry to S3 bucket and payload is S3 url
      - regexes:
            - !!python/regexp '/(?P<id>(?P<Mission>NISAR)_S(?P<SCID>\d{3})_(?P<Station>\w{2,3})_(?P<Antenna>\w{3,4})_M(?P<Mode>\d{2})_P(?P<Pass>\d{5})_R(?P<Receiver>\d{2})_C(?P<Channel>\d{2})_G(?P<Group>\d{2})_(?P<FileCreationDateTime>\d{4}_\d{3}_\d{2}_\d{2}_\d{2}_\d{6})\d{3}\.vc(?P<VCID>\w{2}))$'
        evaluators:

          # If the regex matches, the router submits a JSON payload to the eval_nisar_ingest SNS topic that contains 
          # the payload and the regex match groupdict() as JSON. The groupdict JSON can further provide fields to evaluate
          # criteria for go/no-go but should remain lightweight.
          - name: eval_nisar_ingest
            actions:
              - name: submit_to_sns_topic
                params:
                  # topic_arn is optional and allows routing to a specific SNS topic;
                  # if it is null, empty, or absent, the router assumes an SNS topic
                  # in the local AWS account named after the evaluator
                  #topic_arn: arn:aws:sns:us-west-2:123456789012:eval_nisar_ingest
                  on_success:
                    actions:
                      - name: submit_dag_by_id
                        params:
                          dag_id: submit_nisar_tlm_ingest
                          airflow_base_api_endpoint: xxx
                          airflow_username: <SSM parameter, e.g. /unity/airflow/username> or <ARN to username entry in AWS Secrets Manager>
                          airflow_password: <SSM parameter, e.g. /unity/airflow/password> or <ARN to password entry in AWS Secrets Manager>

      # NISAR example: GDS stages LDF (list of delivered files) to S3 bucket and payload is S3 url
      - regexes:
            - !!python/regexp '/(?P<id>(?P<Mission>NISAR)_S(?P<SCID>\d{3})_(?P<Station>\w{2,3})_(?P<Antenna>\w{3,4})_M(?P<Mode>\d{2})_P(?P<Pass>\d{5})_R(?P<Receiver>\d{2})_C(?P<Channel>\d{2})_G(?P<Group>\d{2})_(?P<FileCreationDateTime>\d{4}_\d{3}_\d{2}_\d{2}_\d{2}_\d{5})(?P<R>\d{1,4})\.ldf)$'
        evaluators:

          # If the regex matches, the router submits a JSON payload to the eval_nisar_l0a_readiness DAG via Airflow REST API
          # that contains the payload and the regex match groupdict() as JSON. The groupdict JSON can further provide
          # fields to evaluate criteria for go/no-go but in this case we make a call to submit a dag execution for a heavyweight
          # and possibly long-running evaluation (current worst case pass: 1877 telemetry files, each 200 MB, all ingested and checksum verified; dependency on GDS staging)
          - name: eval_nisar_l0a_readiness
            actions:
              - name: submit_dag_by_id
                params:
                  dag_id: eval_nisar_l0a_readiness
                  airflow_base_api_endpoint: xxx
                  airflow_username: <SSM parameter, e.g. /unity/airflow/username> or <ARN to username entry in AWS Secrets Manager>
                  airflow_password: <SSM parameter, e.g. /unity/airflow/password> or <ARN to password entry in AWS Secrets Manager>
                  on_success:
                    actions:
                      - name: submit_dag_by_id
                        params:
                          dag_id: submit_nisar_l0a_te_dag
                          # These are commented out because by default they are pulled from the configuration above, since we're already in Airflow.
                          # Uncomment them for explicit configuration (e.g. another SPS cluster).
                          #airflow_base_api_endpoint: xxx
                          #airflow_username: <ARN to username entry in AWS Secrets Manager>
                          #airflow_password: <ARN to password entry in Secrets Manager>
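
The matching behavior described in the evaluator comments can be sketched in Python. This is only an illustration under stated assumptions, not the router's actual implementation: `route`, `ROUTES`, and the in-memory entry layout are hypothetical names, the `\d{3}` width for `product_counter` is assumed from the sample value `001`, and the function only builds the JSON message instead of publishing it to an SNS topic.

```python
import json
import re

# Pattern taken from the SBG example. The \d{3} width for product_counter
# is an assumption based on the sample value "001".
SBG_L1B_REGEX = re.compile(
    r"/(?P<id>SISTER_EMIT_L1B_RDN_"
    r"(?P<observation_date_time>\d{8}T\d{6})_"
    r"(?P<product_counter>\d{3})_OBS\.bin)$"
)

# Hypothetical in-memory form of the `url` entries: each entry pairs a list
# of compiled regexes with the names of the evaluators to notify.
ROUTES = [
    {"regexes": [SBG_L1B_REGEX], "evaluators": ["eval_sbg_l2_readiness"]},
]


def route(payload_url, routes=ROUTES):
    """Return (evaluator_name, json_message) pairs for every matching entry."""
    messages = []
    for entry in routes:
        # Matching any one regex of the entry is enough; take the first hit.
        match = next(
            (m for m in (rx.search(payload_url) for rx in entry["regexes"]) if m),
            None,
        )
        if match is None:
            continue
        body = json.dumps(
            {"payload": payload_url, "groupdict": match.groupdict()}
        )
        for evaluator in entry["evaluators"]:
            messages.append((evaluator, body))
    return messages


if __name__ == "__main__":
    url = (
        "s3://sps-dev-ds-storage/urn:nasa:unity:unity:dev:SBG-L1B_PRE___1/"
        "urn:nasa:unity:unity:dev:SBG-L1B_PRE___1:"
        "SISTER_EMIT_L1B_RDN_20240103T131936_001/"
        "SISTER_EMIT_L1B_RDN_20240103T131936_001_OBS.bin"
    )
    for name, body in route(url):
        print(name, body)
```

Because the pattern is anchored with a leading `/` and a trailing `$`, only the final path segment can match, so the `id` group holds the file name alone.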
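
As a possible starting point for the schema itself, here is a hand-written structural check over the config once it has been loaded into plain Python dicts and lists. Everything in it is an assumption inferred from the example: the function names are hypothetical, and the rules (every entry needs non-empty `regexes` and `evaluators`, every evaluator and action needs a `name`, `on_success` nests further `actions`) may not match what the final schema requires. A JSON Schema document could express the same constraints declaratively.

```python
def _check_action_list(actions, path, errors):
    """Require a non-empty list of action mappings at `path`."""
    if not isinstance(actions, list) or not actions:
        errors.append(f"{path}: expected a non-empty list")
        return
    for i, action in enumerate(actions):
        _check_action(action, f"{path}[{i}]", errors)


def _check_action(action, path, errors):
    """Check one action, recursing into params.on_success.actions if present."""
    if not isinstance(action, dict):
        errors.append(f"{path}: expected a mapping")
        return
    if not isinstance(action.get("name"), str):
        errors.append(f"{path}.name: expected a string")
    params = action.get("params") or {}
    if not isinstance(params, dict):
        errors.append(f"{path}.params: expected a mapping")
        return
    on_success = params.get("on_success")
    if on_success is None:
        return  # on_success is treated as optional (assumption)
    if not isinstance(on_success, dict):
        errors.append(f"{path}.params.on_success: expected a mapping")
        return
    _check_action_list(
        on_success.get("actions"), f"{path}.params.on_success.actions", errors
    )


def validate_initiator_config(doc):
    """Return a list of error strings; an empty list means the shape is valid."""
    errors = []
    try:
        entries = doc["initiator_config"]["payload_type"]["url"]
    except (KeyError, TypeError):
        return ["initiator_config.payload_type.url: missing"]
    if not isinstance(entries, list):
        return ["initiator_config.payload_type.url: expected a list"]
    for i, entry in enumerate(entries):
        base = f"url[{i}]"
        if not isinstance(entry, dict):
            errors.append(f"{base}: expected a mapping")
            continue
        regexes = entry.get("regexes")
        if not isinstance(regexes, list) or not regexes:
            errors.append(f"{base}.regexes: expected a non-empty list")
        evaluators = entry.get("evaluators")
        if not isinstance(evaluators, list) or not evaluators:
            errors.append(f"{base}.evaluators: expected a non-empty list")
            continue
        for j, evaluator in enumerate(evaluators):
            ev_path = f"{base}.evaluators[{j}]"
            if not isinstance(evaluator, dict):
                errors.append(f"{ev_path}: expected a mapping")
                continue
            if not isinstance(evaluator.get("name"), str):
                errors.append(f"{ev_path}.name: expected a string")
            _check_action_list(evaluator.get("actions"), f"{ev_path}.actions", errors)
    return errors


if __name__ == "__main__":
    # Trimmed-down version of the NISAR LDF entry from the example.
    config = {"initiator_config": {"payload_type": {"url": [{
        "regexes": [r"\.ldf$"],
        "evaluators": [{
            "name": "eval_nisar_l0a_readiness",
            "actions": [{
                "name": "submit_dag_by_id",
                "params": {
                    "dag_id": "eval_nisar_l0a_readiness",
                    "on_success": {"actions": [{
                        "name": "submit_dag_by_id",
                        "params": {"dag_id": "submit_nisar_l0a_te_dag"},
                    }]},
                },
            }],
        }],
    }]}}}
    print(validate_initiator_config(config))
```

Collecting all errors in a list, rather than raising on the first one, lets a config author fix several problems in one pass.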