In the fast-changing field of data engineering, strong, scalable, and user-friendly tools are essential. We introduce Teal, a new open-source ETL tool designed to improve your data transformation and orchestration workflows.

Teal combines the best features of tools like dbt, Dagster, and Airflow, while solving common problems found in traditional Python-based solutions. Our goal is to give data engineers and analysts a powerful, easy-to-use platform that simplifies complex workflows and increases productivity.
## Why Choose Teal?
Install the Teal CLI, then create and initialize a project:

```bash
go install github.com/go-teal/teal/cmd/teal@latest
mkdir my_test_project
cd my_test_project
teal init
```
```
❯ ls -al
total 16
drwxr-xr-x@ 6 wwtlf  wwtlf  192 24 Jun 21:23 .
drwxr-xr-x  5 wwtlf  wwtlf  160 24 Jun 21:21 ..
drwxr-xr-x@ 3 wwtlf  wwtlf   96 24 Jun 07:46 assets
-rw-r--r--@ 1 wwtlf  wwtlf  302 24 Jun 07:51 config.yaml
drwxr-xr-x@ 2 wwtlf  wwtlf   64 24 Jun 20:03 docs
-rw-r--r--@ 1 wwtlf  wwtlf  137 24 Jun 07:46 profile.yaml
```
`config.yaml`:

```yaml
version: '1.0.0'
module: github.com/my_user/my_test_project
connections:
  - name: default
    type: duckdb
    config:
      path: ./store/test.duckdb
      extensions:
        - postgres
        - httpfs
      # extraParams:
      #   - name: "name"
      #     value: "value"
```
The `module` param will be used as the module name in `go.mod`. Make sure the `path` directory exists.

`profile.yaml`:

```yaml
version: '1.0.0'
name: 'my-test-project'
connection: 'default'
models:
  stages:
    - name: staging
    - name: dds
    - name: mart
```
The `name` param will be used as the name of the binary file.

Now generate the project:

```bash
teal gen
```

You'll see the following output:
```
project-path: .
config-file: ./config.yaml
Building: staging.addresses.sql
Building: staging.transactions.sql
Building: staging.wallets.sql
Building: dds.dim_addresses.sql
Building: dds.fact_transactions.sql
Building: mart.mart_wallet_report.sql
Files 10
./cmd/my-test-project/main._go .................................................. [OK]
./go.mod ........................................................................ [OK]
./internal/assets/staging.addresses.go .......................................... [OK]
./internal/assets/staging.transactions.go ....................................... [OK]
./internal/assets/staging.wallets.go ............................................ [OK]
./internal/assets/dds.dim_addresses.go .......................................... [OK]
./internal/assets/dds.fact_transactions.go ...................................... [OK]
./internal/assets/mart.mart_wallet_report.go .................................... [OK]
./internal/assets/configs.go .................................................... [OK]
./docs/graph.wsd ................................................................ [OK]
```
Your DAG is depicted in the PlantUML file `docs/graph.wsd`.
Before running the project:

1. Rename `main._go` to `my-test-project.go`.
2. Make sure `_ "github.com/marcboeker/go-duckdb"` is imported in `my-test-project.go`.
3. Run `go mod tidy`.
The resulting project structure:

```
.
├── assets
│   └── models
│       ├── dds
│       │   ├── dim_addresses.sql
│       │   └── fact_transactions.sql
│       ├── mart
│       │   └── mart_wallet_report.sql
│       └── staging
│           ├── addresses.sql
│           ├── transactions.sql
│           └── wallets.sql
├── cmd
│   └── my-test-project
│       └── main.go
├── config.yaml
├── docs
│   └── graph.wsd
├── go.mod
├── go.sum
├── internal
│   └── assets
│       ├── configs.go
│       ├── dds.dim_addresses.go
│       ├── dds.fact_transactions.go
│       ├── mart.mart_wallet_report.go
│       ├── staging.addresses.go
│       ├── staging.transactions.go
│       └── staging.wallets.go
├── profile.yaml
└── store
    ├── addresses.csv
    ├── transactions.csv
    └── wallets.csv
```
Run the project:

```bash
go run ./cmd/my-test-project
```
```go
package main

import (
	"fmt"
	"os"

	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"

	"github.com/go-teal/teal/pkg/core"
	"github.com/go-teal/teal/pkg/dags"
	"github.com/my_user/my_test_project/internal/assets"

	_ "github.com/marcboeker/go-duckdb"
)

func main() {
	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
	fmt.Println("my-test-project")
	core.GetInstance().Init("config.yaml", ".")
	config := core.GetInstance().Config
	dag := dags.InitChannelDag(assets.DAG, assets.PorjectAssets, config, "instance 1")
	wg := dag.Run()
	result := <-dag.Push("TEST", nil, make(chan map[string]interface{}))
	fmt.Println(result)
	dag.Stop()
	wg.Wait()
}
```
What this code does:

- `dag.Run()` builds a DAG based on the `Ref` calls in your `.sql` models, where each node is an asset and each edge is a Go channel.
- `result := <-dag.Push("TEST", nil, make(chan map[string]interface{}))` triggers the execution of this DAG synchronously.
- `dag.Stop()` sends the deactivation command.

`config.yaml`:

```yaml
version: '1.0.0'
module: github.com/my_user/my_test_project
connections:
  - name: default
    type: duckdb
    config:
      path: ./store/test.duckdb
      extensions:
        - postgres
        - httpfs
      # extraParams:
      #   - name: "name"
      #     value: "value"
```
Teal supports multiple connections.
The following databases are supported at the moment (v0.1.2):
| Param | Type | Description |
|---|---|---|
| version | String constant | 1.0.0 |
| module | String | Generated Go module name |
| connections | Array | Array of database connections |
| connections.name | String | Name of the connection, referenced from model profiles |
| connections.type | String | Driver name of the database connection: DuckDB, PostgreSQL, etc. |
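Since `connections` is an array, you can declare several connections and refer to them by name from model profiles. A hypothetical example with two DuckDB databases (the second connection name and paths are illustrative, not from the original project):

```yaml
version: '1.0.0'
module: github.com/my_user/my_test_project
connections:
  - name: default
    type: duckdb
    config:
      path: ./store/main.duckdb
  - name: scratch
    type: duckdb
    config:
      path: ./store/scratch.duckdb
```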
`profile.yaml`:

```yaml
version: '1.0.0'
name: 'my-test-project'
connection: 'default'
models:
  stages:
    - name: staging
      models:
        - name: model1
          # see model profiles
    - name: dds
    - name: mart
```
| Param | Type | Description |
|---|---|---|
| version | String constant | 1.0.0 |
| name | String | Generated folder name for `main.go` |
| connection | String | Default connection from `config.yaml` |
| models.stages | Array of stages | List of stages for models. For each stage, a folder `assets/models/<stage name>` must be created in advance |
| models.stages.models | Array of models | See: Model Profile |
The asset profile can be specified via the `profile.yaml` file or via a Go template in your SQL model file, in the sub-template `{{ define "profile.yaml" }} ... {{ end }}`:
```sql
{{ define "profile.yaml" }}
connection: 'default'
materialization: 'table'
is_data_framed: true
{{ end }}
select
    id,
    wallet_id,
    wallet_address,
    currency
from read_csv('store/addresses.csv',
    delim = ',',
    header = true,
    columns = {
        'id': 'INT',
        'wallet_id': 'VARCHAR',
        'wallet_address': 'VARCHAR',
        'currency': 'VARCHAR'}
)
```
| Param | Type | Default value | Description |
|---|---|---|---|
| name | String | filename | The model name must match the file name, without the .sql extension |
| connection | String | profile.connection | Connection name from `config.yaml` |
| materialization | String | table | See Materializations |
| is_data_framed | boolean | false | See Cross-database references |
| persist_inputs | boolean | false | See Cross-database references |
| Materialization | Description |
|---|---|
| table | The result of the SQL query is stored in the table corresponding to the model name. If the table does not exist, it is created. If it already exists, it is cleared using truncate. |
| incremental | The result of the query is appended to the existing table. If the table does not exist, it is created. |
| view | The SQL query is saved as a view. |
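As an illustration, a hypothetical incremental model could combine the `incremental` materialization with the template functions covered in this article (the table and column names here are made up, and the exact filtering logic is an assumption):

```sql
{{ define "profile.yaml" }}
materialization: 'incremental'
{{ end }}
select
    id,
    wallet_id,
    amount
from {{ Ref "staging.transactions" }}
{{ if IsIncremental }}
-- only append rows newer than what the target table already holds
where id > (select max(id) from {{ this }})
{{ end }}
```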
Functions in double braces, e.g. `{{ Ref "staging.model" }}`, are static: their values are substituted at project-generation time. Functions in triple braces, e.g. `{{{ Ref "staging.model" }}}`, are dynamic: they are executed when your asset is activated. After project generation, triple braces are replaced by double braces in the generated asset source code.
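The static (double-brace) substitution is ordinary Go `text/template` execution. A simplified, library-free illustration of how a `Ref` function could be resolved at generation time (how Teal actually maps a reference to a physical table name is an assumption here; this sketch just quotes the model name):

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// renderModel resolves {{ Ref "stage.model" }} calls statically,
// the way a generator would at project-generation time.
func renderModel(src string) (string, error) {
	funcs := template.FuncMap{
		// Ref maps a model reference to a (hypothetical) physical table name.
		"Ref": func(model string) string { return `"` + model + `"` },
	}
	tmpl, err := template.New("model").Funcs(funcs).Parse(src)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, nil); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	out, _ := renderModel(`select * from {{ Ref "staging.addresses" }}`)
	fmt.Println(out) // select * from "staging.addresses"
}
```

Because triple braces survive generation as double braces, the same mechanism can run a second time at asset activation, which is what makes them "dynamic".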
Available built-in functions:

| Function | Input parameters | Output | Description |
|---|---|---|---|
| Ref | "&lt;stage name&gt;.&lt;model name&gt;" | string | Ref is the main function on which the DAG is based. It points to the model and is replaced by the table name when the template is executed. |
| this | None | string | Returns the name of the current table. |
| IsIncremental | None | boolean | Returns whether the model is being executed in incremental mode. |
The DuckDB driver import `_ "github.com/marcboeker/go-duckdb"` must be added to `main.go`.
| Param | Type | Description |
|---|---|---|
| extensions | Array of strings | List of DuckDB extensions. Extensions are installed during database creation and loaded before asset execution |
| path | String | Path to the DuckDB database file |
| extraParams | Object | Name/value pairs for DuckDB configuration |
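For example, `extraParams` might carry DuckDB settings such as `memory_limit` or `threads`. The values below are illustrative, and how Teal applies them is an assumption based on the commented-out template shown in `config.yaml`:

```yaml
connections:
  - name: default
    type: duckdb
    config:
      path: ./store/test.duckdb
      extraParams:
        - name: "memory_limit"
          value: "1GB"
        - name: "threads"
          value: "4"
```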
The following two model profile parameters are responsible for cross-database references: