PipelineNode

laktory.models.PipelineNode ¤

Bases: BaseModel, PipelineChild

Pipeline base component generating a DataFrame by reading a data source and applying a transformer (a chain of DataFrame transformations). The output can optionally be written to one or more data sinks.

PARAMETER DESCRIPTION
dataframe_backend

Type of DataFrame backend

TYPE: DataFrameBackends DEFAULT: None

dataframe_api

DataFrame API to use in DataFrame Transformer nodes. Either 'NATIVE' (backend-specific) or 'NARWHALS' (backend-agnostic).

TYPE: Literal[NARWHALS, NATIVE] | VariableType DEFAULT: None

variables

Dict of variables to be injected in the model at runtime

TYPE: dict[str, Any] DEFAULT: {}

dlt_template

Specify which template (notebook) to use when Databricks pipeline is selected as the orchestrator.

TYPE: str | None | VariableType DEFAULT: 'DEFAULT'

comment

Comment for the associated table or view

TYPE: str | VariableType DEFAULT: None

expectations

List of data expectations. Expectations can trigger warnings, drop or quarantine invalid records, or fail the pipeline (an example is provided in the Examples section below).

TYPE: list[Union[DataQualityExpectation, VariableType]] | VariableType DEFAULT: []

expectations_checkpoint_path

Path to which the checkpoint file for expectations on a streaming DataFrame should be written.

TYPE: str | VariableType DEFAULT: None

name

Name given to the node.

TYPE: str | VariableType

primary_keys

A list of column names that uniquely identify each row in the DataFrame. These columns are used to:

  • Document the uniqueness constraints of the node's output data.
  • Define the default primary keys for sink CDC merge operations.
  • Serve as a reference in expectations and unit tests.

While optional, specifying primary_keys helps enforce data integrity and ensures that downstream operations, such as deduplication, are consistent and reliable.

TYPE: list[Union[str, VariableType]] | VariableType DEFAULT: None

source

Definition of the data source(s)

TYPE: FileDataSource | UnityCatalogDataSource | HiveMetastoreDataSource | DataFrameDataSource | PipelineNodeDataSource | None | VariableType DEFAULT: None

sinks

Definition of the data sink(s). Set is_quarantine to True to store the node's quarantine DataFrame.

TYPE: list[Union[PipelineViewDataSink, FileDataSink, UnityCatalogDataSink, HiveMetastoreDataSink, VariableType]] | VariableType DEFAULT: None

transformer

Data transformations applied between the source and the sink(s).

TYPE: DataFrameTransformer | VariableType DEFAULT: None

root_path

Location of the pipeline node root used to store logs, metrics and checkpoints.

TYPE: str | VariableType DEFAULT: None

Examples:

A node reading stock prices data from JSON files and writing the resulting DataFrame as a Parquet file.

import io

import laktory as lk

node_yaml = '''
    name: brz_stock_prices
    source:
      path: "./events/stock_prices/"
      format: JSON
    sinks:
    - path: ./tables/brz_stock_prices/
      format: PARQUET
'''

node = lk.models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute()

A node reading stock prices from an upstream node and writing a DataFrame to a data table.

import io

import laktory as lk

node_yaml = '''
    name: slv_stock_prices
    source:
      node_name: brz_stock_prices
    sinks:
    - schema_name: finance
      table_name: slv_stock_prices
    transformer:
      nodes:
      - expr: |
            SELECT
              data.created_at AS created_at,
              data.symbol AS symbol,
              data.open AS open,
              data.close AS close,
              data.high AS high,
              data.low AS low,
              data.volume AS volume
            FROM
              {df}
      - func_name: drop_duplicates
        func_kwargs:
          subset:
            - symbol
            - timestamp
'''

node = lk.models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute()
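
A third, more complete sketch combining primary_keys, expectations and a quarantine sink. The expectation fields (name, expr, action) and the is_quarantine flag are assumptions based on the parameter descriptions above and may need to be adapted to the DataQualityExpectation and sink models in use.

import io

import laktory as lk

node_yaml = '''
    name: slv_stock_prices_clean
    source:
      node_name: slv_stock_prices
    primary_keys:
      - symbol
      - created_at
    expectations:
      - name: positive_open
        expr: open > 0
        action: DROP
    sinks:
      - schema_name: finance
        table_name: slv_stock_prices_clean
      - schema_name: finance
        table_name: slv_stock_prices_quarantine
        is_quarantine: true
'''

node = lk.models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute()
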
METHOD DESCRIPTION
check_expectations

Check expectations, raise errors and warnings where required, and build filtered and quarantine DataFrames.

execute

Execute pipeline node by reading the source, applying the transformer, checking expectations and writing the sinks.

inject_vars

Inject model variable values into the model attributes.

inject_vars_into_dump

Inject model variable values into a model dump.

model_validate_json_file

Load model from json file object

model_validate_yaml

Load model from yaml file object using laktory.yaml.RecursiveLoader. Supports references to external yaml and sql files using !use, !extend and !update tags.

push_vars

Push variable values to all children recursively.

validate_assignment_disabled

Updating a model attribute inside a model validator when validate_assignment is True causes an infinite recursion by design and must be turned off temporarily.

ATTRIBUTE DESCRIPTION
all_sinks

List of all sinks (output and quarantine).

data_sources

Get all sources feeding the pipeline node

TYPE: list[BaseDataSource]

has_output_sinks

True if node has at least one output sink.

TYPE: bool

has_sinks

True if node has at least one sink.

TYPE: bool

is_orchestrator_dlt

If True, pipeline node is used in the context of a DLT pipeline

TYPE: bool

output_df

Dataframe resulting from reading source, applying transformer and dropping rows not meeting data quality expectations.

TYPE: AnyFrame

output_sinks

List of sinks writing the output DataFrame

TYPE: list[DataSinksUnion]

primary_sink

Primary output sink used as a source for downstream nodes.

TYPE: DataSinksUnion | None

quarantine_df

DataFrame storing stage_df rows not meeting data quality expectations.

TYPE: AnyFrame

quarantine_sinks

List of sinks writing the quarantine DataFrame

TYPE: list[DataSinksUnion]

sinks_count

Total number of sinks.

TYPE: int

stage_df

Dataframe resulting from reading source and applying transformer, before data quality checks are applied.

TYPE: AnyFrame

upstream_node_names

Pipeline node names required to execute current node.

TYPE: list[str]
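
A minimal sketch of inspecting these attributes, assuming node is the slv_stock_prices node validated in the examples above; the values shown in comments follow that configuration.

print(node.has_sinks)  # True
print(node.sinks_count)  # 1
print(node.upstream_node_names)  # ['brz_stock_prices']

# output_df and quarantine_df are only populated after node.execute()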

all_sinks property ¤

List of all sinks (output and quarantine).

data_sources property ¤

Get all sources feeding the pipeline node

has_output_sinks property ¤

True if node has at least one output sink.

has_sinks property ¤

True if node has at least one sink.

is_orchestrator_dlt property ¤

If True, pipeline node is used in the context of a DLT pipeline

output_df property ¤

Dataframe resulting from reading source, applying transformer and dropping rows not meeting data quality expectations.

output_sinks property ¤

List of sinks writing the output DataFrame

primary_sink property ¤

Primary output sink used as a source for downstream nodes.

quarantine_df property ¤

DataFrame storing stage_df rows not meeting data quality expectations.

quarantine_sinks property ¤

List of sinks writing the quarantine DataFrame

sinks_count property ¤

Total number of sinks.

stage_df property ¤

Dataframe resulting from reading source and applying transformer, before data quality checks are applied.

upstream_node_names property ¤

Pipeline node names required to execute current node.

check_expectations() ¤

Check expectations, raise errors, warnings where required and build filtered and quarantine DataFrames.

Some actions have to be disabled when the selected orchestrator is Databricks DLT:

  • Raising error on Failure when expectation is supported by DLT
  • Dropping rows when expectation is supported by DLT
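
A minimal sketch of how expectations split the staged data into the output and quarantine DataFrames. check_expectations is normally invoked by execute(); the expectation fields (name, expr, action) are assumptions consistent with the expectations parameter described above.

import io

import laktory as lk

node_yaml = '''
    name: brz_stock_prices
    source:
      path: ./events/stock_prices/
      format: JSON
    expectations:
      - name: positive_open
        expr: open > 0
        action: QUARANTINE
'''

node = lk.models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# execute() reads the source, runs check_expectations() and builds both DataFrames
# node.execute(write_sinks=False)
# node.output_df      -> rows meeting the expectation
# node.quarantine_df  -> rows not meeting the expectation
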
Source code in laktory/models/pipeline/pipelinenode.py
def check_expectations(self):
    """
    Check expectations, raise errors, warnings where required and build
    filtered and quarantine DataFrames.

    Some actions have to be disabled when selected orchestrator is
    Databricks DLT:

    * Raising error on Failure when expectation is supported by DLT
    * Dropping rows when expectation is supported by DLT
    """

    # Data Quality Checks
    qfilter = None  # Quarantine filter
    kfilter = None  # Keep filter
    if self._stage_df is None:
        # Node without source or transformer
        return
    is_streaming = getattr(nw.to_native(self._stage_df), "isStreaming", False)
    if not self.expectations:
        return

    def _batch_check(df, node):
        for e in node.expectations:
            # Run Check: this only warn or raise exceptions.
            if not e.is_dlt_managed:
                e.run_check(
                    df,
                    raise_or_warn=True,
                    node=node,
                )

    def _stream_check(batch_df, batch_id, node):
        _batch_check(
            batch_df,
            node,
        )

    logger.info("Checking Data Quality Expectations")

    if not is_streaming:
        _batch_check(
            self._stage_df,
            self,
        )

    else:
        skip = False

        if self.is_dlt_execute:
            names = []
            for e in self.expectations:
                if not e.is_dlt_compatible:
                    names += [e.name]
            if names:
                raise TypeError(
                    f"Expectations {names} are not natively supported by DLT and can't be computed on a streaming DataFrame with DLT executor."
                )

            skip = True

        backend = DataFrameBackends.from_df(self._stage_df)
        if backend not in STREAMING_BACKENDS:
            raise TypeError(
                f"DataFrame backend {backend} is not supported for streaming operations"
            )

        if self._expectations_checkpoint_path is None:
            raise ValueError(
                f"Expectations Checkpoint not specified for node '{self.name}'"
            )

        # TODO: Refactor for backend other than spark
        if not skip:
            query = (
                self._stage_df.to_native()
                .writeStream.foreachBatch(
                    lambda batch_df, batch_id: _stream_check(
                        nw.from_native(batch_df), batch_id, self
                    )
                )
                .trigger(availableNow=True)
                .options(
                    checkpointLocation=self._expectations_checkpoint_path,
                )
                .start()
            )
            query.awaitTermination()

    # Build Filters
    for e in self.expectations:
        # Update Keep Filter
        if not e.is_dlt_managed:
            _filter = e.keep_filter
            if _filter is not None:
                if kfilter is None:
                    kfilter = _filter
                else:
                    kfilter = kfilter & _filter

        # Update Quarantine Filter
        _filter = e.quarantine_filter
        if _filter is not None:
            if qfilter is None:
                qfilter = _filter
            else:
                qfilter = qfilter & _filter

    if qfilter is not None:
        logger.info("Building quarantine DataFrame")
        self._quarantine_df = self._stage_df.filter(qfilter)
    else:
        self._quarantine_df = self._stage_df  # .filter("False")

    if kfilter is not None:
        logger.info("Dropping invalid rows")
        self._output_df = self._stage_df.filter(kfilter)
    else:
        self._output_df = self._stage_df

execute(apply_transformer=True, write_sinks=True, full_refresh=False, named_dfs=None) ¤

Execute pipeline node by:

  • Reading the source
  • Applying the user defined (and layer-specific if applicable) transformations
  • Checking expectations
  • Writing the sinks
PARAMETER DESCRIPTION
apply_transformer

Flag to apply transformer in the execution

TYPE: bool DEFAULT: True

write_sinks

Flag to include writing sink in the execution

TYPE: bool DEFAULT: True

full_refresh

If True, the DataFrame will be completely re-processed by deleting existing data and checkpoints before processing.

TYPE: bool DEFAULT: False

named_dfs

Named DataFrame passed to transformer nodes

TYPE: dict[str, AnyFrame] DEFAULT: None

RETURNS DESCRIPTION
AnyFrame

output Spark DataFrame
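
A minimal usage sketch, assuming node is a PipelineNode validated from one of the YAML examples above and that source data exists at the configured location (the call is commented out, following the convention of the examples above).

# df = node.execute(
#     apply_transformer=True,  # apply the transformer chain, if any
#     write_sinks=False,       # return the output DataFrame without writing the sinks
#     full_refresh=False,      # keep existing data and checkpoints
# )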

Source code in laktory/models/pipeline/pipelinenode.py
def execute(
    self,
    apply_transformer: bool = True,
    write_sinks: bool = True,
    full_refresh: bool = False,
    named_dfs: dict[str, AnyFrame] = None,
) -> AnyFrame:
    """
    Execute pipeline node by:

    - Reading the source
    - Applying the user defined (and layer-specific if applicable) transformations
    - Checking expectations
    - Writing the sinks

    Parameters
    ----------
    apply_transformer:
        Flag to apply transformer in the execution
    write_sinks:
        Flag to include writing sink in the execution
    full_refresh:
        If `True` dataframe will be completely re-processed by deleting
        existing data and checkpoint before processing.
    named_dfs:
        Named DataFrame passed to transformer nodes

    Returns
    -------
    :
        output Spark DataFrame
    """
    logger.info(f"Executing pipeline node {self.name}")

    # Install dependencies
    pl = self.parent_pipeline
    if pl and not pl._imports_imported:
        for package_name in pl._imports:
            try:
                logger.info(f"Importing {package_name}")
                importlib.import_module(package_name)
            except ModuleNotFoundError:
                logger.info(f"Importing {package_name} failed.")
        pl._imports_imported = True

    # Parse DLT
    if self.is_orchestrator_dlt:
        logger.info("DLT orchestrator selected. Sinks writing will be skipped.")
        write_sinks = False
        full_refresh = False

    # Refresh
    if full_refresh:
        self.purge()

    # Read Source
    self._stage_df = None
    if self.source:
        self._stage_df = self.source.read()

    # Apply transformer
    if named_dfs is None:
        named_dfs = {}
    if apply_transformer and self.transformer:
        self._stage_df = self.transformer.execute(
            self._stage_df, named_dfs=named_dfs
        )

    # Check expectations
    self._output_df = self._stage_df
    self._quarantine_df = None
    self.check_expectations()

    # Output and Quarantine to Sinks
    if write_sinks:
        for s in self.output_sinks:
            if self.is_view:
                s.write()
                self._output_df = s.as_source().read()
            else:
                s.write(self._output_df, full_refresh=full_refresh)

        if self._quarantine_df is not None:
            for s in self.quarantine_sinks:
                s.write(self._quarantine_df, full_refresh=full_refresh)

    return self._output_df

inject_vars(inplace=False, vars=None) ¤

Inject model variable values into the model attributes.

PARAMETER DESCRIPTION
inplace

If True model is modified in place. Otherwise, a new model instance is returned.

TYPE: bool DEFAULT: False

vars

A dictionary of variables to be injected in addition to the model internal variables.

TYPE: dict DEFAULT: None

RETURNS DESCRIPTION

Model instance.

Examples:

from typing import Union

from laktory import models


class Cluster(models.BaseModel):
    name: str = None
    size: Union[int, str] = None


c = Cluster(
    name="cluster-${vars.my_cluster}",
    size="${{ 4 if vars.env == 'prod' else 2 }}",
    variables={
        "env": "dev",
    },
).inject_vars()
print(c)
# > variables={'env': 'dev'} name='cluster-${vars.my_cluster}' size=2
References

  • variables: https://www.laktory.ai/concepts/variables/

Source code in laktory/models/basemodel.py
def inject_vars(self, inplace: bool = False, vars: dict = None):
    """
    Inject model variables values into a model attributes.

    Parameters
    ----------
    inplace:
        If `True` model is modified in place. Otherwise, a new model
        instance is returned.
    vars:
        A dictionary of variables to be injected in addition to the
        model internal variables.


    Returns
    -------
    :
        Model instance.

    Examples
    --------
    ```py
    from typing import Union

    from laktory import models


    class Cluster(models.BaseModel):
        name: str = None
        size: Union[int, str] = None


    c = Cluster(
        name="cluster-${vars.my_cluster}",
        size="${{ 4 if vars.env == 'prod' else 2 }}",
        variables={
            "env": "dev",
        },
    ).inject_vars()
    print(c)
    # > variables={'env': 'dev'} name='cluster-${vars.my_cluster}' size=2
    ```

    References
    ----------
    * [variables](https://www.laktory.ai/concepts/variables/)
    """

    # Fetching vars
    if vars is None:
        vars = {}
    vars = deepcopy(vars)
    vars.update(self.variables)

    # Create copy
    if not inplace:
        self = self.model_copy(deep=True)

    # Inject into field values
    for k in list(self.model_fields_set):
        if k == "variables":
            continue
        o = getattr(self, k)

        if isinstance(o, BaseModel) or isinstance(o, dict) or isinstance(o, list):
            # Mutable objects will be updated in place
            _resolve_values(o, vars)
        else:
            # Simple objects must be updated explicitly
            setattr(self, k, _resolve_value(o, vars))

    # Inject into child resources
    if hasattr(self, "core_resources"):
        for r in self.core_resources:
            if r == self:
                continue
            r.inject_vars(vars=vars, inplace=True)

    if not inplace:
        return self

inject_vars_into_dump(dump, inplace=False, vars=None) ¤

Inject model variable values into a model dump.

PARAMETER DESCRIPTION
dump

Model dump (or any other general purpose mutable object)

TYPE: dict[str, Any]

inplace

If True model is modified in place. Otherwise, a new model instance is returned.

TYPE: bool DEFAULT: False

vars

A dictionary of variables to be injected in addition to the model internal variables.

TYPE: dict[str, Any] DEFAULT: None

RETURNS DESCRIPTION

Model dump with injected variables.

Examples:

from laktory import models

m = models.BaseModel(
    variables={
        "env": "dev",
    },
)
data = {
    "name": "cluster-${vars.my_cluster}",
    "size": "${{ 4 if vars.env == 'prod' else 2 }}",
}
print(m.inject_vars_into_dump(data))
# > {'name': 'cluster-${vars.my_cluster}', 'size': 2}
References

  • variables: https://www.laktory.ai/concepts/variables/

Source code in laktory/models/basemodel.py
def inject_vars_into_dump(
    self, dump: dict[str, Any], inplace: bool = False, vars: dict[str, Any] = None
):
    """
    Inject model variables values into a model dump.

    Parameters
    ----------
    dump:
        Model dump (or any other general purpose mutable object)
    inplace:
        If `True` model is modified in place. Otherwise, a new model
        instance is returned.
    vars:
        A dictionary of variables to be injected in addition to the
        model internal variables.


    Returns
    -------
    :
        Model dump with injected variables.


    Examples
    --------
    ```py
    from laktory import models

    m = models.BaseModel(
        variables={
            "env": "dev",
        },
    )
    data = {
        "name": "cluster-${vars.my_cluster}",
        "size": "${{ 4 if vars.env == 'prod' else 2 }}",
    }
    print(m.inject_vars_into_dump(data))
    # > {'name': 'cluster-${vars.my_cluster}', 'size': 2}
    ```

    References
    ----------
    * [variables](https://www.laktory.ai/concepts/variables/)
    """

    # Setting vars
    if vars is None:
        vars = {}
    vars = deepcopy(vars)
    vars.update(self.variables)

    # Create copy
    if not inplace:
        dump = copy.deepcopy(dump)

    # Inject into field values
    _resolve_values(dump, vars)

    if not inplace:
        return dump

model_validate_json_file(fp) classmethod ¤

Load model from json file object

PARAMETER DESCRIPTION
fp

file object structured as a json file

TYPE: TextIO

RETURNS DESCRIPTION
Model

Model instance
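
A minimal sketch, assuming a JSON document with the same fields as the first YAML example above.

import io

import laktory as lk

node_json = '''
{
    "name": "brz_stock_prices",
    "source": {"path": "./events/stock_prices/", "format": "JSON"},
    "sinks": [{"path": "./tables/brz_stock_prices/", "format": "PARQUET"}]
}
'''

node = lk.models.PipelineNode.model_validate_json_file(io.StringIO(node_json))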

Source code in laktory/models/basemodel.py
@classmethod
def model_validate_json_file(cls: Type[Model], fp: TextIO) -> Model:
    """
    Load model from json file object

    Parameters
    ----------
    fp:
        file object structured as a json file

    Returns
    -------
    :
        Model instance
    """
    data = json.load(fp)
    return cls.model_validate(data)

model_validate_yaml(fp) classmethod ¤

Load model from yaml file object using laktory.yaml.RecursiveLoader. Supports references to external yaml and sql files using !use, !extend and !update tags. Paths to external files can be defined using model or environment variables.

Referenced paths should always be relative to the file they are referenced from.

Custom Tags
  • !use {filepath}: Directly inject the content of the file at filepath

  • - !extend {filepath}: Extend the current list with the elements found in the file at filepath. Similar to python list.extend method.

  • <<: !update {filepath}: Merge the current dictionary with the content of the dictionary defined at filepath. Similar to python dict.update method.

PARAMETER DESCRIPTION
fp

file object structured as a yaml file

TYPE: TextIO

RETURNS DESCRIPTION
Model

Model instance

Examples:

businesses:
  apple:
    symbol: aapl
    address: !use addresses.yaml
    <<: !update common.yaml
    emails:
      - jane.doe@apple.com
      - !extend emails.yaml
  amazon:
    symbol: amzn
    address: !use addresses.yaml
    <<: !update common.yaml
    emails:
      - john.doe@amazon.com
      - !extend emails.yaml
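
A minimal usage sketch with a file object; "node.yaml" is a hypothetical file, and any !use, !extend or !update paths it references are resolved relative to its own location.

import laktory as lk

# with open("node.yaml") as fp:
#     node = lk.models.PipelineNode.model_validate_yaml(fp)
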
Source code in laktory/models/basemodel.py
@classmethod
def model_validate_yaml(cls: Type[Model], fp: TextIO) -> Model:
    """
    Load model from yaml file object using laktory.yaml.RecursiveLoader. Supports
    reference to external yaml and sql files using `!use`, `!extend` and `!update` tags.
    Path to external files can be defined using model or environment variables.

    Referenced path should always be relative to the file they are referenced from.

    Custom Tags
    -----------
    - `!use {filepath}`:
        Directly inject the content of the file at `filepath`

    - `- !extend {filepath}`:
        Extend the current list with the elements found in the file at `filepath`.
        Similar to python list.extend method.

    - `<<: !update {filepath}`:
        Merge the current dictionary with the content of the dictionary defined at
        `filepath`. Similar to python dict.update method.

    Parameters
    ----------
    fp:
        file object structured as a yaml file

    Returns
    -------
    :
        Model instance

    Examples
    --------
    ```yaml
    businesses:
      apple:
        symbol: aapl
        address: !use addresses.yaml
        <<: !update common.yaml
        emails:
          - jane.doe@apple.com
          - extend! emails.yaml
      amazon:
        symbol: amzn
        address: !use addresses.yaml
        <<: update! common.yaml
        emails:
          - john.doe@amazon.com
          - extend! emails.yaml
    ```
    """

    data = RecursiveLoader.load(fp)
    return cls.model_validate(data)

push_vars(update_core_resources=False) ¤

Push variable values to all children recursively.
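
A minimal sketch: variables defined on the node are propagated to its child models (here the file data source); the source configuration mirrors the first YAML example above.

from laktory import models

node = models.PipelineNode(
    name="brz_stock_prices",
    source={"path": "./events/${vars.env}/stock_prices/", "format": "JSON"},
    variables={"env": "dev"},
)

node.push_vars()

# the child source model now carries the parent's variables
print(node.source.variables)  # {'env': 'dev'}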

Source code in laktory/models/basemodel.py
def push_vars(self, update_core_resources=False) -> Any:
    """Push variable values to all child recursively"""

    def _update_model(m):
        if not isinstance(m, BaseModel):
            return
        for k, v in self.variables.items():
            m.variables[k] = m.variables.get(k, v)
        m.push_vars()

    def _push_vars(o):
        if isinstance(o, list):
            for _o in o:
                _push_vars(_o)
        elif isinstance(o, dict):
            for _o in o.values():
                _push_vars(_o)
        else:
            _update_model(o)

    for k in self.model_fields.keys():
        _push_vars(getattr(self, k))

    if update_core_resources and hasattr(self, "core_resources"):
        for r in self.core_resources:
            if r != self:
                _push_vars(r)

    return None

validate_assignment_disabled() ¤

Updating a model attribute inside a model validator when validate_assignment is True causes an infinite recursion by design and must be turned off temporarily.
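
A minimal sketch of the intended pattern inside a model validator; the subclass, validator and field names are hypothetical.

import pydantic

from laktory import models


class MyModel(models.BaseModel):
    name: str = None

    @pydantic.model_validator(mode="after")
    def set_default_name(self):
        # assigning to self.name here would otherwise re-trigger validation recursively
        with self.validate_assignment_disabled():
            if self.name is None:
                self.name = "default"
        return self


print(MyModel().name)  # default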

Source code in laktory/models/basemodel.py
@contextmanager
def validate_assignment_disabled(self):
    """
    Updating a model attribute inside a model validator when `validate_assignment`
    is `True` causes an infinite recursion by design and must be turned off
    temporarily.
    """
    original_state = self.model_config["validate_assignment"]
    self.model_config["validate_assignment"] = False
    try:
        yield
    finally:
        self.model_config["validate_assignment"] = original_state