Skip to content

Instantly share code, notes, and snippets.

@dipamsen
Created April 23, 2025 18:00
Show Gist options
  • Save dipamsen/bb5f9d060a0498c5324f4dce9bf2219f to your computer and use it in GitHub Desktop.
Save dipamsen/bb5f9d060a0498c5324f4dce9bf2219f to your computer and use it in GitHub Desktop.
Discrete Event Simulation
import heapq
class Task:
def __init__(self, task_id, arrival_time, duration, priority):
self.task_id = task_id
self.arrival_time = arrival_time
self.duration = duration
self.priority = priority
self.start_time = None
self.end_time = None
def __lt__(self, other):
return self.priority < other.priority
class Event:
def __init__(self, time, event_type, task):
self.time = time
self.event_type = event_type
self.task = task
def __lt__(self, other):
return self.time < other.time
def __repr__(self):
return f"Event(type={self.event_type}, time={self.time}, task={self.task.task_id})"
class Scheduler:
def __init__(self):
self.event_queue = []
self.ready_queue = []
self.clock = 0
self.cpu_busy = False
self.completed_tasks = []
def schedule_event(self, event):
heapq.heappush(self.event_queue, event)
# print(f"[DEBUG] Pushed event {event}.")
def add_task(self, task):
arrival_event = Event(task.arrival_time, "arrival", task)
self.schedule_event(arrival_event)
def run(self):
while self.event_queue:
event = heapq.heappop(self.event_queue)
# print(f"[DEBUG] Processing event {event}")
self.clock = event.time
if event.event_type == 'arrival':
self.handle_arrival(event.task)
elif event.event_type == 'complete':
self.handle_complete(event.task)
else:
print("Broke out of event loop since no more events.")
def handle_arrival(self, task):
print(f"[{self.clock}] Task {task.task_id} arrived.")
# self.ready_queue.append(task)
heapq.heappush(self.ready_queue, task)
if not self.cpu_busy:
self.start_next_task()
def handle_complete(self, task):
print(f"[{self.clock}] Task {task.task_id} completed.")
self.cpu_busy = False
self.completed_tasks.append(task)
self.start_next_task()
def start_next_task(self):
if self.ready_queue:
# task = self.ready_queue.pop(0)
task = heapq.heappop(self.ready_queue)
start_time = self.clock
finish_time = self.clock + task.duration
print(f"[{start_time}] Task {task.task_id} started (runs till {finish_time})")
self.cpu_busy = True
self.schedule_event(Event(finish_time, 'complete', task))
if __name__ == "__main__":
task_list = [
Task(task_id=1, arrival_time=0, duration=4, priority=2),
Task(task_id=2, arrival_time=1, duration=10, priority=1),
Task(task_id=3, arrival_time=5, duration=2, priority=5),
Task(task_id=4, arrival_time=6, duration=2, priority=3),
]
scheduler = Scheduler()
for t in task_list:
scheduler.add_task(t)
scheduler.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment