Skip to content

read_stream

laktory.dlt.read_stream

read_stream(*args, fmt='delta', **kwargs)

When is_debug() is True, read the table from storage as a stream; otherwise, read the table from the pipeline with the native Databricks dlt.read_stream.

RETURNS DESCRIPTION

Output dataframe

Examples:

# Use laktory's dlt module, which provides the read_stream shown on this page.
from laktory import dlt

# Set the module-level `spark` that read_stream uses in debug mode.
# NOTE(review): `spark` is presumably the notebook's active SparkSession - confirm.
dlt.spark = spark

# Wrap the table declaration in a function; calling it runs the decorator.
def define_table():
    @dlt.table(name="slv_stock_prices")
    def get_df():
        # Source table is referenced by its full three-part name.
        df = dlt.read_stream("dev.finance.brz_stock_prices")
        return df

    return get_df

define_table()
Source code in laktory/dlt/__init__.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def read_stream(*args, fmt="delta", **kwargs):
    """
    Read a table as a streaming dataframe.

    When `is_debug()` is `True`, the table named by the first positional
    argument is read from storage through `spark.readStream`. Otherwise the
    call is forwarded to the native Databricks `dlt.read_stream`, using only
    the last dot-separated part of the table name.

    Parameters
    ----------
    *args:
        Positional arguments. The first one is the table name (e.g.
        `"catalog.schema.table"`). Any others are only forwarded to the
        native reader when not in debug mode.
    fmt:
        Storage format given to `spark.readStream.format`. Only used in
        debug mode.
    **kwargs:
        Keyword arguments forwarded to the native reader. Not used in debug
        mode.

    Returns
    -------
    :
        Output dataframe

    Examples
    --------
    ```py
    from laktory import dlt

    dlt.spark = spark

    def define_table():
        @dlt.table(name="slv_stock_prices")
        def get_df():
            df = dlt.read_stream("dev.finance.brz_stock_prices")
            return df

        return get_df

    define_table()
    ```
    """

    # Debug mode: bypass the pipeline and stream the table from storage.
    if is_debug():
        stream_reader = spark.readStream.format(fmt)
        return stream_reader.table(args[0])

    # Pipeline mode: keep only the table name, dropping any catalog and
    # schema prefix, before delegating to the native reader.
    positional = list(args)
    qualified_name = positional[0]
    positional[0] = qualified_name.split(".")[-1]
    return _read_stream(*positional, **kwargs)