FileDataSource

laktory.models.datasources.FileDataSource

Bases: BaseDataSource

Data source that reads files from disk, such as event data (JSON/CSV) or DataFrames stored as Parquet. It is typically used as a source in a data pipeline.

ATTRIBUTE DESCRIPTION
format

Format of the data files

TYPE: Literal['AVRO', 'BINARYFILE', 'CSV', 'DELTA', 'EXCEL', 'JSON', 'JSONL', 'NDJSON', 'ORC', 'PARQUET', 'TEXT', 'XML']

read_options

Additional reader options passed to spark.read.options

TYPE: dict[str, Any]

schema

Target schema, specified as a list of columns, as a dict, or as a JSON serialization. Only used when reading from formats that are not strongly typed, such as JSON or CSV.

schema_location

Path used for schema inference when reading data as a stream. If None, the parent directory of path is used.

TYPE: str
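The schema_location fallback described above can be sketched with the standard library alone. This is a minimal illustration, not laktory's implementation: `resolve_schema_location` is a hypothetical helper, the `/Volumes/sources/landing/schemas/stock_price` path is made up for the example, and treating paths as POSIX-style is an assumption.

```python
from pathlib import PurePosixPath
from typing import Optional


def resolve_schema_location(path: str, schema_location: Optional[str] = None) -> str:
    """Hypothetical helper: return schema_location, or the parent directory of path."""
    if schema_location is not None:
        # An explicit location always wins.
        return schema_location
    # Fallback: parent directory of the data path (assumes POSIX-style paths).
    return str(PurePosixPath(path).parent)


data_path = "/Volumes/sources/landing/events/yahoo-finance/stock_price"

# No schema_location given: the parent directory of the data path is used.
default_location = resolve_schema_location(data_path)

# Explicit schema_location (hypothetical path) is returned unchanged.
explicit_location = resolve_schema_location(
    data_path, "/Volumes/sources/landing/schemas/stock_price"
)
```

Keeping the fallback in one small function makes the rule easy to check in isolation, without a Spark session or a streaming read.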

Examples:

from laktory import models

source = models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="JSON",
    as_stream=False,
)
# df = source.read(spark)

# With Explicit Schema
source = models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="JSON",
    as_stream=False,
    schema=[
        {"name": "description", "type": "string", "nullable": True},
        {"name": "close", "type": "double", "nullable": False},
    ],
)
# df = source.read(spark)
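
The schema attribute is described as accepting a list of columns or a JSON serialization. As a rough sketch of how the two forms relate, the column list from the example above can be serialized with the standard `json` module. Whether FileDataSource accepts exactly this string is an assumption and is not confirmed here; the snippet only shows that the two representations carry the same information.

```python
import json

# Column list taken from the explicit-schema example above.
columns = [
    {"name": "description", "type": "string", "nullable": True},
    {"name": "close", "type": "double", "nullable": False},
]

# JSON serialization of the same schema (assumed to be one acceptable form).
schema_json = json.dumps(columns)

# Deserializing gives back an identical column list.
round_tripped = json.loads(schema_json)
```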
METHOD DESCRIPTION
path_to_string

Converts Path values of path and schema_location to strings before validation. Required to apply settings before instantiating resources and setting default values.

Functions

path_to_string classmethod

path_to_string(data)

Converts Path values of path and schema_location to strings before validation. Required to apply settings before instantiating resources and setting default values.

Source code in laktory/models/datasources/filedatasource.py
@model_validator(mode="before")
@classmethod
def path_to_string(cls, data: Any) -> Any:
    """Required to apply settings before instantiating resources and setting default values"""

    if not isinstance(data, dict):
        return data

    for k in ["path", "schema_location"]:
        path = data.get(k, None)
        if path and isinstance(path, Path):
            data[k] = str(path)

    return data
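
The effect of this validator can be tried outside of pydantic. The sketch below reproduces the same normalization as a plain function so it runs with the standard library only. `paths_to_strings` is a hypothetical name, and the input dictionary is illustrative.

```python
from pathlib import Path
from typing import Any


def paths_to_strings(data: Any, keys=("path", "schema_location")) -> Any:
    """Standalone sketch of the normalization done by path_to_string."""
    # Non-dict inputs (e.g. an already-built model) are passed through untouched.
    if not isinstance(data, dict):
        return data

    for key in keys:
        value = data.get(key)
        # Only Path objects are converted; strings and None are left as-is.
        if isinstance(value, Path):
            data[key] = str(value)

    return data


raw = {
    "path": Path("/Volumes/sources/landing/events/yahoo-finance/stock_price"),
    "format": "JSON",
}
normalized = paths_to_strings(raw)
```

Like the original, the sketch updates the input dictionary in place and returns it, so `normalized` and `raw` refer to the same object.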