Skip to content

Instantly share code, notes, and snippets.

@binarybrat
Last active August 10, 2020 13:35
Show Gist options
  • Save binarybrat/2092cf49a27f1541ec0668e7154a6120 to your computer and use it in GitHub Desktop.
Save binarybrat/2092cf49a27f1541ec0668e7154a6120 to your computer and use it in GitHub Desktop.
Current streaming processes setup - adding logger
import sys
import time
import traceback
import peewee as pw
import multiprocessing
from datetime import datetime
from ago import human
import logging.handlers
from loggingTesting import Log
from models import init_db
from models.commentmodel import Comment, updateComment, delUpdateComment, add_comment, commentDone
from models.detailsmodel import Details, DetailsPending
from models.detailsmodel import DetailsHelper
from models.detailsmodel import get_details_approvalID, detailsComment
from models.messagemodel import SewingMessages, add_sewmsg
from models.modlogmodel import Modlog, most_actions, add_modlog
from models.patternmodel import Pattern, PendingPatternPosts, DataPatternsPending, CompanyPatternsPending, ppp_done
from models.patternmodel import add_patternpostconnect, add_ppp, patternDone, postsforpatDone, PatternPosts
from models.postmodel import Posts, add_post, postDone, update_post
from models.postmodel import delUpdate_post, unflaired_canFlair
from models.statstrafficmodel_ import TotalsTesting
from utils import detailpendinglogin
from utils.globalvars import isdeleted, has_pattern, formatdt
from utils.globalvars import shortenRedditURL
from utils.randomhelpershit import FLAIRCSSTOCHECK, modpost, flair_post
class StreamingStuff:
    """Mirror r/sewing (or r/sewingmods when testing) activity into the bot database.

    Each ``*_stream`` method loops forever over one Reddit stream (new posts,
    new comments, edited items, the bot's inbox, the mod log) and records what
    it sees through the peewee database returned by ``init_db()``.
    ``processing`` runs every stream in its own ``multiprocessing.Process``.
    """

    # 2629743 s is about one average month (~30.44 days); used for the
    # "new NSFW account" spam heuristic in post processing.
    _NEW_ACCOUNT_SECONDS = 2629743

    def __init__(self, testing=False, wordmin=30, scoremin=20, timeback=24, doPatterns=False, nsfwChecks=False):
        """Log in, make sure all tables exist, and set up logging.

        :param testing: run against r/sewingmods instead of r/sewing.
        :param wordmin: minimum word count passed to ``detailsComment``.
        :param scoremin: stored on the instance (not read by the methods in this class).
        :param timeback: stored on the instance (not read by the methods in this class).
        :param doPatterns: stored on the instance (not read by the methods in this class).
        :param nsfwChecks: when true, posts by NSFW accounts younger than about
            a month are removed and reported.
        """
        self.testing = testing
        self.subname = 'sewingmods' if testing else 'sewing'
        self.wordmin = wordmin
        self.scoremin = scoremin
        self.timeback = timeback
        self.doPatterns = doPatterns
        # (sic) attribute name kept as-is in case other code reads it.
        self.nsftChecks = nsfwChecks
        self.reddit = detailpendinglogin.login()
        self.sewsub = self.reddit.subreddit(self.subname)
        self.db = init_db()
        # Create any missing tables once, before the stream processes start.
        # connection_context() always closes the connection on exit.
        with self.db.connection_context():
            self.db.create_tables([Comment, Posts, Details, DetailsPending, Modlog, SewingMessages, PendingPatternPosts])
            self.db.create_tables([Pattern, PatternPosts, CompanyPatternsPending, DataPatternsPending])
        self.log = Log()
        self.startTime = datetime.now()
        self.logger = create_logger()
        self.logger.info("Starting the Streaming Bot Tests - Running on: /r/{}".format(self.subname))

    # ------------------------------------------------------------------ posts
    def post_stream(self):
        """Process new submissions forever, one DB connection per post."""
        self.logger.info("Starting Post Stream")
        while True:
            for post in self.sewsub.stream.submissions(skip_existing=True):
                # The connection is closed even when _process_post returns early,
                # so the next iteration never hits an already-open connection.
                with self.db.connection_context():
                    self._process_post(post)

    def _process_post(self, post):
        """Record/update one submission and apply the post-level moderation rules."""
        self.logger.info(
            "Incoming post by u/{} - {}\n - {}".format(post.author.name if post.author else "[deleted]",
                                                       post.shortlink, post.title))
        if not postDone(post) and not isdeleted(post):
            add_post(post)
            self.log.Intro(" - Added post to the database")
            try:
                TotalsTesting.create(
                    datecreated=datetime.utcfromtimestamp(post.created_utc).date(),
                    idstr=post.fullname
                )
                self.log.Intro(" - Added post to TotalsTesting DB")
            except pw.IntegrityError:
                pass  # already counted
        elif isdeleted(post):
            delUpdate_post(post)
            self.log.Intro(" - Updating Deleted Post: {}\n - {}".format(post.shortlink, post.title))
        else:
            update_post(post)
            self.log.Intro(" - Updating Post by u/{}".format(post.author.name if post.author else "[deleted]"))

        # Approve weird recurring posts that AutoMod does not approve.
        # NOTE(review): the condition requires the post to be *already* approved;
        # confirm that is the intent.
        if (post.approved and post.approved_by is not None and not isdeleted(post)
                and post.author is not None
                and post.author.name in ['AutoModerator', 'sewingmodthings']):
            post.mod.approve()
            self.log.Intro(
                " - Approved a weekly thread by AutoModerator: {}\n{}".format(post.shortlink, post.title))

        # Half-assed spambot protection: NSFW profile + account younger than a month.
        if (self.nsftChecks and post.author is not None and post.author.subreddit['over_18']
                and time.time() - post.author.created_utc <= self._NEW_ACCOUNT_SECONDS):
            post.mod.remove()
            post.report("NSFW user with account under 30 days old - please check and take action")
            self.log.Intro(" - NSFW user account under a month old - removed and sending report")
            return

        # Add flair to what looks like an unflaired project-type post.
        if post.link_flair_text is None and unflaired_canFlair(post):
            flair_post(post, 'FO', False)
            self.log.Intro(" - u/sewingmodthings gave FO flair to unflaired post")

        # If a pattern is mentioned in the title, queue it in the pending patterns DB.
        if post.link_flair_text is not None and post.link_flair_css_class in FLAIRCSSTOCHECK and \
                has_pattern(post.title) and not ppp_done(post):
            try:
                add_ppp(post)
                self.log.Intro(" - Pattern found in posted - added to pending posts db")
            except pw.IntegrityError:
                pass

    # --------------------------------------------------------------- comments
    def comment_stream(self):
        """Process new comments forever, one DB connection per comment."""
        self.logger.info("Starting Comment Stream")
        while True:
            for comment in self.sewsub.stream.comments(skip_existing=True):
                with self.db.connection_context():
                    self._process_comment(comment)

    def _process_comment(self, comment):
        """Record/update one new comment and handle it if it is a details comment."""
        self.log.Magenta(
            "Incomming comment by u/{} - {}".format(comment.author.name if comment.author else "[deleted]",
                                                    comment.fullname))
        if not commentDone(comment.fullname) and not isdeleted(comment):
            add_comment(comment)
            self.log.Intro(" - NEW Comment added to database")
            try:
                TotalsTesting.create(datecreated=datetime.utcfromtimestamp(comment.created_utc).date(),
                                     idstr=comment.fullname)
                self.log.Intro(" - Comment added to TotalsTesting DB")
            except pw.IntegrityError:
                pass
        elif isdeleted(comment):
            delUpdateComment(comment)
            self.log.Intro(" - Comment deleted - updated database")
        else:
            updateComment(comment)
            self.log.Intro(" - Comment updated in the database")

        # Checks if project type post and whether the comment passes as a details comment.
        if not detailsComment(comment, wordmin=self.wordmin):
            return
        self.log.Intro(" - Details Comment Found")
        dhelper = DetailsHelper(self.reddit, comment, verbose=True, overkill=True)
        if not dhelper.already_in_db() and not dhelper.is_deleted:
            dhelper.add_details()
            self.log.Intro(" - Added details to the DB")
        elif dhelper.oktoupdate:
            # NOTE(review): only logs here; presumably `oktoupdate` performs the update.
            self.log.Intro(" - Updated details comment in DB")

        if dhelper.has_pattern and not ppp_done(dhelper.comment.submission):
            if has_pattern(dhelper.comment.submission.title):
                add_ppp(dhelper.submission)
                self.log.Intro(" - Adding POST pending pattern post: {} - {}".format(dhelper.comment.fullname,
                                                                                    dhelper.submission.fullname))
            else:
                add_ppp(dhelper.submission, dhelper.comment.fullname, dhelper.comment.body)
                self.log.Intro(
                    " - Adding COMMENT pending pattern post: {} - {}".format(dhelper.comment.fullname,
                                                                            dhelper.submission.fullname))

        # If the post was warned or filtered, re-approve it and send the approval message.
        if dhelper.pending:
            self.log.Intro(" - Found a details pending post with details comment!!!")
            pending = DetailsPending.get_or_none(DetailsPending.idstr == dhelper.comment.submission.fullname)
            # get_or_none may return None; guard before reading .filtered.
            if pending is not None and pending.filtered and dhelper.comment.removed and \
                    dhelper.comment.banned_by == 'sewingmodthings':
                modpost(dhelper.submission.id, 'approve')
                self.log.Intro(" - Re-Approved pending post for giving details")
            if not dhelper.approval_msg_sent():
                dhelper.send_details_approval()
                apmsgid = get_details_approvalID(self.reddit, dhelper.submission)
                self.log.Intro(" - Sent Details Approval Message: {}".format(apmsgid))
            dhelper.update_pending_approved()
            self.log.Intro(" - Updated pending details to approved: {} - {}".format(dhelper.comment.fullname,
                                                                                   dhelper.submission.fullname))

    # ----------------------------------------------------------------- edited
    def edited_stream(self):
        """Process edited comments (t1) and posts (t3) forever."""
        self.logger.info("Starting Edited Stream")
        while True:
            for edited in self.sewsub.mod.stream.edited(skip_existing=True):
                with self.db.connection_context():
                    if edited.fullname.startswith('t1'):
                        self._process_edited_comment(edited)
                    elif edited.fullname.startswith('t3'):
                        self._process_edited_post(edited)

    def _process_edited_comment(self, edited):
        """Record/update one edited comment and re-run the details-comment handling."""
        normalprint = "Incoming Edited Comment by: {} - {}".format(
            edited.author.name if edited.author else "[deleted]", edited.fullname)
        detailsprint = "Incoming Edited DETAILS Comment by: {} - {}".format(
            edited.author.name if edited.author else "[deleted]", edited.fullname)
        self.logger.info(detailsprint if detailsComment(edited) else normalprint)
        if not commentDone(edited.fullname) and not isdeleted(edited):
            add_comment(edited)
            self.log.Intro(" - NEW EDITED Comment added to database")
            try:
                TotalsTesting.create(datecreated=datetime.utcfromtimestamp(edited.created_utc).date(),
                                     idstr=edited.fullname)
                self.log.Intro(" - Edited Comment added to TotalsTesting DB")
            except pw.IntegrityError:
                pass
        elif isdeleted(edited):
            delUpdateComment(edited)
            self.log.Intro(" - Edited Comment deleted - updated database")
        else:
            updateComment(edited)
            self.log.Intro(" - Edited Comment updated in the database")

        if not detailsComment(edited, wordmin=self.wordmin):
            return
        self.log.Intro(" - Edited Details Comment Found")
        dhelper = DetailsHelper(self.reddit, edited, verbose=True, overkill=True)
        if not dhelper.already_in_db() and not dhelper.is_deleted:
            dhelper.add_details()
            self.log.Intro(" - Added details to the DB")
        elif dhelper.oktoupdate:
            # NOTE(review): only logs here; presumably `oktoupdate` performs the update.
            self.log.Intro(" - Updated details comment in DB")

        if dhelper.has_pattern and not dhelper.pattern_in_db():
            if has_pattern(dhelper.comment.submission.title):
                add_ppp(dhelper.submission)
                self.log.Intro(
                    " - Adding POST pending pattern post: {} - {}".format(dhelper.comment.fullname,
                                                                         dhelper.submission.fullname))
            else:
                add_ppp(dhelper.submission, dhelper.comment.fullname, dhelper.comment.body)
                self.log.Intro(
                    " - Adding COMMENT pending pattern post: {} - {}".format(
                        dhelper.comment.fullname,
                        dhelper.submission.fullname))

        if dhelper.pending:
            pending = DetailsPending.get_or_none(
                DetailsPending.idstr == dhelper.comment.submission.fullname)
            # get_or_none may return None; guard before reading .filtered.
            if pending is not None and pending.filtered and dhelper.comment.removed and \
                    dhelper.comment.banned_by == 'sewingmodthings':
                modpost(dhelper.submission.id, 'approve')
                self.log.Intro(" - Re-Approved pending post for giving details")
            if not dhelper.approval_msg_sent():
                dhelper.send_details_approval()
                apmsgid = get_details_approvalID(self.reddit, dhelper.submission)
                self.log.Intro(" - Sent Details Approval Message: {}".format(apmsgid))
            dhelper.update_pending_approved()
            self.log.Intro(
                " - Updated edited pending details to approved: {} - {}".format(
                    dhelper.comment.fullname,
                    dhelper.submission.fullname))

    def _process_edited_post(self, edited):
        """Record/update one edited submission."""
        self.logger.info("Incoming Edited Post by: u/{} - {}".format(edited.author.name if edited.author
                                                                     else "[deleted]", edited.fullname))
        if not postDone(edited) and not isdeleted(edited):
            add_post(edited)
            self.log.Intro(" - NEW Post added to database FROM EDITED STREAM")
            # Guarded like every other TotalsTesting insert: a duplicate must not kill the stream.
            try:
                TotalsTesting.create(
                    datecreated=datetime.utcfromtimestamp(edited.created_utc).date(),
                    idstr=edited.fullname
                )
                self.log.Intro(" - Edited Post added to TotalsTesting DB")
            except pw.IntegrityError:
                pass
        elif isdeleted(edited):
            delUpdate_post(edited)
            self.log.Intro(" - Edited post deleted - updated database")
        else:
            update_post(edited)
            self.log.Intro(" - Updated edited post in database")

    # ------------------------------------------------------------------ inbox
    def inbox_stream(self):
        """Store every new inbox item (message or comment reply) forever."""
        self.logger.info("Starting Inbox Stream")
        while True:
            for msg in self.reddit.inbox.stream(skip_existing=True):
                itemadded = "Message" if msg.fullname.startswith('t4_') else "Comment"
                self.logger.info("Incoming Inbox item: u/{} - {}".format(
                    msg.author.name if msg.author else "[deleted]", msg.fullname))
                with self.db.connection_context():
                    add_sewmsg(msg)
                self.log.Intro(" - New {} Added: {} - {} - {}".format(itemadded, msg.author, msg.dest, msg.name))

    # ----------------------------------------------------------------- modlog
    def modlog_stream(self):
        """Store interesting mod-log entries and react to flair edits, forever."""
        self.logger.info("Starting Modlog Stream")
        while True:
            for entry in self.sewsub.mod.stream.log(skip_existing=True):
                if entry.action in most_actions:
                    self._record_modlog_entry(entry)
                # NOTE(review): open question from the author - should this be its own process?
                if entry.action == 'editflair':
                    with self.db.connection_context():
                        self.handle_reflair(entry=entry)

    def _record_modlog_entry(self, entry):
        """Save one mod-log entry and log a short description of its target."""
        self.logger.info("Incoming Modlog Entry: {} to {} Mod: {}".format(entry.action, entry.target_fullname,
                                                                         entry.mod))
        with self.db.connection_context():
            add_modlog(entry)
        self.log.Intro(" - Added entry to db")
        target = entry.target_fullname
        # The None check must guard both prefixes (the old and/or mix did not).
        if target is not None and target.startswith(('t3_', 't1_')):
            urlLink = shortenRedditURL('https://www.reddit.com' + entry.target_permalink)
            self.log.Intro(" - {}: {}".format("Post" if target.startswith('t3_') else "Comment", urlLink))
            if target.startswith("t3_"):
                self.log.Intro(" - {}".format(entry.target_title))
        if entry.mod == 'AutoModerator':
            self.log.Intro(" - Details: {}".format(entry.details))

    def handle_reflair(self, entry):
        """Sync a post's flair in the DB after an ``editflair`` mod-log entry.

        Non-post targets (anything not ``t3_``) are ignored. A post missing from
        the DB is added; otherwise it is updated only if its flair text changed.
        Assumes the caller holds a DB connection open.
        """
        target = entry.target_fullname
        if not target or not target.startswith('t3_'):
            return
        feditedpost = self.reddit.submission(target[3:])
        feditedonDB = Posts.get_or_none(Posts.fullname == target)
        if feditedonDB is None:
            add_post(feditedpost)
            self.log.Intro(" - Added post that was reflaired but not in the database via modlog process")
            # Nothing to compare against; presumably add_post stored the current flair.
            return
        postflair = feditedpost.link_flair_text
        if feditedonDB.flairtext != postflair:
            update_post(feditedpost)
            self.log.Intro(" - Updated post with flair change! New Flair: {}".format(postflair))

    # -------------------------------------------------------------- processes
    def fancy_processing(self):
        """Build (but do not start) one named process per stream.

        :returns: unstarted ``multiprocessing.Process`` objects in the order
            post, comment, edited, inbox, modlog.
        """
        streams = [
            (self.post_stream, 'postStream'),
            (self.comment_stream, 'commentStream'),
            (self.edited_stream, 'editedStream'),
            (self.inbox_stream, 'inboxStream'),
            (self.modlog_stream, 'modlogStream'),
        ]
        return [multiprocessing.Process(target=target, name=name) for target, name in streams]

    def processing(self, wait=False):
        """Start every stream in its own process.

        :param wait: when true, join all processes so the caller blocks for
            their lifetime (and e.g. KeyboardInterrupt is raised in the caller).
        :returns: the list of started processes.
        """
        processes = self.fancy_processing()
        for process in processes:
            process.start()
        if wait:
            for process in processes:
                process.join()
        return processes

    def run(self):
        """Run all streams until they exit, logging database errors and Ctrl-C."""
        try:
            # Join so that exceptions raised while waiting (notably Ctrl-C) reach the handlers below.
            self.processing(wait=True)
        except pw.IntegrityError:
            pass
        except (pw.DatabaseError, pw.OperationalError):
            self.db.close()
            time.sleep(3)
            self.db.connect()
            self.logger.error("DatabaseError or OperationError - closed and re-oppended the database")
        except KeyboardInterrupt:
            if not self.db.is_closed():
                self.log.Warn("CLOSED DATABASE")
                self.db.close()
            # INFO, not DEBUG: the logger from create_logger() is set to INFO.
            self.logger.info("Start Time: {} - Ran for: {}".format(self.startTime.strftime(formatdt),
                                                                    human(self.startTime)))
        except Exception as e:
            att_error = traceback.format_exc()
            proc = multiprocessing.current_process().name
            self.logger.warning(">{} - {} - Process: {}\n{}".format(datetime.now().strftime(formatdt),
                                                                    type(e).__name__, proc, att_error))
            self.log.Intro('------------------------------')
            self.logger.error('Error on line {}: {}: {}'.format(sys.exc_info()[-1].tb_lineno,
                                                               type(e).__name__, e))
def create_logger(path='logs/multi_stream.log'):
    """Return the shared multiprocessing logger, configured to write to a file.

    ``multiprocessing.get_logger()`` returns one process-wide logger, so the
    file handler is attached only on the first call; later calls return the
    same logger unchanged.

    :param path: log file to append to; its parent directory is created if it
        is missing. Defaults to ``logs/multi_stream.log`` relative to the CWD.
    :returns: the configured ``logging.Logger`` (level INFO).
    """
    import os  # local: only needed to create the log directory

    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    # Build the FileHandler only when it will actually be attached: it opens
    # its file on construction, so creating it unconditionally leaked a file
    # handle on every repeat call.
    if not logger.handlers:
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        handler = logging.FileHandler(path)
        # %I (12-hour clock) pairs with %p; %H printed e.g. "15:04:05 PM".
        handler.setFormatter(logging.Formatter(
            '[%(asctime)s| %(levelname)s| %(processName)s] %(message)s',
            '%m-%d-%Y %I:%M:%S %p'))
        logger.addHandler(handler)
    return logger
if __name__ == '__main__':
    # Record the launch time as a module-level global (it may be read
    # elsewhere in this module), then run the bot with its default settings.
    startTime = time.time()
    StreamingStuff().run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment