# SparkChain

## laktory.models.SparkChain

Bases: `BaseChain`

The `SparkChain` class defines a series of Spark transformations to be
applied to a dataframe. Each transformation is expressed as a node
(a `SparkChainNode` object) that, upon execution, returns a new dataframe.
Nodes are executed sequentially in the provided order. A node may also
be another `SparkChain`.
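Conceptually, executing a chain amounts to folding the input dataframe through each node in order, recursing into nested chains. The sketch below illustrates that pattern in plain Python with numbers standing in for dataframes; it is illustrative only and is not laktory's actual implementation:

```python
# Minimal sketch of the sequential-chain pattern described above.
# Leaf nodes are callables; a nested chain is represented as a list
# and executed recursively, mirroring how a SparkChain node may
# itself be another SparkChain.

def execute_chain(df, nodes):
    for node in nodes:
        if isinstance(node, list):
            # Nested chain: execute its nodes recursively
            df = execute_chain(df, node)
        else:
            # Leaf node: apply the transformation, producing a new value
            df = node(df)
    return df

result = execute_chain(
    2,
    [lambda x: x + 1, [lambda x: x * 10], lambda x: x - 5],
)
print(result)  # 25
```

Each node receives the output of the previous one, so ordering matters: renaming a column before dropping it, as in the example below, only works because the nodes run in sequence.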
ATTRIBUTE | DESCRIPTION
---|---
`dataframe_backend` | Differentiator to select the dataframe chain type
`nodes` | The list of transformations to be executed
Examples:

```py
import pandas as pd

from laktory import models

df0 = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 3]}))

# Build chain
sc = models.SparkChain(
    nodes=[
        {
            "with_column": {
                "name": "cos_x",
                "type": "double",
                "expr": "F.cos('x')",
            },
        },
        {
            "nodes": [
                {
                    "func_name": "withColumnRenamed",
                    "func_args": ["x", "x_tmp"],
                },
                {
                    "with_column": {
                        "name": "x2",
                        "type": "double",
                        "expr": "F.sqrt('x_tmp')",
                    },
                },
            ],
        },
        {
            "func_name": "drop",
            "func_args": ["x_tmp"],
        },
    ]
)

# Execute chain
df = sc.execute(df0)

# Print result
print(df.toPandas().to_string())
'''
      cos_x        x2
0  0.540302  1.000000
1 -0.416147  1.414214
2 -0.989992  1.732051
'''
```
---
## laktory.models.SparkChainNode

Bases: `BaseChainNode`

A `SparkChain` node that outputs a dataframe upon execution. As a
convenience, a `with_column` argument can be specified to create a new
column from a Spark or SQL expression. Each node is executed sequentially
in the provided order. A node may also be another `SparkChain`.
ATTRIBUTE | DESCRIPTION
---|---
`func_args` | List of arguments to be passed to the spark function. If the function expects a spark column, its string representation can be provided.
`func_kwargs` | List of keyword arguments to be passed to the spark function. If the function expects a spark column, its string representation can be provided.
`func_name` | Name of the spark function to build the dataframe
`sql_expr` | SQL expression
`with_column` | Syntactic sugar for adding a column
`with_columns` | Syntactic sugar for adding columns
Examples:

```py
import pandas as pd

from laktory import models

df0 = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 2, 3]}))

node = models.SparkChainNode(
    with_column={
        "name": "cosx",
        "type": "double",
        "expr": "F.cos('x')",
    },
)
df = node.execute(df0)

node = models.SparkChainNode(
    with_column={
        "name": "xy",
        "type": "double",
        "expr": "F.coalesce('x')",
    },
)
df = node.execute(df)

print(df.toPandas().to_string())
'''
   x      cosx   xy
0  1  0.540302  1.0
1  2 -0.416147  2.0
2  2 -0.416147  2.0
3  3 -0.989992  3.0
'''

node = models.SparkChainNode(
    func_name="drop_duplicates",
    func_args=[["x"]],
)
df = node.execute(df)

print(df.toPandas().to_string())
'''
   x      cosx   xy
0  1  0.540302  1.0
1  2 -0.416147  2.0
2  3 -0.989992  3.0
'''
```
METHOD | DESCRIPTION
---|---
`execute` | Execute spark chain node
### Functions

#### execute

```py
execute(df, udfs=None)
```

Execute spark chain node.

PARAMETER | DESCRIPTION
---|---
`df` | Input dataframe
`udfs` | User-defined functions

RETURNS | DESCRIPTION
---|---
| Output dataframe
---