from abc import ABC, abstractclassmethod, abstractmethod
import asyncio, json, logging, time, os, traceback
from collections import OrderedDict
from datetime import datetime
from dateutil.relativedelta import relativedelta
from orsim.messenger import Messenger
from orsim.core.orsim_env import ORSimEnv
from orsim.utils import time_to_str, str_to_time
from cerberus import Validator
[docs]
class ORSimAgent(ABC):
messenger = None
agent_failed = False
payload_cache = None
step_log = {}
# def __init__(self, unique_id, run_id, reference_time, init_time_step, scheduler_id, behavior, orsim_settings):
def __init__(self, unique_id, run_id, reference_time, init_time_step, scheduler, behavior): #, orsim_settings):
self.unique_id = unique_id
self.run_id = run_id
self.scheduler_id = scheduler['id']
self.reference_time = datetime.strptime(reference_time, '%Y%m%d%H%M%S') # datetime
self.current_time = self.reference_time
self.next_event_time = self.reference_time # To be set by agent at every step_response
# self.orsim_settings = orsim_settings
self.orsim_settings = ORSimEnv.validate_orsim_settings(scheduler['orsim_settings'])
# Ideally behavior should be read from a datafile/db or in case of simulation, generated by the Model and passed in as attribute
self.prev_time_step = 0
self.current_time_step = 0
self.elapsed_duration_steps = 0
self.active = False
self._shutdown = False
self.behavior = behavior
self.agent_credentials = {
'email': f"{self.run_id}_{self.scheduler_id}_{unique_id}",
'password': "secret_password",
}
self.messenger = Messenger(ORSimEnv.messenger_settings, self.agent_credentials)
self.start_time = time.time()
self.message_processing_active = False
self.message_handlers = {}
self.bootstrap_step(init_time_step)
self.register_message_handler(topic=f"{self.run_id}/{self.scheduler_id}/ORSimAgent",
method=self.handle_orsim_agent_message)
[docs]
def register_message_handler(self, topic, method):
self.message_handlers[topic] = method
[docs]
def is_active(self):
return self.active
[docs]
def reset_step_log(self):
self.step_log = {}
[docs]
def add_step_log(self, message):
self.step_log[datetime.now().isoformat()] = message
[docs]
def take_first_step(self, dummy_payload):
self.message_processing_active = True
self.handle_orsim_agent_message(dummy_payload)
[docs]
def handle_orsim_agent_message(self, payload):
# print('Inside handle_orsim_agent_message')
self.add_step_log('In handle_orsim_agent_message')
try:
self.bootstrap_step(payload['time_step'])
if payload.get('action') == 'init':
''' NOTE This is unused block of code at the moment'''
# print(f"{self.unique_id} received {payload=}")
did_step = self.process_payload(payload)
self.next_event_time = self.estimate_next_event_time()
response_payload = {
'agent_id': self.unique_id,
'time_step': self.current_time_step,
'action': 'ready', # 'completed',
'did_step': did_step,
'run_time': time.time() - self.start_time,
}
elif payload.get('action') == 'step':
did_step = self.process_payload(payload)
self.next_event_time = self.estimate_next_event_time()
response_payload = {
'agent_id': self.unique_id,
'time_step': self.current_time_step,
'action': 'completed' if self._shutdown==False else 'shutdown',
'did_step': did_step,
'run_time': time.time() - self.start_time,
}
elif payload.get('action') == 'shutdown':
''' '''
# self.shutdown()
response_payload = {
'agent_id': self.unique_id,
'time_step': self.current_time_step,
'action': 'shutdown',
'did_step': True,
'run_time': time.time() - self.start_time,
}
except Exception as e:
response_payload = {
'agent_id': self.unique_id,
'time_step': self.current_time_step,
'action': 'error',
'did_step': False,
'run_time': time.time() - self.start_time,
'details': traceback.format_exc(), # str(e)
}
# logging.exception(f"{self.unique_id} raised {str(e)}")
self.messenger.client.publish(f'{self.run_id}/{self.scheduler_id}/ORSimScheduler', json.dumps(response_payload))
if payload.get('action') == 'shutdown':
self.shutdown()
self.end_time = time.time()
self.message_processing_active = False
[docs]
def on_receive_message(self, client, userdata, message):
''' '''
payload = json.loads(message.payload.decode('utf-8'))
# print("received message", message.payload.decode('utf-8'))
self.start_time = time.time()
self.message_processing_active = True
self.payload_cache = payload
self.reset_step_log()
logging.debug(f"Agent {self.unique_id} received {payload.get('action')}")
if self.message_handlers.get(message.topic) is not None:
method = self.message_handlers[message.topic]
method(payload)
logging.debug(f"Runtime for {self.unique_id} at {self.current_time_step}: {self.end_time - self.start_time:0.2f} secs ")
[docs]
def start_listening(self):
start_time_for_ready = time.time() # NOTE Local start time NOT a class variable
if not self.agent_failed:
# self.agent_messenger = Messenger(self.agent_credentials, f"{self.run_id}/{self.scheduler_id}/ORSimAgent", self.on_receive_message)
self.messenger.client.subscribe([(topic, 0) for topic, _ in self.message_handlers.items()])
self.messenger.client.on_message = self.on_receive_message
# print('subscribed to ', self.message_handlers)
# if settings['CONCURRENCY_STRATEGY'] == 'ASYNCIO':
# logging.debug(f'Agent {self.unique_id} is Listening for Messages')
# loop = asyncio.get_event_loop()
# try:
# loop.run_forever()
# except KeyboardInterrupt:
# pass
# finally:
# loop.close()
# elif settings['CONCURRENCY_STRATEGY'] == 'EVENTLET':
import eventlet
def run_forever():
while True:
# eventlet.sleep(0.1)
eventlet.sleep(5)
self.handle_heartbeat_failure()
# logging.info(f"{self.unique_id} Heartbeat")
if self._shutdown == True:
self.stop_listening()
break
eventlet.spawn(run_forever)
# Once agent is setup and listening, send the ready message
response_payload = {
'agent_id': self.unique_id,
'time_step': -1,
'action': 'ready',
'run_time': time.time() - start_time_for_ready,
}
else:
response_payload = {
'agent_id': self.unique_id,
'time_step': -1,
'action': 'init_error',
'run_time': time.time() - start_time_for_ready,
}
self.take_first_step({
'action': 'init',
'time_step': self.current_time_step
})
# self.messenger.client.publish(f'{self.run_id}/{self.scheduler_id}/ORSimScheduler', json.dumps(response_payload))
[docs]
def stop_listening(self):
self.messenger.disconnect()
[docs]
def get_current_time_str(self):
return time_to_str(self.current_time)
# @classmethod
# def time_to_str(cls, time_var):
# return datetime.strftime(time_var, "%a, %d %b %Y %H:%M:%S GMT")
# @classmethod
# def str_to_time(cls, time_str):
# return datetime.strptime(time_str, "%a, %d %b %Y %H:%M:%S GMT")
[docs]
def bootstrap_step(self, time_step):
self.prev_time_step = self.current_time_step
self.current_time_step = time_step
self.elapsed_duration_steps = self.current_time_step - self.prev_time_step
self.current_time = self.reference_time + relativedelta(seconds = time_step * self.orsim_settings['STEP_INTERVAL'])
[docs]
def shutdown(self):
if not self._shutdown:
logging.info(f'Shutting down {self.unique_id = }')
# self.stop_listening()
self.logout()
self.active = False
self._shutdown = True
[docs]
def handle_heartbeat_failure(self):
if self.message_processing_active:
now = time.time()
threshold = self.orsim_settings['STEP_TIMEOUT']
if (now - self.start_time) > threshold:
logging.warning(f"Auto Shutdown Agent {self.unique_id}. Exceeded heartbeat threshold {threshold} sec while processing...")
# logging.warning(f"{self.payload_cache = }")
logging.warning(f"{json.dumps(self.step_log, indent=2)}")
# self.stop_listening()
self.active = False
self._shutdown = True
[docs]
def get_transition_probability(self, condition, default):
try:
for rule in self.behavior.get('transition_prob'):
if rule[0] == condition:
return rule[1]
except: pass
return default
[docs]
@abstractmethod
def process_payload(self, payload):
raise NotImplementedError
[docs]
@abstractmethod
def estimate_next_event_time(self):
raise NotImplementedError
[docs]
@abstractmethod
def logout(self):
''' process any logout processes needed in the agent.
'''
raise NotImplementedError