Last active
February 14, 2023 17:09
-
-
Save LukasWallrich/1e84ba8f532ac9fb9d986850a46dc17f to your computer and use it in GitHub Desktop.
Gmodel.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| TK |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Model proposed by Grim et al., | |
| # extended with different perspectives, i.e. shortcuts. | |
| # httpimport is used to load the base model straight from its gist, to avoid duplicating files. | |
| import httpimport | |
| url = "https://gist.githubusercontent.com/LukasWallrich/05f445821fbae694b37a205dc08b2b4f/raw/" | |
| with httpimport.remote_repo(url): | |
| from HPmodel import HPProblem, PSAgent | |
| # Alternative: download file into same folder, then run | |
| # from HPmodel import HPProblem, PSAgent | |
| import pandas as pd | |
| import numpy as np | |
| from copy import copy | |
| from statistics import mean | |
| # TK - Move to base HP Model | |
class GrimAgent(PSAgent):
    """Agent whose search is delegated to its problem's ``max_search``."""

    def step(self):
        """Run one search with the agent's own heuristic (called by mesa).

        The mode is read from ``self.problem.strategy``:

        * ``"relay"``: ``max_search`` is called with its default ``update``,
          so the problem's shared team state is updated by the search.
        * ``"tournament"``: ``max_search`` is called with ``update=False``;
          only this agent's ``focus`` and ``best_solution`` are set.

        Any other strategy value leaves the agent untouched.
        """
        problem = self.problem
        if problem.strategy == "relay":
            outcome = problem.max_search(agent=self)
            self.focus, self.best_solution = outcome
        if problem.strategy == "tournament":
            outcome = problem.max_search(agent=self, update=False)
            self.focus, self.best_solution = outcome
