Skip to content

smart_join

laktory.spark.dataframe.smart_join ¤

FUNCTION DESCRIPTION
smart_join

Join tables and coalesce join columns. Optionally coalesce columns found in both left and other that are not used in the join.

Classes¤

Functions¤

smart_join ¤

smart_join(left, other, how='left', on=None, on_expression=None, left_on=None, left_watermark=None, other_on=None, other_watermark=None, time_constraint_interval_lower='60 seconds', time_constraint_interval_upper=None, coalesce=False)

Join tables and coalesce join columns. Optionally coalesce columns found in both left and other not used in join. Support for watermarking for streaming joins.

PARAMETER DESCRIPTION
left

Left side of the join

TYPE: DataFrame

other

Right side of the join

TYPE: DataFrame

how

Type of join (left, outer, full, etc.)

TYPE: str DEFAULT: 'left'

on

A list of strings for the columns to join on. The columns must exist on both sides.

TYPE: list[str] DEFAULT: None

on_expression

String expression for the join condition. The expression can include left and other dataframe references.

TYPE: str DEFAULT: None

left_on

Name(s) of the left join column(s).

TYPE: list[str] DEFAULT: None

left_watermark

Watermark for left dataframe

TYPE: Watermark DEFAULT: None

other_on

Name(s) of the right join column(s).

TYPE: list[str] DEFAULT: None

other_watermark

Watermark for other dataframe

TYPE: Watermark DEFAULT: None

time_constraint_interval_lower

Lower bound for a spark streaming event-time constraint

TYPE: str DEFAULT: '60 seconds'

time_constraint_interval_upper

Upper bound for a spark streaming event-time constraint

TYPE: str DEFAULT: None

coalesce

If True, columns present in both left and other are coalesced.

TYPE: bool DEFAULT: False

Examples:

import pandas as pd
import laktory  # noqa: F401

df_prices = spark.createDataFrame(
    pd.DataFrame(
        {
            "symbol": ["AAPL", "GOOGL"],
            "price": [200.0, 205.0],
        }
    )
)

df_meta = spark.createDataFrame(
    pd.DataFrame(
        {
            "symbol": ["AAPL", "GOOGL"],
            "name": ["Apple", "Google"],
        }
    )
)

df = df_prices.laktory.smart_join(
    other=df_meta,
    on=["symbol"],
)

print(df.toPandas().to_string())
'''
   price    name symbol
0  200.0   Apple   AAPL
1  205.0  Google  GOOGL
'''
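References

* pyspark join: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html
* spark streaming join: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking

Source code in laktory/spark/dataframe/smart_join.py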
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def smart_join(
    left: DataFrame,
    other: DataFrame,
    how: str = "left",
    on: list[str] = None,
    on_expression: str = None,
    left_on: list[str] = None,
    left_watermark: Watermark = None,
    other_on: list[str] = None,
    other_watermark: Watermark = None,
    time_constraint_interval_lower: str = "60 seconds",
    time_constraint_interval_upper: str = None,
    coalesce: bool = False,
) -> DataFrame:
    """
    Join tables and coalesce join columns. Optionally coalesce columns found
    in both left and other not used in join. Support for watermarking for
    streaming joins.

    Parameters
    ----------
    left:
        Left side of the join
    other:
        Right side of the join
    how:
        Type of join (left, outer, full, etc.)
    on:
        A list of strings for the columns to join on. The columns must exist
        on both sides. A single column name (string) is also accepted.
    on_expression:
        String expression for the join condition. The expression can include
        `left` and `other` dataframe references.
    left_on:
        Name(s) of the left join column(s). Must be set together with
        `other_on`.
    left_watermark:
        Watermark for left dataframe. A dict of `Watermark` fields is also
        accepted. When not set, the watermark is read from `left` (best
        effort).
    other_on:
        Name(s) of the right join column(s). Must be set together with
        `left_on`.
    other_watermark:
        Watermark for other dataframe. A dict of `Watermark` fields is also
        accepted. When not set, the watermark is read from `other` (best
        effort).
    time_constraint_interval_lower:
        Lower bound for a spark streaming event-time constraint. Only applied
        when both dataframes have a watermark.
    time_constraint_interval_upper:
        Upper bound for a spark streaming event-time constraint. Only applied
        when both dataframes have a watermark.
    coalesce:
        If `True` columns present in both left and other are coalesced.
        Columns present in both dataframes and referenced in the join
        condition are always coalesced.

    Returns
    -------
    :
        Joined dataframe

    Raises
    ------
    ValueError
        If only one of `left_on` / `other_on` is set, or if no join condition
        (`on`, `left_on`/`other_on` or `on_expression`) is provided.

    Examples
    --------
    ```py
    import pandas as pd
    import laktory  # noqa: F401

    df_prices = spark.createDataFrame(
        pd.DataFrame(
            {
                "symbol": ["AAPL", "GOOGL"],
                "price": [200.0, 205.0],
            }
        )
    )

    df_meta = spark.createDataFrame(
        pd.DataFrame(
            {
                "symbol": ["AAPL", "GOOGL"],
                "name": ["Apple", "Google"],
            }
        )
    )

    df = df_prices.laktory.smart_join(
        other=df_meta,
        on=["symbol"],
    )

    print(df.toPandas().to_string())
    '''
       price    name symbol
    0  200.0   Apple   AAPL
    1  205.0  Google  GOOGL
    '''
    ```

    References
    ----------

    * [pyspark join](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html)
    * [spark streaming join](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking)
    """
    import re
    from collections import Counter

    import pyspark.sql.functions as F

    logger.info(f"Executing {left} {how} JOIN {other}")

    # Validate inputs
    if left_on or other_on:
        if not other_on:
            raise ValueError("If `left_on` is set, `other_on` should also be set")
        if not left_on:
            raise ValueError("If `other_on` is set, `left_on` should also be set")
    if not (on or left_on or on_expression):
        raise ValueError(
            "Either `on` or (`left_on` and `other_on`) or `on_expression` should be set"
        )

    def _as_list(value):
        # Normalize None / a single column name / a list of names to a list
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        return value

    on = _as_list(on)
    left_on = _as_list(left_on)
    other_on = _as_list(other_on)

    # Watermarks may be given as Watermark instances or as dicts of fields
    wml = left_watermark
    wmo = other_watermark
    if wml is not None and not isinstance(wml, Watermark):
        wml = Watermark(**wml)
    if wmo is not None and not isinstance(wmo, Watermark):
        wmo = Watermark(**wmo)

    # Set watermarks: apply explicit ones, otherwise try to read an existing
    # watermark from the dataframe. Failing to read it is deliberately only
    # logged (best effort).
    if wml is None:
        try:
            wml = watermark(left)
        except Exception as e:
            logger.warning(f"Could not fetch watermark from left dataframe: {e}")
    else:
        left = left.withWatermark(wml.column, wml.threshold)
    if wmo is None:
        try:
            wmo = watermark(other)
        except Exception as e:
            logger.warning(f"Could not fetch watermark from other dataframe: {e}")
    else:
        other = other.withWatermark(wmo.column, wmo.threshold)

    # Copy the other watermark column under a private alias so the event-time
    # constraint can reference it unambiguously; it is dropped after the join.
    other_cols = []
    if wmo is not None:
        other_cols += [F.col(wmo.column).alias("_other_wc")]
    other_cols += [F.col(c) for c in other.columns]
    other = other.select(other_cols)

    # Drop duplicates to prevent adding rows to left
    if on:
        other = other.dropDuplicates(on)

    # Build the join condition
    conditions = [f"left.{c} == other.{c}" for c in on]
    conditions += [
        f"left.{lcol} == other.{ocol}" for lcol, ocol in zip(left_on, other_on)
    ]
    if on_expression:
        conditions += [on_expression]

    # Event-time constraint compares both watermark columns, so it can only be
    # built when each side has a watermark.
    if wmo is not None and (
        time_constraint_interval_lower or time_constraint_interval_upper
    ):
        if wml is None:
            logger.warning(
                "Other dataframe has a watermark but left dataframe does not; "
                "event-time constraint is not applied."
            )
        else:
            if time_constraint_interval_lower:
                conditions += [
                    f"left.{wml.column} >= other._other_wc - interval {time_constraint_interval_lower}"
                ]
            if time_constraint_interval_upper:
                conditions += [
                    f"left.{wml.column} <= other._other_wc + interval {time_constraint_interval_upper}"
                ]
    _join = " AND ".join(conditions)

    logger.info(f"   ON {_join}")
    logger.debug(f"Left Schema: {left.schema}")
    logger.debug(f"Other Schema: {other.schema}")

    df = left.alias("left").join(
        other=other.alias("other"),
        on=F.expr(_join),
        how=how,
    )

    # Columns present on both sides appear twice after the join. Coalesce the
    # ones referenced in the join condition (or all of them with `coalesce`).
    for c, count in Counter(df.columns).items():
        if count < 2:
            continue

        # Whole-identifier match: a plain substring test would wrongly treat
        # e.g. `id` as a join column when the condition uses `symbol_id`.
        in_join = re.search(rf"(?<!\w){re.escape(c)}(?!\w)", _join) is not None
        if not (coalesce or in_join):
            continue
        df = df.withColumn("__tmp", F.coalesce(f"left.{c}", f"other.{c}"))
        df = df.drop(c)
        df = df.withColumn(c, F.col("__tmp"))
        df = df.drop("__tmp")

    # Drop watermark column
    if wmo is not None:
        df = df.drop(F.col("other._other_wc"))
    logger.debug(f"Joined Schema: {df.schema}")

    return df