groupby_and_agg

laktory.spark.dataframe.groupby_and_agg

CLASS DESCRIPTION
TimeWindow

Specifications for Time Window Aggregation

FUNCTION DESCRIPTION
groupby_and_agg

Apply a groupby and create aggregation columns.

Classes

TimeWindow

Bases: BaseModel

Specifications for Time Window Aggregation

ATTRIBUTE DESCRIPTION
time_column

Timestamp column used for grouping rows

TYPE: str

window_duration

Duration of the window, e.g. ‘1 second’, ‘1 day 12 hours’, ‘2 minutes’

TYPE: str

slide_duration

Interval between the starts of consecutive windows. If the slide is smaller than the window, windows overlap. If None, windows are tumbling (non-overlapping).

TYPE: Union[str, None]

start_time

Offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals.

TYPE: Union[str, None]
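
The way these four attributes interact can be sketched without Spark. The block below is a minimal standard-library illustration, not Spark's or laktory's implementation: `parse_duration` and `windows_for` are hypothetical helpers, the parser handles only a few units, and timestamps are assumed to be timezone-aware UTC values. It shows how a single timestamp falls into one window when no slide is given and into several when the slide is shorter than the window.

```python
import re
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
# Assumption: only these units are handled in this sketch; Spark accepts more.
_UNITS = {
    "second": "seconds",
    "minute": "minutes",
    "hour": "hours",
    "day": "days",
    "week": "weeks",
}


def parse_duration(text):
    """Turn a string such as '1 day 12 hours' into a timedelta."""
    parts = re.findall(r"(\d+)\s*([a-z]+)", text.lower())
    if not parts:
        raise ValueError(f"cannot parse duration: {text!r}")
    total = timedelta(0)
    for value, unit in parts:
        unit = unit[:-1] if unit.endswith("s") else unit  # 'hours' -> 'hour'
        if unit not in _UNITS:
            raise ValueError(f"unsupported unit: {unit!r}")
        total += timedelta(**{_UNITS[unit]: int(value)})
    return total


def windows_for(ts, window_duration, slide_duration=None, start_time=None):
    """Return every [start, end) window containing ts, oldest first."""
    window = parse_duration(window_duration)
    # Without a slide, windows are tumbling: slide equals the window length.
    slide = parse_duration(slide_duration) if slide_duration else window
    start = parse_duration(start_time) if start_time else timedelta(0)
    # Latest window start at or before ts, aligned to EPOCH + start.
    w_start = ts - (ts - EPOCH - start) % slide
    found = []
    while w_start + window > ts:
        found.append((w_start, w_start + window))
        w_start -= slide
    return found[::-1]


ts = datetime(2023, 9, 1, 10, 0, tzinfo=timezone.utc)
tumbling = windows_for(ts, "1 day")
sliding = windows_for(ts, "1 day", slide_duration="12 hours")
print(len(tumbling), len(sliding))  # prints: 1 2
```

With a 12-hour slide, the 10:00 timestamp belongs both to the window starting 2023-08-31 12:00 and to the one starting 2023-09-01 00:00, which is the overlap described for `slide_duration`.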

References

Functions

groupby_and_agg

groupby_and_agg(df, groupby_window=None, groupby_columns=None, agg_expressions=None)

Apply a groupby and create aggregation columns.

PARAMETER DESCRIPTION
df

Input Spark DataFrame to group and aggregate

groupby_window

Aggregation window definition, given as a TimeWindow or as a dict of its attributes

TYPE: TimeWindow DEFAULT: None

groupby_columns

List of column names to group by

TYPE: list[str] DEFAULT: None

agg_expressions

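List of columns defining the aggregations, each given as a ChainNodeColumn or as a dict of its fields. Required: a ValueError is raised if left as None.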

TYPE: list[ChainNodeColumn] DEFAULT: None

Examples:

import laktory  # noqa: F401
import pandas as pd

df0 = spark.createDataFrame(
    pd.DataFrame(
        {
            "symbol": ["AAPL", "GOOGL"],
            "price": [200.0, 205.0],
            "tstamp": ["2023-09-01", "2023-09-01"],
        }
    )
)

df = df0.laktory.groupby_and_agg(
    groupby_window={
        "time_column": "tstamp",
        "window_duration": "1 day",
    },
    agg_expressions=[
        {
            "name": "mean_price",
            "expr": "F.mean('price')",
        },
    ],
)

print(df.toPandas().to_string())
'''
                                       window  mean_price
0  (2023-09-01 00:00:00, 2023-09-02 00:00:00)       202.5
'''
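References

pyspark window: https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.window.html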
Source code in laktory/spark/dataframe/groupby_and_agg.py
def groupby_and_agg(
    df,
    groupby_window: TimeWindow = None,
    groupby_columns: list[str] = None,
    agg_expressions: list["ChainNodeColumn"] = None,
) -> DataFrame:
    """
    Apply a groupby and create aggregation columns.

    Parameters
    ----------
    df:
        DataFrame
    groupby_window:
        Aggregation window definition
    groupby_columns:
        List of column names to group by
    agg_expressions:
        List of columns defining the aggregations

    Examples
    --------
    ```py
    import laktory  # noqa: F401
    import pandas as pd

    df0 = spark.createDataFrame(
        pd.DataFrame(
            {
                "symbol": ["AAPL", "GOOGL"],
                "price": [200.0, 205.0],
                "tstamp": ["2023-09-01", "2023-09-01"],
            }
        )
    )

    df = df0.laktory.groupby_and_agg(
        groupby_window={
            "time_column": "tstamp",
            "window_duration": "1 day",
        },
        agg_expressions=[
            {
                "name": "mean_price",
                "expr": "F.mean('price')",
            },
        ],
    )

    print(df.toPandas().to_string())
    '''
                                           window  mean_price
    0  (2023-09-01 00:00:00, 2023-09-02 00:00:00)       202.5
    '''
    ```

    References
    ----------

    * [pyspark window](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.window.html)
    """
    import pyspark.sql.functions as F

    from laktory.models.transformers.basechainnode import ChainNodeColumn

    # Parse inputs
    if groupby_window and not isinstance(groupby_window, TimeWindow):
        groupby_window = TimeWindow(**groupby_window)
    if agg_expressions is None:
        raise ValueError("`agg_expressions` must be specified")
    if groupby_columns is None:
        groupby_columns = []

    logger.info(
        f"Executing groupby ({groupby_window} & {groupby_columns}) with {agg_expressions}"
    )

    # Groupby arguments
    groupby = []
    if groupby_window:
        groupby += [
            F.window(
                timeColumn=groupby_window.time_column,
                windowDuration=groupby_window.window_duration,
                slideDuration=groupby_window.slide_duration,
                startTime=groupby_window.start_time,
            )
        ]

    for c in groupby_columns:
        groupby += [c]

    # Agg arguments
    aggs = []
    for expr in agg_expressions:
        if not isinstance(expr, ChainNodeColumn):
            expr = ChainNodeColumn(**expr)

        expr.type = None
        aggs += [expr.eval(dataframe_backend="SPARK").alias(expr.name)]

    return df.groupby(groupby).agg(*aggs)
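
The flow of the source above (validate the aggregations, build the grouping key with the window first and the plain columns after it, then aggregate each group) can be mimicked on plain Python rows. This is a rough sketch under stated assumptions, not laktory code: `group_and_aggregate` is a hypothetical helper, rows are dicts rather than a Spark DataFrame, and the "window" key is simply the raw `tstamp` string standing in for a one-day bucket.

```python
from collections import defaultdict
from statistics import mean


def group_and_aggregate(rows, key_funcs, aggs):
    """Group dict rows by the tuple built from key_funcs, then aggregate.

    key_funcs: list of (name, callable(row) -> hashable)
    aggs: list of (output name, input column, callable(list) -> value)
    """
    # Like the source, refuse to run without aggregations.
    if aggs is None:
        raise ValueError("`aggs` must be specified")

    groups = defaultdict(list)
    for row in rows:
        groups[tuple(f(row) for _, f in key_funcs)].append(row)

    result = []
    for key, members in groups.items():
        record = {name: value for (name, _), value in zip(key_funcs, key)}
        for name, column, func in aggs:
            record[name] = func([m[column] for m in members])
        result.append(record)
    return result


rows = [
    {"symbol": "AAPL", "price": 200.0, "tstamp": "2023-09-01"},
    {"symbol": "GOOGL", "price": 205.0, "tstamp": "2023-09-01"},
]
window_key = ("window", lambda r: r["tstamp"])  # stand-in for a 1 day window
symbol_key = ("symbol", lambda r: r["symbol"])
mean_price = ("mean_price", "price", mean)

# Window only: both rows share the same day, so one group is produced.
by_day = group_and_aggregate(rows, [window_key], [mean_price])
# Window first, then column, mirroring the key order used in the source.
by_day_symbol = group_and_aggregate(rows, [window_key, symbol_key], [mean_price])
print(by_day)
```

Grouping by the window alone reproduces the 202.5 mean from the documented example, while adding the `symbol` key yields one row per symbol.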