Created
April 23, 2025 18:00
-
-
Save dipamsen/bb5f9d060a0498c5324f4dce9bf2219f to your computer and use it in GitHub Desktop.
Discrete Event Simulation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import heapq | |
class Task:
    """A unit of work submitted to the scheduler.

    Args:
        task_id: Identifier used when the task is printed.
        arrival_time: Simulation time at which the task becomes ready.
        duration: Amount of simulation time the task runs once started.
        priority: Ordering key; a smaller value compares as "less than"
            and is therefore popped first from a ``heapq`` min-heap.

    ``start_time`` and ``end_time`` are initialised to ``None``.
    """

    def __init__(self, task_id, arrival_time, duration, priority):
        self.task_id = task_id
        self.arrival_time = arrival_time
        self.duration = duration
        self.priority = priority
        self.start_time = None
        self.end_time = None

    def __lt__(self, other):
        """Return True if this task should be served before *other*.

        Tasks are ordered by ``priority`` first. Equal priorities are
        broken by earlier ``arrival_time`` so that the heap order of
        same-priority tasks is first-come-first-served instead of being
        left to the heap's internal layout.
        """
        if self.priority != other.priority:
            return self.priority < other.priority
        return self.arrival_time < other.arrival_time

    def __repr__(self):
        return (
            f"Task(task_id={self.task_id!r}, "
            f"arrival_time={self.arrival_time!r}, "
            f"duration={self.duration!r}, priority={self.priority!r})"
        )
class Event:
    """A timestamped occurrence placed on the scheduler's event heap.

    Args:
        time: Simulation time at which the event fires.
        event_type: Event kind; the scheduler uses ``"arrival"`` and
            ``"complete"``.
        task: The task the event refers to.
    """

    # Rank applied when two events share a timestamp. Arrivals go first so
    # that a task arriving at the instant another one completes is already
    # in the ready queue when the next task is chosen.
    _TYPE_RANK = {"arrival": 0, "complete": 1}

    def __init__(self, time, event_type, task):
        self.time = time
        self.event_type = event_type
        self.task = task

    def _sort_key(self):
        # Unknown event types sort after the known ones at the same time.
        rank = self._TYPE_RANK.get(self.event_type, len(self._TYPE_RANK))
        return (self.time, rank)

    def __lt__(self, other):
        """Order by time, then arrivals before completions on a tie."""
        return self._sort_key() < other._sort_key()

    def __repr__(self):
        return f"Event(type={self.event_type}, time={self.time}, task={self.task.task_id})"
class Scheduler:
    """Event-driven scheduler that runs one task at a time to completion.

    Two heaps drive the simulation: ``event_queue`` holds pending events
    and ``ready_queue`` holds tasks that have arrived but not yet started.
    Both rely on the ``__lt__`` of the objects pushed onto them.
    """

    def __init__(self):
        self.event_queue = []
        self.ready_queue = []
        self.clock = 0
        self.cpu_busy = False
        self.completed_tasks = []

    def schedule_event(self, event):
        """Push *event* onto the event heap."""
        heapq.heappush(self.event_queue, event)

    def add_task(self, task):
        """Register *task* by scheduling its arrival event."""
        self.schedule_event(Event(task.arrival_time, "arrival", task))

    def run(self):
        """Process events in heap order until none remain.

        The clock jumps to each event's time before the event is handled.
        Events whose type is neither ``'arrival'`` nor ``'complete'`` only
        advance the clock.
        """
        while self.event_queue:
            event = heapq.heappop(self.event_queue)
            self.clock = event.time
            if event.event_type == 'arrival':
                self.handle_arrival(event.task)
            elif event.event_type == 'complete':
                self.handle_complete(event.task)
        # The loop has no ``break``, so this always runs once it drains.
        print("Broke out of event loop since no more events.")

    def handle_arrival(self, task):
        """Queue *task* as ready and start work if the CPU is idle."""
        print(f"[{self.clock}] Task {task.task_id} arrived.")
        heapq.heappush(self.ready_queue, task)
        if not self.cpu_busy:
            self.start_next_task()

    def handle_complete(self, task):
        """Mark *task* finished, free the CPU and start the next task."""
        print(f"[{self.clock}] Task {task.task_id} completed.")
        # Record when the task actually finished.
        task.end_time = self.clock
        self.cpu_busy = False
        self.completed_tasks.append(task)
        self.start_next_task()

    def start_next_task(self):
        """Start the smallest task on the ready heap, if there is one.

        Marks the CPU busy and schedules the task's completion event at
        ``clock + task.duration``.
        """
        if not self.ready_queue:
            return
        task = heapq.heappop(self.ready_queue)
        start_time = self.clock
        finish_time = start_time + task.duration
        # Record when the task actually began running.
        task.start_time = start_time
        print(f"[{start_time}] Task {task.task_id} started (runs till {finish_time})")
        self.cpu_busy = True
        self.schedule_event(Event(finish_time, 'complete', task))
if __name__ == "__main__":
    # Demo workload: one (task_id, arrival_time, duration, priority) row
    # per task, fed to a fresh scheduler and then simulated.
    scheduler = Scheduler()
    task_list = [
        Task(task_id=tid, arrival_time=arrives, duration=length, priority=prio)
        for tid, arrives, length, prio in (
            (1, 0, 4, 2),
            (2, 1, 10, 1),
            (3, 5, 2, 5),
            (4, 6, 2, 3),
        )
    ]
    for task in task_list:
        scheduler.add_task(task)
    scheduler.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment