Skip to content

Instantly share code, notes, and snippets.

@binarybrat
Last active August 10, 2020 13:35
Show Gist options
  • Save binarybrat/2092cf49a27f1541ec0668e7154a6120 to your computer and use it in GitHub Desktop.
Save binarybrat/2092cf49a27f1541ec0668e7154a6120 to your computer and use it in GitHub Desktop.

Revisions

  1. binarybrat revised this gist Aug 10, 2020. 1 changed file with 0 additions and 1 deletion.
    1 change: 0 additions & 1 deletion streamingprocesses.py
    Original file line number Diff line number Diff line change
    @@ -26,7 +26,6 @@
    from utils.randomhelpershit import FLAIRCSSTOCHECK, modpost, flair_post



    class StreamingStuff:
    def __init__(self, testing=False, wordmin=30, scoremin=20, timeback=24, doPatterns=False, nsfwChecks=False):
    self.testing = testing
  2. binarybrat revised this gist Aug 10, 2020. 1 changed file with 10 additions and 13 deletions.
    23 changes: 10 additions & 13 deletions streamingprocesses.py
    Original file line number Diff line number Diff line change
    @@ -50,7 +50,6 @@ def __init__(self, testing=False, wordmin=30, scoremin=20, timeback=24, doPatter

    def post_stream(self):
    self.logger.info("Starting Post Stream")

    while True:

    for post in self.sewsub.stream.submissions(skip_existing=True):
    @@ -78,14 +77,14 @@ def post_stream(self):
    update_post(post)
    self.log.Intro(" - Updating Post by u/{}".format(post.author.name if post.author else "[deleted]"))

    # Approve weird recurring posts that AuoMod does not approve
    if post.approved and post.approved_by is not None and not isdeleted(
    post) and post.author.name in ['AutoModerator',
    'sewingmodthings']:
    post.mod.approve()
    self.log.Intro(
    " - Approved a weekly thread by AutoModerator: {}\n{}".format(post.shortlink,
    post.title))
    # Approve weird recurring posts that AuoMod does not approve
    if post.approved and post.approved_by is not None and not isdeleted(
    post) and post.author.name in ['AutoModerator',
    'sewingmodthings']:
    post.mod.approve()
    self.log.Intro(
    " - Approved a weekly thread by {}: {}\n{}".format(post.author.name, post.shortlink,
    post.title))

    # Checks author account for over-18 and age to determine if should filter - half assed spambot protection
    if self.nsftChecks and post.author.subreddit['over_18'] and \
    @@ -101,6 +100,7 @@ def post_stream(self):
    self.log.Intro(" - u/sewingmodthings gave FO flair to unflaired post")

    # If pattern found in title add it to the pending patterns DB
    # Still Need To: Add back in ability to check if pattern is in the DB and if so mark it for info comment
    if post.link_flair_text is not None and post.link_flair_css_class in FLAIRCSSTOCHECK and \
    has_pattern(post.title) and not ppp_done(post):
    try:
    @@ -112,7 +112,6 @@ def post_stream(self):

    def comment_stream(self):
    self.logger.info("Starting Comment Stream")

    while True:
    for comment in self.sewsub.stream.comments(skip_existing=True):
    self.db.connect()
    @@ -387,8 +386,6 @@ def create_logger():


    if __name__ == '__main__':

    startTime = time.time()


    bot = StreamingStuff()
    bot.run()
  3. binarybrat revised this gist Aug 10, 2020. 1 changed file with 190 additions and 137 deletions.
    327 changes: 190 additions & 137 deletions streamingprocesses.py
    Original file line number Diff line number Diff line change
    @@ -1,117 +1,117 @@
    import sys
    import time
    import traceback
    import peewee as pw
    import multiprocessing
    import time
    from datetime import datetime

    from ago import human
    from peewee import IntegrityError
    import logging.handlers

    from loggingTesting import *
    from loggingTesting import Log
    from models import init_db
    from models.commentmodel import Comment, updateComment, delUpdateComment, add_comment, commentDone
    from models.detailsmodel import Details, DetailsPending
    from models.detailsmodel import DetailsHelper
    from models.detailsmodel import get_details_approvalID
    from models.detailsmodel import get_details_approvalID, detailsComment
    from models.messagemodel import SewingMessages, add_sewmsg
    from models.modlogmodel import Modlog, most_actions, add_modlog
    from models.patternmodel import Pattern, PendingPatternPosts, ppp_done, add_ppp
    from models.patternmodel import Pattern, PendingPatternPosts, DataPatternsPending, CompanyPatternsPending, ppp_done
    from models.patternmodel import add_patternpostconnect, add_ppp, patternDone, postsforpatDone, PatternPosts
    from models.postmodel import Posts, add_post, postDone, update_post
    from models.postmodel import delUpdate_post, unflaired_canFlair
    from models.statstrafficmodel_ import TotalsTesting
    from utils import detailpendinglogin
    from utils.globalvars import isdeleted, has_pattern, hardcore_removed, formatdt
    from utils.globalvars import isdeleted, has_pattern, formatdt
    from utils.globalvars import shortenRedditURL
    from utils.randomhelpershit import FLAIRCSSTOCHECK, modpost, flair_post


    def detailsComment(comment, wordmin=30):
    if not isdeleted(comment) and not comment.removed and not comment.spam and not hardcore_removed(comment) and \
    comment.is_root and comment.is_submitter and not comment.submission.spam and \
    not hardcore_removed(comment.submission):
    return True if (len(comment.body.split()) >= wordmin or has_pattern(comment.body)) else False


    class StreamingStuff:
    def __init__(self, wordmin=30, scoremin=50, timeback=24, doPatterns=False, nsfwChecks=False):
    def __init__(self, testing=False, wordmin=30, scoremin=20, timeback=24, doPatterns=False, nsfwChecks=False):
    self.testing = testing
    self.subname = 'sewing' if not self.testing else 'sewingmods'
    self.wordmin = wordmin
    self.scoremin = scoremin
    self.timeback = timeback
    self.doPatterns = doPatterns
    self.nsftChecks = nsfwChecks
    self.reddit = detailpendinglogin.login()
    self.sewsub = self.reddit.subreddit('sewing')
    self.START_TIME = time.time()
    self.HOW_AGO = self.START_TIME - (3600 * 24)
    self.sewsub = self.reddit.subreddit(self.subname)
    self.db = init_db()
    self.db.connect()
    self.db.create_tables([Comment, Posts, Details, DetailsPending, Modlog, SewingMessages, PendingPatternPosts, Pattern])
    self.db.create_tables([Comment, Posts, Details, DetailsPending, Modlog, SewingMessages, PendingPatternPosts])
    self.db.create_tables([Pattern, PatternPosts, CompanyPatternsPending, DataPatternsPending])
    self.db.close()
    self.log = Log()
    self.startTime = datetime.now()
    self.logger = create_logger()
    self.logger.info("Starting the Streaming Bot Tests - Running on: /r/{}".format(self.subname))

    def post_stream(self):
    self.log.LightBlue("Starting Post Stream ~ {}".format(datetime.now().strftime(formatdt)))
    self.logger.info("Starting Post Stream")

    while True:

    for post in self.sewsub.stream.submissions(skip_existing=True):
    self.db.connect()
    self.log.LightBlue(
    ">> Incoming post by u/{} - {}\n > {}".format(post.author.name if post.author else "[deleted]",
    post.shortlink, post.title))
    self.logger.info(
    "Incoming post by u/{} - {}\n - {}".format(post.author.name if post.author else "[deleted]",
    post.shortlink, post.title))

    if not postDone(post) and not isdeleted(post):
    add_post(post)
    self.log.Intro(" > Added post to the database")
    # try:
    # TotalsTesting.create(
    # datecreated=datetime.utcfromtimestamp(post.created_utc).date(),
    # idstr=post.fullname
    # )
    # self.log.Intro(" > Added post to TotalsTesting DB")
    # except IntegrityError:
    # pass
    self.log.Intro(" - Added post to the database")
    try:
    TotalsTesting.create(
    datecreated=datetime.utcfromtimestamp(post.created_utc).date(),
    idstr=post.fullname
    )
    self.log.Intro(" - Added post to TotalsTesting DB")
    except pw.IntegrityError:
    pass
    else:
    if isdeleted(post):
    delUpdate_post(post)
    self.log.Intro(" > Updating Deleted Post: {}\n > {}".format(post.shortlink, post.title))
    self.log.Intro(" - Updating Deleted Post: {}\n - {}".format(post.shortlink, post.title))
    else:
    update_post(post)
    self.log.Intro(" > Updating Post by u/{}".format(post.author.name if post.author else "[deleted]"))
    self.log.Intro(" - Updating Post by u/{}".format(post.author.name if post.author else "[deleted]"))

    # Approve weird recurring posts that AuoMod does not approve
    if post.approved and post.approved_by is not None and not isdeleted(
    post) and post.author.name in ['AutoModerator',
    'sewingmodthings']:
    post.mod.approve()
    self.log.Intro(
    " > Approved a weekly thread by AutoModerator: {}\n{}".format(post.shortlink,
    " - Approved a weekly thread by AutoModerator: {}\n{}".format(post.shortlink,
    post.title))

    # Checks author account for over-18 and age to determine if should filter - half assed spambot protection
    if self.nsftChecks and post.author.subreddit['over_18'] and (time.time() - post.author.created_utc <= 2629743):
    if self.nsftChecks and post.author.subreddit['over_18'] and \
    (time.time() - post.author.created_utc <= 2629743):
    post.mod.remove()
    post.report("NSFW user with account under 30 days old - please check and take action")
    self.log.Intro(" > NSFW user account under a month old - removed and sending report")
    continue
    self.log.Intro(" - NSFW user account under a month old - removed and sending report")
    continue

    # Adds flair to what looks to be a project type post that is unflaired
    if post.link_flair_text is None and unflaired_canFlair(post):
    flair_post(post, 'FO', False)
    self.log.Intro(" > u/sewingmodthings gave FO flair to unflaired post")
    self.log.Intro(" - u/sewingmodthings gave FO flair to unflaired post")

    # If pattern found in title add it to the pending patterns DB
    if post.link_flair_text is not None and post.link_flair_css_class in FLAIRCSSTOCHECK and \
    has_pattern(post.title) and not ppp_done(post):
    try:
    add_ppp(post)
    self.log.Intro(" > Pattern found in posted - added to pending posts db")
    except IntegrityError:
    self.log.Intro(" - Pattern found in posted - added to pending posts db")
    except pw.IntegrityError:
    pass
    self.db.close()

    def comment_stream(self):
    self.log.Magenta("Starting Comment Stream ~ {}".format(datetime.now().strftime(formatdt)))
    self.logger.info("Starting Comment Stream")

    while True:
    for comment in self.sewsub.stream.comments(skip_existing=True):
    @@ -122,106 +122,111 @@ def comment_stream(self):

    if not commentDone(comment.fullname) and not isdeleted(comment):
    add_comment(comment)
    self.log.Intro(" > NEW Comment added to database")
    # try:
    # TotalsTesting.create(datecreated=datetime.utcfromtimestamp(comment.created_utc).date(),
    # idstr=comment.fullname)
    # self.log.Intro(" > Comment added to TotalsTesting DB")
    # except IntegrityError:
    # pass
    self.log.Intro(" - NEW Comment added to database")
    try:
    TotalsTesting.create(datecreated=datetime.utcfromtimestamp(comment.created_utc).date(),
    idstr=comment.fullname)
    self.log.Intro(" - Comment added to TotalsTesting DB")
    except pw.IntegrityError:
    pass
    else:
    if isdeleted(comment):
    delUpdateComment(comment)
    self.log.Intro(" > Comment deleted - updated database")
    self.log.Intro(" - Comment deleted - updated database")
    else:
    updateComment(comment)
    self.log.Intro(" > Comment updated in the database")
    self.log.Intro(" - Comment updated in the database")

    # Checks if project type post and whether or not comment passes as a details comment/processed details
    if detailsComment(comment, wordmin=self.wordmin):
    self.log.Intro(" > Details Comment Found")
    self.log.Intro(" - Details Comment Found")
    dhelper = DetailsHelper(self.reddit, comment, verbose=True, overkill=True)

    if not dhelper.already_in_db() and not dhelper.is_deleted:
    dhelper.add_details()
    self.log.Intro(" > Added details to the DB")
    self.log.Intro(" - Added details to the DB")
    else:
    if dhelper.oktoupdate:
    self.log.Intro(" > Updated details comment in DB")
    self.log.Intro(" - Updated details comment in DB")

    if dhelper.has_pattern and not ppp_done(dhelper.comment.submission):
    if has_pattern(dhelper.comment.submission.title):
    add_ppp(dhelper.submission)
    self.log.Intro(" > Adding POST pending pattern post: {} - {}".format(dhelper.comment.fullname,
    self.log.Intro(" - Adding POST pending pattern post: {} - {}".format(dhelper.comment.fullname,
    dhelper.submission.fullname))
    else:
    add_ppp(dhelper.submission, dhelper.comment.fullname, dhelper.comment.body)
    self.log.Intro(
    " > Adding COMMENT pending pattern post: {} - {}".format(dhelper.comment.fullname,
    " - Adding COMMENT pending pattern post: {} - {}".format(dhelper.comment.fullname,
    dhelper.submission.fullname))
    # If post is either warned, or filtered process and send approval message
    if dhelper.pending:
    self.log.Intro(" > Found a details pending post with details comment!!!")
    self.log.Intro(" - Found a details pending post with details comment!!!")
    pending = DetailsPending.get_or_none(DetailsPending.idstr == dhelper.comment.submission.fullname)
    if pending.filtered and dhelper.comment.removed and dhelper.comment.banned_by == 'sewingmodthings':
    modpost(dhelper.submission.id, 'approve')
    self.log.Intro(" > Re-Approved pending post for giving details")
    self.log.Intro(" - Re-Approved pending post for giving details")
    if not dhelper.approval_msg_sent():
    dhelper.send_details_approval()
    apmsgid = get_details_approvalID(self.reddit, dhelper.submission)
    self.log.Intro(" > Sent Details Approval Message: {}".format(apmsgid))
    self.log.Intro(" - Sent Details Approval Message: {}".format(apmsgid))
    dhelper.update_pending_approved()
    self.log.Intro(" > Updated pending details to approved: {} - {}".format(dhelper.comment.fullname,
    self.log.Intro(" - Updated pending details to approved: {} - {}".format(dhelper.comment.fullname,
    dhelper.submission.fullname))
    self.db.close()

    def edited_stream(self):
    self.log.Teal("Starting Edited Stream ~ {}".format(datetime.now().strftime(formatdt)))
    self.logger.info("Starting Edited Stream")
    while True:
    for edited in self.sewsub.mod.stream.edited(skip_existing=True):
    self.db.connect()
    if edited.fullname.startswith('t1'):
    normalprint = ">> Incoming Edited Comment by: {} - {}".format(
    normalprint = "Incoming Edited Comment by: {} - {}".format(
    edited.author.name if edited.author else "[deleted]", edited.fullname)
    detailsprint = ">> Incoming Edited DETAILS Comment by: {} - {}".format(
    detailsprint = "Incoming Edited DETAILS Comment by: {} - {}".format(
    edited.author.name if edited.author else "[deleted]", edited.fullname)
    self.log.Teal(detailsprint if detailsComment(edited) else normalprint)
    sendtolog = detailsprint if detailsComment(edited) else normalprint
    self.logger.info(sendtolog)
    # self.log.Teal(detailsprint if detailsComment(edited) else normalprint)

    if not commentDone(edited.fullname) and not isdeleted(edited):
    add_comment(edited)
    self.log.Intro(" > NEW Comment added to database")
    self.log.Intro(" - NEW EDITED Comment added to database")
    try:
    TotalsTesting.create(datecreated=datetime.utcfromtimestamp(edited.created_utc).date(),
    idstr=edited.fullname)
    except IntegrityError:
    self.log.Intro(" - Edited Comment added to TotalsTesting DB")
    except pw.IntegrityError:
    pass

    else:
    if isdeleted(edited):
    delUpdateComment(edited)
    self.log.Intro(" > Edited Comment deleted - updated database")
    self.log.Intro(" - Edited Comment deleted - updated database")
    else:
    updateComment(edited)
    self.log.Intro(" > Edited Comment updated in the database")
    if detailsComment(edited, wordmin=30):
    self.log.Intro(" > Edited Details Comment Found")
    self.log.Intro(" - Edited Comment updated in the database")

    if detailsComment(edited, wordmin=self.wordmin):
    self.log.Intro(" - Edited Details Comment Found")
    dhelper = DetailsHelper(self.reddit, edited, verbose=True, overkill=True)
    if not dhelper.already_in_db() and not dhelper.is_deleted:
    dhelper.add_details()
    self.log.Intro(" > Added details to the DB")
    self.log.Intro(" - Added details to the DB")
    else:
    if dhelper.oktoupdate:
    self.log.Intro(" > Updated details comment in DB")
    self.log.Intro(" - Updated details comment in DB")

    if dhelper.has_pattern and not dhelper.pattern_in_db():
    if has_pattern(dhelper.comment.submission.title):
    add_ppp(dhelper.submission)
    self.log.Intro(
    " > Adding POST pending pattern post: {} - {}".format(dhelper.comment.fullname,
    " - Adding POST pending pattern post: {} - {}".format(dhelper.comment.fullname,
    dhelper.submission.fullname))
    else:
    add_ppp(dhelper.submission, dhelper.comment.fullname, dhelper.comment.body)
    self.log.Intro(
    " > Adding COMMENT pending pattern post: {} - {}".format(
    " - Adding COMMENT pending pattern post: {} - {}".format(
    dhelper.comment.fullname,
    dhelper.submission.fullname))

    @@ -231,111 +236,159 @@ def edited_stream(self):
    if pending.filtered and dhelper.comment.removed and \
    dhelper.comment.banned_by == 'sewingmodthings':
    modpost(dhelper.submission.id, 'approve')
    self.log.Intro(" > Re-Approved pending post for giving details")
    self.log.Intro(" - Re-Approved pending post for giving details")
    if not dhelper.approval_msg_sent():
    dhelper.send_details_approval()
    apmsgid = get_details_approvalID(self.reddit, dhelper.submission)
    self.log.Intro(" > Sent Details Approval Message: {}".format(apmsgid))
    self.log.Intro(" - Sent Details Approval Message: {}".format(apmsgid))
    dhelper.update_pending_approved()
    self.log.Intro(
    " > Updated edited pending details to approved: {} - {}".format(
    " - Updated edited pending details to approved: {} - {}".format(
    dhelper.comment.fullname,
    dhelper.submission.fullname))

    elif edited.fullname.startswith('t3'):
    self.log.Teal(">> Incoming Edited Post by: u/{} - {}".format(edited.author.name if
    edited.author else "[deleted]",
    edited.fullname))
    self.logger.info("Incoming Edited Post by: u/{} - {}".format(edited.author.name if edited.author
    else "[deleted]", edited.fullname))
    if not postDone(edited) and not isdeleted(edited):
    add_post(edited)
    self.log.Intro(" > NEW Post added to database FROM EDITED STREAM")
    try:
    TotalsTesting.create(
    self.log.Intro(" - NEW Post added to database FROM EDITED STREAM")
    TotalsTesting.create(
    datecreated=datetime.utcfromtimestamp(edited.created_utc).date(),
    idstr=edited.fullname
    )
    except IntegrityError:
    pass
    self.log.Intro(" - Edited Post added to TotalsTesting DB")
    else:
    if isdeleted(edited):
    delUpdate_post(edited)
    self.log.Intro(" > Edited post deleted - updated database")
    self.log.Intro(" - Edited post deleted - updated database")
    else:
    update_post(edited)
    self.log.Intro(" > Updated edited post in database")
    self.log.Intro(" - Updated edited post in database")
    self.db.close()

    def inbox_stream(self):
    self.log.LightTeal("Starting Inbox Stream ~ {}".format(datetime.now().strftime(formatdt)))
    self.logger.info("Starting Inbox Stream")
    while True:
    for msg in self.reddit.inbox.stream(skip_existing=True):
    self.log.LightTeal(">> Incoming Inbox item from: u/{} - {}".format(
    msg.author.name if msg.author else "[deleted]",
    msg.fullname))
    itemadded = "Message" if msg.fullname.startswith('t4_') else "Comment"
    self.logger.info("Incoming Inbox item: u/{} - {}".format(
    msg.author.name if msg.author else "[deleted]", msg.fullname))
    self.db.connect()
    try:
    add_sewmsg(msg)
    itemadded = "Message" if msg.fullname.startswith('t4_') else "Comment"
    self.log.LightTeal(" > New {} Added: {} - {} - {}".format(itemadded, msg.author, msg.dest, msg.name))
    except IntegrityError:
    pass
    add_sewmsg(msg)
    self.db.close()
    self.log.Intro(" - New {} Added: {} - {} - {}".format(itemadded, msg.author, msg.dest, msg.name))

    def modlog_stream(self):
    self.log.Success("Starting Modlog Stream ~ {}".format(datetime.now().strftime(formatdt)))
    self.logger.info("Starting Modlog Stream")
    while True:
    for entry in self.sewsub.mod.stream.log(skip_existing=True):
    self.db.connect()
    if entry.action in most_actions:
    self.log.Success(">>> Incoming Modlog Entry: {} to {} by u/{} | Mod: {}".format(
    entry.action,
    entry.target_fullname,
    entry.target_author,
    entry.mod))
    try:
    add_modlog(entry)
    if entry.target_fullname is not None and entry.target_fullname.startswith('t3_') or \
    entry.target_fullname.startswith('t1_'):
    urlLink = shortenRedditURL('https://www.reddit.com' + entry.target_permalink)
    self.log.Intro(" > {}: {}".format("Post" if entry.target_fullname.startswith('t3_')
    else "Comment", urlLink))
    if entry.target_fullname.startswith("t3_"):
    self.log.Intro(" > {}".format(entry.target_title))
    if entry.mod == 'AutoModerator':
    self.log.Intro(" > Details: {}".format(entry.details))
    except IntegrityError:
    pass
    if entry.target_fullname is not None and entry.target_fullname.startswith('t3_') and \
    entry.action == 'editflair':
    flair_edit_post = self.reddit.submission(entry.target_fullname.lstrip('t3_'))
    if hasattr(flair_edit_post, 'link_flair_text'):
    flairtext = flair_edit_post.link_flair_text if flair_edit_post.link_flair_text is not None else None
    onthedb = Posts.get_or_none(Posts.idstr == flair_edit_post.id)
    if onthedb.flairtext != flairtext:
    update_post(flair_edit_post)
    self.log.Intro(" > Updated flair edited post to proper flair in the DB")
    self.db.close()
    self.logger.info("Incoming Modlog Entry: {} to {} Mod: {}".format(entry.action, entry.mod,
    entry.target_fullname))
    self.db.connect()
    add_modlog(entry)
    self.db.close()
    self.log.Intro(" - Added entry to db")

    if entry.target_fullname is not None and entry.target_fullname.startswith('t3_') or \
    entry.target_fullname.startswith('t1_'):
    urlLink = shortenRedditURL('https://www.reddit.com' + entry.target_permalink)
    self.log.Intro(" - {}: {}".format("Post" if entry.target_fullname.startswith('t3_')
    else "Comment", urlLink))
    if entry.target_fullname.startswith("t3_"):
    self.log.Intro(" - {}".format(entry.target_title))
    if entry.mod == 'AutoModerator':
    self.log.Intro(" - Details: {}".format(entry.details))

    # Should this be it's own process?
    if entry.action == 'editflair':
    self.db.connect()
    self.handle_reflair(entry=entry)
    self.db.close()

    def handle_reflair(self, entry):

    feditedpost = self.reddit.submission(entry.target_fullname[3:])
    feditedonDB = Posts.get_or_none(Posts.fullname==entry.target_fullname)
    if not feditedonDB:
    add_post(feditedpost)
    self.log.Intro(" - Added post that was reflaired but not in the database via modlog process")

    postflair = feditedpost.link_flair_text if feditedpost.link_flair_text is not None else None
    ondbflair = feditedonDB.flairtext if feditedonDB.flairtext is not None else None

    if ondbflair != postflair:
    update_post(feditedpost)
    self.log.Intro(" - Updated post with flair change! New Flair: {}".format(postflair))

    def processing(self):
    subprocess1 = multiprocessing.Process(target=self.post_stream)
    subprocess2 = multiprocessing.Process(target=self.comment_stream)
    subprocess3 = multiprocessing.Process(target=self.edited_stream)
    subprocess4 = multiprocessing.Process(target=self.inbox_stream)
    subprocess5 = multiprocessing.Process(target=self.modlog_stream)
    subprocess1 = multiprocessing.Process(target=self.post_stream, name='postStream')
    subprocess2 = multiprocessing.Process(target=self.comment_stream, name='commentStream')
    subprocess3 = multiprocessing.Process(target=self.edited_stream, name='editedStream')
    subprocess4 = multiprocessing.Process(target=self.inbox_stream, name='inboxStream')
    subprocess5 = multiprocessing.Process(target=self.modlog_stream, name='modlogStream')

    subprocess1.start()
    subprocess2.start()
    subprocess3.start()
    subprocess4.start()
    subprocess5.start()

    def fancy_processing(self):

    postProcess = multiprocessing.Process(target=self.post_stream, name='postStream')
    commentProcess = multiprocessing.Process(target=self.comment_stream, name='commentStream')
    editedProcess = multiprocessing.Process(target=self.edited_stream, name='editedStream')
    inboxProcess = multiprocessing.Process(target=self.inbox_stream, name='inboxStream')
    modlogProcess = multiprocessing.Process(target=self.modlog_stream, name='modlogStream')

    all_processes = [postProcess, commentProcess, editedProcess, inboxProcess, modlogProcess]

    # From here I am unsure what to do - should I join the processes and run them? Should I put them in a Queue?


    def run(self):
    try:
    self.processing()
    except pw.IntegrityError:
    pass
    except pw.DatabaseError or pw.OperationalError:
    self.db.close()
    time.sleep(3)
    self.db.connect()
    self.logger.error("DatabaseError or OperationError - closed and re-oppended the database")
    except KeyboardInterrupt:
    if not self.db.is_closed():
    self.log.Warn("CLOSED DATABASE")
    self.db.close()
    self.logger.debug("Start Time: {} - Ran for: {}".format(datetime.fromtimestamp(startTime).strftime(formatdt),
    human(startTime)))
    except Exception as e:
    att_error = traceback.format_exc()
    proc = multiprocessing.current_process().name
    self.logger.warning(">{} - Attribute Error - Process: {}\n{}".format(datetime.now().strftime(formatdt),
    proc, att_error))
    self.log.Intro('------------------------------')
    self.logger.error('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e)


    def create_logger():
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
    '[%(asctime)s| %(levelname)s| %(processName)s] %(message)s',
    '%m-%d-%Y %H:%M:%S %p')
    handler = logging.FileHandler('logs/multi_stream.log')
    handler.setFormatter(formatter)
    if not len(logger.handlers):
    logger.addHandler(handler)
    return logger


    if __name__ == '__main__':

    startTime = time.time()

    try:
    StreamingStuff().processing()
    except KeyboardInterrupt:
    print("Start Time: {} - Ran for: {}".format(datetime.fromtimestamp(startTime).strftime(formatdt),
    human(startTime)))
    bot = StreamingStuff()
    bot.run()
  4. binarybrat created this gist Aug 5, 2020.
    341 changes: 341 additions & 0 deletions streamingprocesses.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,341 @@
    import time
    import multiprocessing
    import time
    from datetime import datetime

    from ago import human
    from peewee import IntegrityError

    from loggingTesting import *
    from models import init_db
    from models.commentmodel import Comment, updateComment, delUpdateComment, add_comment, commentDone
    from models.detailsmodel import Details, DetailsPending
    from models.detailsmodel import DetailsHelper
    from models.detailsmodel import get_details_approvalID
    from models.messagemodel import SewingMessages, add_sewmsg
    from models.modlogmodel import Modlog, most_actions, add_modlog
    from models.patternmodel import Pattern, PendingPatternPosts, ppp_done, add_ppp
    from models.postmodel import Posts, add_post, postDone, update_post
    from models.postmodel import delUpdate_post, unflaired_canFlair
    from models.statstrafficmodel_ import TotalsTesting
    from utils import detailpendinglogin
    from utils.globalvars import isdeleted, has_pattern, hardcore_removed, formatdt
    from utils.globalvars import shortenRedditURL
    from utils.randomhelpershit import FLAIRCSSTOCHECK, modpost, flair_post


    def detailsComment(comment, wordmin=30):
    if not isdeleted(comment) and not comment.removed and not comment.spam and not hardcore_removed(comment) and \
    comment.is_root and comment.is_submitter and not comment.submission.spam and \
    not hardcore_removed(comment.submission):
    return True if (len(comment.body.split()) >= wordmin or has_pattern(comment.body)) else False


    class StreamingStuff:
    def __init__(self, wordmin=30, scoremin=50, timeback=24, doPatterns=False, nsfwChecks=False):
    self.wordmin = wordmin
    self.scoremin = scoremin
    self.timeback = timeback
    self.doPatterns = doPatterns
    self.nsftChecks = nsfwChecks
    self.reddit = detailpendinglogin.login()
    self.sewsub = self.reddit.subreddit('sewing')
    self.START_TIME = time.time()
    self.HOW_AGO = self.START_TIME - (3600 * 24)
    self.db = init_db()
    self.db.connect()
    self.db.create_tables([Comment, Posts, Details, DetailsPending, Modlog, SewingMessages, PendingPatternPosts, Pattern])
    self.db.close()
    self.log = Log()
    self.startTime = datetime.now()

    def post_stream(self):
    self.log.LightBlue("Starting Post Stream ~ {}".format(datetime.now().strftime(formatdt)))

    while True:

    for post in self.sewsub.stream.submissions(skip_existing=True):
    self.db.connect()
    self.log.LightBlue(
    ">> Incoming post by u/{} - {}\n > {}".format(post.author.name if post.author else "[deleted]",
    post.shortlink, post.title))

    if not postDone(post) and not isdeleted(post):
    add_post(post)
    self.log.Intro(" > Added post to the database")
    # try:
    # TotalsTesting.create(
    # datecreated=datetime.utcfromtimestamp(post.created_utc).date(),
    # idstr=post.fullname
    # )
    # self.log.Intro(" > Added post to TotalsTesting DB")
    # except IntegrityError:
    # pass
    else:
    if isdeleted(post):
    delUpdate_post(post)
    self.log.Intro(" > Updating Deleted Post: {}\n > {}".format(post.shortlink, post.title))
    else:
    update_post(post)
    self.log.Intro(" > Updating Post by u/{}".format(post.author.name if post.author else "[deleted]"))

    # Approve weird recurring posts that AuoMod does not approve
    if post.approved and post.approved_by is not None and not isdeleted(
    post) and post.author.name in ['AutoModerator',
    'sewingmodthings']:
    post.mod.approve()
    self.log.Intro(
    " > Approved a weekly thread by AutoModerator: {}\n{}".format(post.shortlink,
    post.title))

    # Checks author account for over-18 and age to determine if should filter - half assed spambot protection
    if self.nsftChecks and post.author.subreddit['over_18'] and (time.time() - post.author.created_utc <= 2629743):
    post.mod.remove()
    post.report("NSFW user with account under 30 days old - please check and take action")
    self.log.Intro(" > NSFW user account under a month old - removed and sending report")
    continue

    # Adds flair to what looks to be a project type post that is unflaired
    if post.link_flair_text is None and unflaired_canFlair(post):
    flair_post(post, 'FO', False)
    self.log.Intro(" > u/sewingmodthings gave FO flair to unflaired post")

    # If pattern found in title add it to the pending patterns DB
    if post.link_flair_text is not None and post.link_flair_css_class in FLAIRCSSTOCHECK and \
    has_pattern(post.title) and not ppp_done(post):
    try:
    add_ppp(post)
    self.log.Intro(" > Pattern found in posted - added to pending posts db")
    except IntegrityError:
    pass
    self.db.close()

    def comment_stream(self):
    self.log.Magenta("Starting Comment Stream ~ {}".format(datetime.now().strftime(formatdt)))

    while True:
    for comment in self.sewsub.stream.comments(skip_existing=True):
    self.db.connect()
    self.log.Magenta(
    "Incomming comment by u/{} - {}".format(comment.author.name if comment.author else "[deleted]",
    comment.fullname))

    if not commentDone(comment.fullname) and not isdeleted(comment):
    add_comment(comment)
    self.log.Intro(" > NEW Comment added to database")
    # try:
    # TotalsTesting.create(datecreated=datetime.utcfromtimestamp(comment.created_utc).date(),
    # idstr=comment.fullname)
    # self.log.Intro(" > Comment added to TotalsTesting DB")
    # except IntegrityError:
    # pass
    else:
    if isdeleted(comment):
    delUpdateComment(comment)
    self.log.Intro(" > Comment deleted - updated database")
    else:
    updateComment(comment)
    self.log.Intro(" > Comment updated in the database")

    # Checks if project type post and whether or not comment passes as a details comment/processed details
    if detailsComment(comment, wordmin=self.wordmin):
    self.log.Intro(" > Details Comment Found")
    dhelper = DetailsHelper(self.reddit, comment, verbose=True, overkill=True)

    if not dhelper.already_in_db() and not dhelper.is_deleted:
    dhelper.add_details()
    self.log.Intro(" > Added details to the DB")
    else:
    if dhelper.oktoupdate:
    self.log.Intro(" > Updated details comment in DB")

    if dhelper.has_pattern and not ppp_done(dhelper.comment.submission):
    if has_pattern(dhelper.comment.submission.title):
    add_ppp(dhelper.submission)
    self.log.Intro(" > Adding POST pending pattern post: {} - {}".format(dhelper.comment.fullname,
    dhelper.submission.fullname))
    else:
    add_ppp(dhelper.submission, dhelper.comment.fullname, dhelper.comment.body)
    self.log.Intro(
    " > Adding COMMENT pending pattern post: {} - {}".format(dhelper.comment.fullname,
    dhelper.submission.fullname))
    # If post is either warned, or filtered process and send approval message
    if dhelper.pending:
    self.log.Intro(" > Found a details pending post with details comment!!!")
    pending = DetailsPending.get_or_none(DetailsPending.idstr == dhelper.comment.submission.fullname)
    if pending.filtered and dhelper.comment.removed and dhelper.comment.banned_by == 'sewingmodthings':
    modpost(dhelper.submission.id, 'approve')
    self.log.Intro(" > Re-Approved pending post for giving details")
    if not dhelper.approval_msg_sent():
    dhelper.send_details_approval()
    apmsgid = get_details_approvalID(self.reddit, dhelper.submission)
    self.log.Intro(" > Sent Details Approval Message: {}".format(apmsgid))
    dhelper.update_pending_approved()
    self.log.Intro(" > Updated pending details to approved: {} - {}".format(dhelper.comment.fullname,
    dhelper.submission.fullname))
    self.db.close()

    def edited_stream(self):
    self.log.Teal("Starting Edited Stream ~ {}".format(datetime.now().strftime(formatdt)))
    while True:
    for edited in self.sewsub.mod.stream.edited(skip_existing=True):
    self.db.connect()
    if edited.fullname.startswith('t1'):
    normalprint = ">> Incoming Edited Comment by: {} - {}".format(
    edited.author.name if edited.author else "[deleted]", edited.fullname)
    detailsprint = ">> Incoming Edited DETAILS Comment by: {} - {}".format(
    edited.author.name if edited.author else "[deleted]", edited.fullname)
    self.log.Teal(detailsprint if detailsComment(edited) else normalprint)

    if not commentDone(edited.fullname) and not isdeleted(edited):
    add_comment(edited)
    self.log.Intro(" > NEW Comment added to database")
    try:
    TotalsTesting.create(datecreated=datetime.utcfromtimestamp(edited.created_utc).date(),
    idstr=edited.fullname)
    except IntegrityError:
    pass
    else:
    if isdeleted(edited):
    delUpdateComment(edited)
    self.log.Intro(" > Edited Comment deleted - updated database")
    else:
    updateComment(edited)
    self.log.Intro(" > Edited Comment updated in the database")
    if detailsComment(edited, wordmin=30):
    self.log.Intro(" > Edited Details Comment Found")
    dhelper = DetailsHelper(self.reddit, edited, verbose=True, overkill=True)
    if not dhelper.already_in_db() and not dhelper.is_deleted:
    dhelper.add_details()
    self.log.Intro(" > Added details to the DB")
    else:
    if dhelper.oktoupdate:
    self.log.Intro(" > Updated details comment in DB")

    if dhelper.has_pattern and not dhelper.pattern_in_db():
    if has_pattern(dhelper.comment.submission.title):
    add_ppp(dhelper.submission)
    self.log.Intro(
    " > Adding POST pending pattern post: {} - {}".format(dhelper.comment.fullname,
    dhelper.submission.fullname))
    else:
    add_ppp(dhelper.submission, dhelper.comment.fullname, dhelper.comment.body)
    self.log.Intro(
    " > Adding COMMENT pending pattern post: {} - {}".format(
    dhelper.comment.fullname,
    dhelper.submission.fullname))

    if dhelper.pending:
    pending = DetailsPending.get_or_none(
    DetailsPending.idstr == dhelper.comment.submission.fullname)
    if pending.filtered and dhelper.comment.removed and \
    dhelper.comment.banned_by == 'sewingmodthings':
    modpost(dhelper.submission.id, 'approve')
    self.log.Intro(" > Re-Approved pending post for giving details")
    if not dhelper.approval_msg_sent():
    dhelper.send_details_approval()
    apmsgid = get_details_approvalID(self.reddit, dhelper.submission)
    self.log.Intro(" > Sent Details Approval Message: {}".format(apmsgid))
    dhelper.update_pending_approved()
    self.log.Intro(
    " > Updated edited pending details to approved: {} - {}".format(
    dhelper.comment.fullname,
    dhelper.submission.fullname))

    elif edited.fullname.startswith('t3'):
    self.log.Teal(">> Incoming Edited Post by: u/{} - {}".format(edited.author.name if
    edited.author else "[deleted]",
    edited.fullname))
    if not postDone(edited) and not isdeleted(edited):
    add_post(edited)
    self.log.Intro(" > NEW Post added to database FROM EDITED STREAM")
    try:
    TotalsTesting.create(
    datecreated=datetime.utcfromtimestamp(edited.created_utc).date(),
    idstr=edited.fullname
    )
    except IntegrityError:
    pass
    else:
    if isdeleted(edited):
    delUpdate_post(edited)
    self.log.Intro(" > Edited post deleted - updated database")
    else:
    update_post(edited)
    self.log.Intro(" > Updated edited post in database")
    self.db.close()

    def inbox_stream(self):
    self.log.LightTeal("Starting Inbox Stream ~ {}".format(datetime.now().strftime(formatdt)))
    while True:
    for msg in self.reddit.inbox.stream(skip_existing=True):
    self.log.LightTeal(">> Incoming Inbox item from: u/{} - {}".format(
    msg.author.name if msg.author else "[deleted]",
    msg.fullname))
    self.db.connect()
    try:
    add_sewmsg(msg)
    itemadded = "Message" if msg.fullname.startswith('t4_') else "Comment"
    self.log.LightTeal(" > New {} Added: {} - {} - {}".format(itemadded, msg.author, msg.dest, msg.name))
    except IntegrityError:
    pass
    self.db.close()

    def modlog_stream(self):
    self.log.Success("Starting Modlog Stream ~ {}".format(datetime.now().strftime(formatdt)))
    while True:
    for entry in self.sewsub.mod.stream.log(skip_existing=True):
    self.db.connect()
    if entry.action in most_actions:
    self.log.Success(">>> Incoming Modlog Entry: {} to {} by u/{} | Mod: {}".format(
    entry.action,
    entry.target_fullname,
    entry.target_author,
    entry.mod))
    try:
    add_modlog(entry)
    if entry.target_fullname is not None and entry.target_fullname.startswith('t3_') or \
    entry.target_fullname.startswith('t1_'):
    urlLink = shortenRedditURL('https://www.reddit.com' + entry.target_permalink)
    self.log.Intro(" > {}: {}".format("Post" if entry.target_fullname.startswith('t3_')
    else "Comment", urlLink))
    if entry.target_fullname.startswith("t3_"):
    self.log.Intro(" > {}".format(entry.target_title))
    if entry.mod == 'AutoModerator':
    self.log.Intro(" > Details: {}".format(entry.details))
    except IntegrityError:
    pass
    if entry.target_fullname is not None and entry.target_fullname.startswith('t3_') and \
    entry.action == 'editflair':
    flair_edit_post = self.reddit.submission(entry.target_fullname.lstrip('t3_'))
    if hasattr(flair_edit_post, 'link_flair_text'):
    flairtext = flair_edit_post.link_flair_text if flair_edit_post.link_flair_text is not None else None
    onthedb = Posts.get_or_none(Posts.idstr == flair_edit_post.id)
    if onthedb.flairtext != flairtext:
    update_post(flair_edit_post)
    self.log.Intro(" > Updated flair edited post to proper flair in the DB")
    self.db.close()

    def processing(self):
    subprocess1 = multiprocessing.Process(target=self.post_stream)
    subprocess2 = multiprocessing.Process(target=self.comment_stream)
    subprocess3 = multiprocessing.Process(target=self.edited_stream)
    subprocess4 = multiprocessing.Process(target=self.inbox_stream)
    subprocess5 = multiprocessing.Process(target=self.modlog_stream)

    subprocess1.start()
    subprocess2.start()
    subprocess3.start()
    subprocess4.start()
    subprocess5.start()


    if __name__ == '__main__':

    startTime = time.time()

    try:
    StreamingStuff().processing()
    except KeyboardInterrupt:
    print("Start Time: {} - Ran for: {}".format(datetime.fromtimestamp(startTime).strftime(formatdt),
    human(startTime)))