koralium / flowtide

Streaming integration engine
https://koralium.github.io/flowtide/
Apache License 2.0
28 stars 2 forks source link
denormalize engine integration materialize stream


Flowtide.NET

Streaming integration engine
Explore the docs »

Flowtide is a data streaming and integration engine designed for seamless and efficient real-time data processing. It stands out by offering differential data computation, versatile connector support, SQL language support for data streams and integrations, and high-performance operations among other features. Flowtide is tailored for developers and companies looking to efficiently handle complex data transformations and integrations.

Key Features

Getting started

To get started with Flowtide, the easiest way is to create a new C# project. This guide will show an example with SQL Server, but you can change to another connector as well.

Create a minimal API AspNetCore application and install the following nuget package:

Writing the SQL code

The first step is to create an execution plan, this can be be done with any substrait plan creator. But it is also possible to do it with SQL inside flowtide. This tutorial will show you how to create a plan with SQL.

var sqlText = @"
INSERT INTO {sqlserver database name}.{schema name}.{destinationname}
SELECT t.val FROM {sqlserver database name}.{schema name}.{tablename} t
LEFT JOIN {sqlserver database name}.{schema name}.{othertablename} o
ON t.val = o.val
WHERE t.val = 123;
";

Replace all values with that are between { } with your own table names in your SQL Server.

Configure the stream

Next we will add a Flowtide stream to the service collection and add our SQL text as a execution plan:

builder.Services.AddFlowtideStream("myStream)
  .AddSqlTextAsPlan(sqlText)

The stream name is added on all logs and metrics, so keeping it unique in your environment can help when setting up monitoring.

Add connectors

So far we have written SQL code and started configuring the stream. But we have not yet instructed Flowtide what it should connect to. Some examples of sinks and sources are:

This is done by using the AddConnectors method:

builder.Services.AddFlowtideStream("myStream)
  ...
  .AddConnectors(connectorManager => {
    // Add a SQL Server database as an available source
    connectorManager.AddSqlServerSource(() => "Server={your server};Database={your database};Trusted_Connection=True;");
    // Add another SQL Server database as a sink
    connectorManager.AddSqlServerSink(() => "Server={your server};Database={your database};Trusted_Connection=True;");
  })

Configuring state storage

A Flowtide stream requires state storage to function. This can be stored on a file system or on a cloud storage solution.

In this tutorial we will use a local development storage. This storage gets cleared between each test run which make it good for development.

builder.Services.AddFlowtideStream("myStream)
  ...
  .AddStorage(storage => {
    storage.AddTemporaryDevelopmentStorage();
  });

If you want to use persistent storage on the local file system, you can instead use:

.AddStorage(storage => {
  storage.AddFasterKVFileSystemStorage("./stateData");
});

Adding the UI

If you want to add the UI to visualize the progress of the stream, add the following code after "var app = builder.Build();".

app.UseFlowtideUI("/stream");

Full example

Here is the full code example to get started:

var builder = WebApplication.CreateBuilder(args);

var sqlText = @"
INSERT INTO {sqlserver database name}.{schema name}.{destinationname}
SELECT t.val FROM {sqlserver database name}.{schema name}.{tablename} t
LEFT JOIN {sqlserver database name}.{schema name}.{othertablename} o
ON t.val = o.val
WHERE t.val = 123;
";

builder.Services.AddFlowtideStream("myStream)
  .AddSqlTextAsPlan(sqlText)
  .AddConnectors(connectorManager => {
    // Add a SQL Server database as an available source
    connectorManager.AddSqlServerSource(() => "Server={your server};Database={your database};Trusted_Connection=True;");
    // Add another SQL Server database as a sink
    connectorManager.AddSqlServerSink(() => "Server={your server};Database={your database};Trusted_Connection=True;");
  })
  .AddStorage(storage => {
    storage.AddTemporaryDevelopmentStorage();
  });

var app = builder.Build();
app.UseFlowtideUI("/stream");

app.Run();