Skip to content

Commit

Permalink
Allow parallel prediction for UpliftRandomForestClassifier (#477)
Browse files Browse the repository at this point in the history
* run the per-tree predictions in parallel if self.n_jobs != 1
* test that predictions coincide between single-job and parallel computation for UpliftRandomForestClassifier
* mention that `n_jobs` is also used for predictions of UpliftRandomForestClassifier
* Add optional `joblib_prefer` argument to constructor of UpliftRandomForestClassifier. It is passed as `prefer` to joblib.Parallel in the `fit` and `predict` methods.
* add parameter `joblib_prefer` to `test_UpliftRandomForestClassifier`
  • Loading branch information
heiderich authored Feb 14, 2022
1 parent 05f8bf7 commit 92767c3
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
21 changes: 17 additions & 4 deletions causalml/inference/tree/uplift.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,11 @@ class UpliftRandomForestClassifier:
n_jobs: int, optional (default=-1)
The parallelization parameter to define how many parallel jobs need to be created.
This is passed on to joblib library for parallelizing uplift-tree creation.
This is passed on to joblib library for parallelizing uplift-tree creation and prediction.
joblib_prefer: str, optional (default="threads")
The preferred backend for joblib (passed as `prefer` to joblib.Parallel). See the joblib
documentation for valid values.
Outputs
----------
Expand All @@ -1256,7 +1260,8 @@ class UpliftRandomForestClassifier:
n_reg=10,
evaluationFunction='KL',
normalization=True,
n_jobs=-1):
n_jobs=-1,
joblib_prefer: str = "threads"):

"""
Initialize the UpliftRandomForestClassifier class.
Expand All @@ -1272,6 +1277,7 @@ class UpliftRandomForestClassifier:
self.control_name = control_name
self.normalization = normalization
self.n_jobs = n_jobs
self.joblib_prefer = joblib_prefer

assert control_name is not None and isinstance(control_name, str), \
f"control_group should be string but {control_name} is passed"
Expand Down Expand Up @@ -1321,7 +1327,7 @@ class UpliftRandomForestClassifier:
self.n_class = len(self.classes_)

self.uplift_forest = (
Parallel(n_jobs=self.n_jobs, prefer="threads")
Parallel(n_jobs=self.n_jobs, prefer=self.joblib_prefer)
(delayed(self.bootstrap)(X, treatment, y, tree) for tree in self.uplift_forest)
)

Expand Down Expand Up @@ -1366,7 +1372,14 @@ class UpliftRandomForestClassifier:
'''
# Make predictions with all trees and take the average
y_pred_ensemble = sum([tree.predict(X=X) for tree in self.uplift_forest]) / len(self.uplift_forest)

if self.n_jobs != 1:
y_pred_ensemble = sum(
Parallel(n_jobs=self.n_jobs, prefer=self.joblib_prefer)
(delayed(tree.predict)(X=X) for tree in self.uplift_forest)
) / len(self.uplift_forest)
else:
y_pred_ensemble = sum([tree.predict(X=X) for tree in self.uplift_forest]) / len(self.uplift_forest)

# Summarize results into dataframe
df_res = pd.DataFrame(y_pred_ensemble, columns=self.classes_)
Expand Down
22 changes: 19 additions & 3 deletions tests/test_uplift_trees.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ def test_make_uplift_classification(generate_classification_data):


@pytest.mark.parametrize("backend", ['loky', 'threading', 'multiprocessing'])
def test_UpliftRandomForestClassifier(generate_classification_data, backend):
@pytest.mark.parametrize("joblib_prefer", ['threads', 'processes'])
def test_UpliftRandomForestClassifier(generate_classification_data, backend, joblib_prefer):
df, x_names = generate_classification_data()
df_train, df_test = train_test_split(df,
test_size=0.2,
Expand All @@ -29,14 +30,29 @@ def test_UpliftRandomForestClassifier(generate_classification_data, backend):
uplift_model = UpliftRandomForestClassifier(
min_samples_leaf=50,
control_name=TREATMENT_NAMES[0],
random_state=RANDOM_SEED
random_state=RANDOM_SEED,
joblib_prefer=joblib_prefer
)

uplift_model.fit(df_train[x_names].values,
treatment=df_train['treatment_group_key'].values,
y=df_train[CONVERSION].values)

y_pred = uplift_model.predict(df_test[x_names].values)
predictions = {}
predictions["single"] = uplift_model.predict(df_test[x_names].values)
with parallel_backend("loky", n_jobs=2):
predictions["loky_2"] = uplift_model.predict(df_test[x_names].values)
with parallel_backend("threading", n_jobs=2):
predictions["threading_2"] = uplift_model.predict(df_test[x_names].values)
with parallel_backend("multiprocessing", n_jobs=2):
predictions["multiprocessing_2"] = uplift_model.predict(df_test[x_names].values)

# assert that the predictions coincide for single and all parallel computations
iterator = iter(predictions.values())
first = next(iterator)
assert all(np.array_equal(first, rest) for rest in iterator)

y_pred = list(predictions.values())[0]
result = pd.DataFrame(y_pred, columns=uplift_model.classes_[1:])

best_treatment = np.where((result < 0).all(axis=1),
Expand Down

0 comments on commit 92767c3

Please sign in to comment.