Whylogs#

Have questions? Chat with us on GitHub or Slack.


Installation#

To use whylogs with Fugue, install both whylogs with the Fugue extra and Fugue with whichever backend you plan to use. For example, to run on Spark, the command would be:

pip install 'whylogs[fugue]' 'fugue[spark]'

For Dask and Ray, just replace the Spark extra in the installation, as shown below.
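
For example (assuming the standard fugue[dask] and fugue[ray] extras), the Dask and Ray installs would be:

pip install 'whylogs[fugue]' 'fugue[dask]'
pip install 'whylogs[fugue]' 'fugue[ray]'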

Setup#

First, we construct a dataset that we will use for profiling.

import pandas as pd
import numpy as np

n = 100
np.random.seed(0)
tdf = pd.DataFrame(
    dict(
        a=np.random.choice([1, 2, 3], n),
        b=np.random.choice(["a", "b"], n),
        c=np.random.random(n),
        d=np.random.choice(["xy", "z"], n),
    )
)
tdf.to_parquet("/tmp/test.parquet")

Profiling using Whylogs + Fugue#

The simplest way to use fugue_profile is equivalent to why.log(df).view():

from whylogs.api.fugue import fugue_profile

fugue_profile(tdf).to_pandas()
cardinality/est cardinality/lower_1 cardinality/upper_1 counts/inf counts/n counts/nan counts/null distribution/max distribution/mean distribution/median ... frequent_items/frequent_strings ints/max ints/min type types/boolean types/fractional types/integral types/object types/string types/tensor
column
a 3.000000 3.0 3.000150 0 100 0 0 3.000000 1.880000 2.000000 ... [FrequentItem(value='1', est=39, upper=39, low... 3.0 1.0 SummaryType.COLUMN 0 0 100 0 0 0
b 2.000000 2.0 2.000100 0 100 0 0 NaN 0.000000 NaN ... [FrequentItem(value='a', est=57, upper=57, low... NaN NaN SummaryType.COLUMN 0 0 0 0 100 0
c 100.000025 100.0 100.005018 0 100 0 0 0.992396 0.499929 0.487838 ... NaN NaN NaN SummaryType.COLUMN 0 100 0 0 0 0
d 2.000000 2.0 2.000100 0 100 0 0 NaN 0.000000 NaN ... [FrequentItem(value='xy', est=53, upper=53, lo... NaN NaN SummaryType.COLUMN 0 0 0 0 100 0

4 rows × 31 columns
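
For comparison, a minimal sketch of the same result using the plain whylogs API directly on the local pandas DataFrame (no Fugue):

import whylogs as why

# Profile the local DataFrame with whylogs and view the summary as a pandas table
why.log(tdf).view().to_pandas()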

We can select specific columns to be used for profiling:

fugue_profile(tdf, profile_cols=["c","d"]).to_pandas()
cardinality/est cardinality/lower_1 cardinality/upper_1 counts/inf counts/n counts/nan counts/null distribution/max distribution/mean distribution/median ... distribution/q_99 distribution/stddev type types/boolean types/fractional types/integral types/object types/string types/tensor frequent_items/frequent_strings
column
c 100.000025 100.0 100.005018 0 100 0 0 0.992396 0.499929 0.487838 ... 0.992396 0.294085 SummaryType.COLUMN 0 100 0 0 0 0 NaN
d 2.000000 2.0 2.000100 0 100 0 0 NaN 0.000000 NaN ... NaN 0.000000 SummaryType.COLUMN 0 0 0 0 100 0 [FrequentItem(value='xy', est=53, upper=53, lo...

2 rows × 29 columns

Now suppose we want to use Spark to profile the dataset in a distributed way. Assuming this is how we get a SparkSession, we can pass it directly as the engine:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
fugue_profile(tdf, engine=spark)
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7fd42931e610>

If we want to profile a Spark DataFrame, we can pass it in directly (the Spark engine is inferred from the input):

spark_df = spark.createDataFrame(tdf)
fugue_profile(spark_df)
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7fd42929e2e0>

We can also directly profile a parquet file or a folder of parquet files, either locally or on the cloud (the files will be loaded in a distributed manner):

fugue_profile("/tmp/test.parquet", engine=spark)
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7fd41b82e6a0>

Profiling by Partition#

If we want to profile tdf grouped by columns a and b:

fugue_profile(tdf, partition={"by":["a","b"]})
a b __whylogs_df_profile_view
0 1 a b'WHY1\x00\x88\x03\n\x0e \xc8\xa2\x81\xc8\xf20...
1 1 b b'WHY1\x00\x88\x03\n\x0e \xd5\xa2\x81\xc8\xf20...
2 2 a b'WHY1\x00\x88\x03\n\x0e \xe1\xa2\x81\xc8\xf20...
3 2 b b'WHY1\x00\x88\x03\n\x0e \xeb\xa2\x81\xc8\xf20...
4 3 a b'WHY1\x00\x88\x03\n\x0e \xf6\xa2\x81\xc8\xf20...
5 3 b b'WHY1\x00\x88\x03\n\x0e \x82\xa3\x81\xc8\xf20...

We can also control the name of the output profile column via profile_field:

res = fugue_profile(tdf, partition={"by":["a","b"]}, profile_field="x")
res
a b x
0 1 a b'WHY1\x00\x88\x03\n\x0e \xef\xae\x81\xc8\xf20...
1 1 b b'WHY1\x00\x88\x03\n\x0e \xfe\xae\x81\xc8\xf20...
2 2 a b'WHY1\x00\x88\x03\n\x0e \x88\xaf\x81\xc8\xf20...
3 2 b b'WHY1\x00\x88\x03\n\x0e \x94\xaf\x81\xc8\xf20...
4 3 a b'WHY1\x00\x88\x03\n\x0e \xa1\xaf\x81\xc8\xf20...
5 3 b b'WHY1\x00\x88\x03\n\x0e \xac\xaf\x81\xc8\xf20...

The binary column contains serialized profiles that can be deserialized back into DatasetProfileView objects:

from whylogs import DatasetProfileView

res.x.apply(DatasetProfileView.deserialize)
0    <whylogs.core.view.dataset_profile_view.Datase...
1    <whylogs.core.view.dataset_profile_view.Datase...
2    <whylogs.core.view.dataset_profile_view.Datase...
3    <whylogs.core.view.dataset_profile_view.Datase...
4    <whylogs.core.view.dataset_profile_view.Datase...
5    <whylogs.core.view.dataset_profile_view.Datase...
Name: x, dtype: object
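
Each deserialized profile can then be inspected like any other profile view, for example (a minimal sketch using the first partition):

# View the profile of the first (a, b) partition as a pandas summary
DatasetProfileView.deserialize(res.x.iloc[0]).to_pandas()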

When running on a distributed engine such as Spark, we can keep the result as a distributed DataFrame by setting as_local=False:

fugue_profile(tdf, partition={"by":["a","b"]}, engine=spark, as_local=False)
DataFrame[a: bigint, b: string, __whylogs_df_profile_view: binary]

We can also save the output directly to a file in a distributed manner:

fugue_profile(tdf, partition={"by":["a","b"]}, save_path="/tmp/output1.parquet", engine=spark)
fugue_profile("/tmp/test.parquet", partition={"by":["a","b"]}, save_path="/tmp/output2.parquet", engine=spark)
'/tmp/output2.parquet'
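
The saved files contain the serialized profiles, so they can be read back and deserialized later, for example (a minimal sketch assuming the default profile column name):

# Read the partitioned profiles back and deserialize each one
saved = pd.read_parquet("/tmp/output1.parquet")
saved["__whylogs_df_profile_view"].apply(DatasetProfileView.deserialize)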

Using the Fugue API#

The Fugue API is a collection of standalone, platform-agnostic functions for common big data operations, and fugue_profile can be used alongside them. In the previous example, loading, profiling, and saving were all packed into one function call. While compact, this overloads the function, making it hard to read and maintain. The code can be split out like this instead:

import fugue.api as fa

with fa.engine_context(spark):
    df = fa.load("/tmp/test.parquet")
    res = fugue_profile(df, partition={"by":["a","b"]})
    fa.save(res, "/tmp/output2.parquet")

Visualization in FugueSQL#

Whylogs profile visualization is available as a built-in FugueSQL extension and is invoked with why:viz.

Here is how you use the extension:

import whylogs.api.fugue.registry  # you don't really need to import this explicitly, the registration is automatic
import fugue.api as fa

fa.fugue_sql_flow("""
-- visualize a single dataframe's profile
OUTPUT df USING why:viz
-- compare profiles, must set reference and target
OUTPUT target=df, reference=df USING why:viz
""", df = tdf).run();

Performance Tips#

Whether using Spark, Ray, or Dask, setting the number of partitions with the num keyword to roughly two times the number of cluster CPUs generally works well.

fugue_profile(..., partition={"num": 200}, engine=spark)

Setting the number of partitions is also possible even when there are logical groupings:

fugue_profile(..., partition={"by":["a","b"], "num": 200}, engine=spark)

Spark

It is also beneficial to enable Pandas UDFs on Spark for better performance. Follow the Spark instructions to enable spark.sql.execution.arrow.pyspark.enabled.

The convention in Spark is to set spark.sql.shuffle.partitions when starting the Spark cluster. An ideal number is 2-4 times the total number of CPUs.
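
For example, both settings can be applied when building the SparkSession (a sketch; 200 is only an illustrative value):

spark = (
    SparkSession.builder
    # Enable Arrow for faster pandas <-> Spark data conversion (Pandas UDFs)
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    # Roughly 2-4 times the total number of cluster CPUs
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)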

Ray

If the input DataFrame is a local DataFrame such as a pandas DataFrame, there is no parallelism enabled by default. So on Ray, it is always a good idea to set num explicitly, as shown below.
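
A minimal sketch, assuming fugue[ray] is installed and a Ray cluster is available:

fugue_profile(tdf, partition={"num": 200}, engine="ray")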

Dask

If the input DataFrame is a local DataFrame such as a pandas DataFrame, the default number of partitions will be a small number based on the local CPUs. So on Dask, it is always a good idea to set num explicitly, as shown below.
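
A minimal sketch, assuming fugue[dask] is installed:

fugue_profile(tdf, partition={"num": 200}, engine="dask")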

Accessing Distributed Platforms#

In Fugue, accessing distributed platforms can be very simple. For example, with the proper setup, profiling a large S3 folder using Databricks, Anyscale, or Coiled can be as simple as:

fugue_profile("s3://<path>", engine="db:<databricks_cluster_id>")
fugue_profile("s3://<path>", engine="<anyscale_cluster_uri>")
fugue_profile("s3://<path>", engine="coiled:<coiled_cluster_id>")

For details of each platform, please read the instructions for Databricks, Anyscale and Coiled.