Skip to content

Readers

CSV

Bases: BaseFileReader

A base reader for CSV files.

Init function for the base CSV reader.

Parameters:

Name Type Description Default
- `delimiter`

the delimiter for the CSV file. This separates the fields. Default: ,

required
- `escape_char`

the character used to 'escape' the delimiter in unquoted fields. Implementations may also use this character to escape the quote character within quoted fields. Default: \

required
- `quote_char`

the character used to quote fields. Default: "

required
- `header`

a boolean value indicating whether to treat the first row of the file as a header. Default: True

required
- `trim_cells`

a boolean value indicating whether to strip whitespace from fields. Default: True

required
- `null_values`

a container of values to replace with null if encountered in a cell. Default: {'', 'null', 'NULL'}

required
- `encoding`

encoding of the CSV file. Default: utf-8-sig

required
- `**extra_args`

extra, implementation specific parser arguments.

required
Source code in src/dve/core_engine/backends/readers/csv.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def __init__(
    self,
    *,
    delimiter: str = ",",
    escape_char: str = "\\",
    quote_char: str = '"',
    header: bool = True,
    trim_cells: bool = True,
    null_values: Collection[str] = frozenset({"NULL", "null", ""}),
    encoding: str = "utf-8-sig",
    **_,
) -> None:
    """Init function for the base CSV reader.

    Args:
     - `delimiter`: the delimiter for the CSV file. This separates the fields.
       Default: `,`
     - `escape_char`: the character used to 'escape' the delimiter in unquoted
       fields. Implementations may also use this character to escape the quote
       character within quoted fields. Default: `\\`
     - `quote_char`: the character used to quote fields. Default: `"`
     - `header`: a boolean value indicating whether to treat the first row of
       the file as a header. Default: `True`
     - `trim_cells`: a boolean value indicating whether to strip whitespace
       from fields. Default: `True`
     - `null_values`: a container of values to replace with null if encountered
       in a cell. Default: `{'', 'null', 'NULL'}`
     - `encoding`: encoding of the CSV file. Default: `utf-8-sig`
     - `**extra_args`: extra, implementation specific parser arguments.
       Captured as `**_` and ignored by this base reader.

    """
    self.delimiter = delimiter
    """The delimiter for the CSV file. This separates the fields."""
    self.escape_char = escape_char
    """
    The character used to 'escape' the delimiter in unquoted fields. Implementations
    may also use this character to escape the quote character within quoted fields.

    """
    self.quote_char = quote_char
    """The character used to quote fields."""
    self.header = header
    """
    A boolean value indicating whether to treat the first row of the file
    as a header.

    """
    self.trim_cells = trim_cells
    """A boolean value indicating whether to strip whitespace from fields."""
    self.null_values = null_values
    """A container of values to replace with null if encountered in a cell."""
    self.encoding = encoding
    """Encoding of the CSV file."""

Bases: BaseFileReader

A reader for CSV files including the ability to compare the passed model to the file header, if it exists.

- `field_check`: flag to compare the submitted file header to the accompanying pydantic model
- `field_check_error_code`: the error code to provide if the file header doesn't contain the expected fields
- `field_check_error_message`: the error message to provide if the file header doesn't contain the expected fields

