Source code for orsim.messenger.messenger


import json

# from numpy import isin
# from orsim.config import settings
import requests
# import urllib3
# from urllib.parse import quote
import logging

import paho.mqtt.client as paho


[docs] class Messenger: def __init__(self, settings, credentials, channel_id=None, on_message=None, transport=None): ''' ''' self.settings = settings self.credentials = credentials self.channel_id = channel_id if transport is None: self.client = paho.Client(credentials['email'], clean_session=True) self.client.username_pw_set(username=self.credentials['email'], password=self.credentials['password']) # Messenger.register_user(self.credentials['email'], self.credentials['password']) self.register_user(self.credentials['email'], self.credentials['password']) self.client.connect(self.settings['MQTT_BROKER']) if on_message is not None: self.client.on_message = on_message # RabbitMQ PubSub queue is used for processing requests in sequence # This is a deliberate design choice to enable: # - Inter-Agent communication as core part of system design # if channel_id is not None: if isinstance(channel_id, str): # self.client.loop_start() self.client.subscribe(channel_id, qos=0) logging.debug(f"Channel: {channel_id}") elif isinstance(channel_id, list): # self.client.loop_start() self.client.subscribe([(cid, 0) for cid in channel_id]) logging.debug(f"Channel: {channel_id}") self.client.loop_start()
[docs] def disconnect(self): ''' ''' # try: # self.client.unsubscribe(self.channel_id) # except Exception as e: # logging.exception(str(e)) if self.channel_id is not None: try: self.client.loop_stop(force=True) except Exception as e: logging.exception(str(e)) try: self.client.disconnect() except Exception as e: logging.exception(str(e))
# @classmethod
[docs] def register_user(self, username, password): ''' ''' response = requests.get(f"{self.settings['RABBITMQ_MANAGEMENT_SERVER']}/users/{username}") if (response.status_code >= 200) and (response.status_code <= 299): logging.warning('User is already registered') else: try: response = requests.put(f"{self.settings['RABBITMQ_MANAGEMENT_SERVER']}/users/{username}", data=json.dumps({'password': password, 'tags': ''}), headers={"content-type": "application/json"}, auth=(self.settings['RABBITMQ_ADMIN_USER'], self.settings['RABBITMQ_ADMIN_PASSWORD']) ) except Exception as e: logging.exception(str(e)) raise e # reset the user and set appropriate permissions as needed quoted_slash = '%2F' response = requests.put(f"{self.settings['RABBITMQ_MANAGEMENT_SERVER']}/permissions/{quoted_slash}/{username}", data=json.dumps({"username":username, "vhost":"/", "configure":".*", "write":".*", "read":".*"}), headers={"content-type": "application/json"}, auth=(self.settings['RABBITMQ_ADMIN_USER'], self.settings['RABBITMQ_ADMIN_PASSWORD']) ) response = requests.put(f"{self.settings['RABBITMQ_MANAGEMENT_SERVER']}/topic-permissions/{quoted_slash}/{username}", data=json.dumps({username: username, "vhost": "/", "exchange": "", "write": ".*", "read": ".*"}), headers={"content-type": "application/json"}, auth=(self.settings['RABBITMQ_ADMIN_USER'], self.settings['RABBITMQ_ADMIN_PASSWORD']) )