Created
August 18, 2025 18:38
-
-
Save cheeyeo/6179f43f5eb18f6b23f4522718f7d221 to your computer and use it in GitHub Desktop.
Revisions
-
cheeyeo created this gist
Aug 18, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,2156 @@ ```diff --- board/__init__.py +++ board/__init__.py @@ -18,7 +18,7 @@ def get_locale(): - return request.accept_languages.best_match(['en', 'es']) + return request.accept_languages.best_match(["en", "es"]) def create_app(): @@ -38,18 +38,16 @@ # Setting up translation Babel(app, locale_selector=get_locale) - # Sets up Cloudwatch Logs Embedded Metrics @metric_scope def my_handler(path, method, duration, metrics): - metrics.put_dimensions({'Path': path}) - metrics.put_metric('Latency', duration, 'Milliseconds') - metrics.set_property('Method', method) - metrics.set_property('Path', path) - metrics.set_namespace('flaskappv22') + metrics.put_dimensions({"Path": path}) + metrics.put_metric("Latency", duration, "Milliseconds") + metrics.set_property("Method", method) + metrics.set_property("Path", path) + metrics.set_namespace("flaskappv22") metrics.set_timestamp(datetime.datetime.now()) - # Setup logging @app.before_request @@ -63,8 +61,8 @@ "url": request.url, "path": request.path, "client_ip": client_ip, - "user_agent": request.headers.get("user-agent") - } + "user_agent": request.headers.get("user-agent"), + }, ) @after_this_request @@ -75,7 +73,7 @@ log_level = logging.ERROR elif response.status_code >= 400: log_level = logging.WARNING - + app.logger.log( log_level, f"{request.method} to {request.path} completed with status {response.status_code}", @@ -84,8 +82,8 @@ "url": request.url, "path": request.path, "status_code": response.status_code, - "latency": duration - } + "latency": duration, + }, ) if os.getenv("ENV_TYPE") == "prod": @@ -93,38 +91,41 @@ return response - # Set up moment js @app.before_request def moment_before_request(): g.locale = str(get_locale()) - + # moment = Moment(app) Moment(app) # Set up flask-mail - app.config['MAIL_SERVER'] = os.getenv('MAIL_SERVER') - app.config['MAIL_PORT'] = os.getenv('MAIL_PORT') + app.config["MAIL_SERVER"] = os.getenv("MAIL_SERVER") + app.config["MAIL_PORT"] = os.getenv("MAIL_PORT") Mailer.init_app(app) login.login_manager.init_app(app) login.login_manager.login_view = "users.login" - login.login_message = _l('Please login to access this page.') + login.login_message = _l("Please login to access this page.") if os.environ.get("TESTING"): - app.config["SQLALCHEMY_DATABASE_URI"] = f"postgresql://{os.environ.get('RDS_TEST_USERNAME')}:{os.environ.get('RDS_TEST_PASSWORD')}@{os.environ.get('RDS_HOSTNAME')}:{os.environ.get('RDS_TEST_PORT')}/{os.environ.get('RDS_TEST_DB_NAME')}" + app.config["SQLALCHEMY_DATABASE_URI"] = ( + f"postgresql://{os.environ.get('RDS_TEST_USERNAME')}:{os.environ.get('RDS_TEST_PASSWORD')}@{os.environ.get('RDS_HOSTNAME')}:{os.environ.get('RDS_TEST_PORT')}/{os.environ.get('RDS_TEST_DB_NAME')}" + ) else: - app.config["SQLALCHEMY_DATABASE_URI"] = f"postgresql://{os.environ.get('RDS_USERNAME')}:{os.environ.get('RDS_PASSWORD')}@{os.environ.get('RDS_HOSTNAME')}:{os.environ.get('RDS_PORT')}/{os.environ.get('RDS_DB_NAME')}" + app.config["SQLALCHEMY_DATABASE_URI"] = ( + f"postgresql://{os.environ.get('RDS_USERNAME')}:{os.environ.get('RDS_PASSWORD')}@{os.environ.get('RDS_HOSTNAME')}:{os.environ.get('RDS_PORT')}/{os.environ.get('RDS_DB_NAME')}" + ) # Set pagination value - app.config['POSTS_PER_PAGE'] = int(os.getenv("POSTS_PER_PAGE", 20)) + app.config["POSTS_PER_PAGE"] = int(os.getenv("POSTS_PER_PAGE", 20)) database.Database.init_app(app) # Adds migration commands via Flask-migrate but doesn't run the actual migration... Migrate(app, database.Database) # Migrate.init_app(app, database.Database) - + app.register_blueprint(pages.bp) app.register_blueprint(posts.bp) app.register_blueprint(users.bp) @@ -132,7 +133,7 @@ app.register_error_handler(404, errors.page_not_found) app.register_error_handler(500, errors.internal_error) app.register_error_handler(Exception, errors.unhandled_exception_handler) - + # app.logger.info("Microblog startup") return app --- board/cli.py +++ board/cli.py @@ -30,14 +30,43 @@ try: with open(messages_path, "w") as _tmp: - result = subprocess.run([pybabel_bin, "extract", "-F", "babel.cfg", "-k", "_l", "-o", messages_path, "."], capture_output=True, text=True, check=True) + result = subprocess.run( + [ + pybabel_bin, + "extract", + "-F", + "babel.cfg", + "-k", + "_l", + "-o", + messages_path, + ".", + ], + capture_output=True, + text=True, + check=True, + ) print(result.stdout) print(result.stderr) if result.returncode != 0: raise RuntimeError("extract command failed") - result = subprocess.run([pybabel_bin, "init", "-i", messages_path, "-d", "board.translations", "-l", lang], capture_output=True, text=True, check=True) + result = subprocess.run( + [ + pybabel_bin, + "init", + "-i", + messages_path, + "-d", + "board.translations", + "-l", + lang, + ], + capture_output=True, + text=True, + check=True, + ) print(result.stdout) print(result.stderr) @@ -56,16 +85,43 @@ try: with open(messages_path, "w") as _tmp: - result = subprocess.run([pybabel_bin, "extract", "-F", "babel.cfg", "-k", "_l", "-o", messages_path, "."], capture_output=True, text=True, check=True) + result = subprocess.run( + [ + pybabel_bin, + "extract", + "-F", + "babel.cfg", + "-k", + "_l", + "-o", + messages_path, + ".", + ], + capture_output=True, + text=True, + check=True, + ) print(result.stdout) print(result.stderr) if result.returncode != 0: raise RuntimeError("Babel extract command failed") - - result = subprocess.run([pybabel_bin, "update", "-i", messages_path, "-d", "board/translations"], capture_output=True, text=True, check=True) + result = subprocess.run( + [ + pybabel_bin, + "update", + "-i", + messages_path, + "-d", + "board/translations", + ], + capture_output=True, + text=True, + check=True, + ) + print(result.stdout) print(result.stderr) @@ -82,7 +138,12 @@ Compile all languages """ - result = subprocess.run([pybabel_bin, "compile", "-d", "board/translations"], capture_output=True, text=True, check=True) + result = subprocess.run( + [pybabel_bin, "compile", "-d", "board/translations"], + capture_output=True, + text=True, + check=True, + ) print(result.stdout) print(result.stderr) --- board/database.py +++ board/database.py @@ -25,84 +25,84 @@ # Users followers table followers = Table( - 'followers', + "followers", Base.metadata, - Column('follower_id', - Integer(), - ForeignKey('users.id'), - primary_key=True, - index=True), - Column('followed_id', - Integer(), - ForeignKey('users.id'), - primary_key=True, - index=True) + Column( + "follower_id", Integer(), ForeignKey("users.id"), primary_key=True, index=True + ), + Column( + "followed_id", Integer(), ForeignKey("users.id"), primary_key=True, index=True + ), ) class User(UserMixin, Database.Model): - __tablename__ = 'users' + __tablename__ = "users" id = mapped_column(Integer(), primary_key=True, autoincrement=True) - username = mapped_column(String(length=256), unique=True, index=True, nullable=False) + username = mapped_column( + String(length=256), unique=True, index=True, nullable=False + ) email = mapped_column(String(), unique=True, index=True, nullable=False) password_hash = mapped_column(String(length=256)) about_me = mapped_column(String(length=140)) - last_seen = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)) + last_seen = mapped_column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) + ) # Posts - posts: WriteOnlyMapped['Post'] = relationship('Post', back_populates='author') + posts: WriteOnlyMapped["Post"] = relationship("Post", back_populates="author") # Followers - following: WriteOnlyMapped['User'] = relationship( + following: WriteOnlyMapped["User"] = relationship( secondary=followers, primaryjoin=(followers.c.follower_id == id), secondaryjoin=(followers.c.followed_id == id), - back_populates='followers') - - followers: WriteOnlyMapped['User'] = relationship( + back_populates="followers", + ) + + followers: WriteOnlyMapped["User"] = relationship( secondary=followers, primaryjoin=(followers.c.followed_id == id), secondaryjoin=(followers.c.follower_id == id), - back_populates='following') + back_populates="following", + ) def set_password(self, password): self.password_hash = generate_password_hash(password) - + def check_password(self, password): return check_password_hash(self.password_hash, password) - + def __repr__(self): return f"<User: {self.email}>" - + def avatar(self, size): - digest = hashlib.new("md5", self.email.lower().encode('utf-8'), usedforsecurity=False).hexdigest() - return f'https://www.gravatar.com/avatar/{digest}?d=identicon&s={size}' - + digest = hashlib.new( + "md5", self.email.lower().encode("utf-8"), usedforsecurity=False + ).hexdigest() + return f"https://www.gravatar.com/avatar/{digest}?d=identicon&s={size}" + def follow(self, user): if not self.is_following(user): self.following.add(user) - + def unfollow(self, user): if self.is_following(user): self.following.remove(user) - + def is_following(self, user): query = self.following.select().where(User.id == user.id) return Database.session.scalar(query) is not None - + def followers_count(self): - query = Select(func.count()).select_from( - self.followers.select().subquery() - ) + query = Select(func.count()).select_from(self.followers.select().subquery()) return Database.session.scalar(query) - + def following_count(self): - query = Select(func.count()).select_from( - self.following.select().subquery() - ) + query = Select(func.count()).select_from(self.following.select().subquery()) return Database.session.scalar(query) - + def following_posts(self): Author = so.aliased(User) Follower = so.aliased(User) @@ -111,45 +111,49 @@ Select(Post) .join(Post.author.of_type(Author)) .join(Author.followers.of_type(Follower), isouter=True) - .where(sa.or_( - Follower.id == self.id, - # Author.id == self.id, - )) + .where( + sa.or_( + Follower.id == self.id, + # Author.id == self.id, + ) + ) .group_by(Post) .order_by(Post.created.desc()) ) - + def get_password_reset_token(self, expires_in=600): return jwt.encode( - {'reset_password': self.id, 'exp': time() + expires_in}, + {"reset_password": self.id, "exp": time() + expires_in}, key=os.getenv("APP_SECRET"), - algorithm='HS256' + algorithm="HS256", ) - + @staticmethod def verify_reset_password_token(token): try: - id = jwt.decode( - token, - key=os.getenv('APP_SECRET'), - algorithms=['HS256'] - )['reset_password'] + id = jwt.decode(token, key=os.getenv("APP_SECRET"), algorithms=["HS256"])[ + "reset_password" + ] except Exception as _: - current_app.logger.exception("Failure in JWT decode of password reset token") + current_app.logger.exception( + "Failure in JWT decode of password reset token" + ) return - + return Database.session.get(User, id) class Post(Database.Model): - __tablename__ = 'posts' + __tablename__ = "posts" id = mapped_column(Integer(), primary_key=True, autoincrement=True) message = mapped_column(Text(), nullable=False) - created = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)) + created = mapped_column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) + ) language = mapped_column(String(length=5), nullable=True) user_id = mapped_column(ForeignKey(User.id), index=True) - author = relationship('User', back_populates='posts') + author = relationship("User", back_populates="posts") def __repr__(self): - return f"<Post: {self.id}>" \ No newline at end of file + return f"<Post: {self.id}>" --- board/email.py +++ board/email.py @@ -6,11 +6,7 @@ def send_email(subject, sender, recipients, text_body, html_body): - msg = Message( - subject, - sender=sender, - recipients=recipients - ) + msg = Message(subject, sender=sender, recipients=recipients) msg.body = text_body msg.html = html_body Mailer.send(msg) @@ -18,10 +14,10 @@ def send_password_reset_email(user): token = user.get_password_reset_token() - send_email('[Microblog] Reset Your Password', - sender='[email protected]', - recipients=[user.email], - text_body=render_template('email/reset_password.txt', - user=user, token=token), - html_body=render_template('email/reset_password.html', - user=user, token=token)) \ No newline at end of file + send_email( + "[Microblog] Reset Your Password", + sender="[email protected]", + recipients=[user.email], + text_body=render_template("email/reset_password.txt", user=user, token=token), + html_body=render_template("email/reset_password.html", user=user, token=token), + ) --- board/errors.py +++ board/errors.py @@ -18,11 +18,9 @@ current_app.logger.exception("unhandled exception during request processing") response = e.get_response() - response.data = json.dumps({ - "code": e.code, - "name": e.name, - "description": e.description - }) + response.data = json.dumps( + {"code": e.code, "name": e.name, "description": e.description} + ) response.content_type = "application/json" return response --- board/form.py +++ board/form.py @@ -6,36 +6,44 @@ class LoginForm(FlaskForm): - email = StringField(_l('Email'), validators=[DataRequired(), Email()]) - password = PasswordField(_l('Password'), validators=[DataRequired()]) - remember_me = BooleanField(_l('Remember Me')) - submit = SubmitField(_l('Sign In')) + email = StringField(_l("Email"), validators=[DataRequired(), Email()]) + password = PasswordField(_l("Password"), validators=[DataRequired()]) + remember_me = BooleanField(_l("Remember Me")) + submit = SubmitField(_l("Sign In")) class RegistrationForm(FlaskForm): - username = StringField('Username', validators=[DataRequired()]) - email = StringField('Email', validators=[DataRequired(), Email()]) - password = PasswordField('Password', validators=[DataRequired()]) - password2 = PasswordField('Repeat Password', validators=[DataRequired(), EqualTo('password')]) - submit = SubmitField('Register') + username = StringField("Username", validators=[DataRequired()]) + email = StringField("Email", validators=[DataRequired(), Email()]) + password = PasswordField("Password", validators=[DataRequired()]) + password2 = PasswordField( + "Repeat Password", validators=[DataRequired(), EqualTo("password")] + ) + submit = SubmitField("Register") def validate_username(self, username): - user = database.Database.session.scalar(database.Database.select(database.User).where( - database.User.username == username.data)) + user = database.Database.session.scalar( + database.Database.select(database.User).where( + database.User.username == username.data + ) + ) if user is not None: - raise ValidationError('Please use a different username.') + raise ValidationError("Please use a different username.") def validate_email(self, email): - user = database.Database.session.scalar(database.Database.select(database.User).where( - database.User.email == email.data)) + user = database.Database.session.scalar( + database.Database.select(database.User).where( + database.User.email == email.data + ) + ) if user is not None: - raise ValidationError('Please use a different email address.') - + raise ValidationError("Please use a different email address.") + class EditProfileForm(FlaskForm): - username = StringField('Username', validators=[DataRequired()]) - about_me = TextAreaField('About Me', validators=[Length(min=0, max=140)]) - submit = SubmitField('Submit') + username = StringField("Username", validators=[DataRequired()]) + about_me = TextAreaField("About Me", validators=[Length(min=0, max=140)]) + submit = SubmitField("Submit") def __init__(self, original_username, *args, **kwargs): super().__init__(*args, **kwargs) @@ -43,29 +51,35 @@ def validate_username(self, username): if username.data != self.original_username: - user = database.Database.session.scalar(database.Database.select(database.User).where( - database.User.username == username.data)) + user = database.Database.session.scalar( + database.Database.select(database.User).where( + database.User.username == username.data + ) + ) if user is not None: - raise ValidationError('Please use a different username.') + raise ValidationError("Please use a different username.") class EmptyForm(FlaskForm): - submit = SubmitField('Submit') + submit = SubmitField("Submit") class PostForm(FlaskForm): - message = TextAreaField('Say something', validators=[DataRequired(), Length(min=1, max=140)]) - submit = SubmitField('Submit') + message = TextAreaField( + "Say something", validators=[DataRequired(), Length(min=1, max=140)] + ) + submit = SubmitField("Submit") class ResetPasswordRequestForm(FlaskForm): - email = StringField('Email', validators=[DataRequired(), Email()]) - submit = SubmitField('Submit') + email = StringField("Email", validators=[DataRequired(), Email()]) + submit = SubmitField("Submit") class ResetPasswordForm(FlaskForm): - password = PasswordField('Password', validators=[DataRequired()]) + password = PasswordField("Password", validators=[DataRequired()]) password2 = PasswordField( - 'Repeat Password', validators=[DataRequired(), EqualTo('password')]) - submit = SubmitField('Request Password Reset') + "Repeat Password", validators=[DataRequired(), EqualTo("password")] + ) + submit = SubmitField("Request Password Reset") --- board/log_context.py +++ board/log_context.py @@ -21,7 +21,7 @@ context = _log_context.get() for key, value in context.items(): setattr(record, key, value) - + return True --- board/login.py +++ board/login.py @@ -7,4 +7,4 @@ @login_manager.user_loader def load_user(id): - return database.Database.session.get(database.User, int(id)) \ No newline at end of file + return database.Database.session.get(database.User, int(id)) --- board/pages.py +++ board/pages.py @@ -1,4 +1,12 @@ -from flask import Blueprint, render_template, flash, request, redirect, url_for, current_app +from flask import ( + Blueprint, + render_template, + flash, + request, + redirect, + url_for, + current_app, +) from flask_login import login_required, current_user from langdetect import detect, LangDetectException from board.form import PostForm @@ -16,51 +24,61 @@ try: language = detect(form.message.data) except LangDetectException: - language = '' + language = "" - post = Post( - message=form.message.data, - language=language, - author=current_user - ) + post = Post(message=form.message.data, language=language, author=current_user) Database.session.add(post) Database.session.commit() - flash('Your post is now live!') - return redirect(url_for('pages.home')) - + flash("Your post is now live!") + return redirect(url_for("pages.home")) + # Show all posts of users you're currently following - page = request.args.get('page', 1, type=int) + page = request.args.get("page", 1, type=int) # posts = Database.session.scalars( # current_user.following_posts() # ).all() - posts = Database.paginate(current_user.following_posts(), - page=page, - per_page=current_app.config['POSTS_PER_PAGE'], - error_out=False) - - next_url = url_for('pages.home', page=posts.next_num) if posts.has_next else None + posts = Database.paginate( + current_user.following_posts(), + page=page, + per_page=current_app.config["POSTS_PER_PAGE"], + error_out=False, + ) + + next_url = url_for("pages.home", page=posts.next_num) if posts.has_next else None - prev_url = url_for('pages.home', page=posts.prev_num) if posts.has_prev else None + prev_url = url_for("pages.home", page=posts.prev_num) if posts.has_prev else None - return render_template("pages/home.html", title="Home Page", form=form, posts=posts.items, next_url=next_url, prev_url=prev_url) + return render_template( + "pages/home.html", + title="Home Page", + form=form, + posts=posts.items, + next_url=next_url, + prev_url=prev_url, + ) @bp.route("/explore") @login_required def explore(): - page = request.args.get('page', 1, type=int) + page = request.args.get("page", 1, type=int) query = Database.select(Post).order_by(Post.created.desc()) # posts = Database.session.scalars(query).all() - posts = Database.paginate(query, - page=page, - per_page=current_app.config['POSTS_PER_PAGE'], - error_out=False) - - next_url = url_for('pages.explore', page=posts.next_num) if posts.has_next else None + posts = Database.paginate( + query, page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False + ) + + next_url = url_for("pages.explore", page=posts.next_num) if posts.has_next else None - prev_url = url_for('pages.explore', page=posts.prev_num) if posts.has_prev else None + prev_url = url_for("pages.explore", page=posts.prev_num) if posts.has_prev else None - return render_template("pages/home.html", title="Explore", posts=posts.items, next_url=next_url, prev_url=prev_url) + return render_template( + "pages/home.html", + title="Explore", + posts=posts.items, + next_url=next_url, + prev_url=prev_url, + ) @bp.get("/about") --- board/posts.py +++ board/posts.py @@ -23,18 +23,16 @@ try: language = detect(form.message.data) except LangDetectException: - language = '' + language = "" new_post = database.Post( - message=form.message.data, - language=language, - user_id=current_user.id + message=form.message.data, language=language, user_id=current_user.id ) database.Database.session.add(new_post) database.Database.session.commit() current_app.logger.info(f"New post by author {current_user.email}") flash(f"Thanks for posting, {current_user.email}", category="success") - return redirect(url_for('posts.posts')) + return redirect(url_for("posts.posts")) else: flash("Message cannot be blank", category="error") return render_template("posts/new.html", title="Add Post") @@ -46,7 +44,9 @@ # TODO: How to call current_user.posts ??? current_app.logger.info(f"Viewing all posts by {current_user.email}") posts = database.Database.session.execute( - database.Database.select(database.Post).order_by(database.Post.created.desc()).where(database.Post.user_id == current_user.id ) + database.Database.select(database.Post) + .order_by(database.Post.created.desc()) + .where(database.Post.user_id == current_user.id) ).scalars() - return render_template("posts/posts.html", title="All posts", posts=posts) \ No newline at end of file + return render_template("posts/posts.html", title="All posts", posts=posts) --- board/translate.py +++ board/translate.py @@ -7,8 +7,7 @@ def translate(text, source_lang, dest_lang): bedrock_client = boto3.client( - "bedrock-runtime", - region_name=os.getenv("AWS_REGION", "us-east-1") + "bedrock-runtime", region_name=os.getenv("AWS_REGION", "us-east-1") ) # TODO: Add logging config to Cloudwatch Logs here: @@ -33,30 +32,17 @@ """ req = { - "messages": [ - { - "role": "user", - "content": [ - { - "text": prompt - } - ] - } - ], - "inferenceConfig": { - "temperature": 0.1, - "topP": 0.9, - "maxTokens": 10240 - } + "messages": [{"role": "user", "content": [{"text": prompt}]}], + "inferenceConfig": {"temperature": 0.1, "topP": 0.9, "maxTokens": 10240}, } bedrock_response = bedrock_client.invoke_model( - modelId = "us.amazon.nova-micro-v1:0", + modelId="us.amazon.nova-micro-v1:0", body=json.dumps(req), - contentType="application/json" + contentType="application/json", ) resp = json.loads(bedrock_response["body"].read()) - translation = resp["output"]["message"]["content"][0]['text'] + translation = resp["output"]["message"]["content"][0]["text"] return translation --- board/translations.py +++ board/translations.py @@ -11,5 +11,5 @@ def get_translation(): data = request.get_json() return { - 'text': translate(data['text'], data['source_language'], data['dest_language']) + "text": translate(data["text"], data["source_language"], data["dest_language"]) } --- board/users.py +++ board/users.py @@ -1,11 +1,26 @@ from urllib.parse import urlsplit from datetime import datetime, timezone -from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app +from flask import ( + Blueprint, + render_template, + request, + redirect, + url_for, + flash, + current_app, +) from flask_login import current_user, login_user, logout_user, login_required import sqlalchemy as sa from sqlalchemy.sql import func from board.database import Database, User, Post -from board.form import LoginForm, RegistrationForm, EditProfileForm, EmptyForm, ResetPasswordRequestForm, ResetPasswordForm +from board.form import ( + LoginForm, + RegistrationForm, + EditProfileForm, + EmptyForm, + ResetPasswordRequestForm, + ResetPasswordForm, +) from board.email import send_password_reset_email from board.log_context import add_to_log_context @@ -13,11 +28,11 @@ bp = Blueprint("users", __name__) -@bp.route("/login", methods=['GET', 'POST']) +@bp.route("/login", methods=["GET", "POST"]) def login(): if current_user.is_authenticated: - flash('User already logged in') - return redirect(url_for('pages.home')) + flash("User already logged in") + return redirect(url_for("pages.home")) form = LoginForm() @@ -26,16 +41,16 @@ Database.select(User).where(User.email == form.email.data) ) if user is None or not user.check_password(form.password.data): - flash('Invalid username or password') - return redirect(url_for('users.login')) + flash("Invalid username or password") + return redirect(url_for("users.login")) login_user(user, remember=form.remember_me.data) with add_to_log_context(user_email=form.email.data): current_app.logger.info(f"User {current_user.email} has logged in.") - - next_page = request.args.get('next') - if not next_page or urlsplit(next_page).netloc != '': - next_page = url_for('pages.home') + + next_page = request.args.get("next") + if not next_page or urlsplit(next_page).netloc != "": + next_page = url_for("pages.home") return redirect(next_page) return render_template("users/login.html", form=form, title="Login") @@ -48,28 +63,25 @@ with add_to_log_context(): current_app.logger.info("User has logged out.") - return redirect(url_for('pages.home')) + return redirect(url_for("pages.home")) -@bp.route("/register", methods=['GET', 'POST']) +@bp.route("/register", methods=["GET", "POST"]) def register(): if current_user.is_authenticated: - flash('You are already signed in') - return redirect(url_for('pages.home')) + flash("You are already signed in") + return redirect(url_for("pages.home")) form = RegistrationForm() if form.validate_on_submit(): - user = User( - username=form.username.data, - email=form.email.data - ) + user = User(username=form.username.data, email=form.email.data) user.set_password(form.password.data) Database.session.add(user) Database.session.commit() - flash('You have registered successfully') - return redirect(url_for('users.login')) - - return render_template('users/registration.html', form=form, title="Register") + flash("You have registered successfully") + return redirect(url_for("users.login")) + + return render_template("users/registration.html", form=form, title="Register") @bp.route("/user/<username>") @@ -81,30 +93,42 @@ # ) user = Database.first_or_404( - sa.select(User) - .where(func.lower(User.username) == func.lower(username)) + sa.select(User).where(func.lower(User.username) == func.lower(username)) ) print(f"USERNAME: {username}") print(f"FOUND USER: {user}") if user is None: - flash('User does not exist') - return redirect(url_for('pages.home')) + flash("User does not exist") + return redirect(url_for("pages.home")) - page = request.args.get('page', 1, type=int) + page = request.args.get("page", 1, type=int) query = user.posts.select().order_by(Post.created.desc()) posts = Database.paginate( - query, - page=page, - per_page=current_app.config['POSTS_PER_PAGE'], - error_out=False + query, page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False ) - next_url = url_for('users.user', username=user.username, page=posts.next_num) if posts.has_next else None + next_url = ( + url_for("users.user", username=user.username, page=posts.next_num) + if posts.has_next + else None + ) - prev_url = url_for('users.user', username=user.username, page=posts.prev_num) if posts.has_prev else None + prev_url = ( + url_for("users.user", username=user.username, page=posts.prev_num) + if posts.has_prev + else None + ) - return render_template("users/user.html", user=user, posts=posts, form=form, next_url=next_url, prev_url=prev_url, title="Profile") + return render_template( + "users/user.html", + user=user, + posts=posts, + form=form, + next_url=next_url, + prev_url=prev_url, + title="Profile", + ) @bp.before_request @@ -122,16 +146,16 @@ current_user.username = form.username.data current_user.about_me = form.about_me.data Database.session.commit() - flash('Your profile changes have been saved') - return redirect(url_for('users.edit_profile')) - elif request.method == 'GET': + flash("Your profile changes have been saved") + return redirect(url_for("users.edit_profile")) + elif request.method == "GET": form.username.data = current_user.username form.about_me.data = current_user.about_me - + return render_template("users/edit_profile.html", form=form, title="Edit Profile") -@bp.route("/follow/<username>", methods=['POST']) +@bp.route("/follow/<username>", methods=["POST"]) @login_required def follow(username): form = EmptyForm() @@ -142,21 +166,21 @@ if user is None: flash(f"User {username} not found") - return redirect(url_for('pages.home')) - + return redirect(url_for("pages.home")) + if user == current_user: - flash('You cannot follow yourself') - return redirect(url_for('users.user', username=username)) - + flash("You cannot follow yourself") + return redirect(url_for("users.user", username=username)) + current_user.follow(user) Database.session.commit() flash(f"You are following {username}") - return redirect(url_for('users.user', username=username)) + return redirect(url_for("users.user", username=username)) else: - return redirect(url_for('pages.home')) + return redirect(url_for("pages.home")) -@bp.route("/unfollow/<username>", methods=['POST']) +@bp.route("/unfollow/<username>", methods=["POST"]) @login_required def unfollow(username): form = EmptyForm() @@ -167,26 +191,26 @@ if user is None: flash(f"User {username} not found") - return redirect(url_for('pages.home')) - + return redirect(url_for("pages.home")) + if user == current_user: - flash('You cannot unfollow yourself') - return redirect(url_for('users.user', username=username)) - + flash("You cannot unfollow yourself") + return redirect(url_for("users.user", username=username)) + current_user.unfollow(user) Database.session.commit() flash(f"You are not following {username}") - return redirect(url_for('users.user', username=username)) + return redirect(url_for("users.user", username=username)) else: - return redirect(url_for('pages.home')) - + return redirect(url_for("pages.home")) + -@bp.route('/reset_password_request', methods=['GET', 'POST']) +@bp.route("/reset_password_request", methods=["GET", "POST"]) def reset_password_request(): if current_user.is_authenticated: - flash('You are already logged in') - return redirect(url_for('pages.home')) - + flash("You are already logged in") + return redirect(url_for("pages.home")) + form = ResetPasswordRequestForm() if form.validate_on_submit(): user = Database.session.scalar( @@ -195,30 +219,34 @@ if user: send_password_reset_email(user) - flash('Check your email for instructions on how to reset password') - return redirect(url_for('users.login')) + flash("Check your email for instructions on how to reset password") + return redirect(url_for("users.login")) else: - flash('User does not exist!') - - return render_template('users/reset_password_request.html', form=form, title="Reset Password") + flash("User does not exist!") + + return render_template( + "users/reset_password_request.html", form=form, title="Reset Password" + ) -@bp.route('/reset_password/<token>', methods=["GET", "POST"]) +@bp.route("/reset_password/<token>", methods=["GET", "POST"]) def reset_password(token): if current_user.is_authenticated: - flash('You are already logged in') - return redirect(url_for('pages.home')) - + flash("You are already logged in") + return redirect(url_for("pages.home")) + user = User.verify_reset_password_token(token) if not user: - flash('User not found for password reset') - return redirect(url_for('pages.home')) + flash("User not found for password reset") + return redirect(url_for("pages.home")) form = ResetPasswordForm() if form.validate_on_submit(): user.set_password(form.password.data) Database.session.commit() - flash('Your password has been reset') - return redirect(url_for('users.login')) - - return render_template('users/reset_password.html', form=form, token=token, title="Reset Password") \ No newline at end of file + flash("Your password has been reset") + return redirect(url_for("users.login")) + + return render_template( + "users/reset_password.html", form=form, token=token, title="Reset Password" + ) --- gunicorn.conf.py +++ gunicorn.conf.py @@ -3,9 +3,11 @@ import multiprocessing -bind = ':5000' +bind = ":5000" workers = multiprocessing.cpu_count() -if os.getenv('AUTO_RELOAD'): +if os.getenv("AUTO_RELOAD"): reload = True - reload_extra_files = glob('board/templates/**/*.html', recursive=True) + glob('board/static/*.css', recursive=True) \ No newline at end of file + reload_extra_files = glob("board/templates/**/*.html", recursive=True) + glob( + "board/static/*.css", recursive=True + ) --- migrations/env.py +++ migrations/env.py @@ -12,32 +12,31 @@ # Interpret the config file for Python logging. # This line sets up loggers basically. fileConfig(config.config_file_name) -logger = logging.getLogger('alembic.env') +logger = logging.getLogger("alembic.env") def get_engine(): try: # this works with Flask-SQLAlchemy<3 and Alchemical - return current_app.extensions['migrate'].db.get_engine() + return current_app.extensions["migrate"].db.get_engine() except (TypeError, AttributeError): # this works with Flask-SQLAlchemy>=3 - return current_app.extensions['migrate'].db.engine + return current_app.extensions["migrate"].db.engine def get_engine_url(): try: - return get_engine().url.render_as_string(hide_password=False).replace( - '%', '%%') + return get_engine().url.render_as_string(hide_password=False).replace("%", "%%") except AttributeError: - return str(get_engine().url).replace('%', '%%') + return str(get_engine().url).replace("%", "%%") # add your model's MetaData object here # for 'autogenerate' support # from myapp import mymodel # target_metadata = mymodel.Base.metadata -config.set_main_option('sqlalchemy.url', get_engine_url()) -target_db = current_app.extensions['migrate'].db +config.set_main_option("sqlalchemy.url", get_engine_url()) +target_db = current_app.extensions["migrate"].db # other values from the config, defined by the needs of env.py, # can be acquired: @@ -46,7 +45,7 @@ def get_metadata(): - if hasattr(target_db, 'metadatas'): + if hasattr(target_db, "metadatas"): return target_db.metadatas[None] return target_db.metadata @@ -65,8 +64,7 @@ """ url = config.get_main_option("sqlalchemy.url") context.configure( - url=url, target_metadata=get_metadata(), literal_binds=True, - compare_type=True + url=url, target_metadata=get_metadata(), literal_binds=True, compare_type=True ) with context.begin_transaction(): @@ -85,13 +83,13 @@ # when there are no changes to the schema # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html def process_revision_directives(context, revision, directives): - if getattr(config.cmd_opts, 'autogenerate', False): + if getattr(config.cmd_opts, "autogenerate", False): script = directives[0] if script.upgrade_ops.is_empty(): directives[:] = [] - logger.info('No changes in schema detected.') + logger.info("No changes in schema detected.") - conf_args = current_app.extensions['migrate'].configure_args + conf_args = current_app.extensions["migrate"].configure_args if conf_args.get("process_revision_directives") is None: conf_args["process_revision_directives"] = process_revision_directives @@ -99,9 +97,7 @@ with connectable.connect() as connection: context.configure( - connection=connection, - target_metadata=get_metadata(), - **conf_args + connection=connection, target_metadata=get_metadata(), **conf_args ) with context.begin_transaction(): --- migrations/versions/343d75cb7e92_add_new_user_fields.py +++ migrations/versions/343d75cb7e92_add_new_user_fields.py @@ -5,30 +5,33 @@ Create Date: 2025-06-15 12:56:02.953010 """ + from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '343d75cb7e92' -down_revision = '91d28e4d9bcd' +revision = "343d75cb7e92" +down_revision = "91d28e4d9bcd" branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.add_column(sa.Column('about_me', sa.String(length=140), nullable=True)) - batch_op.add_column(sa.Column('last_seen', sa.DateTime(timezone=True), nullable=True)) + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.add_column(sa.Column("about_me", sa.String(length=140), nullable=True)) + batch_op.add_column( + sa.Column("last_seen", sa.DateTime(timezone=True), nullable=True) + ) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.drop_column('last_seen') - batch_op.drop_column('about_me') + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.drop_column("last_seen") + batch_op.drop_column("about_me") # ### end Alembic commands ### --- migrations/versions/63b1599550fc_add_username_constraints.py +++ migrations/versions/63b1599550fc_add_username_constraints.py @@ -5,28 +5,31 @@ Create Date: 2025-06-15 13:43:31.537580 """ + from alembic import op -import sqlalchemy as sa # noqa: F401 +import sqlalchemy as sa # noqa: F401 # revision identifiers, used by Alembic. -revision = '63b1599550fc' -down_revision = '343d75cb7e92' +revision = "63b1599550fc" +down_revision = "343d75cb7e92" branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_users_username'), ['username'], unique=True) + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.create_index( + batch_op.f("ix_users_username"), ["username"], unique=True + ) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_users_username')) + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_users_username")) # ### end Alembic commands ### --- migrations/versions/82d5e7b23e28_add_language_to_post.py +++ migrations/versions/82d5e7b23e28_add_language_to_post.py @@ -5,28 +5,29 @@ Create Date: 2025-08-01 15:19:47.077103 """ + from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '82d5e7b23e28' -down_revision = 'e93ccd60891f' +revision = "82d5e7b23e28" +down_revision = "e93ccd60891f" branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('posts', schema=None) as batch_op: - batch_op.add_column(sa.Column('language', sa.String(length=5), nullable=True)) + with op.batch_alter_table("posts", schema=None) as batch_op: + batch_op.add_column(sa.Column("language", sa.String(length=5), nullable=True)) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('posts', schema=None) as batch_op: - batch_op.drop_column('language') + with op.batch_alter_table("posts", schema=None) as batch_op: + batch_op.drop_column("language") # ### end Alembic commands ### --- migrations/versions/91d28e4d9bcd_init.py +++ migrations/versions/91d28e4d9bcd_init.py @@ -1,16 +1,17 @@ """Setup Database Revision ID: 91d28e4d9bcd -Revises: +Revises: Create Date: 2025-06-13 14:00:11.591660 """ + from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '91d28e4d9bcd' +revision = "91d28e4d9bcd" down_revision = None branch_labels = None depends_on = None @@ -18,38 +19,43 @@ def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.create_table('users', - sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - sa.Column('username', sa.String(length=256), nullable=False), - sa.Column('email', sa.String(), nullable=False), - sa.Column('password_hash', sa.String(length=256), nullable=True), - sa.PrimaryKeyConstraint('id') + op.create_table( + "users", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("username", sa.String(length=256), nullable=False), + sa.Column("email", sa.String(), nullable=False), + sa.Column("password_hash", sa.String(length=256), nullable=True), + sa.PrimaryKeyConstraint("id"), ) - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_users_email'), ['email'], unique=True) + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_users_email"), ["email"], unique=True) - op.create_table('posts', - sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - sa.Column('message', sa.Text(), nullable=False), - sa.Column('created', sa.DateTime(timezone=True), nullable=True), - sa.Column('user_id', sa.Integer(), nullable=True), - sa.ForeignKeyConstraint(['user_id'], ['users.id'], ), - sa.PrimaryKeyConstraint('id') + op.create_table( + "posts", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("message", sa.Text(), nullable=False), + sa.Column("created", sa.DateTime(timezone=True), nullable=True), + sa.Column("user_id", sa.Integer(), nullable=True), + sa.ForeignKeyConstraint( + ["user_id"], + ["users.id"], + ), + sa.PrimaryKeyConstraint("id"), ) - with op.batch_alter_table('posts', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_posts_user_id'), ['user_id'], unique=False) + with op.batch_alter_table("posts", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_posts_user_id"), ["user_id"], unique=False) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('posts', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_posts_user_id')) + with op.batch_alter_table("posts", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_posts_user_id")) - op.drop_table('posts') - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_users_email')) + op.drop_table("posts") + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_users_email")) - op.drop_table('users') + op.drop_table("users") # ### end Alembic commands ### --- migrations/versions/e93ccd60891f_add_followers.py +++ migrations/versions/e93ccd60891f_add_followers.py @@ -5,38 +5,50 @@ Create Date: 2025-06-16 16:27:23.142554 """ + from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = 'e93ccd60891f' -down_revision = '63b1599550fc' +revision = "e93ccd60891f" +down_revision = "63b1599550fc" branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.create_table('followers', - sa.Column('follower_id', sa.Integer(), nullable=False), - sa.Column('followed_id', sa.Integer(), nullable=False), - sa.ForeignKeyConstraint(['followed_id'], ['users.id'], ), - sa.ForeignKeyConstraint(['follower_id'], ['users.id'], ), - sa.PrimaryKeyConstraint('follower_id', 'followed_id') + op.create_table( + "followers", + sa.Column("follower_id", sa.Integer(), nullable=False), + sa.Column("followed_id", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ["followed_id"], + ["users.id"], + ), + sa.ForeignKeyConstraint( + ["follower_id"], + ["users.id"], + ), + sa.PrimaryKeyConstraint("follower_id", "followed_id"), ) - with op.batch_alter_table('followers', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_followers_followed_id'), ['followed_id'], unique=False) - batch_op.create_index(batch_op.f('ix_followers_follower_id'), ['follower_id'], unique=False) + with op.batch_alter_table("followers", schema=None) as batch_op: + batch_op.create_index( + batch_op.f("ix_followers_followed_id"), ["followed_id"], unique=False + ) + batch_op.create_index( + batch_op.f("ix_followers_follower_id"), ["follower_id"], unique=False + ) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('followers', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_followers_follower_id')) - batch_op.drop_index(batch_op.f('ix_followers_followed_id')) + with op.batch_alter_table("followers", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_followers_follower_id")) + batch_op.drop_index(batch_op.f("ix_followers_followed_id")) - op.drop_table('followers') + op.drop_table("followers") # ### end Alembic commands ### --- test_bedrock.py +++ test_bedrock.py @@ -25,29 +25,16 @@ """ req = { - "messages": [ - { - "role": "user", - "content": [ - { - "text": prompt - } - ] - } - ], - "inferenceConfig": { - "temperature": 0.1, - "topP": 0.9, - "maxTokens": 10240 - } + "messages": [{"role": "user", "content": [{"text": prompt}]}], + "inferenceConfig": {"temperature": 0.1, "topP": 0.9, "maxTokens": 10240}, } bedrock_response = bedrock_client.invoke_model( - modelId = "us.amazon.nova-micro-v1:0", + modelId="us.amazon.nova-micro-v1:0", body=json.dumps(req), - contentType="application/json" + contentType="application/json", ) resp = json.loads(bedrock_response["body"].read()) - translation = resp["output"]["message"]["content"][0]['text'] + translation = resp["output"]["message"]["content"][0]["text"] print(translation) --- tests/conftest.py +++ tests/conftest.py @@ -4,13 +4,15 @@ from board.database import Database, User -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def app(): app = create_app() - app.config.update({ - "TESTING": True, - "WTF_CSRF_ENABLED": False # needed to disable CSRF protection in forms else tests will fail - }) + app.config.update( + { + "TESTING": True, + "WTF_CSRF_ENABLED": False, # needed to disable CSRF protection in forms else tests will fail + } + ) # Other setup can go here app.register_blueprint(pages.bp, name="testpages") @@ -22,7 +24,7 @@ username="chee", email="[email protected]", ) - + user.set_password("testing") Database.session.add(user) Database.session.commit() @@ -37,48 +39,48 @@ Database.session.commit() -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def client(app): return app.test_client() -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def new_user(): - user = User(email='[email protected]',username='New User') + user = User(email="[email protected]", username="New User") return user -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def second_user(): - user = User(email='[email protected]', username='Second User') + user = User(email="[email protected]", username="Second User") return user -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def test_user(app): with app.app_context(): - user = Database.session.scalar( + user = Database.session.scalar( Database.select(User).where(User.email == "[email protected]") ) return user -@pytest.fixture(scope='function') +@pytest.fixture(scope="function") def login_user(app, test_user): app.test_client_class = FlaskLoginClient with app.test_client(user=test_user) as client: yield client - client.get('/logout') + client.get("/logout") -# Sets the session ID directly -@pytest.fixture(scope='function') +# Sets the session ID directly +@pytest.fixture(scope="function") def login_user_via_session(app, client, test_user): with client.session_transaction() as session: - session['_user_id'] = test_user.id - + session["_user_id"] = test_user.id + yield client - - client.get('/logout') + + client.get("/logout") --- tests/functional/test_pages.py +++ tests/functional/test_pages.py @@ -4,14 +4,14 @@ def test_request_home(client): response = client.get("/", follow_redirects=True) assert response.status_code == 200 - assert b'Login' in response.data + assert b"Login" in response.data def test_request_home_authenticated(login_user_via_session): resp = login_user_via_session.get("/") - assert b'/user/chee' in resp.data - assert b'Logout' in resp.data - assert b'Say something' in resp.data + assert b"/user/chee" in resp.data + assert b"Logout" in resp.data + assert b"Say something" in resp.data # The login_user is for logging in @@ -21,20 +21,17 @@ # Use the FlaskLoginCLient session_transaction block to retrieve the saved session user ID # Can also replace the session['_user_id'] directly to simulate a logged in user... - # + # with login_user.session_transaction() as session: - session_id = int(session['_user_id']) + session_id = int(session["_user_id"]) with app.app_context(): session_user = Database.session.scalar( Database.select(User).where(User.id == session_id) ) - new_user.set_password('MyNewPassword') - post = Post( - message='A test post', - author=new_user - ) + new_user.set_password("MyNewPassword") + post = Post(message="A test post", author=new_user) Database.session.add(new_user) Database.session.add(post) Database.session.commit() @@ -57,4 +54,4 @@ def test_not_found(client): response = client.get("/nan") assert response.status_code == 404 - assert b"Page not found" in response.data \ No newline at end of file + assert b"Page not found" in response.data --- tests/functional/test_posts.py +++ tests/functional/test_posts.py @@ -5,7 +5,7 @@ """ response = client.get("/posts/new", follow_redirects=True) assert response.status_code == 200 - assert b'Please log in to access this page' in response.data + assert b"Please log in to access this page" in response.data def test_get_new_posts_authenticated(login_user): @@ -25,45 +25,42 @@ Test /posts/create without valid session user Should redirect to login page """ - response = client.post("/posts/create", - follow_redirects=True) + response = client.post("/posts/create", follow_redirects=True) assert response.status_code == 200 assert len(response.history) == 1 - assert b'Please log in to access this page' in response.data + assert b"Please log in to access this page" in response.data def test_post_create_authenticated(login_user): - response = login_user.post("/posts/create", - data={"message": "Test Message"}, - follow_redirects=True) + response = login_user.post( + "/posts/create", data={"message": "Test Message"}, follow_redirects=True + ) assert response.status_code == 200 assert len(response.history) == 1 - assert b'Thanks for posting' in response.data - assert b'Test Message' in response.data + assert b"Thanks for posting" in response.data + assert b"Test Message" in response.data def test_post_create_invalid_authenticated(login_user): - response = login_user.post("/posts/create", - data={"message": None}, - follow_redirects=True) + response = login_user.post( + "/posts/create", data={"message": None}, follow_redirects=True + ) assert response.status_code == 200 - assert b'Message cannot be blank' in response.data + assert b"Message cannot be blank" in response.data def test_post_list(client): - response = client.get("/posts", - follow_redirects=True) + response = client.get("/posts", follow_redirects=True) assert response.status_code == 200 assert len(response.history) == 1 - assert b'Please log in to access this page' + assert b"Please log in to access this page" def test_post_list_authenticated(login_user): - login_user.post("/posts/create", - data={"message": "Test Message New"}) - + login_user.post("/posts/create", data={"message": "Test Message New"}) + response = login_user.get("/posts") assert response.status_code == 200 - assert b'Posts' in response.data - assert b'Test Message New' in response.data + assert b"Posts" in response.data + assert b"Test Message New" in response.data assert b'<a href="/user/chee">chee</a> said' in response.data --- tests/functional/test_users.py +++ tests/functional/test_users.py @@ -1,156 +1,213 @@ def test_get_login(client): resp = client.get("/login") assert resp.status_code == 200 - assert b'Sign In' in resp.data - assert b'Email' in resp.data - assert b'Password' in resp.data - assert b'Click to Register' in resp.data + assert b"Sign In" in resp.data + assert b"Email" in resp.data + assert b"Password" in resp.data + assert b"Click to Register" in resp.data def test_post_login(client): - resp = client.post("/login", - data={"email": "[email protected]", "password": "testing"}, - follow_redirects=True) + resp = client.post( + "/login", + data={"email": "[email protected]", "password": "testing"}, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'Hi, chee' in resp.data - assert b'Logout' in resp.data + assert b"Hi, chee" in resp.data + assert b"Logout" in resp.data client.get("/logout") def test_post_login_invalid_password(client): - resp = client.post("/login", - data={"email": "[email protected]", "password": "wrong"}, - follow_redirects=True) - + resp = client.post( + "/login", + data={"email": "[email protected]", "password": "wrong"}, + follow_redirects=True, + ) + assert resp.status_code == 200 - assert b'Invalid username or password' in resp.data - assert b'Sign In' in resp.data + assert b"Invalid username or password" in resp.data + assert b"Sign In" in resp.data def test_post_login_invalid_email(client): - resp = client.post("/login", - data={"email": "[email protected]", "password": "testing"}, - follow_redirects=True) + resp = client.post( + "/login", + data={"email": "[email protected]", "password": "testing"}, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'Invalid username or password' in resp.data - assert b'Sign In' in resp.data + assert b"Invalid username or password" in resp.data + assert b"Sign In" in resp.data def test_post_login_if_authenticated(login_user): - resp = login_user.post("login", - data={"email": "[email protected]", "password": "testing"}, - follow_redirects=True) + resp = login_user.post( + "login", + data={"email": "[email protected]", "password": "testing"}, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'Logout' in resp.data + assert b"Logout" in resp.data def test_get_logout(login_user): - resp = login_user.get("/logout", - follow_redirects=True) + resp = login_user.get("/logout", follow_redirects=True) assert resp.status_code == 200 - assert b'Login' in resp.data + assert b"Login" in resp.data def test_get_register(client): resp = client.get("/register") assert resp.status_code == 200 - assert b'Register' in resp.data - assert b'Username' in resp.data - assert b'Password' in resp.data - assert b'Repeat Password' in resp.data + assert b"Register" in resp.data + assert b"Username" in resp.data + assert b"Password" in resp.data + assert b"Repeat Password" in resp.data def test_post_register(client): - resp = client.post("/register", - data={"username": "second user", "email": "[email protected]", "password": "testing", "password2": "testing"}, - follow_redirects=True) + resp = client.post( + "/register", + data={ + "username": "second user", + "email": "[email protected]", + "password": "testing", + "password2": "testing", + }, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'You have registered successfully' in resp.data - assert b'Sign In' in resp.data + assert b"You have registered successfully" in resp.data + assert b"Sign In" in resp.data - resp = client.post("/login", - data={"email": "[email protected]", "password": "testing"}, - follow_redirects=True) + resp = client.post( + "/login", + data={"email": "[email protected]", "password": "testing"}, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'Hi, second user' in resp.data - assert b'Logout' in resp.data + assert b"Hi, second user" in resp.data + assert b"Logout" in resp.data client.get("/logout") def test_post_register_missing_username(client): - resp = client.post("/register", - data={"email": "[email protected]", "password": "testing", "password2": "testing"}, - follow_redirects=True) - assert b'Register' in resp.data - assert b'This field is required' in resp.data + resp = client.post( + "/register", + data={ + "email": "[email protected]", + "password": "testing", + "password2": "testing", + }, + follow_redirects=True, + ) + assert b"Register" in resp.data + assert b"This field is required" in resp.data def test_post_register_invalid_email(client): - resp = client.post("/register", - data={"username": "test user", "email": "none", "password": "testing", "password2": "testing"}, - follow_redirects=True) - assert b'Register' in resp.data - assert b'Invalid email address' in resp.data + resp = client.post( + "/register", + data={ + "username": "test user", + "email": "none", + "password": "testing", + "password2": "testing", + }, + follow_redirects=True, + ) + assert b"Register" in resp.data + assert b"Invalid email address" in resp.data def test_post_register_exists_username(client): - resp = client.post("/register", - data={"username": "second user", "email": "[email protected]", "password": "testing", "password2": "testing"}, - follow_redirects=True) - assert b'Register' in resp.data - assert b'Please use a different username' in resp.data + resp = client.post( + "/register", + data={ + "username": "second user", + "email": "[email protected]", + "password": "testing", + "password2": "testing", + }, + follow_redirects=True, + ) + assert b"Register" in resp.data + assert b"Please use a different username" in resp.data def test_post_register_exists_email(client): - resp = client.post("/register", - data={"username": "third user", "email": "[email protected]", "password": "testing", "password2": "testing"}, - follow_redirects=True) - assert b'Register' in resp.data - assert b'Please use a different email address' in resp.data + resp = client.post( + "/register", + data={ + "username": "third user", + "email": "[email protected]", + "password": "testing", + "password2": "testing", + }, + follow_redirects=True, + ) + assert b"Register" in resp.data + assert b"Please use a different email address" in resp.data def test_post_register_missing_password(client): - resp = client.post("/register", - data={"username": "third user", "email": "[email protected]"}, - follow_redirects=True) - assert b'Register' in resp.data - assert b'This field is required' in resp.data + resp = client.post( + "/register", + data={"username": "third user", "email": "[email protected]"}, + follow_redirects=True, + ) + assert b"Register" in resp.data + assert b"This field is required" in resp.data def test_post_register_mismatch_password(client): - resp = client.post("/register", - data={"username": "third user", "email": "[email protected]", "password": "testing", "password2": "no match"}, - follow_redirects=True) - assert b'Register' in resp.data - assert b'Field must be equal to password' in resp.data + resp = client.post( + "/register", + data={ + "username": "third user", + "email": "[email protected]", + "password": "testing", + "password2": "no match", + }, + follow_redirects=True, + ) + assert b"Register" in resp.data + assert b"Field must be equal to password" in resp.data def test_reset_password_request_when_authenticated(login_user): - resp = login_user.get('/reset_password_request', follow_redirects=True) + resp = login_user.get("/reset_password_request", follow_redirects=True) assert resp.status_code == 200 - assert b'You are already logged in' in resp.data + assert b"You are already logged in" in resp.data def test_reset_password_request(client): - resp = client.post("/reset_password_request", - data={"email": "[email protected]"}, - follow_redirects=True) + resp = client.post( + "/reset_password_request", + data={"email": "[email protected]"}, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'Check your email for instructions on how to reset password' in resp.data + assert b"Check your email for instructions on how to reset password" in resp.data def test_reset_password_request_no_user(client): - resp = client.post("/reset_password_request", - data={"email": "[email protected]"}, - follow_redirects=True) + resp = client.post( + "/reset_password_request", + data={"email": "[email protected]"}, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'User does not exist' in resp.data + assert b"User does not exist" in resp.data def test_reset_password_when_authenticated(login_user): - resp = login_user.get('/reset_password/token', follow_redirects=True) + resp = login_user.get("/reset_password/token", follow_redirects=True) assert resp.status_code == 200 - assert b'You are already logged in' in resp.data + assert b"You are already logged in" in resp.data + -# TODO: Need to mock out jwt and User.verify_reset_password_token to continue testinng the reset password route \ No newline at end of file +# TODO: Need to mock out jwt and User.verify_reset_password_token to continue testinng the reset password route --- tests/unit/test_models.py +++ tests/unit/test_models.py @@ -16,16 +16,16 @@ def test_set_password(new_user): - new_user.set_password('MyNewPassword') - assert new_user.password_hash != 'MyNewPassword' - assert new_user.check_password('MyNewPassword') - assert not new_user.check_password('Wrong') + new_user.set_password("MyNewPassword") + assert new_user.password_hash != "MyNewPassword" + assert new_user.check_password("MyNewPassword") + assert not new_user.check_password("Wrong") def test_has_many_posts(app, new_user): with app.app_context(): - new_user.set_password('MyNewPassword') - post = Post(message='A test post', author=new_user) + new_user.set_password("MyNewPassword") + post = Post(message="A test post", author=new_user) post2 = Post(message="A second post", author=new_user) Database.session.add(new_user) @@ -40,10 +40,10 @@ def test_user_avatar(new_user): - hashed = md5(new_user.email.lower().encode('utf-8')).hexdigest() + hashed = md5(new_user.email.lower().encode("utf-8")).hexdigest() profile_pic = new_user.avatar(20) assert hashed in profile_pic - assert '20' in profile_pic + assert "20" in profile_pic def test_user_following(app, new_user, second_user): @@ -52,7 +52,7 @@ Database.session.add(second_user) Database.session.commit() new_user.follow(second_user) - + assert new_user.is_following(second_user) assert new_user.following_count() == 1 assert second_user.followers_count() == 1 @@ -104,12 +104,13 @@ # Tests that no user is returned as jwt.decode failed assert User.verify_reset_password_token(token) is None - mock_logger.assert_called_once_with("Failure in JWT decode of password reset token") + mock_logger.assert_called_once_with( + "Failure in JWT decode of password reset token" + ) - def test_post(new_user): post = Post(message="Test Post", author=new_user, id=123) assert post.author == new_user assert post.message == "Test Post" - assert post.__repr__() == f"<Post: {post.id}>" \ No newline at end of file + assert post.__repr__() == f"<Post: {post.id}>" ```