DataQualityExpectation

laktory.models.DataQualityExpectation

Bases: BaseModel

Data Quality Expectation for a given DataFrame expressed as a row-specific condition (type="ROW") or as an aggregated metric (type="AGGREGATE").

The expression may be defined as a SQL statement or a DataFrame expression.

ATTRIBUTE DESCRIPTION
action

Action to take when the expectation is not met:
WARN: Write invalid records to the output DataFrame, but log a warning.
DROP: Drop invalid records from the output DataFrame and log a warning.
QUARANTINE: Forward invalid records for quarantine.
FAIL: Raise an exception when invalid records are found.

TYPE: Literal['WARN', 'DROP', 'QUARANTINE', 'FAIL']

type

Type of expectation:
"ROW": Row-specific condition, evaluated for each row. Must be a boolean expression.
"AGGREGATE": Global condition, evaluated over the whole DataFrame. Must be a boolean expression.

TYPE: Literal['AGGREGATE', 'ROW']

name

Name of the expectation

TYPE: str

expr

SQL or DataFrame expression representing a row-specific condition or an aggregated metric.

TYPE: Union[str, DataFrameColumnExpression]

tolerance

Tolerance for non-matching rows before the expectation results in failure. Only available for "ROW" type expectations.

TYPE: ExpectationTolerance
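To make the ROW/AGGREGATE distinction concrete, here is a minimal pure-Python sketch, assuming rows are plain dicts and expressions are Python callables instead of SQL strings or DataFrame column expressions. The helpers `evaluate_row_expectation` and `evaluate_aggregate_expectation` are hypothetical and are not part of laktory.

```python
from typing import Callable, Dict, List

Row = Dict[str, float]


def evaluate_row_expectation(rows: List[Row], condition: Callable[[Row], bool]) -> List[bool]:
    # ROW: the boolean condition is evaluated once per row.
    return [bool(condition(row)) for row in rows]


def evaluate_aggregate_expectation(rows: List[Row], condition: Callable[[List[Row]], bool]) -> bool:
    # AGGREGATE: the boolean condition is evaluated once over the whole dataset.
    return bool(condition(rows))


rows = [{"close": 120.0}, {"close": 130.0}, {"close": 140.0}]

# Analogue of expr="close > 127" with type="ROW"
row_results = evaluate_row_expectation(rows, lambda r: r["close"] > 127)
print(row_results)  # [False, True, True]

# Analogue of expr="COUNT(*) > 50" with type="AGGREGATE"
agg_result = evaluate_aggregate_expectation(rows, lambda rs: len(rs) > 50)
print(agg_result)  # False
```

A ROW expectation yields one result per row, which is what makes a row-count tolerance meaningful; an AGGREGATE expectation yields a single result for the whole DataFrame.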

Examples:

from laktory import models

dqe = models.DataQualityExpectation(
    name="price higher than 127",
    action="WARN",
    expr="close > 127",
    tolerance={"rel": 0.05},
)
print(dqe)
'''
variables={} action='WARN' type='ROW' name='price higher than 127' expr=DataFrameColumnExpression(variables={}, value='close > 127', type='SQL') tolerance=ExpectationTolerance(variables={}, abs=None, rel=0.05)
'''

dqe = models.DataQualityExpectation(
    name="rows count",
    expr="COUNT(*) > 50",
    type="AGGREGATE",
)
print(dqe)
'''
variables={} action='WARN' type='AGGREGATE' name='rows count' expr=DataFrameColumnExpression(variables={}, value='COUNT(*) > 50', type='SQL') tolerance=ExpectationTolerance(variables={}, abs=0, rel=None)
'''
METHOD DESCRIPTION
run_check

Check whether the expectation is met and save the result.

raise_or_warn

Raise exception or issue warning if expectation is not met.

ATTRIBUTE DESCRIPTION
pass_filter

Expression representing all rows meeting the expectation.

TYPE: Union[AnyDataFrameColumn, None]

fail_filter

Expression representing all rows not meeting the expectation.

TYPE: Union[AnyDataFrameColumn, None]

keep_filter

Expression representing all rows to keep, considering both the expectation and the selected action.

TYPE: Union[AnyDataFrameColumn, None]

quarantine_filter

Expression representing all rows to quarantine, considering both the expectation and the selected action.

TYPE: Union[AnyDataFrameColumn, None]

Attributes

pass_filter property

pass_filter

Expression representing all rows meeting the expectation.

fail_filter property

fail_filter

Expression representing all rows not meeting the expectation.

keep_filter property

keep_filter

Expression representing all rows to keep, considering both the expectation and the selected action.

quarantine_filter property

quarantine_filter

Expression representing all rows to quarantine, considering both the expectation and the selected action.
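The sketch below illustrates one plausible reading of how `keep_filter` and `quarantine_filter` combine the per-row result with `action`, inferred only from the action descriptions above: DROP and QUARANTINE keep passing rows, QUARANTINE additionally routes failing rows to quarantine, and WARN and FAIL apply no filter. The functions `keep_mask`, `quarantine_mask` and `apply_mask` are hypothetical; laktory builds DataFrame column expressions rather than Python lists, and `None` here stands for "no filter".

```python
from typing import List, Optional, Sequence


def keep_mask(passed: List[bool], action: str) -> Optional[List[bool]]:
    # Assumption: only DROP and QUARANTINE remove failing rows from the output.
    # None means "no filter", i.e. every row is kept.
    if action in ("DROP", "QUARANTINE"):
        return list(passed)
    return None


def quarantine_mask(passed: List[bool], action: str) -> Optional[List[bool]]:
    # Assumption: only QUARANTINE forwards failing rows to a quarantine output.
    if action == "QUARANTINE":
        return [not p for p in passed]
    return None


def apply_mask(rows: Sequence[float], mask: Optional[List[bool]]) -> List[float]:
    # A None mask leaves the rows unchanged.
    if mask is None:
        return list(rows)
    return [row for row, keep in zip(rows, mask) if keep]


rows = [120.0, 130.0, 140.0]
passed = [r > 127 for r in rows]  # per-row result of "close > 127"

for action in ("WARN", "DROP", "QUARANTINE", "FAIL"):
    kept = apply_mask(rows, keep_mask(passed, action))
    q = quarantine_mask(passed, action)
    quarantined = [] if q is None else apply_mask(rows, q)
    print(action, kept, quarantined)
# WARN [120.0, 130.0, 140.0] []
# DROP [130.0, 140.0] []
# QUARANTINE [130.0, 140.0] [120.0]
# FAIL [120.0, 130.0, 140.0] []
```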

Functions

run_check

run_check(df, raise_or_warn=False, node=None)

Check whether the expectation is met and save the result.

PARAMETER DESCRIPTION
df

Input DataFrame for checking the expectation.

TYPE: AnyDataFrame

raise_or_warn

Raise exception or issue warning if expectation is not met.

TYPE: bool DEFAULT: False

node

Pipeline node. When provided, its name is included in the failure message.

DEFAULT: None

RETURNS DESCRIPTION
output

Check result.

TYPE: DataQualityCheck

Source code in laktory/models/dataquality/expectation.py
def run_check(
    self,
    df: AnyDataFrame,
    raise_or_warn: bool = False,
    node=None,
) -> DataQualityCheck:
    """
    Check whether the expectation is met and save the result.

    Parameters
    ----------
    df:
        Input DataFrame for checking the expectation.
    raise_or_warn:
        Raise exception or issue warning if expectation is not met.
    node:
        Pipeline Node

    Returns
    -------
    output: DataQualityCheck
        Check result.
    """

    logger.info(
        f"Checking expectation '{self.name}' | {self.expr.value} (type: {self.type})"
    )

    # Assign DataFrame type
    dtype = str(type(df)).lower()
    if "spark" in dtype:
        self._dataframe_backend = "SPARK"
    elif "polars" in dtype:
        self._dataframe_backend = "POLARS"
    else:
        raise ValueError(f"DataFrame type '{dtype}' not supported")

    # Run Check
    self._check = self._check_df(df)

    if raise_or_warn:
        self.raise_or_warn(node)

    return self._check
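`run_check` selects the backend by lower-casing the string form of the DataFrame's type and searching it for "spark" or "polars". The standalone sketch below reproduces that test with dummy classes so it runs without Spark or Polars installed; `detect_backend`, `FakeSparkDataFrame` and `FakePolarsDataFrame` are hypothetical stand-ins.

```python
def detect_backend(df) -> str:
    # Mirrors the substring test in run_check: str(type(df)) looks like
    # "<class 'module.ClassName'>" and is lower-cased before matching.
    dtype = str(type(df)).lower()
    if "spark" in dtype:
        return "SPARK"
    elif "polars" in dtype:
        return "POLARS"
    raise ValueError(f"DataFrame type '{dtype}' not supported")


class FakeSparkDataFrame:  # hypothetical stand-in for a Spark DataFrame
    pass


class FakePolarsDataFrame:  # hypothetical stand-in for a Polars DataFrame
    pass


print(detect_backend(FakeSparkDataFrame()))   # SPARK
print(detect_backend(FakePolarsDataFrame()))  # POLARS

try:
    detect_backend([1, 2, 3])
except ValueError as exc:
    print(exc)  # DataFrame type '<class 'list'>' not supported
```

Because the match is on a substring of the full module and class path, and "spark" is tested first, any type whose path contains both words resolves to SPARK.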

raise_or_warn

raise_or_warn(node=None)

Raise exception or issue warning if expectation is not met.

Source code in laktory/models/dataquality/expectation.py
def raise_or_warn(self, node=None) -> None:
    """
    Raise exception or issue warning if expectation is not met.
    """

    # Failure Message
    msg = f"Expectation '{self.name}'"
    if node:
        msg += f" for node '{node.name}'"
    msg += f" FAILED | {self.log_msg}"

    if self.check.status != "FAIL":
        return

    # Raise Exception
    if self.action == "FAIL":
        raise DataQualityCheckFailedError(self, node)
    else:
        # actions: WARN, DROP, QUARANTINE
        warnings.warn(msg)
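The control flow of `raise_or_warn` can be reproduced in isolation: nothing happens unless the check status is FAIL; then the FAIL action raises and every other action only warns. In this sketch `CheckFailedError`, the flattened function signature and the node name `slv_prices` are hypothetical simplifications; laktory raises `DataQualityCheckFailedError`, reads the status from its stored check, and appends its own log message.

```python
import warnings
from typing import Optional


class CheckFailedError(Exception):
    """Hypothetical stand-in for laktory's DataQualityCheckFailedError."""


def raise_or_warn(name: str, action: str, status: str, node_name: Optional[str] = None) -> None:
    # A passing check is silent, whatever the action.
    if status != "FAIL":
        return
    msg = f"Expectation '{name}'"
    if node_name:
        msg += f" for node '{node_name}'"
    msg += " FAILED"
    if action == "FAIL":
        raise CheckFailedError(msg)
    # WARN, DROP and QUARANTINE only emit a warning.
    warnings.warn(msg)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    raise_or_warn("rows count", "WARN", "FAIL", node_name="slv_prices")
    raise_or_warn("rows count", "WARN", "PASS")
print(len(caught), str(caught[0].message))
# 1 Expectation 'rows count' for node 'slv_prices' FAILED

try:
    raise_or_warn("rows count", "FAIL", "FAIL")
except CheckFailedError as exc:
    print(exc)  # Expectation 'rows count' FAILED
```

Unlike the listing above, this sketch checks the status before building the message, so passing checks skip the string formatting entirely; the observable behavior is the same.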

laktory.models.dataquality.expectation.ExpectationTolerance

Bases: BaseModel

Tolerance values for data quality expectations with support for either absolute or relative tolerances.

ATTRIBUTE DESCRIPTION
abs

Maximum number of failing rows that still yields a PASS status.

TYPE: int

rel

Maximum fraction of failing rows, relative to the total number of rows, that still yields a PASS status.

TYPE: float
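A minimal sketch of how a ROW check could turn a failure count into PASS or FAIL under this tolerance, assuming `abs` caps the number of failing rows, `rel` caps the failing fraction of all rows, and only one of the two is set at a time. The function `check_status` is hypothetical. Its default `abs_tol=0` mirrors the `ExpectationTolerance(variables={}, abs=0, rel=None)` printed in the AGGREGATE example, meaning a single failing row fails the check.

```python
from typing import Optional


def check_status(
    rows_count: int,
    fails_count: int,
    abs_tol: Optional[int] = 0,
    rel_tol: Optional[float] = None,
) -> str:
    # Relative tolerance: compare the failing fraction against rel_tol.
    if rel_tol is not None:
        if rows_count == 0:
            return "PASS"  # assumption: an empty DataFrame has nothing to fail
        return "PASS" if fails_count / rows_count <= rel_tol else "FAIL"
    # Absolute tolerance: compare the raw failing count against abs_tol.
    if abs_tol is None:
        abs_tol = 0
    return "PASS" if fails_count <= abs_tol else "FAIL"


print(check_status(100, 4, abs_tol=None, rel_tol=0.05))  # PASS (4% <= 5%)
print(check_status(100, 6, abs_tol=None, rel_tol=0.05))  # FAIL (6% > 5%)
print(check_status(100, 1))                              # FAIL (default abs=0)
print(check_status(100, 3, abs_tol=5))                   # PASS
```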