TableDataSink

laktory.models.datasinks.TableDataSink ¤

Bases: BaseDataSink

Table data sink on a metastore, such as Hive or Unity Catalog, or on a data warehouse, such as Snowflake or BigQuery.

ATTRIBUTE DESCRIPTION
checkpoint_location

Path to which the checkpoint file for a streaming dataframe should be written.

TYPE: Union[str, None]

catalog_name

Name of the catalog of the sink table

TYPE: Union[str, None]

table_name

Name of the sink table

TYPE: Union[str, None]

table_type

Type of table. "TABLE" and "VIEW" are currently supported.

TYPE: Literal['TABLE', 'VIEW']

schema_name

Name of the schema of the sink table

TYPE: Union[str, None]

view_definition

View definition, used when the "VIEW" table_type is selected.

TYPE: str

warehouse

Type of warehouse to which the table should be published

TYPE: Union[Literal['DATABRICKS'], None]
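Since table_type also accepts "VIEW", the same model can declare a view backed by a view_definition. A minimal sketch using only the attributes listed above (catalog, schema and table names are hypothetical):

from laktory import models

sink = models.TableDataSink(
    catalog_name="dev",
    schema_name="finance",
    table_name="v_stock_prices",
    table_type="VIEW",
    view_definition="SELECT * FROM dev.finance.slv_stock_prices",
)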

Examples:

import pandas as pd

from laktory import models

df = spark.createDataFrame(
    pd.DataFrame(
        {
            "symbol": ["AAPL", "GOOGL"],
            "price": [200.0, 205.0],
            "tstamp": ["2023-09-01", "2023-09-01"],
        }
    )
)

sink = models.TableDataSink(
    catalog_name="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    schema_name="finance",
    table_name="slv_stock_prices",
    mode="OVERWRITE",
)
# sink.write(df)

# Sink with Change Data Capture processing
sink = models.TableDataSink(
    catalog_name="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    schema_name="finance",
    table_name="slv_stock_prices",
    mode="MERGE",
    merge_cdc_options={
        "scd_type": 1,
        "primary_keys": ["symbol", "tstamp"],
    },
)
# sink.write(df)
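
With mode="MERGE" and scd_type 1, incoming rows that match on the primary keys update the existing rows in place, and new keys are inserted. A rough SQL equivalent, for illustration only (laktory builds the actual statement internally; "updates" is a hypothetical temporary view holding the incoming dataframe):

# Illustrative only: approximate semantics of the MERGE sink above.
df.createOrReplaceTempView("updates")
spark.sql("""
    MERGE INTO dev.finance.slv_stock_prices AS t
    USING updates AS s
    ON t.symbol = s.symbol AND t.tstamp = s.tstamp
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")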
METHOD DESCRIPTION
purge

Delete sink data and checkpoints

as_source

Generate a table data source with the same properties as the sink.

ATTRIBUTE DESCRIPTION
full_name

Table full name {catalog_name}.{schema_name}.{table_name}

TYPE: str

Attributes¤

full_name property ¤

full_name

Table full name {catalog_name}.{schema_name}.{table_name}
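
For example, with hypothetical names (remaining fields left to their defaults), the property simply joins the three parts:

sink = models.TableDataSink(
    catalog_name="dev",
    schema_name="finance",
    table_name="slv_stock_prices",
)
print(sink.full_name)  # dev.finance.slv_stock_prices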

Functions¤

purge ¤

purge(spark=None)

Delete sink data and checkpoints

Source code in laktory/models/datasinks/tabledatasink.py
def purge(self, spark=None):
    """
    Delete sink data and checkpoints
    """
    # TODO: Now that the sink switches to overwrite when it does not exist or
    # when a full refresh is requested, the purge method should not delete the
    # data by default, but only the checkpoints. Also consider truncating the
    # table instead of dropping it.

    # Remove Data
    if self.warehouse == "DATABRICKS":
        logger.info(
            f"Dropping {self.table_type} {self.full_name}",
        )
        spark.sql(f"DROP {self.table_type} IF EXISTS {self.full_name}")

        path = self.write_options.get("path", None)
        if path and os.path.exists(path):
            is_dir = os.path.isdir(path)
            if is_dir:
                logger.info(f"Deleting data dir {path}")
                shutil.rmtree(path)
            else:
                logger.info(f"Deleting data file {path}")
                os.remove(path)
    else:
        raise NotImplementedError(
            f"Warehouse '{self.warehouse}' is not yet supported."
        )

    # Remove Checkpoint
    self._purge_checkpoint(spark=spark)

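A minimal usage sketch, assuming the sink from the examples above and an active Spark session (DATABRICKS is currently the only supported warehouse):

# Drops the table (or view), deletes any data path set in write_options,
# and removes the streaming checkpoint.
sink.purge(spark=spark)
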
as_source ¤

as_source(as_stream=None)

Generate a table data source with the same properties as the sink.

PARAMETER DESCRIPTION
as_stream

If True, the sink will be read as a stream.

DEFAULT: None

RETURNS DESCRIPTION
TableDataSource

Table Data Source

Source code in laktory/models/datasinks/tabledatasink.py
def as_source(self, as_stream=None) -> TableDataSource:
    """
    Generate a table data source with the same properties as the sink.

    Parameters
    ----------
    as_stream:
        If `True`, the sink will be read as a stream.

    Returns
    -------
    :
        Table Data Source
    """
    source = TableDataSource(
        catalog_name=self.catalog_name,
        table_name=self.table_name,
        schema_name=self.schema_name,
        warehouse=self.warehouse,
    )

    if as_stream:
        source.as_stream = as_stream

    if self.dataframe_backend:
        source.dataframe_backend = self.dataframe_backend
    source.parent = self.parent

    return source
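
A short usage sketch: the generated source can feed a downstream pipeline node, optionally as a stream. The read call shown is an assumption based on laktory's data source API:

# Build a source pointing at the same table as the sink.
source = sink.as_source(as_stream=True)
# df = source.read(spark=spark)  # signature assumed; reads the table as a stream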