#
# XXX: do not use this code, it's broken!
# Use: https://gist.github.com/ogrisel/b6a97ed87939e3b559568ac2f6599cba
#
# See comments.
import os
import os.path as op
from time import time

import dask.dataframe as ddf
import dask.array as da
from dask import delayed, compute
from distributed import Client


def make_categorical_data(n_samples=int(1e7), n_features=10):
    """Generate some random categorical data.

    The default parameters should generate around 1GB of random integer data
    with increasing cardinality along with a normally distributed real valued
    target variable.
    """
    feature_names = ['f_%03d' % i for i in range(n_features)]
    features_series = [
        da.random.randint(low=0, high=(i + 1) * 10, size=n_samples,
                          chunks=n_samples // 10)
        for i in range(n_features)
    ]
    features_series = [
        ddf.from_dask_array(col_data, columns=[feature_name])
        for col_data, feature_name in zip(features_series, feature_names)
    ]
    target = da.random.normal(loc=0, scale=1, size=n_samples,
                              chunks=n_samples // 10)
    target = ddf.from_dask_array(target, columns=['target'])

    data = ddf.concat(features_series + [target], axis=1)
    data = data.repartition(npartitions=10)
    return data


def target_mean_transform(data, feature_colname, target_colname):
    if data[feature_colname].dtype.kind not in ('i', 'O'):
        # Non-categorical variables are kept untransformed:
        return data[feature_colname]

    data = data[[feature_colname, target_colname]]
    target_means = data.groupby(feature_colname).mean()
    mapping = target_means.to_dict()[target_colname]
    return data[feature_colname].map(mapping)


def encode_with_target_mean(data, target_colname='target'):
    """Supervised encoding of categorical variables with per-group target mean.

    All columns that contain integer values are replaced by real valued data
    representing the average target value for each category.
    """
    features_data = data.drop(target_colname, axis=1)
    target_data = data[target_colname]
    return delayed(ddf.concat)(
        [delayed(target_mean_transform)(data, colname, target_colname)
         for colname in features_data.columns] + [target_data],
        axis=1
    )


if __name__ == '__main__':
    # Make sure dask uses the distributed scheduler.
    # Start the scheduler and at least one worker with:
    #   $ dask-scheduler
    #   $ dask-worker localhost:8786
    c = Client('localhost:8786')

    original_folder_name = op.abspath('random_categorical_data')
    encoded_folder_name = op.abspath('random_encoded_data')

    if not op.exists(original_folder_name):
        print("Generating random categorical data in", original_folder_name)
        os.mkdir(original_folder_name)
        data = make_categorical_data()
        ddf.to_parquet(original_folder_name, data)

    print("Using data from", original_folder_name)
    data = ddf.read_parquet(original_folder_name)

    print("Encoding categorical variables...")
    encoded = encode_with_target_mean(data, target_colname='target')

    print("Saving encoded data to", encoded_folder_name)
    t0 = time()
    # Repartition to get small parquet files in the output folder.
    encoded = encoded.repartition(npartitions=10)
    compute(delayed(ddf.to_parquet)(encoded_folder_name, encoded))
    print("done in %0.3fs" % (time() - t0))
Ok I have a new version of the example where I removed the column-wise parallelism:
https://gist.github.com/ogrisel/b6a97ed87939e3b559568ac2f6599cba
There is no need for `delayed` anymore (just plain collection operations) and the tasks panel of bokeh makes me think that it's well parallelized like that already.
Also that fixed version only takes ~~10s~~ 13s total instead of 20s.
Maybe I should just not try to do any nesting by parallelizing the column-wise transforms. The internal calls to group-by and map should already parallelize enough.
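For reference, roughly what a "plain collection operations" version of the two helpers could look like. This is only a sketch of the idea; the gist linked above is the actual fixed code and may differ in details such as how the per-category means are brought back to the client:

```python
# Hypothetical sketch of the "no delayed, plain collection operations" idea;
# the actual fixed version lives in the gist linked above.
import dask.dataframe as ddf


def target_mean_transform(data, feature_colname, target_colname):
    if data[feature_colname].dtype.kind not in ('i', 'O'):
        # Non-categorical variables are kept untransformed:
        return data[feature_colname]
    # Per-category mean of the target, computed eagerly so that it can be
    # used as a plain dict in the map below:
    target_means = data[target_colname].groupby(data[feature_colname]).mean()
    mapping = target_means.compute().to_dict()
    return data[feature_colname].map(mapping)


def encode_with_target_mean(data, target_colname='target'):
    features_data = data.drop(target_colname, axis=1)
    target_data = data[target_colname]
    encoded_columns = [
        target_mean_transform(data, colname, target_colname)
        for colname in features_data.columns
    ]
    # Plain dask.dataframe concat, no delayed() wrapping needed:
    return ddf.concat(encoded_columns + [target_data], axis=1)
```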
I think that can still be parallelized, if you're able to fit the data in distributed memory.
def encode_with_target_mean(data, target_colname='target'):
    """Supervised encoding of categorical variables with per-group target mean.

    All columns that contain integer values are replaced by real valued data
    representing the average target value for each category.
    """
    features_data = data.drop(target_colname, axis=1)
    target_data = data[target_colname]
    encode_columns = features_data.select_dtypes(['int', 'object']).dtypes.index
    mappings = [target_data.groupby(features_data[col]).mean()
                for col in encode_columns]
    mappings = compute(*mappings)  # explicit compute here, so dd.persist(data) is useful earlier
    mappings = {m.index.name: m for m in mappings}
    for col in encode_columns:
        features_data[col] = features_data[col].map(mappings[col])
    return ddf.concat([features_data, target_data], axis=1)
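As a toy illustration (plain pandas, made-up column name and values) of what each per-column mapping computes and how it is applied:

```python
import pandas as pd

df = pd.DataFrame({'f_000': [0, 0, 1, 1, 1],
                   'target': [1.0, 3.0, 2.0, 4.0, 6.0]})
# Per-category mean of the target: category 0 -> 2.0, category 1 -> 4.0
mapping = df['target'].groupby(df['f_000']).mean()
print(df['f_000'].map(mapping).tolist())  # [2.0, 2.0, 4.0, 4.0, 4.0]
```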
A couple other changes in the `main` (mostly removing `delayed` calls): https://gist.github.com/436442baeb353c53bcdf61adf6b32823, which runs in
Using data from /private/tmp/random_categorical_data
Encoding categorical variables...
Saving encoded data to /private/tmp/random_encoded_data
done in 17.253s
compared to 24 seconds for the original version.
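The linked gist is not inlined here; as a rough idea only, a `main` without the `delayed` calls could look something like the sketch below (hypothetical, assuming `encode_with_target_mean` as defined above, the same folder layout, and the same `ddf.to_parquet(path, df)` call signature used in the original script):

```python
# Hypothetical sketch only; see the linked gist for the actual changes.
import os.path as op
from time import time

import dask.dataframe as ddf
from distributed import Client

if __name__ == '__main__':
    c = Client('localhost:8786')

    original_folder_name = op.abspath('random_categorical_data')
    encoded_folder_name = op.abspath('random_encoded_data')

    print("Using data from", original_folder_name)
    data = ddf.read_parquet(original_folder_name)
    # Persist the input in distributed memory so that the explicit compute()
    # on the per-column mappings can reuse it:
    data = c.persist(data)

    print("Encoding categorical variables...")
    # encode_with_target_mean as defined in the comment above:
    encoded = encode_with_target_mean(data, target_colname='target')

    print("Saving encoded data to", encoded_folder_name)
    t0 = time()
    # Repartition to get small parquet files, then write directly with the
    # collection API (no delayed() wrapping):
    encoded = encoded.repartition(npartitions=10)
    ddf.to_parquet(encoded_folder_name, encoded)
    print("done in %0.3fs" % (time() - t0))
```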
The flat / pure collection API version runs in ~~10s~~ 13s. Column-wise parallelism is probably useless, unless the data has many, many columns on a cluster with many, many workers.
Edit: I did not include the time of computing the mappings, which adds ~3s.
BTW in your code you might also want to use `features_data.select_dtypes(['int', 'object', 'category'])`.
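For illustration (plain pandas, which `dask.dataframe` mirrors here, with a made-up frame): columns converted to the pandas categorical dtype would otherwise be silently skipped.

```python
import pandas as pd

df = pd.DataFrame({'a': [1, 2],
                   'b': pd.Series(['x', 'y'], dtype='category')})
print(df.select_dtypes(['int', 'object']).columns.tolist())              # ['a']
print(df.select_dtypes(['int', 'object', 'category']).columns.tolist())  # ['a', 'b']
```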
@TomAugspurger thanks for the parallelized mapping version though. Even if slower, it's interesting. I had not thought about that option.