FileDataSink

laktory.models.datasinks.FileDataSink ¤

Bases: BaseDataSink

Disk file(s) data sink, such as CSV, Parquet, or a Delta table.

ATTRIBUTE DESCRIPTION
checkpoint_location

Path to which the checkpoint file for a streaming DataFrame should be written. If None, the parent directory of path is used.

TYPE: Union[str, None]

format

Format of the data files

TYPE: Literal['CSV', 'PARQUET', 'DELTA', 'JSON', 'EXCEL']

path

Path to which the DataFrame needs to be written.

TYPE: str

Examples:

import pandas as pd

from laktory import models

df = spark.createDataFrame(
    pd.DataFrame(
        {
            "symbol": ["AAPL", "GOOGL"],
            "price": [200.0, 205.0],
            "tstamp": ["2023-09-01", "2023-09-01"],
        }
    )
)

sink = models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="PARQUET",
    mode="OVERWRITE",
)
# sink.write(df)

# Sink with Change Data Capture processing
sink = models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="DELTA",
    mode="MERGE",
    merge_cdc_options={
        "scd_type": 1,
        "primary_keys": ["symbol", "tstamp"],
    },
)
# sink.write(df)
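
# For streaming writes, the checkpoint_location attribute documented above can
# be set explicitly. A minimal sketch; the mode value and the checkpoint path
# below are illustrative assumptions, not taken from the library documentation.
sink = models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="DELTA",
    mode="APPEND",
    checkpoint_location="/Volumes/sources/landing/events/yahoo-finance/stock_price/checkpoint",
)
# sink.write(df)
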
METHOD DESCRIPTION
purge

Delete sink data and checkpoints

as_source

Generate a file data source with the same path as the sink.

Functions ¤

purge ¤

purge(spark=None)

Delete sink data and checkpoints

Source code in laktory/models/datasinks/filedatasink.py
def purge(self, spark=None):
    """
    Delete sink data and checkpoints
    """
    # TODO: Now that sink switch to overwrite when sink does not exists or when
    # a full refresh is requested, the purge method should not delete the data
    # by default, but only the checkpoints. Also consider truncating the table
    # instead of dropping it.

    # Remove Data
    if os.path.exists(self.path):
        is_dir = os.path.isdir(self.path)
        if is_dir:
            logger.info(f"Deleting data dir {self.path}")
            shutil.rmtree(self.path)
        else:
            logger.info(f"Deleting data file {self.path}")
            os.remove(self.path)

    # TODO: Add support for Databricks dbfs / workspace / Volume?

    # Remove Checkpoint
    self._purge_checkpoint(spark=spark)

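A minimal usage sketch, assuming an active Spark session named spark as in the examples above; the call is commented out because it deletes data on disk:

sink = models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="PARQUET",
    mode="OVERWRITE",
)
# Remove the written data files and the associated streaming checkpoint (if any)
# sink.purge(spark=spark)
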
as_source ¤

as_source(as_stream=None)

Generate a file data source with the same path as the sink.

PARAMETER DESCRIPTION
as_stream

If True, the sink will be read as a stream.

TYPE: bool DEFAULT: None

RETURNS DESCRIPTION
FileDataSource

File Data Source

Source code in laktory/models/datasinks/filedatasink.py
def as_source(self, as_stream: bool = None) -> FileDataSource:
    """
    Generate a file data source with the same path as the sink.

    Parameters
    ----------
    as_stream:
        If `True`, sink will be read as stream.

    Returns
    -------
    :
        File Data Source
    """

    source = FileDataSource(
        path=self.path,
        format=self.format,
    )

    if as_stream:
        source.as_stream = as_stream

    if self.dataframe_backend:
        source.dataframe_backend = self.dataframe_backend
    source.parent = self.parent

    return source
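
A minimal usage sketch for round-tripping the sink; it assumes the returned FileDataSource exposes a read method, which is an assumption about the source API and not shown on this page:

# Re-read the data that was just written by the sink
source = sink.as_source(as_stream=False)
# df = source.read(spark)  # assumes FileDataSource provides read()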