PyCaret#

Fugue is a low-code unified interface for different computing frameworks such as Spark, Dask and Pandas. PyCaret uses Fugue to support distributed computing scenarios.

Hello World#

Classification#

Let’s start with the most standard example. The code is exactly the same as the local version; there is no magic.

from pycaret.datasets import get_data
from pycaret.classification import *

setup(data=get_data("juice"), target="Purchase", n_jobs=1)

test_models = models().index.tolist()[:5]
  Description Value
0 session_id 5517
1 Target Purchase
2 Target Type Binary
3 Label Encoded CH: 0, MM: 1
4 Original Data (1070, 19)
5 Missing Values False
6 Numeric Features 13
7 Categorical Features 5
8 Ordinal Features False
9 High Cardinality Features False
10 High Cardinality Method None
11 Transformed Train Set (748, 17)
12 Transformed Test Set (322, 17)
13 Shuffle Train-Test True
14 Stratify Train-Test False
15 Fold Generator StratifiedKFold
16 Fold Number 10
17 CPU Jobs 1
18 Use GPU False
19 Log Experiment False
20 Experiment Name clf-default-name
21 USI b06e
22 Imputation Type simple
23 Iterative Imputation Iteration None
24 Numeric Imputer mean
25 Iterative Imputation Numeric Model None
26 Categorical Imputer constant
27 Iterative Imputation Categorical Model None
28 Unknown Categoricals Handling least_frequent
29 Normalize False
30 Normalize Method None
31 Transformation False
32 Transformation Method None
33 PCA False
34 PCA Method None
35 PCA Components None
36 Ignore Low Variance False
37 Combine Rare Levels False
38 Rare Level Threshold None
39 Numeric Binning False
40 Remove Outliers False
41 Outliers Threshold None
42 Remove Multicollinearity False
43 Multicollinearity Threshold None
44 Remove Perfect Collinearity True
45 Clustering False
46 Clustering Iteration None
47 Polynomial Features False
48 Polynomial Degree None
49 Trignometry Features False
50 Polynomial Threshold None
51 Group Features False
52 Feature Selection False
53 Feature Selection Method classic
54 Features Selection Threshold None
55 Feature Interaction False
56 Feature Ratio False
57 Interaction Threshold None
58 Fix Imbalance False
59 Fix Imbalance Method SMOTE

compare_models is also exactly the same if you don’t want to use a distributed system:

compare_models(include=test_models, n_select=2)
  Model Accuracy AUC Recall Prec. F1 Kappa MCC TT (Sec)
lr Logistic Regression 0.8395 0.8982 0.7399 0.8363 0.7833 0.6565 0.6614 0.1390
nb Naive Bayes 0.7646 0.8387 0.7846 0.6776 0.7244 0.5219 0.5291 0.0080
dt Decision Tree Classifier 0.7487 0.7420 0.6848 0.6796 0.6799 0.4734 0.4757 0.0100
knn K Neighbors Classifier 0.7085 0.7508 0.5820 0.6417 0.6075 0.3770 0.3802 0.0110
svm SVM - Linear Kernel 0.5578 0.0000 0.6138 0.4659 0.4345 0.1344 0.1648 0.0100
[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                    intercept_scaling=1, l1_ratio=None, max_iter=1000,
                    multi_class='auto', n_jobs=None, penalty='l2',
                    random_state=5517, solver='lbfgs', tol=0.0001, verbose=0,
                    warm_start=False),
 GaussianNB(priors=None, var_smoothing=1e-09)]

Now let’s make it distributed, as a toy case, on Dask. The only change is the additional parallel parameter:

from pycaret.parallel import FugueBackend

compare_models(include=test_models, n_select=2, parallel=FugueBackend("dask"))
[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                    intercept_scaling=1, l1_ratio=None, max_iter=1000,
                    multi_class='auto', n_jobs=None, penalty='l2',
                    random_state=5517, solver='lbfgs', tol=0.0001, verbose=0,
                    warm_start=False),
 GaussianNB(priors=None, var_smoothing=1e-09)]

In order to use Spark as the execution engine, you must have access to a Spark cluster and you must have a SparkSession. Let’s initialize a local Spark session:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Now just construct the FugueBackend with this session object, and the same call runs on Spark. Keep in mind this is a toy case; in a real situation, you need a SparkSession pointing to a real Spark cluster to enjoy the power of Spark.

compare_models(include=test_models, n_select=2, parallel=FugueBackend(spark))
[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                    intercept_scaling=1, l1_ratio=None, max_iter=1000,
                    multi_class='auto', n_jobs=None, penalty='l2',
                    random_state=4418, solver='lbfgs', tol=0.0001, verbose=0,
                    warm_start=False),
 GaussianNB(priors=None, var_smoothing=1e-09)]

In the end, you can call pull to get the metrics table:

pull()
Model Accuracy AUC Recall Prec. F1 Kappa MCC TT (Sec)
lr Logistic Regression 0.8276 0.8905 0.7420 0.8141 0.7732 0.6351 0.6401 0.384
nb Naive Bayes 0.7674 0.8394 0.7674 0.6757 0.7174 0.5213 0.5258 0.015
dt Decision Tree Classifier 0.7594 0.7549 0.6970 0.6897 0.6911 0.4946 0.4967 0.040
knn K Neighbors Classifier 0.7285 0.7716 0.6052 0.6750 0.6367 0.4214 0.4239 0.012
svm SVM - Linear Kernel 0.5162 0.0000 0.5655 0.2674 0.3505 0.0500 0.0576 0.020

Regression#

It follows the same pattern as classification.

from pycaret.datasets import get_data
from pycaret.regression import *

setup(data=get_data("insurance"), target="charges", n_jobs=1)

test_models = models().index.tolist()[:5]
  Description Value
0 session_id 4045
1 Target charges
2 Original Data (1338, 7)
3 Missing Values False
4 Numeric Features 2
5 Categorical Features 4
6 Ordinal Features False
7 High Cardinality Features False
8 High Cardinality Method None
9 Transformed Train Set (936, 14)
10 Transformed Test Set (402, 14)
11 Shuffle Train-Test True
12 Stratify Train-Test False
13 Fold Generator KFold
14 Fold Number 10
15 CPU Jobs 1
16 Use GPU False
17 Log Experiment False
18 Experiment Name reg-default-name
19 USI d080
20 Imputation Type simple
21 Iterative Imputation Iteration None
22 Numeric Imputer mean
23 Iterative Imputation Numeric Model None
24 Categorical Imputer constant
25 Iterative Imputation Categorical Model None
26 Unknown Categoricals Handling least_frequent
27 Normalize False
28 Normalize Method None
29 Transformation False
30 Transformation Method None
31 PCA False
32 PCA Method None
33 PCA Components None
34 Ignore Low Variance False
35 Combine Rare Levels False
36 Rare Level Threshold None
37 Numeric Binning False
38 Remove Outliers False
39 Outliers Threshold None
40 Remove Multicollinearity False
41 Multicollinearity Threshold None
42 Remove Perfect Collinearity True
43 Clustering False
44 Clustering Iteration None
45 Polynomial Features False
46 Polynomial Degree None
47 Trignometry Features False
48 Polynomial Threshold None
49 Group Features False
50 Feature Selection False
51 Feature Selection Method classic
52 Features Selection Threshold None
53 Feature Interaction False
54 Feature Ratio False
55 Interaction Threshold None
56 Transform Target False
57 Transform Target Method box-cox

compare_models is also exactly the same if you don’t want to use a distributed system:

compare_models(include=test_models, n_select=2)
  Model MAE MSE RMSE R2 RMSLE MAPE TT (Sec)
lasso Lasso Regression 4121.9556 36109634.6000 5980.6114 0.7376 0.5463 0.4243 0.0130
ridge Ridge Regression 4134.4132 36105753.4000 5980.2880 0.7376 0.5453 0.4268 0.0120
lr Linear Regression 4122.6497 36115891.4000 5981.1752 0.7375 0.5472 0.4243 0.0080
en Elastic Net 7122.3933 87174564.0000 9313.8934 0.3674 0.7421 0.9344 0.0100
lar Least Angle Regression 7305.2647 1287737542.0774 16591.0408 -9.7522 0.6450 0.8588 0.0120
[Lasso(alpha=1.0, copy_X=True, fit_intercept=True, max_iter=1000,
       normalize=False, positive=False, precompute=False, random_state=4045,
       selection='cyclic', tol=0.0001, warm_start=False),
 Ridge(alpha=1.0, copy_X=True, fit_intercept=True, max_iter=None,
       normalize=False, random_state=4045, solver='auto', tol=0.001)]

Now let’s make it distributed, as a toy case, on Dask. The only change is the additional parallel parameter:

from pycaret.parallel import FugueBackend

compare_models(include=test_models, n_select=2, parallel=FugueBackend("dask"))
[Lasso(alpha=1.0, copy_X=True, fit_intercept=True, max_iter=1000,
       normalize=False, positive=False, precompute=False, random_state=4045,
       selection='cyclic', tol=0.0001, warm_start=False),
 Ridge(alpha=1.0, copy_X=True, fit_intercept=True, max_iter=None,
       normalize=False, random_state=4045, solver='auto', tol=0.001)]

In order to use Spark as the execution engine, you must have access to a Spark cluster and you must have a SparkSession. Let’s initialize a local Spark session:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Now just construct the FugueBackend with this session object, and the same call runs on Spark. Keep in mind this is a toy case; in a real situation, you need a SparkSession pointing to a real Spark cluster to enjoy the power of Spark.

compare_models(include=test_models, n_select=2, parallel=FugueBackend(spark))
[Lasso(alpha=1.0, copy_X=True, fit_intercept=True, max_iter=1000,
       normalize=False, positive=False, precompute=False, random_state=7138,
       selection='cyclic', tol=0.0001, warm_start=False),
 LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1, normalize=False)]

In the end, you can call pull to get the metrics table:

pull()
Model MAE MSE RMSE R2 RMSLE MAPE TT (Sec)
lasso Lasso Regression 4240.9847 3.703576e+07 6063.9052 0.7478 0.5959 0.4329 0.015
lr Linear Regression 4211.7614 3.722926e+07 6058.1708 0.7400 0.5822 0.4211 0.021
lar Least Angle Regression 4403.0912 3.944249e+07 6243.0943 0.7317 0.5758 0.4289 0.020
ridge Ridge Regression 4152.4058 3.682102e+07 6037.5101 0.7142 0.5722 0.4263 0.018
en Elastic Net 7406.3822 9.128549e+07 9497.0126 0.3646 0.7475 0.9472 0.030

As you can see, the results from the distributed versions can differ from your local versions. In the next section, we will show how to make them identical.

A more practical case#

The above examples are pure toys. To make things work well in a distributed system, you must be careful about a few things.

Use a lambda instead of a dataframe in setup#

If you directly provide a dataframe in setup, this dataset will need to be sent to all worker nodes. If the dataframe is 1 GB and you have 100 workers, your driver machine may need to send out up to 100 GB of data (depending on the specific framework’s implementation), and this data transfer becomes a bottleneck itself. Instead, if you provide a lambda function, the local compute scenario is unchanged, but the driver only sends the function reference to the workers, and each worker is responsible for loading the data by itself, so there is no heavy traffic on the driver side.
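A minimal contrast of the two forms, reusing the juice dataset from the examples above:

from pycaret.datasets import get_data
from pycaret.classification import *

# Eager: the dataframe is materialized on the driver and must be shipped
# to every worker.
setup(data=get_data("juice"), target="Purchase", n_jobs=1)

# Lazy: only the function reference is shipped; each worker loads the
# data by itself.
setup(data=lambda: get_data("juice"), target="Purchase", n_jobs=1)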

Be deterministic#

You should always set session_id to make the distributed compute deterministic; otherwise, for exactly the same logic, you could get a drastically different selection on each run.

Set n_jobs#

It is important to be explicit about n_jobs when you want to run something distributedly, so it does not overuse the local/remote resources. This also avoids resource contention and makes the compute faster. The example below applies all three of these practices.

from pycaret.classification import *

setup(data=lambda: get_data("juice", verbose=False, profile=False), target="Purchase", session_id=0, n_jobs=1);
  Description Value
0 session_id 0
1 Target Purchase
2 Target Type Binary
3 Label Encoded CH: 0, MM: 1
4 Original Data (1070, 19)
5 Missing Values False
6 Numeric Features 13
7 Categorical Features 5
8 Ordinal Features False
9 High Cardinality Features False
10 High Cardinality Method None
11 Transformed Train Set (748, 17)
12 Transformed Test Set (322, 17)
13 Shuffle Train-Test True
14 Stratify Train-Test False
15 Fold Generator StratifiedKFold
16 Fold Number 10
17 CPU Jobs 1
18 Use GPU False
19 Log Experiment False
20 Experiment Name clf-default-name
21 USI cc4a
22 Imputation Type simple
23 Iterative Imputation Iteration None
24 Numeric Imputer mean
25 Iterative Imputation Numeric Model None
26 Categorical Imputer constant
27 Iterative Imputation Categorical Model None
28 Unknown Categoricals Handling least_frequent
29 Normalize False
30 Normalize Method None
31 Transformation False
32 Transformation Method None
33 PCA False
34 PCA Method None
35 PCA Components None
36 Ignore Low Variance False
37 Combine Rare Levels False
38 Rare Level Threshold None
39 Numeric Binning False
40 Remove Outliers False
41 Outliers Threshold None
42 Remove Multicollinearity False
43 Multicollinearity Threshold None
44 Remove Perfect Collinearity True
45 Clustering False
46 Clustering Iteration None
47 Polynomial Features False
48 Polynomial Degree None
49 Trignometry Features False
50 Polynomial Threshold None
51 Group Features False
52 Feature Selection False
53 Feature Selection Method classic
54 Features Selection Threshold None
55 Feature Interaction False
56 Feature Ratio False
57 Interaction Threshold None
58 Fix Imbalance False
59 Fix Imbalance Method SMOTE

Set the appropriate batch_size#

The batch_size parameter adjusts the trade-off between load balance and overhead. For each batch, setup will be called only once, so:

Choice               Load Balance   Overhead   Best Scenario
Smaller batch size   Better         Worse      training time >> data loading time, or models ~= workers
Larger batch size    Worse          Better     training time << data loading time, or models >> workers

The default value is set to 1, meaning we want the best load balance.
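For example, here is a minimal sketch on the Dask backend; batch_size=4 is purely illustrative, tune it to your own ratio of training time to data loading time:

from pycaret.parallel import FugueBackend

# Each batch triggers exactly one setup call on a worker, so a larger
# batch amortizes the setup overhead at the cost of coarser load balance.
be = FugueBackend("dask", batch_size=4)
compare_models(include=test_models, n_select=2, parallel=be)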

Display progress#

In development, you can enable the visual effect with display_remote=True, but you must also enable the Fugue callback so that the driver can monitor worker progress. It is recommended to turn off the display in production.

fconf = {
    "fugue.rpc.server": "fugue.rpc.flask.FlaskRPCServer",  # keep this value
    "fugue.rpc.flask_server.host": "0.0.0.0",  # the driver ip address workers can access
    "fugue.rpc.flask_server.port": "3333",  # the open port on the driver
    "fugue.rpc.flask_server.timeout": "2 sec",  # the timeout for worker to talk to driver
}

be = FugueBackend("dask", fconf, display_remote=True, batch_size=3, top_only=False)
compare_models(n_select=2, parallel=be)
Model Accuracy AUC Recall Prec. F1 Kappa MCC TT (Sec)
lda Linear Discriminant Analysis 0.8328 0.8949 0.7585 0.7985 0.7735 0.6416 0.6464 0.016
lr Logistic Regression 0.8275 0.8964 0.7265 0.8105 0.7589 0.6260 0.6344 0.185
ridge Ridge Classifier 0.8275 0.0000 0.7479 0.7971 0.7654 0.6299 0.6366 0.011
catboost CatBoost Classifier 0.8221 0.8967 0.7585 0.7755 0.7624 0.6209 0.6254 0.779
gbc Gradient Boosting Classifier 0.8195 0.8855 0.7510 0.7760 0.7594 0.6154 0.6193 0.113
rf Random Forest Classifier 0.8048 0.8792 0.7408 0.7483 0.7397 0.5843 0.5889 0.171
ada Ada Boost Classifier 0.8021 0.8668 0.7014 0.7639 0.7275 0.5729 0.5776 0.090
lightgbm Light Gradient Boosting Machine 0.7994 0.8775 0.7299 0.7444 0.7331 0.5730 0.5768 0.051
xgboost Extreme Gradient Boosting 0.7941 0.8729 0.7228 0.7353 0.7248 0.5609 0.5649 0.258
et Extra Trees Classifier 0.7820 0.8509 0.7122 0.7214 0.7101 0.5365 0.5428 0.148
dt Decision Tree Classifier 0.7778 0.7646 0.7047 0.7098 0.7048 0.5270 0.5294 0.009
nb Naive Bayes 0.7674 0.8340 0.7369 0.6776 0.7031 0.5129 0.5173 0.008
knn K Neighbors Classifier 0.7073 0.7646 0.5447 0.6275 0.5792 0.3579 0.3627 0.011
svm SVM - Linear Kernel 0.6403 0.0000 0.1107 0.1439 0.1047 0.0688 0.0820 0.010
dummy Dummy Classifier 0.6243 0.5000 0.0000 0.0000 0.0000 0.0000 0.0000 0.005
qda Quadratic Discriminant Analysis 0.5853 0.5676 0.4395 0.3236 0.3474 0.1035 0.1171 0.008
[LinearDiscriminantAnalysis(n_components=None, priors=None, shrinkage=None,
                            solver='svd', store_covariance=False, tol=0.0001),
 LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                    intercept_scaling=1, l1_ratio=None, max_iter=1000,
                    multi_class='auto', n_jobs=None, penalty='l2',
                    random_state=0, solver='lbfgs', tol=0.0001, verbose=0,
                    warm_start=False)]

Custom Metrics#

You can add custom metrics as before, but in order to make the scorer distributable, it must be serializable. A plain function should be fine, but if the function uses global variables that are not serializable (for example an RLock object), it can cause issues. So try to make the custom function independent of global variables.

def score_dummy(y_true, y_prob, axis=0):
    return 0.0

add_metric(
    id="mydummy",
    name="DUMMY",
    score_func=score_dummy,
    target="pred_proba",
    greater_is_better=True,
)
Name                                                             DUMMY
Display Name                                                     DUMMY
Score Function                <function score_dummy at 0x7efc2af16e50>
Scorer               make_scorer(score_dummy, needs_proba=True, err...
Target                                                      pred_proba
Args                                                                {}
Greater is Better                                                 True
Multiclass                                                        True
Custom                                                            True
Name: mydummy, dtype: object

Adding a method of a class instance is also fine, but make sure all member variables of the class are serializable.

test_models = models().index.tolist()[:5]
compare_models(include=test_models, n_select=2, sort="DUMMY", parallel=FugueBackend(spark))
[GaussianNB(priors=None, var_smoothing=1e-09),
 KNeighborsClassifier(algorithm='auto', leaf_size=30, metric='minkowski',
                      metric_params=None, n_jobs=1, n_neighbors=5, p=2,
                      weights='uniform')]
pull()
Model Accuracy AUC Recall Prec. F1 Kappa MCC DUMMY TT (Sec)
nb Naive Bayes 0.7674 0.8340 0.7369 0.6776 0.7031 0.5129 0.5173 0.0 0.015
knn K Neighbors Classifier 0.7073 0.7646 0.5447 0.6275 0.5792 0.3579 0.3627 0.0 0.032
svm SVM - Linear Kernel 0.6403 0.0000 0.1107 0.1439 0.1047 0.0688 0.0820 0.0 0.011
lr Logistic Regression 0.8275 0.8964 0.7265 0.8105 0.7589 0.6260 0.6344 0.0 0.433
dt Decision Tree Classifier 0.7778 0.7646 0.7047 0.7098 0.7048 0.5270 0.5294 0.0 0.020
class Scores:
    def score_dummy2(self, y_true, y_prob, axis=0):
        return 1.0

scores = Scores()

add_metric(
    id="mydummy2",
    name="DUMMY2",
    score_func=scores.score_dummy2,
    target="pred_proba",
    greater_is_better=True,
)
Name                                                            DUMMY2
Display Name                                                    DUMMY2
Score Function       <bound method Scores.score_dummy2 of <__main__...
Scorer               make_scorer(score_dummy2, needs_proba=True, er...
Target                                                      pred_proba
Args                                                                {}
Greater is Better                                                 True
Multiclass                                                        True
Custom                                                            True
Name: mydummy2, dtype: object
compare_models(include=test_models, n_select=2, sort="DUMMY2", parallel=FugueBackend("dask"))
[KNeighborsClassifier(algorithm='auto', leaf_size=30, metric='minkowski',
                      metric_params=None, n_jobs=1, n_neighbors=5, p=2,
                      weights='uniform'),
 DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
                        max_depth=None, max_features=None, max_leaf_nodes=None,
                        min_impurity_decrease=0.0, min_impurity_split=None,
                        min_samples_leaf=1, min_samples_split=2,
                        min_weight_fraction_leaf=0.0, presort='deprecated',
                        random_state=0, splitter='best')]
pull()
Model Accuracy AUC Recall Prec. F1 Kappa MCC DUMMY DUMMY2 TT (Sec)
knn K Neighbors Classifier 0.7073 0.7646 0.5447 0.6275 0.5792 0.3579 0.3627 0.0 1.0 0.011
dt Decision Tree Classifier 0.7778 0.7646 0.7047 0.7098 0.7048 0.5270 0.5294 0.0 1.0 0.010
nb Naive Bayes 0.7674 0.8340 0.7369 0.6776 0.7031 0.5129 0.5173 0.0 1.0 0.008
lr Logistic Regression 0.8275 0.8964 0.7265 0.8105 0.7589 0.6260 0.6344 0.0 1.0 0.192
svm SVM - Linear Kernel 0.6403 0.0000 0.1107 0.1439 0.1047 0.0688 0.0820 0.0 0.0 0.011

Notes#

Spark settings#

It is highly recommended to have only one worker on each Spark executor, so that the worker can fully utilize all CPUs (set spark.task.cpus equal to spark.executor.cores). When you do this, you should also explicitly set n_jobs in setup to the number of CPUs of each executor.

executor_cores = 4

spark = (
    SparkSession.builder
    .config("spark.task.cpus", executor_cores)
    .config("spark.executor.cores", executor_cores)
    .getOrCreate()
)

setup(data=get_data("juice", verbose=False, profile=False), target="Purchase", session_id=0, n_jobs=executor_cores)

compare_models(n_select=2, parallel=FugueBackend(spark))

Databricks#

On Databricks, spark is the magic variable representing a SparkSession. There is no difference in usage; you do exactly the same thing as before:

compare_models(parallel=FugueBackend(spark))

But on Databricks, visualization is difficult, so it may be a good idea to do two things, combined in the sketch below:

  • Set verbose to False in setup

  • Set display_remote to False in FugueBackend
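
A minimal sketch of those two settings together, reusing the lambda-based setup and FugueBackend from the earlier sections:

setup(data=lambda: get_data("juice"), target="Purchase", session_id=0, n_jobs=1, verbose=False)

compare_models(parallel=FugueBackend(spark, display_remote=False))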

Dask#

Dask has local modes that are not truly distributed, such as the default (multi-thread) mode and the multi-process mode. The default mode works fine (although the trainings actually run sequentially), while the multi-process mode currently does not work for PyCaret because it messes up PyCaret’s global variables. Any Spark execution mode, on the other hand, works fine.
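If you want to be explicit about which local Dask mode is used, one option is to start a threaded Client yourself. This is only a sketch, and it assumes Fugue’s Dask engine attaches to the already-active Client instead of creating its own:

from distributed import Client

# processes=False keeps the local scheduler multi-threaded; the
# multi-process mode is the one that breaks PyCaret's global variables.
client = Client(processes=False)

compare_models(include=test_models, n_select=2, parallel=FugueBackend("dask"))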

Local Parallelization#

For practical use, where you try non-trivial data and models, local parallelization (the easiest way is to use local Dask as the backend, as shown above) normally has no performance advantage, because it is very easy to overload the CPUs during training and increase resource contention. The value of local parallelization is to verify the code and give you confidence that the distributed environment will produce the expected result in much less time.

How to develop#

Distributed systems are powerful, but you must follow some good practices to use them:

  1. From small to large: initially, you should start with a small set of data. For example, in compare_models, limit the models you want to try to a small number of cheap models; once you verify they work, you can change to a larger model collection.

  2. From local to distributed: you should follow this sequence: verify small data locally, then verify small data distributedly, and then verify large data distributedly. The current design makes the transition seamless; you can do these sequentially: parallel=None -> parallel=FugueBackend() -> parallel=FugueBackend(spark). In the second step, you can use a local SparkSession or local Dask, as sketched below.
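
A minimal sketch of that progression, reusing names from the examples above:

# 1) Local, small data
compare_models(include=test_models, n_select=2)

# 2) Same code, distributed on a local engine (local Dask here)
compare_models(include=test_models, n_select=2, parallel=FugueBackend("dask"))

# 3) Same code again, with a SparkSession pointing to a real cluster
compare_models(include=test_models, n_select=2, parallel=FugueBackend(spark))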