Skip to content

TableDataSink

laktory.models.datasinks.TableDataSink ¤

Bases: BaseDataSink

Table data sink on a metastore such as Hive, Unity Catalog or on a data warehouse such as Snowflake, BigQuery, etc.

ATTRIBUTE DESCRIPTION
checkpoint_location

Path to which the checkpoint file for streaming dataframe should be written.

TYPE: Union[str, None]

catalog_name

Name of the catalog of the sink table

TYPE: Union[str, None]

table_name

Name of the sink table

TYPE: Union[str, None]

table_type

Type of table. "TABLE" and "VIEW" are currently supported.

TYPE: Literal['TABLE', 'VIEW']

schema_name

Name of the schema of the sink table

TYPE: Union[str, None]

view_definition

View definition if "VIEW" table_type is selected.

TYPE: str

warehouse

Type of warehouse to which the table should be published

TYPE: Union[Literal['DATABRICKS'], None]

Examples:

from laktory import models
import pandas as pd

# Sample stock-price data used to demonstrate writing to the sink.
df = spark.createDataFrame(
    pd.DataFrame(
        {
            "symbol": ["AAPL", "GOOGL"],
            "price": [200.0, 205.0],
            "tstamp": ["2023-09-01", "2023-09-01"],
        }
    )
)

# Basic sink: OVERWRITE mode replaces the target table content on each write.
sink = models.TableDataSink(
    catalog_name="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    schema_name="finance",
    table_name="slv_stock_prices",
    mode="OVERWRITE",
)
# sink.write(df)

# Sink with Change Data Capture processing
# MERGE mode upserts rows matched on the declared primary keys;
# scd_type 1 presumably overwrites matched rows without keeping history
# (slowly-changing-dimension type 1) — confirm against laktory docs.
sink = models.TableDataSink(
    catalog_name="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    schema_name="finance",
    table_name="slv_stock_prices",
    mode="MERGE",
    merge_cdc_options={
        "scd_type": 1,
        "primary_keys": ["symbol", "tstamp"],
    },
)
# sink.write(df)
METHOD DESCRIPTION
purge

Delete sink data and checkpoints

as_source

Generate a table data source with the same properties as the sink.

ATTRIBUTE DESCRIPTION
full_name

Table full name {catalog_name}.{schema_name}.{table_name}

TYPE: str

Attributes¤

full_name property ¤

full_name

Table full name {catalog_name}.{schema_name}.{table_name}

Functions¤

purge ¤

purge(spark=None)

Delete sink data and checkpoints

Source code in laktory/models/datasinks/tabledatasink.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def purge(self, spark=None):
    """
    Delete sink data and checkpoints.

    Drops the sink table (or view), removes any data files written to an
    explicit ``path`` write option, then clears the streaming checkpoint.

    Parameters
    ----------
    spark:
        Active Spark session, required to issue the ``DROP`` statement.

    Raises
    ------
    ValueError
        If `spark` is None while a DATABRICKS warehouse needs it.
    NotImplementedError
        If the configured warehouse is not yet supported.
    """
    # Remove Data
    if self.warehouse == "DATABRICKS":
        # Fail fast with a clear message instead of the opaque
        # AttributeError that `None.sql(...)` would raise below.
        if spark is None:
            raise ValueError(
                "A Spark session is required to purge a DATABRICKS table."
            )
        logger.info(
            f"Dropping {self.table_type} {self.full_name}",
        )
        spark.sql(f"DROP {self.table_type} IF EXISTS {self.full_name}")

        # If the table was written to an explicit storage location,
        # delete the underlying directory or file as well.
        path = self.write_options.get("path", None)
        if path and os.path.exists(path):
            if os.path.isdir(path):
                logger.info(f"Deleting data dir {path}")
                shutil.rmtree(path)
            else:
                logger.info(f"Deleting data file {path}")
                os.remove(path)
    else:
        raise NotImplementedError(
            f"Warehouse '{self.warehouse}' is not yet supported."
        )

    # Remove Checkpoint
    self._purge_checkpoint(spark=spark)

as_source ¤

as_source(as_stream=None)

Generate a table data source with the same properties as the sink.

PARAMETER DESCRIPTION
as_stream

If True, sink will be read as stream.

DEFAULT: None

RETURNS DESCRIPTION
TableDataSource

Table Data Source

Source code in laktory/models/datasinks/tabledatasink.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def as_source(self, as_stream=None) -> TableDataSource:
    """
    Generate a table data source with the same properties as the sink.

    Parameters
    ----------
    as_stream:
        If `True`, sink will be read as stream.

    Returns
    -------
    :
        Table Data Source
    """
    # Mirror the sink's addressing attributes onto the new source.
    kwargs = {
        "catalog_name": self.catalog_name,
        "schema_name": self.schema_name,
        "table_name": self.table_name,
        "warehouse": self.warehouse,
    }
    src = TableDataSource(**kwargs)

    # Only override streaming behavior when explicitly requested.
    if as_stream:
        src.as_stream = as_stream

    src.parent = self.parent

    return src