BaseDataSource

laktory.models.datasources.basedatasource.BaseDataSource ¤

Bases: BaseModel, PipelineChild

Base class for building a data source.

ATTRIBUTE DESCRIPTION
as_stream

If True, the source is read as a streaming DataFrame.

TYPE: bool

broadcast

If True, the DataFrame is broadcast.

TYPE: Union[bool, None]

dataframe_backend

Type of DataFrame backend

TYPE: Literal['SPARK', 'POLARS']

drops

List of columns to drop

TYPE: Union[list, None]

filter

SQL expression used to select specific rows from the source table

TYPE: Union[str, None]

renames

Mapping between the source table column names and new column names

TYPE: Union[dict[str, str], None]

selects

Columns to select from the source table. Can be specified as a list or as a dictionary to rename the source columns

TYPE: Union[list[str], dict[str, str], None]

watermark

Spark structured streaming watermark specifications

TYPE: Union[Watermark, None]
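
Taken together, the filter, drops, renames and selects options describe row and column transformations applied to the source table. The sketch below is a hypothetical illustration of their semantics using plain Python dicts; it is not laktory's implementation, and the application order shown is an assumption.

```python
# Hypothetical sketch of how the source options could shape the data.
# NOT laktory's implementation; illustrates the semantics of filter
# (row selection), drops, renames and selects on a list of dicts.

rows = [
    {"id": 1, "name": "a", "tmp": 0},
    {"id": 2, "name": "b", "tmp": 0},
]

options = {
    "filter": lambda r: r["id"] > 1,   # stand-in for a SQL expression
    "drops": ["tmp"],                  # columns to drop
    "renames": {"name": "label"},      # source name -> new name
    "selects": ["id", "label"],        # final column selection
}

def apply_options(rows, options):
    out = [r for r in rows if options["filter"](r)]
    out = [{k: v for k, v in r.items() if k not in options["drops"]} for r in out]
    out = [{options["renames"].get(k, k): v for k, v in r.items()} for r in out]
    out = [{k: r[k] for k in options["selects"]} for r in out]
    return out

result = apply_options(rows, options)
```

Here `result` keeps only the row with id 2, without the tmp column and with name renamed to label.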

METHOD DESCRIPTION
read

Read data with options specified in attributes.

ATTRIBUTE DESCRIPTION
is_orchestrator_dlt

If True, data source is used in the context of a DLT pipeline

TYPE: bool

Attributes¤

is_orchestrator_dlt property ¤

is_orchestrator_dlt

If True, data source is used in the context of a DLT pipeline

Functions¤

read ¤

read(spark=None)

Read data with options specified in attributes.

PARAMETER DESCRIPTION
spark

Spark context

DEFAULT: None

RETURNS DESCRIPTION
DataFrame

Resulting dataframe

Source code in laktory/models/datasources/basedatasource.py
def read(self, spark=None) -> AnyDataFrame:
    """
    Read data with options specified in attributes.

    Parameters
    ----------
    spark:
        Spark context

    Returns
    -------
    : DataFrame
        Resulting dataframe
    """
    if self.mock_df is not None:
        df = self.mock_df
    elif self.df_backend == "SPARK":
        df = self._read_spark(spark=spark)
    elif self.df_backend == "POLARS":
        df = self._read_polars()
    else:
        raise ValueError(f"DataFrame type '{self.df_backend}' is not supported.")

    if is_spark_dataframe(df):
        df = self._post_read_spark(df)
    elif is_polars_dataframe(df):
        df = self._post_read_polars(df)

    logger.info("Read completed.")

    return df
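
The backend dispatch in read() can be exercised with a minimal stand-in. The DummySource class below is hypothetical and does not import laktory; only the branching logic (mock_df short-circuit, then SPARK/POLARS selection) mirrors the documented method.

```python
# Minimal stand-in mimicking the backend dispatch in read().
# DummySource and its _read_* methods are hypothetical placeholders.

class DummySource:
    def __init__(self, df_backend, mock_df=None):
        self.df_backend = df_backend
        self.mock_df = mock_df

    def _read_spark(self, spark=None):
        return "spark-df"   # placeholder for a Spark DataFrame

    def _read_polars(self):
        return "polars-df"  # placeholder for a Polars DataFrame

    def read(self, spark=None):
        # A mocked DataFrame short-circuits the backend dispatch
        if self.mock_df is not None:
            return self.mock_df
        if self.df_backend == "SPARK":
            return self._read_spark(spark=spark)
        if self.df_backend == "POLARS":
            return self._read_polars()
        raise ValueError(f"DataFrame type '{self.df_backend}' is not supported.")
```

For example, DummySource("POLARS").read() returns the Polars placeholder, while any source constructed with mock_df set returns the mock regardless of backend.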

laktory.models.datasources.basedatasource.Watermark ¤

Bases: BaseModel

Definition of a Spark Structured Streaming watermark for joining data streams.

ATTRIBUTE DESCRIPTION
column

Event time column name

TYPE: str

threshold

How late, expressed in seconds, the data is expected to be with respect to event time.

TYPE: str

References

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
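
The watermark's core idea can be sketched without Spark: an event is considered too late, and dropped, when its event time falls more than threshold seconds behind the maximum event time seen so far. The function below is an illustrative simplification only; Spark's actual watermark also governs state cleanup in aggregations and stream-stream joins.

```python
# Illustrative sketch of the watermark idea (not Spark's implementation):
# drop events more than `threshold` seconds behind the latest event time.

def apply_watermark(events, threshold):
    """events: list of (event_time_seconds, payload) in arrival order."""
    max_seen = float("-inf")
    kept = []
    for t, payload in events:
        max_seen = max(max_seen, t)          # advance the high-water mark
        if t >= max_seen - threshold:        # within the lateness bound
            kept.append((t, payload))
    return kept

events = [(100, "a"), (130, "b"), (95, "late"), (125, "c")]
kept = apply_watermark(events, threshold=10)
```

With a 10-second threshold, the event at t=95 arrives after the high-water mark has advanced to 130 and is dropped, while t=125 is late but within bounds and is kept.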


laktory.models.datasources.DataSourcesUnion module-attribute ¤