elsa-workflows / elsa-core

A .NET workflows library
https://v3.elsaworkflows.io/
MIT License

Data Processing Pipelines / ETL #4809

Open sfmskywalker opened 5 months ago

sfmskywalker commented 5 months ago

We're looking to enhance Elsa's data processing capabilities. To do this, we're drawing ideas from Azure Data Factory. Here's a breakdown of the new concepts we're considering for integration:

  1. Data Set: This would be a reference to a specific set of data located somewhere. It's like a pointer telling Elsa where to find the data it needs.

  2. Connected Service: Think of this as a "connection string" (Azure Data Factory calls the equivalent concept a Linked Service). It's a way for Elsa to connect to various services to read and write data, which is crucial for moving data in and out of Elsa.

  3. Activity Connection to Data Sets: We plan to enable each activity in Elsa to be linked to a source Data Set and a target Data Set (also known as a sink Data Set). When an activity runs, it would be able to pull data from its source Data Set and send it to its target Data Set. This process could involve data mapping and transformation, depending on what the activity is designed to do.

  4. Background Execution and Data Processing: Activities would be able to run in the background, even on different nodes within a cluster, which is something Elsa already supports. They can handle data in chunks, sending processed data to subsequent activities or creating new data sets.

  5. Data Set vs Variables and Storage Driver: Data Sets would be a significant step up from Elsa's current Variables and Storage Driver concept. The idea is to store Data Sets externally, on the Connected Service (Linked Service), providing more flexibility and advanced functionality.

  6. User Experience Enhancements: From a user interface perspective, users would still be able to use outputs from activities. In addition to capturing outputs via Variables, they would also have the option to direct these outputs to a chosen Data Set.

In summary, these enhancements are aimed at making Elsa more capable in handling data, especially in terms of processing, storing, and transferring data between different activities and services.
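
A minimal sketch of points 3 and 4, assuming an activity that reads from a source Data Set, maps each row, and writes to a sink Data Set in chunks. It is written in Python only to keep it self-contained; `InMemoryDataSet`, `chunked`, and `run_activity` are hypothetical names for illustration and are not part of Elsa.

```python
from dataclasses import dataclass, field
from itertools import islice
from typing import Callable, Iterable, Iterator


@dataclass
class InMemoryDataSet:
    """Hypothetical stand-in for a Data Set; a real one would live behind a Connected Service."""
    rows: list[dict] = field(default_factory=list)

    def read(self) -> Iterator[dict]:
        yield from self.rows

    def write(self, chunk: list[dict]) -> None:
        self.rows.extend(chunk)


def chunked(rows: Iterable[dict], size: int) -> Iterator[list[dict]]:
    """Yield lists of at most `size` rows until the input is exhausted."""
    it = iter(rows)
    while chunk := list(islice(it, size)):
        yield chunk


def run_activity(
    source: InMemoryDataSet,
    sink: InMemoryDataSet,
    transform: Callable[[dict], dict],
    chunk_size: int = 2,
) -> int:
    """Pull rows from the source, map each row, push each chunk to the sink.

    Returns the number of chunks written.
    """
    chunks = 0
    for chunk in chunked(source.read(), chunk_size):
        sink.write([transform(row) for row in chunk])
        chunks += 1
    return chunks


source = InMemoryDataSet([{"id": i, "name": f"user{i}"} for i in range(5)])
sink = InMemoryDataSet()
chunk_count = run_activity(
    source, sink, lambda row: {"id": row["id"], "name": row["name"].upper()}
)
```

Because each chunk is written as soon as it is transformed, a downstream activity could start consuming the sink before the source is fully read, which is the behaviour point 4 describes.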

sfmskywalker commented 5 months ago

Components:

  1. LinkedService:

    • Abstract Class: Create an abstract base class LinkedService to define common properties for connection details.
    • Concrete Implementations: Develop subclasses for specific connection types like SqlServerLinkedService, RedisLinkedService, and GraphQLLinkedService.
    • Properties: Include properties for connection strings, authentication details, and any other necessary configuration.

  2. DataSet:

    • Abstract Class: Create an abstract base class DataSet for representing data structures used within workflows.
    • Concrete Implementations: Develop subclasses like SqlDataSet, RedisDataSet, and GraphQLDataSet, each corresponding to their respective LinkedService.
    • Methods: Include methods for data operations such as Read(), Write(), Update(), and Delete().
    • Linked Service Reference: Each DataSet instance should reference a LinkedService to define how to connect to the data source.

  3. Custom Activities in Elsa:

    • Develop custom activities that utilise DataSet and LinkedService, enabling operations like data retrieval, transformation, and loading.

  4. Configuration and Management:

    • Implement JSON-based configuration for defining LinkedServices and DataSets.
    • Provide APIs and potentially UI extensions for managing these configurations.

  5. Security and Compliance:

    • Ensure encryption of sensitive data in LinkedService configurations.
    • Implement features for auditing and compliance.

  6. Scalability and Performance:

    • Design asynchronous data operations and incorporate caching where appropriate.
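
One way the JSON-based configuration in item 4 could map onto the class hierarchy in items 1 and 2 is a type discriminator plus a registry. This is a sketch under assumptions: the JSON shape, the `type` field, the registries, and the `load` and `build` helpers are all hypothetical, and Python dataclasses are used purely for illustration.

```python
import json
from dataclasses import dataclass


@dataclass
class LinkedService:
    name: str
    connection_string: str


@dataclass
class SqlServerLinkedService(LinkedService):
    pass


@dataclass
class RedisLinkedService(LinkedService):
    database: int = 0


@dataclass
class DataSet:
    name: str
    linked_service: LinkedService


@dataclass
class SqlDataSet(DataSet):
    table: str = ""


# Hypothetical "type" discriminators mapped to concrete classes.
LINKED_SERVICE_TYPES = {"SqlServer": SqlServerLinkedService, "Redis": RedisLinkedService}
DATA_SET_TYPES = {"Sql": SqlDataSet}

CONFIG = """
{
  "linked_services": [
    {"type": "SqlServer", "name": "crm-db", "connection_string": "Server=localhost;Database=Crm"},
    {"type": "Redis", "name": "cache", "connection_string": "localhost:6379", "database": 2}
  ],
  "data_sets": [
    {"type": "Sql", "name": "customers", "linked_service": "crm-db", "table": "dbo.Customers"}
  ]
}
"""


def build(registry, item, **resolved):
    """Instantiate the class registered for item["type"] from the remaining keys."""
    fields = {key: value for key, value in item.items() if key != "type"}
    fields.update(resolved)  # replace name references with resolved objects
    return registry[item["type"]](**fields)


def load(config_text):
    """Parse the config and return (linked services, data sets), each keyed by name."""
    doc = json.loads(config_text)
    services = {i["name"]: build(LINKED_SERVICE_TYPES, i) for i in doc["linked_services"]}
    data_sets = {
        i["name"]: build(DATA_SET_TYPES, i, linked_service=services[i["linked_service"]])
        for i in doc["data_sets"]
    }
    return services, data_sets


services, data_sets = load(CONFIG)
```

The connection strings are stored in plain text here only to keep the sketch short; item 5 calls for encrypting sensitive values, which this example does not attempt.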

Deserialization and Dynamic Object Creation:

Usage Example:

Pseudo Code Example:

// Abstract LinkedService class
abstract class LinkedService {
    String connectionString
    // Other common properties
}

// Specific LinkedService for SQL Server
class SqlServerLinkedService extends LinkedService {
    // SQL Server specific properties
}

// Abstract DataSet class
abstract class DataSet {
    LinkedService linkedService  // Reference to a LinkedService
    abstract method Read()
    abstract method Write(data)
    // Other data operation methods
}

// Specific DataSet for SQL operations
class SqlDataSet extends DataSet {
    method Read() {
        // Use linkedService to read data
    }
    // Other methods
}

// Usage in Elsa Workflow
class DataProcessingActivity extends Activity {
    DataSet dataSet
    method Execute() {
        // Data processing logic
    }
}
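
The pseudocode above can be made concrete as a small runnable sketch. Everything here is an assumption for illustration: it uses Python with the standard-library `sqlite3` module as a stand-in for SQL Server, `SqliteLinkedService` replaces `SqlServerLinkedService` so that no server is needed, and `DataProcessingActivity` is a plain class rather than a real Elsa activity.

```python
import sqlite3
from abc import ABC, abstractmethod


class LinkedService(ABC):
    """Common connection details shared by all linked services."""

    def __init__(self, connection_string: str) -> None:
        self.connection_string = connection_string


class SqliteLinkedService(LinkedService):
    """Stand-in for SqlServerLinkedService so the sketch runs without a server."""

    def __init__(self, connection_string: str) -> None:
        super().__init__(connection_string)
        self.connection = sqlite3.connect(connection_string)


class DataSet(ABC):
    """A set of data reachable through a LinkedService."""

    def __init__(self, linked_service: LinkedService) -> None:
        self.linked_service = linked_service

    @abstractmethod
    def read(self) -> list[tuple]: ...

    @abstractmethod
    def write(self, data: list[tuple]) -> None: ...


class SqlDataSet(DataSet):
    """Reads and writes (id, name) rows in one table."""

    def __init__(self, linked_service: SqliteLinkedService, table: str) -> None:
        super().__init__(linked_service)
        self._conn = linked_service.connection
        # The table name is a trusted identifier in this sketch;
        # never interpolate user input into SQL like this.
        self._table = table
        self._conn.execute(
            f"CREATE TABLE IF NOT EXISTS {table} (id INTEGER PRIMARY KEY, name TEXT)"
        )

    def read(self) -> list[tuple]:
        cursor = self._conn.execute(f"SELECT id, name FROM {self._table} ORDER BY id")
        return cursor.fetchall()

    def write(self, data: list[tuple]) -> None:
        with self._conn:  # commits on success, rolls back on error
            self._conn.executemany(
                f"INSERT INTO {self._table} (id, name) VALUES (?, ?)", data
            )


class DataProcessingActivity:
    """Plain class standing in for an Elsa activity with a source and a sink."""

    def __init__(self, source: DataSet, sink: DataSet) -> None:
        self.source = source
        self.sink = sink

    def execute(self) -> None:
        # Data processing logic: normalise whitespace and casing of names.
        rows = self.source.read()
        self.sink.write([(row_id, name.strip().title()) for row_id, name in rows])


service = SqliteLinkedService(":memory:")
raw = SqlDataSet(service, "raw_customers")
clean = SqlDataSet(service, "clean_customers")
raw.write([(1, "  ada lovelace "), (2, "alan TURING")])
DataProcessingActivity(raw, clean).execute()
cleaned_rows = clean.read()
```

Both data sets share one linked service, which mirrors the idea that a DataSet only describes what to read or write while the LinkedService owns the connection.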

mohdali commented 5 months ago

This sounds great and is something I would be interested in as well.

Adding a few points:

1- Handling data streams: IoT/MQTT/OPC UA sources produce continuous streams of data that need to be processed in the workflow pipeline and delivered to a destination (a database, logs, or blob storage). The workflow should be able to handle those streams and provide logging and fault handling. At the same time, we don't need a new instance for every chunk of data generated. (Azure Stream Analytics is a useful reference: https://learn.microsoft.com/en-us/azure/stream-analytics/stream-analytics-introduction)

2- Data buffering / retry: in case of transient errors, data might be buffered at the source and sent once the connection resumes. Similarly, if the destination is unavailable, we might need to buffer the data and retry when the connection is re-established.

3- Store metrics such as number of records processed, number of errors or alerts, etc., for easier monitoring.
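
The three points above can be sketched together: a single long-lived consumer handles every record of a stream, a bounded buffer holds records while the destination is unavailable, and counters track records processed and errors. This is only an illustration in Python under assumptions; `Metrics`, `BufferedSink`, `FlakyDestination`, and `consume` are hypothetical names, and the stream is simulated with `range` rather than a real MQTT or OPC UA client.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class Metrics:
    records_processed: int = 0
    errors: int = 0


class BufferedSink:
    """Buffers records and sends them in batches, keeping them on transient failure."""

    def __init__(self, send, batch_size=3, max_buffer=1000):
        self.send = send
        self.batch_size = batch_size
        # Bounded buffer: once full, the oldest records are silently dropped.
        self.buffer = deque(maxlen=max_buffer)
        self.metrics = Metrics()

    def push(self, record):
        self.buffer.append(record)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        """Try to send everything buffered; return True if the buffer is now empty."""
        if not self.buffer:
            return True
        batch = list(self.buffer)
        try:
            self.send(batch)
        except ConnectionError:
            self.metrics.errors += 1
            return False  # keep the records; the next push or flush retries
        self.buffer.clear()
        self.metrics.records_processed += len(batch)
        return True


class FlakyDestination:
    """Test double that raises ConnectionError for the first `failures` calls."""

    def __init__(self, failures):
        self.failures = failures
        self.received = []

    def __call__(self, batch):
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("destination unavailable")
        self.received.extend(batch)


def consume(stream, sink):
    """One consumer loop for the whole stream, not one instance per record."""
    for record in stream:
        sink.push(record)
    sink.flush()  # drain whatever is left when the stream ends


destination = FlakyDestination(failures=2)
sink = BufferedSink(destination, batch_size=3)
consume(range(7), sink)
```

The sketch retries on the next incoming record and keeps the buffer in memory; a real pipeline would likely add backoff between retries and a durable buffer so that data survives a restart.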