Pandera#


Data validation means having checks in place to make sure that data arrives in the format and with the specifications we expect. As data pipelines become more interconnected, the chance of changes unintentionally breaking other pipelines also increases. Validations are used to guarantee that upstream changes will not break the integrity of downstream data operations. They serve as a contract for the requirements that data collection or transformation must uphold.

Common data validation patterns include checking for NULL values or checking DataFrame shape to ensure transformations don’t drop any records. Other frequently used operations are checking for column existence and schema. Using data validation avoids silent failures of data processes, where everything runs successfully but produces inaccurate results.
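For instance, these patterns can be expressed as plain Pandas assertions. The snippet below is just a sketch on a made-up DataFrame to make the checks concrete:

import pandas as pd

# a hypothetical DataFrame used only to illustrate the checks above
df = pd.DataFrame({"id": [1, 2, 3], "value": [10.0, None, 12.5]})

assert "id" in df.columns                 # column existence
assert df.shape == (3, 2)                 # shape: no records dropped
assert df["id"].notnull().all()           # no NULL values in the key column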

There are a couple of things that Fugue provides when it comes to validation:

  • Allows validation code to be reused for both Pandas and Spark projects

  • Ability to use familiar Pandas-based libraries on Spark

  • Simple interface for validation on each partition of data

To illustrate this, we’ll use a simple example with the following Pandas DataFrame.

import pandas as pd 

data = pd.DataFrame({'state': ['FL','FL','FL','CA','CA','CA'], 
                     'city': ['Orlando', 'Miami', 'Tampa',
                              'San Francisco', 'Los Angeles', 'San Diego'],
                     'price': [8, 12, 10, 16, 20, 18]})
data.head()
  state           city  price
0    FL        Orlando      8
1    FL          Miami     12
2    FL          Tampa     10
3    CA  San Francisco     16
4    CA    Los Angeles     20

Pandera for Data Validation#

Data validation can be placed at the start of a data pipeline to make sure that any transformations happen smoothly, and it can also be placed at the end to make sure everything is working well before the output gets committed to the database. Pandera is a data validation framework with a lightweight and expressive syntax, making it a good fit for this demo. The process shown here will also work for other data validation frameworks as long as their classes can be pickled.

For the above DataFrame, we want to guarantee that the price is within a certain range. We want to make sure that the price column is at least 8 and not more than 20.

import pandera as pa

price_check = pa.DataFrameSchema({
    "price": pa.Column(pa.Int, pa.Check.in_range(min_value=8,max_value=20)),
})

def price_validation(data:pd.DataFrame) -> pd.DataFrame:
    price_check.validate(data)
    return data

price_validation(data)
  state           city  price
0    FL        Orlando      8
1    FL          Miami     12
2    FL          Tampa     10
3    CA  San Francisco     16
4    CA    Los Angeles     20
5    CA      San Diego     18

In the example above, we are using pandera’s DataFrameSchema to create a validation schema. In the price_check variable, we have a Check that is applied to a Column named price. That validation guarantees that the prices are within an acceptable range of values. We don’t need to wrap the validation inside a price_validation() function, but this will make bringing the validation to Spark seamless.

We highly suggest checking the Pandera documentation for more information. If you want to see Pandera in action, change the min_value or max_value in the code above to trigger an error.
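For example, here is a quick sketch that triggers a failure by tightening the range (the narrower bounds are only for illustration):

# a stricter range that some of the prices violate
strict_check = pa.DataFrameSchema({
    "price": pa.Column(pa.Int, pa.Check.in_range(min_value=10, max_value=15)),
})

try:
    strict_check.validate(data)
except pa.errors.SchemaError as err:
    print(err)  # reports the values that fall outside the [10, 15] range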

Using Pandera on Spark#

Pandera now has support for Spark and Dask through PySpark Pandas and Modin, but these have more overhead because they require conversions of the DataFrame objects. Fugue, on the other hand, passes the validation to the underlying Spark or Dask DataFrames. Fugue also enables other use cases, such as the validation by partition shown later.

Below is an example of bringing Pandera validations to Spark with minimal code changes.

Note that pyspark needs to be installed in order for the code snippet below to run.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from fugue import transform

price_check = pa.DataFrameSchema({
    "price": pa.Column(pa.Int, pa.Check.in_range(min_value=5,max_value=20)),
})

def price_validation(data:pd.DataFrame) -> pd.DataFrame:
    price_check.validate(data)
    return data

# df is a Spark DataFrame
df = transform(data, price_validation, schema="*", engine=spark)
df.show()
+-----+-------------+-----+
|state|         city|price|
+-----+-------------+-----+
|   FL|      Orlando|    8|
|   FL|        Miami|   12|
|   FL|        Tampa|   10|
|   CA|San Francisco|   16|
|   CA|  Los Angeles|   20|
|   CA|    San Diego|   18|
+-----+-------------+-----+
                                                                                

There were no changes to the validation code to bring it to Spark. We only needed to use the Fugue transform() function, which ports the Pandas-based price_validation() function to Spark and enforces the declared output schema. If users move away from Fugue, the validation function stays exactly the same.

Because we passed the SparkSession variable spark as the engine, the code now runs on Spark: the transform() call converts the input Pandas DataFrame to a Spark DataFrame before applying the function.

To use the Dask engine, users can supply a string "dask" or a Dask Client as the engine.
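For example, a minimal sketch of running the same validation on Dask (this assumes dask[distributed] is installed; the local Client below is just for illustration):

from dask.distributed import Client

client = Client()  # a local Dask cluster for illustration

# same price_validation function, now executed on the Dask engine
res = transform(data, price_validation, schema="*", engine=client)

# passing the string "dask" works as well
res = transform(data, price_validation, schema="*", engine="dask")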

Validation by Partition with Fugue#

There is one shortcoming of current data validation frameworks. For the data we have, the price ranges of CA and FL are drastically different. Because the validation is applied per column, we don’t have a way to specify a different price range for each location. It would be ideal, however, if we could apply a different check to each group of data. This is what we call validation by partition. In Pandas semantics, it would be the equivalent of groupby-validate.

This operation becomes trivial to perform with Fugue. In the example above, we want to apply a different validation to the data in FL and the data in CA. On average, the CA data points have a higher price, so we create two validation rules depending on the state. We do this in the code below.

price_check_FL = pa.DataFrameSchema({
    "price": pa.Column(pa.Int, pa.Check.in_range(min_value=7,max_value=13)),
})

price_check_CA = pa.DataFrameSchema({
    "price": pa.Column(pa.Int, pa.Check.in_range(min_value=15,max_value=21)),
})

price_checks = {'CA': price_check_CA, 'FL': price_check_FL}

def price_validation(df:pd.DataFrame) -> pd.DataFrame:
    location = df['state'].iloc[0]
    check = price_checks[location]
    check.validate(df)
    return df

# df is a Spark DataFrame
df = transform(data, price_validation, schema="*", engine="spark", partition={"by": "state"})
df.show()
+-----+-------------+-----+
|state|         city|price|
+-----+-------------+-----+
|   CA|San Francisco|   16|
|   CA|  Los Angeles|   20|
|   CA|    San Diego|   18|
|   FL|      Orlando|    8|
|   FL|        Miami|   12|
|   FL|        Tampa|   10|
+-----+-------------+-----+

The code above should look familiar by now. All we did was create two different Pandera DataFrameSchema objects. After that, we modified the price_validation() function to pull the location from the DataFrame and apply the appropriate validation. There are two states in our original DataFrame: CA and FL. However, when the data enters the price_validation() function, it is already partitioned by state because of the partition={"by": "state"} passed into transform(). This means the function is applied twice: once for FL and once for CA.

Here, we are taking advantage of the SparkExecutionEngine by distributing the task across multiple partitions. We partition the data by state, and then apply different rules depending on the state.
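For comparison, the groupby-validate equivalent mentioned earlier can be sketched in plain Pandas, without any distribution:

# apply the per-state validation to each group; runs locally on a single machine
validated = pd.concat(
    [price_validation(group) for _, group in data.groupby("state")]
)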

Returning Errors#

Pandera raises a SchemaError by default, which gets buried in the Spark error messages. To return the errors as a DataFrame instead, we can use the following approach. If there are no errors in the data, it just returns an empty DataFrame.

To keep the errors for each partition, you can attach the partition key as a column in the returned DataFrame (a sketch of this follows the example below).

out_schema = "schema_context:str, column:str, check:str, check_number:int, failure_case:str, index:int"
out_columns = ["schema_context", "column", "check", "check_number", "failure_case", "index"]

price_check = pa.DataFrameSchema({
    "price": pa.Column(pa.Int, pa.Check.in_range(min_value=12,max_value=18)),
})

def price_validation(data:pd.DataFrame) -> pd.DataFrame:
    try:
        price_check.validate(data, lazy=True)
        return pd.DataFrame(columns=out_columns)
    except pa.errors.SchemaErrors as err:
        return err.failure_cases

# Runs on Pandas when no engine is given
transform(data.copy(), price_validation, schema=out_schema)

# Use the Spark engine
transform(data.copy(), price_validation, schema=out_schema, engine=spark).show()
+--------------+------+----------------+------------+------------+-----+
|schema_context|column|           check|check_number|failure_case|index|
+--------------+------+----------------+------------+------------+-----+
|        Column| price|in_range(12, 18)|           0|           8|    0|
|        Column| price|in_range(12, 18)|           0|          10|    0|
|        Column| price|in_range(12, 18)|           0|          20|    0|
+--------------+------+----------------+------------+------------+-----+
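To attach the partition key as suggested above, one possible sketch (the keyed schema and function name here are our own additions) is to add the state back onto each partition’s failure cases:

out_schema_keyed = out_schema + ",state:str"

def price_validation_keyed(data:pd.DataFrame) -> pd.DataFrame:
    state = data["state"].iloc[0]
    try:
        price_check.validate(data, lazy=True)
        errors = pd.DataFrame(columns=out_columns)
    except pa.errors.SchemaErrors as err:
        errors = err.failure_cases
    return errors.assign(state=state)  # keep the partition key with its errors

transform(data.copy(), price_validation_keyed, schema=out_schema_keyed,
          engine=spark, partition={"by": "state"}).show()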

Conclusion#

In this demo we showed how Fugue allows Pandas-based data validation frameworks to be used in Spark. This is helpful for organizations that find themselves implementing validation rules twice to support both Spark and Pandas. Even though we demoed with Pandera here, this approach will work with other data validation libraries as long as their classes are serializable.

Fugue also allows users to perform validation by partition, a feature missing from current data validation frameworks. When dealing with big data, there are normally logical groupings that require slightly different validation rules. Fugue helps partition the data and parallelizes the application of different rules through Spark or Dask.