SparkChain

laktory.models.SparkChain

Bases: BaseChain

The SparkChain class defines a series of Spark transformations to be applied to a dataframe. Each transformation is expressed as a node (a SparkChainNode object) that, upon execution, returns a new dataframe. Each node is executed sequentially in the provided order. A node may also be another SparkChain.

ATTRIBUTE DESCRIPTION
dataframe_backend

Differentiator to select dataframe chain type

TYPE: Literal['SPARK']

nodes

The list of transformations to be executed.

TYPE: list[Union[SparkChainNode, SparkChain]]

Examples:

import pandas as pd
from laktory import models

df0 = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 3]}))

# Build Chain
sc = models.SparkChain(
    nodes=[
        {
            "with_column": {
                "name": "cos_x",
                "type": "double",
                "expr": "F.cos('x')",
            },
        },
        {
            "nodes": [
                {
                    "func_name": "withColumnRenamed",
                    "func_args": [
                        "x",
                        "x_tmp",
                    ],
                },
                {
                    "with_column": {
                        "name": "x2",
                        "type": "double",
                        "expr": "F.sqrt('x_tmp')",
                    },
                },
            ],
        },
        {
            "func_name": "drop",
            "func_args": [
                "x_tmp",
            ],
        },
    ]
)

# Execute Chain
df = sc.execute(df0)

# Print result
print(df.toPandas().to_string())
'''
      cos_x        x2
0  0.540302  1.000000
1 -0.416147  1.414214
2 -0.989992  1.732051
'''
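A node may also be defined from a SQL expression referencing the upstream dataframe through the {df} placeholder (see SparkChainNode below). A minimal sketch, reusing df0 from above; the output is not reproduced here:

sc = models.SparkChain(
    nodes=[
        {
            "sql_expr": "SELECT x, x * 2 AS y FROM {df}",
        },
    ]
)
df = sc.execute(df0)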

--

laktory.models.SparkChainNode

Bases: BaseChainNode

SparkChain node that outputs a dataframe upon execution. As a convenience, the with_column argument can be specified to create a new column from a Spark or SQL expression. Each node is executed sequentially in the provided order. A node may also be another SparkChain.

ATTRIBUTE DESCRIPTION
func_args

List of arguments to be passed to the Spark function. If the function expects a Spark column, its string representation can be provided, with support for col, lit, expr and the F. namespace (see the sketch after this attribute list).

TYPE: list[Union[Any]]

func_kwargs

Keyword arguments to be passed to the Spark function. If the function expects a Spark column, its string representation can be provided, with support for col, lit, expr and the F. namespace.

TYPE: dict[str, Union[Any]]

func_name

Name of the Spark function used to build the dataframe. Mutually exclusive with sql_expr and with_column.

TYPE: Union[str, None]

sql_expr

SQL expression defining how to build the output dataframe, using {df} to reference the upstream dataframe. Other pipeline nodes can also be referenced using {nodes.node_name}. Mutually exclusive with func_name and with_column.

TYPE: Union[str, None]

with_column

Syntactic sugar for adding a column. Mutually exclusive with func_name and sql_expr.

TYPE: Union[ChainNodeColumn, None]

with_columns

Syntactic sugar for adding columns. Mutually exclusive with func_name and sql_expr.

TYPE: Union[list[ChainNodeColumn], None]
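As a hedged sketch of the column-string convention described for func_args (withColumn and the F.sqrt string below are illustrative assumptions, not taken from the original examples):

node = models.SparkChainNode(
    func_name="withColumn",
    func_args=["y", "F.sqrt('x')"],
)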

Examples:

import pandas as pd
from laktory import models

df0 = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 2, 3]}))

node = models.SparkChainNode(
    with_column={
        "name": "cosx",
        "type": "double",
        "expr": "F.cos('x')",
    },
)
df = node.execute(df0)

node = models.SparkChainNode(
    with_column={
        "name": "xy",
        "type": "double",
        "expr": "F.coalesce('x')",
    },
)
df = node.execute(df)

print(df.toPandas().to_string())
'''
   x      cosx   xy
0  1  0.540302  1.0
1  2 -0.416147  2.0
2  2 -0.416147  2.0
3  3 -0.989992  3.0
'''

node = models.SparkChainNode(
    func_name="drop_duplicates",
    func_args=[["x"]],
)
df = node.execute(df)

print(df.toPandas().to_string())
'''
   x      cosx   xy
0  1  0.540302  1.0
1  2 -0.416147  2.0
2  3 -0.989992  3.0
'''
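Multiple columns may also be declared at once with with_columns. A minimal sketch following the same pattern as the examples above (sinx is an illustrative assumption):

node = models.SparkChainNode(
    with_columns=[
        {"name": "cosx", "type": "double", "expr": "F.cos('x')"},
        {"name": "sinx", "type": "double", "expr": "F.sin('x')"},
    ],
)
df = node.execute(df0)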
METHOD DESCRIPTION
execute

Execute spark chain node

Functions

execute

execute(df, udfs=None)

Execute spark chain node

PARAMETER DESCRIPTION
df

Input dataframe

TYPE: SparkDataFrame

udfs

User-defined functions

TYPE: list[Callable[[...], Union[SparkColumn, SparkDataFrame]]] DEFAULT: None

RETURNS DESCRIPTION
Output dataframe
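Based on the source below, a user-defined function is looked up by its __name__ and, when matched to func_name, receives the input dataframe as its first argument. A minimal sketch (multiply is a hypothetical UDF; literal keyword arguments are assumed to pass through unchanged):

import pyspark.sql.functions as F

def multiply(df, factor=2):
    # Hypothetical UDF: receives the input dataframe first and returns a dataframe
    return df.withColumn("y", F.col("x") * factor)

node = models.SparkChainNode(
    func_name="multiply",
    func_kwargs={"factor": 3},
)
df = node.execute(df0, udfs=[multiply])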
Source code in laktory/models/transformers/sparkchainnode.py
def execute(
    self,
    df: SparkDataFrame,
    udfs: list[Callable[[...], Union[SparkColumn, SparkDataFrame]]] = None,
) -> Union[SparkDataFrame]:
    """
    Execute spark chain node

    Parameters
    ----------
    df:
        Input dataframe
    udfs:
        User-defined functions

    Returns
    -------
        Output dataframe
    """
    from laktory.spark import DATATYPES_MAP

    if udfs is None:
        udfs = []
    udfs = {f.__name__: f for f in udfs}

    # Build Columns
    if self._with_columns:
        for column in self._with_columns:
            logger.info(
                f"Building column {column.name} as {column.expr or column.sql_expr}"
            )
            _col = column.eval(udfs=udfs, dataframe_backend="SPARK")
            if column.type:
                _col = _col.cast(DATATYPES_MAP[column.type])
            df = df.withColumns({column.name: _col})
        return df

    # From SQL expression
    if self.sql_expr:
        logger.info(f"DataFrame {self.id} as \n{self.sql_expr.strip()}")
        return self.parsed_sql_expr.eval(df)

    # Get Function
    func_name = self.func_name
    if self.func_name is None:
        raise ValueError(
            "`func_name` must be specified if `sql_expr` is not specified"
        )

    # Get from UDFs
    f = udfs.get(func_name, None)

    # Get from built-in spark and spark extension (including Laktory) functions
    input_df = True
    if f is None:
        # Get function from namespace extension
        if "." in func_name:
            input_df = False
            vals = func_name.split(".")
            f = getattr(getattr(df, vals[0]), vals[1], None)
        else:
            f = getattr(type(df), func_name, None)

    if f is None:
        raise ValueError(f"Function {func_name} is not available")

    _args = self.parsed_func_args
    _kwargs = self.parsed_func_kwargs

    # Build log
    func_log = f"{func_name}("
    func_log += ",".join([a.signature() for a in _args])
    func_log += ",".join([f"{k}={a.signature()}" for k, a in _kwargs.items()])
    func_log += ")"
    logger.info(f"DataFrame {self.id} as {func_log}")

    # Build args
    args = []
    for i, _arg in enumerate(_args):
        args += [_arg.eval(spark=df.sparkSession)]

    # Build kwargs
    kwargs = {}
    for k, _arg in _kwargs.items():
        kwargs[k] = _arg.eval(spark=df.sparkSession)

    # Call function
    if input_df:
        df = f(df, *args, **kwargs)
    else:
        df = f(*args, **kwargs)

    return df

--

laktory.models.SparkChainNodeSQLExpr

Bases: BaseChainNodeSQLExpr

Chain node SQL expression

ATTRIBUTE DESCRIPTION
expr

SQL expression

TYPE: str
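This model is typically populated from a node's sql_expr rather than instantiated directly. A minimal sketch of the expression format it holds, assuming the {df} placeholder described above:

node = models.SparkChainNode(
    sql_expr="SELECT *, x * 2 AS y FROM {df}",
)
df = node.execute(df0)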