Data Sources and Sinks
Data Sources¤
The DataSource models facilitate loading data into a dataframe. They provide reusable mechanisms for reading data of various natures under different configurations and execution contexts.
A data source is generally used as a component of a pipeline node, where it defines the input data of that node.
File Data Source¤
API Documentation
The file data source supports reading files stored on disk.
import laktory as lk
source = lk.models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_prices/",
    format="JSON",
    as_stream=False,
    dataframe_backend="PYSPARK",
)
df = source.read()
To read the same data as a stream, simply set as_stream to True.
from laktory import models
source = models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="JSON",
    as_stream=True,
    dataframe_backend="PYSPARK",
)
df_stream = source.read()
You can also select a different DataFrame backend for reading your files:
import laktory as lk
source = lk.models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_prices.parquet",
    format="PARQUET",
    dataframe_backend="POLARS",
)
df = source.read()
Table Data Source¤
API Documentation
laktory.models.HiveMetastoreDataSource
laktory.models.UnityCatalogDataSource
When your data is already loaded into a data table, you can use the UnityCatalogDataSource or HiveMetastoreDataSource models instead.
import laktory as lk
source = lk.models.UnityCatalogDataSource(
    table_name="brz_stock_prices",
    selects=["symbol", "open", "close"],
    filter="symbol='AAPL'",
    as_stream=True,
)
df = source.read()
- The selects argument is used to select only the symbol, open and close columns.
- The filter argument is used to select only the rows associated with the Apple stock.
More data sources (like Kafka / Event Hub / Kinesis streams) will be supported in the future.
Pipeline Node Data Source¤
API Documentation
To establish a relationship between two nodes in a data pipeline, the
PipelineNodeDataSource
must be used. Assuming each node is a vertex in a
directed acyclic graph (DAG), using a PipelineNodeDataSource
creates an edge
between two vertices. It also defines the execution order of the nodes.
import laktory as lk
source = lk.models.PipelineNodeDataSource(
    node_name="brz_stock_prices",
    as_stream=True,
)
How the source reads the upstream data depends on the execution context:
- Single Worker Execution: The source uses the in-memory output dataframe from the upstream node.
- Multi-Worker Execution: The source uses the upstream node sink as a source to read the dataframe.
- DLT Execution: The source uses dlt.read() and dlt.read_stream() to read data from the upstream node.
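To make the DAG relationship concrete, here is a minimal sketch of a two-node pipeline in which the second node reads the output of the first through a PipelineNodeDataSource. The Pipeline and PipelineNode field names used here (name, nodes, source) are assumptions for illustration; refer to the pipeline API documentation for the exact model definitions.
import laktory as lk

# Hypothetical two-node pipeline sketch; field names are assumptions.
pipeline = lk.models.Pipeline(
    name="pl-stock-prices",
    nodes=[
        lk.models.PipelineNode(
            name="brz_stock_prices",
            source=lk.models.FileDataSource(
                path="/Volumes/sources/landing/events/yahoo-finance/stock_prices/",
                format="JSON",
            ),
        ),
        lk.models.PipelineNode(
            name="slv_stock_prices",
            # Referencing the upstream node by name creates the DAG edge
            # brz_stock_prices -> slv_stock_prices and fixes execution order.
            source=lk.models.PipelineNodeDataSource(
                node_name="brz_stock_prices",
            ),
        ),
    ],
)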
Data Sinks¤
API Documentation
laktory.models.FileDataSink
laktory.models.UnityCatalogDataSink
Analogously to DataSource, DataSink models facilitate writing a dataframe into a target location. They provide reusable mechanisms for writing data in various formats, adapting to different execution contexts.
A data sink is generally used as a component of a pipeline node. In this context, the sink may be used to store the output of the node or some quarantined data if expectations are set and not met.
Data sinks also support the merge of Change Data Capture (CDC) data.
File Data Sink¤
API Documentation
The file data sink supports writing a dataframe as files to a disk location using a variety of storage formats. For streaming dataframes, you also need to specify a checkpoint location (see the streaming sketch after the example below).
import narwhals as nw
import polars as pl
import laktory as lk
df = nw.from_native(
    pl.DataFrame({"symbol": ["AAPL", "GOOGL"]})
)

sink = lk.models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price.parquet",
    format="PARQUET",
    mode="OVERWRITE",
)
sink.write(df)
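When the dataframe is a stream, a checkpoint location must also be provided to the sink. The sketch below is illustrative only: the checkpoint_location parameter name, the APPEND mode and the target paths are assumptions, so check the FileDataSink API documentation for the exact option names.
import laktory as lk

# Streaming write sketch; checkpoint_location, mode and the paths below are
# illustrative assumptions, not confirmed parameter values.
source = lk.models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_prices/",
    format="JSON",
    as_stream=True,
    dataframe_backend="PYSPARK",
)
df_stream = source.read()

sink = lk.models.FileDataSink(
    path="/Volumes/sources/silver/stock_prices/",
    format="PARQUET",
    mode="APPEND",
    checkpoint_location="/Volumes/sources/checkpoints/stock_prices/",
)
sink.write(df_stream)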
Table Data Sink¤
API Documentation
laktory.models.UnityCatalogDataSink
laktory.models.HiveMetastoreDataSink
The UnityCatalogDataSink and HiveMetastoreDataSink classes provide a convenient way to write a DataFrame to a data table. They simplify the process of persisting data in a structured format, supporting both physical tables and SQL views.
To write a DataFrame to a physical table:
import narwhals as nw
import laktory as lk
# Assumes an active SparkSession is available as `spark`
df = nw.from_native(
    spark.createDataFrame([("AAPL",), ("GOOGL",)], ["symbol"])
)

sink = lk.models.UnityCatalogDataSink(
    schema_name="finance",
    table_name="brz_stock_prices",
)
sink.write(df)
UnityCatalogDataSink also supports creating non-materialized SQL views instead of physical tables. To write a DataFrame as a SQL view:
import narwhals as nw
import laktory as lk

# Assumes an active SparkSession is available as `spark`
df = nw.from_native(
    spark.createDataFrame([("AAPL",), ("GOOGL",)], ["symbol"])
)

sink = lk.models.UnityCatalogDataSink(
    schema_name="finance",
    table_name="brz_stock_prices",
    table_type="VIEW",
    view_definition="SELECT * FROM {df}",
)
sink.write(df)
Pipeline View Data Sink¤
API Documentation
laktory.models.PipelineViewDataSink
The PipelineViewDataSink can be used in the context of a declarative pipeline such as a Databricks Lakeflow Declarative Pipeline. It creates a virtual view within the pipeline, but the data is not materialized. Views are useful for simplifying complex queries, encapsulating business logic, and providing
a consistent interface to the underlying data without duplicating storage.
import laktory as lk
sink = lk.models.PipelineViewDataSink(
    pipeline_view_name="brz_stock_prices",
)