Skip to content

FileDataSink

laktory.models.datasinks.FileDataSink ¤

Bases: BaseDataSink

Disk file(s) data sink such as csv, parquet or Delta Table.

ATTRIBUTE DESCRIPTION
checkpoint_location

Path to which the checkpoint file for a streaming DataFrame should be written. If None, the parent directory of path is used.

TYPE: Union[str, None]

format

Format of the data files

TYPE: Literal['CSV', 'PARQUET', 'DELTA', 'JSON', 'EXCEL']

path

Path to which the DataFrame needs to be written.

TYPE: str

Examples:

from laktory import models
import pandas as pd

# Build a Spark DataFrame from a small in-memory pandas DataFrame.
# NOTE: `spark` is not defined in this snippet; an active SparkSession is
# assumed to already be in scope.
df = spark.createDataFrame(
    pd.DataFrame(
        {
            "symbol": ["AAPL", "GOOGL"],
            "price": [200.0, 205.0],
            "tstamp": ["2023-09-01", "2023-09-01"],
        }
    )
)

# Declare a file sink for the target path, using format="PARQUET" and
# mode="OVERWRITE".
sink = models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="PARQUET",
    mode="OVERWRITE",
)
# The write call is left commented out in this example.
# sink.write(df)

Functions¤

purge ¤

purge(spark=None)

Delete sink data and checkpoints

Source code in laktory/models/datasinks/filedatasink.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def purge(self, spark=None):
    """
    Delete sink data and checkpoints

    Removes whatever is found at `self.path` and, when a checkpoint
    location is set, whatever is found at `self._checkpoint_location`.
    Real directories are removed recursively; regular files and symbolic
    links (including dangling ones) are unlinked without being followed.
    Locations that do not exist are skipped silently.

    Parameters
    ----------
    spark:
        Not used by this method; kept so the signature stays unchanged.
    """
    # Remove Data
    data_path = self.path
    # `lexists` is also true for a dangling symlink, which `exists` would
    # skip and leave behind.
    if os.path.lexists(data_path):
        # `shutil.rmtree` refuses to operate on a symlink, so only real
        # directories take that branch; links are unlinked like files.
        if os.path.isdir(data_path) and not os.path.islink(data_path):
            logger.info(f"Deleting data dir {data_path}")
            shutil.rmtree(data_path)
        else:
            logger.info(f"Deleting data file {data_path}")
            os.remove(data_path)

    # Remove Checkpoint
    checkpoint = self._checkpoint_location
    if checkpoint and os.path.lexists(checkpoint):
        logger.info(
            f"Deleting checkpoint at {checkpoint}",
        )
        # The checkpoint is not guaranteed to be a directory; calling
        # `rmtree` on a file or a link would raise instead of deleting it.
        if os.path.isdir(checkpoint) and not os.path.islink(checkpoint):
            shutil.rmtree(checkpoint)
        else:
            os.remove(checkpoint)

as_source ¤

as_source(as_stream=None)

Generate a file data source with the same path as the sink.

PARAMETER DESCRIPTION
as_stream

If True, the sink will be read as a stream.

TYPE: bool DEFAULT: None

RETURNS DESCRIPTION
FileDataSource

File Data Source

Source code in laktory/models/datasinks/filedatasink.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def as_source(self, as_stream: bool = None) -> FileDataSource:
    """
    Build a file data source that reads from this sink's location.

    The source is created with the sink's `path` and `format`.

    Parameters
    ----------
    as_stream:
        If `True`, the returned source is flagged to be read as a stream.
        A falsy value (`None` or `False`) leaves the source's own
        `as_stream` setting untouched.

    Returns
    -------
    :
        File Data Source
    """
    file_source = FileDataSource(
        path=self.path,
        format=self.format,
    )

    # Nothing to override: hand back the source as constructed.
    if not as_stream:
        return file_source

    file_source.as_stream = as_stream
    return file_source