Source code in src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
@duckdb_record_index
@duckdb_write_parquet
class DuckDBCSVReader(BaseFileReader):
    """A reader for CSV files including the ability to compare the passed model
    to the file header, if it exists.

    field_check: flag to compare submitted file header to the accompanying pydantic model
    field_check_error_code: The error code to provide if the file header doesn't contain
                            the expected fields
    field_check_error_message: The error message to provide if the file header doesn't contain
                               the expected fields"""

    # TODO - the read_to_relation should include the schema and determine whether to
    # TODO - stringify or not
    def __init__(
        self,
        *,
        header: bool = True,
        delim: str = ",",
        quotechar: str = '"',
        connection: Optional[DuckDBPyConnection] = None,
        field_check: bool = False,
        field_check_error_code: Optional[str] = "ExpectedVsActualFieldMismatch",
        field_check_error_message: Optional[str] = "The submitted header is missing fields",
        null_empty_strings: bool = False,
        **_,
    ):
        """Init function for the DuckDB CSV reader.

        Args:
         - `header`: whether to treat the first row of the file as a header.
           Default: `True`
         - `delim`: the field delimiter. Default: `,`
         - `quotechar`: the character used to quote fields. Default: `"`
         - `connection`: an optional DuckDB connection; falls back to the
           module-level default connection when not supplied.
         - `field_check`: whether to compare the file header against the
           pydantic model before reading. Default: `False`
         - `field_check_error_code`: the error code reported when the field
           check fails.
         - `field_check_error_message`: the error message reported when the
           field check fails.
         - `null_empty_strings`: whether to trim cells and convert empty
           strings to NULL after reading. Default: `False`
         - `**_`: extra, implementation specific arguments (ignored).

        """
        self.header = header
        self.delim = delim
        self.quotechar = quotechar
        self._connection = connection if connection else default_connection
        self.field_check = field_check
        self.field_check_error_code = field_check_error_code
        self.field_check_error_message = field_check_error_message
        self.null_empty_strings = null_empty_strings

        super().__init__()

    def perform_field_check(
        self, resource: URI, entity_name: str, expected_schema: type[BaseModel]
    ):
        """Check that the header of the CSV aligns with the provided model.

        Raises:
         - `ValueError`: if the reader was configured with `header=False`.
         - `MessageBearingError`: carrying a `FeedbackMessage` that lists the
           missing fields, if the header lacks any expected field.

        """
        if not self.header:
            raise ValueError("Cannot perform field check without a CSV header")

        if missing := check_csv_header_expected(resource, expected_schema, self.delim):
            raise MessageBearingError(
                "The CSV header doesn't match what is expected",
                messages=[
                    FeedbackMessage(
                        entity=entity_name,
                        record=None,
                        failure_type="submission",
                        error_location="Whole File",
                        error_code=self.field_check_error_code,
                        error_message=f"{self.field_check_error_message} - missing fields: {missing}",  # pylint: disable=line-too-long
                    )
                ],
            )

    def read_to_py_iterator(
        self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
    ) -> Iterator[dict[str, Any]]:
        """Creates an iterable object of rows as dictionaries"""
        yield from self.read_to_relation(resource, entity_name, schema).pl().iter_rows(named=True)

    @read_function(DuckDBPyRelation)
    def read_to_relation(  # pylint: disable=unused-argument
        self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
    ) -> DuckDBPyRelation:
        """Returns a relation object from the source csv.

        Raises:
         - `EmptyFileError`: if the resource has zero content length.

        """
        if get_content_length(resource) == 0:
            raise EmptyFileError(f"File at {resource} is empty.")

        if self.field_check:
            self.perform_field_check(resource, entity_name, schema)

        reader_options: dict[str, Any] = {
            "header": self.header,
            "delimiter": self.delim,
            "quotechar": self.quotechar,
        }

        # Map each model field to its DuckDB SQL type so the CSV is read with an
        # explicit schema instead of relying on type sniffing.
        ddb_schema: dict[str, SQLType] = {
            fld.name: str(get_duckdb_type_from_annotation(fld.annotation))  # type: ignore
            for fld in schema.__fields__.values()
        }

        reader_options["columns"] = ddb_schema

        # NOTE(review): parallel=False — presumably to keep row order stable for
        # the record index; confirm before enabling parallel reads.
        rel = self.add_record_index(read_csv(resource, **reader_options, parallel=False))

        if self.null_empty_strings:
            schema_columns = set(reader_options["columns"])
            # Clean only the schema columns; pass every other column (e.g. the
            # record index added above) through untouched. Previously the select
            # listed only the schema columns, silently dropping the record index
            # (the Polars-based reader explicitly preserves it).
            cleaned_cols = ", ".join(
                f"NULLIF(TRIM({col}), '') as {col}" if col in schema_columns else col
                for col in rel.columns
            )
            rel = rel.select(cleaned_cols)

        return rel

__init__(*, header=True, delim=',', quotechar='"', connection=None, field_check=False, field_check_error_code='ExpectedVsActualFieldMismatch', field_check_error_message='The submitted header is missing fields', null_empty_strings=False, **_)

Source code in src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def __init__(
    self,
    *,
    header: bool = True,
    delim: str = ",",
    quotechar: str = '"',
    connection: Optional[DuckDBPyConnection] = None,
    field_check: bool = False,
    field_check_error_code: Optional[str] = "ExpectedVsActualFieldMismatch",
    field_check_error_message: Optional[str] = "The submitted header is missing fields",
    null_empty_strings: bool = False,
    **_,
):
    """Init function for the DuckDB CSV reader.

    Args:
     - `header`: whether to treat the first row of the file as a header.
       Default: `True`
     - `delim`: the field delimiter. Default: `,`
     - `quotechar`: the character used to quote fields. Default: `"`
     - `connection`: an optional DuckDB connection; falls back to the
       module-level default connection when not supplied.
     - `field_check`: whether to compare the file header against the pydantic
       model before reading. Default: `False`
     - `field_check_error_code`: the error code reported when the field check
       fails.
     - `field_check_error_message`: the error message reported when the field
       check fails.
     - `null_empty_strings`: whether to trim cells and convert empty strings
       to NULL after reading. Default: `False`
     - `**_`: extra, implementation specific arguments (ignored).

    """
    self.header = header
    self.delim = delim
    self.quotechar = quotechar
    self._connection = connection if connection else default_connection
    self.field_check = field_check
    self.field_check_error_code = field_check_error_code
    self.field_check_error_message = field_check_error_message
    self.null_empty_strings = null_empty_strings

    super().__init__()

Bases: DuckDBCSVReader

Utilises the polars lazy csv reader which is then converted into a DuckDBPyRelation object.

The primary reason this reader exists is due to the limitation within duckdb csv reader and it not being able to read partial content from a csv (i.e. select a, b NOT y).

Source code in src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
@polars_record_index
class PolarsToDuckDBCSVReader(DuckDBCSVReader):
    """
    Utilises the polars lazy csv reader which is then converted into a DuckDBPyRelation object.

    The primary reason this reader exists is due to the limitation within duckdb csv reader and
    it not being able to read partial content from a csv (i.e. select a, b NOT y).
    """

    @read_function(DuckDBPyRelation)
    def read_to_relation(  # pylint: disable=unused-argument
        self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
    ) -> DuckDBPyRelation:
        """Returns a relation object from the source csv"""
        if get_content_length(resource) == 0:
            raise EmptyFileError(f"File at {resource} is empty.")

        if self.field_check:
            self.perform_field_check(resource, entity_name, schema)

        # Derive the polars dtype for every field declared on the model.
        dtype_map = {
            fld.name: get_polars_type_from_annotation(fld.annotation)  # type: ignore
            for fld in schema.__fields__.values()
        }

        scan_kwargs: dict[str, Any] = {
            "has_header": self.header,
            "separator": self.delim,
            "quote_char": self.quotechar,
            "dtypes": dtype_map,
        }

        # Lazily scan the CSV and project only the model's columns, then attach
        # the record index. (polars 0.18+ offers a raise_if_empty arg that would
        # make the explicit empty-file check above redundant.)
        lazy = pl.scan_csv(resource, **scan_kwargs).select(  # type: ignore
            list(dtype_map.keys())
        )
        df = self.add_record_index(lazy)  # pylint: disable=W0612

        if self.null_empty_strings:
            cleaned_exprs = [
                pl.col(name).str.strip_chars().replace("", None)
                for name in df.columns
                if name != RECORD_INDEX_COLUMN_NAME
            ]
            # Keep the record index column untouched at the end of the select.
            df = df.select(cleaned_exprs + [pl.col(RECORD_INDEX_COLUMN_NAME)])

        # duckdb resolves the local variable `df` by name from this scope.
        return ddb.sql("SELECT * FROM df")

Bases: PolarsToDuckDBCSVReader

A Reader for files with a .csv extension and where there are repeating "header" values within the file. Header in this case is not the column names at the top of a csv, rather a collection of unique records that would usually be structured in another entity. However, due to the fact that csv is a semi-structured data format, you cannot define complex entities, hence the values are then repeated on all rows.

Example of a repeating header data may look like this...

headerCol1 headerCol2 headerCol3 nonHeaderCol1 nonHeaderCol2
shop 1 clothes 2025-01-01 jeans 20.39
shop 1 clothes 2025-01-01 shirt 14.99

This reader will just pull out the distinct values from the header column. Where there are more/less than one distinct value per column, the reader will produce a NonDistinctHeaderError.

So using the example above, the expected entity would look like this...

headerCol1 headerCol2 headerCol3
shop 1 clothes 2025-01-01
Source code in src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):
    """A Reader for files with a `.csv` extension and where there are repeating "header" values
    within the file. Header in this case is not the column names at the top of a csv, rather a
    collection of unique records that would usually be structured in another entity. However, due
    to the fact that `csv` is a semi-structured data format, you cannot define complex entities,
    hence the values are then repeated on all rows.

    Example of a repeating header data may look like this...

    | headerCol1 | headerCol2 | headerCol3 | nonHeaderCol1 | nonHeaderCol2 |
    | ---------- | ---------- | ---------- | ------------- | ------------- |
    | shop 1     | clothes    | 2025-01-01 | jeans         | 20.39         |
    | shop 1     | clothes    | 2025-01-01 | shirt         | 14.99         |

    This reader will just pull out the distinct values from the header column. Where there are
    more/less than one distinct value per column, the reader will produce a
    `NonDistinctHeaderError`.

    So using the example above, the expected entity would look like this...

    | headerCol1 | headerCol2 | headerCol3 |
    | ---------- | ---------- | ---------- |
    | shop 1     | clothes    | 2025-01-01 |
    """

    def __init__(
        self,
        *args,
        non_unique_header_error_code: Optional[str] = "NonUniqueHeader",
        non_unique_header_error_message: Optional[str] = None,
        **kwargs,
    ):
        """Init function for the repeating-header CSV reader.

        Args:
         - `non_unique_header_error_code`: the error code reported when more
           than one distinct combination of header values is found.
         - `non_unique_header_error_message`: an optional override for the
           default error message reported in that case.
         - `*args` / `**kwargs`: forwarded to `PolarsToDuckDBCSVReader`.

        """
        self._non_unique_header_code = non_unique_header_error_code
        self._non_unique_header_message = non_unique_header_error_message
        super().__init__(*args, **kwargs)

    @read_function(DuckDBPyRelation)
    def read_to_relation(  # pylint: disable=unused-argument
        self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
    ) -> DuckDBPyRelation:
        """Read the CSV and reduce it to its single distinct set of header values.

        Raises `MessageBearingError` when the file holds more than one distinct
        combination of header values.
        """
        entity: DuckDBPyRelation = super().read_to_relation(
            resource=resource, entity_name=entity_name, schema=schema
        )
        # Drop the per-row record index before deduplicating, otherwise every
        # row would be distinct.
        entity = entity.select(StarExpression(exclude=[RECORD_INDEX_COLUMN_NAME])).distinct()
        no_records = entity.shape[0]

        if no_records != 1:
            rows = entity.pl().to_dicts()
            # Pair each column name (keys of the first row dict) with that
            # column's value from every distinct row; report only the columns
            # whose values actually differ.
            differing_values = [
                f"{key}: {', '.join(sorted(str(val) for val in values))}"
                for key, *values in zip(rows[0], *map(dict.values, rows))  # type: ignore
                if len(set(values)) > 1
            ]
            raise MessageBearingError(
                "More than one set of Headers found in CSV file",
                messages=[
                    FeedbackMessage(
                        record={entity_name: differing_values},
                        entity="Pre-validation",
                        failure_type="submission",
                        error_message=(
                            f"Found {no_records} distinct combination of header values."
                            if not self._non_unique_header_message
                            else self._non_unique_header_message
                        ),
                        error_location=entity_name,
                        category="Bad file",
                        error_code=self._non_unique_header_code,
                    )
                ],
            )

        # Re-attach a constant record index: the single header row is record 1.
        return entity.select(f"*, 1 as {RECORD_INDEX_COLUMN_NAME}")

__init__(*args, non_unique_header_error_code='NonUniqueHeader', non_unique_header_error_message=None, **kwargs)

Source code in src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
211
212
213
214
215
216
217
218
219
220
def __init__(
    self,
    *args,
    non_unique_header_error_code: Optional[str] = "NonUniqueHeader",
    non_unique_header_error_message: Optional[str] = None,
    **kwargs,
):
    """Init function for the repeating-header CSV reader.

    Args:
     - `non_unique_header_error_code`: the error code reported when more than
       one distinct combination of header values is found.
     - `non_unique_header_error_message`: an optional override for the default
       error message reported in that case.
     - `*args` / `**kwargs`: forwarded to the parent reader.

    """
    self._non_unique_header_code = non_unique_header_error_code
    self._non_unique_header_message = non_unique_header_error_message
    super().__init__(*args, **kwargs)

Bases: BaseFileReader

A Spark reader for CSV files.

Source code in src/dve/core_engine/backends/implementations/spark/readers/csv.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@spark_record_index
@spark_write_parquet
class SparkCSVReader(BaseFileReader):
    """A Spark reader for CSV files."""

    def __init__(
        self,
        *,
        delimiter: str = ",",
        escape_char: str = "\\",
        quote_char: str = '"',
        header: bool = True,
        multi_line: bool = False,
        encoding: str = "utf-8-sig",
        null_empty_strings: bool = False,
        spark_session: Optional[SparkSession] = None,
        **_,
    ) -> None:
        """Init function for the Spark CSV reader.

        Args:
         - `delimiter`: the field delimiter. Default: `,`
         - `escape_char`: the character used to escape the delimiter/quote.
           Default: `\\`
         - `quote_char`: the character used to quote fields. Default: `"`
         - `header`: whether to treat the first row as a header. Default: `True`
         - `multi_line`: whether fields may span multiple lines. Default: `False`
         - `encoding`: encoding of the CSV file. Default: `utf-8-sig`
         - `null_empty_strings`: whether to trim cells and convert empty strings
           to NULL after reading. Default: `False`
         - `spark_session`: an optional SparkSession; a default session is
           created/fetched when not supplied.
         - `**_`: extra, implementation specific arguments (ignored).

        """
        self.delimiter = delimiter
        self.escape_char = escape_char
        # NOTE(review): `encoding` is stored but never forwarded to the Spark
        # reader options in read_to_dataframe — confirm whether it should be
        # passed (Spark's `encoding` option) or is intentionally unused.
        self.encoding = encoding
        self.quote_char = quote_char
        self.header = header
        self.multi_line = multi_line
        self.null_empty_strings = null_empty_strings
        self.spark_session = spark_session if spark_session else SparkSession.builder.getOrCreate()  # type: ignore  # pylint: disable=C0301

        super().__init__()

    def read_to_py_iterator(
        self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
    ) -> Iterator[dict[URI, Any]]:
        """Creates an iterable object of rows as dictionaries."""
        df = self.read_to_dataframe(resource, entity_name, schema)
        yield from (record.asDict(True) for record in df.toLocalIterator())

    @read_function(DataFrame)
    def read_to_dataframe(
        self,
        resource: URI,
        entity_name: EntityName,  # pylint: disable=unused-argument
        schema: type[BaseModel],
    ) -> DataFrame:
        """Read a CSV file directly to a Spark DataFrame.

        Raises:
         - `EmptyFileError`: if the resource has zero content length.

        """
        if get_content_length(resource) == 0:
            raise EmptyFileError(f"File at {resource} is empty.")

        spark_schema: StructType = get_type_from_annotation(schema)
        kwargs = {
            "sep": self.delimiter,
            "header": self.header,
            "escape": self.escape_char,
            "quote": self.quote_char,
            "multiLine": self.multi_line,
        }

        df = self.add_record_index(
            self.spark_session.read.format("csv")
            .options(**kwargs)  # type: ignore
            .load(resource, schema=spark_schema)
        )

        if self.null_empty_strings:
            schema_names = {field.name for field in spark_schema.fields}
            # Trim only the schema columns; keep any extra columns (e.g. the
            # record index added above) untouched. Previously the select listed
            # only the schema columns, silently dropping the record index.
            df = df.select(
                *[
                    psf.trim(psf.col(name)).alias(name) if name in schema_names else psf.col(name)
                    for name in df.columns
                ]
            ).replace("", None)

        return df

__init__(*, delimiter=',', escape_char='\\', quote_char='"', header=True, multi_line=False, encoding='utf-8-sig', null_empty_strings=False, spark_session=None, **_)

Source code in src/dve/core_engine/backends/implementations/spark/readers/csv.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def __init__(
    self,
    *,
    delimiter: str = ",",
    escape_char: str = "\\",
    quote_char: str = '"',
    header: bool = True,
    multi_line: bool = False,
    encoding: str = "utf-8-sig",
    null_empty_strings: bool = False,
    spark_session: Optional[SparkSession] = None,
    **_,
) -> None:
    """Init function for the Spark CSV reader.

    Args:
     - `delimiter`: the field delimiter. Default: `,`
     - `escape_char`: the character used to escape the delimiter/quote.
       Default: `\\`
     - `quote_char`: the character used to quote fields. Default: `"`
     - `header`: whether to treat the first row as a header. Default: `True`
     - `multi_line`: whether fields may span multiple lines. Default: `False`
     - `encoding`: encoding of the CSV file. Default: `utf-8-sig`
     - `null_empty_strings`: whether to trim cells and convert empty strings
       to NULL after reading. Default: `False`
     - `spark_session`: an optional SparkSession; a default session is
       created/fetched when not supplied.
     - `**_`: extra, implementation specific arguments (ignored).

    """
    self.delimiter = delimiter
    self.escape_char = escape_char
    self.encoding = encoding
    self.quote_char = quote_char
    self.header = header
    self.multi_line = multi_line
    self.null_empty_strings = null_empty_strings
    self.spark_session = spark_session if spark_session else SparkSession.builder.getOrCreate()  # type: ignore  # pylint: disable=C0301

    super().__init__()

