SparkChain

laktory.models.SparkChain ¤

Bases: BaseChain

The SparkChain class defines a series of Spark transformations to be applied to a dataframe. Each transformation is expressed as a node (a SparkChainNode object) that, upon execution, returns a new dataframe. Nodes are executed sequentially in the provided order. A node may also be another SparkChain.

ATTRIBUTE DESCRIPTION
dataframe_type

Differentiator to select dataframe chain type

TYPE: Literal['SPARK']

nodes

The list of transformations to be executed.

TYPE: list[Union[SparkChainNode, SparkChain]]

Examples:

import pandas as pd
from laktory import models
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df0 = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 3]}))

# Build Chain
sc = models.SparkChain(
    nodes=[
        {
            "with_column": {
                "name": "cos_x",
                "type": "double",
                "expr": "F.cos('x')",
            },
        },
        {
            "nodes": [
                {
                    "func_name": "withColumnRenamed",
                    "func_args": [
                        "x",
                        "x_tmp",
                    ],
                },
                {
                    "with_column": {
                        "name": "x2",
                        "type": "double",
                        "expr": "F.sqrt('x_tmp')",
                    },
                },
            ],
        },
        {
            "func_name": "drop",
            "func_args": [
                "x_tmp",
            ],
        },
    ]
)

# Execute Chain
df = sc.execute(df0)

# Print result
print(df.toPandas().to_string())
'''
      cos_x        x2
0  0.540302  1.000000
1 -0.416147  1.414214
2 -0.989992  1.732051
'''

--

laktory.models.SparkChainNode ¤

Bases: BaseChainNode

SparkChain node that outputs a dataframe upon execution. As a convenience, the with_column argument may be specified to create a new column from a Spark or SQL expression. Each node is executed sequentially in the provided order. A node may also be another SparkChain.

ATTRIBUTE DESCRIPTION
func_args

List of arguments to be passed to the spark function. If the function expects a spark column, its string representation can be provided, with support for col, lit, expr and the F. namespace.

TYPE: list[Union[Any]]

func_kwargs

Keyword arguments to be passed to the spark function. If the function expects a spark column, its string representation can be provided, with support for col, lit, expr and the F. namespace.

TYPE: dict[str, Union[Any]]

func_name

Name of the spark function to build the dataframe. Mutually exclusive with sql_expr and with_column.

TYPE: Union[str, None]

sql_expr

SQL expression defining how to build the output dataframe, using {df} to reference the upstream dataframe. Other pipeline nodes can also be referenced using {nodes.node_name}. Mutually exclusive with func_name and with_column. See the examples below.

TYPE: Union[str, None]

with_column

Syntactic sugar for adding a column. Mutually exclusive with func_name and sql_expr.

TYPE: Union[SparkChainNodeColumn, None]

with_columns

Syntactic sugar for adding multiple columns at once. Mutually exclusive with func_name and sql_expr. See the examples below.

TYPE: Union[list[SparkChainNodeColumn], None]

Examples:

import pandas as pd
from laktory import models
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df0 = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 2, 3]}))

node = models.SparkChainNode(
    with_column={
        "name": "cosx",
        "type": "double",
        "expr": "F.cos('x')",
    },
)
df = node.execute(df0)

node = models.SparkChainNode(
    with_column={
        "name": "xy",
        "type": "double",
        "expr": "F.coalesce('x')",
    },
)
df = node.execute(df)

print(df.toPandas().to_string())
'''
   x      cosx   xy
0  1  0.540302  1.0
1  2 -0.416147  2.0
2  2 -0.416147  2.0
3  3 -0.989992  3.0
'''

node = models.SparkChainNode(
    func_name="drop_duplicates",
    func_args=[["x"]],
)
df = node.execute(df)

print(df.toPandas().to_string())
'''
   x      cosx   xy
0  1  0.540302  1.0
1  2 -0.416147  2.0
2  3 -0.989992  3.0
'''
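
The with_columns argument accepts a list of column definitions, which is convenient when a node derives several columns at once. A minimal sketch, reusing df0 from above (the sinx column is illustrative):

node = models.SparkChainNode(
    with_columns=[
        {"name": "cosx", "type": "double", "expr": "F.cos('x')"},
        {"name": "sinx", "type": "double", "expr": "F.sin('x')"},
    ],
)
df = node.execute(df0)

A node may also be built entirely from a SQL expression, as mentioned under sql_expr. A minimal sketch, assuming the {df} placeholder resolves to the upstream dataframe:

node = models.SparkChainNode(
    sql_expr="SELECT x, cos(x) AS cosx FROM {df}",
)
df = node.execute(df0)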

Functions ¤

execute ¤

execute(df, udfs=None)

Execute spark chain node

PARAMETER DESCRIPTION
df

Input dataframe

TYPE: SparkDataFrame

udfs

User-defined functions

TYPE: list[Callable[[...], Union[SparkColumn, SparkDataFrame]]] DEFAULT: None

RETURNS DESCRIPTION
Output dataframe
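
As shown in the source below, a name listed in udfs takes precedence over built-in dataframe methods and is called with the upstream dataframe as its first positional argument. A minimal sketch (the function with_doubled_x is hypothetical):

import pandas as pd
from laktory import models


def with_doubled_x(df):
    # Receives the upstream dataframe and returns a new one
    return df.withColumn("x2", df.x * 2)


df0 = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 3]}))
node = models.SparkChainNode(func_name="with_doubled_x")
df = node.execute(df0, udfs=[with_doubled_x])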
Source code in laktory/models/transformers/sparkchainnode.py
def execute(
    self,
    df: SparkDataFrame,
    udfs: list[Callable[[...], Union[SparkColumn, SparkDataFrame]]] = None,
) -> Union[SparkDataFrame]:
    """
    Execute spark chain node

    Parameters
    ----------
    df:
        Input dataframe
    udfs:
        User-defined functions

    Returns
    -------
        Output dataframe
    """
    import pyspark.sql.functions as F
    from pyspark.sql.dataframe import DataFrame
    from pyspark.sql.connect.dataframe import DataFrame as DataFrameConnect
    from pyspark.sql import Column
    from laktory.spark import DATATYPES_MAP

    if udfs is None:
        udfs = []
    udfs = {f.__name__: f for f in udfs}

    # Build Columns
    if self._with_columns:
        for column in self._with_columns:
            logger.info(
                f"Building column {column.name} as {column.expr or column.sql_expr}"
            )
            _col = column.eval(udfs=udfs)
            if column.type:
                _col = _col.cast(DATATYPES_MAP[column.type])
            df = df.withColumns({column.name: _col})
        return df

    # From SQL expression
    if self.sql_expr:
        logger.info(f"DataFrame {self.id} as \n{self.sql_expr.strip()}")
        return self.parsed_sql_expr.eval(df, chain_node=self)

    # Get Function
    func_name = self.func_name
    if self.func_name is None:
        raise ValueError(
            "`func_name` must be specified if `sql_expr` is not specified"
        )

    # Get from UDFs
    f = udfs.get(func_name, None)

    # Get from built-in spark and spark extension (including Laktory) functions
    input_df = True
    if f is None:
        # Get function from namespace extension
        if "." in func_name:
            input_df = False
            vals = func_name.split(".")
            f = getattr(getattr(df, vals[0]), vals[1], None)
        else:
            f = getattr(type(df), func_name, None)

    if f is None:
        raise ValueError(f"Function {func_name} is not available")

    _args = self.parsed_func_args
    _kwargs = self.parsed_func_kwargs

    # Build log
    func_log = f"{func_name}("
    func_log += ",".join([a.signature() for a in _args])
    func_log += ",".join([f"{k}={a.signature()}" for k, a in _kwargs.items()])
    func_log += ")"
    logger.info(f"DataFrame {self.id} as {func_log}")

    # Build args
    args = []
    for i, _arg in enumerate(_args):
        args += [_arg.eval(spark=df.sparkSession)]

    # Build kwargs
    kwargs = {}
    for k, _arg in _kwargs.items():
        kwargs[k] = _arg.eval(spark=df.sparkSession)

    # Call function
    if input_df:
        df = f(df, *args, **kwargs)
    else:
        df = f(*args, **kwargs)

    return df

--

laktory.models.SparkChainNodeColumn ¤

Bases: BaseChainNodeColumn

Chain node column definition

ATTRIBUTE DESCRIPTION
name

Column name

TYPE: str

type

Column data type

TYPE: Union[str, None]

unit

Column units

TYPE: Union[str, None]

expr

String representation of a spark expression

TYPE: Union[str, None]

sql_expr

SQL expression

TYPE: Union[str, None]
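
The with_column dictionaries used throughout the examples above are validated into this model, so constructing it directly is equivalent. A minimal sketch:

from laktory import models

col = models.SparkChainNodeColumn(
    name="cos_x",
    type="double",
    expr="F.cos('x')",
)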

--

laktory.models.SparkChainNodeSQLExpr ¤

Bases: BaseChainNodeSQLExpr

Chain node SQL expression

ATTRIBUTE DESCRIPTION
expr

SQL expression