BaseDataSource

laktory.models.datasources.basedatasource.BaseDataSource ¤

Bases: BaseModel

Base class for building a data source

ATTRIBUTE DESCRIPTION
as_stream

If True, the source is read as a streaming DataFrame.

TYPE: bool

broadcast

If True, the DataFrame is broadcast.

TYPE: Union[bool, None]

cdc

Change data capture specifications

TYPE: Union[DataSourceCDC, None]

dataframe_type

Type of DataFrame

TYPE: Literal['SPARK', 'POLARS']

drops

List of columns to drop

TYPE: Union[list, None]

filter

SQL expression used to select specific rows from the source table

TYPE: Union[str, None]

renames

Mapping between the source table column names and new column names

TYPE: Union[dict[str, str], None]

selects

Columns to select from the source table. Can be specified as a list or as a dictionary to rename the source columns

TYPE: Union[list[str], dict[str, str], None]

watermark

Spark structured streaming watermark specifications

TYPE: Union[Watermark, None]
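
The snippet below is a minimal sketch of how these shared options are typically set on a concrete subclass. It assumes the FileDataSource subclass; the path, format, and column names are illustrative and not taken from this page.

from laktory import models

# Hypothetical file-based source; path, format, and column names are
# illustrative assumptions. Only the BaseDataSource options documented
# above are described on this page.
source = models.FileDataSource(
    path="/data/stock_prices/",
    format="JSON",
    dataframe_type="SPARK",
    as_stream=False,
    filter="symbol != 'AAPL'",               # SQL expression selecting rows
    drops=["_metadata"],                     # columns dropped after read
    renames={"created_at": "tstamp"},        # source name -> new name
    selects=["tstamp", "symbol", "open", "close"],
)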

Attributes¤

is_cdc property ¤

is_cdc

If True, the source data is a change data capture (CDC) feed

is_orchestrator_dlt property ¤

is_orchestrator_dlt

If True, data source is used in the context of a DLT pipeline

Functions¤

read ¤

read(spark=None)

Read data with options specified in attributes.

PARAMETER DESCRIPTION
spark

Spark context

DEFAULT: None

RETURNS DESCRIPTION
DataFrame

Resulting dataframe

Source code in laktory/models/datasources/basedatasource.py
def read(self, spark=None) -> AnyDataFrame:
    """
    Read data with options specified in attributes.

    Parameters
    ----------
    spark:
        Spark context

    Returns
    -------
    : DataFrame
        Resulting dataframe
    """
    if self.mock_df is not None:
        df = self.mock_df
    elif self.dataframe_type == "SPARK":
        df = self._read_spark(spark=spark)
    elif self.dataframe_type == "POLARS":
        df = self._read_polars()
    else:
        raise ValueError(
            f"DataFrame type '{self.dataframe_type}' is not supported."
        )

    if is_spark_dataframe(df):
        df = self._post_read_spark(df)
    elif is_polars_dataframe(df):
        df = self._post_read_polars(df)

    return df
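
A hedged usage sketch of read: the session handling below is an assumption, so pass whatever SparkSession your runtime provides (with dataframe_type="POLARS", the spark argument can be omitted).

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# `source` is any concrete data source, such as the FileDataSource
# sketched above.
df = source.read(spark=spark)
df.printSchema()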

laktory.models.datasources.basedatasource.DataSourceCDC ¤

Bases: BaseModel

Defines the change data capture (CDC) properties of a data source. They are used to build the target table using the apply_changes method from Databricks Delta Live Tables (DLT).

ATTRIBUTE DESCRIPTION
apply_as_deletes

Specifies when a CDC event should be treated as a DELETE rather than an upsert. To handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view is created in the metastore that filters out these tombstones.

TYPE: Union[str, None]

apply_as_truncates

Specifies when a CDC event should be treated as a full table TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality.

TYPE: Union[str, None]

columns

A subset of columns to include in the target table. When set, this must be the complete list of columns to include.

TYPE: Union[list[str], None]

except_columns

A subset of columns to exclude from the target table.

TYPE: Union[list[str], None]

ignore_null_updates

Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is True, columns with a null value will retain their existing values in the target. This also applies to nested columns with a value of null. When ignore_null_updates is False, existing values will be overwritten with null values.

TYPE: Union[bool, None]

primary_keys

The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.

TYPE: list[str]

scd_type

Whether to store records as SCD type 1 or SCD type 2.

TYPE: Literal[1, 2]

sequence_by

The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order.

TYPE: str

track_history_columns

A subset of output columns to be tracked for history in the target table.

TYPE: Union[list[str], None]

track_history_except_columns

A subset of output columns to be excluded from tracking.

TYPE: Union[list[str], None]

References

https://docs.databricks.com/en/delta-live-tables/python-ref.html#change-data-capture-with-python-in-delta-live-tables
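
A minimal sketch of a CDC specification; the column names and event predicates are illustrative assumptions, not values from this page.

from laktory.models.datasources.basedatasource import DataSourceCDC

cdc = DataSourceCDC(
    primary_keys=["user_id"],                 # row identity in the source
    sequence_by="updated_at",                 # logical ordering of CDC events
    scd_type=2,                               # keep full history (SCD type 2)
    apply_as_deletes="operation = 'DELETE'",  # treat these events as deletes
    except_columns=["operation"],             # exclude CDC metadata from target
)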


laktory.models.datasources.basedatasource.Watermark ¤

Bases: BaseModel

Definition of a Spark Structured Streaming watermark for joining data streams.

ATTRIBUTE DESCRIPTION
column

Event time column name

TYPE: str

threshold

The expected lateness of the data, expressed in seconds, with respect to event time.

TYPE: str

References

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
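
A minimal sketch, with an assumed event time column name:

from laktory.models.datasources.basedatasource import Watermark

# threshold is a string expressed in seconds, per the attribute
# description above.
wm = Watermark(column="created_at", threshold="60")

# Conceptually, this corresponds to Spark Structured Streaming's
#   df.withWatermark("created_at", "60 seconds")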


laktory.models.datasources.DataSourcesUnion module-attribute ¤