Source code for orsim.core.orsim_scheduler

from abc import ABC, abstractclassmethod, abstractmethod
import asyncio, json, logging, time, os, pprint

from datetime import datetime
# from apps import orsim
from orsim.messenger import Messenger

from random import random
from .orsim_env import ORSimEnv

from orsim.tasks import start_agent


class ORSimScheduler(ABC):
    """Drive a group of simulation agents through discrete time steps.

    The scheduler launches agents through ``start_agent.delay``, broadcasts
    one ``step`` (or ``shutdown``) message per time step over its
    ``Messenger`` client, and records each agent's reply in
    ``agent_collection[agent_id]['step_response'][time_step]``.
    """

    def __init__(self, run_id, scheduler_id, orsim_settings, init_failure_handler='soft'):
        """Create the scheduler and connect its messenger.

        Args:
            run_id: Identifier of the simulation run; used in topic names
                and in the messenger credentials.
            scheduler_id: Identifier of this scheduler; used in topic names
                and in the messenger credentials.
            orsim_settings: Settings mapping, passed through
                ``ORSimEnv.validate_orsim_settings`` before being stored.
            init_failure_handler: Stored on the instance. No method of this
                class currently reads it.

        Raises:
            Exception: If ``ORSimEnv.messenger_settings`` is ``None``.
        """
        if ORSimEnv.messenger_settings is None:
            raise Exception("Please Initialize ORSimEnv.set_backend()")

        self.orsim_settings = ORSimEnv.validate_orsim_settings(orsim_settings)

        self.run_id = run_id
        self.scheduler_id = scheduler_id
        self.time = 0
        self.init_failure_handler = init_failure_handler

        # agent_id -> {'spec': ..., 'step_response': {time_step: {...}}}
        self.agent_collection = {}
        # time_step -> aggregated reaction counts (see confirm_responses)
        self.agent_stat = {}

        # NOTE(review): the password is a hard-coded literal; confirm that
        # this is acceptable for the messenger backend in use.
        self.agent_credentials = {
            'email': f"{self.run_id}_{self.scheduler_id}_ORSimScheduler",
            'password': "secret_password",
        }

        self.agent_messenger = Messenger(
            ORSimEnv.messenger_settings,
            self.agent_credentials,
            f"{self.run_id}/{self.scheduler_id}/ORSimScheduler",
            self.on_receive_message,
        )

        self.pp = pprint.PrettyPrinter(indent=2)

        logging.info(f'Starting new {scheduler_id= } for {run_id= }')

    def add_agent(self, spec, project_path, agent_class):
        """Register an agent and dispatch the task that starts it.

        This method does not wait for the agent to report that it is ready;
        it returns as soon as the start task has been dispatched.

        Args:
            spec: Agent specification mapping. Must contain ``'unique_id'``.
                A shallow copy, extended with a ``'scheduler'`` entry, is
                forwarded as keyword arguments to ``start_agent.delay``.
            project_path: Forwarded unchanged to ``start_agent.delay``.
            agent_class: Dotted path of the agent class. It is split at the
                last dot into a module name and a class name.
        """
        self.agent_collection[spec['unique_id']] = {
            'spec': spec,
            'step_response': {
                self.time: {
                    'reaction': 'waiting',
                    'did_step': False,
                    'run_time': 0,
                }
            }
        }

        # 'pkg.module.ClassName' -> ('pkg.module', 'ClassName'); a path
        # without any dot yields an empty module name.
        module_name, _, agent_class_name = agent_class.rpartition('.')

        kwargs = spec.copy()
        kwargs['scheduler'] = {
            'id': self.scheduler_id,
            'orsim_settings': self.orsim_settings
        }

        # NOTE This starts the Celery Task in a new worker thread
        start_agent.delay(project_path, module_name, agent_class_name,
                          ORSimEnv.messenger_settings, **kwargs)

        logging.info(f"agent {spec['unique_id']} entering market")
        print(f"agent {spec['unique_id']} entering market")

    def remove_agent(self, unique_id):
        """Drop an agent from ``agent_collection``.

        An unknown ``unique_id`` is logged and otherwise ignored.
        """
        logging.info(f"agent {unique_id} has left")
        print(f"agent {unique_id} has left")
        try:
            self.agent_collection.pop(unique_id)
        except (KeyError, TypeError) as e:
            logging.exception(str(e))

    def on_receive_message(self, client, userdata, message):
        """Record an agent's reply received on the scheduler topic.

        Messages on any other topic are ignored. The payload is decoded as
        UTF-8 JSON and its ``agent_id``, ``action``, ``did_step``,
        ``run_time`` and ``time_step`` entries are read. A ``time_step`` of
        ``-1`` is recorded against the scheduler's current time.

        Args:
            client: Unused.
            userdata: Unused.
            message: Object exposing ``topic`` (str) and ``payload`` (bytes).
        """
        if message.topic != f"{self.run_id}/{self.scheduler_id}/ORSimScheduler":
            return

        payload = json.loads(message.payload.decode('utf-8'))

        time_step = payload.get('time_step')
        response_time_step = self.time if time_step == -1 else time_step

        try:
            self.agent_collection[payload.get('agent_id')]['step_response'][response_time_step] = {
                'reaction': payload.get('action'),
                'did_step': payload.get('did_step'),
                'run_time': payload.get('run_time')
            }
        except (KeyError, TypeError):
            # The agent is not (or no longer) registered, or the ids in the
            # payload cannot be used as keys. Replies like this are dropped
            # on purpose, but leave a trace for debugging.
            logging.debug(f'{self.__class__.__name__} ignored {message.payload = }')

        if (payload.get('action') == 'error') or (response_time_step is None):
            logging.warning(f'{self.__class__.__name__} received {message.payload = }')

    async def confirm_responses(self):
        """Poll until no registered agent is still ``'waiting'`` at this step.

        On every pass the reactions recorded for ``self.time`` are counted
        and stored in ``self.agent_stat[self.time]``. The stats are written
        at least once, even when no agents are registered, so callers can
        always read ``self.agent_stat[self.time]`` after this coroutine
        returns. While agents are still waiting, the coroutine sleeps 0.1 s
        between passes and logs progress roughly every 5 s.
        """
        start_time = time.time()
        base = 0  # seconds already reported in earlier progress logs

        while True:
            counts = {'completed': 0, 'ready': 0, 'error': 0, 'shutdown': 0, 'waiting': 0}
            num_did_step = 0

            for agent_item in self.agent_collection.values():
                response = agent_item['step_response'][self.time]
                reaction = response['reaction']
                # Reactions outside the known set are not counted.
                if isinstance(reaction, str) and reaction in counts:
                    counts[reaction] += 1
                if response['did_step']:
                    num_did_step += 1

            self.agent_stat[self.time] = {
                **counts,
                'stepping_agents': num_did_step,
                'total_agents': len(self.agent_collection),
            }

            if counts['waiting'] == 0:
                return

            current_time = time.time()
            if current_time - start_time >= 5:
                logging.info(f"Waiting for Agent Response... {self.agent_stat[self.time]}: {base + (current_time - start_time):0.0f} sec")
                base = base + (current_time - start_time)
                start_time = current_time

            await asyncio.sleep(0.1)

    async def step(self):
        """Run one simulation step and advance ``self.time`` by one.

        Publishes a ``'step'`` message, or ``'shutdown'`` when ``self.time``
        equals ``SIMULATION_LENGTH_IN_STEPS - 1``. It then waits up to
        ``STEP_TIMEOUT`` seconds for replies and removes every agent whose
        reaction for this step is ``'shutdown'`` or still ``'waiting'``.

        Returns:
            dict: ``{'status': 'success', 'end_sim': bool}``. ``end_sim`` is
            ``True`` when the incremented ``self.time`` equals
            ``SIMULATION_LENGTH_IN_STEPS - 1``.

        Raises:
            asyncio.TimeoutError: Re-raised by ``step_timeout_handler`` when
                too many agents failed to respond in time.
        """
        logging.info(f"{self.scheduler_id} Step: {self.time}")
        start_time = time.time()

        last_step = self.orsim_settings['SIMULATION_LENGTH_IN_STEPS'] - 1
        action = 'shutdown' if self.time == last_step else 'step'
        message = {'action': action, 'time_step': self.time}

        # Reset every agent's slot for this step before broadcasting.
        for agent_item in self.agent_collection.values():
            agent_item['step_response'][self.time] = {
                'reaction': 'waiting',
                'did_step': False,
                'run_time': 0
            }

        self.agent_messenger.client.publish(f'{self.run_id}/{self.scheduler_id}/ORSimAgent', json.dumps(message))

        try:
            await asyncio.wait_for(self.confirm_responses(), timeout=self.orsim_settings['STEP_TIMEOUT'])
            logging.info(f'{self.agent_stat[self.time] = }')
        except asyncio.TimeoutError as e:
            logging.exception(f'Scheduler {self.scheduler_id} timeout beyond {self.orsim_settings["STEP_TIMEOUT"] = } while waiting for confirm_responses.')
            self.step_timeout_handler(e)

        # Collect ids first: remove_agent mutates agent_collection.
        agents_shutdown = [
            agent_id
            for agent_id, agent_item in self.agent_collection.items()
            if agent_item['step_response'][self.time]['reaction'] in ['shutdown', 'waiting']
        ]
        for agent_id in agents_shutdown:
            self.remove_agent(agent_id)

        self.time += 1

        sim_stat = {
            'status': 'success',
            'end_sim': False,
        }
        if self.time == last_step:
            sim_stat['end_sim'] = True

        logging.info(f'{self.scheduler_id} Runtime: {(time.time()-start_time):0.2f} sec')

        return sim_stat

    def step_timeout_handler(self, e):
        """Decide whether a step timeout is tolerable.

        The share of agents still ``'waiting'`` at ``self.time`` is compared
        with ``STEP_TIMEOUT_TOLERANCE`` (a fraction). Within tolerance, a
        warning is logged and the method returns; otherwise the state is
        logged and ``e`` is re-raised.

        Args:
            e: The timeout exception to re-raise when tolerance is exceeded.

        Raises:
            Exception: ``e`` itself, when the waiting share exceeds tolerance.
        """
        tolerance = self.orsim_settings['STEP_TIMEOUT_TOLERANCE']  # Max % of agents having network issues
        stat = self.agent_stat[self.time]
        total_agents = len(self.agent_collection)
        # With no agents registered there is nothing missing.
        missing_ratio = stat['waiting'] / total_agents if total_agents else 0.0

        if missing_ratio <= tolerance:
            logging.warning(f"agent_stat = {self.pp.pformat(stat)}")
            logging.warning(f"Unable to receive response from {stat['waiting']} Agents at {self.time=}. % Error ({missing_ratio:0.3f}) is within {tolerance=}. Continue processing...")
        else:
            missing_percent = stat['waiting'] * 100 / total_agents if total_agents else 0.0
            logging.error(f"Too many missing messages. % Error ({missing_percent}) exceeded {tolerance=}. Abort...")
            logging.error(f'{self.pp.pformat(self.agent_collection)}')
            raise e