Skip to content

window_filter

laktory.polars.dataframe.window_filter ¤

Functions¤

window_filter ¤

window_filter(df, partition_by, order_by=None, drop_row_index=True, row_index_name='_row_index', rows_to_keep=1)

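Apply window-based filtering (in the style of a Spark window filter) to a Polars DataFrame: rows are indexed within each window and only those with an index up to `rows_to_keep` are kept.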

PARAMETER DESCRIPTION
df

DataFrame

partition_by

Defines the columns used for grouping into Windows

TYPE: Union[list[str], None]

order_by

Defines the columns (and sort directions) used for sorting before dropping rows

TYPE: Union[list[OrderBy], None] DEFAULT: None

drop_row_index

If True, the row index column is dropped

TYPE: bool DEFAULT: True

row_index_name

Group-specific and sorted rows index name

TYPE: str DEFAULT: '_row_index'

rows_to_keep

How many rows to keep per window

TYPE: int DEFAULT: 1

Examples:

import laktory  # noqa: F401
import polars as pl

df0 = pl.DataFrame(
    [
        ["2023-01-01T00:00:00Z", "APPL", 200.0],
        ["2023-01-02T00:00:00Z", "APPL", 202.0],
        ["2023-01-03T00:00:00Z", "APPL", 201.5],
        ["2023-01-01T00:00:00Z", "GOOL", 200.0],
        ["2023-01-02T00:00:00Z", "GOOL", 202.0],
        ["2023-01-03T00:00:00Z", "GOOL", 201.5],
    ],
    ["created_at", "symbol", "price"],
).with_columns(pl.col("created_at").cast(pl.Datetime))

df = df0.laktory.window_filter(
    partition_by=["symbol"],
    order_by=[
        {"sql_expression": "created_at", "desc": True},
    ],
    drop_row_index=False,
    rows_to_keep=1,
)

print(df.glimpse(return_as_string=True))
'''
Rows: 2
Columns: 4
$ created_at <datetime[μs]> 2023-01-03 00:00:00, 2023-01-03 00:00:00
$ symbol              <str> 'APPL', 'GOOL'
$ price               <f64> 201.5, 201.5
$ _row_index          <u32> 1, 1
'''
References

- polars window: https://docs.pola.rs/user-guide/expressions/window/
Source code in laktory/polars/dataframe/window_filter.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def window_filter(
    df,
    partition_by: Union[list[str], None],
    order_by: Union[list[OrderBy], None] = None,
    drop_row_index: bool = True,
    row_index_name: str = "_row_index",
    rows_to_keep: int = 1,
) -> pl.DataFrame:
    """
    Apply window-based filtering (in the style of a Spark window filter) to a
    polars DataFrame.

    A row index is computed within each window, optionally after ordering,
    and only rows whose index is lower than or equal to `rows_to_keep` are
    kept.

    Parameters
    ----------
    df:
        DataFrame
    partition_by:
        Defines the columns used for grouping into windows. When `None` or
        empty, the whole DataFrame is treated as a single window.
    order_by:
        Defines the columns (and sort directions) used for sorting before
        dropping rows. Each item is either an `OrderBy` instance or a
        dictionary of `OrderBy` keyword arguments.
    drop_row_index:
        If `True`, the row index column is dropped
    row_index_name:
        Group-specific and sorted rows index name
    rows_to_keep:
        How many rows to keep per window

    Returns
    -------
    :
        DataFrame filtered down to at most `rows_to_keep` rows per window


    Examples
    --------
    ```py
    import laktory  # noqa: F401
    import polars as pl

    df0 = pl.DataFrame(
        [
            ["2023-01-01T00:00:00Z", "APPL", 200.0],
            ["2023-01-02T00:00:00Z", "APPL", 202.0],
            ["2023-01-03T00:00:00Z", "APPL", 201.5],
            ["2023-01-01T00:00:00Z", "GOOL", 200.0],
            ["2023-01-02T00:00:00Z", "GOOL", 202.0],
            ["2023-01-03T00:00:00Z", "GOOL", 201.5],
        ],
        ["created_at", "symbol", "price"],
    ).with_columns(pl.col("created_at").cast(pl.Datetime))

    df = df0.laktory.window_filter(
        partition_by=["symbol"],
        order_by=[
            {"sql_expression": "created_at", "desc": True},
        ],
        drop_row_index=False,
        rows_to_keep=1,
    )

    print(df.glimpse(return_as_string=True))
    '''
    Rows: 2
    Columns: 4
    $ created_at <datetime[μs]> 2023-01-03 00:00:00, 2023-01-03 00:00:00
    $ symbol              <str> 'APPL', 'GOOL'
    $ price               <f64> 201.5, 201.5
    $ _row_index          <u32> 1, 1
    '''
    ```

    References
    ----------

    * [polars window](https://docs.pola.rs/user-guide/expressions/window/)
    """

    # Row number expression (the example above shows the index starting at 1)
    e = pl.Expr.laktory.row_number()

    # Order by: accept both OrderBy instances and plain dicts of their fields
    if order_by:
        orders = [o if isinstance(o, OrderBy) else OrderBy(**o) for o in order_by]
        # NOTE(review): `sort_by` reorders the row-number values according to
        # the sort keys. Confirm this yields the intended per-row rank when
        # the input is not already sorted on these keys.
        e = e.sort_by(
            by=[o.sql_expression for o in orders],
            descending=[o.desc for o in orders],
        )

    # Partition by: a bare string would otherwise be unpacked character by
    # character, so wrap it in a list first.
    if isinstance(partition_by, str):
        partition_by = [partition_by]

    # `partition_by` may be None (see signature) or empty; in that case there
    # is nothing to unpack into `over`, and the expression is evaluated over
    # the whole DataFrame as a single window.
    if partition_by:
        e = e.over(*partition_by)

    # Set rows index
    df = df.with_columns(**{row_index_name: e})

    # Filter
    df = df.filter(pl.col(row_index_name) <= rows_to_keep)
    if drop_row_index:
        df = df.drop(row_index_name)

    return df