Last active
January 26, 2023 14:46
-
-
Save MaxwellDPS/6da026f1565b56c6baca81ed0ad47fb7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import yaml | |
| from datetime import datetime | |
| import slixmpp | |
| import essential_generators | |
| from nio import AsyncClient, RoomInviteError, JoinError, LoginResponse, AsyncClientConfig, exceptions, crypto, MatrixRoom, RoomMessageText | |
def _load_config(path: str = 'config.yml') -> dict:
    """Read the YAML configuration file and return its parsed content.

    Arguments:
        path -- location of the YAML file; defaults to 'config.yml'.

    Returns:
        Whatever the YAML document parses to. The rest of this file indexes
        the result by key (config["matrix"], config["xmpp"]), so a mapping
        is expected.

    Raises:
        OSError if the file cannot be opened.
    """
    # SafeLoader only builds plain Python objects, so loading the file does
    # not execute arbitrary constructors.
    with open(path, 'r', encoding='utf-8') as yfp:
        return yaml.load(yfp, yaml.SafeLoader)
class MatrixXMPPBot(slixmpp.ClientXMPP):
    """Relay messages from an XMPP multi-user chat into a Matrix room.

    The object is the XMPP client (it subclasses slixmpp.ClientXMPP) and owns
    a matrix-nio AsyncClient in ``self.matrix_client``. Group-chat messages
    received over XMPP are forwarded to ``self.matrix_room``; text messages
    received in Matrix that start with "!" are treated as bot commands
    (!status, !speak, !mute <nick>, !unmute <nick>).
    """

    def __init__(self, config: dict):
        """Read settings, log in to Matrix and start the background sync.

        Arguments:
            config -- parsed configuration with a "matrix" and an "xmpp"
                      section; see the keys read below.

        Note that the constructor blocks: it runs the Matrix login and an
        initial sync to completion on the event loop before returning.
        """
        matrix_cfg = config["matrix"]
        xmpp_cfg = config["xmpp"]
        alerts_cfg = xmpp_cfg["keyword_alerts"]

        self.matrix_server: str = matrix_cfg["server"]
        self.matrix_username: str = matrix_cfg["username"]
        self.matrix_password: str = matrix_cfg["password"]
        self.matrix_displayname: str = matrix_cfg["displayname"]
        self.matrix_room: str = matrix_cfg["room"]
        self.matrix_device_name: str = matrix_cfg["device_name"]
        self.matrix_keystore_path: str = matrix_cfg["keystore_path"]
        self.creds_file: str = matrix_cfg["creds_file_path"]

        # Every lookup against this list uses a lower-cased nick, so the
        # configured names are normalised the same way. An empty YAML entry
        # parses to None, hence the "or []".
        self.muted_peeps: list[str] = [
            str(nick).lower() for nick in (xmpp_cfg["mute_users"] or [])
        ]
        self.xmpp_username: str = xmpp_cfg["username"]
        self.xmpp_password: str = xmpp_cfg["password"]
        self.xmpp_room: str = xmpp_cfg["room"]
        self.xmpp_nick: str = xmpp_cfg["nick"]

        self.keyword_alerts: bool = alerts_cfg["enabled"]
        self.alert_prefix: str = alerts_cfg["prefix"]
        self.keywords: list[str] = list(alerts_cfg["keywords"] or [])

        slixmpp.ClientXMPP.__init__(self, self.xmpp_username, self.xmpp_password)
        self.add_event_handler("session_start", self.start)
        self.add_event_handler("groupchat_message", self._message)

        self.loop = asyncio.get_event_loop()
        self.loop.run_until_complete(self.init())

        # Keep a reference to the task: the event loop only holds a weak
        # reference, and an unreferenced task may be garbage-collected.
        self._matrix_sync_task = self.loop.create_task(self._run_matrix_sync())

    async def _run_matrix_sync(self, retry_delay: float = 5.0) -> None:
        """Run the Matrix sync loop, restarting it after a timeout.

        A TimeoutError escaping sync_forever() would otherwise finish the
        background task for good ("Task exception was never retrieved") and
        the bot would stop receiving Matrix commands.

        Arguments:
            retry_delay -- seconds to wait before restarting the sync loop.
        """
        while True:
            try:
                await self.matrix_client.sync_forever(
                    timeout=3000,
                    full_state=True,
                )
            except (asyncio.TimeoutError, TimeoutError):
                logging.warning(
                    "Matrix sync timed out; retrying in %s seconds", retry_delay
                )
                await asyncio.sleep(retry_delay)
            else:
                # sync_forever returned without raising; nothing to restart.
                return

    async def init(self):
        """Log in to Matrix, settle pending key requests and do a first sync."""
        await self._login_to_matrix()
        self.matrix_client.load_store()

        if self.matrix_client.should_upload_keys:
            await self.matrix_client.keys_upload()
        if self.matrix_client.should_query_keys:
            await self.matrix_client.keys_query()
        if self.matrix_client.should_claim_keys:
            # keys_claim() needs the user -> devices mapping to claim for;
            # the client computes it via get_users_for_key_claiming().
            await self.matrix_client.keys_claim(
                self.matrix_client.get_users_for_key_claiming()
            )

        await self.matrix_client.sync(full_state=True)
        await self.matrix_client.set_displayname(self.matrix_displayname)
        logging.info("[+] MATRIX BOT INIT DONE")

    def _write_details_to_disk(self, resp: LoginResponse, homeserver) -> None:
        """Writes the required login details to disk so we can log in later without
        using a password.

        Arguments:
            resp {LoginResponse} -- the successful client login response.
            homeserver -- URL of homeserver, e.g. "https://matrix.example.org"
        """
        # The file holds a usable access token, so create it readable and
        # writable by the owner only instead of with the default permissions.
        fd = os.open(self.creds_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            json.dump(
                {
                    "homeserver": homeserver,  # e.g. "https://matrix.example.org"
                    "user_id": resp.user_id,  # e.g. "@user:example.org"
                    "device_id": resp.device_id,  # device ID as returned by the server
                    "access_token": resp.access_token,  # cryptogr. access token
                },
                f,
            )

    async def _login_to_matrix(self) -> None:
        """Create ``self.matrix_client`` and authenticate it.

        If the credentials file does not exist yet, log in with the
        configured password and store the resulting credentials. Otherwise
        reuse the stored user id, device id and access token.

        Raises:
            SystemExit(1) when the password login is rejected.
        """
        # NOTE(review): max_timeouts=0 / max_limit_exceeded=0 presumably make
        # nio give up on the first timed-out or rate-limited request rather
        # than retry - confirm against the AsyncClientConfig documentation.
        client_config = AsyncClientConfig(
            max_limit_exceeded=0,
            max_timeouts=0,
            store_sync_tokens=True,
            encryption_enabled=True,
        )

        if not os.path.exists(self.creds_file):
            self.matrix_client = AsyncClient(
                self.matrix_server,
                self.matrix_username,
                device_id=self.matrix_device_name,
                config=client_config,
                store_path=self.matrix_keystore_path,
            )
            self.matrix_client.add_event_callback(self._new_matrix_message, RoomMessageText)

            resp = await self.matrix_client.login(
                self.matrix_password, device_name=self.matrix_device_name
            )
            # check that we logged in successfully
            if not isinstance(resp, LoginResponse):
                logging.error(f'homeserver = "{self.matrix_server}"; user = "{self.matrix_username}"')
                logging.error(f"Failed to log in: {resp}")
                # Same effect as the site builtin exit(1), but does not
                # depend on the `site` module being loaded.
                raise SystemExit(1)

            self._write_details_to_disk(resp, self.matrix_server)
            logging.info(
                "Logged in using a password. Credentials were stored."
            )
        # Otherwise the credentials file exists, so we'll use the stored credentials
        else:
            with open(self.creds_file, "r") as f:
                creds = json.load(f)

            # Use the same key store location as the password login above so
            # the encryption keys created on first login are found again.
            self.matrix_client = AsyncClient(
                creds["homeserver"],
                config=client_config,
                store_path=self.matrix_keystore_path,
            )
            self.matrix_client.access_token = creds["access_token"]
            self.matrix_client.user_id = creds["user_id"]
            self.matrix_client.device_id = creds["device_id"]
            self.matrix_client.add_event_callback(self._new_matrix_message, RoomMessageText)
            self.matrix_client.load_store()
            logging.info("Logged in using stored credentials.")

    async def _invite_user_to_room(self):
        """Invite the bot's own user to ``self.matrix_room`` and join it.

        A sync is performed after the invite and again after the join.

        Raises:
            RuntimeError (an Exception subclass) carrying the nio error
            response when the invite or the join fails.
        """
        res = await self.matrix_client.room_invite(
            room_id=self.matrix_room,
            user_id=self.matrix_client.user_id,
        )
        if isinstance(res, RoomInviteError):
            raise RuntimeError(res)
        await self.matrix_client.sync(timeout=3000, full_state=True)

        res = await self.matrix_client.join(self.matrix_room)
        if isinstance(res, JoinError):
            raise RuntimeError(res)
        await self.matrix_client.sync(timeout=3000, full_state=True)

    def _send_martrix_message(self, message: str, message_type: str = "m.room.message", content_msgtype: str = "m.text") -> None:
        """Blocking wrapper around _send_async_martrix_message().

        Drives the coroutine with run_until_complete(), so it must not be
        called while the event loop is already running (for example from
        inside an event handler); use the async variant there.
        """
        asyncio.get_event_loop().run_until_complete(
            self._send_async_martrix_message(message, message_type, content_msgtype)
        )

    async def _send_async_martrix_message(self, message: str, message_type: str = "m.room.message", content_msgtype: str = "m.text") -> None:
        """Send ``message`` to the configured Matrix room.

        Arguments:
            message -- text placed in the event's "body".
            message_type -- Matrix event type to send.
            content_msgtype -- value of the event content's "msgtype".

        If the room is not among the client's known rooms, the bot first
        tries to invite itself and join. If sending fails because of
        unverified devices, every device in the device store is marked as
        verified and the send is retried once.
        """
        if self.matrix_room not in self.matrix_client.rooms:
            await self._invite_user_to_room()

        content = {"msgtype": content_msgtype, "body": message}
        try:
            await self.matrix_client.room_send(
                room_id=self.matrix_room,
                message_type=message_type,
                content=content,
            )
        except exceptions.OlmUnverifiedDeviceError:
            # SECURITY NOTE: this trusts every known device without any
            # verification by a human. Kept as in the original behaviour.
            for device in self.matrix_client.device_store:
                self.matrix_client.verify_device(device)
            await self.matrix_client.room_send(
                room_id=self.matrix_room,
                message_type=message_type,
                content=content,
            )

    async def _new_matrix_message(self, room: MatrixRoom, message: RoomMessageText) -> None:
        """Handle a Matrix text message; messages starting with "!" are commands.

        Supported commands: !status, !speak, !mute <nick>, !unmute <nick>.
        Replies are sent to the configured Matrix room.
        """
        # Compare against user_id: on the stored-credentials path only
        # user_id is assigned, so comparing against `user` would not
        # recognise the bot's own messages.
        if message.sender == self.matrix_client.user_id:
            return

        stripped = message.body.strip()
        if not stripped.startswith("!"):
            return

        lowered = message.body.lower()
        logging.info(f"GOT COMMAND {lowered}")

        # Whitespace-separated tokens of the stripped body; the first token
        # after the command word is the nick for !mute / !unmute.
        tokens = stripped.lower().split()
        target = tokens[1] if len(tokens) > 1 else None

        if "!status" in lowered:
            wonky_word_bot = essential_generators.MarkovTextGenerator()
            word_of_the_day = wonky_word_bot.gen_word()
            await self._send_async_martrix_message(f"🤖 Ahoy! The word of the day is: {word_of_the_day}")
            logging.info("SENT !STATUS")
        elif "!speak" in lowered:
            wonky_word_bot = essential_generators.MarkovTextGenerator()
            await self._send_async_martrix_message(f"🤖 {wonky_word_bot.gen_text(11)}")
            logging.info("SENT !SPEAK")
        elif "!mute" in lowered:
            if target is None:
                # Previously a bare "!mute" raised IndexError in the callback.
                await self._send_async_martrix_message("🤖 Usage: !mute <nick>")
            elif target in self.muted_peeps:
                await self._send_async_martrix_message(f"🤖 {target} who dat?")
                logging.info(f"REMUTED {target}")
            else:
                self.muted_peeps.append(target)
                await self._send_async_martrix_message(f"🤖 {target} has been 86'ed")
                logging.info(f"MUTED {target}")
        elif "!unmute" in lowered:
            if target is None:
                await self._send_async_martrix_message("🤖 Usage: !unmute <nick>")
            elif target in self.muted_peeps:
                self.muted_peeps.remove(target)
                await self._send_async_martrix_message(f"🤖 {target} is cool again")
                logging.info(f"UNMUTED {target}")
            else:
                await self._send_async_martrix_message(f"🤖 {target} is already one of the cool kids")
                logging.info(f"REUNMUTED {target}")

    def _check_keywords(self, msg: str) -> bool:
        """Return True if any configured keyword occurs in ``msg`` (case-insensitive)."""
        haystack = msg.lower()
        return any(keyword.lower() in haystack for keyword in self.keywords)

    async def _message(self, msg: dict):
        """Relay one XMPP group-chat message to Matrix.

        Messages sent under the bot's own nick are ignored. Messages from
        muted nicks are dropped unless they trigger a keyword alert with a
        non-empty prefix; everything else is forwarded as "<prefix><nick>: <body>".

        Arguments:
            msg -- the group-chat message; read via msg['mucnick'] and
                   msg['body'].
        """
        nick = msg['mucnick']
        if nick == self.xmpp_nick:
            return

        body = msg['body']
        # Always bind the prefix: previously it was only assigned on some
        # paths, so a muted sender could trigger UnboundLocalError below.
        is_alert = self.keyword_alerts and self._check_keywords(body)
        prefix = self.alert_prefix if is_alert else ''
        relayed = f'{prefix}{nick}: {body}'

        if nick.lower() not in self.muted_peeps:
            await self._send_async_martrix_message(relayed)
            logging.info(datetime.now().strftime('%c') + ' ' + body)
        elif prefix != '':
            # A keyword alert gets through even when the sender is muted.
            await self._send_async_martrix_message(relayed)

    async def start(self, event):
        """session_start handler: fetch the roster, join the MUC, announce readiness."""
        await self.get_roster()
        self.plugin['xep_0045'].join_muc(self.xmpp_room, self.xmpp_nick)
        await self._send_async_martrix_message("[+] Beep Boop 🤖 Bot is Listening and Ready")
if __name__ == '__main__':
    # Show INFO-level messages so the bot's progress lines are visible.
    logging.basicConfig(level=logging.INFO)

    # The config location can be overridden through the CONFIG_PATH variable.
    config = _load_config(os.getenv('CONFIG_PATH', 'config.yml'))
    bot = MatrixXMPPBot(config)

    # XMPP extensions, registered in this order:
    #   xep_0030 - Service Discovery
    #   xep_0045 - Multi-User Chat
    #   xep_0199 - XMPP Ping
    for xep in ('xep_0030', 'xep_0045', 'xep_0199'):
        bot.register_plugin(xep)

    # Connect to the XMPP server and start processing XMPP stanzas.
    bot.connect()
    bot.process()
| # Using slower stringprep, consider compiling the faster cython/libidn one. | |
| # INFO:root:Logged in using a password. Credentials were stored. | |
| # INFO:root:[+] MATRIX BOT INIT DONE | |
| # INFO:slixmpp.features.feature_bind.bind:JID set to: user@example.org/12345 | |
| # ERROR:asyncio:Task exception was never retrieved | |
| # future: <Task finished name='Task-7' coro=<AsyncClient.sync_forever() done, defined at /usr/local/lib/python3.10/site-packages/nio/client/async_client.py:1106> exception=TimeoutError()> | |
| # Traceback (most recent call last): | |
| # File "/usr/local/lib/python3.10/site-packages/nio/client/async_client.py", line 1216, in sync_forever | |
| # await self.run_response_callbacks([await response]) | |
| # File "/usr/local/lib/python3.10/asyncio/tasks.py", line 571, in _wait_for_one | |
| # return f.result() # May raise f.exception(). | |
| # File "/usr/local/lib/python3.10/site-packages/nio/client/async_client.py", line 1062, in sync | |
| # response = await self._send( | |
| # File "/usr/local/lib/python3.10/site-packages/nio/client/async_client.py", line 770, in _send | |
| # transport_resp = await self.send( | |
| # File "/usr/local/lib/python3.10/site-packages/nio/client/async_client.py", line 295, in wrapper | |
| # return await func(self, *args, **kwargs) | |
| # File "/usr/local/lib/python3.10/site-packages/nio/client/async_client.py", line 842, in send |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment