# Pipeline

## laktory.models.Pipeline

Bases: `BaseModel`, `PulumiResource`, `TerraformResource`
Pipeline model to manage a full-fledged data pipeline, including reading from data sources, applying data transformations through Spark and writing to data sinks.

A pipeline is composed of a collection of nodes, each one defining its own source, transformations and optional sink. A node may be used as the source of another node.

A pipeline may be run manually using python or the CLI, but it may also be deployed and scheduled using one of the supported orchestrators, such as a Databricks Job or Delta Live Tables pipeline.
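For instance, a pipeline declared in a YAML file can be validated and executed directly from python. This is a minimal sketch; `pipeline.yaml` is a hypothetical file following the schema shown in the examples below:

```python
from laktory import models

# Load and validate the pipeline definition (hypothetical file path)
with open("pipeline.yaml") as fp:
    pl = models.Pipeline.model_validate_yaml(fp)

# Read sources, apply transformations and write sinks
pl.execute()
```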
| Attribute | Description |
|---|---|
| `databricks_job` | Defines the Databricks Job specifications when `DATABRICKS_JOB` is selected as the orchestrator. |
| `dlt` | Defines the Delta Live Tables specifications when `DLT` is selected as the orchestrator. |
| `name` | Name of the pipeline. |
| `nodes` | List of pipeline nodes. Each node defines a data source, a series of transformations and optionally a sink. |
| `orchestrator` | Orchestrator used for scheduling and executing the pipeline. The selected option defines which resources are to be deployed. Supported options are `DLT` and `DATABRICKS_JOB`. |
| `udfs` | List of user defined functions provided to the transformer. |
| `workspacefile` | Workspace file used to store the JSON definition of the pipeline. |
Examples:
This first example shows how to configure a simple pipeline with 2 nodes. Upon execution, raw data will be read from a CSV file and two DataFrames (bronze and silver) will be created and saved as parquet files. Notice how the first node is used as a data source for the second node.
```python
import io
from laktory import models

pipeline_yaml = '''
name: pl-stock-prices
nodes:
- name: brz_stock_prices
  layer: BRONZE
  source:
    format: CSV
    path: ./raw/brz_stock_prices.csv
  sink:
    format: PARQUET
    mode: OVERWRITE
    path: ./dataframes/brz_stock_prices
- name: slv_stock_prices
  layer: SILVER
  source:
    node_name: brz_stock_prices
  sink:
    format: PARQUET
    mode: OVERWRITE
    path: ./dataframes/slv_stock_prices
  transformer:
    nodes:
    - with_column:
        name: created_at
        type: timestamp
        sql_expr: data.created_at
    - with_column:
        name: symbol
        sql_expr: data.symbol
    - with_column:
        name: close
        type: double
        sql_expr: data.close
    - func_name: drop
      func_args:
      - value: data
      - value: producer
      - value: name
      - value: description
'''

pl = models.Pipeline.model_validate_yaml(io.StringIO(pipeline_yaml))

# Execute pipeline
# pl.execute()
```
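After execution, the parquet files written by the sinks can be inspected with any parquet reader; a quick check might look like this (assuming an active Spark session named `spark`):

```python
# Read back the silver output written by the second node's sink
df = spark.read.parquet("./dataframes/slv_stock_prices")
df.printSchema()
```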
The next example defines a pipeline with 3 nodes (1 bronze and 2 silver) orchestrated with a Databricks Job. Notice how nodes are used as data sources not only for other nodes, but also for the `other` keyword argument of the smart join function (`slv_stock_prices`). Because we are using the `DATABRICKS_JOB` orchestrator, the job configuration must be declared. The tasks will be automatically created by the Pipeline model. Each task will execute a single node using the notebook referenced in `databricks_job.notebook_path`; the content of this notebook should be similar to `laktory.resources.notebooks.job_laktory.pl`. A rough sketch of such a notebook is provided after the example.
```python
import io
from laktory import models

pipeline_yaml = '''
name: pl-stock-prices
orchestrator: DATABRICKS_JOB
databricks_job:
  name: job-pl-stock-prices
  laktory_version: 0.3.0
  notebook_path: /Workspace/.laktory/jobs/job_laktory_pl.py
  clusters:
  - name: node-cluster
    spark_version: 14.0.x-scala2.12
    node_type_id: Standard_DS3_v2
nodes:
- name: brz_stock_prices
  layer: BRONZE
  source:
    path: /Volumes/dev/sources/landing/events/yahoo-finance/stock_price/
  sink:
    path: /Volumes/dev/sources/landing/tables/dev_stock_prices/
    mode: OVERWRITE
- name: slv_stock_prices
  layer: SILVER
  source:
    node_name: brz_stock_prices
  sink:
    path: /Volumes/dev/sources/landing/tables/slv_stock_prices/
    mode: OVERWRITE
  transformer:
    nodes:
    - with_column:
        name: created_at
        type: timestamp
        sql_expr: data.created_at
    - with_column:
        name: symbol
        sql_expr: data.symbol
    - with_column:
        name: close
        type: double
        sql_expr: data.close
    - func_name: drop
      func_args:
      - value: data
      - value: producer
      - value: name
      - value: description
    - func_name: laktory.smart_join
      func_kwargs:
        'on':
        - symbol
        other:
          node_name: slv_stock_meta
- name: slv_stock_meta
  layer: SILVER
  source:
    path: /Volumes/dev/sources/landing/events/yahoo-finance/stock_meta/
  sink:
    path: /Volumes/dev/sources/landing/tables/slv_stock_meta/
    mode: OVERWRITE
'''

pl = models.Pipeline.model_validate_yaml(io.StringIO(pipeline_yaml))
```
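The sketch below illustrates what a job notebook executing a single node could look like, inspired by `laktory.resources.notebooks.job_laktory.pl`. The widget names and the workspace file path are assumptions for illustration, not the exact laktory contract:

```python
# Rough sketch of a per-task job notebook. `dbutils` and `spark` are the
# Databricks notebook built-ins.
from laktory import models

# Each job task passes the name of the node it is responsible for
node_name = dbutils.widgets.get("node_name")

# Load the pipeline definition stored as a workspace file (hypothetical path)
with open("/Workspace/.laktory/pipelines/pl-stock-prices.json") as fp:
    pl = models.Pipeline.model_validate_json(fp.read())

# Execute only the node assigned to this task
pl.nodes_dict[node_name].execute(spark=spark)
```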
Finally, we re-implement the previous pipeline, but with a few key differences:

- The orchestrator is `DLT` instead of `DATABRICKS_JOB`
- Sinks are Unity Catalog tables instead of storage locations
- Data is read as a stream in most nodes
- `slv_stock_meta` is simply a DLT view since it does not have an associated sink

We also need to provide some basic configuration for the DLT pipeline.
```python
import io
from laktory import models

pipeline_yaml = '''
name: pl-stock-prices
orchestrator: DLT
dlt:
  catalog: dev
  target: sandbox
  access_controls:
  - group_name: users
    permission_level: CAN_VIEW
nodes:
- name: brz_stock_prices
  layer: BRONZE
  source:
    path: /Volumes/dev/sources/landing/events/yahoo-finance/stock_price/
    as_stream: true
  sink:
    table_name: brz_stock_prices
- name: slv_stock_prices
  layer: SILVER
  source:
    node_name: brz_stock_prices
    as_stream: true
  sink:
    table_name: slv_stock_prices
  transformer:
    nodes:
    - with_column:
        name: created_at
        type: timestamp
        sql_expr: data.created_at
    - with_column:
        name: symbol
        sql_expr: data.symbol
    - with_column:
        name: close
        type: double
        sql_expr: data.close
    - func_name: drop
      func_args:
      - value: data
      - value: producer
      - value: name
      - value: description
    - func_name: laktory.smart_join
      func_kwargs:
        'on':
        - symbol
        other:
          node_name: slv_stock_meta
- name: slv_stock_meta
  layer: SILVER
  source:
    path: /Volumes/dev/sources/landing/events/yahoo-finance/stock_meta/
'''

pl = models.Pipeline.model_validate_yaml(io.StringIO(pipeline_yaml))
```
### Attributes
#### `nodes_dict` *property*

Nodes dictionary whose keys are the node names.

| Returns | Description |
|---|---|
| `dict[str, PipelineNode]` | Nodes dictionary |
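For instance, with the pipelines defined above, a specific node can be retrieved by name:

```python
# Look up a node by its name
node = pl.nodes_dict["slv_stock_prices"]
print(node.name)  # slv_stock_prices
```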
#### `dag` *property*

NetworkX directed acyclic graph representation of the pipeline. Useful to identify interdependencies between nodes.

| Returns | Description |
|---|---|
| `DiGraph` | Directed acyclic graph |
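Because the returned object is a standard NetworkX `DiGraph`, the usual graph API applies; for example, listing the dependencies between nodes:

```python
# Each edge points from an upstream node to the node consuming it
for upstream, downstream in pl.dag.edges:
    print(f"{upstream} -> {downstream}")
```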
#### `sorted_nodes` *property*

Topologically sorted nodes.

| Returns | Description |
|---|---|
| `list[PipelineNode]` | List of topologically sorted nodes. |
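This is the order used for sequential execution; for the job example above, `brz_stock_prices` and `slv_stock_meta` would appear before `slv_stock_prices`:

```python
# Node names in execution order
print([node.name for node in pl.sorted_nodes])
```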
#### `additional_core_resources` *property*

- configuration workspace file
- configuration workspace file permissions

If the orchestrator is `DLT`:

- DLT pipeline

If the orchestrator is `DATABRICKS_JOB`:

- Databricks Job
### Functions

#### `execute`

```python
execute(spark=None, udfs=None, write_sinks=True, full_refresh=False)
```
Execute the pipeline (read sources and write sinks) by sequentially executing each node. The selected orchestrator might impact how data sources or sinks are processed.
| Parameter | Description |
|---|---|
| `spark` | Spark session. Default: `None` |
| `udfs` | List of user-defined functions used in transformation chains. Default: `None` |
| `write_sinks` | If `True`, node outputs are written to their sinks. Default: `True` |
| `full_refresh` | If `True`, existing data is fully refreshed and sinks are rebuilt from scratch. Type: `bool`. Default: `False` |
Source code in `laktory/models/pipeline.py`
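A typical invocation, assuming a local or Databricks Spark session:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Run every node in topological order and write the configured sinks
pl.execute(spark=spark, full_refresh=False)
```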
#### `dag_figure`

```python
dag_figure()
```

[UNDER DEVELOPMENT] Generate a figure representation of the pipeline DAG.

| Returns | Description |
|---|---|
| `Figure` | Plotly figure representation of the pipeline. |
Source code in `laktory/models/pipeline.py`
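Since the method returns a Plotly figure, it can be rendered like any other figure (keeping in mind the method is still under development):

```python
# Render the pipeline DAG
fig = pl.dag_figure()
fig.show()
```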
## laktory.models.pipeline.PipelineDatabricksJob
## laktory.models.pipeline.PipelineUDF

Bases: `BaseModel`

Pipeline user defined function.
| Attribute | Description |
|---|---|
| `module_name` | Name of the module from which the function needs to be imported. |
| `function_name` | Name of the function. |
| `module_path` | Workspace filepath of the module, if not in the same directory as the pipeline notebook. |