# Version: 0.3.0 # Rewrite: Incomplete import re import mpf # post file support import utils import enum import jnet import json Plaintext = "" Markdown = "markdown" class Infrastructure: def __init__(self, key): self.botKey = key self.baseUri = "https://api.telegram.org/bot" + key def getMe(self): data = jnet.requestJson(self.baseUri+ "/getMe") if data and data['ok']: return User(data["result"]) def getUpdates(self, params=None): data = jnet.requestJson(self.baseUri+ "/getUpdates", params) updates = [] if data and data['ok']: for mapObj in data['result']: updates.append(Update(mapObj)) return updates def getChat(self, id): return Chat({'chat_id': id}) def setWebHook(self, uri): data = jnet.requestJson(self.baseUri+ "/setWebhook", {"url": uri}) if data and data['ok']: if data['result'] == True: print("Webhook set.") def deleteWebHook(self): data = jnet.requestJson(self.baseUri+ "/deleteWebhook") if data and data['ok']: if data['result'] == True: print("Webhook deleted.") def getFileUri(self, fileId): data = jnet.requestJson(self.botQueryUri+ "/getFile", {"file_id": fileId}) if data: if data['ok']: return "https://api.telegram.org/file/bot" + self.botKey + "/" + data['result']['file_path'] else: print(data['description']) def getStickerSet(self, name): data = jnet.requestJson(self.botQueryUri+ "/getStickerSet", {"name": name}) if data: if data['ok']: return StickerSet(data['result']) else: print(data['description']) class Optional: def __init__(self, data): for field, value in data.items(): self.set(field, value) def __getattribute__(self, field): try: return object.__getattribute__(self, field) except AttributeError: return None def set(self, field, value): setattr(self, field, value) class Error(Optional): def __init__(self): Optional.__init__(self) class Extras(Optional): def __init__(self): Optional.__init__(self) class Update(Optional): def __init__(self, data): Optional.__init__(self, data) if self.message: self.message = Message(self.message) if self.edited_message: self.edited_message = Message(self.edited_message) class User(Optional): def __init__(self, data): Optional.__init__(self, data) class Message(Optional): def __init__(self, data): Optional.__init__(self, data) if hasattr(self, "from"): self.sender = User(getattr(self, "from")) if self.chat: self.chat = Chat(self.chat) if self.reply_to_message: self.reply_to_message = Message(self.reply_to_message) if self.new_chat_members: newMembers = [] for user in self.new_chat_members: newMembers.append(User(user)) self.new_chat_members = newMembers if self.left_chat_member: self.left_chat_member = User(self.left_chat_member) class Chat(Optional): infra = None @staticmethod def setInfra(infra): Chat.infra = infra def __init__(self, data): if self.infra == None: raise "No infrastrcture set." if type(data) == dict: Optional.__init__(self, data) if type(data) == int: self.chat_id = data def getMember(self, userId): data = jnet.requestJson(self.infra.baseUri+ "/getChatMember", {"chat_id": self.chat_id, "user_id": userId}) if data: if data['ok']: return ChatMember(data['result']) else: self.sendMessage(data['description']) def isAdmin(self, userId): user = self.getMember(userId) print("User:", user) if user.status in ["administrator", "creator"]: return True def sendMessage(self, message, parseMode=Plaintext): data = jnet.requestJson(self.infra.baseUri+ "/sendMessage", {'chat_id': self.chat_id, 'text': message, "parse_mode": parseMode}) if data: if data['ok']: return Message(data['result']) else: print("Error description:", data['description']) self.sendMessage(data['description']) def sendSticker(self, stickerId): res = jnet.requestJson(self.infra.baseUri+ "/sendSticker", {'chat_id': self.chat_id, 'sticker': stickerId}) data = res.getJson() if data: if res.isError(): self.sendMessage(data['description']) else: return data['result'] # A status of true means good. Check reason if false. def sendMedia(self, file, to=None, caption=None, type="Document"): to = to or str(self.chat_id) form = mpf.Form() form.addText(type.lower(), "attach://upload_file") form.addFile("upload_file", file) form.addText("chat_id", to) if caption: form.addText("caption", caption) FormRequest = mpf.BasicRequest() result = FormRequest.post(self.infra.baseUri + "/send" + type, form) if result['headers']['Content-Type'] == "application/json": data = json.loads(result['content']) if data['ok']: result = data['result'] response = {"status": data['ok']} if response['status'] == False: response['reason'] = data['description'] elif result['chat']['type'] == "supergroup": response['link'] = "http://t.me/" + str(result['chat']['id']) + "/" + str(result['message_id']) return response else: self.sendMessage(data['description']) def deleteMessage(self, messageId): data = jnet.requestJson(self.infra.baseUri+ "/deleteMessage", {'chat_id': self.chat_id, 'message_id': messageId}) if data: if "ok" in data and data['ok']: return data['result'] else: self.sendMessage(data['description']) # Send different type of media. def sendAudio(self, file, to=None, caption=None): self.sendMedia(file, to, caption, "Audio") def sendVideo(self, file, to=None, caption=None): self.sendMedia(file, to, caption, "Video") def sendPhoto(self, file, to=None, caption=None): self.sendMedia(file, to, caption, "Photo") def restrictMember(self, userId, permissions): data = jnet.requestJson(self.infra.baseUri+ "/restrictChatMember", {"chat_id": self.chat_id, "user_id": userId, "permissions": permissions}) if data['ok']: return data['result'] else: self.sendMessage(data['description']) # Returns True on success. def pin(self, messageId, disableNotify=False): # Returns 'Bad Request: CHAT_NOT_MODIFIED' if same message is pinned. res = jnet.requestJson(self.infra.baseUri+ "/pinChatMessage", {'chat_id': self.chat_id, 'message_id': messageId, 'disable_notification': disableNotify}) if res: if data := res.good(): return data['result'] elif data := res.bad(): self.sendMessage(data['description']) # Returns True on success. def unpin(self, to): # Returns 'Bad Request: CHAT_NOT_MODIFIED' if no message is pinned. res = jnet.requestJson(self.infra.baseUri+ "/unpinChatMessage", {'chat_id': self.chat_id}) data = res.getJson() if data: if res.isError(): print(data['result']['description']) else: return data['result'] def getAdminsIds(self): res = jnet.requestJson(self.infra.baseUri+ "/getChatAdministrators", {"chat_id": self.chat_id}) data = res.getJson() if data: if res.isError(): print(data['description']) else: admins = [] for member in data['result']: admins.append(member['user']['id']) return admins class ChatMember(Optional): def __init__(self, data): Optional.__init__(self, data) self.user = User(self.user) class ChatPermissions: def __init__(self, level=None): # all members have messages, poll, and add members permissions. if level == None: self.restrict() elif level == "basic": self.unrestrict() elif level == "media": self.allow_media() def __str__(self): return json.dumps(self.data) def unrestrict(self): self.data = { "can_send_messages": True, "can_send_media": False, "can_send_polls": True, "can_send_other_messages": False, "can_add_web_page_previews": False, "can_change_info": False, "can_invite_users": True, "can_pin_messages": False } def allow_media(self): self.data = { # basic "can_send_messages": True, "can_send_polls": True, "can_invite_users": True, # media "can_send_media": True, "can_send_other_messages": True, "can_add_web_page_previews": True } def restrict(self): self.data = { "can_send_messages": False } # Docs: https://core.telegram.org/bots/api#sticker class Sticker(Optional): def __init__(self, data): Optional.__init__(self, data) # Docs: https://core.telegram.org/bots/api#stickerset class StickerSet(Optional): def __init__(self, data): Optional.__init__(self, data) if self.stickers: stickers = [] for stickerMap in self.stickers: stickers.append(Sticker(stickerMap)) self.stickers = stickers class InlineQueryResultGifs: def __init__(self): self.list = [] self.id = 0 def add(self, url, thumb_url): self.list.append({ "type": "gif", "id": str(self.id), "gif_url": url, "thumb_url": thumb_url, "thumb_mime_type": "image/gif" }) self.id += 1 class Bot(Infrastructure): def __init__(self, botKey): Infrastructure.__init__(self, botKey) self.commands = [] self.helpString = "" self.botQueryUri = "https://api.telegram.org/bot"+botKey self.inlineQueryMethod = None # bit switches self.logUsers = False self.botData = self.getMe() self.on("help", self.help) self.on("online", self.online) # Set the infrastructure for base classes. Chat.setInfra(self) # The heart of the framework. def start(self, globals=None): print(self.botData) print("Started %s." % self.botData.username) # Automagically assigns callbacks. if globals: self.setCallbacks(globals) updates = [] update_id = -1 while True: updates = self.getUpdates({ 'allowed_updates': 'message', 'timeout': 15, 'offset': update_id+1 }) update_id = self.processUpdates(updates) def processUpdates(self, updates): update_id = -1 for update in updates: # For webhooks, convert dicts to Updates if type(update) == dict: update = Update(update) if update.message or update.edited_message: message = update.message or update.edited_message text, extras = self.processMessage(message) self.run(text, extras) if update.inline_query: self.processInlineQuery(update.inline_query) update_id = update.update_id return update_id def processMessage(self, msg): message = "" extras = {} extras = { "messageId": msg.message_id, "chatId": msg.chat.id, "sender": msg.sender.first_name, "senderId": msg.sender.id, "username": msg.sender.username } if msg.text: message = msg.text if message.startswith("/"): if message.find(" ") != -1: commandStr, message = message.split(" ", 1) commandStr = commandStr else: # if there is no space it's a simple command and in # this case it should be moved to commandStr commandStr = message message = "" if commandStr.find("@") != -1: commandStr, extras['commandBot'] = commandStr.split("@", 1) extras['commandStr'] = commandStr[1:] # slice off the slash extras['private'] = True if msg.chat.type == "private" \ else False if msg.entities: entities = msg.entities mentions = [] for entity in entities: if entity['type'] == "text_mention": mentions.append(entity['user']['id']) elif entity['type'] == "mention": offset = entity['offset'] length = entity['length'] mentions.append(msg.text[offset:offset+length]) if mentions != []: extras['mentions'] = mentions extras["replyMsg"] = False if msg.reply_to_message: reply = msg.reply_to_message extras["replyMsg"] = True extras.update({ "replyingToMsgUserObj": reply.sender, "replyingToMsgId": reply.message_id }) if reply.text: extras["replyingToMsgText"] = reply.text extras['bots'] = [] if msg.new_chat_members: for user in msg.new_chat_members: if user.is_bot: extras['bots'].append({ "user": user.first_name, "id": user.id }) for user in msg.new_chat_members: print("Joined " + user.first_name) if msg.left_chat_member: print("Leaving " + msg.left_chat_member.first_name) extras['stickerMsg'] = False if msg.sticker: extras.update({ "sitckerMsg": True, 'stickerSet': msg.sticker['set_name'], 'stickerEmoji': msg.sticker['emoji'] }) if "commandStr" in extras or self.logUsers: print("Extras:", extras) print(extras['sender'], repr(message)) return [message, extras] def processInlineQuery(self, query): if self.inlineQueryMethod: self.inlineQueryMethod(query['id'], query['query']) def setCallbacks(self, globals): if globals: privmethods = utils.getPrivateMethods(globals) for method in privmethods: print(method) self.on(method.replace("__cmd__", ""), privmethods[method]) def run(self, message, extras): chat = self.getChat(extras['chatId']) for triggerData in self.commands: pattern = triggerData[0] command = triggerData[1] if "commandStr" in extras: if pattern == extras['commandStr']: command(message, chat, extras) return if "__msg__" in self.commands: print("Running message handler") self.commands["__msg__"](text, chat, extras) def on(self, pattern, method): self.commands.append([pattern, method]) def addHelpString(self, text): self.helpString += text + "\n" def addUnfilteredMsgHandler(self, method): self.unfilteredMsgHandlerMethod = method def addInlineQueryMethod(self, method): self.inlineQueryMethod = method # self event handlers def help(self, message, chat, extras): if self.helpString == "": chat.sendMessage("No help set") else: chat.sendMessage(self.helpString) def online(self, message, chat, extras): chat.sendMessage("I'm online!") # end def getMentionText(self, sender, userId): return "[" + sender + "](tg://user?id=" + str(userId) + ")" def answerInlineQuery(self, queryId, answer): res = jnet.requestJson(self.botQueryUri+ "/answerInlineQuery", {'inline_query_id': queryId, 'results': json.dumps(answer.list)}) if not res.isError(): return True