
BaseDataSink

laktory.models.datasinks.basedatasink.BaseDataSink ¤

Bases: BaseModel, PipelineChild

Base class for building data sinks.

ATTRIBUTE DESCRIPTION
is_primary

A primary sink will be used to read data for downstream nodes when moving from stream to batch. Does not apply to quarantine sinks.

TYPE: bool

is_quarantine

Sink used to store quarantined results from node expectations.

TYPE: bool

merge_cdc_options

Merge options to handle input DataFrames that are Change Data Capture (CDC). Only used when merge mode is selected.

TYPE: DataSinkMergeCDCOptions

mode

Write mode (see the configuration sketch after this attribute list).

- overwrite: Overwrite existing data
- append: Append contents of the dataframe to existing data
- error: Throw an exception if data already exists
- ignore: Silently ignore this operation if data already exists
- complete: Overwrite for streaming dataframes
- merge: Append, update and optionally delete records. Requires a CDC specification.

TYPE: Union[Literal['OVERWRITE', 'APPEND', 'IGNORE', 'ERROR', 'COMPLETE', 'UPDATE', 'MERGE'], None]

write_options

Other options passed to spark.write.options

TYPE: dict[str, str]
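
A minimal configuration sketch combining the mode, merge_cdc_options and write_options attributes documented above. The sink type, path and format values are assumptions for illustration, not a prescribed setup.

from laktory import models

# Hedged sketch: a file-based Delta sink writing in merge mode with CDC options.
sink = models.FileDataSink(
    path="/tmp/sinks/stock_prices/",  # assumed location
    format="DELTA",                   # assumed format
    mode="MERGE",
    merge_cdc_options={
        "primary_keys": ["symbol", "timestamp"],  # uniquely identify a row
        "order_by": "_change_index",              # logical order of CDC events
        "scd_type": 1,                            # keep only the latest record
    },
    write_options={"mergeSchema": "true"},  # extra options for spark.write.options
)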

METHOD DESCRIPTION
write

Write dataframe into sink.

purge

Delete sink data and checkpoints

read

Read dataframe from sink.

ATTRIBUTE DESCRIPTION
dlt_apply_changes_kwargs

Keyword arguments for dlt.apply_changes function

TYPE: dict[str, str]

Attributes¤

dlt_apply_changes_kwargs property ¤

dlt_apply_changes_kwargs

Keyword arguments for dlt.apply_changes function
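
A hedged usage sketch inside a Databricks DLT pipeline. It assumes sink is a table sink configured with merge_cdc_options; the surrounding dlt view/source setup is omitted.

import dlt  # Databricks Delta Live Tables module, available inside DLT pipelines

# Forward the sink's CDC configuration to dlt.apply_changes.
# `sink` is assumed to be a configured table sink with merge_cdc_options set.
dlt.apply_changes(**sink.dlt_apply_changes_kwargs)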

Functions¤

write ¤

write(df=None, mode=None, full_refresh=False, spark=None, view_definition=None)

Write dataframe into sink.

PARAMETER DESCRIPTION
df

Input dataframe.

TYPE: AnyDataFrame DEFAULT: None

mode

Write mode; if provided, overrides the sink's default mode.

TYPE: str DEFAULT: None

full_refresh

If True, tables are fully refreshed (re-built). Otherwise, only increments are processed.

TYPE: bool DEFAULT: False

spark

Spark Session for creating a view

DEFAULT: None

view_definition

View definition. If provided, overrides the view definition defined in the sink.

TYPE: str DEFAULT: None

Source code in laktory/models/datasinks/basedatasink.py
def write(
    self,
    df: AnyDataFrame = None,
    mode: str = None,
    full_refresh: bool = False,
    spark=None,
    view_definition: str = None,
) -> None:
    """
    Write dataframe into sink.

    Parameters
    ----------
    df:
        Input dataframe.
    mode:
        Write mode overwrite of the sink default mode.
    full_refresh
        If `True`, tables are fully refreshed (re-built). Otherwise, only
        increments are processed.
    spark:
        Spark Session for creating a view
    view_definition:
        View definition. Overwrites view definition defined in the sink.

    Returns
    -------
    """
    _view_definition = view_definition
    if _view_definition is None:
        _view_definition = getattr(self, "parsed_view_definition", None)
        if _view_definition:
            _view_definition = _view_definition.parsed_expr(view=True)

    if _view_definition:
        if self.df_backend == "SPARK":
            if spark is None:
                raise ValueError(
                    "Spark session must be provided for creating a view."
                )
            self._write_spark_view(view_definition=_view_definition, spark=spark)
        else:
            raise ValueError(
                f"'{self.df_backend}' DataFrame backend is not supported for creating views"
            )

        logger.info("View created.")
        return

    if mode is None:
        mode = self.mode

    if is_spark_dataframe(df):
        self.dataframe_backend = "SPARK"
        self._write_spark(df=df, mode=mode, full_refresh=full_refresh)
    elif is_polars_dataframe(df=df):
        self.dataframe_backend = "POLARS"
        self._write_polars(df, mode=mode, full_refresh=full_refresh)
    else:
        raise ValueError(f"DataFrame type '{type(df)}' not supported")

    logger.info("Write completed.")
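
A short usage sketch, assuming an active Spark session and a configured sink; the input path is hypothetical.

# Read a batch DataFrame and write it through the sink with its default mode.
df = spark.read.format("parquet").load("/tmp/landing/stock_prices/")
sink.write(df)

# Override the sink's default mode for a one-off rebuild.
sink.write(df, mode="OVERWRITE", full_refresh=True)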

purge ¤

purge()

Delete sink data and checkpoints

Source code in laktory/models/datasinks/basedatasink.py
def purge(self):
    """
    Delete sink data and checkpoints
    """
    # TODO: Now that sink switch to overwrite when sink does not exists or when
    # a full refresh is requested, the purge method should not delete the data
    # by default, but only the checkpoints. Also consider truncating the table
    # instead of dropping it.
    raise NotImplementedError()

read ¤

read(spark=None, as_stream=None)

Read dataframe from sink.

PARAMETER DESCRIPTION
spark

Spark Session

DEFAULT: None

as_stream

If True, the dataframe is read as a stream.

DEFAULT: None

Source code in laktory/models/datasinks/basedatasink.py
def read(self, spark=None, as_stream=None):
    """
    Read dataframe from sink.

    Parameters
    ----------
    spark:
        Spark Session
    as_stream:
        If `True`, dataframe read as stream.

    Returns
    -------
    """
    return self.as_source(as_stream=as_stream).read(spark=spark)
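
A brief usage sketch, assuming the sink has already been written to and a Spark session is available.

# Read back the sink data as a static DataFrame.
df = sink.read(spark=spark)

# Read it as a streaming DataFrame for downstream processing.
df_stream = sink.read(spark=spark, as_stream=True)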

laktory.models.datasinks.basedatasink.DataSinkMergeCDCOptions ¤

Bases: BaseModel

Options for merging change data capture (CDC) data.

They are also used to build the target with the apply_changes method when using Databricks DLT.

ATTRIBUTE DESCRIPTION
delete_where

Specifies when a CDC event should be treated as a DELETE rather than an upsert.

TYPE: str

end_at_column_name

When using SCD type 2, name of the column storing the end time (or sequencing index) during which a row is active. This attribute is not used with Databricks DLT, which does not allow column renaming.

TYPE: str

exclude_columns

A subset of columns to exclude in the target table.

TYPE: list[str]

ignore_null_updates

Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is True, columns with a null will retain their existing values in the target. This also applies to nested columns with a value of null. When ignore_null_updates is False, existing values will be overwritten with null values.

TYPE: bool

include_columns

A subset of columns to include in the target table. Use include_columns to specify the complete list of columns to include.

TYPE: list[str]

order_by

The column name specifying the logical order of CDC events in the source data. Used to handle change events that arrive out of order.

TYPE: str

primary_keys

The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.

TYPE: list[str]

scd_type

Whether to store records as SCD type 1 or SCD type 2 (see the sketch after this attribute list).

TYPE: Literal[1, 2]

start_at_column_name

When using SCD type 2, name of the column storing the start time (or sequencing index) during which a row is active. This attribute is not used with Databricks DLT, which does not allow column renaming.

TYPE: str
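
A hedged sketch of SCD type 2 merge options, restricted to the attributes documented above; the column names and delete condition are assumptions for illustration. The dictionary is intended for a sink's merge_cdc_options field.

merge_cdc_options = {
    "primary_keys": ["symbol"],               # uniquely identify a row
    "order_by": "_tstamp",                    # logical order of CDC events
    "scd_type": 2,                            # keep full history of changes
    "start_at_column_name": "__start_at",     # row validity start (ignored with Databricks DLT)
    "end_at_column_name": "__end_at",         # row validity end (ignored with Databricks DLT)
    "delete_where": "operation = 'DELETE'",   # treat matching events as deletes
    "exclude_columns": ["operation"],         # drop CDC metadata from the target
}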

METHOD DESCRIPTION
execute

Merge source into the sink's target Delta table

Functions¤

execute ¤

execute(source)

Merge source into the sink's target Delta table

PARAMETER DESCRIPTION
source

Source DataFrame to merge into target (sink).

TYPE: SparkDataFrame

Source code in laktory/models/datasinks/basedatasink.py
def execute(self, source: SparkDataFrame):
    """
    Merge source into target delta from sink

    Parameters
    ----------
    source:
        Source DataFrame to merge into target (sink).
    """

    from delta.tables import DeltaTable

    self._source_schema = source.schema
    spark = source.sparkSession

    if self.target_path:
        if not DeltaTable.isDeltaTable(spark, self.target_path):
            self._init_target(source)
    else:
        try:
            spark.catalog.getTable(self.target_name)
        except Exception:
            self._init_target(source)

    if source.isStreaming:
        if self.sink is None:
            raise ValueError("Sink value required to fetch checkpoint location.")

        if self.sink and self.sink._checkpoint_location is None:
            raise ValueError(
                f"Checkpoint location not specified for sink '{self.sink}'"
            )

        query = (
            source.writeStream.foreachBatch(
                lambda batch_df, batch_id: self._execute(source=batch_df)
            )
            .trigger(availableNow=True)
            .options(
                checkpointLocation=self.sink._checkpoint_location,
            )
            .start()
        )
        query.awaitTermination()

    else:
        self._execute(source=source)


laktory.models.datasinks.DataSinksUnion module-attribute ¤

DataSinksUnion = Union[FileDataSink, TableDataSink]
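
A minimal sketch showing the union used as a type annotation; the helper function is hypothetical.

from laktory.models.datasinks import DataSinksUnion, FileDataSink

def describe_sink(sink: DataSinksUnion) -> str:
    # Hypothetical helper dispatching on the concrete sink type.
    if isinstance(sink, FileDataSink):
        return "file sink"
    return "table sink"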