
PolarsChain

laktory.models.PolarsChain

Bases: BaseChain

The PolarsChain class defines a series of Polars transformations to be applied to a dataframe. Each transformation is expressed as a node (a PolarsChainNode object) that returns a new dataframe upon execution. Nodes are executed sequentially in the order provided, each one receiving the output of the previous one. A node may also be another PolarsChain.
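To make the execution order concrete, here is a minimal stdlib-only sketch of the same idea. It is not Laktory's implementation: ToyNode and ToyChain are hypothetical stand-ins, and a plain dict of lists plays the role of the dataframe. Each node receives the previous node's output, and a nested chain is treated like any other node.

```python
from dataclasses import dataclass, field
from typing import Callable, Union

# Hypothetical toy "dataframe": column name -> list of values.
Frame = dict[str, list]


@dataclass
class ToyNode:
    """Hypothetical stand-in for a chain node wrapping one frame -> frame transform."""

    func: Callable[[Frame], Frame]

    def execute(self, df: Frame) -> Frame:
        return self.func(df)


@dataclass
class ToyChain:
    """Hypothetical stand-in for a chain: runs its nodes sequentially."""

    nodes: list[Union[ToyNode, "ToyChain"]] = field(default_factory=list)

    def execute(self, df: Frame) -> Frame:
        # Feed each node the output of the previous one; a nested
        # ToyChain also exposes execute(), so it is just another node.
        for node in self.nodes:
            df = node.execute(df)
        return df


chain = ToyChain(
    nodes=[
        ToyNode(lambda df: {**df, "x2": [v * 2 for v in df["x"]]}),
        ToyChain(nodes=[ToyNode(lambda df: {k: v for k, v in df.items() if k != "x"})]),
    ]
)
result = chain.execute({"x": [1, 2, 3]})
print(result)  # {'x2': [2, 4, 6]}
```

The nested chain drops x only after the first node has used it, which is the same ordering the rename/drop example relies on.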

ATTRIBUTE DESCRIPTION
dataframe_backend

Discriminator used to select the dataframe chain type.

TYPE: Literal['POLARS']

nodes

The list of transformations to be executed.

TYPE: list[Union[PolarsChainNode, PolarsChain]]
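The dataframe_backend field acts as a tag that says which chain class a configuration belongs to. The sketch below shows that dispatch idea with a plain dictionary; in Pydantic-based models the same role is typically played by a discriminated union. The class names, the second "OTHER" backend and build_chain are all hypothetical, as is defaulting to "POLARS" when the tag is absent.

```python
from typing import Literal


class ToyPolarsChain:
    # Hypothetical chain class tagged with the "POLARS" backend.
    dataframe_backend: Literal["POLARS"] = "POLARS"


class ToyOtherChain:
    # Hypothetical second backend, only here to show the dispatch.
    dataframe_backend: Literal["OTHER"] = "OTHER"


# Map each tag value to the class that handles it.
REGISTRY = {cls.dataframe_backend: cls for cls in (ToyPolarsChain, ToyOtherChain)}


def build_chain(config: dict):
    """Instantiate the chain class selected by config["dataframe_backend"]."""
    backend = config.get("dataframe_backend", "POLARS")  # default is an assumption
    try:
        return REGISTRY[backend]()
    except KeyError:
        raise ValueError(f"Unsupported dataframe_backend: {backend!r}") from None


print(type(build_chain({"dataframe_backend": "POLARS"})).__name__)  # ToyPolarsChain
```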

Examples:

import polars as pl
from laktory import models

df0 = pl.DataFrame({"x": [1, 2, 3]})

# Build Chain
sc = models.PolarsChain(
    nodes=[
        {
            "with_column": {
                "name": "cos_x",
                "type": "double",
                "expr": "pl.col('x').cos()",
            },
        },
        {
            "nodes": [
                {
                    "func_name": "rename",
                    "func_args": [
                        {"x": "x_tmp"},
                    ],
                },
                {
                    "with_column": {
                        "name": "x2",
                        "type": "double",
                        "expr": "pl.col('x_tmp').sqrt()",
                    },
                },
            ],
        },
        {
            "func_name": "drop",
            "func_args": [
                "x_tmp",
            ],
        },
    ],
)

# Execute Chain
df = sc.execute(df0)

# Print result
print(df.glimpse(return_as_string=True))
'''
Rows: 3
Columns: 2
$ cos_x <f64> 0.5403023058681398, -0.4161468365471424, -0.9899924966004454
$ x2    <f64> 1.0, 1.4142135623730951, 1.7320508075688772
'''

--

laktory.models.PolarsChainNode

Bases: BaseChainNode

PolarsChain node that outputs a dataframe upon execution. As a convenience, the with_column argument can be specified to create a new column from a Polars or SQL expression.
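Conceptually, a with_column node evaluates an expression against the input dataframe, optionally casts the result to the requested type, and returns a new dataframe with that column added; the execute source listed for this class follows that pattern with DATATYPES_MAP and cast. The stdlib-only sketch mirrors those steps. ToyColumn, TYPE_MAP and the dict-of-lists frame are hypothetical stand-ins for ChainNodeColumn, Laktory's type map and a Polars dataframe.

```python
from dataclasses import dataclass
from typing import Callable, Optional

# Hypothetical type map; Laktory's real one maps type names to Polars dtypes.
TYPE_MAP: dict[str, Callable] = {"double": float, "integer": int, "string": str}


@dataclass
class ToyColumn:
    name: str
    expr: Callable[[dict], list]  # computes the new values from the frame
    type: Optional[str] = None    # optional target type name


def with_column(df: dict, column: ToyColumn) -> dict:
    values = column.expr(df)
    if column.type:
        cast = TYPE_MAP[column.type]
        values = [cast(v) for v in values]
    # Build a new frame instead of mutating the input.
    return {**df, column.name: values}


df0 = {"x": [1, 2, 2, 3]}
df = with_column(df0, ToyColumn(name="xy", expr=lambda d: d["x"], type="double"))
print(df)  # {'x': [1, 2, 2, 3], 'xy': [1.0, 2.0, 2.0, 3.0]}
```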

ATTRIBUTE DESCRIPTION
func_args

List of positional arguments to be passed to the Polars function. If the function expects a Polars expression, its string representation can be provided, with support for col, lit, sql_expr and the pl module.

TYPE: list[Union[Any]]

func_kwargs

Dictionary of keyword arguments to be passed to the Polars function. If the function expects a Polars expression, its string representation can be provided, with support for col, lit, sql_expr and the pl module.

TYPE: dict[str, Union[Any]]
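How Laktory decides that a string argument is an expression is not documented here. As a rough illustration only, the sketch below evaluates strings that start with a call to a whitelisted helper in a restricted namespace and passes every other value through unchanged. The col and lit stand-ins, the regular expression and parse_arg are hypothetical; note that eval with an emptied __builtins__ is not a security sandbox.

```python
import re


# Hypothetical stand-ins for polars helpers; they just return tagged tuples.
def col(name):
    return ("col", name)


def lit(value):
    return ("lit", value)


ALLOWED = {"col": col, "lit": lit}
# Treat a string as an expression only if it starts with e.g. "col(" or "lit(".
EXPR_RE = re.compile(r"^\s*(%s)\(" % "|".join(ALLOWED))


def parse_arg(value):
    """Evaluate expression-like strings; return any other value unchanged."""
    if isinstance(value, str) and EXPR_RE.match(value):
        # Only the whitelisted names are visible to eval (not a real sandbox).
        return eval(value, {"__builtins__": {}}, dict(ALLOWED))
    return value


args = [parse_arg(a) for a in ["col('x')", "x_tmp", {"x": "x_tmp"}]]
print(args)  # [('col', 'x'), 'x_tmp', {'x': 'x_tmp'}]
```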

func_name

Name of the Polars function used to build the output dataframe. Mutually exclusive with sql_expr and with_column.

TYPE: Union[str, None]

sql_expr

SQL expression defining how to build the output dataframe, using {df} to reference the upstream dataframe. Other pipeline nodes can also be referenced using {nodes.node_name}. Mutually exclusive with func_name and with_column.

TYPE: Union[str, None]
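The {df} and {nodes.node_name} placeholders can be located with a regular expression, for example to find which upstream nodes a query depends on or to substitute concrete table names. A minimal stdlib sketch, assuming node names are plain identifiers; referenced_nodes, render and the table names are hypothetical and do not describe Laktory's own parser.

```python
import re

# Matches "{df}" or "{nodes.<identifier>}"; group 2 holds the node name.
PLACEHOLDER = re.compile(r"\{(df|nodes\.([A-Za-z_][A-Za-z0-9_]*))\}")


def referenced_nodes(sql: str) -> list[str]:
    """Names of the other pipeline nodes referenced by the query."""
    return [m.group(2) for m in PLACEHOLDER.finditer(sql) if m.group(2)]


def render(sql: str, tables: dict[str, str]) -> str:
    """Replace each placeholder with the table name registered for it."""
    return PLACEHOLDER.sub(lambda m: tables[m.group(2) or "df"], sql)


sql = "SELECT a.x, b.y FROM {df} a JOIN {nodes.slv_prices} b ON a.id = b.id"
print(referenced_nodes(sql))  # ['slv_prices']
print(render(sql, {"df": "t0", "slv_prices": "t1"}))
# SELECT a.x, b.y FROM t0 a JOIN t1 b ON a.id = b.id
```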

with_column

Syntactic sugar for adding a column. Mutually exclusive with func_name and sql_expr.

TYPE: Union[ChainNodeColumn, None]

with_columns

Syntactic sugar for adding columns. Mutually exclusive with func_name and sql_expr.

TYPE: Union[list[ChainNodeColumn], None]
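The attributes above give three mutually exclusive ways to define a node's output: func_name, sql_expr, or with_column/with_columns. The check below is a hypothetical sketch of that rule, not Laktory's validator, and treating with_column and with_columns as a single group is an assumption. It also requires at least one option, matching the ValueError that execute raises when func_name is missing and no SQL expression or columns are given.

```python
def node_mode(func_name=None, sql_expr=None, with_column=None, with_columns=None):
    """Return which option defines the node output, enforcing exclusivity."""
    groups = {
        "func_name": func_name is not None,
        "sql_expr": sql_expr is not None,
        # Assumption: the two column helpers form a single group.
        "with_columns": with_column is not None or with_columns is not None,
    }
    chosen = [name for name, is_set in groups.items() if is_set]
    if len(chosen) > 1:
        raise ValueError(f"Mutually exclusive options set together: {chosen}")
    if not chosen:
        raise ValueError("One of func_name, sql_expr or with_column(s) is required")
    return chosen[0]


print(node_mode(func_name="unique"))  # func_name
```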

Examples:

from laktory import models
import polars as pl

df0 = pl.DataFrame({"x": [1, 2, 2, 3]})
print(df0.glimpse(return_as_string=True))
'''
Rows: 4
Columns: 1
$ x <i64> 1, 2, 2, 3
'''

node = models.PolarsChainNode(
    with_column={"name": "cosx", "type": "double", "expr": "pl.col('x').cos()"},
)
df = node.execute(df0)

node = models.PolarsChainNode(
    with_column={
        "name": "xy",
        "type": "double",
        "expr": "pl.coalesce('x')",
    },
)
df = node.execute(df)
print(df.glimpse(return_as_string=True))
'''
Rows: 4
Columns: 3
$ x    <i64> 1, 2, 2, 3
$ cosx <f64> 0.5403023058681398, -0.4161468365471424, -0.4161468365471424, -0.9899924966004454
$ xy   <f64> 1.0, 2.0, 2.0, 3.0
'''

node = models.PolarsChainNode(
    func_name="unique",
    func_args=[["x"]],
    func_kwargs={"maintain_order": True},
)
df = node.execute(df)
print(df.glimpse(return_as_string=True))
'''
Rows: 3
Columns: 3
$ x    <i64> 1, 2, 3
$ cosx <f64> 0.5403023058681398, -0.4161468365471424, -0.9899924966004454
$ xy   <f64> 1.0, 2.0, 3.0
'''
METHOD DESCRIPTION
execute

Execute polars chain node

Functions

execute

execute(df, udfs=None)

Execute polars chain node

PARAMETER DESCRIPTION
df

Input dataframe

TYPE: PolarsDataFrame

udfs

User-defined functions

TYPE: list[Callable[[...], Union[PolarsExpr, PolarsDataFrame]]] DEFAULT: None
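As the source of execute shows, udfs is turned into a dictionary keyed by each function's __name__, and func_name is looked up in that dictionary before any Polars method; a matching UDF is then called as f(df, *args, **kwargs). One consequence: lambdas all share the name "<lambda>", so they cannot be told apart by func_name and a later lambda silently replaces an earlier one. The snippet reproduces only that indexing step; add_offset is a hypothetical UDF operating on a plain list.

```python
def index_udfs(udfs=None):
    # Same indexing step as in execute(): name -> function.
    if udfs is None:
        udfs = []
    return {f.__name__: f for f in udfs}


def add_offset(df, offset=1):
    """Hypothetical UDF: receives the dataframe first, like any called UDF."""
    return [v + offset for v in df]


registry = index_udfs([add_offset, lambda df: "first", lambda df: "second"])
print(sorted(registry))  # ['<lambda>', 'add_offset']
print(registry["add_offset"]([1, 2], offset=10))  # [11, 12]
print(registry["<lambda>"](None))  # second
```

Defining UDFs with def and a descriptive name avoids the collision.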

RETURNS DESCRIPTION
Output dataframe
Source code in laktory/models/transformers/polarschainnode.py
def execute(
    self,
    df: PolarsDataFrame,
    udfs: list[Callable[[...], Union[PolarsExpr, PolarsDataFrame]]] = None,
) -> Union[PolarsDataFrame]:
    """
    Execute polars chain node

    Parameters
    ----------
    df:
        Input dataframe
    udfs:
        User-defined functions

    Returns
    -------
        Output dataframe
    """
    from polars import DataFrame

    from laktory.polars.datatypes import DATATYPES_MAP

    if udfs is None:
        udfs = []
    udfs = {f.__name__: f for f in udfs}

    # Build Columns
    if self._with_columns:
        for column in self._with_columns:
            logger.info(
                f"Building column {column.name} as {column.expr or column.sql_expr}"
            )
            _col = column.eval(udfs=udfs, dataframe_backend="POLARS")
            if column.type:
                _col = _col.cast(DATATYPES_MAP[column.type])
            df = df.with_columns(**{column.name: _col})
        return df

    # From SQL expression
    if self.sql_expr:
        logger.info(f"DataFrame {self.id} as \n{self.sql_expr.strip()}")
        return self.parsed_sql_expr.eval(df)

    # Get Function
    func_name = self.func_name
    if self.func_name is None:
        raise ValueError(
            "`func_name` must be specified if `sql_expr` is not specified"
        )

    # Get from UDFs
    f = udfs.get(func_name, None)

    # Get from built-in polars and polars extension (including Laktory) functions
    input_df = True
    if f is None:
        # Get function from namespace extension
        if "." in func_name:
            input_df = False
            vals = func_name.split(".")
            f = getattr(getattr(df, vals[0]), vals[1], None)
        else:
            f = getattr(DataFrame, func_name, None)

    if f is None:
        raise ValueError(f"Function {func_name} is not available")

    _args = self.parsed_func_args
    _kwargs = self.parsed_func_kwargs

    # Build log
    func_log = f"{func_name}("
    # Join positional and keyword signatures together so a comma separates them
    func_log += ",".join(
        [a.signature() for a in _args]
        + [f"{k}={a.signature()}" for k, a in _kwargs.items()]
    )
    func_log += ")"
    logger.info(f"DataFrame {self.id} as {func_log}")

    # Build args
    args = []
    for i, _arg in enumerate(_args):
        args += [_arg.eval()]

    # Build kwargs
    kwargs = {}
    for k, _arg in _kwargs.items():
        kwargs[k] = _arg.eval()

    # Call function
    if input_df:
        df = f(df, *args, **kwargs)
    else:
        df = f(*args, **kwargs)

    return df
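The lookup order in execute can be summarized as: a UDF with that name first; then, for dotted names such as namespace.method, a method on a namespace attribute of the dataframe, called without the dataframe as first argument; otherwise a DataFrame method, called with the dataframe as first argument. The sketch below replays that order on a hypothetical ToyFrame class with a hypothetical ns namespace, so it runs without Polars.

```python
class ToyFrame:
    """Hypothetical dataframe exposing one method and one namespace."""

    def __init__(self, rows):
        self.rows = rows

    def head(self, n):
        return ToyFrame(self.rows[:n])

    @property
    def ns(self):  # plays the role of a registered dataframe namespace
        return _ToyNamespace(self)


class _ToyNamespace:
    def __init__(self, df):
        self._df = df

    def reverse(self):
        return ToyFrame(self._df.rows[::-1])


def call(df, func_name, udfs, *args, **kwargs):
    # 1. User-defined functions take precedence.
    f = udfs.get(func_name)
    input_df = True
    if f is None:
        if "." in func_name:
            # 2. "namespace.method": the method is already bound to df.
            input_df = False
            vals = func_name.split(".")
            f = getattr(getattr(df, vals[0]), vals[1], None)
        else:
            # 3. Unbound method looked up on the dataframe class.
            f = getattr(type(df), func_name, None)
    if f is None:
        raise ValueError(f"Function {func_name} is not available")
    return f(df, *args, **kwargs) if input_df else f(*args, **kwargs)


df = ToyFrame([1, 2, 3])
print(call(df, "head", {}, 2).rows)     # [1, 2]
print(call(df, "ns.reverse", {}).rows)  # [3, 2, 1]
```

Because UDFs are checked first, a UDF named like a built-in method (for example head) shadows that method.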

--

laktory.models.PolarsChainNodeSQLExpr

Bases: BaseChainNodeSQLExpr

Chain node SQL expression

ATTRIBUTE DESCRIPTION
expr

SQL expression

TYPE: str
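To illustrate what evaluating such an expression involves (registering the upstream dataframe under a table name, substituting {df}, running the query), here is an analogue built on the standard library's sqlite3 module. It is not how Laktory evaluates PolarsChainNodeSQLExpr, which operates on Polars dataframes; run_sql_expr and the df_input table name are hypothetical.

```python
import sqlite3


def run_sql_expr(values, expr):
    """Hypothetical analogue: expose `values` as table `df_input`, then run expr."""
    con = sqlite3.connect(":memory:")
    try:
        con.execute("CREATE TABLE df_input (x INTEGER)")
        con.executemany("INSERT INTO df_input VALUES (?)", [(v,) for v in values])
        # Plain replace, so any other braces in the SQL are left untouched.
        query = expr.replace("{df}", "df_input")
        return con.execute(query).fetchall()
    finally:
        con.close()


print(run_sql_expr([1, 2, 2, 3], "SELECT DISTINCT x FROM {df} ORDER BY x"))
# [(1,), (2,), (3,)]
```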