Skip to content

Refdata loaders

dve.core_engine.backends.base.reference_data.BaseRefDataLoader

Bases: Generic[EntityType], Mapping[EntityName, EntityType], ABC

A reference data mapper which lazy-loads requested entities.

Source code in src/dve/core_engine/backends/base/reference_data.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
class BaseRefDataLoader(Generic[EntityType], Mapping[EntityName, EntityType], ABC):
    """A reference data mapper which lazy-loads requested entities.

    Entities are loaded on first access via ``loader[name]`` and cached in
    ``entity_cache``. Subclasses register loaders in two ways:

    * public methods whose type hints are exactly ``config`` and ``return``,
      with ``config`` annotated as a pydantic ``BaseModel`` subclass, become
      entries in ``__step_functions__`` keyed by that config type;
    * public methods carrying the ``_FILE_EXTENSION_NAME`` marker attribute
      become entries in ``__reader_functions__`` keyed by its value.

    """

    __entity_type__: ClassVar[type[EntityType]]  # type: ignore
    """
    The entity type used for the reference data.

    This will be populated from the generic annotation at class creation time.

    """
    __step_functions__: ClassVar[dict[type[ReferenceConfig], Callable]] = {}
    """
    A mapping between refdata config types and functions to call to load these configs
    into reference data entities
    """

    __reader_functions__: ClassVar[dict[str, Callable]] = {}
    """
    A mapping between file extensions and functions to load the file uris
    into reference data entities
    """
    prefix: str = "refdata_"

    def __init_subclass__(cls, *_, **__) -> None:
        """When this class is subclassed, set ``__entity_type__`` and build fresh
        ``__reader_functions__`` / ``__step_functions__`` registries for the subclass.

        """
        # Cooperate with the rest of the MRO so base classes such as
        # ``typing.Generic`` still run their own subclass hooks.
        super().__init_subclass__(*_, **__)

        # Set entity type from parent class subscript.
        if cls is not BaseRefDataLoader:
            cls.__entity_type__ = get_entity_type(cls, "BaseRefDataLoader")

        # ensure that dicts are specific to each subclass - redefine rather
        # than keep the same reference
        cls.__reader_functions__ = {}
        cls.__step_functions__ = {}

        for method_name in dir(cls):
            if method_name.startswith("_"):
                continue

            method = getattr(cls, method_name, None)
            if method is None or not callable(method):
                continue

            if ext := getattr(method, _FILE_EXTENSION_NAME, None):
                cls.__reader_functions__[ext] = method
                continue

            type_hints = get_type_hints(method)
            if set(type_hints.keys()) != {"config", "return"}:
                continue
            config_type = type_hints["config"]
            # Hints such as ``Optional[...]`` are not classes and would make
            # ``issubclass`` raise ``TypeError``; they cannot be registry keys anyway.
            if not isinstance(config_type, type) or not issubclass(config_type, BaseModel):
                continue

            cls.__step_functions__[config_type] = method  # type: ignore

    # pylint: disable=unused-argument
    def __init__(
        self,
        reference_entity_config: dict[EntityName, ReferenceConfig],
        dataset_config_uri: Optional[URI] = None,
        **kwargs,
    ) -> None:
        """
        Args:
            reference_entity_config: the reference config for each entity name.
            dataset_config_uri: URI that relative file paths (``load_file``) are
                resolved against. Only required when relative paths are used.
            **kwargs: accepted for subclass compatibility; unused here.

        """
        self.reference_entity_config = reference_entity_config
        """
        Configuration options for the reference data. This is likely to vary
        from backend to backend (e.g. might be locations and file types for
        some backends, and table names for others).

        """
        self.dataset_config_uri = dataset_config_uri
        """The URI that relative reference file paths are resolved against."""
        self.entity_cache: dict[EntityName, EntityType] = {}
        """A cache for already-loaded entities."""

    @abstractmethod
    def load_table(self, config: ReferenceTable) -> EntityType:
        """Load reference entity from a database table"""
        raise NotImplementedError()

    def load_file(self, config: ReferenceFile) -> EntityType:
        """Load reference entity from a path relative to ``dataset_config_uri``.

        Raises:
            AttributeError: if ``dataset_config_uri`` is not set.
            RefdataLacksFileExtensionSupport: if no reader is registered for
                ``config.file_extension``.

        """
        if not self.dataset_config_uri:
            raise AttributeError("dataset_config_uri must be specified if using relative paths")
        target_location = fh.build_relative_uri(self.dataset_config_uri, config.filename)
        if isinstance(_get_implementation(self.dataset_config_uri), LocalFilesystemImplementation):
            # Local files are passed to readers as plain POSIX paths, not file URIs.
            target_location = fh.file_uri_to_local_path(target_location).as_posix()
        try:
            impl = self.__reader_functions__[config.file_extension]
        except KeyError as exc:
            raise RefdataLacksFileExtensionSupport(file_extension=config.file_extension) from exc
        # Called outside the ``try`` so a KeyError raised inside the reader is not
        # misreported as an unsupported file extension.
        return impl(self, target_location)

    def load_uri(self, config: ReferenceURI) -> EntityType:
        """Load reference entity from an absolute URI.

        Raises:
            RefdataLacksFileExtensionSupport: if no reader is registered for
                ``config.file_extension``.

        """
        if isinstance(_get_implementation(config.uri), LocalFilesystemImplementation):
            # Local files are passed to readers as plain POSIX paths, not file URIs.
            target_location = fh.file_uri_to_local_path(config.uri).as_posix()
        else:
            target_location = config.uri
        try:
            impl = self.__reader_functions__[config.file_extension]
        except KeyError as exc:
            raise RefdataLacksFileExtensionSupport(file_extension=config.file_extension) from exc
        # Called outside the ``try`` so a KeyError raised inside the reader is not
        # misreported as an unsupported file extension.
        return impl(self, target_location)

    def load_entity(self, entity_name: EntityName, config: ReferenceConfig) -> EntityType:
        """Load a reference entity given the reference config and cache it.

        The step function is chosen by walking ``type(config)``'s MRO, so an
        exact registration wins but subclasses of a registered config type
        reuse its loader.

        Raises:
            KeyError: if no step function is registered for the config type.

        """
        config_type = type(config)
        step_functions = self.__step_functions__
        func = next(
            (step_functions[klass] for klass in config_type.__mro__ if klass in step_functions),
            None,
        )
        if func is None:
            raise KeyError(config_type)
        entity = func(self, config)
        self.entity_cache[entity_name] = entity
        return entity

    def __getitem__(self, key: EntityName) -> EntityType:
        """Return entity ``key``, loading and caching it on first access.

        Raises:
            MissingRefDataEntity: if ``key`` has no reference config or loading it
                raises; the underlying error is chained as ``__cause__``.

        """
        try:
            return self.entity_cache[key]
        except KeyError:
            pass
        try:
            config = self.reference_entity_config[key]
            return self.load_entity(entity_name=key, config=config)
        except Exception as err:  # pylint: disable=broad-except
            raise MissingRefDataEntity(entity_name=key) from err

    def __contains__(self, key: object) -> bool:
        """Report whether ``key`` is cached or configured, without loading it.

        The inherited ``Mapping.__contains__`` calls ``__getitem__`` and would
        load the entity just to answer an ``in`` test.

        """
        return key in self.entity_cache or key in self.reference_entity_config

    def __iter__(self) -> Iterator[str]:
        return iter(self.reference_entity_config.keys())

    def __len__(self) -> int:
        return len(self.reference_entity_config)

__entity_type__ class-attribute

The entity type used for the reference data.

This will be populated from the generic annotation at class creation time.

__reader_functions__ = {} class-attribute

A mapping from file extensions to the functions that load files with that extension into reference data entities.

__step_functions__ = {} class-attribute

A mapping from refdata config types to the functions called to load those configs into reference data entities.

dataset_config_uri = dataset_config_uri instance-attribute

The URI that relative reference file paths are resolved against. It must be set when reference data is loaded from relative file paths.

entity_cache = {} instance-attribute

A cache for already-loaded entities.

__init_subclass__(*_, **__)

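When this class is subclassed, set the subclass's `__entity_type__` from its generic parameter. Then create fresh `__step_functions__` and `__reader_functions__` class variables for the subclass and populate them.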

Source code in src/dve/core_engine/backends/base/reference_data.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def __init_subclass__(cls, *_, **__) -> None:
    """When this class is subclassed, set ``__entity_type__`` and build fresh
    ``__reader_functions__`` / ``__step_functions__`` registries for the subclass.

    """
    # Cooperate with the rest of the MRO so base classes such as
    # ``typing.Generic`` still run their own subclass hooks.
    super().__init_subclass__(*_, **__)

    # Set entity type from parent class subscript.
    if cls is not BaseRefDataLoader:
        cls.__entity_type__ = get_entity_type(cls, "BaseRefDataLoader")

    # ensure that dicts are specific to each subclass - redefine rather
    # than keep the same reference
    cls.__reader_functions__ = {}
    cls.__step_functions__ = {}

    for method_name in dir(cls):
        if method_name.startswith("_"):
            continue

        method = getattr(cls, method_name, None)
        if method is None or not callable(method):
            continue

        if ext := getattr(method, _FILE_EXTENSION_NAME, None):
            cls.__reader_functions__[ext] = method
            continue

        type_hints = get_type_hints(method)
        if set(type_hints.keys()) != {"config", "return"}:
            continue
        config_type = type_hints["config"]
        # Hints such as ``Optional[...]`` are not classes and would make
        # ``issubclass`` raise ``TypeError``; they cannot be registry keys anyway.
        if not isinstance(config_type, type) or not issubclass(config_type, BaseModel):
            continue

        cls.__step_functions__[config_type] = method  # type: ignore

load_entity(entity_name, config)

Load a reference entity given the reference config

Source code in src/dve/core_engine/backends/base/reference_data.py
195
196
197
198
199
200
201
def load_entity(self, entity_name: EntityName, config: ReferenceConfig) -> EntityType:
    """Load a reference entity given the reference config and cache it.

    The step function is chosen by walking ``type(config)``'s MRO, so an
    exact registration wins but subclasses of a registered config type
    reuse its loader.

    Raises:
        KeyError: if no step function is registered for the config type.

    """
    config_type = type(config)
    step_functions = self.__step_functions__
    func = next(
        (step_functions[klass] for klass in config_type.__mro__ if klass in step_functions),
        None,
    )
    if func is None:
        raise KeyError(config_type)
    entity = func(self, config)
    self.entity_cache[entity_name] = entity
    return entity

load_file(config)

Load reference entity from a relative file path

Source code in src/dve/core_engine/backends/base/reference_data.py
170
171
172
173
174
175
176
177
178
179
180
181
def load_file(self, config: ReferenceFile) -> EntityType:
    """Load reference entity from a path relative to ``dataset_config_uri``.

    Raises:
        AttributeError: if ``dataset_config_uri`` is not set.
        RefdataLacksFileExtensionSupport: if no reader is registered for
            ``config.file_extension``.

    """
    if not self.dataset_config_uri:
        raise AttributeError("dataset_config_uri must be specified if using relative paths")
    target_location = fh.build_relative_uri(self.dataset_config_uri, config.filename)
    if isinstance(_get_implementation(self.dataset_config_uri), LocalFilesystemImplementation):
        # Local files are passed to readers as plain POSIX paths, not file URIs.
        target_location = fh.file_uri_to_local_path(target_location).as_posix()
    try:
        impl = self.__reader_functions__[config.file_extension]
    except KeyError as exc:
        raise RefdataLacksFileExtensionSupport(file_extension=config.file_extension) from exc
    # Called outside the ``try`` so a KeyError raised inside the reader is not
    # misreported as an unsupported file extension.
    return impl(self, target_location)

load_table(config) abstractmethod

Load reference entity from a database table

Source code in src/dve/core_engine/backends/base/reference_data.py
165
166
167
168
@abstractmethod
def load_table(self, config: ReferenceTable) -> EntityType:
    """Load a reference entity from a database table.

    Abstract: every backend provides its own implementation.
    """
    raise NotImplementedError

load_uri(config)

Load reference entity from an absolute URI

Source code in src/dve/core_engine/backends/base/reference_data.py
183
184
185
186
187
188
189
190
191
192
193
def load_uri(self, config: ReferenceURI) -> EntityType:
    """Load reference entity from an absolute URI.

    Raises:
        RefdataLacksFileExtensionSupport: if no reader is registered for
            ``config.file_extension``.

    """
    if isinstance(_get_implementation(config.uri), LocalFilesystemImplementation):
        # Local files are passed to readers as plain POSIX paths, not file URIs.
        target_location = fh.file_uri_to_local_path(config.uri).as_posix()
    else:
        target_location = config.uri
    try:
        impl = self.__reader_functions__[config.file_extension]
    except KeyError as exc:
        raise RefdataLacksFileExtensionSupport(file_extension=config.file_extension) from exc
    # Called outside the ``try`` so a KeyError raised inside the reader is not
    # misreported as an unsupported file extension.
    return impl(self, target_location)

dve.core_engine.backends.implementations.duckdb.reference_data.DuckDBRefDataLoader

Bases: BaseRefDataLoader[DuckDBPyRelation]

A reference data loader backed by DuckDB. It reads existing DuckDB tables, parquet files, and Arrow IPC files.

Source code in src/dve/core_engine/backends/implementations/duckdb/reference_data.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class DuckDBRefDataLoader(BaseRefDataLoader[DuckDBPyRelation]):
    """A reference data loader backed by DuckDB.

    Entities are read from existing DuckDB tables, parquet files or Arrow IPC
    files. ``connection`` is only annotated here, so it is expected to be
    assigned on the class before instantiation.

    """

    connection: DuckDBPyConnection
    """The DuckDB connection for the backend."""
    dataset_config_uri: Optional[URI] = None
    """The location of the dischema file"""

    def __init__(
        self,
        reference_entity_config: dict[EntityName, ReferenceConfigUnion],
        dataset_config_uri: Optional[URI] = None,
        **kwargs,
    ) -> None:
        """
        Args:
            reference_entity_config: the reference config for each entity name.
            dataset_config_uri: overrides the class-level ``dataset_config_uri``
                when given. Previously passing it as a keyword raised ``TypeError``
                ("multiple values") because the class value was also passed.
            **kwargs: forwarded to ``BaseRefDataLoader``.

        Raises:
            AttributeError: if no DuckDB connection has been set.

        """
        if dataset_config_uri is None:
            dataset_config_uri = self.dataset_config_uri
        super().__init__(reference_entity_config, dataset_config_uri, **kwargs)

        # ``connection`` has no class-level value, so plain attribute access would
        # raise a generic AttributeError instead of this message.
        if not getattr(self, "connection", None):
            raise AttributeError("DuckDBConnection must be specified")

    def load_table(self, config: ReferenceTable) -> DuckDBPyRelation:
        """Load reference entity from a database table"""
        # NOTE(review): the table name is interpolated into SQL verbatim; this
        # assumes the dischema config is trusted.
        return self.connection.sql(f"select * from {config.fq_table_name}")

    @mark_refdata_file_extension("parquet")
    def load_parquet_file(self, uri: str) -> DuckDBPyRelation:
        """Load a parquet file into a duckdb relation"""
        return self.connection.read_parquet(uri)

    @mark_refdata_file_extension("arrow")
    def load_arrow_file(self, uri: str) -> DuckDBPyRelation:
        """Load an arrow ipc file into a duckdb relation"""
        return self.connection.from_arrow(ipc.open_stream(uri).read_all())  # type:ignore

connection instance-attribute

The DuckDB connection for the backend.

dataset_config_uri = None class-attribute instance-attribute

The location of the dischema file

load_arrow_file(uri)

Load an arrow ipc file into a duckdb relation

Source code in src/dve/core_engine/backends/implementations/duckdb/reference_data.py
46
47
48
49
@mark_refdata_file_extension("arrow")
def load_arrow_file(self, uri: str) -> DuckDBPyRelation:
    """Read the Arrow IPC stream stored at ``uri`` into a DuckDB relation."""
    return self.connection.from_arrow(  # type:ignore
        ipc.open_stream(uri).read_all()
    )

load_parquet_file(uri)

Load a parquet file into a duckdb relation

Source code in src/dve/core_engine/backends/implementations/duckdb/reference_data.py
41
42
43
44
@mark_refdata_file_extension("parquet")
def load_parquet_file(self, uri: str) -> DuckDBPyRelation:
    """Read the parquet file at ``uri`` through the backend's DuckDB connection."""
    connection = self.connection
    return connection.read_parquet(uri)

load_table(config)

Load reference entity from a database table

Source code in src/dve/core_engine/backends/implementations/duckdb/reference_data.py
37
38
39
def load_table(self, config: ReferenceTable) -> DuckDBPyRelation:
    """Select every row of the configured table as a DuckDB relation."""
    query = f"select * from {config.fq_table_name}"
    return self.connection.sql(query)

dve.core_engine.backends.implementations.spark.reference_data.SparkRefDataLoader

Bases: BaseRefDataLoader[DataFrame]

A reference data loader backed by Apache Spark. It reads existing Spark tables and parquet files.

Source code in src/dve/core_engine/backends/implementations/spark/reference_data.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class SparkRefDataLoader(BaseRefDataLoader[DataFrame]):
    """A reference data loader backed by Apache Spark.

    Entities are read from existing Spark tables or parquet files. ``spark``
    is only annotated here, so it is expected to be assigned on the class
    before instantiation.

    """

    spark: SparkSession
    """The Spark session for the backend."""
    dataset_config_uri: Optional[URI] = None
    """The location of the dischema file defining business rules"""

    def __init__(
        self,
        reference_entity_config: dict[EntityName, ReferenceConfig],
        dataset_config_uri: Optional[URI] = None,
        **kwargs,
    ) -> None:
        """
        Args:
            reference_entity_config: the reference config for each entity name.
            dataset_config_uri: overrides the class-level ``dataset_config_uri``
                when given. Previously passing it as a keyword raised ``TypeError``
                ("multiple values") because the class value was also passed.
            **kwargs: forwarded to ``BaseRefDataLoader``.

        Raises:
            AttributeError: if no Spark session has been set.

        """
        if dataset_config_uri is None:
            dataset_config_uri = self.dataset_config_uri
        super().__init__(reference_entity_config, dataset_config_uri, **kwargs)
        # ``spark`` has no class-level value, so plain attribute access would
        # raise a generic AttributeError instead of this message.
        if not getattr(self, "spark", None):
            raise AttributeError("Spark session must be provided")

    def load_table(self, config: ReferenceTable) -> DataFrame:
        """Load reference entity from a Spark table"""
        return self.spark.table(config.fq_table_name)

    @mark_refdata_file_extension("parquet")
    def load_parquet_file(self, uri: str) -> DataFrame:
        """Load a parquet file into a spark dataframe"""
        return self.spark.read.parquet(uri)

dataset_config_uri = None class-attribute instance-attribute

The location of the dischema file defining business rules

spark instance-attribute

The Spark session for the backend.

load_parquet_file(uri)

Load a parquet file into a spark dataframe

Source code in src/dve/core_engine/backends/implementations/spark/reference_data.py
39
40
41
42
@mark_refdata_file_extension("parquet")
def load_parquet_file(self, uri: str) -> DataFrame:
    """Read the parquet file at ``uri`` with the backend's Spark session."""
    reader = self.spark.read
    return reader.parquet(uri)