Skip to content

Instantly share code, notes, and snippets.

@MaxwellDPS
Last active January 26, 2023 14:46
Show Gist options
  • Select an option

  • Save MaxwellDPS/6da026f1565b56c6baca81ed0ad47fb7 to your computer and use it in GitHub Desktop.

Select an option

Save MaxwellDPS/6da026f1565b56c6baca81ed0ad47fb7 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import asyncio
import json
import logging
import os
import yaml
from datetime import datetime
import slixmpp
import essential_generators
from nio import AsyncClient, RoomInviteError, JoinError, LoginResponse, AsyncClientConfig, exceptions, crypto, MatrixRoom, RoomMessageText
def _load_config(path: str = 'config.yml') -> None:
with open(path, 'r') as yfp:
config = yaml.load(yfp, yaml.SafeLoader)
return config
class MatrixXMPPBot(slixmpp.ClientXMPP):
    """XMPP client that relays group-chat messages into a Matrix room.

    The instance is the XMPP client itself and additionally owns an
    encrypted matrix-nio ``AsyncClient`` (``self.matrix_client``). Messages
    seen in the XMPP room are forwarded to the Matrix room, and ``!``
    commands typed in Matrix (``!status``, ``!speak``, ``!mute``,
    ``!unmute``) are answered in the Matrix room.
    """

    # Seconds to wait before restarting the Matrix sync loop after it fails.
    MATRIX_SYNC_RETRY_DELAY = 5

    def __init__(self, config: dict):
        """Read the configuration, log in to Matrix and schedule syncing.

        Arguments:
            config -- parsed configuration with a "matrix" and an "xmpp"
                section (see the keys read below).

        Note that this runs the Matrix login on the current event loop
        before returning, so it must not be called from a running loop.
        """
        matrix_cfg = config["matrix"]
        xmpp_cfg = config["xmpp"]
        alerts_cfg = xmpp_cfg["keyword_alerts"]

        self.matrix_server: str = matrix_cfg["server"]
        self.matrix_username: str = matrix_cfg["username"]
        self.matrix_password: str = matrix_cfg["password"]
        self.matrix_displayname: str = matrix_cfg["displayname"]
        self.matrix_room: str = matrix_cfg["room"]
        self.matrix_device_name: str = matrix_cfg["device_name"]
        self.matrix_keystore_path: str = matrix_cfg["keystore_path"]
        self.creds_file: str = matrix_cfg["creds_file_path"]

        # Nicks are lower-cased before they are compared against this list
        # (see _message and the !mute command), so normalise the configured
        # entries the same way. Copying also keeps !mute/!unmute from
        # mutating the caller's config.
        self.muted_peeps: list[str] = [
            str(user).lower() for user in (xmpp_cfg["mute_users"] or [])
        ]
        self.xmpp_username: str = xmpp_cfg["username"]
        self.xmpp_password: str = xmpp_cfg["password"]
        self.xmpp_room: str = xmpp_cfg["room"]
        self.xmpp_nick: str = xmpp_cfg["nick"]

        self.keyword_alerts: bool = alerts_cfg["enabled"]
        self.alert_prefix: str = alerts_cfg["prefix"]
        # An empty YAML list parses to None; treat it as "no keywords".
        self.keywords: list[str] = list(alerts_cfg["keywords"] or [])

        slixmpp.ClientXMPP.__init__(self, self.xmpp_username, self.xmpp_password)
        self.add_event_handler("session_start", self.start)
        self.add_event_handler("groupchat_message", self._message)

        self.loop = asyncio.get_event_loop()
        self.loop.run_until_complete(self.init())
        # Keep a reference so the task is not garbage-collected, and run the
        # sync loop under a supervisor so one failed request does not end
        # Matrix syncing for the rest of the process lifetime.
        self._matrix_sync_task = self.loop.create_task(self._run_matrix_sync())

    async def _run_matrix_sync(self) -> None:
        """Run ``sync_forever`` and restart it whenever it raises."""
        while True:
            try:
                await self.matrix_client.sync_forever(
                    timeout=3000,
                    full_state=True,
                )
            except asyncio.CancelledError:
                # Cancellation is a deliberate shutdown, not a failure.
                raise
            except Exception:
                logging.exception(
                    "Matrix sync loop failed; restarting in %s seconds",
                    self.MATRIX_SYNC_RETRY_DELAY,
                )
                await asyncio.sleep(self.MATRIX_SYNC_RETRY_DELAY)
            else:
                # sync_forever returned without an error: stop supervising.
                return

    async def init(self):
        """Log in to Matrix, exchange encryption keys and do a first sync."""
        await self._login_to_matrix()
        client = self.matrix_client
        client.load_store()

        if client.should_upload_keys:
            await client.keys_upload()
        if client.should_query_keys:
            await client.keys_query()
        if client.should_claim_keys:
            # keys_claim needs to be told which users/devices to claim
            # one-time keys for.
            await client.keys_claim(client.get_users_for_key_claiming())

        await client.sync(full_state=True)
        await client.set_displayname(self.matrix_displayname)
        logging.info("[+] MATRIX BOT INIT DONE")

    def _write_details_to_disk(self, resp: LoginResponse, homeserver) -> None:
        """Writes the required login details to disk so we can log in later without
        using a password.

        Arguments:
            resp {LoginResponse} -- the successful client login response.
            homeserver -- URL of homeserver, e.g. "https://matrix.example.org"
        """
        details = {
            "homeserver": homeserver,  # e.g. "https://matrix.example.org"
            "user_id": resp.user_id,  # e.g. "@user:example.org"
            "device_id": resp.device_id,  # device ID, 10 uppercase letters
            "access_token": resp.access_token,  # cryptogr. access token
        }
        # Overwrites any previous credentials file.
        with open(self.creds_file, "w") as f:
            json.dump(details, f)

    async def _login_to_matrix(self) -> None:
        """Create ``self.matrix_client`` and authenticate it.

        Uses the credentials file when it exists; otherwise logs in with the
        configured password and writes the credentials file for next time.
        Exits the process with status 1 when the password login fails.
        """
        client_config = AsyncClientConfig(
            max_limit_exceeded=0,
            max_timeouts=0,
            store_sync_tokens=True,
            encryption_enabled=True,
        )

        if os.path.exists(self.creds_file):
            # Reuse the stored access token instead of the password.
            with open(self.creds_file, "r") as f:
                stored = json.load(f)
            # The store path must be supplied here as well, otherwise the
            # encryption store cannot be loaded for this client.
            self.matrix_client = AsyncClient(
                stored["homeserver"],
                config=client_config,
                store_path=self.matrix_keystore_path,
            )
            self.matrix_client.access_token = stored["access_token"]
            self.matrix_client.user_id = stored["user_id"]
            self.matrix_client.device_id = stored["device_id"]
            # The store itself is loaded by init() right after this returns.
            logging.info("Logged in using stored credentials.")
        else:
            self.matrix_client = AsyncClient(
                self.matrix_server,
                self.matrix_username,
                device_id=self.matrix_device_name,
                config=client_config,
                store_path=self.matrix_keystore_path,
            )
            resp = await self.matrix_client.login(
                self.matrix_password,
                device_name=self.matrix_device_name,
            )
            # check that we logged in successfully
            if not isinstance(resp, LoginResponse):
                logging.error(f'homeserver = "{self.matrix_server}"; user = "{self.matrix_username}"')
                logging.error(f"Failed to log in: {resp}")
                raise SystemExit(1)
            self._write_details_to_disk(resp, self.matrix_server)
            logging.info(
                "Logged in using a password. Credentials were stored."
            )

        self.matrix_client.add_event_callback(self._new_matrix_message, RoomMessageText)

    async def _invite_user_to_room(self):
        """Invite the bot's own user to the configured room and join it.

        Raises:
            RuntimeError -- if the invite or the join is rejected.
        """
        # NOTE(review): the bot invites its own user id here; confirm the
        # homeserver accepts this for the configured room.
        res = await self.matrix_client.room_invite(
            room_id=self.matrix_room,
            user_id=self.matrix_client.user_id,
        )
        if isinstance(res, RoomInviteError):
            raise RuntimeError(f"Matrix room invite failed: {res}")
        await self.matrix_client.sync(timeout=3000, full_state=True)

        res = await self.matrix_client.join(self.matrix_room)
        if isinstance(res, JoinError):
            raise RuntimeError(f"Matrix room join failed: {res}")
        await self.matrix_client.sync(timeout=3000, full_state=True)

    async def _room_send(self, message: str, message_type: str, content_msgtype: str):
        """Send one event to the configured room with a freshly built content dict."""
        return await self.matrix_client.room_send(
            room_id=self.matrix_room,
            message_type=message_type,
            content={"msgtype": content_msgtype, "body": message},
        )

    def _send_martrix_message(self, message: str, message_type: str = "m.room.message", content_msgtype: str = "m.text") -> None:
        """Blocking variant of ``_send_async_martrix_message``.

        Drives the coroutine with ``run_until_complete``, so it can only be
        used while the event loop is not already running.
        """
        loop = asyncio.get_event_loop()
        loop.run_until_complete(
            self._send_async_martrix_message(message, message_type, content_msgtype)
        )

    async def _send_async_martrix_message(self, message: str, message_type: str = "m.room.message", content_msgtype: str = "m.text") -> None:
        """Send ``message`` to the configured Matrix room.

        Joins the room first when the client is not in it yet. If sending
        fails because the room contains unverified devices, every device in
        the client's device store is marked verified and the send is retried
        once.

        Arguments:
            message -- text placed in the event's "body".
            message_type -- Matrix event type.
            content_msgtype -- value of the event content's "msgtype".
        """
        if self.matrix_room not in self.matrix_client.rooms:
            await self._invite_user_to_room()
        try:
            await self._room_send(message, message_type, content_msgtype)
        except exceptions.OlmUnverifiedDeviceError:
            # NOTE(review): this trusts every known device without any
            # user confirmation.
            for device in self.matrix_client.device_store:
                self.matrix_client.verify_device(device)
            await self._room_send(message, message_type, content_msgtype)

    async def _new_matrix_message(self, room: MatrixRoom, message: RoomMessageText) -> None:
        """Handle a text message received from Matrix.

        Only messages starting with "!" are treated as commands; anything
        else, and anything sent by the bot itself, is ignored.
        """
        # Compare against user_id: it is set on both login paths, while
        # ``user`` is empty when the client was restored from stored
        # credentials.
        if message.sender == self.matrix_client.user_id:
            return
        text = message.body.strip().lower()
        if not text.startswith("!"):
            return
        logging.info(f"GOT COMMAND {message.body.lower()}")

        # Whitespace-separated words after the first one.
        args = text.split()[1:]

        if "!status" in text:
            wonky_word_bot = essential_generators.MarkovTextGenerator()
            word_of_the_day = wonky_word_bot.gen_word()
            await self._send_async_martrix_message(f"🤖 Ahoy! The word of the day is: {word_of_the_day}")
            logging.info("SENT !STATUS")
        elif "!speak" in text:
            wonky_word_bot = essential_generators.MarkovTextGenerator()
            await self._send_async_martrix_message(f"🤖 {wonky_word_bot.gen_text(11)}")
            logging.info("SENT !SPEAK")
        elif "!mute" in text:
            if not args:
                # Previously a bare "!mute" raised IndexError in the callback.
                await self._send_async_martrix_message("🤖 Usage: !mute <nick>")
                return
            user = args[0]
            if user in self.muted_peeps:
                await self._send_async_martrix_message(f"🤖 {user} who dat?")
                logging.info(f"REMUTED {user}")
            else:
                self.muted_peeps.append(user)
                await self._send_async_martrix_message(f"🤖 {user} has been 86'ed")
                logging.info(f"MUTED {user}")
        elif "!unmute" in text:
            if not args:
                await self._send_async_martrix_message("🤖 Usage: !unmute <nick>")
                return
            user = args[0]
            if user in self.muted_peeps:
                self.muted_peeps.remove(user)
                await self._send_async_martrix_message(f"🤖 {user} is cool again")
                logging.info(f"UNMUTED {user}")
            else:
                await self._send_async_martrix_message(f"🤖 {user} is already one of the cool kids")
                logging.info(f"REUNMUTED {user}")

    def _check_keywords(self, msg: str) -> bool:
        """Return True when ``msg`` contains any configured keyword (case-insensitive)."""
        haystack = msg.lower()
        return any(keyword.lower() in haystack for keyword in self.keywords)

    async def _message(self, msg: dict):
        """Relay one XMPP group-chat message to Matrix.

        Messages from the bot's own nick are ignored. Messages from muted
        nicks are dropped unless they trigger a keyword alert with a
        non-empty prefix.
        """
        nick = msg['mucnick']
        if nick == self.xmpp_nick:
            return
        body = msg['body']

        # Always bind the prefix, so the checks below never hit an unbound
        # name when alerts are disabled or no keyword matches.
        is_alert = self.keyword_alerts and self._check_keywords(body)
        prefix = self.alert_prefix if is_alert else ''
        relayed = f'{prefix}{nick}: {body}'

        if nick.lower() not in self.muted_peeps:
            await self._send_async_martrix_message(relayed)
            logging.info(datetime.now().strftime('%c') + ' ' + body)
        elif prefix != '':
            # A keyword alert overrides the mute.
            await self._send_async_martrix_message(relayed)

    async def start(self, event):
        """XMPP session-start handler: fetch the roster, join the MUC and announce readiness."""
        await self.get_roster()
        self.plugin['xep_0045'].join_muc(self.xmpp_room, self.xmpp_nick)
        await self._send_async_martrix_message("[+] Beep Boop 🤖 Bot is Listening and Ready")
