FileDataSink

laktory.models.datasinks.FileDataSink

Bases: BaseDataSink

Data sink writing to disk file(s), such as CSV, Parquet, or a Delta Table.

ATTRIBUTE DESCRIPTION

checkpoint_location

    Path to which the checkpoint file for a streaming dataframe should be written. If None, the parent directory of path is used.

    TYPE: Union[str, None]

format

    Format of the data files.

    TYPE: Literal['CSV', 'PARQUET', 'DELTA', 'JSON', 'EXCEL']

path

    Path to which the DataFrame needs to be written.

    TYPE: str

Examples:

import pandas as pd

from pyspark.sql import SparkSession

from laktory import models

# An active Spark session is assumed throughout the examples below
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    pd.DataFrame(
        {
            "symbol": ["AAPL", "GOOGL"],
            "price": [200.0, 205.0],
            "tstamp": ["2023-09-01", "2023-09-01"],
        }
    )
)

sink = models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="PARQUET",
    mode="OVERWRITE",
)
# sink.write(df)

# Sink with Change Data Capture processing
sink = models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="DELTA",
    mode="MERGE",
    merge_cdc_options={
        "scd_type": 1,
        "primary_keys": ["symbol", "tstamp"],
    },
)
# sink.write(df)
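
Since checkpoint_location is documented above but not exercised, here is a
hedged sketch of a streaming sink. The "APPEND" mode, the checkpoint path,
and df_stream (a streaming DataFrame) are illustrative assumptions, not part
of the examples above.

# Streaming sink with an explicit checkpoint location
sink = models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="DELTA",
    mode="APPEND",
    checkpoint_location="/Volumes/sources/landing/checkpoints/stock_price",
)
# sink.write(df_stream)
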
METHOD DESCRIPTION

path_to_string

    Required to apply settings before instantiating resources and setting default values.

purge

    Delete sink data and checkpoints.

as_source

    Generate a file data source with the same path as the sink.

Functions

path_to_string classmethod

path_to_string(data)

Required to apply settings before instantiating resources and setting default values

Source code in laktory/models/datasinks/filedatasink.py
@model_validator(mode="before")
@classmethod
def path_to_string(cls, data: Any) -> Any:
    """Required to apply settings before instantiating resources and setting default values"""
    path = data.get("path", None)
    if path and isinstance(path, Path):
        data["path"] = str(path)

    return data
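
In practice, this means the sink accepts a pathlib.Path and stores it as a
plain string. A minimal sketch (the path below is illustrative):

from pathlib import Path

from laktory import models

# The validator runs in "before" mode, so the Path is converted to a
# string before field validation and default values are applied
sink = models.FileDataSink(
    path=Path("/tmp/stock_price"),
    format="PARQUET",
    mode="OVERWRITE",
)
assert isinstance(sink.path, str)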

purge

purge(spark=None)

Delete sink data and checkpoints

Source code in laktory/models/datasinks/filedatasink.py
def purge(self, spark=None):
    """
    Delete sink data and checkpoints
    """
    # Remove Data
    if os.path.exists(self.path):
        is_dir = os.path.isdir(self.path)
        if is_dir:
            logger.info(f"Deleting data dir {self.path}")
            shutil.rmtree(self.path)
        else:
            logger.info(f"Deleting data file {self.path}")
            os.remove(self.path)

    # TODO: Add support for Databricks dbfs / workspace / Volume?

    # Remove Checkpoint
    self._purge_checkpoint(spark=spark)
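
A short usage sketch, continuing the examples above: purging removes both the
written files and the streaming checkpoint; the spark argument is only
forwarded to the internal checkpoint cleanup.

# Delete the sink's data file or directory and its checkpoint
sink.purge(spark=spark)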

as_source

as_source(as_stream=None)

Generate a file data source with the same path as the sink.

PARAMETER DESCRIPTION

as_stream

    If True, the sink will be read as a stream.

    TYPE: bool DEFAULT: None

RETURNS DESCRIPTION

FileDataSource

    File Data Source

Source code in laktory/models/datasinks/filedatasink.py
def as_source(self, as_stream: bool = None) -> FileDataSource:
    """
    Generate a file data source with the same path as the sink.

    Parameters
    ----------
    as_stream:
        If `True`, sink will be read as stream.

    Returns
    -------
    :
        File Data Source
    """

    source = FileDataSource(
        path=self.path,
        format=self.format,
    )

    if as_stream:
        source.as_stream = as_stream

    source.parent = self.parent

    return source
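
A round-trip sketch using the sink defined in the examples above; reading the
generated source back through read assumes laktory's usual source API and an
active Spark session, which may differ by version.

# Reuse the sink's path and format, this time as a streaming source
source = sink.as_source(as_stream=True)
# df = source.read(spark)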