JSON

Bases: BaseFileReader

A reader for JSON files

Source code in src/dve/core_engine/backends/implementations/duckdb/readers/json.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@duckdb_record_index
@duckdb_write_parquet
class DuckDBJSONReader(BaseFileReader):
    """A reader for JSON files"""

    def __init__(
        self,
        *,
        json_format: Optional[str] = "array",
        **_,
    ):
        """Store the JSON layout (`array`, `records`, ...) used when reading."""
        self._json_format = json_format

        super().__init__()

    def read_to_py_iterator(
        self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
    ) -> Iterator[dict[str, Any]]:
        """Creates an iterable object of rows as dictionaries"""
        relation = self.read_to_relation(resource, entity_name, schema)
        return relation.pl().iter_rows(named=True)

    @read_function(DuckDBPyRelation)
    def read_to_relation(  # pylint: disable=unused-argument
        self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
    ) -> DuckDBPyRelation:
        """Returns a relation object from the source json"""

        # Build an explicit column -> DuckDB SQL type mapping from the model so
        # the JSON is read with a fixed schema.
        column_types: dict[str, SQLType] = {}
        for fld in schema.__fields__.values():
            column_types[fld.name] = str(get_duckdb_type_from_annotation(fld.annotation))  # type: ignore

        relation = read_json(resource, columns=column_types, format=self._json_format)  # type: ignore
        return self.add_record_index(relation)

__init__(*, json_format='array', **_)

Source code in src/dve/core_engine/backends/implementations/duckdb/readers/json.py
25
26
27
28
29
30
31
32
33
def __init__(
    self,
    *,
    json_format: Optional[str] = "array",
    **_,
):
    """Init function for the DuckDB JSON reader.

    Args:
     - `json_format`: the JSON layout passed to the reader. Default: `array`
     - `**_`: extra, implementation specific arguments (ignored).

    """
    self._json_format = json_format

    super().__init__()

Bases: BaseFileReader

A Spark reader for JSON files.

Source code in src/dve/core_engine/backends/implementations/spark/readers/json.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@spark_record_index
@spark_write_parquet
class SparkJSONReader(BaseFileReader):
    """A Spark reader for JSON files."""

    def __init__(
        self,
        *,
        encoding: Optional[str] = "utf-8",
        multi_line: Optional[bool] = True,
        spark_session: Optional[SparkSession] = None,
        **_,
    ) -> None:
        """Store reader options and resolve the Spark session to use."""
        self.multi_line = multi_line
        self.encoding = encoding
        self.spark_session = spark_session if spark_session else SparkSession.builder.getOrCreate()  # type: ignore  # pylint: disable=C0301

        super().__init__()

    def read_to_py_iterator(
        self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
    ) -> Iterator[dict[URI, Any]]:
        """Yield each row of the JSON file as a (recursively converted) dict."""
        df = self.read_to_dataframe(resource, entity_name, schema)
        for record in df.toLocalIterator():
            yield record.asDict(True)

    @read_function(DataFrame)
    def read_to_dataframe(
        self,
        resource: URI,
        entity_name: EntityName,  # pylint: disable=unused-argument
        schema: type[BaseModel],
    ) -> DataFrame:
        """Read a JSON file directly to a Spark DataFrame."""
        # Guard against zero-length files before handing off to Spark.
        if get_content_length(resource) == 0:
            raise EmptyFileError(f"File at {resource} is empty.")

        spark_schema: StructType = get_type_from_annotation(schema)

        reader = self.spark_session.read.format("json").options(
            encoding=self.encoding,  # type: ignore
            multiline=self.multi_line,
        )
        loaded = reader.load(resource, schema=spark_schema)
        return self.add_record_index(loaded)

__init__(*, encoding='utf-8', multi_line=True, spark_session=None, **_)

Source code in src/dve/core_engine/backends/implementations/spark/readers/json.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(
    self,
    *,
    encoding: Optional[str] = "utf-8",
    multi_line: Optional[bool] = True,
    spark_session: Optional[SparkSession] = None,
    **_,
) -> None:
    """Init function for the Spark JSON reader.

    Args:
     - `encoding`: encoding of the JSON file. Default: `utf-8`
     - `multi_line`: whether a single record may span multiple lines.
       Default: `True`
     - `spark_session`: an optional SparkSession; a default session is
       created/fetched when not supplied.
     - `**_`: extra, implementation specific arguments (ignored).

    """
    self.encoding = encoding
    self.multi_line = multi_line
    self.spark_session = spark_session if spark_session else SparkSession.builder.getOrCreate()  # type: ignore  # pylint: disable=C0301

    super().__init__()

XML

Bases: BaseFileReader

A reader for XML files built atop LXML.

Init function for the base XML reader.

Parameters:

Name Type Description Default
- `record_tag`

a required string indicating the tag of each 'record' in the XML document.

required
- `root_tag`

a string indicating the tag to find the records in within the XML document. If None, assume that the records are under the root node.

required
- `trim_cells`

a boolean value indicating whether to strip whitespace from elements in the XML document. Default: True

required
- `null_values`

a container of values to replace with null if encountered in an element. Default: {'', 'null', 'NULL'}

required
- `sanitise_multiline`

whether to sanitise (remove newlines and multiple spaces) from multiline fields.

required
- `encoding`

encoding of the XML file. Default: utf-8-sig

required
- `n_records_to_read`

the maximum number of records to read from a document.

