Data Sources and Sinks
Data Sources
The DataSource
models facilitate loading data into a dataframe. They provide
reusable mechanisms for reading data of various natures under different
configurations and execution contexts.
A data source is generally used as a component of a pipeline node.
File Data Source
API Documentation
The file data source supports reading one or multiple files stored in a storage container.
from laktory import models
source = models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="JSON",
    as_stream=False,
)
df = source.read(spark=spark)
Reading the same dataset as a Spark streaming source is as easy as setting as_stream to True.
from laktory import models
source = models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="JSON",
    as_stream=True,
)
df_stream = source.read(spark=spark)
Table Data Source
API Documentation
When your data is already loaded into a lakehouse data table, you can use the
TableDataSource
model instead.
from laktory import models
source = models.TableDataSource(
    table_name="brz_stock_prices",
    selects=["symbol", "open", "close"],
    filter="symbol='AAPL'",
    warehouse="DATABRICKS",
    as_stream=True,
)
df = source.read(spark=spark)
- the selects argument is used to select only the symbol, open and close columns
- the filter argument is used to select only the rows associated with the Apple stock
More data sources (like Kafka / Event Hub / Kinesis streams) will be supported in the future.
Pipeline Node Data Source
API Documentation
To establish a relationship between two nodes in a data pipeline, the
PipelineNodeDataSource
must be used. Assuming each node is a vertex in a
directed acyclic graph (DAG), using a PipelineNodeDataSource
creates an edge
between two vertices. It also defines the execution order of the nodes.
from laktory import models
source = models.PipelineNodeDataSource(
    node_name="brz_stock_prices",
    as_stream=True,
)
Depending on the execution context, the source behaves differently:
- Single-worker execution: the source uses the in-memory output dataframe from the upstream node.
- Multi-worker execution: the source uses the upstream node sink as a source for reading the dataframe.
- DLT execution: the source uses dlt.read() and dlt.read_stream() to read data from the upstream node.
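For illustration, here is a minimal sketch of two nodes linked by a PipelineNodeDataSource. The Pipeline and PipelineNode field names (nodes, source) are assumptions based on the models shown in this section, not an authoritative pipeline definition; refer to the API documentation for the exact schema.
from laktory import models

# Hypothetical sketch: the silver node reads the output of the bronze node
# through a PipelineNodeDataSource, creating a DAG edge between the two.
pipeline = models.Pipeline(
    name="stock_prices",
    nodes=[
        models.PipelineNode(
            name="brz_stock_prices",
            source=models.FileDataSource(
                path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
                format="JSON",
            ),
        ),
        models.PipelineNode(
            name="slv_stock_prices",
            source=models.PipelineNodeDataSource(
                node_name="brz_stock_prices",
            ),
        ),
    ],
)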
Data Sinks
API Documentation
Analogously to DataSource
, DataSink
models facilitate writing a dataframe
into a target location. They provide reusable mechanisms for writing data
in various formats, adapting to different execution contexts.
A data sink is generally used as a component of a pipeline node.
File Data Sink
API Documentation
The file data sink supports writing a dataframe as files to a disk location using a variety of storage formats. For streaming dataframes, you also need to specify a checkpoint location, as shown in the sketch after the batch example below.
from laktory import models
sink = models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="PARQUET",
    mode="OVERWRITE",
)
sink.write(df)
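When writing a streaming dataframe (such as df_stream from the file data source example above), a checkpoint location must also be provided. This is a minimal sketch assuming a checkpoint_location field and an append mode; refer to the API documentation for the exact parameters.
from laktory import models

# Hypothetical streaming sink: the checkpoint_location field name and path
# are assumptions used for illustration only.
sink = models.FileDataSink(
    path="/Volumes/sources/silver/events/yahoo-finance/stock_price",
    format="PARQUET",
    mode="APPEND",
    checkpoint_location="/Volumes/sources/silver/events/yahoo-finance/stock_price/_checkpoint",
)
sink.write(df_stream)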
Table Data Sink
API Documentation
The Table Data Sink allows writing the dataframe to a lakehouse or data warehouse table.
from laktory import models
sink = models.TableDataSink(
    schema_name="finance",
    table_name="brz_stock_prices",
    warehouse="DATABRICKS",
)
sink.write(df)