if __name__ == '__main__':
    # Emit INFO-level log records from the bot and its libraries.
    logging.basicConfig(level=logging.INFO)

    # The config location can be overridden through the CONFIG_PATH
    # environment variable.
    config = _load_config(os.getenv('CONFIG_PATH', 'config.yml'))
    bot = MatrixXMPPBot(config)

    # XMPP extensions used by the bot, registered in this order:
    #   xep_0030 - Service Discovery
    #   xep_0045 - Multi-User Chat
    #   xep_0199 - XMPP Ping
    for plugin_name in ('xep_0030', 'xep_0045', 'xep_0199'):
        bot.register_plugin(plugin_name)

    # Connect to the XMPP server and start processing XMPP stanzas.
    bot.connect()
    bot.process()
# Using slower stringprep, consider compiling the faster cython/libidn one.
# INFO:root:Logged in using a password. Credentials were stored.
# INFO:root:[+] MATRIX BOT INIT DONE
# INFO:slixmpp.features.feature_bind.bind:JID set to: [email protected]/12345
# ERROR:asyncio:Task exception was never retrieved
# future: <Task finished name='Task-7' coro=<AsyncClient.sync_forever() done, defined at /usr/local/lib/python3.10/site-packages/nio/client/async_client.py:1106> exception=TimeoutError()>
# Traceback (most recent call last):
# File "/usr/local/lib/python3.10/site-packages/nio/client/async_client.py", line 1216, in sync_forever
# await self.run_response_callbacks([await response])
# File "/usr/local/lib/python3.10/asyncio/tasks.py", line 571, in _wait_for_one
# return f.result() # May raise f.exception().
# File "/usr/local/lib/python3.10/site-packages/nio/client/async_client.py", line 1062, in sync
# response = await self._send(
# File "/usr/local/lib/python3.10/site-packages/nio/client/async_client.py", line 770, in _send
# transport_resp = await self.send(
# File "/usr/local/lib/python3.10/site-packages/nio/client/async_client.py", line 295, in wrapper
# return await func(self, *args, **kwargs)
# File "/usr/local/lib/python3.10/site-packages/nio/client/async_client.py", line 842, in send
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment