FlipsideCrypto / near-models

Data models for the NEAR blockchain, originally built by the MetricsDAO community.
https://flipsidecrypto.github.io/near-models/
MIT License

Near DBT Project

Curated SQL Views and Metrics for the Near Blockchain.

What's Near? Learn more here

Variables

Control variables determine which external table environment a model references and whether a Stream is invoked at runtime:

Default values are False

To control the creation of UDF or SP macros with dbt run:

Default values are False
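These variables are passed to dbt at runtime with the --vars flag; the variable names and their defaults are defined in dbt_project.yml. A minimal illustration of the pattern, using a placeholder variable name:

dbt run --vars '{"SOME_CONTROL_VARIABLE": True}' -s models/core/core__ez_dex_swaps.sql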

Applying Model Tags

Database / Schema level tags

Database and schema tags are applied via the add_database_or_schema_tags macro. These tags are inherited by their downstream objects. To add or modify tags, call the appropriate tag-set function within the macro.

{{ set_database_tag_value('SOME_DATABASE_TAG_KEY','SOME_DATABASE_TAG_VALUE') }}
{{ set_schema_tag_value('SOME_SCHEMA_TAG_KEY','SOME_SCHEMA_TAG_VALUE') }}

Model tags

To add or update a model's Snowflake tags, add or modify the meta model property under config. Only table-level tags are supported via dbt at this time.

{{ config(
    ...,
    meta={
        'database_tags':{
            'table': {
                'PURPOSE': 'SOME_PURPOSE'
            }
        }
    },
    ...
) }}

By default, model tags are pushed to Snowflake on each dbt run. You can disable this by setting the UPDATE_SNOWFLAKE_TAGS project variable to False during a run:

dbt run --var '{"UPDATE_SNOWFLAKE_TAGS":False}' -s models/core/core__ez_dex_swaps.sql

Querying for existing tags on a model in Snowflake:

select *
from table(near.information_schema.tag_references('near.core.ez_dex_swaps', 'table'));

Branching / PRs

When conducting work, please branch off of main with a descriptive branch name and open a pull request. At least one other individual must review the PR before it can be merged into main. Once merged into main, dbt Cloud will run the new models and output the results into the PROD schema.

When creating a PR please include the following details in the PR description:

  1. List of Tables Created or Modified
  2. Description of changes.
  3. Implication of changes (if any).

Fixing Data Issues

Manual Batch Refresh

If data needs to be re-run for some reason, partitions of data can be re-run through the models by using the _partition_by_block_number column and passing project variables at runtime.

Any data refresh will need to be done in a batch due to the nature of the receipt <> tx hash mapping. The view silver__receipt_tx_hash_mapping builds a recursive ancestry tree that follows linked receipt outcome IDs to map all receipts generated in a transaction back to the primary hash. Receipts can be generated many blocks after the transaction occurs, so a generous buffer is required to ensure all receipts are captured.
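As a rough illustration of how that mapping works, the following recursive CTE sketch walks parent/child receipt links until every receipt resolves to a transaction hash. The table and column names here (base_receipts, child_receipts, parent_receipt_id) are placeholders, not the actual sources used by silver__receipt_tx_hash_mapping:

with recursive ancestry as (
    -- anchor: receipts attached directly to a transaction
    select tx_hash, receipt_id
    from base_receipts
    where tx_hash is not null

    union all

    -- walk receipts spawned by earlier receipts, keeping the originating tx hash
    select a.tx_hash, c.receipt_id
    from ancestry a
    join child_receipts c
        on c.parent_receipt_id = a.receipt_id
)
select tx_hash, receipt_id
from ancestry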

The fix makes use of project variables to pass the following parameters:

FRONT_BUFFER and END_BUFFER are set to 1 by default and indicate how many partitions (10k blocks each) should be added to the front or end of the range.

dbt run -s [tags] --vars [variables]
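For example, a targeted refresh of a single partition range might look like the following. The tag, block heights, and buffer values are illustrative; the tag-specific commands below show the exact variables each step expects.

dbt run -s tag:receipt_map --vars '{"MANUAL_FIX": True, "RANGE_START": 80000000, "RANGE_END": 80100000, "FRONT_BUFFER": 1, "END_BUFFER": 1}'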

dbt Model Tags

To help with targeted refreshes, a number of tags have been applied to the models. These are defined below:

| Tag | Description | View Models |
| --- | ----------- | ----------- |
| load | Runs models that load data into Snowflake from S3. The two load_X models are staging tables for the data, which is then parsed and transformed up through the final txs/receipts models. | link |
| load_shards | Just the load models that touch shards. | link |
| load_blocks | Just the load models that touch blocks. | link |
| receipt_map | Runs the receipt-mapping models, which must use a partition. This set of models cannot simply run with incremental logic due to the recursive tree used to map receipt IDs to tx hashes. | link |
| actions | Just the 3 action events models, an important set of intermediary models before curated activity. Note: these are also tagged with s3_curated. | link |
| curated | Models that are used to generate the curated tables. | link |
| core | All public views are tagged with core, regardless of schema. At this time, that includes core and social. | link |

Note: there are other tags that are currently not used. All legacy models are tagged with something that includes rpc, but all are presently disabled to avoid an accidental run.
You can visualize these tags by using the DAG Explorer in the docs.

Load Missing Blocks or Shards

Blocks and shards can be re-loaded using the load tag(s).

The logic in the load_X models will only check the external table for blocks and shards known to be missing. Because these models query the sequence-gap test table(s) to find those gaps, a partition range is not required for this step.

dbt run -s tag:load --vars '{"MANUAL_FIX": True}'

Map Tx Hash <> Receipt Hash

The middle step is to map receipt IDs to transaction hashes. This is done in 3 models, which are tagged with receipt_map. Two of these models are helper views that recursively walk receipt -> parent receipt -> ... -> transaction, thus linking every receipt to a transaction. This step is computationally intensive and requires a tight partition range; for recent blocks with more activity, a range of fewer than 250k blocks is recommended.

The following will read a range of receipts from silver__streamline_receipts and link receipts to the corresponding tx hash, saving the result to silver__streamline_receipts_final. It will then insert receipt data into the tx object in silver__streamline_transactions_final.

dbt run -s tag:receipt_map --vars '{"MANUAL_FIX": True, "RANGE_START": X, "RANGE_END": Y}'

If the range being mapped is the same range as the block/shard re-walk, then the tag can simply be appended to the load job and the receipts will be mapped after ingestion.

dbt run -s tag:load tag:receipt_map --vars '{"MANUAL_FIX": True, "RANGE_START": X, "RANGE_END": Y}'

The end result of this run will be streamline__receipts_final and streamline__transactions_final (link).

Update curated models

The actions and curated models include the manual-fix conditional (see Incremental Load Strategy below), so the actions and curated tags can be appended to re-run the fixed range in the downstream silver models. If the missing data is newly loaded, this should not be necessary: _load_timestamp is set when the data hits Snowflake, so the records flow through the standard incremental logic in the curated models. However, the range can be run explicitly with the curated tag:

dbt run -s tag:curated --vars '{"MANUAL_FIX": True, "RANGE_START": X, "RANGE_END": Y}'

Or

dbt run -s tag:load tag:receipt_map tag:curated --vars '{"MANUAL_FIX": True, "RANGE_START": X, "RANGE_END": Y}'

Incremental Load Strategy

Because data must be run in batches to properly map receipts to a transaction hash, a jinja conditional is added to the curated models. It should be present on every model downstream of the mapping process. Most data will run without issue using the standard incremental load, but this filter is required for the commands in the Manual Batch Refresh section above.

Include the following conditional, as targeted runs of block partitions may be required:

        {% if var("MANUAL_FIX") %}
            {{ partition_load_manual('no_buffer') }}
        {% else %}
            {{ incremental_load_filter('_inserted_timestamp') }}
        {% endif %}
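For context, the conditional typically lives in the WHERE clause of an incremental model, with each macro returning the appropriate filter expression. The model below is a minimal hypothetical sketch; the ref, columns, and unique_key are illustrative and it is not an actual model in this project.

{{ config(
    materialized = 'incremental',
    unique_key = 'receipt_id'
) }}

select
    block_id,
    receipt_id,
    _partition_by_block_number,
    _inserted_timestamp
from {{ ref('silver__streamline_receipts_final') }}
where
    {% if var("MANUAL_FIX") %}
        -- targeted re-run of a specific block partition range
        {{ partition_load_manual('no_buffer') }}
    {% else %}
        -- standard incremental load on newly inserted rows
        {{ incremental_load_filter('_inserted_timestamp') }}
    {% endif %}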

More DBT Resources: