Skip to content

Models

dve.core_engine.models

Models used by core_engine

Models for parameterised pipeline execution - also used by API service

AuditRecord

Bases: BaseModel

Record to add to audit table

Source code in src/dve/core_engine/models.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class AuditRecord(BaseModel):
    """Record to add to audit table"""

    submission_id: str
    """Unique id of the submission"""
    date_updated: Optional[dt.date] = None
    """The date the record was added to the table"""
    time_updated: Optional[dt.datetime] = Field(default_factory=dt.datetime.now)
    """The timestamp the record was added to the table"""

    @root_validator(allow_reuse=True)
    def populate_date_updated(cls, values):  # pylint: disable=no-self-argument
        """Derive ``date_updated`` from the ``time_updated`` value.

        Guards against ``time_updated`` being absent (field validation
        failed) or explicitly ``None`` (the field is Optional); the previous
        unconditional ``values["time_updated"].date()`` raised
        KeyError/AttributeError in those cases.
        """
        time_updated = values.get("time_updated")
        if time_updated is not None:
            values["date_updated"] = time_updated.date()
        return values

date_updated = None class-attribute instance-attribute

The date the record was added to the table

submission_id instance-attribute

Unique id of the submission

time_updated = Field(default_factory=dt.datetime.now) class-attribute instance-attribute

The timestamp the record was added to the table

populate_date_updated(values)

Add date_updated from time_updated value

Source code in src/dve/core_engine/models.py
34
35
36
37
38
@root_validator(allow_reuse=True)
def populate_date_updated(cls, values):  # pylint: disable=no-self-argument
    """Derive ``date_updated`` from ``time_updated``, tolerating a missing or
    None ``time_updated`` (the field is Optional) instead of raising
    KeyError/AttributeError."""
    time_updated = values.get("time_updated")
    if time_updated is not None:
        values["date_updated"] = time_updated.date()
    return values

ConcreteDatasetSpecification

Bases: DatasetSpecification

A dataset with concrete entities.

Source code in src/dve/core_engine/models.py
192
193
194
195
196
class ConcreteDatasetSpecification(DatasetSpecification):
    """A dataset with concrete entities."""

    # NOTE(review): the ``type: ignore`` suggests this narrows a wider
    # ``datasets`` mapping declared on DatasetSpecification — confirm against
    # the parent class (not visible here).
    datasets: MutableMapping[EntityName, ConcreteEntity]  # type: ignore
    """Datasets which can be read from the input files."""

datasets instance-attribute

Datasets which can be read from the input files.

ConcreteEntity

Bases: EntitySpecification

An entity which has a configured reader and (possibly) a key field.

Source code in src/dve/core_engine/models.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class ConcreteEntity(EntitySpecification, arbitrary_types_allowed=True):
    """An entity which has a configured reader and (possibly) a key field."""

    reader_config: dict[Extension, ReaderConfig]
    """A reader configuration for the entity."""
    key_field: Optional[str] = None
    """An optional key field to use for the entity."""
    reporting_fields: Optional[list[str]] = None

    @validator("reporting_fields", pre=True)
    def _ensure_list(cls, value: Optional[str]) -> Optional[list[str]]:  # pylint: disable=E0213
        """Coerce a bare string into a one-element list.

        ``None`` and values that are already lists pass through unchanged.
        """
        if value is None or isinstance(value, list):
            return value
        return [value]

key_field = None class-attribute instance-attribute

An optional key field to use for the entity.

reader_config instance-attribute

A reader configuration for the entity.

EngineRun

Bases: BaseModel

The parameters needed to execute a core engine pipeline run. Basic level of validation.

Source code in src/dve/core_engine/models.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class EngineRun(BaseModel):
    """The parameters needed to execute a core engine pipeline run.

    Basic level of validation.
    """

    submission_id: UUID4 = Field(default_factory=uuid.uuid4)
    """Unique id for the run; generated when not supplied."""
    dataset_config_path: Path
    """Path to the dataset configuration file."""
    output_prefix: Path = Path("./output")
    """Base output directory; the submission id is appended by the validator."""

    # TODO: What if we want to set an alt/override output prefix
    #  and not have the submission_id appended to it?
    @validator("output_prefix")
    def _set_output_path(cls, prefix, values: dict):  # pylint: disable=E0213
        """Append the submission id to the output prefix.

        Returns a ``Path``: the previous ``os.path.join`` call returned a
        ``str``, and pydantic v1 stores a validator's return value without
        re-coercion, silently breaking the declared ``Path`` field type.
        """
        v_id = values.get("submission_id")
        if v_id:
            return Path(prefix) / str(v_id)
        return prefix

EngineRunValidation

Bases: EngineRun

The parameters needed to execute a core engine pipeline run. Additional validation for paths which point to valid files or directories.

Source code in src/dve/core_engine/models.py
167
168
169
170
171
172
class EngineRunValidation(EngineRun):
    """The parameters needed to execute a core engine pipeline run.

    Additional validation for paths which point to valid files or directories.
    """

    # FilePath (vs the plain Path on the parent) makes pydantic verify the
    # file exists at validation time.
    dataset_config_path: FilePath

ProcessingStatusRecord

Bases: AuditRecord

A record detailing what phase of processing a submission is in

Source code in src/dve/core_engine/models.py
137
138
139
140
141
142
143
144
145
class ProcessingStatusRecord(AuditRecord):
    """A record detailing what phase of processing a submission is in"""

    processing_status: ProcessingStatus
    """The processing status of the submission"""
    # Explicit ``= None`` defaults: bare ``Optional[...]`` is only implicitly
    # optional under pydantic v1 (it becomes required in v2) and every other
    # model in this module spells the default out.
    job_run_id: Optional[int] = None
    """The run id of the databricks job used to process the submission"""
    submission_result: Optional[SubmissionResult] = None
    """Whether the file validation was a success or failure"""

job_run_id instance-attribute

The run id of the databricks job used to process the submission

processing_status instance-attribute

The processing status of the submission

submission_result instance-attribute

Whether the file validation was a success or failure

SubmissionInfo

Bases: AuditRecord

Submission metadata

Source code in src/dve/core_engine/models.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class SubmissionInfo(AuditRecord):
    """Submission metadata"""

    dataset_id: str
    """The dataset that the submission relates to."""
    file_name: str
    """The name of the submitted file."""
    file_extension: str
    """The extension of the file received."""
    submission_method: Optional[str] = None  # type: ignore
    """The method that the file was submitted"""
    submitting_org: Optional[str] = None  # type: ignore
    """The organisation who submitted the file."""
    reporting_period_start: Optional[str] = None  # type: ignore
    """The start of the reporting period the submission relates to."""
    reporting_period_end: Optional[str] = None  # type: ignore
    """The end of the reporting period the submission relates to."""
    file_size: Optional[int] = None  # type: ignore
    """The size (in bytes) of the file received."""
    datetime_received: Optional[dt.datetime] = None  # type: ignore
    """The datetime the file was received."""

    @validator("file_extension")
    def _ensure_just_file_stem(cls, extension: str):  # pylint: disable=no-self-argument
        """Keep only the final suffix, e.g. ``"archive.tar.gz"`` -> ``"gz"``."""
        return extension.rsplit(".", 1)[-1]

    @property
    def file_name_with_ext(self):
        """Return file name with extension."""
        return f"{self.file_name}.{self.file_extension}"

    @classmethod
    def from_metadata_file(cls, submission_id: str, metadata_uri: Location):
        """Create a submission metadata instance from DVE metadata file.

        Raises:
            ValueError: if the file is not valid JSON or is not a JSON mapping.
        """
        metadata_uri = resolve_location(metadata_uri)
        with open_stream(metadata_uri, "r", "utf-8") as stream:
            try:
                metadata_dict: dict[str, Any] = json.load(stream)
            except json.JSONDecodeError as exc:
                raise ValueError(f"File found at {metadata_uri!r} is not valid JSON") from exc

        # A JSON document may also be a scalar (string/number/bool/null);
        # the previous list-only check let scalars through to crash later on
        # ``.get()``. Rejecting anything that is not a dict covers all cases.
        if not isinstance(metadata_dict, dict):
            raise ValueError(f"File found at {metadata_uri!r} is not a JSON mapping")

        if not metadata_dict.get("file_name"):
            metadata_dict["file_name"] = get_file_stem(metadata_uri)

        return cls(submission_id=submission_id, **metadata_dict)

    def __eq__(self, other: object) -> bool:
        """Equality that ignores the audit timestamp fields.

        NOTE(review): the data-model convention is to return ``NotImplemented``
        rather than raise; the raise is kept for backward compatibility.
        """
        if not isinstance(other, SubmissionInfo):
            raise NotImplementedError("Unable to determine equality if not a SubmissionInfo object")
        # Use pydantic's built-in exclusion instead of two hand-rolled
        # dict comprehensions.
        _exclude = {"date_updated", "time_updated"}
        return self.dict(exclude=_exclude) == other.dict(exclude=_exclude)

dataset_id instance-attribute

The dataset that the submission relates to.

datetime_received = None class-attribute instance-attribute

The datetime the file was received.

file_extension instance-attribute

The extension of the file received.

file_name instance-attribute

The name of the submitted file.

file_name_with_ext property

Return file name with extension.

file_size = None class-attribute instance-attribute

The size (in bytes) of the file received.

reporting_period_end = None class-attribute instance-attribute

The end of the reporting period the submission relates to.

reporting_period_start = None class-attribute instance-attribute

The start of the reporting period the submission relates to.

submission_method = None class-attribute instance-attribute

The method that the file was submitted

submitting_org = None class-attribute instance-attribute

The organisation who submitted the file.

from_metadata_file(submission_id, metadata_uri) classmethod

Create a submission metadata instance from DVE metadata file.

Source code in src/dve/core_engine/models.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@classmethod
def from_metadata_file(cls, submission_id: str, metadata_uri: Location):
    """Create a submission metadata instance from DVE metadata file.

    Raises:
        ValueError: if the file is not valid JSON or is not a JSON mapping.
    """
    metadata_uri = resolve_location(metadata_uri)
    with open_stream(metadata_uri, "r", "utf-8") as stream:
        try:
            metadata_dict: dict[str, Any] = json.load(stream)
        except json.JSONDecodeError as exc:
            raise ValueError(f"File found at {metadata_uri!r} is not valid JSON") from exc

    # A JSON document may also be a scalar (string/number/bool/null); the
    # previous list-only check let scalars through to crash later on
    # ``.get()``. Rejecting anything that is not a dict covers all cases.
    if not isinstance(metadata_dict, dict):
        raise ValueError(f"File found at {metadata_uri!r} is not a JSON mapping")

    if not metadata_dict.get("file_name"):
        metadata_dict["file_name"] = get_file_stem(metadata_uri)

    return cls(submission_id=submission_id, **metadata_dict)

SubmissionInfoMismatchWarning

Bases: UserWarning

Emitted when the submission info does not match the filename.

Source code in src/dve/core_engine/models.py
41
42
class SubmissionInfoMismatchWarning(UserWarning):
    """Emitted when the submission info does not match the filename.

    Subclasses ``UserWarning`` so callers can filter it specifically with
    ``warnings.filterwarnings``.
    """

SubmissionStatisticsRecord

Bases: AuditRecord

Record detailing key metrics from dve processing

Source code in src/dve/core_engine/models.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
class SubmissionStatisticsRecord(AuditRecord):
    """Record detailing key metrics from dve processing"""

    # Explicit ``= None`` defaults: bare ``Optional[...]`` is only implicitly
    # optional under pydantic v1 (it becomes required in v2) and the other
    # models in this module spell the default out.
    record_count: Optional[int] = None
    """Count of records in the submitted file"""
    number_record_rejections: Optional[int] = None
    """Number of record rejections raised following validation"""
    number_warnings: Optional[int] = None
    """Number of warnings raised following validation"""

    def __eq__(self, other: object) -> bool:
        """Equality that ignores the audit timestamp fields.

        NOTE(review): the data-model convention is to return ``NotImplemented``
        rather than raise; the raise is kept for backward compatibility.
        """
        if not isinstance(other, SubmissionStatisticsRecord):
            raise NotImplementedError(
                "Unable to determine equality if not a SubmissionStatisticsRecord object"
            )
        # Use pydantic's built-in exclusion instead of two hand-rolled
        # dict comprehensions.
        _exclude = {"date_updated", "time_updated"}
        return self.dict(exclude=_exclude) == other.dict(exclude=_exclude)

number_record_rejections instance-attribute

Number of record rejections raised following validation

number_warnings instance-attribute

Number of warnings raised following validation

record_count instance-attribute

Count of records in the submitted file

TransferRecord

Bases: AuditRecord

A record detailing extracts sent following dve processing

Source code in src/dve/core_engine/models.py
124
125
126
127
128
129
130
131
132
133
134
class TransferRecord(AuditRecord):
    """A record detailing extracts sent following dve processing.

    Inherits the submission id and audit timestamp fields from AuditRecord.
    """

    report_name: str
    """The type of extract sent"""
    transfer_id: str
    """The DPS transfer id for the extract sent"""
    transfer_method: Optional[str] = None
    """What transfer mechanism was used to send the extract"""
    recipient: Optional[str] = None
    """The recipient of the extract"""

recipient = None class-attribute instance-attribute

The recipient of the extract

report_name instance-attribute

The type of extract sent

transfer_id instance-attribute

The DPS transfer id for the extract sent

transfer_method = None class-attribute instance-attribute

What transfer mechanism was used to send the extract