
PolarsChain

laktory.models.PolarsChain

Bases: BaseChain

The PolarsChain class defines a series of Polars transformations to be applied to a dataframe. Each transformation is expressed as a node (PolarsChainNode object) that, upon execution, returns a new dataframe. Nodes are executed sequentially in the provided order, each receiving the output of the previous one. A node may also be another PolarsChain.
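The sequential execution model can be illustrated without Laktory or Polars. The following is a minimal sketch, not Laktory's implementation: Node, Chain, rename, drop and with_column are hypothetical names, and a plain dict of lists stands in for a dataframe. It shows each node receiving the previous node's output, and a nested chain behaving like any other node.

```python
from typing import Callable, Union

# A toy "dataframe": column name -> list of values (stand-in for a Polars DataFrame)
Frame = dict[str, list]


class Node:
    """Hypothetical node: wraps a function that returns a new frame."""

    def __init__(self, func: Callable[[Frame], Frame]):
        self.func = func

    def execute(self, df: Frame) -> Frame:
        return self.func(df)


class Chain:
    """Hypothetical chain: runs its nodes in order; a node may itself be a Chain."""

    def __init__(self, nodes: list[Union[Node, "Chain"]]):
        self.nodes = nodes

    def execute(self, df: Frame) -> Frame:
        for node in self.nodes:
            # Each node receives the output of the previous one
            df = node.execute(df)
        return df


def rename(old: str, new: str) -> Callable[[Frame], Frame]:
    # Build a new dict so the input frame is left untouched
    return lambda df: {(new if k == old else k): v for k, v in df.items()}


def drop(name: str) -> Callable[[Frame], Frame]:
    return lambda df: {k: v for k, v in df.items() if k != name}


def with_column(name: str, func: Callable[[Frame], list]) -> Callable[[Frame], Frame]:
    return lambda df: {**df, name: func(df)}


chain = Chain(
    nodes=[
        Node(with_column("x_sq", lambda df: [v * v for v in df["x"]])),
        Chain(nodes=[Node(rename("x", "x_tmp"))]),  # nested chain
        Node(drop("x_tmp")),
    ]
)

df0 = {"x": [1, 2, 3]}
print(chain.execute(df0))  # {'x_sq': [1, 4, 9]}
```

Because every helper returns a new dict, the input frame df0 is never modified, which loosely mirrors the way each Polars node returns a new dataframe.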

ATTRIBUTE DESCRIPTION
dataframe_type

Differentiator to select dataframe chain type

TYPE: Literal['POLARS']

nodes

The list of transformations to be executed.

TYPE: list[Union[PolarsChainNode, PolarsChain]]

Examples:

import polars as pl
from laktory import models

df0 = pl.DataFrame({"x": [1, 2, 3]})

# Build Chain
sc = models.PolarsChain(
    nodes=[
        {
            "with_column": {
                "name": "cos_x",
                "type": "double",
                "expr": "pl.col('x').cos()",
            },
        },
        {
            "nodes": [
                {
                    "func_name": "rename",
                    "func_args": [
                        {"x": "x_tmp"},
                    ],
                },
                {
                    "with_column": {
                        "name": "x2",
                        "type": "double",
                        "expr": "pl.col('x_tmp').sqrt()",
                    },
                },
            ],
        },
        {
            "func_name": "drop",
            "func_args": [
                "x_tmp",
            ],
        },
    ],
)

# Execute Chain
df = sc.execute(df0)

# Print result
print(df.glimpse(return_as_string=True))
'''
Rows: 3
Columns: 2
$ cos_x <f64> 0.5403023058681398, -0.4161468365471424, -0.9899924966004454
$ x2    <f64> 1.0, 1.4142135623730951, 1.7320508075688772
'''

--

laktory.models.PolarsChainNode

Bases: BaseChainNode

PolarsChain node that outputs a dataframe upon execution. As a convenience, the with_column argument can be specified to create a new column from a Polars or SQL expression.

ATTRIBUTE DESCRIPTION
func_args

List of arguments to be passed to the Polars function. If the function expects a Polars expression, its string representation can be provided instead; col, lit, sql_expr and the pl module (e.g. pl.col('x')) are supported.

TYPE: list[Union[Any]]

func_kwargs

Dictionary of keyword arguments to be passed to the Polars function. If the function expects a Polars expression, its string representation can be provided instead; col, lit, sql_expr and the pl module (e.g. pl.col('x')) are supported.

TYPE: dict[str, Union[Any]]

func_name

Name of the Polars function used to build the dataframe. Mutually exclusive with sql_expr and with_column.

TYPE: Union[str, None]

sql_expr

SQL expression defining how to build the output dataframe, using {df} to reference the upstream dataframe. Other pipeline nodes can also be referenced using {nodes.node_name}. Mutually exclusive with func_name and with_column.

TYPE: Union[str, None]

with_column

Syntactic sugar for adding a column. Mutually exclusive with func_name and sql_expr.

TYPE: Union[PolarsChainNodeColumn, None]

with_columns

Syntactic sugar for adding columns. Mutually exclusive with func_name and sql_expr.

TYPE: Union[list[PolarsChainNodeColumn], None]
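The mutual-exclusivity rules above can be expressed as a small check. This is a hypothetical sketch, not Laktory's validator: check_node_config and EXCLUSIVE_GROUPS are illustrative names, with_column and with_columns are assumed to count as a single option, and requiring exactly one option is an assumption based on execute raising a ValueError when neither func_name nor sql_expr is given.

```python
from typing import Any

# Hypothetical grouping: with_column and with_columns count as one option
EXCLUSIVE_GROUPS = {
    "func_name": ("func_name",),
    "sql_expr": ("sql_expr",),
    "with_column(s)": ("with_column", "with_columns"),
}


def check_node_config(config: dict[str, Any]) -> str:
    """Return the single option group set in config, or raise ValueError."""
    active = [
        group
        for group, keys in EXCLUSIVE_GROUPS.items()
        if any(config.get(key) is not None for key in keys)
    ]
    if len(active) > 1:
        raise ValueError(f"Options {active} are mutually exclusive")
    if not active:
        raise ValueError("One of func_name, sql_expr or with_column(s) is required")
    return active[0]


print(check_node_config({"func_name": "unique", "func_args": [["x"]]}))  # func_name
try:
    check_node_config({"func_name": "drop", "sql_expr": "SELECT * FROM {df}"})
except ValueError as err:
    print(err)  # Options ['func_name', 'sql_expr'] are mutually exclusive
```

Options such as func_args and func_kwargs are ignored by the check, since they only qualify func_name.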

Examples:

from laktory import models
import polars as pl

df0 = pl.DataFrame({"x": [1, 2, 2, 3]})
print(df0.glimpse(return_as_string=True))
'''
Rows: 4
Columns: 1
$ x <i64> 1, 2, 2, 3
'''

node = models.PolarsChainNode(
    with_column={"name": "cosx", "type": "double", "expr": "pl.col('x').cos()"},
)
df = node.execute(df0)

node = models.PolarsChainNode(
    with_column={
        "name": "xy",
        "type": "double",
        "expr": "pl.coalesce('x')",
    },
)
df = node.execute(df)
print(df.glimpse(return_as_string=True))
'''
Rows: 4
Columns: 3
$ x    <i64> 1, 2, 2, 3
$ cosx <f64> 0.5403023058681398, -0.4161468365471424, -0.4161468365471424, -0.9899924966004454
$ xy   <f64> 1.0, 2.0, 2.0, 3.0
'''

node = models.PolarsChainNode(
    func_name="unique",
    func_args=[["x"]],
    func_kwargs={"maintain_order": True},
)
df = node.execute(df)
print(df.glimpse(return_as_string=True))
'''
Rows: 3
Columns: 3
$ x    <i64> 1, 2, 3
$ cosx <f64> 0.5403023058681398, -0.4161468365471424, -0.9899924966004454
$ xy   <f64> 1.0, 2.0, 3.0
'''
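To illustrate the {df} and {nodes.node_name} placeholders of sql_expr, the sketch below substitutes them with concrete table names and runs the resulting query. Assumptions: render_sql is a hypothetical helper, and Python's built-in sqlite3 stands in for the SQL engine Laktory actually uses with Polars, so dialect details may differ.

```python
import re
import sqlite3


def render_sql(sql_expr: str, df_table: str, node_tables: dict[str, str]) -> str:
    """Replace {df} and {nodes.<name>} placeholders with concrete table names."""

    def _sub(match: re.Match) -> str:
        ref = match.group(1)
        if ref == "df":
            return df_table
        if ref.startswith("nodes."):
            name = ref[len("nodes."):]
            if name not in node_tables:
                raise KeyError(f"Unknown node '{name}'")
            return node_tables[name]
        raise KeyError(f"Unsupported placeholder '{{{ref}}}'")

    return re.sub(r"\{([A-Za-z_][\w.]*)\}", _sub, sql_expr)


# sqlite3 stands in for the Polars SQL engine in this sketch
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE upstream (x INTEGER)")
con.executemany("INSERT INTO upstream VALUES (?)", [(1,), (2,), (2,), (3,)])
con.execute("CREATE TABLE meta (x INTEGER, label TEXT)")
con.executemany("INSERT INTO meta VALUES (?, ?)", [(1, "one"), (3, "three")])

sql = (
    "SELECT DISTINCT d.x, m.label FROM {df} AS d "
    "LEFT JOIN {nodes.meta} AS m ON d.x = m.x ORDER BY d.x"
)
query = render_sql(sql, df_table="upstream", node_tables={"meta": "meta"})
print(query)
print(con.execute(query).fetchall())  # [(1, 'one'), (2, None), (3, 'three')]
```

Resolving placeholders before execution means an unknown node name fails early with a clear error rather than as a generic SQL error.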

Functions

execute

execute(df, udfs=None)

Execute polars chain node

PARAMETER DESCRIPTION
df

Input dataframe

TYPE: PolarsDataFrame

udfs

User-defined functions

TYPE: list[Callable[[...], Union[PolarsExpr, PolarsDataFrame]]] DEFAULT: None

RETURNS DESCRIPTION
Output dataframe
Source code in laktory/models/transformers/polarschainnode.py
def execute(
    self,
    df: PolarsDataFrame,
    udfs: list[Callable[[...], Union[PolarsExpr, PolarsDataFrame]]] = None,
) -> Union[PolarsDataFrame]:
    """
    Execute polars chain node

    Parameters
    ----------
    df:
        Input dataframe
    udfs:
        User-defined functions

    Returns
    -------
        Output dataframe
    """
    from polars import DataFrame
    from laktory.polars.datatypes import DATATYPES_MAP

    if udfs is None:
        udfs = []
    udfs = {f.__name__: f for f in udfs}

    # Build Columns
    if self._with_columns:
        for column in self._with_columns:
            logger.info(
                f"Building column {column.name} as {column.expr or column.sql_expr}"
            )
            _col = column.eval(udfs=udfs)
            if column.type:
                _col = _col.cast(DATATYPES_MAP[column.type])
            df = df.with_columns(**{column.name: _col})
        return df

    # From SQL expression
    if self.sql_expr:
        return self.parsed_sql_expr.eval(df)

    # Get Function
    func_name = self.func_name
    if self.func_name is None:
        raise ValueError(
            "`func_name` must be specified if `sql_expr` is not specified"
        )

    # Get from UDFs
    f = udfs.get(func_name, None)

    # Get from built-in polars and polars extension (including Laktory) functions
    input_df = True
    if f is None:
        # Get function from namespace extension
        if "." in func_name:
            input_df = False
            vals = func_name.split(".")
            f = getattr(getattr(df, vals[0]), vals[1], None)
        else:
            f = getattr(DataFrame, func_name, None)

    if f is None:
        raise ValueError(f"Function {func_name} is not available")

    _args = self.parsed_func_args
    _kwargs = self.parsed_func_kwargs

    # Build log (positional and keyword arguments share one comma-separated list)
    func_log = f"{func_name}("
    func_log += ",".join(
        [a.signature() for a in _args]
        + [f"{k}={a.signature()}" for k, a in _kwargs.items()]
    )
    func_log += ")"
    logger.info(f"DataFrame {self.id} as {func_log}")

    # Build args
    args = [_arg.eval() for _arg in _args]

    # Build kwargs
    kwargs = {k: _arg.eval() for k, _arg in _kwargs.items()}

    # Call function
    if input_df:
        df = f(df, *args, **kwargs)
    else:
        df = f(*args, **kwargs)

    return df
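The lookup order in execute (user-defined functions first, then a "namespace.method" extension, then a DataFrame method) can be mimicked without Polars. In this sketch ToyFrame, ToyNamespace, resolve, call and add_one are hypothetical stand-ins, not Laktory or Polars APIs. The sketch passes a default to getattr so that an unknown namespace produces the "not available" ValueError.

```python
from typing import Any, Callable, Optional


class ToyNamespace:
    """Hypothetical namespace extension reachable as df.<name>."""

    def __init__(self, df: "ToyFrame"):
        self._df = df

    def double(self, col: str) -> "ToyFrame":
        data = dict(self._df.data)
        data[col] = [2 * v for v in data[col]]
        return ToyFrame(data)


class ToyFrame:
    """Stand-in for polars.DataFrame with one built-in method and one namespace."""

    def __init__(self, data: dict[str, list]):
        self.data = data

    def drop(self, col: str) -> "ToyFrame":
        return ToyFrame({k: v for k, v in self.data.items() if k != col})

    @property
    def toy(self) -> ToyNamespace:
        return ToyNamespace(self)


def resolve(func_name: str, df: ToyFrame, udfs: list[Callable]) -> tuple[Callable, bool]:
    """Return (function, pass_df_as_first_arg) following the lookup order above."""
    by_name = {f.__name__: f for f in udfs}
    f: Optional[Callable] = by_name.get(func_name)  # 1. user-defined functions
    input_df = True
    if f is None:
        if "." in func_name:  # 2. namespace extension, e.g. "toy.double"
            ns_name, method = func_name.split(".", 1)
            f = getattr(getattr(df, ns_name, None), method, None)
            input_df = False  # bound method: the namespace already holds df
        else:  # 3. built-in DataFrame method, looked up on the class (unbound)
            f = getattr(ToyFrame, func_name, None)
    if f is None:
        raise ValueError(f"Function {func_name} is not available")
    return f, input_df


def call(func_name: str, df: ToyFrame, *args: Any, udfs=(), **kwargs: Any) -> ToyFrame:
    f, input_df = resolve(func_name, df, list(udfs))
    return f(df, *args, **kwargs) if input_df else f(*args, **kwargs)


def add_one(df: ToyFrame, col: str) -> ToyFrame:
    return ToyFrame({**df.data, col: [v + 1 for v in df.data[col]]})


df = ToyFrame({"x": [1, 2], "y": [3, 4]})
df = call("add_one", df, "x", udfs=[add_one])  # UDF, receives df first
df = call("toy.double", df, "x")  # namespace method, df not passed
df = call("drop", df, "y")  # unbound DataFrame method, df passed as self
print(df.data)  # {'x': [4, 6]}
```

Checking UDFs first lets a user-defined function shadow a built-in method of the same name.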

--

laktory.models.PolarsChainNodeColumn

Bases: BaseChainNodeColumn

Chain node column definition

ATTRIBUTE DESCRIPTION
name

Column name

TYPE: str

type

Column data type

TYPE: Union[str, None]

unit

Column units

TYPE: Union[str, None]

expr

String representation of a polars expression

TYPE: Union[str, None]

sql_expr

SQL expression

TYPE: Union[str, None]
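The attributes above describe a column by name, optional type and unit, and an expression. The sketch below shows the build-then-cast pattern that execute applies to such columns, where the cast happens only when a type is set. Assumptions: ColumnSpec, add_column and TYPE_CASTS are hypothetical; expr is a Python callable rather than a Polars expression string; and the type names map to Python constructors instead of Polars dtypes. The unit field is carried as metadata only.

```python
from dataclasses import dataclass
from typing import Callable, Optional

# Hypothetical type map; Laktory uses its own DATATYPES_MAP of Polars dtypes
TYPE_CASTS: dict[str, Callable] = {"double": float, "integer": int, "string": str}


@dataclass
class ColumnSpec:
    """Mirrors name / type / unit; expr is a Python callable here, not a string."""

    name: str
    type: Optional[str] = None
    unit: Optional[str] = None  # metadata only, not used in the computation
    expr: Optional[Callable[[dict], list]] = None


def add_column(df: dict[str, list], spec: ColumnSpec) -> dict[str, list]:
    values = spec.expr(df)
    if spec.type:  # cast only when a type is specified
        cast = TYPE_CASTS[spec.type]
        values = [cast(v) for v in values]
    return {**df, spec.name: values}


df = {"x": [1, 2, 3]}
spec = ColumnSpec(
    name="x_plus_one", type="double", unit="m", expr=lambda d: [v + 1 for v in d["x"]]
)
print(add_column(df, spec))  # {'x': [1, 2, 3], 'x_plus_one': [2.0, 3.0, 4.0]}
```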

--

laktory.models.PolarsChainNodeSQLExpr

Bases: BaseChainNodeSQLExpr

Chain node SQL expression

ATTRIBUTE DESCRIPTION
expr

SQL expression