Data Pipeline
API Documentation
The Pipeline model is the cornerstone of Laktory, defining how data is read, transformed, and written.
Pipeline Node
API Documentation
A pipeline is composed of a sequence of nodes, each designed to generate a Spark or Polars DataFrame. Each node reads from a designated source, applies specified transformations, and optionally writes the result to one or more sinks.
Sources and Sinks
Laktory supports a variety of sources and sinks, including data files and warehouse tables. By linking a node as the source for a downstream node, you establish dependencies, creating a directed acyclic graph (DAG).
Transformer
The transformations are defined through a transformer, which is a chain of SQL statements and/or Spark/Polars DataFrame API function calls. This flexible and highly modular framework supports scalable batch and streaming operations.
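The same kind of chain can also be sketched directly in Python. The snippet below is illustrative only: it assumes the SparkChain and SparkChainNode models that appear in the execution logs further down, and mirrors the transformer declared in the YAML example that follows.

from laktory import models

# Illustrative sketch (assumed API, see execution logs below): a two-node chain
# with one SQL node and one DataFrame API node. {df} refers to the node's input
# DataFrame.
chain = models.SparkChain(
    nodes=[
        {"sql_expr": "SELECT symbol, close FROM {df}"},
        {"func_name": "drop_duplicates", "func_kwargs": {"subset": ["symbol"]}},
    ]
)

# Assuming df is an existing Spark DataFrame:
# df = chain.execute(df)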
Expectations
Data quality is achieved through the use of expectations and corresponding actions, which can drop, quarantine, or even halt pipelines if invalid data is detected before it reaches the output.
Serialization
The entire pipeline definition is serializable, ensuring portability for deployment on remote compute environments. This makes Laktory ideal for a DataOps approach using infrastructure-as-code principles.
Here is an example of a pipeline declaration:
name: stock_prices
nodes:
  - name: brz_stock_prices
    source:
      path: "./events/stock_prices"
    sinks:
      - schema_name: finance
        table_name: brz_stock_prices

  - name: slv_stock_prices
    source:
      node_name: brz_stock_prices
    sinks:
      - schema_name: finance
        table_name: slv_stock_prices
      - schema_name: finance
        table_name: slv_stock_prices_quarantine
        is_quarantine: True
    expectations:
      - name: positive price
        expr: close > 0
        action: QUARANTINE
    transformer:
      nodes:
        - sql_expr: |
            SELECT
              data.created_at AS created_at,
              data.symbol AS symbol,
              data.open AS open,
              data.close AS close,
              data.high AS high,
              data.low AS low,
              data.volume AS volume
            FROM
              {df}
        - func_name: drop_duplicates
          func_kwargs:
            subset:
              - symbol
              - timestamp
...
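Since pipeline models are standard Pydantic objects, the declaration above can be loaded, validated, and re-serialized in a few lines. A minimal sketch, assuming the declaration is saved as pipeline.yaml:

from laktory import models

# Load and validate the YAML declaration shown above
with open("pipeline.yaml") as fp:
    pl = models.Pipeline.model_validate_yaml(fp.read())

# Re-serialize to a portable representation for deployment on remote compute
config = pl.model_dump()            # plain Python dict
config_json = pl.model_dump_json()  # JSON string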
Execution
Local
You can execute the pipeline in a local or remote Spark session by calling pipeline.execute(spark). If Polars is the DataFrame engine, the pipeline can run in a simple Python environment without external dependencies. In all cases, nodes are executed sequentially: each reads data from its source, applies its transformations, and writes to its sink(s).
from laktory import models

with open("pipeline_node.yaml") as fp:
    node = models.PipelineNode.model_validate_yaml(fp.read())

node.execute(spark)
node.output_df.laktory.display()
laktory.models.pipelinenode - INFO - Executing pipeline node slv_stock_prices (None)
laktory.models.datasources.filedatasource - INFO - Reading /Volumes/dev/sources/landing/tables/brz_stock_prices/ as static
laktory.models.transformers.basechain - INFO - Executing SPARK chain
laktory.models.transformers.basechain - INFO - Executing SPARK chain node 0 (SparkChainNode).
laktory.models.transformers.sparkchainnode - INFO - DataFrame df as
SELECT
data.created_at AS created_at,
data.symbol AS symbol,
data.open AS open,
data.close AS close,
data.high AS high,
data.low AS low,
data.volume AS volume
FROM
{df}
laktory.models.transformers.basechain - INFO - Executing SPARK chain node 1 (SparkChainNode).
laktory.models.transformers.sparkchainnode - INFO - DataFrame df as drop_duplicates(subset=['symbol', 'created_at'])
laktory.models.datasinks.filedatasink - INFO - Writing df as static DELTA to finance.slv_stock_prices with mode OVERWRITE and options {'mergeSchema': 'false', 'overwriteSchema': 'true'}
+-------------------------+------+------------------+------------------+------------------+------------------+---------+
|created_at |symbol|open |close |high |low |volume |
+-------------------------+------+------------------+------------------+------------------+------------------+---------+
|2023-07-06T11:30:00-04:00|MSFT |338.7200012207031 |341.6199951171875 |341.6600036621094 |338.4200134277344 |2850613.0|
|2023-02-15T13:30:00-05:00|AAPL |154.3800048828125 |155.2321014404297 |155.32550048828125|154.14999389648438|6005631.0|
|2023-02-15T10:30:00-05:00|MSFT |268.0098876953125 |267.9599914550781 |268.6300048828125 |266.5299987792969 |5300365.0|
|2023-10-18T13:30:00-04:00|MSFT |332.7200012207031 |331.54998779296875|332.7200012207031 |330.739990234375 |2036767.0|
|2023-10-19T12:30:00-04:00|AAPL |176.69000244140625|177.47999572753906|177.83999633789062|175.4600067138672 |7575857.0|
|2023-05-16T11:30:00-04:00|AMZN |113.59500122070312|114.4832992553711 |114.48999786376953|113.2750015258789 |8034165.0|
|2023-07-06T10:30:00-04:00|MSFT |340.5799865722656 |338.70001220703125|341.1199951171875 |338.0899963378906 |3748565.0|
|2023-03-30T10:30:00-04:00|GOOGL |100.59500122070312|100.4749984741211 |100.875 |100.24019622802734|3869214.0|
|2023-01-17T15:30:00-05:00|GOOGL |91.55500030517578 |91.30999755859375 |91.61000061035156 |91.23999786376953 |3977790.0|
|2023-03-22T12:30:00-04:00|AMZN |99.94010162353516 |100.193603515625 |100.21659851074219|99.83219909667969 |3250304.0|
+-------------------------+------+------------------+------------------+------------------+------------------+---------+
only showing top 10 rows
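As noted above, when Polars is the DataFrame engine no Spark session is required. A minimal sketch, assuming the pipeline's nodes are configured to use the Polars engine in their declaration:

from laktory import models

# Assumes pipeline.yaml declares nodes configured for the Polars DataFrame engine
with open("pipeline.yaml") as fp:
    pl = models.Pipeline.model_validate_yaml(fp.read())

# No Spark session is passed; execution runs in a plain Python environment
pl.execute()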
Orchestrators
While local execution is ideal for small datasets or prototyping, orchestrators unlock more advanced features such as parallel processing, automatic schema management, and historical re-processing. The desired orchestrator can be configured directly within the pipeline.
- name: stock_prices
  nodes: ...
  orchestrator: DATABRICKS_DLT
  databricks_dlt:
    catalog: dev
    target: finance
    clusters:
      - name: default
        node_type_id: Standard_DS3_v2
        autoscale:
          min_workers: 1
          max_workers: 2
    libraries:
      - notebook:
          path: /.laktory/dlt/dlt_laktory_pl.py
The choice of orchestrator determines which resources are deployed when
running the laktory deploy
CLI command.
Databricks Job
A Databricks Job is a powerful orchestration mechanism. In this case, Laktory creates a task for each node, enabling nodes to run in parallel. All reading and writing operations are handled entirely by Laktory sources and sinks.
To use the DATABRICKS_JOB
orchestrator, you must also add the supporting
notebook
to your stack.
Here is a simplified version:
dbutils.widgets.text("pipeline_name", "pl-stock-prices")
dbutils.widgets.text("node_name", "")
from laktory import models # noqa: E402
from laktory import settings # noqa: E402
# Read Pipeline
pl_name = dbutils.widgets.get("pipeline_name")
node_name = dbutils.widgets.get("node_name")
filepath = f"/Workspace{settings.workspace_laktory_root}pipelines/{pl_name}.json"
with open(filepath, "r") as fp:
pl = models.Pipeline.model_validate_json(fp.read())
# Execution
if node_name:
pl.nodes_dict[node_name].execute(spark=spark)
else:
pl.execute(spark=spark)
The notebook reads the pipeline configuration using the pipeline name provided by the job and executes the node whose name is also provided by the job.
Selecting the DATABRICKS_JOB
orchestrator will deploy a pipeline json
configuration file and a requirements.txt
file for installing dependencies.
Both can be found in your workspace under /Workspace/{laktory_root}/pipelines/{pipeline_name}/
.
Delta Live Tables (DLT)
Databricks Delta Live Tables offers features like automatic schema change management, continuous execution, and autoscaling.
Each pipeline node runs inside a dlt.table() or dlt.view() function. In the context of DLT, node execution does not
trigger a sink write, as this operation is managed by DLT. When a source is a pipeline node, the dlt.read() and dlt.read_stream() functions are called to ensure compatibility with the DLT framework.
To use the DATABRICKS_DLT
orchestrator, you must also add the supporting
notebook
to your stack.
Here is a simplified version:
from laktory import dlt
from laktory import models

with open("pipeline.yaml") as fp:
    pl = models.Pipeline.model_validate_yaml(fp.read())


def define_table(node, sink):
    @dlt.table_or_view(
        name=sink.name,
        comment=node.description,
        as_view=sink is None,
    )
    def get_df():
        # Execute node
        node.execute(spark=spark)
        if sink.is_quarantine:
            df = node.quarantine_df
        else:
            df = node.output_df
        df.printSchema()

        # Return
        return df

    return get_df


# Build nodes
for node in pl.nodes:
    for sink in node.sinks:
        wrapper = define_table(node, sink)
        df = dlt.get_df(wrapper)
        display(df)
Notice how the dlt module is imported from laktory, as it provides additional debugging and inspection capabilities. Notably, you can run the notebook in a user cluster and inspect the resulting DataFrame.
Selecting the DATABRICKS_DLT
orchestrator will deploy a pipeline json
configuration file and a requirements.txt
file for installing dependencies.
Both can be found in your workspace under /Workspace/{laktory_root}/pipelines/{pipeline_name}/
.
Apache Airflow
Support for Apache Airflow as an orchestrator is on the roadmap.
Dagster
Support for Dagster as an orchestrator is on the roadmap.
Streaming Operations
Laktory supports event-based and kappa architectures using Apache Spark Structured Streaming for continuous, scalable data processing.
By setting as_stream: True
in a pipeline node's data source, the DataFrame becomes streaming-enabled, processing only
new rows of data at each run instead of re-processing the entire dataset.
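For illustration, a node with a streaming source might look like the hedged sketch below; field names follow the YAML examples above, and the dict is validated the same way a YAML declaration would be.

import yaml

from laktory import models

# Hedged sketch: a node whose upstream source is read as a stream, so each run
# only processes newly arrived rows instead of the full dataset.
node_yaml = """
name: slv_stock_prices
source:
  node_name: brz_stock_prices
  as_stream: True
sinks:
  - schema_name: finance
    table_name: slv_stock_prices
"""
node = models.PipelineNode.model_validate(yaml.safe_load(node_yaml))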
Streaming does not mean the pipeline is continuously running. Execution can still be scheduled, but each run is
incremental. Currently, the only way to deploy a continuously running pipeline is by selecting the Delta Live Tables
orchestrator with continuous: True
.
For more information about streaming data, consider reading this blog post.