Data Sources and Sinks
Data Sources
The DataSource models facilitate loading data into a dataframe. They provide reusable mechanisms for reading data of various natures given different configurations and execution contexts. A data source is generally used as a component of a pipeline node.
File Data Source
API Documentation
The file data source supports reading multiple files stored on a storage container.
from laktory import models

source = models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="JSON",
    as_stream=False,
)
df = source.read(spark=spark)
Reading the same dataset, but as a Spark streaming source, is as easy as changing as_stream to True.
from laktory import models

source = models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="JSON",
    as_stream=True,
)
df_stream = source.read(spark=spark)
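The object returned by read is a regular Spark dataframe (streaming or static, depending on as_stream), so the usual transformations can be applied before handing it to a sink. A minimal sketch, where the loaded_at column is purely illustrative:

import pyspark.sql.functions as F

# df_stream is the streaming dataframe returned above; standard Spark
# transformations apply. The loaded_at column is only an illustration.
df_enriched = df_stream.withColumn("loaded_at", F.current_timestamp())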
Table Data Source
API Documentation
When your data is already loaded into a lakehouse data table, you can use the TableDataSource model instead.
from laktory import models

source = models.TableDataSource(
    table_name="brz_stock_prices",
    selects=["symbol", "open", "close"],
    filter="symbol='AAPL'",
    warehouse="DATABRICKS",
    as_stream=True,
)
df = source.read(spark=spark)
- the selects argument is used to select only the symbol, open and close columns
- the filter argument is used to select only rows associated with Apple stock
More data sources (like Kafka / Event Hub / Kinesis streams) will be supported in the future.
Pipeline Node Data Source
API Documentation
To establish a relationship between two nodes in a data pipeline, the
PipelineNodeDataSource
must be used. Assuming each node is a vertex in a
directed acyclic graph (DAG), using a PipelineNodeDataSource
creates an edge
between two vertices. It also defines the execution order of the nodes.
from laktory import models

source = models.PipelineNodeDataSource(
    node_name="brz_stock_prices",
    as_stream=True,
)
- Single Worker Execution: The source uses the in-memory output dataframe from the upstream node.
- Multi-Worker Execution: The source uses the upstream node sink as a source for reading the dataframe.
- DLT Execution: The source uses dlt.read() and dlt.read_stream() to read data from the upstream node.
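For illustration, here is a minimal sketch of how a PipelineNodeDataSource could connect two nodes inside a pipeline definition. The Pipeline and PipelineNode field names used below (name, nodes, source) are assumptions based on the patterns shown on this page; refer to the pipeline API documentation for the exact model definitions.

from laktory import models

# Hypothetical two-node pipeline: the second node declares the first node's
# output as its source, which creates the DAG edge and sets the execution
# order. Field names (name, nodes, source) are assumptions; check the
# pipeline API documentation.
brz_node = models.PipelineNode(
    name="brz_stock_prices",
    source=models.FileDataSource(
        path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
        format="JSON",
    ),
)
slv_node = models.PipelineNode(
    name="slv_stock_prices",
    source=models.PipelineNodeDataSource(
        node_name="brz_stock_prices",
    ),
)
pipeline = models.Pipeline(
    name="stock_prices",
    nodes=[brz_node, slv_node],
)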
Data Sinks
API Documentation
Analogously to DataSource, the DataSink models facilitate writing a dataframe into a target location. They provide reusable mechanisms for writing data in various formats, adapting to different execution contexts. A data sink is generally used as a component of a pipeline node. In this context, the sink may be used to store the output of the node or some quarantined data if expectations are set and not met. Data sinks also support the merge of a Change Data Capture (CDC).
File Data Sink
API Documentation
The file data sink supports writing a dataframe as files to a disk location using a variety of storage formats. For streaming dataframes, you also need to specify a checkpoint location.
from laktory import models

df = spark.createDataFrame([("AAPL",), ("GOOGL",)], ["symbol"])

sink = models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="PARQUET",
    mode="OVERWRITE",
)
sink.write(df)
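As noted above, streaming writes also require a checkpoint location. A minimal sketch, assuming a checkpoint_location field, an APPEND mode and the df_stream dataframe from the File Data Source section; confirm the exact parameter names and supported modes in the API documentation:

from laktory import models

# Hypothetical streaming write. The path, checkpoint_location and
# mode="APPEND" values are assumptions used for illustration only;
# confirm the exact field names against the FileDataSink API documentation.
sink = models.FileDataSink(
    path="/Volumes/sources/landing/tables/brz_stock_prices",
    format="PARQUET",
    mode="APPEND",
    checkpoint_location="/Volumes/sources/landing/checkpoints/brz_stock_prices",
)
sink.write(df_stream)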
Table Data Sink
API Documentation
The TableDataSink class provides a convenient way to write a DataFrame to a lakehouse or data warehouse table. It simplifies the process of persisting data in a structured format, supporting both physical tables and SQL views.
To write a DataFrame to a physical table:
from laktory import models

df = spark.createDataFrame([("AAPL",), ("GOOGL",)], ["symbol"])

sink = models.TableDataSink(
    schema_name="finance",
    table_name="brz_stock_prices",
)
sink.write(df)
TableDataSink
also supports creating non-materialized SQL views instead of
physical tables. To write a DataFrame as a SQL view:
from laktory import models

df = spark.createDataFrame([("AAPL",), ("GOOGL",)], ["symbol"])

sink = models.TableDataSink(
    schema_name="finance",
    table_name="brz_stock_prices",
    table_type="VIEW",
)
sink.write(df)