class GProblem(HPProblem):
    """Search problem after Grim et al.: tunable smoothness plus shortcuts.

    Extends ``HPProblem`` with
    (a) a landscape whose random anchor values are drawn roughly
        ``smoothness`` positions apart and linearly interpolated in between,
    (b) "shortcuts" that map one landscape position onto another, and
    (c) a choice between the "relay", "tournament" or "both" team strategies.
    """

    def __init__(self, n, k, l, shortcuts, N_agents, smoothness, seed=None,
                 strategy="relay", agent_class=GrimAgent):
        """Draw shortcuts and landscape, then initialise the base problem.

        Args:
            n: Number of positions on the (circular) landscape.
            k: Forwarded unchanged to ``HPProblem.__init__``.
            l: Forwarded unchanged to ``HPProblem.__init__``.
            shortcuts: Number of shortcut draws (see ``draw_shortcuts``).
            N_agents: Forwarded unchanged to ``HPProblem.__init__``.
            smoothness: 0 uses the base class landscape; otherwise see
                ``draw_G_solution``.
            seed: Forwarded unchanged to ``HPProblem.__init__``.
            strategy: ``"relay"``, ``"tournament"`` or ``"both"``.
            agent_class: Agent class forwarded to ``HPProblem.__init__``.
        """
        # Shortcuts and landscape are drawn BEFORE the base initialiser runs;
        # presumably the base class already evaluates heuristics (via
        # max_search) during __init__ -- confirm against HPProblem.
        # NOTE(review): this relies on self.random existing before
        # super().__init__ is called -- confirm for the mesa version in use.
        self.draw_shortcuts(n, shortcuts)
        self.draw_G_solution(n, smoothness)
        super().__init__(n, k, l, N_agents, seed, agent_class=agent_class)
        self.strategy = strategy

    def draw_shortcuts(self, n, shortcuts):
        """Create ``self.shortcuts``, a dict of random position -> position jumps.

        Keys (entry positions) and values (destinations) are uniform random
        integers in ``[0, n-1]``. ``shortcuts`` draws are made; the dict holds
        fewer entries when an entry position is drawn more than once (the
        later draw wins).
        """
        rng = self.random
        mapping = {}
        for _ in range(shortcuts):
            # The destination is drawn before the entry position: that is the
            # evaluation order of the original `d[randint()] = randint()`
            # (right-hand side first), so seeded runs stay reproducible.
            destination = rng.randint(0, n - 1)
            entry = rng.randint(0, n - 1)
            mapping[entry] = destination
        self.shortcuts = mapping

    def draw_solution(self, n):
        """Do nothing; the landscape is generated by ``draw_G_solution``."""
        pass

    def draw_G_solution(self, n, smoothness):
        """Generate the landscape ``self.solution`` as a list of ``n`` values.

        With ``smoothness == 0`` the base class ``draw_solution`` is used.
        Otherwise anchor values are drawn uniformly from [0, 100], starting at
        position 0, with gaps of ``1 + randrange(2 * smoothness)`` positions
        between anchors; positions in between are linearly interpolated.
        """
        if smoothness == 0:
            super().draw_solution(n)
            return

        values = [np.nan] * n
        position = 0
        while position < n:
            values[position] = self.random.uniform(0, 100)
            position += 1 + self.random.randrange(2 * smoothness)

        # Unless the last value is specified, close the circle: append the
        # first value as a temporary anchor so the tail interpolates back
        # towards position 0.
        if pd.isna(values[n - 1]):
            values.append(values[0])

        solution = pd.Series(values, dtype=float).interpolate().tolist()
        if len(solution) > n:
            solution.pop()  # drop the temporary closing anchor again
        self.solution = solution

    def max_search(self, agent=None, heuristic=None, update=True):
        """Hill-climb on the landscape with a heuristic, honouring shortcuts.

        Two modes:

        * ``heuristic is None`` (agents search): start from the current
          position of ``agent``'s team and use ``agent.heuristic``.
        * ``heuristic`` given (heuristic evaluation): search from every
          starting position ``0 .. n-1``.

        From each start, the steps in the heuristic are tried in order. A step
        is taken when it leads to a higher value; if the position reached has
        a shortcut whose destination is higher still, the shortcut is taken
        instead. The search from a start ends when a full pass over the
        heuristic yields no improvement.

        Args:
            agent: Searching agent; required when ``heuristic`` is None or
                when ``update`` is true (``agent.team`` is read).
            heuristic: Iterable of step sizes, or None to use the agent's.
            update: When true, write the result into
                ``self.best_solution`` / ``self.current_position`` for the
                agent's team. Should only be used when agents search.

        Returns:
            Tuple ``(position, value)``: the final position of the last search
            performed (not wrapped modulo ``n``) and the mean of the optima
            found over all starting positions.
        """
        n_positions = self.n  # local aliases to speed up the inner loop
        landscape = self.solution
        shortcuts = self.shortcuts

        if heuristic is None:  # when agents search
            start = [self.current_position[agent.team]]
            heuristic = agent.heuristic
        else:  # when heuristics are evaluated
            start = range(n_positions)

        optima = []
        for current in start:
            last_value = landscape[current % n_positions]
            while True:
                old_value = last_value
                for step in heuristic:
                    # Positions live on a circle. The wrapped position is used
                    # for the shortcut lookup as well, so shortcuts stay
                    # reachable after a search has passed position n-1
                    # (shortcut keys are always within 0 .. n-1).
                    candidate = (current + step) % n_positions
                    new_value = landscape[candidate]

                    # Check shortcut; take it if it leads to a higher position.
                    destination = shortcuts.get(candidate)
                    via_shortcut = (
                        destination is not None
                        and landscape[destination] > new_value
                    )
                    if via_shortcut:
                        new_value = landscape[destination]

                    if new_value > last_value:
                        last_value = new_value
                        current = destination if via_shortcut else current + step

                if old_value == last_value:  # no change on a full pass
                    optima.append(last_value)
                    break

        if update:  # should only be used when agents search
            self.best_solution[agent.team] = optima[0]
            self.current_position[agent.team] = current
        return current, mean(optima)

    def tournament_step(self):
        """Run the tournament strategy from every starting position.

        For each start, all teams are placed on that position. Then, until
        the per-team best solutions stop changing: data is collected, every
        agent searches individually (``self.schedule.step()``), and each team
        moves to the position of its member with the highest solution.

        Afterwards ``self.running`` is set to False and ``self.best_solution``
        becomes ``self.dict_mean`` of the converged solutions of all starts.
        """
        solutions = []
        for start in range(self.n):
            self.current_position = dict.fromkeys(self.current_position, start)
            while True:
                old_solution = self.best_solution
                self.datacollector.collect(self)
                self.schedule.step()

                results = [
                    {"team": a.team, "solution": a.best_solution, "focus": a.focus}
                    for a in self.schedule.agents
                ]
                teams = {record["team"] for record in results}
                # Best record per team; on ties the first agent in schedule
                # order wins (behaviour of max()).
                best = {
                    team: max(
                        (r for r in results if r["team"] == team),
                        key=lambda r: r["solution"],
                    )
                    for team in teams
                }
                self.current_position = {t: r["focus"] for t, r in best.items()}
                self.best_solution = {t: r["solution"] for t, r in best.items()}

                if old_solution == self.best_solution:
                    solutions.append(copy(self.best_solution))
                    break

        self.running = False
        self.best_solution = self.dict_mean(solutions)

    def step(self):
        """Have agent teams search, following the configured strategy.

        Called by mesa; should only take one step.

        * ``"both"``: run relay (base class ``step``), then tournament, and
          store both results in ``self.best_solution`` with keys prefixed
          ``"relay_"`` / ``"tournament_"``. Returns before the final data
          collection below.
        * ``"relay"``: delegate to the base class ``step``.
        * ``"tournament"``: run ``tournament_step``.
        """
        if self.strategy == "both":
            self.strategy = "relay"
            super().step()
            sol = {"relay_" + str(key): val
                   for key, val in self.best_solution.items()}
            self.strategy = "tournament"
            self.tournament_step()
            tournament = {"tournament_" + str(key): val
                          for key, val in self.best_solution.items()}
            self.best_solution = dict(sol, **tournament)
            self.strategy = "both"
            return None

        if self.strategy == "relay":
            super().step()
        if self.strategy == "tournament":
            self.tournament_step()

        # NOTE(review): indentation was lost in the text this was recovered
        # from; these two statements are assumed to sit at method level (run
        # for both relay and tournament) -- confirm against the original.
        self.running = False
        self.datacollector.collect(self)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment