
PipelineNode

laktory.models.PipelineNode ¤

Bases: BaseModel, PipelineChild

Pipeline base component that generates a DataFrame by reading a data source and applying a transformer (a chain of DataFrame transformations). The result can optionally be written to one or more data sinks. Some basic transformations are also natively supported.

ATTRIBUTE DESCRIPTION
add_layer_columns

If True and layer is not None, layer-specific columns (such as timestamps) are added to the resulting DataFrame.

TYPE: bool

dlt_template

Specifies which template (notebook) to use when the pipeline is run with Databricks Delta Live Tables. If None, the default laktory template notebook is used.

TYPE: Union[str, None]

drop_duplicates

If True, drop duplicated rows using primary_keys as the subset if defined, or all columns otherwise. If a list of strings, drop duplicated rows using the provided column names as the subset.

TYPE: Union[bool, list[str], None]

drop_source_columns

If True, drop the source columns after reading and keep only the columns resulting from executing the SparkChain.

TYPE: Union[bool, None]

expectations

List of expectations for the DataFrame. Expectations can be used to raise warnings, drop invalid records, or fail the pipeline.

TYPE: list[DataQualityExpectation]

layer

Layer in the medallion architecture

TYPE: Literal['BRONZE', 'SILVER', 'GOLD']

name

Name given to the node. Required to reference the node from another node's data source.

TYPE: Union[str, None]

primary_keys

A list of column names that uniquely identify each row in the DataFrame. These columns are used to:

  • Document the uniqueness constraints of the node's output data.
  • Define the default subset for dropping duplicate rows if no explicit subset is provided in drop_duplicates.
  • Define the default primary keys for sink CDC merge operations.
  • Reference expectations and unit tests.

While optional, specifying primary_keys helps enforce data integrity and ensures that downstream operations, such as deduplication, are consistent and reliable.

TYPE: list[str]

root_path

Location of the pipeline node root used to store logs, metrics and checkpoints.

TYPE: str

source

Definition of the data source

TYPE: DataSourcesUnion

sinks

Definition of the data sink(s). Set is_quarantine to True on a sink to store the node's quarantine DataFrame.

TYPE: list[DataSinksUnion]

transformer

Spark or Polars chain defining the data transformations applied to the data source.

TYPE: Union[SparkChain, PolarsChain, None]

timestamp_key

Name of the column storing a timestamp associated with each row. It is used as the default column by the builder when creating watermarks.

TYPE: str

Examples:

A node that reads stock prices from a CSV file and writes the resulting DataFrame to disk as a Parquet file.

import io
from laktory import models

node_yaml = '''
    name: brz_stock_prices
    layer: BRONZE
    source:
        format: CSV
        path: ./raw/brz_stock_prices.csv
    sinks:
    -   format: PARQUET
        mode: OVERWRITE
        path: ./dataframes/brz_stock_prices
'''

node = models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute(spark)

A node that reads stock prices from an upstream node and writes the resulting DataFrame to a data table.

import io
from laktory import models

node_yaml = '''
    name: slv_stock_prices
    layer: SILVER
    source:
      node_name: brz_stock_prices
    sinks:
    - catalog_name: hive_metastore
      schema_name: default
      table_name: slv_stock_prices
    transformer:
      nodes:
      - with_column:
          name: created_at
          type: timestamp
          expr: data.created_at
      - with_column:
          name: symbol
          expr: data.symbol
      - with_column:
          name: close
          type: double
          expr: data.close
'''

node = models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute(spark)
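
A node that deduplicates rows read from an upstream node. This third example is an illustrative sketch using only the attributes documented above (primary_keys, timestamp_key and drop_duplicates); the node and column names are hypothetical.

import io
from laktory import models

node_yaml = '''
    name: slv_stock_prices_dedup
    layer: SILVER
    source:
      node_name: brz_stock_prices
    primary_keys:
    - symbol
    - created_at
    timestamp_key: created_at
    drop_duplicates: True
    sinks:
    -   format: PARQUET
        mode: OVERWRITE
        path: ./dataframes/slv_stock_prices_dedup
'''

node = models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute(spark)
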
METHOD DESCRIPTION
push_df_backend

Push dataframe_backend down to the source and transformer, as required to differentiate between Spark and Polars transformers.

execute

Execute the pipeline node by reading the source, applying the transformer, checking expectations and writing the sinks.

check_expectations

Check expectations, raise errors or warnings where required, and build the filtered and quarantine DataFrames.

ATTRIBUTE DESCRIPTION
is_orchestrator_dlt

If True, pipeline node is used in the context of a DLT pipeline

TYPE: bool

stage_df

Dataframe resulting from reading source and applying transformer, before data quality checks are applied.

TYPE: AnyDataFrame

output_df

DataFrame resulting from reading the source, applying the transformer and dropping rows not meeting data quality expectations.

TYPE: AnyDataFrame

quarantine_df

DataFrame storing stage_df rows not meeting data quality expectations.

TYPE: AnyDataFrame

output_sinks

List of sinks writing the output DataFrame

TYPE: list[DataSinksUnion]

quarantine_sinks

List of sinks writing the quarantine DataFrame

TYPE: list[DataSinksUnion]

all_sinks

List of all sinks (output and quarantine).

sinks_count

Total number of sinks.

TYPE: int

has_output_sinks

True if node has at least one output sink.

TYPE: bool

has_sinks

True if node has at least one sink.

TYPE: bool

primary_sink

Primary output sink used as a source for downstream nodes.

TYPE: Union[DataSinksUnion, None]

upstream_node_names

Pipeline node names required to execute current node.

TYPE: list[str]

data_sources

Get all sources feeding the pipeline node

TYPE: list[BaseDataSource]

Attributes¤

is_orchestrator_dlt property ¤

is_orchestrator_dlt

If True, pipeline node is used in the context of a DLT pipeline

stage_df property ¤

stage_df

Dataframe resulting from reading source and applying transformer, before data quality checks are applied.

output_df property ¤

output_df

Dataframe resulting from reading source, applying transformer and dropping rows not meeting data quality expectations.

quarantine_df property ¤

quarantine_df

DataFrame storing stage_df rows not meeting data quality expectations.

output_sinks property ¤

output_sinks

List of sinks writing the output DataFrame

quarantine_sinks property ¤

quarantine_sinks

List of sinks writing the quarantine DataFrame

all_sinks property ¤

all_sinks

List of all sinks (output and quarantine).

sinks_count property ¤

sinks_count

Total number of sinks.

has_output_sinks property ¤

has_output_sinks

True if node has at least one output sink.

has_sinks property ¤

has_sinks

True if node has at least one sink.

primary_sink property ¤

primary_sink

Primary output sink used as a source for downstream nodes.

upstream_node_names property ¤

upstream_node_names

Pipeline node names required to execute current node.

data_sources property ¤

data_sources

Get all sources feeding the pipeline node

Functions¤

push_df_backend classmethod ¤

push_df_backend(data)

Push dataframe_backend down to the source and transformer, as required to differentiate between Spark and Polars transformers.

Source code in laktory/models/pipeline/pipelinenode.py
@model_validator(mode="before")
@classmethod
def push_df_backend(cls, data: Any) -> Any:
    """Need to push dataframe_backend which is required to differentiate between spark and polars transformer"""
    df_backend = data.get("dataframe_backend", None)
    if df_backend:
        if "source" in data.keys():
            if isinstance(data["source"], dict):
                data["source"]["dataframe_backend"] = data["source"].get(
                    "dataframe_backend", df_backend
                )
        if "transformer" in data.keys():
            if isinstance(data["transformer"], dict):
                data["transformer"]["dataframe_backend"] = data["transformer"].get(
                    "dataframe_backend", df_backend
                )

    return data

execute ¤

execute(apply_transformer=True, spark=None, udfs=None, write_sinks=True, full_refresh=False)

Execute pipeline node by:

  • Reading the source
  • Applying the user defined (and layer-specific if applicable) transformations
  • Checking expectations
  • Writing the sinks
PARAMETER DESCRIPTION
apply_transformer

Flag to apply transformer in the execution

TYPE: bool DEFAULT: True

spark

Spark session

TYPE: SparkSession DEFAULT: None

udfs

User-defined functions

TYPE: list[Callable] DEFAULT: None

write_sinks

Flag to include writing the sinks in the execution

TYPE: bool DEFAULT: True

full_refresh

If True, the DataFrame will be completely re-processed by deleting existing data and checkpoints before processing.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
AnyDataFrame

output Spark DataFrame

Source code in laktory/models/pipeline/pipelinenode.py
def execute(
    self,
    apply_transformer: bool = True,
    spark: SparkSession = None,
    udfs: list[Callable] = None,
    write_sinks: bool = True,
    full_refresh: bool = False,
) -> AnyDataFrame:
    """
    Execute pipeline node by:

    - Reading the source
    - Applying the user defined (and layer-specific if applicable) transformations
    - Checking expectations
    - Writing the sinks

    Parameters
    ----------
    apply_transformer:
        Flag to apply transformer in the execution
    spark: SparkSession
        Spark session
    udfs:
        User-defined functions
    write_sinks:
        Flag to include writing sink in the execution
    full_refresh:
        If `True` dataframe will be completely re-processed by deleting
        existing data and checkpoint before processing.

    Returns
    -------
    :
        output Spark DataFrame
    """
    logger.info(f"Executing pipeline node {self.name}")

    # Parse DLT
    if self.is_orchestrator_dlt:
        logger.info("DLT orchestrator selected. Sinks writing will be skipped.")
        write_sinks = False
        full_refresh = False

    # Refresh
    if full_refresh:
        self.purge(spark)

    # Read Source
    self._stage_df = self.source.read(spark)

    # Save source
    self._source_columns = self._stage_df.columns

    # Apply transformer
    if apply_transformer:
        if self.is_view and self.transformer:
            self._view_definition = self.transformer.get_view_definition()

        if "spark" in str(type(self._stage_df)).lower():
            dftype = "spark"
        elif "polars" in str(type(self._stage_df)).lower():
            dftype = "polars"
        else:
            raise ValueError("DataFrame backend not supported")

        # Set Transformer
        transformer = self.transformer
        if transformer is None:
            if dftype == "spark":
                transformer = SparkChain(nodes=[])
            elif dftype == "polars":
                transformer = PolarsChain(nodes=[])
        else:
            transformer = transformer.model_copy()

        # Add layer-specific chain nodes
        if dftype == "spark" and self.layer_spark_chain:
            transformer.nodes += self.layer_spark_chain.nodes
        elif dftype == "polars" and self.layer_polars_chain:
            transformer.nodes += self.layer_polars_chain.nodes

        if transformer.nodes:
            self._stage_df = transformer.execute(self._stage_df, udfs=udfs)

    # Check expectations
    self.check_expectations()

    # Output and Quarantine to Sinks
    if write_sinks:
        for s in self.output_sinks:
            if self.is_view:
                s.write(view_definition=self._view_definition, spark=spark)
                self._output_df = s.as_source().read(spark=spark)
            else:
                s.write(self._output_df)
        if self._quarantine_df is not None:
            for s in self.quarantine_sinks:
                s.write(self._quarantine_df)

    return self._output_df
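
For reference, a minimal usage sketch, assuming a node built from one of the YAML examples above and a locally available SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the source, apply the transformer, check expectations and write the sinks
df = node.execute(spark=spark)

# A full refresh deletes existing data and checkpoints before re-processing
# df = node.execute(spark=spark, full_refresh=True)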

check_expectations ¤

check_expectations()

Check expectations, raise errors, warnings where required and build filtered and quarantine DataFrames.

Some actions have to be disabled when selected orchestrator is Databricks DLT:

  • Raising error on Failure when expectation is supported by DLT
  • Dropping rows when expectation is supported by DLT
Source code in laktory/models/pipeline/pipelinenode.py
def check_expectations(self):
    """
    Check expectations, raise errors, warnings where required and build
    filtered and quarantine DataFrames.

    Some actions have to be disabled when selected orchestrator is
    Databricks DLT:

    * Raising error on Failure when expectation is supported by DLT
    * Dropping rows when expectation is supported by DLT
    """

    # Data Quality Checks
    is_streaming = getattr(self._stage_df, "isStreaming", False)
    qfilter = None  # Quarantine filter
    kfilter = None  # Keep filter
    if not self.expectations:
        self._output_df = self._stage_df
        self._quarantine_df = None
        return

    logger.info("Checking Data Quality Expectations")

    def _batch_check(df, node):
        for e in node.expectations:
            is_dlt_managed = node.is_dlt_run and e.is_dlt_compatible

            # Run Check
            if not is_dlt_managed:
                e.run_check(
                    df,
                    raise_or_warn=True,
                    node=node,
                )

    def _stream_check(batch_df, batch_id, node):
        _batch_check(
            batch_df,
            node,
        )

    # Warn or Fail
    if is_streaming and self.is_dlt_run:
        # TODO: Enable when DLT supports foreachBatch (in case some expectations are not supported by DLT)
        pass

    elif is_streaming:
        if self._expectations_checkpoint_location is None:
            raise ValueError(
                f"Expectations Checkpoint not specified for node '{self.name}'"
            )
        query = (
            self._stage_df.writeStream.foreachBatch(
                lambda batch_df, batch_id: _stream_check(batch_df, batch_id, self)
            )
            .trigger(availableNow=True)
            .options(
                checkpointLocation=self._expectations_checkpoint_location,
            )
            .start()
        )
        query.awaitTermination()

    else:
        _batch_check(
            self._stage_df,
            self,
        )

    # Build Filters
    for e in self.expectations:
        is_dlt_managed = self.is_dlt_run and e.is_dlt_compatible

        # Update Keep Filter
        if not is_dlt_managed:
            _filter = e.keep_filter
            if _filter is not None:
                if kfilter is None:
                    kfilter = _filter
                else:
                    kfilter = kfilter & _filter

        # Update Quarantine Filter
        _filter = e.quarantine_filter
        if _filter is not None:
            if qfilter is None:
                qfilter = _filter
            else:
                qfilter = qfilter & _filter

    if qfilter is not None:
        logger.info("Building quarantine DataFrame")
        self._quarantine_df = self._stage_df.filter(qfilter)
    else:
        self._quarantine_df = self._stage_df.filter("False")

    if kfilter is not None:
        logger.info("Dropping invalid rows")
        self._output_df = self._stage_df.filter(kfilter)
    else:
        self._output_df = self._stage_df
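
As a sketch of how the filters above materialize, assuming the node was executed with at least one expectation defined:

# After node.execute(spark), the stage DataFrame has been split by the
# keep and quarantine filters built in check_expectations()
valid_df = node.output_df            # rows kept for the output sinks
quarantined_df = node.quarantine_df  # rows matching a quarantine filter (empty if none match)

# Declaring a sink with is_quarantine set to True persists the quarantine
# DataFrame alongside the regular output sinks.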