required
Source code in src/dve/core_engine/backends/readers/xml.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def __init__(
    self,
    *,
    record_tag: str,
    root_tag: Optional[str] = None,
    trim_cells: bool = True,
    null_values: Collection[str] = frozenset({"NULL", "null", ""}),
    sanitise_multiline: bool = True,
    encoding: str = "utf-8-sig",
    n_records_to_read: Optional[int] = None,
    xsd_location: Optional[URI] = None,
    xsd_error_code: Optional[str] = None,
    xsd_error_message: Optional[str] = None,
    rules_location: Optional[URI] = None,
    **_,
):
    """Init function for the base XML reader.

    Args:
     - `record_tag`: a required string indicating the tag of each 'record'
       in the XML document.
     - `root_tag`: a string indicating the tag to find the records in within
       the XML document. If `None`, assume that the records are under the root
       node.
     - `trim_cells`: a boolean value indicating whether to strip whitespace
       from elements in the XML document. Default: `True`
     - `null_values`: a container of values to replace with null if encountered
       in an element. Default: `{'', 'null', 'NULL'}`
     - `sanitise_multiline`: whether to sanitise (remove newlines and multiple
       spaces) from multiline fields.
     - `encoding`: encoding of the XML file. Default: `utf-8-sig`
     - `n_records_to_read`: the maximum number of records to read from a document.
     - `xsd_location`: the URI of an XSD document to validate against; when
       `None`, no XSD validation is configured.
     - `xsd_error_code`: the error code reported if XSD validation fails.
     - `xsd_error_message`: the error message reported if XSD validation fails.
     - `rules_location`: an optional base URI prepended to `xsd_location` when
       both are supplied.
     - `**_`: extra, implementation specific parser arguments (ignored).

    """
    self.record_tag = record_tag
    """The name of a 'record' tag in the XML document."""
    self.root_tag = root_tag
    """The name of the 'root' tag in the XML document."""
    self.trim_cells = trim_cells
    """A boolean value indicating whether to strip whitespace from fields."""
    self.null_values = null_values
    """A container of values to replace with null if encountered in an element."""
    self.sanitise_multiline = sanitise_multiline
    """A boolean value indicating whether to sanitise multiline fields."""
    self.encoding = encoding
    """Encoding of the XML file."""
    self.n_records_to_read = n_records_to_read
    """The maximum number of records to read from a document."""
    # NOTE(review): plain string concatenation of the two URIs — this assumes
    # `rules_location` already ends with a path separator; confirm with callers.
    if rules_location is not None and xsd_location is not None:
        self.xsd_location = rules_location + xsd_location
    else:
        self.xsd_location = xsd_location  # type: ignore
        """The URI of the xsd file if wishing to perform xsd validation."""
    self.xsd_error_code = xsd_error_code
    """The error code to be reported if xsd validation fails (if xsd)"""
    self.xsd_error_message = xsd_error_message
    """The error message to be reported if xsd validation fails"""
    super().__init__()
    self._logger = get_logger(__name__)

Bases: XMLStreamReader

A reader for XML files

Source code in src/dve/core_engine/backends/implementations/duckdb/readers/xml.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@polars_record_index
@duckdb_write_parquet
class DuckDBXMLStreamReader(XMLStreamReader):
    """A reader for XML files"""

    def __init__(self, *, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
        """Store the DuckDB connection (falling back to the module default) and
        forward the remaining kwargs to `XMLStreamReader`."""
        self.ddb_connection = ddb_connection if ddb_connection else default_connection
        super().__init__(**kwargs)

    @read_function(DuckDBPyRelation)
    def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseModel]):
        """Returns a relation object from the source xml"""
        # Run XSD validation first when an XSD location was configured; a
        # non-empty message means validation failed.
        if self.xsd_location:
            msg = self._run_xmllint(file_uri=resource)
            if msg:
                raise MessageBearingError(
                    "Submitted file failed XSD validation.",
                    messages=[msg],
                )

        # Map each field of the stringified model to its polars dtype so the
        # streamed records are loaded with an explicit schema.
        polars_schema: dict[str, pl.DataType] = {  # type: ignore
            fld.name: get_polars_type_from_annotation(fld.annotation)
            for fld in stringify_model(schema).__fields__.values()
        }

        # `_lazy_frame` looks unused but is referenced by name in the SQL below:
        # duckdb resolves the local variable from this scope.
        _lazy_frame = self.add_record_index(
            pl.LazyFrame(
                data=self.read_to_py_iterator(resource, entity_name, schema), schema=polars_schema
            )
        )
        return self.ddb_connection.sql("select * from _lazy_frame")

__init__(*, ddb_connection=None, **kwargs)

