Energinet-DataHub / ARCHIVED-geh-aggregations

This project aims to create an engine that can perform calculations on billions of metering points and deliver the results within minutes.

Aggregations


This repository is obsolete; it is no longer actively maintained by the organisation.

The work/domain has been superseded by the wholesale repository/domain.



Intro

The aggregation domain is in charge of doing calculations on the time series sent to Green Energy Hub and executing the balance and wholesale settlement process.

The main calculations the domain is responsible for are consumption, production, exchange between grid areas and the current grid loss within a grid area.
All calculations return results per grid area, per balance responsible party and per energy supplier.

The time series sent to Green Energy Hub are processed and enriched in the Time Series domain before they can be picked up by the Aggregations domain.

The calculated results are packaged and forwarded to the relevant market participants, such as:

- Grid Access Provider
- Balance Responsible Party
- Energy Supplier

These are the business processes maintained by this domain:

- Submission of calculated energy time series
- Request for calculated energy time series
- Aggregation of wholesale services
- Request for aggregated subscriptions or fees
- Request for aggregated tariffs
- Request for settlement basis

Domain Overview

(Figure: domain-overview.png — overview of the Aggregations domain)

Domain Road Map

In the current program increment we are working on the following features:

Dataflow between domains

Market roles context streams

Input into the aggregations domain

Delta Lake (metering points)

The Aggregations domain does its calculations on data residing in a Delta Lake. This data is read at the beginning of the aggregation job and used throughout the calculations. See here for how the data is read in the Python code.
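
As a rough sketch of that first step, reading the metering points from the Delta Lake with PySpark looks something like the snippet below. The table path, column names, and time interval are placeholders, not the job's actual configuration.

```python
# Illustrative sketch only: the table path, column names, and period are placeholders,
# not the repository's actual configuration.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Read the enriched time series points from the Delta table.
time_series_points = spark.read.format("delta").load("/delta/time-series-points")

# The aggregation job restricts the data to the requested period before calculating.
filtered_points = time_series_points.filter(
    (col("Time") >= "2013-01-01 23:00:00") & (col("Time") < "2013-01-30 23:00:00")
)
```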

(TBD) Reading historical data.

Eventhub input

(TBD)


Output from the aggregations domain

The coordinator has the responsibility for sending results from the aggregation jobs out of the Aggregations domain. It collects the result from the job in the CoordinatorService, handles it, and sends it to a destination eventhub. This is the current implementation, but the results could easily be sent to another type of endpoint.
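
The coordinator itself is a C# Azure Function, so the snippet below is only an illustrative Python sketch of the output flow, using the azure-eventhub SDK; the connection string, event hub name, and payload are placeholders.

```python
# Illustrative Python sketch of the output flow only; the real coordinator is a
# C# Azure Function. Connection string, event hub name, and payload are placeholders.
import json

from azure.eventhub import EventData, EventHubProducerClient

producer = EventHubProducerClient.from_connection_string(
    conn_str="<event-hub-connection-string>",
    eventhub_name="<destination-event-hub>",
)

result_message = {"grid_area": "805", "sum_quantity": 1234.5}  # placeholder payload

with producer:
    batch = producer.create_batch()
    batch.add(EventData(json.dumps(result_message)))
    producer.send_batch(batch)
```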

Format of the message

(TBD)

Talking to the Post Office eventhub endpoint via the messaging framework

(TBD) Link to the framework once it has been moved. Right now it is embedded as projects.

Protobuf

(TBD) Link to general description of the use of protobuf.


How do we do aggregations?

The aggregations/calculations on the metering points stored in the Delta Lake are done by Databricks jobs containing Python code that utilizes PySpark.
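
As a hedged illustration of what such a job does, the PySpark sketch below sums quantities per grid area in hourly windows; the column names are assumptions and do not necessarily match the actual geh_stream schema.

```python
# Hedged illustration; the column names are assumptions, not the actual geh_stream schema.
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, sum as sum_, window


def aggregate_per_grid_area(time_series: DataFrame) -> DataFrame:
    """Sum the quantities per grid area in hourly windows."""
    return (
        time_series
        .groupBy(col("MeteringGridArea_Domain_mRID"), window(col("Time"), "1 hour"))
        .agg(sum_(col("Quantity")).alias("sum_quantity"))
    )
```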

Coordinator function

The coordinator has a descriptive name in the sense that it does what it says on the tin: it allows an external entity to trigger an aggregation job via an HTTP interface.

Peek here to see how we start and manage Databricks from the coordinator. Once the calculations are done, the Databricks job notifies the coordinator about the path of the result. The coordinator receives the path in Coordinatortriggers/ResultReceiver, and from there the CoordinatorService fetches a stream of the result that the Databricks job put in the blob. The format of the result is JSON, gzip compressed. The stream is decompressed before the input is further processed. The input processor finds a strategy that matches the name of the aggregation result and hands over the next steps to that strategy.
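
The real implementation lives in the C# coordinator, but the essence of the decompress-and-parse step and the name-based strategy lookup can be sketched as follows; the blob path layout is an assumption for illustration.

```python
# Python sketch of the result handling only; the real code is the C# CoordinatorService.
# The blob path layout (".../<result_name>.json.gz") is an assumption for illustration.
import gzip
import json


def read_result(compressed_bytes: bytes):
    """Decompress the gzip-compressed JSON result and return it as Python objects."""
    return json.loads(gzip.decompress(compressed_bytes))


def strategy_name_from_path(blob_path: str) -> str:
    """The strategy is matched on the result name embedded in the blob path."""
    file_name = blob_path.rsplit("/", 1)[-1]
    return file_name.split(".")[0]
```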

Implementing aggregation result strategies

The coordinator utilizes a strategy pattern for handling the different results returned from the Databricks job. The strategy is matched by the name of the result in the returned blob path (see InputStringParserTest).

To implement a new strategy, use the following approach:

    public class YourResultDtoStrategy : BaseStrategy<YourResultDto>, IDispatchStrategy
    {
        public YourResultDtoStrategy(
            ILogger<YourResultDto> logger,
            Dispatcher dispatcher)
            : base(logger, dispatcher)
        {
        }

        public override string FriendlyNameInstance => "your_result";

        public override IEnumerable<IOutboundMessage> PrepareMessages(
            IEnumerable<YourResultDto> list,
            ProcessType processType,
            string timeIntervalStart,
            string timeIntervalEnd)
        {
            // Transform the list of YourResultDto instances into IEnumerable<IOutboundMessage> here.
            yield break;
        }
    }

Your strategy is automatically registered in the DI container during startup and will then be called when YourResult is received. If the system can't find an appropriate strategy, it logs the following message:

IDispatchStrategy not found in input processor map. yourresult


Databricks workspace

This is the instance in which the Databricks cluster resides. (TBD) When the instance is in the shared domain, describe that.

Python code

The aggregation job itself is defined by Python code. The code consists of both a wheel file and a Python file triggered by the job. The starting point for the Databricks job is aggregation_trigger.py. The specific aggregations live in .\source\databricks\geh_stream\aggregation_utils\aggregators; these are compiled into a wheel file and installed as a library on the cluster.

Dataframe results

The resulting aggregation DataFrames are combined in aggregation_trigger.py and then sent back to the coordinator as JSON.
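
A minimal sketch of that combining step, assuming the individual results are DataFrames sharing a schema (the function names below are illustrative, not the actual ones in aggregation_trigger.py):

```python
# Illustrative sketch; the function names are not the actual ones in aggregation_trigger.py.
from functools import reduce

from pyspark.sql import DataFrame


def combine_results(results: list) -> DataFrame:
    """Union the individual aggregation result DataFrames into one DataFrame."""
    return reduce(DataFrame.unionByName, results)


def to_json_lines(combined: DataFrame) -> list:
    """Serialize each result row as a JSON string, as the coordinator expects JSON."""
    return combined.toJSON().collect()
```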


Getting started

Setting up infrastructure

The instances able to run the aggregations are created with infrastructure as code (Terraform). The code for this can be found in ./build. The IaC is triggered by GitHub, and the following describes how to get started with provisioning your own infrastructure.

Note: We use a Delta Lake for the time series data, which is not currently provisioned by the IaC. You need to set up and reference one yourself.

(TBD) Link the general description of how Terraform and IaC works.

(TBD) Info about the shared resources and the role of the keyvault.

(TBD) Info about environments.

Read more on aggregations infrastructure

Learn more about the aggregations infrastructure here.


Test

Read about general QA that applies to the entire Green Energy Hub here.

The aggregation domain has Databricks jobs and libraries implemented in Python. Currently, it has a test suite of pytest unit tests. Information about these tests and instructions on how to execute them are outlined here.
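
As a purely hypothetical example of the shape such a unit test can take (the column name and values below are made up for illustration):

```python
# Hypothetical pytest example; the column name and values are made up for illustration.
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local[1]").appName("aggregation-tests").getOrCreate()


def test_quantities_are_summed_per_grid_area(spark):
    rows = [("805", 1.0), ("805", 2.0), ("806", 5.0)]
    df = spark.createDataFrame(rows, ["MeteringGridArea_Domain_mRID", "Quantity"])

    result = df.groupBy("MeteringGridArea_Domain_mRID").sum("Quantity").collect()
    sums = {row[0]: row[1] for row in result}

    assert sums == {"805": 3.0, "806": 5.0}
```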

Generating test data

The time series test data is created using the Databricks workbook.

The creation of test data is based on this file, generated from the current Danish DataHub system. The test data file consists of the following data properties:

| Data property | Description |
| --- | --- |
| GridArea | Grid area |
| Supplier | Energy supplier |
| Type_Of_MP | Type of metering point, e.g. production, consumption, exchange |
| Physical_Status | Status of the metering point, e.g. new, connected, disconnected |
| Settlement_Method | Consumption-based property indicating whether time series values are flex or hourly settled |
| Reading_Occurrence | Resolution, e.g. 1 hour, 15 minutes |
| Count | Number of data entries having identical configuration |
| FromGridArea | Related to exchange; specifies time series exiting the grid area |
| ToGridArea | Related to exchange; specifies time series entering the grid area |
| BRP | Balance Responsible Party |

How can you generate test data in your Delta Lake

The Databricks workbook can be used to generate the amount of data needed; it is currently configured to create time series data for more than 50 grid areas and approximately 4 million metering points.

The creation of test data is split into two parts:

  1. Create test data for one hour based on this file, using plain row-by-row iteration (which takes a while).
  2. Create test data based on the latest full hour, loaded as a DataFrame from the Delta Lake.

The reason for the split of data creation is to take full advantage of the distributed architecture of Spark and Delta Lake.

The generated time series data is set up to be stored in a Delta Lake, from which the aggregation job fetches the data to run an aggregation on.

If test data is created solely based on this file, the stored data will amount to approximately 202,574,280 entries per day, resulting in about 2.16 GB of storage.
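
A rough sketch of the second step, assuming one full hour of time series data is already present in the Delta Lake; the table path, column names, and time shifting are illustrative only:

```python
# Illustrative sketch of step 2 only; the table path, column names, and time shifting
# are assumptions, not the workbook's actual logic.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

spark = SparkSession.builder.getOrCreate()

# Load the latest full hour generated in step 1.
one_hour = (
    spark.read.format("delta").load("/delta/test-time-series")
    .filter((col("Time") >= "2013-01-01 00:00:00") & (col("Time") < "2013-01-01 01:00:00"))
)

# Replicate that hour across the rest of the day by shifting the timestamps.
full_day = one_hour
for hour_offset in range(1, 24):
    shifted = one_hour.withColumn("Time", expr(f"Time + INTERVAL {hour_offset} HOURS"))
    full_day = full_day.unionByName(shifted)

full_day.write.format("delta").mode("append").save("/delta/test-time-series")
```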

Triggering aggregations via coordinator

An example:


https://azfun-coordinator-aggregations-XXXXXX.azurewebsites.net/api/KickStartJob?beginTime=2013-01-01T23%3A00%3A00%2B0100&endTime=2013-01-30T23%3A00%3A00%2B0100&processType=D03

This will ask the coordinator to do an aggregation in the specified time frame with process type D03.
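
For example, the same request can be issued from Python with the requests library (the function host name is the placeholder from the URL above):

```python
# Minimal example of triggering an aggregation; the function host name is a placeholder.
import requests

params = {
    "beginTime": "2013-01-01T23:00:00+0100",
    "endTime": "2013-01-30T23:00:00+0100",
    "processType": "D03",
}

response = requests.get(
    "https://azfun-coordinator-aggregations-XXXXXX.azurewebsites.net/api/KickStartJob",
    params=params,
)
response.raise_for_status()
```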

Viewing results of aggregations

If you are using this domain without a target event hub for handling the results, an alternative approach is to change CoordinatorService and then perhaps either: