Updates to gfe-db are triggered by new commits instead of just new branches (quarterly releases)
Execution state is tracked for each commit and release version in the IMGTHLA repo
Pipeline execution requests are idempotent when executions are in progress
Errors during pipeline executions are handled
Concurrency for database loading is constrained to one release version at a time to avoid fatal collisions
Formatted notifications of pipeline executions are sent by email
Pydantic models are implemented for automatic validation
New Commits
The CheckSourceUpdate Lambda function monitors the IMGTHLA source repository for new commits daily. If one or more new commits are found, the service resolves the release version number and creates new rows in the Execution State table (DynamoDB) and processes only the most recent commit for any given release. The commit's execution status is updated throughout pipeline execution using an enum class:
class ExecutionStatus(str, Enum):
"""
ExecutionStatus is synced using the Step Functions DynamoDB integration:
NOT_PROCESSED: never processed (set by CheckSourceUpdate) ✅
SKIPPED: never processed (set by CheckSourceUpdate) ✅
PENDING: state machine execution started (set by CheckSourceUpdate) ✅
BUILD_IN_PROGRESS: build started (set by State Machine) ✅
BUILD_SUCCESS: build succeeded (set by State Machine) ✅
LOAD_IN_PROGRESS: load started (set by State Machine) ✅
LOAD_SUCCESS: load succeeded (set by State Machine) ✅
LOAD_FAILED: load failed (set by State Machine) ✅
LOAD_INVALID: load invalid from query results (set by State Machine) ✅
LOAD_SKIPPED: load skipped (set by State Machine) ✅
BUILD_FAILED: build failed (set by State Machine) ✅
EXECUTION_FAILED: build or load failed (set by State Machine) ✅
ABORTED: build or load aborted (set by UpdateExecutionState) ✅
"""
NOT_PROCESSED = "NOT_PROCESSED"
SKIPPED = "SKIPPED"
PENDING = "PENDING"
BUILD_IN_PROGRESS = "BUILD_IN_PROGRESS"
BUILD_SUCCESS = "BUILD_SUCCESS"
BUILD_FAILED = "BUILD_FAILED"
LOAD_IN_PROGRESS = "LOAD_IN_PROGRESS"
LOAD_COMPLETE = "LOAD_COMPLETE"
LOAD_SUCCESS = "LOAD_SUCCESS"
LOAD_FAILED = "LOAD_FAILED"
LOAD_INVALID = "LOAD_INVALID"
LOAD_SKIPPED = "LOAD_SKIPPED"
EXECUTION_FAILED = "EXECUTION_FAILED"
ABORTED = "ABORTED"
Execution State
A DynamoDB table is deployed to store state for pipeline executions. For new deployments, the repository's state is built using the GitHub REST API and loaded into the table.
Pipeline Request Idempotency
Idempotency is acheived by using SQS FIFO queues, where the group ID is the unique deployment ID (${STAGE}-${APP_NAME}) and the deduplication ID is the release version. This means that when messages are in the queue, duplicate messages are not processed and that releases are loaded in chronological order.
Error Handling
Errors that occur during pipeline execution are caught and the state table entry is updated to reflect failure or aborted executions.
Database Concurrency Management
Database concurrency is maintained by a state machine called the Load Concurrency Manager (LCM). The LCM runs continuously when the main pipeline is running (by monitoring a CloudWatch Alarm) and handles the pre- and post-execution backups. This is to ensure that the database is not overloaded or shut-down during the loading process. All requests for loading data to Neo4j pass through a FIFO queue to avoid duplication and maintain the order of release versions. The consumer of the FIFO queue (Message Received?) will only receive one message at a time, and will not be invoked again until loading has succeeded or failed. Once the queue is empty and all releases have been loaded, the LCM will stop running.
Success/Failure Notifications
Notifications are sent by email including execution outcomes, validation results and error information in the event of failure.
Pydantic Models
Pydantic is a Python framework for ensuring data integrity. Every object within the pipeline now uses a Pydantic class for automatic schema and type validation. This prevents the state table from having corrupt or missing fields when reading and writing records.
Infrastructure Changes
New Lambda functions
FormatResults - Formats notification messages
InvokeLoadConcurrencyManager - Triggers the LCM when the Update Pipeline state machine has executions in progress
LcmReceiveMessage - Checks the GfeDbLoadQueue for messages
UpdateExecutionState - Handle aborted state machines and updates the state table
Lambda Layers
GfeDbModelsLayer - Contains the logic and methods for building execution state from GitHub API calls as well as models for data handling and validation
SQS Queues
GfeDbProcessingQueue - Queues releases for processing
GfeDbLoadQueue - Queues releases for loading once they are built
DynamoDB table
GfeDbExecutionStateTable - Stores state for each commit, release and execution combination
State Machines
LoadConcurrencyManager - Runs continuously during release processing and limits concurrency of loading to 1 release at a time
Known Issues
[ ] Some of the earlier releases are missing because 1) their commits are not on the default branch (Latest), and 2) because of inconsistencies in the versioning and availability of metadata (fix in progress)
[ ] Commits should only be processed if they include a change to hla.dat or msf/ since these assets contain the source data (fix in progress)
Next Steps
Address the known issues
Merge CSV builds before loading to Neo4j to speed up loading
Description
New Commits
The CheckSourceUpdate Lambda function monitors the IMGTHLA source repository for new commits daily. If one or more new commits are found, the service resolves the release version number and creates new rows in the Execution State table (DynamoDB) and processes only the most recent commit for any given release. The commit's execution status is updated throughout pipeline execution using an enum class:
Execution State
A DynamoDB table is deployed to store state for pipeline executions. For new deployments, the repository's state is built using the GitHub REST API and loaded into the table.
Pipeline Request Idempotency
Idempotency is acheived by using SQS FIFO queues, where the group ID is the unique deployment ID (
${STAGE}-${APP_NAME}
) and the deduplication ID is the release version. This means that when messages are in the queue, duplicate messages are not processed and that releases are loaded in chronological order.Error Handling
Errors that occur during pipeline execution are caught and the state table entry is updated to reflect failure or aborted executions.
Database Concurrency Management
Database concurrency is maintained by a state machine called the Load Concurrency Manager (LCM). The LCM runs continuously when the main pipeline is running (by monitoring a CloudWatch Alarm) and handles the pre- and post-execution backups. This is to ensure that the database is not overloaded or shut-down during the loading process. All requests for loading data to Neo4j pass through a FIFO queue to avoid duplication and maintain the order of release versions. The consumer of the FIFO queue (
Message Received?
) will only receive one message at a time, and will not be invoked again until loading has succeeded or failed. Once the queue is empty and all releases have been loaded, the LCM will stop running.Success/Failure Notifications
Notifications are sent by email including execution outcomes, validation results and error information in the event of failure.
Pydantic Models
Pydantic is a Python framework for ensuring data integrity. Every object within the pipeline now uses a Pydantic class for automatic schema and type validation. This prevents the state table from having corrupt or missing fields when reading and writing records.
Infrastructure Changes
Known Issues
Latest
), and 2) because of inconsistencies in the versioning and availability of metadata (fix in progress)hla.dat
ormsf/
since these assets contain the source data (fix in progress)Next Steps