
TableDataSink

laktory.models.datasinks.TableDataSink

Bases: BaseDataSink

Data sink that writes to a table, such as a table in a Databricks catalog or in a data warehouse like Snowflake or BigQuery.

ATTRIBUTE DESCRIPTION
checkpoint_location

Path to which the checkpoint files of a streaming DataFrame should be written.

TYPE: Union[str, None]

catalog_name

Name of the catalog of the sink table

TYPE: Union[str, None]

table_name

Name of the sink table

TYPE: Union[str, None]

schema_name

Name of the schema of the sink table

TYPE: Union[str, None]

warehouse

Type of warehouse to which the table should be published

TYPE: Union[Literal['DATABRICKS'], None]

Examples:

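from pyspark.sql import SparkSession
from laktory import models
import pandas as pd

# On Databricks, `spark` is already defined; elsewhere, create or reuse a session
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    pd.DataFrame(
        {
            "symbol": ["AAPL", "GOOGL"],
            "price": [200.0, 205.0],
            "tstamp": ["2023-09-01", "2023-09-01"],
        }
    )
)

sink = models.TableDataSink(
    catalog_name="dev",  # hypothetical catalog name (a catalog, not a file path)
    schema_name="finance",
    table_name="slv_stock_prices",
    mode="OVERWRITE",
)
# Uncomment to overwrite dev.finance.slv_stock_prices with df
# sink.write(df)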

Attributes

full_name property

full_name

Table full name, formatted as {catalog_name}.{schema_name}.{table_name}.
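As a rough illustration, `full_name` can be thought of as the three name attributes joined with dots. The sketch below uses a hypothetical stand-in dataclass (`TableNameParts`), not the laktory model, and assumes all three parts are set; how laktory itself handles a missing part is not described on this page.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TableNameParts:
    """Hypothetical stand-in for the name attributes of TableDataSink."""

    catalog_name: Optional[str] = None
    schema_name: Optional[str] = None
    table_name: Optional[str] = None

    @property
    def full_name(self) -> str:
        # Assumption for this sketch: all three parts are required
        parts = (self.catalog_name, self.schema_name, self.table_name)
        if any(p is None for p in parts):
            raise ValueError("catalog_name, schema_name and table_name must all be set")
        # Three-level name: {catalog_name}.{schema_name}.{table_name}
        return f"{self.catalog_name}.{self.schema_name}.{self.table_name}"


name = TableNameParts("dev", "finance", "slv_stock_prices").full_name
print(name)  # dev.finance.slv_stock_prices
```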

Functions

purge

purge(spark=None)

Delete sink data and checkpoints

Source code in laktory/models/datasinks/tabledatasink.py
def purge(self, spark=None):
    """
    Delete sink data and checkpoints
    """
    if self._checkpoint_location:
        if os.path.exists(self._checkpoint_location):
            logger.info(
                f"Deleting checkpoint at {self._checkpoint_location}",
            )
            shutil.rmtree(self._checkpoint_location)

    if self.warehouse == "DATABRICKS":
        logger.info(
            f"Dropping table {self.full_name}",
        )
        spark.sql(f"DROP TABLE IF EXISTS {self.full_name}")
    else:
        raise NotImplementedError(
            f"Warehouse '{self.warehouse}' is not yet supported."
        )
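A minimal, stdlib-only sketch of the same steps, runnable without Spark or laktory. `purge_sketch` and its `run_sql` parameter are hypothetical: `run_sql` stands in for `spark.sql`, and the temporary directory stands in for a real checkpoint location. As in the source, the checkpoint is deleted before the warehouse is checked, so an unsupported warehouse still loses its checkpoint before `NotImplementedError` is raised.

```python
import logging
import os
import shutil
import tempfile
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def purge_sketch(
    full_name: str,
    warehouse: Optional[str],
    checkpoint_location: Optional[str],
    run_sql: Callable[[str], None],
) -> None:
    """Hypothetical standalone version of TableDataSink.purge."""
    # 1. Remove the streaming checkpoint directory if one is configured and exists
    if checkpoint_location and os.path.exists(checkpoint_location):
        logger.info("Deleting checkpoint at %s", checkpoint_location)
        shutil.rmtree(checkpoint_location)

    # 2. Drop the table; only Databricks is handled, mirroring the source above
    if warehouse == "DATABRICKS":
        logger.info("Dropping table %s", full_name)
        run_sql(f"DROP TABLE IF EXISTS {full_name}")
    else:
        raise NotImplementedError(f"Warehouse '{warehouse}' is not yet supported.")


# Usage: collect the SQL instead of executing it
statements = []
ckpt = tempfile.mkdtemp()
purge_sketch("dev.finance.slv_stock_prices", "DATABRICKS", ckpt, statements.append)
print(os.path.exists(ckpt), statements)
# False ['DROP TABLE IF EXISTS dev.finance.slv_stock_prices']
```

Because `spark` defaults to `None` in `purge`, a Databricks sink needs an active session passed explicitly; otherwise the `spark.sql` call fails.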

as_source

as_source(as_stream=None)

Generate a table data source with the same properties as the sink.

PARAMETER DESCRIPTION
as_stream

If True, the sink table will be read as a stream.

DEFAULT: None

RETURNS DESCRIPTION
TableDataSource

Table Data Source

Source code in laktory/models/datasinks/tabledatasink.py
def as_source(self, as_stream=None) -> TableDataSource:
    """
    Generate a table data source with the same properties as the sink.

    Parameters
    ----------
    as_stream:
        If `True`, sink will be read as stream.

    Returns
    -------
    :
        Table Data Source
    """
    source = TableDataSource(
        catalog_name=self.catalog_name,
        table_name=self.table_name,
        schema_name=self.schema_name,
        warehouse=self.warehouse,
    )
    if as_stream:
        source.as_stream = as_stream

    return source
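The sketch below illustrates the sink-to-source mapping with plain dataclasses. `SinkSketch` and `SourceSketch` are hypothetical stand-ins for `TableDataSink` and `TableDataSource`, and the `as_stream=False` default on the source is an assumption made for this example. The behavior it mirrors comes from the source above: the table coordinates and warehouse are copied over, and `as_stream` is only overridden when a truthy value is passed.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SourceSketch:
    """Hypothetical stand-in for TableDataSource."""

    catalog_name: Optional[str] = None
    table_name: Optional[str] = None
    schema_name: Optional[str] = None
    warehouse: Optional[str] = None
    as_stream: bool = False  # assumed default for this sketch


@dataclass
class SinkSketch:
    """Hypothetical stand-in for TableDataSink."""

    catalog_name: Optional[str] = None
    table_name: Optional[str] = None
    schema_name: Optional[str] = None
    warehouse: Optional[str] = None

    def as_source(self, as_stream: Optional[bool] = None) -> SourceSketch:
        # Copy the table coordinates so the source reads what the sink wrote
        source = SourceSketch(
            catalog_name=self.catalog_name,
            table_name=self.table_name,
            schema_name=self.schema_name,
            warehouse=self.warehouse,
        )
        # Only override the source default when as_stream is truthy
        if as_stream:
            source.as_stream = as_stream
        return source


sink = SinkSketch(
    catalog_name="dev",
    table_name="slv_stock_prices",
    schema_name="finance",
    warehouse="DATABRICKS",
)
batch = sink.as_source()
stream = sink.as_source(as_stream=True)
print(batch.as_stream, stream.as_stream)  # False True
```

A consequence of the `if as_stream:` check is that passing `as_stream=False` leaves the source's own default untouched rather than forcing batch mode.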