
Pipeline

laktory.models.Pipeline ¤

Bases: BaseModel, PulumiResource, TerraformResource

Pipeline model to manage a full-fledged data pipeline, including reading from data sources, applying data transformations through Spark, and writing to data sinks.

A pipeline is composed of collections of nodes, each one defining its own source, transformations and optional sink. A node may be the source of another node.

A pipeline may be run manually using Python or the CLI, but it may also be deployed and scheduled with one of the supported orchestrators, such as a Databricks Delta Live Tables (DLT) pipeline or a Databricks Job.

ATTRIBUTE DESCRIPTION
databricks_job

Defines the Databricks Job specifications when DATABRICKS_JOB is selected as the orchestrator.

TYPE: Union[PipelineDatabricksJob, None]

dlt

Defines the Delta Live Tables specifications when DLT is selected as the orchestrator.

TYPE: Union[DLTPipeline, None]

name

Name of the pipeline

TYPE: str

nodes

List of pipeline nodes. Each node defines a data source, a series of transformations and optionally a sink.

TYPE: list[PipelineNode]

orchestrator

Orchestrator used for scheduling and executing the pipeline. The selected option defines which resources are to be deployed. Supported options are:

  • DLT: When orchestrated through Databricks Delta Live Tables, each pipeline node creates a DLT table (or view, if no sink is defined). Behind the scenes, PipelineNodeDataSource leverages the native dlt read and read_stream functions to define the interdependencies between the tables, as in a standard DLT pipeline. This is the recommended orchestrator as it is the most feature-rich.
  • DATABRICKS_JOB: When deployed through a Databricks Job, a task is created for each pipeline node and all the required dependencies are set automatically. If a given task (or pipeline node) uses a PipelineNodeDataSource as its source, the data is read from the upstream node's sink.

TYPE: Union[Literal['DLT', 'DATABRICKS_JOB'], None]

udfs

List of user-defined functions provided to the transformer.

TYPE: list[PipelineUDF]

workspacefile

Workspace file used to store the JSON definition of the pipeline.

TYPE: PipelineWorkspaceFile

Examples:

This first example shows how to configure a simple pipeline with two nodes. Upon execution, raw data will be read from a CSV file and two DataFrames (bronze and silver) will be created and saved as Parquet files. Notice how the first node is used as a data source for the second node.

import io
from laktory import models

pipeline_yaml = '''
    name: pl-stock-prices
    nodes:
    - name: brz_stock_prices
      layer: BRONZE
      source:
        format: CSV
        path: ./raw/brz_stock_prices.csv
      sink:
        format: PARQUET
        mode: OVERWRITE
        path: ./dataframes/brz_stock_prices

    - name: slv_stock_prices
      layer: SILVER
      source:
        node_name: brz_stock_prices
      sink:
        format: PARQUET
        mode: OVERWRITE
        path: ./dataframes/slv_stock_prices
      transformer:
        nodes:
        - with_column:
            name: created_at
            type: timestamp
            sql_expr: data.created_at
        - with_column:
            name: symbol
            sql_expr: data.symbol
        - with_column:
            name: close
            type: double
            sql_expr: data.close
        - func_name: drop
          func_args:
          - value: data
          - value: producer
          - value: name
          - value: description
'''

pl = models.Pipeline.model_validate_yaml(io.StringIO(pipeline_yaml))

# Execute pipeline
# pl.execute()

The next example defines a 3-node pipeline (one bronze and two silver) orchestrated with a Databricks Job. Notice how nodes are used as data sources not only for other nodes, but also for the other keyword argument of the smart_join function (slv_stock_prices). Because we are using the DATABRICKS_JOB orchestrator, the job configuration must be declared. The tasks will be automatically created by the Pipeline model. Each task will execute a single node using the notebook referenced in databricks_job.notebook_path; the content of this notebook should be similar to laktory.resources.notebooks.job_laktory.pl.

import io
from laktory import models

pipeline_yaml = '''
    name: pl-stock-prices
    orchestrator: DATABRICKS_JOB
    databricks_job:
      name: job-pl-stock-prices
      laktory_version: 0.3.0
      notebook_path: /Workspace/.laktory/jobs/job_laktory_pl.py
      clusters:
        - name: node-cluster
          spark_version: 14.0.x-scala2.12
          node_type_id: Standard_DS3_v2

    nodes:
    - name: brz_stock_prices
      layer: BRONZE
      source:
        path: /Volumes/dev/sources/landing/events/yahoo-finance/stock_price/
      sink:
          path: /Volumes/dev/sources/landing/tables/dev_stock_prices/
          mode: OVERWRITE

    - name: slv_stock_prices
      layer: SILVER
      source:
        node_name: brz_stock_prices
      sink:
          path: /Volumes/dev/sources/landing/tables/slv_stock_prices/
          mode: OVERWRITE
      transformer:
        nodes:
        - with_column:
            name: created_at
            type: timestamp
            sql_expr: data.created_at
        - with_column:
            name: symbol
            sql_expr: data.symbol
        - with_column:
            name: close
            type: double
            sql_expr: data.close
        - func_name: drop
          func_args:
          - value: data
          - value: producer
          - value: name
          - value: description
        - func_name: laktory.smart_join
          func_kwargs:
            'on':
              - symbol
            other:
              node_name: slv_stock_meta

    - name: slv_stock_meta
      layer: SILVER
      source:
        path: /Volumes/dev/sources/landing/events/yahoo-finance/stock_meta/
      sink:
        path: /Volumes/dev/sources/landing/tables/slv_stock_meta/
        mode: OVERWRITE

'''
pl = models.Pipeline.model_validate_yaml(io.StringIO(pipeline_yaml))

Finally, we re-implement the previous pipeline, but with a few key differences:

  • The orchestrator is DLT instead of DATABRICKS_JOB
  • Sinks are Unity Catalog tables instead of storage locations
  • Data is read as a stream in most nodes
  • slv_stock_meta is simply a DLT view since it does not have an associated sink.

We also need to provide some basic configuration for the DLT pipeline.

import io
from laktory import models

pipeline_yaml = '''
    name: pl-stock-prices
    orchestrator: DLT
    dlt:
        catalog: dev
        target: sandbox
        access_controls:
        - group_name: users
          permission_level: CAN_VIEW

    nodes:
    - name: brz_stock_prices
      layer: BRONZE
      source:
        path: /Volumes/dev/sources/landing/events/yahoo-finance/stock_price/
        as_stream: true
      sink:
        table_name: brz_stock_prices

    - name: slv_stock_prices
      layer: SILVER
      source:
        node_name: brz_stock_prices
        as_stream: true
      sink:
        table_name: slv_stock_prices
      transformer:
        nodes:
        - with_column:
            name: created_at
            type: timestamp
            sql_expr: data.created_at
        - with_column:
            name: symbol
            sql_expr: data.symbol
        - with_column:
            name: close
            type: double
            sql_expr: data.close
        - func_name: drop
          func_args:
          - value: data
          - value: producer
          - value: name
          - value: description
        - func_name: laktory.smart_join
          func_kwargs:
            'on':
              - symbol
            other:
              node_name: slv_stock_meta

    - name: slv_stock_meta
      layer: SILVER
      source:
        path: /Volumes/dev/sources/landing/events/yahoo-finance/stock_meta/

'''
pl = models.Pipeline.model_validate_yaml(io.StringIO(pipeline_yaml))

Attributes¤

is_orchestrator_dlt property ¤

is_orchestrator_dlt

If True, pipeline orchestrator is DLT

nodes_dict property ¤

nodes_dict

Nodes dictionary whose keys are the node names.

RETURNS DESCRIPTION
dict[str, PipelineNode]

Nodes
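
For instance, once a pipeline has been validated as in the examples above, a specific node can be retrieved by name (a minimal sketch; slv_stock_prices refers to the node defined earlier):

node = pl.nodes_dict["slv_stock_prices"]
print(node.name)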

dag property ¤

dag

Networkx Directed Acyclic Graph representation of the pipeline. Useful to identify interdependencies between nodes.

RETURNS DESCRIPTION
DiGraph

Directed Acyclic Graph
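
The graph can also be inspected with networkx directly, for example to list dependencies (a minimal sketch, assuming graph nodes are keyed by pipeline node name):

import networkx as nx

dag = pl.dag
print(list(nx.topological_sort(dag)))  # node names in execution order
print(list(dag.predecessors("slv_stock_prices")))  # upstream dependencies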

sorted_nodes property ¤

sorted_nodes

Topologically sorted nodes.

RETURNS DESCRIPTION
list[PipelineNode]

List of topologically sorted nodes.
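
For instance, the execution order used by execute() can be reviewed before running the pipeline (a minimal sketch):

for node in pl.sorted_nodes:
    print(node.name)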

resource_type_id property ¤

resource_type_id

pl

additional_core_resources property ¤

additional_core_resources
  • configuration workspace file
  • configuration workspace file permissions

if orchestrator is DLT:

  • DLT Pipeline

if orchestrator is DATABRICKS_JOB:

  • Databricks Job

Functions¤

execute ¤

execute(spark=None, udfs=None, write_sinks=True, full_refresh=False)

Execute the pipeline (read sources and write sinks) by sequentially executing each node. The selected orchestrator might impact how data sources or sinks are processed.

PARAMETER DESCRIPTION
spark

Spark Session

DEFAULT: None

udfs

List of user-defined functions used in transformation chains.

DEFAULT: None

write_sinks

If False, writing of node sinks will be skipped

DEFAULT: True

full_refresh

If True, all nodes will be completely re-processed by deleting existing data and checkpoints before processing.

TYPE: bool DEFAULT: False
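
A minimal usage sketch, assuming a Spark session is available locally or on Databricks (the application name is illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("laktory-pl").getOrCreate()

# Run all nodes in topological order and write their sinks
pl.execute(spark=spark)

# Fully re-process, discarding existing data and checkpoints first
# pl.execute(spark=spark, full_refresh=True)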

Source code in laktory/models/pipeline.py
def execute(
    self, spark=None, udfs=None, write_sinks=True, full_refresh: bool = False
) -> None:
    """
    Execute the pipeline (read sources and write sinks) by sequentially
    executing each node. The selected orchestrator might impact how
    data sources or sinks are processed.

    Parameters
    ----------
    spark:
        Spark Session
    udfs:
        List of user-defined functions used in transformation chains.
    write_sinks:
        If `False` writing of node sinks will be skipped
    full_refresh:
        If `True` all nodes will be completely re-processed by deleting
        existing data and checkpoints before processing.
    """
    logger.info("Executing Pipeline")

    for inode, node in enumerate(self.sorted_nodes):
        node.execute(
            spark=spark,
            udfs=udfs,
            write_sink=write_sinks,
            full_refresh=full_refresh,
        )

dag_figure ¤

dag_figure()

[UNDER DEVELOPMENT] Generate a figure representation of the pipeline DAG.

RETURNS DESCRIPTION
Figure

Plotly figure representation of the pipeline.
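
Since this method is still under development, its output may change; a minimal sketch of the intended use:

fig = pl.dag_figure()
fig.show()  # renders the DAG; requires the plotly package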

Source code in laktory/models/pipeline.py
def dag_figure(self) -> Figure:
    """
    [UNDER DEVELOPMENT] Generate a figure representation of the pipeline
    DAG.

    Returns
    -------
    :
        Plotly figure representation of the pipeline.
    """
    import plotly.graph_objs as go

    dag = self.dag
    pos = nx.spring_layout(dag)

    # ------------------------------------------------------------------- #
    # Edges                                                               #
    # ------------------------------------------------------------------- #

    edge_traces = []
    for e in dag.edges:
        edge_traces += [
            go.Scatter(
                x=[pos[e[0]][0], pos[e[1]][0]],
                y=[pos[e[0]][1], pos[e[1]][1]],
                line={
                    "color": "#a006b1",
                },
                marker=dict(
                    symbol="arrow-bar-up",
                    angleref="previous",
                    size=30,
                ),
                mode="lines+markers",
                hoverinfo="none",
                showlegend=False,
            )
        ]

    # ------------------------------------------------------------------- #
    # Nodes                                                               #
    # ------------------------------------------------------------------- #

    nodes_trace = go.Scatter(
        x=[_p[0] for _p in pos.values()],
        y=[_p[1] for _p in pos.values()],
        text=list(dag.nodes),
        name="pl nodes",
        mode="markers+text",
        marker=dict(
            size=50,
            color="#06d49e",
            line=dict(
                width=2,
                color="#dff2ed",
            ),
        ),
        textfont=dict(
            color="#a006b1",
        ),
    )

    return go.Figure(data=[nodes_trace] + edge_traces)

laktory.models.pipeline.PipelineDatabricksJob ¤

Bases: Job

Databricks job specifically designed to run a Laktory pipeline

ATTRIBUTE DESCRIPTION
laktory_version

Laktory version to use in the notebook tasks

TYPE: Union[str, None]

notebook_path

Path for the notebook. If None, default path for laktory job notebooks is used.

TYPE: Union[str, None]


laktory.models.pipeline.PipelineUDF ¤

Bases: BaseModel

Pipeline user-defined function.

ATTRIBUTE DESCRIPTION
module_name

Name of the module from which the function needs to be imported.

TYPE: str

function_name

Name of the function.

TYPE: str

module_path

Workspace filepath of the module, if not in the same directory as the pipeline notebook

TYPE: str
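
A minimal YAML sketch of how a UDF might be declared in a pipeline definition; the module stock_functions and function clean_symbol are hypothetical, and the func_name reference is assumed to resolve to the UDF declared under udfs:

udfs:
- module_name: stock_functions     # hypothetical module, next to the pipeline notebook
  function_name: clean_symbol      # hypothetical function name
nodes:
- name: slv_stock_prices
  source:
    format: CSV
    path: ./raw/brz_stock_prices.csv
  transformer:
    nodes:
    - func_name: clean_symbol      # assumed to resolve to the UDF declared above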