
PipelineNode

laktory.models.PipelineNode

Bases: BaseModel

Pipeline base component that generates a DataFrame by reading a data source and applying a transformer (a chain of DataFrame transformations). The output can optionally be written to a data sink. Some basic transformations are also natively supported.

ATTRIBUTE DESCRIPTION
add_layer_columns

If True and layer is not None, layer-specific columns like timestamps are added to the resulting DataFrame.

TYPE: bool

dlt_template

Specifies which template (notebook) to use if the pipeline is run with Databricks Delta Live Tables. If None, the default laktory template notebook is used.

TYPE: Union[str, None]

drop_duplicates

If True, drop duplicated rows using primary_key as the subset if defined, or all columns if not. If a list of strings, drop duplicated rows using the listed columns as the subset.

TYPE: Union[bool, list[str], None]

drop_source_columns

If True, drop the columns read from the source, keeping only the columns resulting from executing the SparkChain.

TYPE: Union[bool, None]

expectations

List of expectations for the DataFrame. Expectations can be used to raise warnings, drop invalid records or fail the pipeline.

TYPE: list[PipelineNodeExpectation]

layer

Layer in the medallion architecture

TYPE: Literal['BRONZE', 'SILVER', 'GOLD']

name

Name given to the node. Required when the node is referenced as a data source by another node.

TYPE: Union[str, None]

primary_key

Name of the column storing a unique identifier for each row. It is used by the node to drop duplicated rows.

TYPE: str

source

Definition of the data source

TYPE: DataSourcesUnion

sink

Definition of the data sink

TYPE: Union[DataSinksUnion, None]

transformer

Spark or Polars chain defining the data transformations applied to the data source.

TYPE: Union[SparkChain, PolarsChain, None]

timestamp_key

Name of the column storing a timestamp associated with each row. It is used as the default column by the builder when creating watermarks.

TYPE: str

Examples:

A node reading stock prices from a CSV file and writing the resulting DataFrame to disk as a parquet file.

import io
from laktory import models

node_yaml = '''
    name: brz_stock_prices
    layer: BRONZE
    source:
        format: CSV
        path: ./raw/brz_stock_prices.csv
    sink:
        format: PARQUET
        mode: OVERWRITE
        path: ./dataframes/brz_stock_prices
'''

node = models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute(spark)

A node reading stock prices from an upstream node and writing the resulting DataFrame to a data table.

import io
from laktory import models

node_yaml = '''
    name: slv_stock_prices
    layer: SILVER
    source:
      node_name: brz_stock_prices
    sink:
      catalog_name: hive_metastore
      schema_name: default
      table_name: slv_stock_prices
    transformer:
      nodes:
      - with_column:
          name: created_at
          type: timestamp
          sql_expr: data.created_at
      - with_column:
          name: symbol
          sql_expr: data.symbol
      - with_column:
          name: close
          type: double
          sql_expr: data.close
'''

node = models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute(spark)
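
Deduplication can also be configured directly on the node. The sketch below is not taken from the source; names and values are illustrative. It combines the primary_key, timestamp_key and drop_duplicates attributes described above with an upstream node source.

import io
from laktory import models

node_yaml = '''
    name: slv_stock_prices_dedup
    layer: SILVER
    primary_key: symbol
    timestamp_key: created_at
    drop_duplicates: True
    source:
      node_name: brz_stock_prices
'''

node = models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute(spark)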

Attributes

is_orchestrator_dlt property

is_orchestrator_dlt

If True, the pipeline node is used in the context of a DLT pipeline.

is_from_cdc property

is_from_cdc

If True, a CDC source is used to build the table.

apply_changes_kwargs property

apply_changes_kwargs

Keyword arguments for the dlt.apply_changes function.

output_df property

output_df

Node DataFrame after reading the source and applying the transformer.
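
As a usage sketch (assuming the node object built in the examples above), these read-only properties can be inspected directly:

# True when the node runs in the context of a DLT pipeline
print(node.is_orchestrator_dlt)

# True when the node source is a change data capture (CDC) feed
print(node.is_from_cdc)

# Holds the node DataFrame once the source has been read and the
# transformer applied, i.e. after node.execute(...)
# print(node.output_df.columns)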

Functions

get_sources

get_sources(cls=BaseDataSource)

Get all sources feeding the pipeline node

Source code in laktory/models/pipelinenode.py
def get_sources(self, cls=BaseDataSource) -> list[BaseDataSource]:
    """Get all sources feeding the pipeline node"""
    sources = []

    if isinstance(self.source, cls):
        sources += [self.source]

    if self.transformer:
        sources += self.transformer.get_sources(cls)

    return sources
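
For example, a short usage sketch (assuming the node object from the examples above) that lists every source feeding the node, i.e. its own source plus any sources referenced by the transformer:

for source in node.get_sources():
    print(type(source).__name__)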

execute

execute(apply_transformer=True, spark=None, udfs=None, write_sink=True, full_refresh=False)

Execute pipeline node by:

  • Reading the source
  • Applying the user-defined (and layer-specific, if applicable) transformations
  • Writing the sink
PARAMETER DESCRIPTION
apply_transformer

Flag to apply the transformer during execution

TYPE: bool DEFAULT: True

spark

Spark session

TYPE: SparkSession DEFAULT: None

udfs

User-defined functions

TYPE: list[Callable] DEFAULT: None

write_sink

Flag to include writing the sink during execution

TYPE: bool DEFAULT: True

full_refresh

If True, the DataFrame will be completely re-processed by deleting the existing data and checkpoint before processing.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
AnyDataFrame

output Spark DataFrame

Source code in laktory/models/pipelinenode.py
def execute(
    self,
    apply_transformer: bool = True,
    spark: SparkSession = None,
    udfs: list[Callable] = None,
    write_sink: bool = True,
    full_refresh: bool = False,
) -> AnyDataFrame:
    """
    Execute pipeline node by:

    - Reading the source
    - Applying the user defined (and layer-specific if applicable) transformations
    - Writing the sink

    Parameters
    ----------
    apply_transformer:
        Flag to apply transformer in the execution
    spark: SparkSession
        Spark session
    udfs:
        User-defined functions
    write_sink:
        Flag to include writing sink in the execution
    full_refresh:
        If `True` dataframe will be completely re-processed by deleting
        existing data and checkpoint before processing.

    Returns
    -------
    :
        output Spark DataFrame
    """
    logger.info(f"Executing pipeline node {self.name} ({self.layer})")

    # Parse DLT
    if self.is_orchestrator_dlt:
        logger.info("DLT orchestrator selected. Sinks writing will be skipped.")
        write_sink = False
        full_refresh = False

    # Refresh
    if full_refresh:
        if self.sink:
            self.sink.purge(spark=spark)

    # Read Source
    self._output_df = self.source.read(spark)

    # Save source
    self._source_columns = self._output_df.columns

    if self.source.is_cdc and not self.is_orchestrator_dlt:
        pass
        # TODO: Apply SCD transformations
        #       Best strategy is probably to build a spark dataframe function and add a node in the chain with
        #       that function
        # https://iterationinsights.com/article/how-to-implement-slowly-changing-dimensions-scd-type-2-using-delta-table
        # https://www.linkedin.com/pulse/implementing-slowly-changing-dimension-2-using-lau-johansson-yemxf/

    # Apply transformer
    if apply_transformer:

        if "spark" in str(type(self._output_df)).lower():
            dftype = "spark"
        elif "polars" in str(type(self._output_df)).lower():
            dftype = "polars"
        else:
            raise ValueError("DataFrame type not supported")

        # Set Transformer
        transformer = self.transformer
        if transformer is None:
            if dftype == "spark":
                transformer = SparkChain(nodes=[])
            elif dftype == "polars":
                transformer = PolarsChain(nodes=[])
        else:
            transformer = transformer.model_copy()

        # Add layer-specific chain nodes
        if dftype == "spark" and self.layer_spark_chain:
            transformer.nodes += self.layer_spark_chain.nodes
        elif dftype == "polars" and self.layer_polars_chain:
            transformer.nodes += self.layer_polars_chain.nodes

        if transformer.nodes:
            self._output_df = transformer.execute(self._output_df, udfs=udfs)

    # Output to sink
    if write_sink and self.sink:
        self.sink.write(self._output_df)

    return self._output_df
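
A usage sketch (assuming an active spark session and the node object from the examples above); setting write_sink=False skips the sink write so the returned DataFrame can be inspected directly:

# Run the node without writing to the sink
df = node.execute(spark=spark, write_sink=False)

# Inspect the result (a Spark DataFrame in this example)
df.printSchema()

# The same DataFrame is also exposed through the output_df property
# df = node.output_df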