Last active
August 10, 2020 13:35
-
-
Save binarybrat/2092cf49a27f1541ec0668e7154a6120 to your computer and use it in GitHub Desktop.
Current streaming processes setup - adding logger
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sys | |
| import time | |
| import traceback | |
| import peewee as pw | |
| import multiprocessing | |
| from datetime import datetime | |
| from ago import human | |
| import logging.handlers | |
| from loggingTesting import Log | |
| from models import init_db | |
| from models.commentmodel import Comment, updateComment, delUpdateComment, add_comment, commentDone | |
| from models.detailsmodel import Details, DetailsPending | |
| from models.detailsmodel import DetailsHelper | |
| from models.detailsmodel import get_details_approvalID, detailsComment | |
| from models.messagemodel import SewingMessages, add_sewmsg | |
| from models.modlogmodel import Modlog, most_actions, add_modlog | |
| from models.patternmodel import Pattern, PendingPatternPosts, DataPatternsPending, CompanyPatternsPending, ppp_done | |
| from models.patternmodel import add_patternpostconnect, add_ppp, patternDone, postsforpatDone, PatternPosts | |
| from models.postmodel import Posts, add_post, postDone, update_post | |
| from models.postmodel import delUpdate_post, unflaired_canFlair | |
| from models.statstrafficmodel_ import TotalsTesting | |
| from utils import detailpendinglogin | |
| from utils.globalvars import isdeleted, has_pattern, formatdt | |
| from utils.globalvars import shortenRedditURL | |
| from utils.randomhelpershit import FLAIRCSSTOCHECK, modpost, flair_post | |
| class StreamingStuff: | |
| def __init__(self, testing=False, wordmin=30, scoremin=20, timeback=24, doPatterns=False, nsfwChecks=False): | |
| self.testing = testing | |
| self.subname = 'sewing' if not self.testing else 'sewingmods' | |
| self.wordmin = wordmin | |
| self.scoremin = scoremin | |
| self.timeback = timeback | |
| self.doPatterns = doPatterns | |
| self.nsftChecks = nsfwChecks | |
| self.reddit = detailpendinglogin.login() | |
| self.sewsub = self.reddit.subreddit(self.subname) | |
| self.db = init_db() | |
| self.db.connect() | |
| self.db.create_tables([Comment, Posts, Details, DetailsPending, Modlog, SewingMessages, PendingPatternPosts]) | |
| self.db.create_tables([Pattern, PatternPosts, CompanyPatternsPending, DataPatternsPending]) | |
| self.db.close() | |
| self.log = Log() | |
| self.startTime = datetime.now() | |
| self.logger = create_logger() | |
| self.logger.info("Starting the Streaming Bot Tests - Running on: /r/{}".format(self.subname)) | |
| def post_stream(self): | |
| self.logger.info("Starting Post Stream") | |
| while True: | |
| for post in self.sewsub.stream.submissions(skip_existing=True): | |
| self.db.connect() | |
| self.logger.info( | |
| "Incoming post by u/{} - {}\n - {}".format(post.author.name if post.author else "[deleted]", | |
| post.shortlink, post.title)) | |
| if not postDone(post) and not isdeleted(post): | |
| add_post(post) | |
| self.log.Intro(" - Added post to the database") | |
| try: | |
| TotalsTesting.create( | |
| datecreated=datetime.utcfromtimestamp(post.created_utc).date(), | |
| idstr=post.fullname | |
| ) | |
| self.log.Intro(" - Added post to TotalsTesting DB") | |
| except pw.IntegrityError: | |
| pass | |
| else: | |
| if isdeleted(post): | |
| delUpdate_post(post) | |
| self.log.Intro(" - Updating Deleted Post: {}\n - {}".format(post.shortlink, post.title)) | |
| else: | |
| update_post(post) | |
| self.log.Intro(" - Updating Post by u/{}".format(post.author.name if post.author else "[deleted]")) | |
| # Approve weird recurring posts that AuoMod does not approve | |
| if post.approved and post.approved_by is not None and not isdeleted( | |
| post) and post.author.name in ['AutoModerator', | |
| 'sewingmodthings']: | |
| post.mod.approve() | |
| self.log.Intro( | |
| " - Approved a weekly thread by {}: {}\n{}".format(post.author.name, post.shortlink, | |
| post.title)) | |
| # Checks author account for over-18 and age to determine if should filter - half assed spambot protection | |
| if self.nsftChecks and post.author.subreddit['over_18'] and \ | |
| (time.time() - post.author.created_utc <= 2629743): | |
| post.mod.remove() | |
| post.report("NSFW user with account under 30 days old - please check and take action") | |
| self.log.Intro(" - NSFW user account under a month old - removed and sending report") | |
| continue | |
| # Adds flair to what looks to be a project type post that is unflaired | |
| if post.link_flair_text is None and unflaired_canFlair(post): | |
| flair_post(post, 'FO', False) | |
| self.log.Intro(" - u/sewingmodthings gave FO flair to unflaired post") | |
| # If pattern found in title add it to the pending patterns DB | |
| # Still Need To: Add back in ability to check if pattern is in the DB and if so mark it for info comment | |
| if post.link_flair_text is not None and post.link_flair_css_class in FLAIRCSSTOCHECK and \ | |
| has_pattern(post.title) and not ppp_done(post): | |
| try: | |
| add_ppp(post) | |
| self.log.Intro(" - Pattern found in posted - added to pending posts db") | |
| except pw.IntegrityError: | |
| pass | |
| self.db.close() | |
| def comment_stream(self): | |
| self.logger.info("Starting Comment Stream") | |
| while True: | |
| for comment in self.sewsub.stream.comments(skip_existing=True): | |
| self.db.connect() | |
| self.log.Magenta( | |
| "Incomming comment by u/{} - {}".format(comment.author.name if comment.author else "[deleted]", | |
| comment.fullname)) | |
| if not commentDone(comment.fullname) and not isdeleted(comment): | |
| add_comment(comment) | |
| self.log.Intro(" - NEW Comment added to database") | |
| try: | |
| TotalsTesting.create(datecreated=datetime.utcfromtimestamp(comment.created_utc).date(), | |
| idstr=comment.fullname) | |
| self.log.Intro(" - Comment added to TotalsTesting DB") | |
| except pw.IntegrityError: | |
| pass | |
| else: | |
| if isdeleted(comment): | |
| delUpdateComment(comment) | |
| self.log.Intro(" - Comment deleted - updated database") | |
| else: | |
| updateComment(comment) | |
| self.log.Intro(" - Comment updated in the database") | |
| # Checks if project type post and whether or not comment passes as a details comment/processed details | |
| if detailsComment(comment, wordmin=self.wordmin): | |
| self.log.Intro(" - Details Comment Found") | |
| dhelper = DetailsHelper(self.reddit, comment, verbose=True, overkill=True) | |
| if not dhelper.already_in_db() and not dhelper.is_deleted: | |
| dhelper.add_details() | |
| self.log.Intro(" - Added details to the DB") | |
| else: | |
| if dhelper.oktoupdate: | |
| self.log.Intro(" - Updated details comment in DB") | |
| if dhelper.has_pattern and not ppp_done(dhelper.comment.submission): | |
| if has_pattern(dhelper.comment.submission.title): | |
| add_ppp(dhelper.submission) | |
| self.log.Intro(" - Adding POST pending pattern post: {} - {}".format(dhelper.comment.fullname, | |
| dhelper.submission.fullname)) | |
| else: | |
| add_ppp(dhelper.submission, dhelper.comment.fullname, dhelper.comment.body) | |
| self.log.Intro( | |
| " - Adding COMMENT pending pattern post: {} - {}".format(dhelper.comment.fullname, | |
| dhelper.submission.fullname)) | |
| # If post is either warned, or filtered process and send approval message | |
| if dhelper.pending: | |
| self.log.Intro(" - Found a details pending post with details comment!!!") | |
| pending = DetailsPending.get_or_none(DetailsPending.idstr == dhelper.comment.submission.fullname) | |
| if pending.filtered and dhelper.comment.removed and dhelper.comment.banned_by == 'sewingmodthings': | |
| modpost(dhelper.submission.id, 'approve') | |
| self.log.Intro(" - Re-Approved pending post for giving details") | |
| if not dhelper.approval_msg_sent(): | |
| dhelper.send_details_approval() | |
| apmsgid = get_details_approvalID(self.reddit, dhelper.submission) | |
| self.log.Intro(" - Sent Details Approval Message: {}".format(apmsgid)) | |
| dhelper.update_pending_approved() | |
| self.log.Intro(" - Updated pending details to approved: {} - {}".format(dhelper.comment.fullname, | |
| dhelper.submission.fullname)) | |
| self.db.close() | |
| def edited_stream(self): | |
| self.logger.info("Starting Edited Stream") | |
| while True: | |
| for edited in self.sewsub.mod.stream.edited(skip_existing=True): | |
| self.db.connect() | |
| if edited.fullname.startswith('t1'): | |
| normalprint = "Incoming Edited Comment by: {} - {}".format( | |
| edited.author.name if edited.author else "[deleted]", edited.fullname) | |
| detailsprint = "Incoming Edited DETAILS Comment by: {} - {}".format( | |
| edited.author.name if edited.author else "[deleted]", edited.fullname) | |
| sendtolog = detailsprint if detailsComment(edited) else normalprint | |
| self.logger.info(sendtolog) | |
| # self.log.Teal(detailsprint if detailsComment(edited) else normalprint) | |
| if not commentDone(edited.fullname) and not isdeleted(edited): | |
| add_comment(edited) | |
| self.log.Intro(" - NEW EDITED Comment added to database") | |
| try: | |
| TotalsTesting.create(datecreated=datetime.utcfromtimestamp(edited.created_utc).date(), | |
| idstr=edited.fullname) | |
| self.log.Intro(" - Edited Comment added to TotalsTesting DB") | |
| except pw.IntegrityError: | |
| pass | |
| else: | |
| if isdeleted(edited): | |
| delUpdateComment(edited) | |
| self.log.Intro(" - Edited Comment deleted - updated database") | |
| else: | |
| updateComment(edited) | |
| self.log.Intro(" - Edited Comment updated in the database") | |
| if detailsComment(edited, wordmin=self.wordmin): | |
| self.log.Intro(" - Edited Details Comment Found") | |
| dhelper = DetailsHelper(self.reddit, edited, verbose=True, overkill=True) | |
| if not dhelper.already_in_db() and not dhelper.is_deleted: | |
| dhelper.add_details() | |
| self.log.Intro(" - Added details to the DB") | |
| else: | |
| if dhelper.oktoupdate: | |
| self.log.Intro(" - Updated details comment in DB") | |
| if dhelper.has_pattern and not dhelper.pattern_in_db(): | |
| if has_pattern(dhelper.comment.submission.title): | |
| add_ppp(dhelper.submission) | |
| self.log.Intro( | |
| " - Adding POST pending pattern post: {} - {}".format(dhelper.comment.fullname, | |
| dhelper.submission.fullname)) | |
| else: | |
| add_ppp(dhelper.submission, dhelper.comment.fullname, dhelper.comment.body) | |
| self.log.Intro( | |
| " - Adding COMMENT pending pattern post: {} - {}".format( | |
| dhelper.comment.fullname, | |
| dhelper.submission.fullname)) | |
| if dhelper.pending: | |
| pending = DetailsPending.get_or_none( | |
| DetailsPending.idstr == dhelper.comment.submission.fullname) | |
| if pending.filtered and dhelper.comment.removed and \ | |
| dhelper.comment.banned_by == 'sewingmodthings': | |
| modpost(dhelper.submission.id, 'approve') | |
| self.log.Intro(" - Re-Approved pending post for giving details") | |
| if not dhelper.approval_msg_sent(): | |
| dhelper.send_details_approval() | |
| apmsgid = get_details_approvalID(self.reddit, dhelper.submission) | |
| self.log.Intro(" - Sent Details Approval Message: {}".format(apmsgid)) | |
| dhelper.update_pending_approved() | |
| self.log.Intro( | |
| " - Updated edited pending details to approved: {} - {}".format( | |
| dhelper.comment.fullname, | |
| dhelper.submission.fullname)) | |
| elif edited.fullname.startswith('t3'): | |
| self.logger.info("Incoming Edited Post by: u/{} - {}".format(edited.author.name if edited.author | |
| else "[deleted]", edited.fullname)) | |
| if not postDone(edited) and not isdeleted(edited): | |
| add_post(edited) | |
| self.log.Intro(" - NEW Post added to database FROM EDITED STREAM") | |
| TotalsTesting.create( | |
| datecreated=datetime.utcfromtimestamp(edited.created_utc).date(), | |
| idstr=edited.fullname | |
| ) | |
| self.log.Intro(" - Edited Post added to TotalsTesting DB") | |
| else: | |
| if isdeleted(edited): | |
| delUpdate_post(edited) | |
| self.log.Intro(" - Edited post deleted - updated database") | |
| else: | |
| update_post(edited) | |
| self.log.Intro(" - Updated edited post in database") | |
| self.db.close() | |
| def inbox_stream(self): | |
| self.logger.info("Starting Inbox Stream") | |
| while True: | |
| for msg in self.reddit.inbox.stream(skip_existing=True): | |
| itemadded = "Message" if msg.fullname.startswith('t4_') else "Comment" | |
| self.logger.info("Incoming Inbox item: u/{} - {}".format( | |
| msg.author.name if msg.author else "[deleted]", msg.fullname)) | |
| self.db.connect() | |
| add_sewmsg(msg) | |
| self.db.close() | |
| self.log.Intro(" - New {} Added: {} - {} - {}".format(itemadded, msg.author, msg.dest, msg.name)) | |
| def modlog_stream(self): | |
| self.logger.info("Starting Modlog Stream") | |
| while True: | |
| for entry in self.sewsub.mod.stream.log(skip_existing=True): | |
| if entry.action in most_actions: | |
| self.logger.info("Incoming Modlog Entry: {} to {} Mod: {}".format(entry.action, entry.mod, | |
| entry.target_fullname)) | |
| self.db.connect() | |
| add_modlog(entry) | |
| self.db.close() | |
| self.log.Intro(" - Added entry to db") | |
| if entry.target_fullname is not None and entry.target_fullname.startswith('t3_') or \ | |
| entry.target_fullname.startswith('t1_'): | |
| urlLink = shortenRedditURL('https://www.reddit.com' + entry.target_permalink) | |
| self.log.Intro(" - {}: {}".format("Post" if entry.target_fullname.startswith('t3_') | |
| else "Comment", urlLink)) | |
| if entry.target_fullname.startswith("t3_"): | |
| self.log.Intro(" - {}".format(entry.target_title)) | |
| if entry.mod == 'AutoModerator': | |
| self.log.Intro(" - Details: {}".format(entry.details)) | |
| # Should this be it's own process? | |
| if entry.action == 'editflair': | |
| self.db.connect() | |
| self.handle_reflair(entry=entry) | |
| self.db.close() | |
| def handle_reflair(self, entry): | |
| feditedpost = self.reddit.submission(entry.target_fullname[3:]) | |
| feditedonDB = Posts.get_or_none(Posts.fullname==entry.target_fullname) | |
| if not feditedonDB: | |
| add_post(feditedpost) | |
| self.log.Intro(" - Added post that was reflaired but not in the database via modlog process") | |
| postflair = feditedpost.link_flair_text if feditedpost.link_flair_text is not None else None | |
| ondbflair = feditedonDB.flairtext if feditedonDB.flairtext is not None else None | |
| if ondbflair != postflair: | |
| update_post(feditedpost) | |
| self.log.Intro(" - Updated post with flair change! New Flair: {}".format(postflair)) | |
| def processing(self): | |
| subprocess1 = multiprocessing.Process(target=self.post_stream, name='postStream') | |
| subprocess2 = multiprocessing.Process(target=self.comment_stream, name='commentStream') | |
| subprocess3 = multiprocessing.Process(target=self.edited_stream, name='editedStream') | |
| subprocess4 = multiprocessing.Process(target=self.inbox_stream, name='inboxStream') | |
| subprocess5 = multiprocessing.Process(target=self.modlog_stream, name='modlogStream') | |
| subprocess1.start() | |
| subprocess2.start() | |
| subprocess3.start() | |
| subprocess4.start() | |
| subprocess5.start() | |
| def fancy_processing(self): | |
| postProcess = multiprocessing.Process(target=self.post_stream, name='postStream') | |
| commentProcess = multiprocessing.Process(target=self.comment_stream, name='commentStream') | |
| editedProcess = multiprocessing.Process(target=self.edited_stream, name='editedStream') | |
| inboxProcess = multiprocessing.Process(target=self.inbox_stream, name='inboxStream') | |
| modlogProcess = multiprocessing.Process(target=self.modlog_stream, name='modlogStream') | |
| all_processes = [postProcess, commentProcess, editedProcess, inboxProcess, modlogProcess] | |
| # From here I am unsure what to do - should I join the processes and run them? Should I put them in a Queue? | |
| def run(self): | |
| try: | |
| self.processing() | |
| except pw.IntegrityError: | |
| pass | |
| except pw.DatabaseError or pw.OperationalError: | |
| self.db.close() | |
| time.sleep(3) | |
| self.db.connect() | |
| self.logger.error("DatabaseError or OperationError - closed and re-oppended the database") | |
| except KeyboardInterrupt: | |
| if not self.db.is_closed(): | |
| self.log.Warn("CLOSED DATABASE") | |
| self.db.close() | |
| self.logger.debug("Start Time: {} - Ran for: {}".format(datetime.fromtimestamp(startTime).strftime(formatdt), | |
| human(startTime))) | |
| except Exception as e: | |
| att_error = traceback.format_exc() | |
| proc = multiprocessing.current_process().name | |
| self.logger.warning(">{} - Attribute Error - Process: {}\n{}".format(datetime.now().strftime(formatdt), | |
| proc, att_error)) | |
| self.log.Intro('------------------------------') | |
| self.logger.error('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e) | |
| def create_logger(): | |
| logger = multiprocessing.get_logger() | |
| logger.setLevel(logging.INFO) | |
| formatter = logging.Formatter( | |
| '[%(asctime)s| %(levelname)s| %(processName)s] %(message)s', | |
| '%m-%d-%Y %H:%M:%S %p') | |
| handler = logging.FileHandler('logs/multi_stream.log') | |
| handler.setFormatter(formatter) | |
| if not len(logger.handlers): | |
| logger.addHandler(handler) | |
| return logger | |
| if __name__ == '__main__': | |
| bot = StreamingStuff() | |
| bot.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment