Spark

Quote

Apache Spark™ is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.

You can read more about Spark at the following links:

Setting up a Spark Session

Note

The Audit Tables require the Delta package to be available to Spark. The example below includes it.

For a minimal working Spark Session setup with DVE, you can use the following snippet of code:

import os
import tempfile
from pyspark.sql import SparkSession

def get_spark_session() -> SparkSession:
    """Get a configured Spark Session. This MUST be called before any other Spark session is created."""
    temp_dir = tempfile.mkdtemp()
    os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join(
        [
            "--packages",
            "com.databricks:spark-xml_2.12:0.16.0,io.delta:delta-core_2.12:2.4.0",
            "pyspark-shell",
        ]
    )
    spark_session = (
        SparkSession.builder.config("spark.sql.warehouse.dir", temp_dir)
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"
        )
        .getOrCreate()
    )
    return spark_session


# The later examples on this page refer to this session as `spark`.
spark = get_spark_session()

You can learn more about setting up a Spark Session here.

Warning

If you need to load XML data and the version of Spark you're running is earlier than 4.0.0, you'll need the spark-xml extension. You can read more about it here. The snippet above shows an example of this being installed.
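
The version check described in this warning can be sketched as a small helper that only adds spark-xml to the `--packages` list for Spark versions earlier than 4.0.0. The function names (`needs_spark_xml`, `build_submit_args`) are hypothetical and not part of DVE. The package coordinates are copied from the snippet above, and the Delta coordinate in particular is an assumption that should be matched to your own Spark version.

```python
# Package coordinates copied from the session snippet above.
SPARK_XML_PACKAGE = "com.databricks:spark-xml_2.12:0.16.0"
DELTA_PACKAGE = "io.delta:delta-core_2.12:2.4.0"


def needs_spark_xml(spark_version: str) -> bool:
    """Return True when the Spark major version is below 4."""
    major = int(spark_version.split(".")[0])
    return major < 4


def build_submit_args(spark_version: str, delta_package: str = DELTA_PACKAGE) -> str:
    """Build a PYSPARK_SUBMIT_ARGS value (hypothetical helper, not a DVE API).

    The default Delta coordinate is an assumption taken from the snippet
    above; pass the coordinate that matches your Spark version.
    """
    packages = [delta_package]
    if needs_spark_xml(spark_version):
        # Only Spark versions before 4.0.0 need the external XML reader.
        packages.insert(0, SPARK_XML_PACKAGE)
    return " ".join(["--packages", ",".join(packages), "pyspark-shell"])
```

The result would be assigned to `os.environ["PYSPARK_SUBMIT_ARGS"]` before the first Spark session is created, for example with `pyspark.__version__` as the input.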

Generating SubmissionInfo Objects

Before we utilise the DVE, we need to generate an iterable object containing SubmissionInfo objects. These objects contain the necessary metadata for the DVE to work with a given submission. Here is an example function used to generate SubmissionInfo objects from a given path:

import glob
from datetime import date, datetime
from pathlib import Path
from typing import Optional
from uuid import uuid4

from dve.core_engine.models import SubmissionInfo


def generate_sub_infos_from_submissions_path(
    submission_path: Path,
    dataset_id: Optional[str] = "example",
    submitting_org: Optional[str] = None,
    submission_method: Optional[str] = "local_test",
    reporting_period_start_date: Optional[date | datetime] = None,
    reporting_period_end_date: Optional[date | datetime] = None,
) -> list[SubmissionInfo]:
    sub_infos: list[SubmissionInfo] = []
    for f in glob.glob(str(submission_path) + "/*.*"):
        file_path = Path(f)
        file_stats = file_path.stat()
        metadata = {
            "dataset_id": dataset_id,
            "file_name": file_path.stem,
            "file_extension": file_path.suffix,
            "submission_method": submission_method,
            "file_size": file_stats.st_size,
            "datetime_received": datetime.now(),
        }
        if submitting_org:
            metadata["submitting_org"] = submitting_org
        if reporting_period_start_date:
            metadata["reporting_period_start"] = str(reporting_period_start_date)
        if reporting_period_end_date:
            metadata["reporting_period_end"] = str(reporting_period_end_date)

        sub_infos.append(SubmissionInfo(submission_id=uuid4().hex, **metadata))
    return sub_infos


submissions = generate_sub_infos_from_submissions_path(Path("path", "to", "my", "submissions"))

Note

If you have a large number of submissions, it may be worth converting the above into a generator. Using the example above, you can do this by removing the sub_infos list and yielding a SubmissionInfo object for each file returned from the glob iterator.
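
A minimal sketch of that generator form is shown here. To keep the block runnable without DVE installed, the object constructor is passed in as a `factory` argument; that parameter and the function name `iter_sub_infos` are assumptions made for illustration, and in practice you would pass `SubmissionInfo` as the factory. The optional organisation and reporting-period fields from the example above are omitted for brevity.

```python
import glob
from datetime import datetime
from pathlib import Path
from typing import Callable, Iterator, TypeVar
from uuid import uuid4

T = TypeVar("T")


def iter_sub_infos(
    submission_path: Path,
    factory: Callable[..., T],
    dataset_id: str = "example",
    submission_method: str = "local_test",
) -> Iterator[T]:
    """Lazily yield one object per submitted file (hypothetical helper).

    `factory` is called with the same keyword arguments the list-based
    example passes to SubmissionInfo.
    """
    # iglob returns an iterator, so file paths are also produced lazily.
    for f in glob.iglob(str(submission_path) + "/*.*"):
        file_path = Path(f)
        yield factory(
            submission_id=uuid4().hex,
            dataset_id=dataset_id,
            file_name=file_path.stem,
            file_extension=file_path.suffix,
            submission_method=submission_method,
            file_size=file_path.stat().st_size,
            datetime_received=datetime.now(),
        )
```

With DVE available, `iter_sub_infos(Path("path", "to", "my", "submissions"), SubmissionInfo)` would produce the objects one at a time instead of building the whole list in memory.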

Spark Audit Table Setup

The first object you must set up is an "Audit Manager" object. This can be done with the following code:

from dve.core_engine.backends.implementations.spark.auditing import SparkAuditingManager

db_name = "test_dve"
# IF NOT EXISTS keeps this step safe to re-run against an existing database.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name};")

audit_manager = SparkAuditingManager(db_name, spark)

Note

The Spark session is optional for the SparkAuditingManager. If one is not provided, a Spark session will be generated.

The "Audit Manager" object within the DVE is used to keep track of the status of your submission. A submission could, for instance, fail during the File Transformation section, so it's important to have something that tracks its progress. The Audit Manager object has a number of methods that can be used to read and write information to the tables stored in the Spark database created in the previous step.

You can learn more about the Auditing Objects here.

Once you have set up your "Audit Manager" object, you can move on to setting up the Spark reference data loader (if required) and then the Spark DVE Pipeline object.

Spark Reference Data Setup (Optional)

If your business rules rely on reference data, you will need the following code to ensure that the reference data can be loaded when those rules are applied:

from pathlib import Path

from dve.core_engine.backends.implementations.spark.reference_data import SparkRefDataLoader

SparkRefDataLoader.spark = spark
SparkRefDataLoader.dataset_config_uri = Path("path", "to", "my", "rules").as_posix()

Spark Pipeline Setup

To set up a Spark Pipeline, you can use the following example:

from dve.pipeline.spark_pipeline import SparkDVEPipeline


dve_pipeline = SparkDVEPipeline(
    processed_files_path=Path("location_to_store", "dve_outputs").as_posix(),
    audit_tables=audit_manager,
    rules_path=Path("to", "my", "rules").as_posix(),
    submitted_files_path=Path("submissions", "path").as_posix(),
    reference_data_loader=SparkRefDataLoader,
    spark=spark,
)

Note

If using remote resources, then you will want to use as_uri for your paths. Note that as_uri only works on absolute paths and raises a ValueError for relative ones.

E.g.

Path("/remote", "path").as_uri()
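
The choice between as_posix and as_uri can be wrapped in one small helper, sketched here. The function name `to_location` and its `remote` flag are hypothetical and not part of DVE. The sketch resolves the path first, because pathlib's as_uri rejects relative paths, and it only covers `file://` URIs, which is all pathlib produces; locations using other schemes would need to be written as plain strings.

```python
from pathlib import Path


def to_location(path: Path, remote: bool = False) -> str:
    """Return a path string for the pipeline (hypothetical helper).

    Local paths are returned in POSIX form, as in the examples above.
    With remote=True the path is made absolute and returned as a
    file:// URI, since Path.as_uri() raises ValueError on relative paths.
    """
    if remote:
        return path.resolve().as_uri()
    return path.as_posix()
```

For example, `to_location(Path("location_to_store", "dve_outputs"))` gives the same string as the as_posix calls in the pipeline setup above.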

Once your Pipeline object is defined, you can simply run the cluster_pipeline_run method. E.g.

error_reports = dve_pipeline.cluster_pipeline_run()

Further documentation

For further details on the objects referenced above, see the following links: