BaseDataSink

laktory.models.datasinks.basedatasink.BaseDataSink

Bases: BaseModel

Base class for building a data sink

ATTRIBUTE DESCRIPTION
mode

Write mode.

- overwrite: Overwrite existing data
- append: Append contents of the dataframe to existing data
- error: Throw an exception if data already exists
- ignore: Silently ignore this operation if data already exists
- complete: Overwrite for streaming dataframes

TYPE: Union[Literal['OVERWRITE', 'APPEND', 'IGNORE', 'ERROR', 'COMPLETE', 'UPDATE'], None]

write_options

Other options passed to spark.write.options

TYPE: dict[str, str]
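
For illustration, a minimal configuration sketch using one of the concrete sinks (FileDataSink; the path and format fields are assumptions about that subclass, not defined on this page):

from laktory.models.datasinks import FileDataSink

sink = FileDataSink(
    path="/tmp/laktory/events/",  # assumed FileDataSink field, illustrative path
    format="PARQUET",  # assumed FileDataSink field
    mode="APPEND",  # one of the literals listed above
    write_options={"mergeSchema": "true"},  # forwarded to spark.write.options
)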

Functions

write

write(df, mode=None)

Write dataframe into sink.

PARAMETER DESCRIPTION
df

Input dataframe

TYPE: AnyDataFrame

mode

Write mode overriding the sink default mode.

DEFAULT: None

Source code in laktory/models/datasinks/basedatasink.py
def write(self, df: AnyDataFrame, mode=None) -> None:
    """
    Write dataframe into sink.

    Parameters
    ----------
    df:
        Input dataframe
    mode:
        Write mode overriding the sink default mode.

    Returns
    -------
    """
    if mode is None:
        mode = self.mode
    if is_spark_dataframe(df):
        self._write_spark(df, mode=mode)
    elif is_polars_dataframe(df):
        self._write_polars(df, mode=mode)
    else:
        raise ValueError(f"DataFrame type {type(df)} is not supported")
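
Continuing the FileDataSink sketch above, a hedged usage example (the dataframe contents are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

# write() dispatches on the dataframe type (Spark here; Polars also supported)
sink.write(df)

# A per-call mode overrides the sink's default mode
sink.write(df, mode="OVERWRITE")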

purge

purge()

Delete sink data and checkpoints

Source code in laktory/models/datasinks/basedatasink.py
def purge(self):
    """
    Delete sink data and checkpoints
    """
    raise NotImplementedError()
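
Since the base implementation only raises NotImplementedError, concrete sinks override this method. A hedged sketch of what an override could look like (LocalDirectorySink and its shutil-based cleanup are illustrative, not the library's actual implementation):

import shutil

from laktory.models.datasinks.basedatasink import BaseDataSink

class LocalDirectorySink(BaseDataSink):  # hypothetical subclass
    path: str

    def purge(self):
        # Illustrative cleanup; a real sink would also clear stream checkpoints
        shutil.rmtree(self.path, ignore_errors=True)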

read

read(spark=None, as_stream=None)

Read dataframe from sink.

PARAMETER DESCRIPTION
spark

Spark Session

DEFAULT: None

as_stream

If True, dataframe is read as a stream.

DEFAULT: None

Source code in laktory/models/datasinks/basedatasink.py
def read(self, spark=None, as_stream=None):
    """
    Read dataframe from sink.

    Parameters
    ----------
    spark:
        Spark Session
    as_stream:
        If `True`, dataframe is read as a stream.

    Returns
    -------
    """
    return self.as_source(as_stream=as_stream).read(spark=spark)
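
A hedged round-trip sketch, reusing the sink from the earlier example (assumes the sink has been written at least once):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Batch read of the sink contents
df = sink.read(spark=spark)

# Streaming read of the same sink
df_stream = sink.read(spark=spark, as_stream=True)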

--

laktory.models.datasinks.DataSinksUnion module-attribute

DataSinksUnion = Union[FileDataSink, TableDataSink]
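
A hedged sketch of how this union can serve as a pydantic field type (MyNodeModel is hypothetical, not part of the documented API):

from pydantic import BaseModel

from laktory.models.datasinks import DataSinksUnion

class MyNodeModel(BaseModel):
    # Each entry is validated as either a FileDataSink or a TableDataSink
    sinks: list[DataSinksUnion] = []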