Data Pipeline
API Documentation
The Pipeline model is the cornerstone of Laktory, facilitating the process of reading, transforming, and writing data.
Pipeline Node
API Documentation
A pipeline consists of a sequence of nodes. Each node generates a Spark or Polars DataFrame by reading from a source, applying transformations, and optionally writing the output to a sink.
Sources and Sinks
Laktory supports various sources and sinks, including data files and data warehouse tables. By designating a node as the source for another downstream node, you create dependencies between nodes, forming a directed acyclic graph (DAG).
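As a rough illustration of how node-to-node sources form a DAG, the sketch below derives an execution order from simplified node declarations. It relies only on Python's standard `graphlib` module; the dictionary layout and the `execution_order` helper are assumptions made for this example and are not Laktory's implementation.

```python
from graphlib import TopologicalSorter

# Hypothetical, simplified node declarations: only the node name and its
# source are kept. A source with `node_name` points to an upstream node.
nodes = [
    {"name": "slv_stock_prices", "source": {"node_name": "brz_stock_prices"}},
    {"name": "brz_stock_prices", "source": {"path": "./events/stock_prices"}},
]


def execution_order(nodes):
    """Return node names ordered so that each node follows its upstream node."""
    graph = {}
    for node in nodes:
        upstream = node["source"].get("node_name")
        # A node reading from a file or a table has no upstream dependency.
        graph[node["name"]] = {upstream} if upstream else set()
    # static_order() raises graphlib.CycleError if the graph contains a cycle.
    return list(TopologicalSorter(graph).static_order())


order = execution_order(nodes)
print(order)
```

Even though the silver node is declared first, it is ordered after the bronze node it reads from.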
Transformer
Transformations are defined through a transformer, which is a chain of SQL statements and/or Spark/Polars function calls. Each step in the chain receives the DataFrame produced by the previous step. This flexible and highly modular framework supports scalable batch and streaming operations.
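To make the idea of a chain concrete, here is a minimal sketch in which each step is either a SQL statement using a `{df}` placeholder or a function call with keyword arguments. It uses plain Python rows and the standard `sqlite3` module in place of Spark or Polars; the `run_sql`, `dedupe`, and `run_chain` helpers, the `func` key, and the `price` column are all hypothetical and only meant to show the chaining pattern.

```python
import sqlite3


def dedupe(rows, subset):
    """Keep the first row for each distinct combination of `subset` columns."""
    seen, out = set(), []
    for row in rows:
        key = tuple(row[c] for c in subset)
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out


def run_sql(rows, sql_expr):
    """Load rows into a temporary table and run `sql_expr` against it.

    Assumes `rows` is a non-empty list of dicts sharing the same keys.
    """
    con = sqlite3.connect(":memory:")
    con.row_factory = sqlite3.Row
    columns = list(rows[0])
    con.execute(f"CREATE TABLE df ({', '.join(columns)})")
    con.executemany(
        f"INSERT INTO df VALUES ({', '.join('?' for _ in columns)})",
        [tuple(r[c] for c in columns) for r in rows],
    )
    # The `{df}` placeholder is replaced by the name of the temporary table.
    result = [dict(r) for r in con.execute(sql_expr.format(df="df"))]
    con.close()
    return result


def run_chain(rows, chain):
    """Apply each step to the output of the previous one."""
    for step in chain:
        if "sql_expr" in step:
            rows = run_sql(rows, step["sql_expr"])
        else:
            rows = step["func"](rows, **step["func_kwargs"])
    return rows


chain = [
    {"sql_expr": "SELECT symbol, created_at, price FROM {df} WHERE price > 0"},
    {"func": dedupe, "func_kwargs": {"subset": ["symbol", "created_at"]}},
]

rows = [
    {"symbol": "AAPL", "created_at": "2023-02-15", "price": 155.2},
    {"symbol": "AAPL", "created_at": "2023-02-15", "price": 155.2},
    {"symbol": "MSFT", "created_at": "2023-02-15", "price": -1.0},
]
result = run_chain(rows, chain)
print(result)
```

The SQL step filters out the invalid price and the function step removes the duplicate, leaving a single row.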
Serialization
The entire pipeline definition is serializable, ensuring portability for deployment on remote compute environments. This makes Laktory ideal for a DataOps approach using infrastructure-as-code principles.
Here is an example of a pipeline declaration:
name: stock_prices
nodes:
- name: brz_stock_prices
  layer: BRONZE
  source:
    path: "./events/stock_prices"
  sink:
    schema_name: finance
    table_name: brz_stock_prices
  transformer:
    nodes: []
- name: slv_stock_prices
  layer: SILVER
  source:
    node_name: brz_stock_prices
  sink:
    schema_name: finance
    table_name: slv_stock_prices
  transformer:
    nodes:
    - sql_expr: |
        SELECT
          data.created_at AS created_at,
          data.symbol AS symbol,
          data.open AS open,
          data.close AS close,
          data.high AS high,
          data.low AS low,
          data.volume AS volume
        FROM
          {df}
    - func_name: drop_duplicates
      func_kwargs:
        subset:
          - symbol
          - created_at
...
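Because the definition is plain data, it can be written to a string or file and rebuilt elsewhere without loss. The sketch below shows that round trip with Python's standard `json` module on a trimmed-down, hypothetical version of the declaration above. Laktory itself works with validated models rather than raw dictionaries, so treat this only as an illustration of the principle.

```python
import json

# Hypothetical, trimmed-down pipeline definition held as plain data.
pipeline = {
    "name": "stock_prices",
    "nodes": [
        {"name": "brz_stock_prices", "source": {"path": "./events/stock_prices"}},
        {"name": "slv_stock_prices", "source": {"node_name": "brz_stock_prices"}},
    ],
}

# Serialize to a string that can be stored or shipped to remote compute.
payload = json.dumps(pipeline, indent=2)

# Rebuild the definition on the other side and check that nothing was lost.
restored = json.loads(payload)
assert restored == pipeline
```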
Layers
Pipeline nodes allow you to select a target medallion architecture layer (BRONZE, SILVER, GOLD). Basic transformations are preset based on the selected layer. For example, SILVER nodes automatically drop unnecessary columns and duplicates by default.
Execution
Local
You can execute the pipeline in a local or remote Spark session using the pipeline.execute(spark) command. If Polars is the DataFrame engine, the pipeline can run in a simple Python environment without external dependencies. In all cases, nodes are processed sequentially: each node reads data from its source, applies its transformations, and writes to its sink. A single node can be executed the same way:
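from pyspark.sql import SparkSession
from laktory import models

spark = SparkSession.builder.getOrCreate()

# Load the node definition from YAML
with open("pipeline_node.yaml") as fp:
    node = models.PipelineNode.model_validate_yaml(fp)

# Read the source, apply the transformer, and write to the sink
node.execute(spark)
node.output_df.laktory.display()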
laktory.models.pipelinenode - INFO - Executing pipeline node slv_stock_prices (None)
laktory.models.datasources.filedatasource - INFO - Reading /Volumes/dev/sources/landing/tables/brz_stock_prices/ as static
laktory.models.transformers.basechain - INFO - Executing SPARK chain
laktory.models.transformers.basechain - INFO - Executing SPARK chain node 0 (SparkChainNode).
laktory.models.transformers.sparkchainnode - INFO - DataFrame df as
SELECT
data.created_at AS created_at,
data.symbol AS symbol,
data.open AS open,
data.close AS close,
data.high AS high,
data.low AS low,
data.volume AS volume
FROM
{df}
laktory.models.transformers.basechain - INFO - Executing SPARK chain node 1 (SparkChainNode).
laktory.models.transformers.sparkchainnode - INFO - DataFrame df as drop_duplicates(subset=['symbol', 'created_at'])
laktory.models.datasinks.filedatasink - INFO - Writing df as static DELTA to finance.slv_stock_prices with mode OVERWRITE and options {'mergeSchema': 'false', 'overwriteSchema': 'true'}
+-------------------------+------+------------------+------------------+------------------+------------------+---------+
|created_at |symbol|open |close |high |low |volume |
+-------------------------+------+------------------+------------------+------------------+------------------+---------+
|2023-07-06T11:30:00-04:00|MSFT |338.7200012207031 |341.6199951171875 |341.6600036621094 |338.4200134277344 |2850613.0|
|2023-02-15T13:30:00-05:00|AAPL |154.3800048828125 |155.2321014404297 |155.32550048828125|154.14999389648438|6005631.0|
|2023-02-15T10:30:00-05:00|MSFT |268.0098876953125 |267.9599914550781 |268.6300048828125 |266.5299987792969 |5300365.0|
|2023-10-18T13:30:00-04:00|MSFT |332.7200012207031 |331.54998779296875|332.7200012207031 |330.739990234375 |2036767.0|
|2023-10-19T12:30:00-04:00|AAPL |176.69000244140625|177.47999572753906|177.83999633789062|175.4600067138672 |7575857.0|
|2023-05-16T11:30:00-04:00|AMZN |113.59500122070312|114.4832992553711 |114.48999786376953|113.2750015258789 |8034165.0|
|2023-07-06T10:30:00-04:00|MSFT |340.5799865722656 |338.70001220703125|341.1199951171875 |338.0899963378906 |3748565.0|
|2023-03-30T10:30:00-04:00|GOOGL |100.59500122070312|100.4749984741211 |100.875 |100.24019622802734|3869214.0|
|2023-01-17T15:30:00-05:00|GOOGL |91.55500030517578 |91.30999755859375 |91.61000061035156 |91.23999786376953 |3977790.0|
|2023-03-22T12:30:00-04:00|AMZN |99.94010162353516 |100.193603515625 |100.21659851074219|99.83219909667969 |3250304.0|
+-------------------------+------+------------------+------------------+------------------+------------------+---------+
only showing top 10 rows
Orchestrators
While local execution is ideal for small datasets or prototyping, orchestrators unlock more advanced features such as parallel processing, automatic schema management, and historical re-processing. The desired orchestrator can be configured directly within the pipeline.
- name: stock_prices
  nodes: ...
  orchestrator: DLT
  dlt:
    catalog: dev
    target: finance
    configuration:
      pipeline_name: dlt-stock-prices
    clusters:
    - name: default
      node_type_id: Standard_DS3_v2
      autoscale:
        min_workers: 1
        max_workers: 2
    libraries:
    - notebook:
        path: /.laktory/dlt/dlt_laktory_pl.py
The choice of orchestrator determines which resources are deployed when running the laktory deploy CLI command.
Delta Live Tables (DLT)
Databricks Delta Live Tables is the recommended orchestrator, offering features like automatic schema change management, continuous execution, and autoscaling.
Each pipeline node runs inside a dlt.table() or dlt.view() function. In the context of DLT, node execution does not trigger a sink write, as this operation is managed by DLT. When a source is a pipeline node, the dlt.read() and dlt.read_stream() functions are called to ensure compatibility with the DLT framework.
from laktory import dlt
from laktory import models

with open("pipeline.yaml") as fp:
    pl = models.Pipeline.model_validate_yaml(fp.read())


def define_table(node):
    @dlt.table_or_view(
        name=node.name,
        comment=node.description,
        as_view=node.sink is None,
    )
    @dlt.expect_all(node.warning_expectations)
    @dlt.expect_all_or_drop(node.drop_expectations)
    @dlt.expect_all_or_fail(node.fail_expectations)
    def get_df():
        # Execute node
        df = node.execute(spark=spark)

        # Return
        return df

    return get_df


# Build nodes
for node in pl.nodes:
    wrapper = define_table(node)
    df = dlt.get_df(wrapper)
    display(df)
Notice that the dlt module is imported from laktory, as this version provides additional debugging and inspection capabilities. Notably, you can run the notebook on a user cluster and inspect the resulting DataFrame.
Databricks Job
A Databricks Job is another powerful orchestration mechanism. In this case, Laktory creates one task per node, which allows independent nodes to run in parallel. All read and write operations are handled by Laktory sources and sinks.
The supporting notebook simply needs to load the pipeline model and retrieve the node name from the job.
dbutils.widgets.text("pipeline_name", "pl-stock-prices")
dbutils.widgets.text("node_name", "")

from laktory import models
from laktory import settings

# --------------------------------------------------------------------------- #
# Read Pipeline                                                               #
# --------------------------------------------------------------------------- #

pl_name = dbutils.widgets.get("pipeline_name")
node_name = dbutils.widgets.get("node_name")
filepath = f"/Workspace{settings.workspace_laktory_root}pipelines/{pl_name}.json"
with open(filepath, "r") as fp:
    pl = models.Pipeline.model_validate_json(fp.read())

# --------------------------------------------------------------------------- #
# Execution                                                                   #
# --------------------------------------------------------------------------- #

if node_name:
    pl.nodes_dict[node_name].execute(spark=spark)
else:
    pl.execute(spark=spark)
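The one-task-per-node mapping can be pictured with the following sketch, which turns simplified node declarations into task descriptions that carry their upstream dependencies and the notebook parameters read by the widgets above. The `build_tasks` helper, the `node-` task key prefix, the `parameters` key, and the `brz_stock_metadata` node are assumptions made for illustration; the resources Laktory actually generates may be shaped differently.

```python
# Hypothetical, simplified node declarations. `brz_stock_metadata` is added
# only to show two nodes with no dependency on each other.
nodes = [
    {"name": "brz_stock_prices", "source": {"path": "./events/stock_prices"}},
    {"name": "brz_stock_metadata", "source": {"path": "./events/stock_metadata"}},
    {"name": "slv_stock_prices", "source": {"node_name": "brz_stock_prices"}},
]


def build_tasks(nodes, pipeline_name):
    """Describe one task per node, each depending on its upstream node, if any."""
    tasks = []
    for node in nodes:
        upstream = node["source"].get("node_name")
        tasks.append(
            {
                "task_key": f"node-{node['name']}",
                # Tasks without dependencies are free to start in parallel.
                "depends_on": [{"task_key": f"node-{upstream}"}] if upstream else [],
                # Values the notebook would read through its widgets.
                "parameters": {
                    "pipeline_name": pipeline_name,
                    "node_name": node["name"],
                },
            }
        )
    return tasks


tasks = build_tasks(nodes, "pl-stock-prices")
for task in tasks:
    print(task["task_key"], "<-", [d["task_key"] for d in task["depends_on"]])
```

Here the two bronze tasks have no dependencies and could start together, while the silver task waits for `node-brz_stock_prices`.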
Apache Airflow
Support for Apache Airflow as an orchestrator is under development and will be available soon.
Streaming Operations
Laktory supports event-based and kappa architectures using Apache Spark Structured Streaming for continuous, scalable data processing.
Setting as_stream: True in a pipeline node's data source makes the DataFrame streaming-enabled, so each run processes only new rows of data instead of re-processing the entire dataset.
Streaming does not mean the pipeline is continuously running. Execution can still be scheduled, but each run is incremental. Currently, the only way to deploy a continuously running pipeline is by selecting the Delta Live Tables orchestrator with continuous: True.
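The difference between a scheduled incremental run and a full re-processing can be sketched in plain Python. Spark Structured Streaming tracks its progress in a checkpoint. The example below imitates that idea with a simple offset stored in a dictionary. The `run_incremental` helper and the checkpoint layout are hypothetical and much simpler than what Spark does.

```python
def run_incremental(source_rows, checkpoint):
    """Return only the rows that arrived since the previous run."""
    start = checkpoint.get("offset", 0)
    new_rows = source_rows[start:]
    # Remember how far we have read so the next run starts from here.
    checkpoint["offset"] = len(source_rows)
    return new_rows


source = ["row-1", "row-2"]
checkpoint = {}

first = run_incremental(source, checkpoint)   # first run sees both rows
source.append("row-3")
second = run_incremental(source, checkpoint)  # second run sees only the new row
third = run_incremental(source, checkpoint)   # nothing new, nothing to process
print(first, second, third)
```

Each call stands for one scheduled run: the pipeline is not running between calls, yet no row is processed twice.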
For more information about streaming data, consider reading this blog post.