PipelineNode
laktory.models.PipelineNode
Bases: BaseModel
Pipeline base component that generates a DataFrame by reading a data source and applying a transformer (a chain of DataFrame transformations). The output may optionally be written to a data sink. Some basic transformations are also natively supported.
ATTRIBUTE | DESCRIPTION |
---|---|
`add_layer_columns` | If `True`, layer-specific columns (such as layer timestamps) are added to the resulting DataFrame. |
`dlt_template` | Specify which template (notebook) to use if the pipeline is run with Databricks Delta Live Tables. If `None`, the laktory default template is used. |
`drop_duplicates` | If `True`, duplicated rows are dropped, using `primary_key` when defined or all columns otherwise. |
`drop_source_columns` | If `True`, the source columns are dropped after the transformer is applied. |
`expectations` | List of expectations for the DataFrame. Can be used to raise warnings, drop invalid records or fail the pipeline. |
`layer` | Layer in the medallion architecture (e.g. `BRONZE`, `SILVER`, `GOLD`). |
`name` | Name given to the node. Required to reference a node in a data source. |
`primary_key` | Name of the column storing a unique identifier for each row. It is used by the node to drop duplicated rows. |
`source` | Definition of the data source. |
`sink` | Definition of the data sink. |
`transformer` | Spark or Polars chain defining the data transformations applied to the data source. |
`timestamp_key` | Name of the column storing a timestamp associated with each row. It is used as the default column by the builder when creating watermarks. |
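For instance, the deduplication-related attributes above can be combined in a single node definition. The following is a minimal sketch, assuming hypothetical column names (`trade_id`, `created_at`) and an illustrative source path:

```py
import io

from laktory import models

# Hedged sketch: a node that drops duplicated rows based on a hypothetical
# `trade_id` primary key and uses `created_at` as its timestamp key.
node_yaml = '''
    name: brz_trades
    layer: BRONZE
    primary_key: trade_id
    timestamp_key: created_at
    drop_duplicates: True
    source:
        format: CSV
        path: ./raw/brz_trades.csv
'''
node = models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))
```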
Examples:
A node reading stock prices data from a CSV file and writing a DataFrame to disk as a parquet file.
```py
import io

from laktory import models

node_yaml = '''
    name: brz_stock_prices
    layer: BRONZE
    source:
        format: CSV
        path: ./raw/brz_stock_prices.csv
    sink:
        format: PARQUET
        mode: OVERWRITE
        path: ./dataframes/brz_stock_prices
'''

node = models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))
# node.execute(spark)
```
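To actually run the node, a Spark session can be passed to `execute` (commented out above). A minimal sketch, assuming pyspark is installed locally:

```py
from pyspark.sql import SparkSession

# Illustrative sketch: `execute` reads the source, applies the transformer
# and writes the sink, returning the output DataFrame.
spark = SparkSession.builder.appName("laktory-example").getOrCreate()
df = node.execute(spark=spark)
df.printSchema()
```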
A node reading stock prices from an upstream node and writing a DataFrame to a data table.
```py
import io

from laktory import models

node_yaml = '''
    name: slv_stock_prices
    layer: SILVER
    source:
        node_name: brz_stock_prices
    sink:
        catalog_name: hive_metastore
        schema_name: default
        table_name: slv_stock_prices
    transformer:
        nodes:
        - with_column:
            name: created_at
            type: timestamp
            sql_expr: data.created_at
        - with_column:
            name: symbol
            sql_expr: data.symbol
        - with_column:
            name: close
            type: double
            sql_expr: data.close
'''

node = models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))
# node.execute(spark)
```
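Each `with_column` node creates or replaces a column from a SQL expression, with an optional type cast. As a rough illustration only (not laktory's actual generated code), the transformer above behaves similarly to this plain PySpark snippet:

```py
import pyspark.sql.functions as F

# Rough PySpark equivalent of the transformer above (illustrative sketch;
# laktory builds the actual logic from the with_column definitions).
df = (
    df.withColumn("created_at", F.expr("data.created_at").cast("timestamp"))
    .withColumn("symbol", F.expr("data.symbol"))
    .withColumn("close", F.expr("data.close").cast("double"))
)
```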
Attributes

is_orchestrator_dlt (property)

If `True`, the pipeline node is used in the context of a DLT pipeline.

apply_changes_kwargs (property)

Keyword arguments for the `dlt.apply_changes` function.
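These properties are mainly relevant when the node runs inside a Databricks DLT pipeline. A minimal sketch of how they might be used in a DLT notebook (illustrative only, not laktory's actual template):

```py
import dlt  # Databricks Delta Live Tables runtime module

# Illustrative sketch only: apply_changes_kwargs bundles the keyword
# arguments expected by dlt.apply_changes for a change-data-capture merge.
if node.is_orchestrator_dlt and node.apply_changes_kwargs:
    dlt.apply_changes(**node.apply_changes_kwargs)
```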
Functions

get_sources

get_sources(cls=BaseDataSource)

Get all sources feeding the pipeline node.
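A short usage sketch, assuming the `node` object built in the examples above; passing a more specific source class to `cls` restricts the result to that source type:

```py
# List the sources feeding the node.
for source in node.get_sources():
    print(type(source).__name__)
```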
execute

execute(apply_transformer=True, spark=None, udfs=None, write_sink=True, full_refresh=False)

Execute the pipeline node by:
- Reading the source
- Applying the user-defined (and, if applicable, layer-specific) transformations
- Writing the sink
PARAMETER | DESCRIPTION |
---|---|
`apply_transformer` | Flag to apply the transformer during execution. |
`spark` | Spark session. |
`udfs` | User-defined functions. |
`write_sink` | Flag to write the sink during execution. |
`full_refresh` | If `True`, a full refresh of the node is performed, ignoring previously written data. |

RETURNS | DESCRIPTION |
---|---|
`AnyDataFrame` | Output Spark DataFrame. |
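A short usage sketch of the flags above, assuming the `node` and `spark` objects from the earlier examples:

```py
# Skip writing the sink while iterating on transformations.
df = node.execute(spark=spark, write_sink=False)

# Force a full refresh when the node needs to be rebuilt from scratch.
df = node.execute(spark=spark, full_refresh=True)
```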