Source code in src/dve/core_engine/backends/implementations/duckdb/readers/xml.py
27
28
29
def __init__(self, *, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
    self.ddb_connection = ddb_connection if ddb_connection else default_connection
    super().__init__(**kwargs)

Bases: XMLStreamReader

An XML stream reader that adds a method to read to a dataframe

Source code in src/dve/core_engine/backends/implementations/spark/readers/xml.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@spark_record_index
@spark_write_parquet
class SparkXMLStreamReader(XMLStreamReader):
    """An XML stream reader that adds a method to read to a dataframe"""

    # Lazily-created SparkSession, cached on the class after first use.
    spark = None

    @read_function(DataFrame)
    def read_to_dataframe(
        self,
        resource: URI,
        entity_name: EntityName,
        schema: type[BaseModel],
    ) -> DataFrame:
        """Stream an XML file into a Spark data frame"""
        if not self.spark:
            self.spark = SparkSession.builder.getOrCreate()  # type: ignore
        spark_schema = get_type_from_annotation(schema)
        # NOTE: list(...) materialises every streamed record in driver memory
        # before the DataFrame is created.
        return self.add_record_index(
            self.spark.createDataFrame(  # type: ignore
                list(self.read_to_py_iterator(resource, entity_name, schema)),
                schema=spark_schema,
            )
        )

Bases: BasicXMLFileReader

A reader for XML files built atop Spark-XML.

Source code in src/dve/core_engine/backends/implementations/spark/readers/xml.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
@spark_record_index
@spark_write_parquet
class SparkXMLReader(BasicXMLFileReader):  # pylint: disable=too-many-instance-attributes
    """A reader for XML files built atop Spark-XML."""

    def __init__(
        self,
        *,
        record_tag: str,
        root_tag: Optional[str] = None,
        spark_session: Optional[SparkSession] = None,
        sampling_ratio: int = 1,
        exclude_attribute: bool = True,
        mode: SparkXMLMode = "PERMISSIVE",
        infer_schema: bool = False,
        ignore_namespace: bool = True,
        null_values: Collection[str] = frozenset(("NULL", "null", "")),
        sanitise_multiline: bool = True,
        namespace=None,
        trim_cells=True,
        xsd_location: Optional[URI] = None,
        xsd_error_code: Optional[str] = None,
        xsd_error_message: Optional[str] = None,
        rules_location: Optional[URI] = None,
        **_,
    ) -> None:
        """Init function for the Spark XML reader.

        Args:
         - `record_tag`: the XML tag delimiting a single record.
         - `root_tag`: optional root tag; when set, Spark reads at the root and
           the records are unwrapped from it after loading.
         - `spark_session`: session to use; a default session is obtained via
           `SparkSession.builder.getOrCreate()` when omitted.
         - `sampling_ratio`: sampling ratio passed to Spark's XML reader.
         - `exclude_attribute`: passed through as Spark's `excludeAttribute` option.
         - `mode`: Spark XML parse mode. Default: `PERMISSIVE`.
         - `infer_schema`: passed through as Spark's `inferSchema` option.
         - `ignore_namespace`: passed through as Spark's `ignoreNamespace` option.
         - `null_values`: string cell values replaced with null after reading.
         - `sanitise_multiline`: collapse newlines (plus surrounding whitespace)
           in string cells to a single space.
         - `namespace`: optional namespace prefix applied to the row tag.
         - `trim_cells`: strip surrounding whitespace from string cells.
         - `xsd_location` / `xsd_error_code` / `xsd_error_message`: XSD
           validation settings, handled by the base reader.
         - `rules_location`: rules location, handled by the base reader.
         - `**_`: extra arguments are accepted and ignored.
        """

        super().__init__(
            record_tag=record_tag,
            root_tag=root_tag,
            trim_cells=trim_cells,
            null_values=null_values,
            sanitise_multiline=sanitise_multiline,
            xsd_location=xsd_location,
            xsd_error_code=xsd_error_code,
            xsd_error_message=xsd_error_message,
            rules_location=rules_location,
        )

        self.spark_session = spark_session or SparkSession.builder.getOrCreate()  # type: ignore
        self.sampling_ratio = sampling_ratio
        self.exclude_attribute = exclude_attribute
        self.mode = mode
        self.infer_schema = infer_schema
        self.ignore_namespace = ignore_namespace
        # NOTE(review): `sanitise_multiline` is also passed to super().__init__
        # above; this re-assignment is presumably redundant — confirm against
        # BasicXMLFileReader before removing.
        self.sanitise_multiline = sanitise_multiline
        self.namespace = namespace

    def read_to_py_iterator(
        self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
    ) -> Iterator[dict[URI, Any]]:
        """Yield each row of the loaded dataframe as a recursively-converted
        dict, streamed through Spark's local iterator."""
        df = self.read_to_dataframe(resource, entity_name, schema)
        yield from (record.asDict(True) for record in df.toLocalIterator())

    @read_function(DataFrame)
    def read_to_dataframe(
        self,
        resource: URI,
        entity_name: EntityName,  # pylint: disable=unused-argument
        schema: type[BaseModel],
    ) -> DataFrame:
        """Read an XML file directly to a Spark DataFrame using the Databricks
        XML reader package.

        Raises `EmptyFileError` for a zero-length resource, `MessageBearingError`
        when configured XSD validation fails, and `ValueError` (chained to the
        original exception) when Spark cannot read the file.
        """
        if get_content_length(resource) == 0:
            raise EmptyFileError(f"File at {resource} is empty.")

        # Optional XSD validation before handing the file to Spark.
        if self.xsd_location:
            msg = self._run_xmllint(file_uri=resource)
            if msg:
                raise MessageBearingError(
                    "Submitted file failed XSD validation.",
                    messages=[msg],
                )

        spark_schema: StructType = get_type_from_annotation(schema)
        kwargs = {
            "rowTag": self.record_tag,
            "samplingRatio": self.sampling_ratio,
            "excludeAttribute": self.exclude_attribute,
            "mode": self.mode,
            "inferSchema": self.infer_schema,
            "ignoreNamespace": self.ignore_namespace,
            "nullValue": None,
            "treatEmptyValuesAsNulls": True,
            # NOTE(review): both option spellings are supplied — presumably to
            # cover differing spark-xml versions; confirm which one is live.
            "ignoreSurroundingSpaces": self.trim_cells,
            "withIgnoreSurroundingSpaces": self.trim_cells,
        }
        read_schema: Optional[StructType] = spark_schema
        if self.root_tag:
            kwargs["rowTag"] = self.root_tag
            # Need to let Spark infer the schema and then check if it
            # fits.
            read_schema = None
        if self.namespace:
            kwargs["rowTag"] = f"{self.namespace}:{kwargs['rowTag']}"

        try:
            df = (
                self.spark_session.read.format("xml")
                .options(**kwargs)  # type: ignore
                .load(resource, schema=read_schema)
            )
            # If nothing was read, sniff the head of the file for a namespaced
            # record tag (e.g. `<ns:record>`) and retry with that prefix.
            if df_is_empty(df):
                with open_stream(resource, "r") as stream:
                    head = stream.read(1000)
                results = re.search(rf"<(\w+):{self.record_tag}", head)
                if results:
                    namespace = results.groups()[0]
                    kwargs["rowTag"] = f"{namespace}:{self.record_tag}"
                    df = (
                        self.spark_session.read.format("xml")
                        .options(**kwargs)  # type: ignore
                        .load(resource, schema=read_schema)
                    )
            # When reading from the root tag, unwrap the records: explode an
            # array of records, falling back to selecting a single nested struct.
            if self.root_tag and df.columns:
                try:
                    df = df.select(sf.explode(self.record_tag)).select("col.*")
                except AnalysisException:
                    df = df.select(f"{self.record_tag}.*")

        except Exception as exc:
            raise ValueError(f"Failed to read XML file at {resource}") from exc

        # Conform the result to the declared schema and clean up string cells.
        df = self._add_missing_columns(df, spark_schema)
        df = self._sanitise_columns(df)
        return self.add_record_index(df)

    def _add_missing_columns(self, df: DataFrame, fields: Iterable[StructField]) -> DataFrame:
        """Add a typed null column for every schema field absent from `df`."""
        for field in fields:
            if field.name not in df.columns:
                df = df.withColumn(field.name, sf.lit(None).cast(field.dataType))
        return df

    def _sanitise_columns(self, df: DataFrame) -> DataFrame:
        """Apply null-value replacement, multiline collapsing, and trimming to
        every string column, per the reader's configuration."""
        for col in df.schema:
            name = col.name
            if col.dataType == StringType():
                for value in self.null_values:
                    df = df.withColumn(name, self._replace(sf.col(name), value))
            if self.sanitise_multiline and col.dataType == StringType():
                df = df.withColumn(name, sf.regexp_replace(sf.col(name), "\\s*\n\\s*", " "))
            if self.trim_cells and col.dataType == StringType():
                df = df.withColumn(name, sf.trim(sf.col(name)))
        return df

    @staticmethod
    def _replace(column: Column, value: str) -> Column:
        """Return `column` with cells equal to `value` replaced by null."""
        return sf.when(column != sf.lit(value), column).otherwise(sf.lit(None))

__init__(*, record_tag, root_tag=None, spark_session=None, sampling_ratio=1, exclude_attribute=True, mode='PERMISSIVE', infer_schema=False, ignore_namespace=True, null_values=frozenset(('NULL', 'null', '')), sanitise_multiline=True, namespace=None, trim_cells=True, xsd_location=None, xsd_error_code=None, xsd_error_message=None, rules_location=None, **_)

Source code in src/dve/core_engine/backends/implementations/spark/readers/xml.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def __init__(
    self,
    *,
    record_tag: str,
    root_tag: Optional[str] = None,
    spark_session: Optional[SparkSession] = None,
    sampling_ratio: int = 1,
    exclude_attribute: bool = True,
    mode: SparkXMLMode = "PERMISSIVE",
    infer_schema: bool = False,
    ignore_namespace: bool = True,
    null_values: Collection[str] = frozenset(("NULL", "null", "")),
    sanitise_multiline: bool = True,
    namespace=None,
    trim_cells=True,
    xsd_location: Optional[URI] = None,
    xsd_error_code: Optional[str] = None,
    xsd_error_message: Optional[str] = None,
    rules_location: Optional[URI] = None,
    **_,
) -> None:
    """Init function for the Spark XML reader.

    Args:
     - `record_tag`: the XML tag delimiting a single record.
     - `root_tag`: optional root tag to read from instead of the record tag.
     - `spark_session`: session to use; a default is created when omitted.
     - `sampling_ratio` / `exclude_attribute` / `mode` / `infer_schema` /
       `ignore_namespace`: passed through as Spark XML reader options.
     - `null_values`: string cell values replaced with null after reading.
     - `sanitise_multiline`: collapse newlines in string cells to a space.
     - `namespace`: optional namespace prefix applied to the row tag.
     - `trim_cells`: strip surrounding whitespace from string cells.
     - `xsd_location` / `xsd_error_code` / `xsd_error_message` /
       `rules_location`: validation settings handled by the base reader.
     - `**_`: extra arguments are accepted and ignored.
    """

    super().__init__(
        record_tag=record_tag,
        root_tag=root_tag,
        trim_cells=trim_cells,
        null_values=null_values,
        sanitise_multiline=sanitise_multiline,
        xsd_location=xsd_location,
        xsd_error_code=xsd_error_code,
        xsd_error_message=xsd_error_message,
        rules_location=rules_location,
    )

    self.spark_session = spark_session or SparkSession.builder.getOrCreate()  # type: ignore
    self.sampling_ratio = sampling_ratio
    self.exclude_attribute = exclude_attribute
    self.mode = mode
    self.infer_schema = infer_schema
    self.ignore_namespace = ignore_namespace
    # NOTE(review): `sanitise_multiline` is also passed to super().__init__
    # above; this re-assignment is presumably redundant — confirm against
    # BasicXMLFileReader before removing.
    self.sanitise_multiline = sanitise_multiline
    self.namespace = namespace