
BaseDataSink

laktory.models.datasinks.basedatasink.BaseDataSink ¤

Bases: BaseModel, PipelineChild

Base class for building data sinks

ATTRIBUTE DESCRIPTION
is_primary

A primary sink is used to read data for downstream nodes when moving from stream to batch. Does not apply to quarantine sinks.

TYPE: bool

is_quarantine

Sink used to store quarantined results from node expectations.

TYPE: bool

merge_cdc_options

Merge options for handling input DataFrames that carry Change Data Capture (CDC) events. Only used when the merge mode is selected.

TYPE: DataSinkMergeCDCOptions

mode

Write mode.

- overwrite: Overwrite existing data
- append: Append contents of the dataframe to existing data
- error: Throw an exception if data already exists
- ignore: Silently ignore this operation if data already exists
- complete: Overwrite for streaming dataframes
- merge: Append, update and optionally delete records. Requires a CDC specification.

TYPE: Union[Literal['OVERWRITE', 'APPEND', 'IGNORE', 'ERROR', 'COMPLETE', 'UPDATE', 'MERGE'], None]

write_options

Other options passed to spark.write.options

TYPE: dict[str, str]

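For illustration, a sink carrying these attributes could be configured as follows. This is a minimal sketch assuming the TableDataSink subclass and its schema_name/table_name fields, which are not documented in this section.

from laktory import models

# Minimal sketch (assumed fields): configure a table sink using the base
# attributes documented above. schema_name and table_name are assumed
# TableDataSink fields shown only for illustration.
sink = models.TableDataSink(
    schema_name="sandbox",                  # assumed TableDataSink field
    table_name="brz_stock_prices",          # assumed TableDataSink field
    mode="APPEND",                          # one of the write modes listed above
    write_options={"mergeSchema": "true"},  # forwarded to spark.write.options
    is_primary=True,                        # downstream nodes read from this sink
)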
METHOD DESCRIPTION
write

Write dataframe into sink.

purge

Delete sink data and checkpoints

read

Read dataframe from sink.

ATTRIBUTE DESCRIPTION
dlt_apply_changes_kwargs

Keyword arguments for the dlt.apply_changes function

TYPE: dict[str, str]

Attributes¤

dlt_apply_changes_kwargs property ¤

dlt_apply_changes_kwargs

Keyword arguments for the dlt.apply_changes function

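When the sink runs inside a Databricks DLT pipeline, these keyword arguments would typically be unpacked into dlt.apply_changes. A hedged sketch, assuming the target streaming table has already been declared and the returned keys match what apply_changes expects:

import dlt  # Databricks Delta Live Tables module, available inside DLT pipelines

# Hedged sketch: the exact keys depend on the sink's merge_cdc_options.
dlt.create_streaming_table(name="slv_stock_prices")  # assumed target name
dlt.apply_changes(**sink.dlt_apply_changes_kwargs)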
Functions¤

write ¤

write(df=None, mode=None, spark=None, view_definition=None)

Write dataframe into sink.

PARAMETER DESCRIPTION
df

Input dataframe.

TYPE: AnyDataFrame DEFAULT: None

mode

Write mode; overrides the sink default mode.

TYPE: str DEFAULT: None

spark

Spark Session for creating a view

DEFAULT: None

view_definition

View definition. Overrides the view definition defined in the sink.

TYPE: str DEFAULT: None

Source code in laktory/models/datasinks/basedatasink.py
def write(
    self,
    df: AnyDataFrame = None,
    mode: str = None,
    spark=None,
    view_definition: str = None,
) -> None:
    """
    Write dataframe into sink.

    Parameters
    ----------
    df:
        Input dataframe.
    mode:
        Write mode; overrides the sink default mode.
    spark:
        Spark Session for creating a view
    view_definition:
        View definition. Overrides the view definition defined in the sink.

    Returns
    -------
    """
    _view_definition = view_definition
    if _view_definition is None:
        _view_definition = getattr(self, "parsed_view_definition", None)
        if _view_definition:
            _view_definition = _view_definition.parsed_expr(view=True)

    if _view_definition:
        if self.df_backend == "SPARK":
            if spark is None:
                raise ValueError(
                    "Spark session must be provided for creating a view."
                )
            self._write_spark_view(view_definition=_view_definition, spark=spark)
        else:
            raise ValueError(
                f"'{self.df_backend}' DataFrame backend is not supported for creating views"
            )

        logger.info("View created.")
        return

    if mode is None:
        mode = self.mode

    if is_spark_dataframe(df):
        self._write_spark(df=df, mode=mode)
    elif is_polars_dataframe(df=df):
        self._write_polars(df, mode=mode)
    else:
        raise ValueError(f"DataFrame type '{type(df)}' not supported")

    logger.info("Write completed.")

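A hedged usage sketch, assuming df is a Spark or Polars DataFrame and sink is a concrete data sink:

# Write using the sink's configured mode, then override it for a single call.
sink.write(df)                    # uses sink.mode
sink.write(df, mode="OVERWRITE")  # overrides the sink default mode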
purge ¤

purge()

Delete sink data and checkpoints

Source code in laktory/models/datasinks/basedatasink.py
def purge(self):
    """
    Delete sink data and checkpoints
    """
    raise NotImplementedError()

read ¤

read(spark=None, as_stream=None)

Read dataframe from sink.

PARAMETER DESCRIPTION
spark

Spark Session

DEFAULT: None

as_stream

If True, the dataframe is read as a stream.

DEFAULT: None

Source code in laktory/models/datasinks/basedatasink.py
def read(self, spark=None, as_stream=None):
    """
    Read dataframe from sink.

    Parameters
    ----------
    spark:
        Spark Session
    as_stream:
        If `True`, the dataframe is read as a stream.

    Returns
    -------
    """
    return self.as_source(as_stream=as_stream).read(spark=spark)

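A hedged usage sketch, assuming an active Spark session and a previously written sink:

# Read the sink back as a batch DataFrame, or as a stream for incremental reads.
df_batch = sink.read(spark=spark)
df_stream = sink.read(spark=spark, as_stream=True)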
laktory.models.datasinks.basedatasink.DataSinkMergeCDCOptions ¤

Bases: BaseModel

Options for merging change data capture (CDC) data into the sink.

They are also used to build the target with the apply_changes method when using Databricks DLT.

ATTRIBUTE DESCRIPTION
delete_where

Specifies when a CDC event should be treated as a DELETE rather than an upsert.

TYPE: str

end_at_column_name

When using SCD type 2, the name of the column storing the end time (or sequencing index) of the period during which a row is active. This attribute is not used with Databricks DLT, which does not allow renaming columns.

TYPE: str

exclude_columns

A subset of columns to exclude in the target table.

TYPE: list[str]

ignore_null_updates

Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is True, columns with a null value retain their existing values in the target. This also applies to nested columns with a null value. When ignore_null_updates is False, existing values are overwritten with null values.

TYPE: bool

include_columns

A subset of columns to include in the target table. Use include_columns to specify the complete list of columns to include.

TYPE: list[str]

order_by

The column name specifying the logical order of CDC events in the source data. Used to handle change events that arrive out of order.

TYPE: str

primary_keys

The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.

TYPE: list[str]

scd_type

Whether to store records as SCD type 1 or SCD type 2.

TYPE: Literal[1, 2]

start_at_column_name

When using SCD type 2, the name of the column storing the start time (or sequencing index) of the period during which a row is active. This attribute is not used with Databricks DLT, which does not allow renaming columns.

TYPE: str

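A hedged configuration sketch using only the attributes documented above; column names and the delete condition are illustrative:

from laktory.models.datasinks.basedatasink import DataSinkMergeCDCOptions

cdc_options = DataSinkMergeCDCOptions(
    primary_keys=["symbol", "tstamp"],         # uniquely identify a row
    order_by="index",                          # resolves out-of-order CDC events
    scd_type=1,                                # keep only the latest version of a row
    delete_where="source._action = 'DELETE'",  # illustrative delete condition
    exclude_columns=["_ingest_ts"],            # columns dropped from the target
)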
METHOD DESCRIPTION
execute

Merge source into the sink's target Delta table

Functions¤

execute ¤

execute(source)

Merge source into the sink's target Delta table

PARAMETER DESCRIPTION
source

Source DataFrame to merge into target (sink).

TYPE: SparkDataFrame

Source code in laktory/models/datasinks/basedatasink.py
def execute(self, source: SparkDataFrame):
    """
    Merge source into the sink's target Delta table

    Parameters
    ----------
    source:
        Source DataFrame to merge into target (sink).
    """

    from delta.tables import DeltaTable

    self._source_schema = source.schema
    spark = source.sparkSession

    if self.target_path:
        if not DeltaTable.isDeltaTable(spark, self.target_path):
            self._init_target(source)
    else:
        try:
            spark.catalog.getTable(self.target_name)
        except Exception:
            self._init_target(source)

    if source.isStreaming:
        if self.sink is None:
            raise ValueError("Sink value required to fetch checkpoint location.")

        if self.sink and self.sink._checkpoint_location is None:
            raise ValueError(
                f"Checkpoint location not specified for sink '{self.sink}'"
            )

        query = (
            source.writeStream.foreachBatch(
                lambda batch_df, batch_id: self._execute(source=batch_df)
            )
            .trigger(availableNow=True)
            .options(
                checkpointLocation=self.sink._checkpoint_location,
            )
            .start()
        )
        query.awaitTermination()

    else:
        self._execute(source=source)

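In practice, execute is typically invoked by the sink's write() when the merge mode is selected. A hedged sketch of a direct call, assuming the options are attached to a configured sink and source_df is a Spark DataFrame:

# Merge a batch DataFrame into the sink's target Delta table.
sink.merge_cdc_options.execute(source=source_df)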

laktory.models.datasinks.DataSinksUnion module-attribute ¤

DataSinksUnion = Union[FileDataSink, TableDataSink]
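A hedged sketch of how the union can be used as a field annotation so that a model accepts either sink type; the container class below is hypothetical:

from pydantic import BaseModel

from laktory.models.datasinks import DataSinksUnion

class MyNode(BaseModel):  # hypothetical container, not part of laktory
    sinks: list[DataSinksUnion] = []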