Last active
January 22, 2018 10:46
-
-
Save iaroslav-ai/93aa78385f55a3bcee8b1dd7c46a2215 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Comparison of parallel vs sequential optimization, | |
where constant lie appraoch is used. | |
""" | |
from threading import Thread | |
from copy import deepcopy | |
from skopt import Optimizer | |
from skopt.learning import ExtraTreesRegressor, GaussianProcessRegressor | |
from skopt.space import Real | |
import numpy as np | |
import time | |
class ParallelOptimizer(): | |
def __init__(self, optimizer): | |
self.optimizer = optimizer # instance of sequential solver, will be copied | |
self.rng = optimizer.rng | |
# this is used to store all the data | |
self.data = {} # has the form str(point): (point_list, objective_float, is_actual_objective_bool) | |
def ask(self): | |
opt = deepcopy(self.optimizer) | |
opt.rng = self.rng | |
keys = self.data.keys() | |
X = [self.data[k][0] for k in keys] | |
Y = [self.data[k][1] for k in keys] | |
if len(X) > 0: | |
opt._n_random_starts = max(0, opt._n_random_starts - len(X)) | |
opt.tell(X, Y) | |
x = opt.ask() | |
y_lie = np.mean(Y) if len(Y) > 0 else 0.0 | |
opt.tell(x, y_lie) | |
self.data[str(x)] = (x, y_lie, False) | |
return x | |
def tell(self, x, y): | |
if y is None: # necessary to handle points for which computation failed, eg due to network failure | |
del self.data[str(x)] | |
return | |
self.data[str(x)] = (x, y, True) | |
def clear(self): | |
# deletes all the lie points | |
self.data = {k:v for k,v in self.data.items() if v[-1]} | |
# this is run in a thread, thus returning result in a weird fashion via xy array | |
def obj(xy): | |
time.sleep(np.random.rand()*3.0) | |
try: | |
# this is where evaluation can fail | |
if np.random.rand() < 0.1: | |
raise BaseException("Schroedinger's exception") | |
y = np.sum(np.array(xy[0]) ** 2) | |
except BaseException: | |
y = None | |
xy[1] = y | |
dimensions = [Real(-3.0, 3.0) for i in range(5)] | |
optimizer = Optimizer(base_estimator=GaussianProcessRegressor(), dimensions=dimensions, acq_optimizer='sampling') | |
n_runs = 11 | |
n_calls = 29 | |
results = {} | |
for config in [(8,False), (8,True), ]: | |
n_jobs, batch_computation = config # batch_computation: whether to run evluations in batches of n_jobs | |
history = [] | |
for rep in range(n_runs): | |
start_time = time.time() | |
parallel_optimizer = ParallelOptimizer(optimizer) | |
history.append([]) | |
threads_xy = [] # array of elements of the form [thread_obj, [x,y]], | |
# where [x,y] is used to return the result of computation | |
# approach kinda like https://sigopt.com/docs/overview/parallel | |
total_evaluations = n_calls | |
while total_evaluations > 0: | |
updated_threads_xy = [] # this will be an array of threads_xy that did not finish computing yet | |
for t in threads_xy: | |
if not t[0].is_alive(): | |
x, y = t[1] | |
parallel_optimizer.tell(x,y) | |
# for simplicity total_evaluations is not decreased if experiment failed | |
if y is not None: | |
history[-1].append([time.time() - start_time, y]) | |
total_evaluations -= 1 | |
print("y=",y,"n_calls=",total_evaluations) | |
else: | |
updated_threads_xy.append(t) | |
threads_xy = updated_threads_xy | |
if batch_computation and len(threads_xy) > 0: | |
continue | |
while len(threads_xy) < min(total_evaluations, n_jobs): | |
x = parallel_optimizer.ask() | |
args = [x, None] | |
thread = Thread(target=obj, args=[args]) | |
thread.daemon = True | |
thread.start() | |
threads_xy.append([thread, args]) | |
time.sleep(0.01) | |
results[str(config)] = history | |
"""EVERYTHING BELOW IS ONLY NEEDED TO PLOT NICELY RESULTS""" | |
def average_run(history): | |
history = np.array(history) | |
y = history[:,:,1] | |
x = history[:,:,0] # time stamps | |
x = (x.T - np.min(x, axis=1)).T # first measurement has timestamp of 0.0 | |
resolution = 1000 | |
# find the resolution timestamp = min. non - zero distance between two timestamps | |
time_step = np.max(x)/float(resolution) | |
current_time = 0.0 | |
y_binned = [] | |
x_binned = [] | |
current_y = y[:, 0] | |
while current_time < np.max(x): | |
y_binned.append(np.copy(current_y)) | |
selection = (current_time <= x) & (x <= current_time + time_step) | |
I = np.any(selection, axis=1) | |
mean_selection = np.sum( y*selection, axis=1 ) / np.sum(selection, axis=1) | |
current_y[I] = mean_selection[I] | |
current_time += time_step | |
x_binned.append(current_time) | |
y_binned = np.column_stack(y_binned) | |
y_binned = np.minimum.accumulate(y_binned, axis=1) | |
y_binned = np.mean(y_binned, axis=0) | |
x_binned = np.array(x_binned) | |
return x_binned, y_binned | |
# plot the history | |
import matplotlib.pyplot as plt | |
for i,k in enumerate(results): | |
x,y = average_run(results[k]) | |
plt.plot(x,y, label=k, c=[float(v) for v in np.binary_repr(i,3)]) | |
plt.xlabel("Time") | |
plt.ylabel("Best objective found") | |
plt.title("Results for different (n_jobs, batch_computations)") | |
plt.legend() | |
plt.grid() | |
plt.show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment