```diff --- board/__init__.py +++ board/__init__.py @@ -18,7 +18,7 @@ def get_locale(): - return request.accept_languages.best_match(['en', 'es']) + return request.accept_languages.best_match(["en", "es"]) def create_app(): @@ -38,18 +38,16 @@ # Setting up translation Babel(app, locale_selector=get_locale) - # Sets up Cloudwatch Logs Embedded Metrics @metric_scope def my_handler(path, method, duration, metrics): - metrics.put_dimensions({'Path': path}) - metrics.put_metric('Latency', duration, 'Milliseconds') - metrics.set_property('Method', method) - metrics.set_property('Path', path) - metrics.set_namespace('flaskappv22') + metrics.put_dimensions({"Path": path}) + metrics.put_metric("Latency", duration, "Milliseconds") + metrics.set_property("Method", method) + metrics.set_property("Path", path) + metrics.set_namespace("flaskappv22") metrics.set_timestamp(datetime.datetime.now()) - # Setup logging @app.before_request @@ -63,8 +61,8 @@ "url": request.url, "path": request.path, "client_ip": client_ip, - "user_agent": request.headers.get("user-agent") - } + "user_agent": request.headers.get("user-agent"), + }, ) @after_this_request @@ -75,7 +73,7 @@ log_level = logging.ERROR elif response.status_code >= 400: log_level = logging.WARNING - + app.logger.log( log_level, f"{request.method} to {request.path} completed with status {response.status_code}", @@ -84,8 +82,8 @@ "url": request.url, "path": request.path, "status_code": response.status_code, - "latency": duration - } + "latency": duration, + }, ) if os.getenv("ENV_TYPE") == "prod": @@ -93,38 +91,41 @@ return response - # Set up moment js @app.before_request def moment_before_request(): g.locale = str(get_locale()) - + # moment = Moment(app) Moment(app) # Set up flask-mail - app.config['MAIL_SERVER'] = os.getenv('MAIL_SERVER') - app.config['MAIL_PORT'] = os.getenv('MAIL_PORT') + app.config["MAIL_SERVER"] = os.getenv("MAIL_SERVER") + app.config["MAIL_PORT"] = os.getenv("MAIL_PORT") Mailer.init_app(app) login.login_manager.init_app(app) login.login_manager.login_view = "users.login" - login.login_message = _l('Please login to access this page.') + login.login_message = _l("Please login to access this page.") if os.environ.get("TESTING"): - app.config["SQLALCHEMY_DATABASE_URI"] = f"postgresql://{os.environ.get('RDS_TEST_USERNAME')}:{os.environ.get('RDS_TEST_PASSWORD')}@{os.environ.get('RDS_HOSTNAME')}:{os.environ.get('RDS_TEST_PORT')}/{os.environ.get('RDS_TEST_DB_NAME')}" + app.config["SQLALCHEMY_DATABASE_URI"] = ( + f"postgresql://{os.environ.get('RDS_TEST_USERNAME')}:{os.environ.get('RDS_TEST_PASSWORD')}@{os.environ.get('RDS_HOSTNAME')}:{os.environ.get('RDS_TEST_PORT')}/{os.environ.get('RDS_TEST_DB_NAME')}" + ) else: - app.config["SQLALCHEMY_DATABASE_URI"] = f"postgresql://{os.environ.get('RDS_USERNAME')}:{os.environ.get('RDS_PASSWORD')}@{os.environ.get('RDS_HOSTNAME')}:{os.environ.get('RDS_PORT')}/{os.environ.get('RDS_DB_NAME')}" + app.config["SQLALCHEMY_DATABASE_URI"] = ( + f"postgresql://{os.environ.get('RDS_USERNAME')}:{os.environ.get('RDS_PASSWORD')}@{os.environ.get('RDS_HOSTNAME')}:{os.environ.get('RDS_PORT')}/{os.environ.get('RDS_DB_NAME')}" + ) # Set pagination value - app.config['POSTS_PER_PAGE'] = int(os.getenv("POSTS_PER_PAGE", 20)) + app.config["POSTS_PER_PAGE"] = int(os.getenv("POSTS_PER_PAGE", 20)) database.Database.init_app(app) # Adds migration commands via Flask-migrate but doesn't run the actual migration... Migrate(app, database.Database) # Migrate.init_app(app, database.Database) - + app.register_blueprint(pages.bp) app.register_blueprint(posts.bp) app.register_blueprint(users.bp) @@ -132,7 +133,7 @@ app.register_error_handler(404, errors.page_not_found) app.register_error_handler(500, errors.internal_error) app.register_error_handler(Exception, errors.unhandled_exception_handler) - + # app.logger.info("Microblog startup") return app --- board/cli.py +++ board/cli.py @@ -30,14 +30,43 @@ try: with open(messages_path, "w") as _tmp: - result = subprocess.run([pybabel_bin, "extract", "-F", "babel.cfg", "-k", "_l", "-o", messages_path, "."], capture_output=True, text=True, check=True) + result = subprocess.run( + [ + pybabel_bin, + "extract", + "-F", + "babel.cfg", + "-k", + "_l", + "-o", + messages_path, + ".", + ], + capture_output=True, + text=True, + check=True, + ) print(result.stdout) print(result.stderr) if result.returncode != 0: raise RuntimeError("extract command failed") - result = subprocess.run([pybabel_bin, "init", "-i", messages_path, "-d", "board.translations", "-l", lang], capture_output=True, text=True, check=True) + result = subprocess.run( + [ + pybabel_bin, + "init", + "-i", + messages_path, + "-d", + "board.translations", + "-l", + lang, + ], + capture_output=True, + text=True, + check=True, + ) print(result.stdout) print(result.stderr) @@ -56,16 +85,43 @@ try: with open(messages_path, "w") as _tmp: - result = subprocess.run([pybabel_bin, "extract", "-F", "babel.cfg", "-k", "_l", "-o", messages_path, "."], capture_output=True, text=True, check=True) + result = subprocess.run( + [ + pybabel_bin, + "extract", + "-F", + "babel.cfg", + "-k", + "_l", + "-o", + messages_path, + ".", + ], + capture_output=True, + text=True, + check=True, + ) print(result.stdout) print(result.stderr) if result.returncode != 0: raise RuntimeError("Babel extract command failed") - - result = subprocess.run([pybabel_bin, "update", "-i", messages_path, "-d", "board/translations"], capture_output=True, text=True, check=True) + result = subprocess.run( + [ + pybabel_bin, + "update", + "-i", + messages_path, + "-d", + "board/translations", + ], + capture_output=True, + text=True, + check=True, + ) + print(result.stdout) print(result.stderr) @@ -82,7 +138,12 @@ Compile all languages """ - result = subprocess.run([pybabel_bin, "compile", "-d", "board/translations"], capture_output=True, text=True, check=True) + result = subprocess.run( + [pybabel_bin, "compile", "-d", "board/translations"], + capture_output=True, + text=True, + check=True, + ) print(result.stdout) print(result.stderr) --- board/database.py +++ board/database.py @@ -25,84 +25,84 @@ # Users followers table followers = Table( - 'followers', + "followers", Base.metadata, - Column('follower_id', - Integer(), - ForeignKey('users.id'), - primary_key=True, - index=True), - Column('followed_id', - Integer(), - ForeignKey('users.id'), - primary_key=True, - index=True) + Column( + "follower_id", Integer(), ForeignKey("users.id"), primary_key=True, index=True + ), + Column( + "followed_id", Integer(), ForeignKey("users.id"), primary_key=True, index=True + ), ) class User(UserMixin, Database.Model): - __tablename__ = 'users' + __tablename__ = "users" id = mapped_column(Integer(), primary_key=True, autoincrement=True) - username = mapped_column(String(length=256), unique=True, index=True, nullable=False) + username = mapped_column( + String(length=256), unique=True, index=True, nullable=False + ) email = mapped_column(String(), unique=True, index=True, nullable=False) password_hash = mapped_column(String(length=256)) about_me = mapped_column(String(length=140)) - last_seen = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)) + last_seen = mapped_column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) + ) # Posts - posts: WriteOnlyMapped['Post'] = relationship('Post', back_populates='author') + posts: WriteOnlyMapped["Post"] = relationship("Post", back_populates="author") # Followers - following: WriteOnlyMapped['User'] = relationship( + following: WriteOnlyMapped["User"] = relationship( secondary=followers, primaryjoin=(followers.c.follower_id == id), secondaryjoin=(followers.c.followed_id == id), - back_populates='followers') - - followers: WriteOnlyMapped['User'] = relationship( + back_populates="followers", + ) + + followers: WriteOnlyMapped["User"] = relationship( secondary=followers, primaryjoin=(followers.c.followed_id == id), secondaryjoin=(followers.c.follower_id == id), - back_populates='following') + back_populates="following", + ) def set_password(self, password): self.password_hash = generate_password_hash(password) - + def check_password(self, password): return check_password_hash(self.password_hash, password) - + def __repr__(self): return f"" - + def avatar(self, size): - digest = hashlib.new("md5", self.email.lower().encode('utf-8'), usedforsecurity=False).hexdigest() - return f'https://www.gravatar.com/avatar/{digest}?d=identicon&s={size}' - + digest = hashlib.new( + "md5", self.email.lower().encode("utf-8"), usedforsecurity=False + ).hexdigest() + return f"https://www.gravatar.com/avatar/{digest}?d=identicon&s={size}" + def follow(self, user): if not self.is_following(user): self.following.add(user) - + def unfollow(self, user): if self.is_following(user): self.following.remove(user) - + def is_following(self, user): query = self.following.select().where(User.id == user.id) return Database.session.scalar(query) is not None - + def followers_count(self): - query = Select(func.count()).select_from( - self.followers.select().subquery() - ) + query = Select(func.count()).select_from(self.followers.select().subquery()) return Database.session.scalar(query) - + def following_count(self): - query = Select(func.count()).select_from( - self.following.select().subquery() - ) + query = Select(func.count()).select_from(self.following.select().subquery()) return Database.session.scalar(query) - + def following_posts(self): Author = so.aliased(User) Follower = so.aliased(User) @@ -111,45 +111,49 @@ Select(Post) .join(Post.author.of_type(Author)) .join(Author.followers.of_type(Follower), isouter=True) - .where(sa.or_( - Follower.id == self.id, - # Author.id == self.id, - )) + .where( + sa.or_( + Follower.id == self.id, + # Author.id == self.id, + ) + ) .group_by(Post) .order_by(Post.created.desc()) ) - + def get_password_reset_token(self, expires_in=600): return jwt.encode( - {'reset_password': self.id, 'exp': time() + expires_in}, + {"reset_password": self.id, "exp": time() + expires_in}, key=os.getenv("APP_SECRET"), - algorithm='HS256' + algorithm="HS256", ) - + @staticmethod def verify_reset_password_token(token): try: - id = jwt.decode( - token, - key=os.getenv('APP_SECRET'), - algorithms=['HS256'] - )['reset_password'] + id = jwt.decode(token, key=os.getenv("APP_SECRET"), algorithms=["HS256"])[ + "reset_password" + ] except Exception as _: - current_app.logger.exception("Failure in JWT decode of password reset token") + current_app.logger.exception( + "Failure in JWT decode of password reset token" + ) return - + return Database.session.get(User, id) class Post(Database.Model): - __tablename__ = 'posts' + __tablename__ = "posts" id = mapped_column(Integer(), primary_key=True, autoincrement=True) message = mapped_column(Text(), nullable=False) - created = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)) + created = mapped_column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) + ) language = mapped_column(String(length=5), nullable=True) user_id = mapped_column(ForeignKey(User.id), index=True) - author = relationship('User', back_populates='posts') + author = relationship("User", back_populates="posts") def __repr__(self): - return f"" \ No newline at end of file + return f"" --- board/email.py +++ board/email.py @@ -6,11 +6,7 @@ def send_email(subject, sender, recipients, text_body, html_body): - msg = Message( - subject, - sender=sender, - recipients=recipients - ) + msg = Message(subject, sender=sender, recipients=recipients) msg.body = text_body msg.html = html_body Mailer.send(msg) @@ -18,10 +14,10 @@ def send_password_reset_email(user): token = user.get_password_reset_token() - send_email('[Microblog] Reset Your Password', - sender='no-reply@microblog.com', - recipients=[user.email], - text_body=render_template('email/reset_password.txt', - user=user, token=token), - html_body=render_template('email/reset_password.html', - user=user, token=token)) \ No newline at end of file + send_email( + "[Microblog] Reset Your Password", + sender="no-reply@microblog.com", + recipients=[user.email], + text_body=render_template("email/reset_password.txt", user=user, token=token), + html_body=render_template("email/reset_password.html", user=user, token=token), + ) --- board/errors.py +++ board/errors.py @@ -18,11 +18,9 @@ current_app.logger.exception("unhandled exception during request processing") response = e.get_response() - response.data = json.dumps({ - "code": e.code, - "name": e.name, - "description": e.description - }) + response.data = json.dumps( + {"code": e.code, "name": e.name, "description": e.description} + ) response.content_type = "application/json" return response --- board/form.py +++ board/form.py @@ -6,36 +6,44 @@ class LoginForm(FlaskForm): - email = StringField(_l('Email'), validators=[DataRequired(), Email()]) - password = PasswordField(_l('Password'), validators=[DataRequired()]) - remember_me = BooleanField(_l('Remember Me')) - submit = SubmitField(_l('Sign In')) + email = StringField(_l("Email"), validators=[DataRequired(), Email()]) + password = PasswordField(_l("Password"), validators=[DataRequired()]) + remember_me = BooleanField(_l("Remember Me")) + submit = SubmitField(_l("Sign In")) class RegistrationForm(FlaskForm): - username = StringField('Username', validators=[DataRequired()]) - email = StringField('Email', validators=[DataRequired(), Email()]) - password = PasswordField('Password', validators=[DataRequired()]) - password2 = PasswordField('Repeat Password', validators=[DataRequired(), EqualTo('password')]) - submit = SubmitField('Register') + username = StringField("Username", validators=[DataRequired()]) + email = StringField("Email", validators=[DataRequired(), Email()]) + password = PasswordField("Password", validators=[DataRequired()]) + password2 = PasswordField( + "Repeat Password", validators=[DataRequired(), EqualTo("password")] + ) + submit = SubmitField("Register") def validate_username(self, username): - user = database.Database.session.scalar(database.Database.select(database.User).where( - database.User.username == username.data)) + user = database.Database.session.scalar( + database.Database.select(database.User).where( + database.User.username == username.data + ) + ) if user is not None: - raise ValidationError('Please use a different username.') + raise ValidationError("Please use a different username.") def validate_email(self, email): - user = database.Database.session.scalar(database.Database.select(database.User).where( - database.User.email == email.data)) + user = database.Database.session.scalar( + database.Database.select(database.User).where( + database.User.email == email.data + ) + ) if user is not None: - raise ValidationError('Please use a different email address.') - + raise ValidationError("Please use a different email address.") + class EditProfileForm(FlaskForm): - username = StringField('Username', validators=[DataRequired()]) - about_me = TextAreaField('About Me', validators=[Length(min=0, max=140)]) - submit = SubmitField('Submit') + username = StringField("Username", validators=[DataRequired()]) + about_me = TextAreaField("About Me", validators=[Length(min=0, max=140)]) + submit = SubmitField("Submit") def __init__(self, original_username, *args, **kwargs): super().__init__(*args, **kwargs) @@ -43,29 +51,35 @@ def validate_username(self, username): if username.data != self.original_username: - user = database.Database.session.scalar(database.Database.select(database.User).where( - database.User.username == username.data)) + user = database.Database.session.scalar( + database.Database.select(database.User).where( + database.User.username == username.data + ) + ) if user is not None: - raise ValidationError('Please use a different username.') + raise ValidationError("Please use a different username.") class EmptyForm(FlaskForm): - submit = SubmitField('Submit') + submit = SubmitField("Submit") class PostForm(FlaskForm): - message = TextAreaField('Say something', validators=[DataRequired(), Length(min=1, max=140)]) - submit = SubmitField('Submit') + message = TextAreaField( + "Say something", validators=[DataRequired(), Length(min=1, max=140)] + ) + submit = SubmitField("Submit") class ResetPasswordRequestForm(FlaskForm): - email = StringField('Email', validators=[DataRequired(), Email()]) - submit = SubmitField('Submit') + email = StringField("Email", validators=[DataRequired(), Email()]) + submit = SubmitField("Submit") class ResetPasswordForm(FlaskForm): - password = PasswordField('Password', validators=[DataRequired()]) + password = PasswordField("Password", validators=[DataRequired()]) password2 = PasswordField( - 'Repeat Password', validators=[DataRequired(), EqualTo('password')]) - submit = SubmitField('Request Password Reset') + "Repeat Password", validators=[DataRequired(), EqualTo("password")] + ) + submit = SubmitField("Request Password Reset") --- board/log_context.py +++ board/log_context.py @@ -21,7 +21,7 @@ context = _log_context.get() for key, value in context.items(): setattr(record, key, value) - + return True --- board/login.py +++ board/login.py @@ -7,4 +7,4 @@ @login_manager.user_loader def load_user(id): - return database.Database.session.get(database.User, int(id)) \ No newline at end of file + return database.Database.session.get(database.User, int(id)) --- board/pages.py +++ board/pages.py @@ -1,4 +1,12 @@ -from flask import Blueprint, render_template, flash, request, redirect, url_for, current_app +from flask import ( + Blueprint, + render_template, + flash, + request, + redirect, + url_for, + current_app, +) from flask_login import login_required, current_user from langdetect import detect, LangDetectException from board.form import PostForm @@ -16,51 +24,61 @@ try: language = detect(form.message.data) except LangDetectException: - language = '' + language = "" - post = Post( - message=form.message.data, - language=language, - author=current_user - ) + post = Post(message=form.message.data, language=language, author=current_user) Database.session.add(post) Database.session.commit() - flash('Your post is now live!') - return redirect(url_for('pages.home')) - + flash("Your post is now live!") + return redirect(url_for("pages.home")) + # Show all posts of users you're currently following - page = request.args.get('page', 1, type=int) + page = request.args.get("page", 1, type=int) # posts = Database.session.scalars( # current_user.following_posts() # ).all() - posts = Database.paginate(current_user.following_posts(), - page=page, - per_page=current_app.config['POSTS_PER_PAGE'], - error_out=False) - - next_url = url_for('pages.home', page=posts.next_num) if posts.has_next else None + posts = Database.paginate( + current_user.following_posts(), + page=page, + per_page=current_app.config["POSTS_PER_PAGE"], + error_out=False, + ) + + next_url = url_for("pages.home", page=posts.next_num) if posts.has_next else None - prev_url = url_for('pages.home', page=posts.prev_num) if posts.has_prev else None + prev_url = url_for("pages.home", page=posts.prev_num) if posts.has_prev else None - return render_template("pages/home.html", title="Home Page", form=form, posts=posts.items, next_url=next_url, prev_url=prev_url) + return render_template( + "pages/home.html", + title="Home Page", + form=form, + posts=posts.items, + next_url=next_url, + prev_url=prev_url, + ) @bp.route("/explore") @login_required def explore(): - page = request.args.get('page', 1, type=int) + page = request.args.get("page", 1, type=int) query = Database.select(Post).order_by(Post.created.desc()) # posts = Database.session.scalars(query).all() - posts = Database.paginate(query, - page=page, - per_page=current_app.config['POSTS_PER_PAGE'], - error_out=False) - - next_url = url_for('pages.explore', page=posts.next_num) if posts.has_next else None + posts = Database.paginate( + query, page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False + ) + + next_url = url_for("pages.explore", page=posts.next_num) if posts.has_next else None - prev_url = url_for('pages.explore', page=posts.prev_num) if posts.has_prev else None + prev_url = url_for("pages.explore", page=posts.prev_num) if posts.has_prev else None - return render_template("pages/home.html", title="Explore", posts=posts.items, next_url=next_url, prev_url=prev_url) + return render_template( + "pages/home.html", + title="Explore", + posts=posts.items, + next_url=next_url, + prev_url=prev_url, + ) @bp.get("/about") --- board/posts.py +++ board/posts.py @@ -23,18 +23,16 @@ try: language = detect(form.message.data) except LangDetectException: - language = '' + language = "" new_post = database.Post( - message=form.message.data, - language=language, - user_id=current_user.id + message=form.message.data, language=language, user_id=current_user.id ) database.Database.session.add(new_post) database.Database.session.commit() current_app.logger.info(f"New post by author {current_user.email}") flash(f"Thanks for posting, {current_user.email}", category="success") - return redirect(url_for('posts.posts')) + return redirect(url_for("posts.posts")) else: flash("Message cannot be blank", category="error") return render_template("posts/new.html", title="Add Post") @@ -46,7 +44,9 @@ # TODO: How to call current_user.posts ??? current_app.logger.info(f"Viewing all posts by {current_user.email}") posts = database.Database.session.execute( - database.Database.select(database.Post).order_by(database.Post.created.desc()).where(database.Post.user_id == current_user.id ) + database.Database.select(database.Post) + .order_by(database.Post.created.desc()) + .where(database.Post.user_id == current_user.id) ).scalars() - return render_template("posts/posts.html", title="All posts", posts=posts) \ No newline at end of file + return render_template("posts/posts.html", title="All posts", posts=posts) --- board/translate.py +++ board/translate.py @@ -7,8 +7,7 @@ def translate(text, source_lang, dest_lang): bedrock_client = boto3.client( - "bedrock-runtime", - region_name=os.getenv("AWS_REGION", "us-east-1") + "bedrock-runtime", region_name=os.getenv("AWS_REGION", "us-east-1") ) # TODO: Add logging config to Cloudwatch Logs here: @@ -33,30 +32,17 @@ """ req = { - "messages": [ - { - "role": "user", - "content": [ - { - "text": prompt - } - ] - } - ], - "inferenceConfig": { - "temperature": 0.1, - "topP": 0.9, - "maxTokens": 10240 - } + "messages": [{"role": "user", "content": [{"text": prompt}]}], + "inferenceConfig": {"temperature": 0.1, "topP": 0.9, "maxTokens": 10240}, } bedrock_response = bedrock_client.invoke_model( - modelId = "us.amazon.nova-micro-v1:0", + modelId="us.amazon.nova-micro-v1:0", body=json.dumps(req), - contentType="application/json" + contentType="application/json", ) resp = json.loads(bedrock_response["body"].read()) - translation = resp["output"]["message"]["content"][0]['text'] + translation = resp["output"]["message"]["content"][0]["text"] return translation --- board/translations.py +++ board/translations.py @@ -11,5 +11,5 @@ def get_translation(): data = request.get_json() return { - 'text': translate(data['text'], data['source_language'], data['dest_language']) + "text": translate(data["text"], data["source_language"], data["dest_language"]) } --- board/users.py +++ board/users.py @@ -1,11 +1,26 @@ from urllib.parse import urlsplit from datetime import datetime, timezone -from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app +from flask import ( + Blueprint, + render_template, + request, + redirect, + url_for, + flash, + current_app, +) from flask_login import current_user, login_user, logout_user, login_required import sqlalchemy as sa from sqlalchemy.sql import func from board.database import Database, User, Post -from board.form import LoginForm, RegistrationForm, EditProfileForm, EmptyForm, ResetPasswordRequestForm, ResetPasswordForm +from board.form import ( + LoginForm, + RegistrationForm, + EditProfileForm, + EmptyForm, + ResetPasswordRequestForm, + ResetPasswordForm, +) from board.email import send_password_reset_email from board.log_context import add_to_log_context @@ -13,11 +28,11 @@ bp = Blueprint("users", __name__) -@bp.route("/login", methods=['GET', 'POST']) +@bp.route("/login", methods=["GET", "POST"]) def login(): if current_user.is_authenticated: - flash('User already logged in') - return redirect(url_for('pages.home')) + flash("User already logged in") + return redirect(url_for("pages.home")) form = LoginForm() @@ -26,16 +41,16 @@ Database.select(User).where(User.email == form.email.data) ) if user is None or not user.check_password(form.password.data): - flash('Invalid username or password') - return redirect(url_for('users.login')) + flash("Invalid username or password") + return redirect(url_for("users.login")) login_user(user, remember=form.remember_me.data) with add_to_log_context(user_email=form.email.data): current_app.logger.info(f"User {current_user.email} has logged in.") - - next_page = request.args.get('next') - if not next_page or urlsplit(next_page).netloc != '': - next_page = url_for('pages.home') + + next_page = request.args.get("next") + if not next_page or urlsplit(next_page).netloc != "": + next_page = url_for("pages.home") return redirect(next_page) return render_template("users/login.html", form=form, title="Login") @@ -48,28 +63,25 @@ with add_to_log_context(): current_app.logger.info("User has logged out.") - return redirect(url_for('pages.home')) + return redirect(url_for("pages.home")) -@bp.route("/register", methods=['GET', 'POST']) +@bp.route("/register", methods=["GET", "POST"]) def register(): if current_user.is_authenticated: - flash('You are already signed in') - return redirect(url_for('pages.home')) + flash("You are already signed in") + return redirect(url_for("pages.home")) form = RegistrationForm() if form.validate_on_submit(): - user = User( - username=form.username.data, - email=form.email.data - ) + user = User(username=form.username.data, email=form.email.data) user.set_password(form.password.data) Database.session.add(user) Database.session.commit() - flash('You have registered successfully') - return redirect(url_for('users.login')) - - return render_template('users/registration.html', form=form, title="Register") + flash("You have registered successfully") + return redirect(url_for("users.login")) + + return render_template("users/registration.html", form=form, title="Register") @bp.route("/user/") @@ -81,30 +93,42 @@ # ) user = Database.first_or_404( - sa.select(User) - .where(func.lower(User.username) == func.lower(username)) + sa.select(User).where(func.lower(User.username) == func.lower(username)) ) print(f"USERNAME: {username}") print(f"FOUND USER: {user}") if user is None: - flash('User does not exist') - return redirect(url_for('pages.home')) + flash("User does not exist") + return redirect(url_for("pages.home")) - page = request.args.get('page', 1, type=int) + page = request.args.get("page", 1, type=int) query = user.posts.select().order_by(Post.created.desc()) posts = Database.paginate( - query, - page=page, - per_page=current_app.config['POSTS_PER_PAGE'], - error_out=False + query, page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False ) - next_url = url_for('users.user', username=user.username, page=posts.next_num) if posts.has_next else None + next_url = ( + url_for("users.user", username=user.username, page=posts.next_num) + if posts.has_next + else None + ) - prev_url = url_for('users.user', username=user.username, page=posts.prev_num) if posts.has_prev else None + prev_url = ( + url_for("users.user", username=user.username, page=posts.prev_num) + if posts.has_prev + else None + ) - return render_template("users/user.html", user=user, posts=posts, form=form, next_url=next_url, prev_url=prev_url, title="Profile") + return render_template( + "users/user.html", + user=user, + posts=posts, + form=form, + next_url=next_url, + prev_url=prev_url, + title="Profile", + ) @bp.before_request @@ -122,16 +146,16 @@ current_user.username = form.username.data current_user.about_me = form.about_me.data Database.session.commit() - flash('Your profile changes have been saved') - return redirect(url_for('users.edit_profile')) - elif request.method == 'GET': + flash("Your profile changes have been saved") + return redirect(url_for("users.edit_profile")) + elif request.method == "GET": form.username.data = current_user.username form.about_me.data = current_user.about_me - + return render_template("users/edit_profile.html", form=form, title="Edit Profile") -@bp.route("/follow/", methods=['POST']) +@bp.route("/follow/", methods=["POST"]) @login_required def follow(username): form = EmptyForm() @@ -142,21 +166,21 @@ if user is None: flash(f"User {username} not found") - return redirect(url_for('pages.home')) - + return redirect(url_for("pages.home")) + if user == current_user: - flash('You cannot follow yourself') - return redirect(url_for('users.user', username=username)) - + flash("You cannot follow yourself") + return redirect(url_for("users.user", username=username)) + current_user.follow(user) Database.session.commit() flash(f"You are following {username}") - return redirect(url_for('users.user', username=username)) + return redirect(url_for("users.user", username=username)) else: - return redirect(url_for('pages.home')) + return redirect(url_for("pages.home")) -@bp.route("/unfollow/", methods=['POST']) +@bp.route("/unfollow/", methods=["POST"]) @login_required def unfollow(username): form = EmptyForm() @@ -167,26 +191,26 @@ if user is None: flash(f"User {username} not found") - return redirect(url_for('pages.home')) - + return redirect(url_for("pages.home")) + if user == current_user: - flash('You cannot unfollow yourself') - return redirect(url_for('users.user', username=username)) - + flash("You cannot unfollow yourself") + return redirect(url_for("users.user", username=username)) + current_user.unfollow(user) Database.session.commit() flash(f"You are not following {username}") - return redirect(url_for('users.user', username=username)) + return redirect(url_for("users.user", username=username)) else: - return redirect(url_for('pages.home')) - + return redirect(url_for("pages.home")) + -@bp.route('/reset_password_request', methods=['GET', 'POST']) +@bp.route("/reset_password_request", methods=["GET", "POST"]) def reset_password_request(): if current_user.is_authenticated: - flash('You are already logged in') - return redirect(url_for('pages.home')) - + flash("You are already logged in") + return redirect(url_for("pages.home")) + form = ResetPasswordRequestForm() if form.validate_on_submit(): user = Database.session.scalar( @@ -195,30 +219,34 @@ if user: send_password_reset_email(user) - flash('Check your email for instructions on how to reset password') - return redirect(url_for('users.login')) + flash("Check your email for instructions on how to reset password") + return redirect(url_for("users.login")) else: - flash('User does not exist!') - - return render_template('users/reset_password_request.html', form=form, title="Reset Password") + flash("User does not exist!") + + return render_template( + "users/reset_password_request.html", form=form, title="Reset Password" + ) -@bp.route('/reset_password/', methods=["GET", "POST"]) +@bp.route("/reset_password/", methods=["GET", "POST"]) def reset_password(token): if current_user.is_authenticated: - flash('You are already logged in') - return redirect(url_for('pages.home')) - + flash("You are already logged in") + return redirect(url_for("pages.home")) + user = User.verify_reset_password_token(token) if not user: - flash('User not found for password reset') - return redirect(url_for('pages.home')) + flash("User not found for password reset") + return redirect(url_for("pages.home")) form = ResetPasswordForm() if form.validate_on_submit(): user.set_password(form.password.data) Database.session.commit() - flash('Your password has been reset') - return redirect(url_for('users.login')) - - return render_template('users/reset_password.html', form=form, token=token, title="Reset Password") \ No newline at end of file + flash("Your password has been reset") + return redirect(url_for("users.login")) + + return render_template( + "users/reset_password.html", form=form, token=token, title="Reset Password" + ) --- gunicorn.conf.py +++ gunicorn.conf.py @@ -3,9 +3,11 @@ import multiprocessing -bind = ':5000' +bind = ":5000" workers = multiprocessing.cpu_count() -if os.getenv('AUTO_RELOAD'): +if os.getenv("AUTO_RELOAD"): reload = True - reload_extra_files = glob('board/templates/**/*.html', recursive=True) + glob('board/static/*.css', recursive=True) \ No newline at end of file + reload_extra_files = glob("board/templates/**/*.html", recursive=True) + glob( + "board/static/*.css", recursive=True + ) --- migrations/env.py +++ migrations/env.py @@ -12,32 +12,31 @@ # Interpret the config file for Python logging. # This line sets up loggers basically. fileConfig(config.config_file_name) -logger = logging.getLogger('alembic.env') +logger = logging.getLogger("alembic.env") def get_engine(): try: # this works with Flask-SQLAlchemy<3 and Alchemical - return current_app.extensions['migrate'].db.get_engine() + return current_app.extensions["migrate"].db.get_engine() except (TypeError, AttributeError): # this works with Flask-SQLAlchemy>=3 - return current_app.extensions['migrate'].db.engine + return current_app.extensions["migrate"].db.engine def get_engine_url(): try: - return get_engine().url.render_as_string(hide_password=False).replace( - '%', '%%') + return get_engine().url.render_as_string(hide_password=False).replace("%", "%%") except AttributeError: - return str(get_engine().url).replace('%', '%%') + return str(get_engine().url).replace("%", "%%") # add your model's MetaData object here # for 'autogenerate' support # from myapp import mymodel # target_metadata = mymodel.Base.metadata -config.set_main_option('sqlalchemy.url', get_engine_url()) -target_db = current_app.extensions['migrate'].db +config.set_main_option("sqlalchemy.url", get_engine_url()) +target_db = current_app.extensions["migrate"].db # other values from the config, defined by the needs of env.py, # can be acquired: @@ -46,7 +45,7 @@ def get_metadata(): - if hasattr(target_db, 'metadatas'): + if hasattr(target_db, "metadatas"): return target_db.metadatas[None] return target_db.metadata @@ -65,8 +64,7 @@ """ url = config.get_main_option("sqlalchemy.url") context.configure( - url=url, target_metadata=get_metadata(), literal_binds=True, - compare_type=True + url=url, target_metadata=get_metadata(), literal_binds=True, compare_type=True ) with context.begin_transaction(): @@ -85,13 +83,13 @@ # when there are no changes to the schema # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html def process_revision_directives(context, revision, directives): - if getattr(config.cmd_opts, 'autogenerate', False): + if getattr(config.cmd_opts, "autogenerate", False): script = directives[0] if script.upgrade_ops.is_empty(): directives[:] = [] - logger.info('No changes in schema detected.') + logger.info("No changes in schema detected.") - conf_args = current_app.extensions['migrate'].configure_args + conf_args = current_app.extensions["migrate"].configure_args if conf_args.get("process_revision_directives") is None: conf_args["process_revision_directives"] = process_revision_directives @@ -99,9 +97,7 @@ with connectable.connect() as connection: context.configure( - connection=connection, - target_metadata=get_metadata(), - **conf_args + connection=connection, target_metadata=get_metadata(), **conf_args ) with context.begin_transaction(): --- migrations/versions/343d75cb7e92_add_new_user_fields.py +++ migrations/versions/343d75cb7e92_add_new_user_fields.py @@ -5,30 +5,33 @@ Create Date: 2025-06-15 12:56:02.953010 """ + from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '343d75cb7e92' -down_revision = '91d28e4d9bcd' +revision = "343d75cb7e92" +down_revision = "91d28e4d9bcd" branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.add_column(sa.Column('about_me', sa.String(length=140), nullable=True)) - batch_op.add_column(sa.Column('last_seen', sa.DateTime(timezone=True), nullable=True)) + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.add_column(sa.Column("about_me", sa.String(length=140), nullable=True)) + batch_op.add_column( + sa.Column("last_seen", sa.DateTime(timezone=True), nullable=True) + ) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.drop_column('last_seen') - batch_op.drop_column('about_me') + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.drop_column("last_seen") + batch_op.drop_column("about_me") # ### end Alembic commands ### --- migrations/versions/63b1599550fc_add_username_constraints.py +++ migrations/versions/63b1599550fc_add_username_constraints.py @@ -5,28 +5,31 @@ Create Date: 2025-06-15 13:43:31.537580 """ + from alembic import op -import sqlalchemy as sa # noqa: F401 +import sqlalchemy as sa # noqa: F401 # revision identifiers, used by Alembic. -revision = '63b1599550fc' -down_revision = '343d75cb7e92' +revision = "63b1599550fc" +down_revision = "343d75cb7e92" branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_users_username'), ['username'], unique=True) + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.create_index( + batch_op.f("ix_users_username"), ["username"], unique=True + ) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_users_username')) + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_users_username")) # ### end Alembic commands ### --- migrations/versions/82d5e7b23e28_add_language_to_post.py +++ migrations/versions/82d5e7b23e28_add_language_to_post.py @@ -5,28 +5,29 @@ Create Date: 2025-08-01 15:19:47.077103 """ + from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '82d5e7b23e28' -down_revision = 'e93ccd60891f' +revision = "82d5e7b23e28" +down_revision = "e93ccd60891f" branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('posts', schema=None) as batch_op: - batch_op.add_column(sa.Column('language', sa.String(length=5), nullable=True)) + with op.batch_alter_table("posts", schema=None) as batch_op: + batch_op.add_column(sa.Column("language", sa.String(length=5), nullable=True)) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('posts', schema=None) as batch_op: - batch_op.drop_column('language') + with op.batch_alter_table("posts", schema=None) as batch_op: + batch_op.drop_column("language") # ### end Alembic commands ### --- migrations/versions/91d28e4d9bcd_init.py +++ migrations/versions/91d28e4d9bcd_init.py @@ -1,16 +1,17 @@ """Setup Database Revision ID: 91d28e4d9bcd -Revises: +Revises: Create Date: 2025-06-13 14:00:11.591660 """ + from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '91d28e4d9bcd' +revision = "91d28e4d9bcd" down_revision = None branch_labels = None depends_on = None @@ -18,38 +19,43 @@ def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.create_table('users', - sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - sa.Column('username', sa.String(length=256), nullable=False), - sa.Column('email', sa.String(), nullable=False), - sa.Column('password_hash', sa.String(length=256), nullable=True), - sa.PrimaryKeyConstraint('id') + op.create_table( + "users", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("username", sa.String(length=256), nullable=False), + sa.Column("email", sa.String(), nullable=False), + sa.Column("password_hash", sa.String(length=256), nullable=True), + sa.PrimaryKeyConstraint("id"), ) - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_users_email'), ['email'], unique=True) + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_users_email"), ["email"], unique=True) - op.create_table('posts', - sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - sa.Column('message', sa.Text(), nullable=False), - sa.Column('created', sa.DateTime(timezone=True), nullable=True), - sa.Column('user_id', sa.Integer(), nullable=True), - sa.ForeignKeyConstraint(['user_id'], ['users.id'], ), - sa.PrimaryKeyConstraint('id') + op.create_table( + "posts", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("message", sa.Text(), nullable=False), + sa.Column("created", sa.DateTime(timezone=True), nullable=True), + sa.Column("user_id", sa.Integer(), nullable=True), + sa.ForeignKeyConstraint( + ["user_id"], + ["users.id"], + ), + sa.PrimaryKeyConstraint("id"), ) - with op.batch_alter_table('posts', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_posts_user_id'), ['user_id'], unique=False) + with op.batch_alter_table("posts", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_posts_user_id"), ["user_id"], unique=False) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('posts', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_posts_user_id')) + with op.batch_alter_table("posts", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_posts_user_id")) - op.drop_table('posts') - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_users_email')) + op.drop_table("posts") + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_users_email")) - op.drop_table('users') + op.drop_table("users") # ### end Alembic commands ### --- migrations/versions/e93ccd60891f_add_followers.py +++ migrations/versions/e93ccd60891f_add_followers.py @@ -5,38 +5,50 @@ Create Date: 2025-06-16 16:27:23.142554 """ + from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = 'e93ccd60891f' -down_revision = '63b1599550fc' +revision = "e93ccd60891f" +down_revision = "63b1599550fc" branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.create_table('followers', - sa.Column('follower_id', sa.Integer(), nullable=False), - sa.Column('followed_id', sa.Integer(), nullable=False), - sa.ForeignKeyConstraint(['followed_id'], ['users.id'], ), - sa.ForeignKeyConstraint(['follower_id'], ['users.id'], ), - sa.PrimaryKeyConstraint('follower_id', 'followed_id') + op.create_table( + "followers", + sa.Column("follower_id", sa.Integer(), nullable=False), + sa.Column("followed_id", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ["followed_id"], + ["users.id"], + ), + sa.ForeignKeyConstraint( + ["follower_id"], + ["users.id"], + ), + sa.PrimaryKeyConstraint("follower_id", "followed_id"), ) - with op.batch_alter_table('followers', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_followers_followed_id'), ['followed_id'], unique=False) - batch_op.create_index(batch_op.f('ix_followers_follower_id'), ['follower_id'], unique=False) + with op.batch_alter_table("followers", schema=None) as batch_op: + batch_op.create_index( + batch_op.f("ix_followers_followed_id"), ["followed_id"], unique=False + ) + batch_op.create_index( + batch_op.f("ix_followers_follower_id"), ["follower_id"], unique=False + ) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('followers', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_followers_follower_id')) - batch_op.drop_index(batch_op.f('ix_followers_followed_id')) + with op.batch_alter_table("followers", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_followers_follower_id")) + batch_op.drop_index(batch_op.f("ix_followers_followed_id")) - op.drop_table('followers') + op.drop_table("followers") # ### end Alembic commands ### --- test_bedrock.py +++ test_bedrock.py @@ -25,29 +25,16 @@ """ req = { - "messages": [ - { - "role": "user", - "content": [ - { - "text": prompt - } - ] - } - ], - "inferenceConfig": { - "temperature": 0.1, - "topP": 0.9, - "maxTokens": 10240 - } + "messages": [{"role": "user", "content": [{"text": prompt}]}], + "inferenceConfig": {"temperature": 0.1, "topP": 0.9, "maxTokens": 10240}, } bedrock_response = bedrock_client.invoke_model( - modelId = "us.amazon.nova-micro-v1:0", + modelId="us.amazon.nova-micro-v1:0", body=json.dumps(req), - contentType="application/json" + contentType="application/json", ) resp = json.loads(bedrock_response["body"].read()) - translation = resp["output"]["message"]["content"][0]['text'] + translation = resp["output"]["message"]["content"][0]["text"] print(translation) --- tests/conftest.py +++ tests/conftest.py @@ -4,13 +4,15 @@ from board.database import Database, User -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def app(): app = create_app() - app.config.update({ - "TESTING": True, - "WTF_CSRF_ENABLED": False # needed to disable CSRF protection in forms else tests will fail - }) + app.config.update( + { + "TESTING": True, + "WTF_CSRF_ENABLED": False, # needed to disable CSRF protection in forms else tests will fail + } + ) # Other setup can go here app.register_blueprint(pages.bp, name="testpages") @@ -22,7 +24,7 @@ username="chee", email="test@example.com", ) - + user.set_password("testing") Database.session.add(user) Database.session.commit() @@ -37,48 +39,48 @@ Database.session.commit() -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def client(app): return app.test_client() -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def new_user(): - user = User(email='newuser@example.com',username='New User') + user = User(email="newuser@example.com", username="New User") return user -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def second_user(): - user = User(email='seconduser@example.com', username='Second User') + user = User(email="seconduser@example.com", username="Second User") return user -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def test_user(app): with app.app_context(): - user = Database.session.scalar( + user = Database.session.scalar( Database.select(User).where(User.email == "test@example.com") ) return user -@pytest.fixture(scope='function') +@pytest.fixture(scope="function") def login_user(app, test_user): app.test_client_class = FlaskLoginClient with app.test_client(user=test_user) as client: yield client - client.get('/logout') + client.get("/logout") -# Sets the session ID directly -@pytest.fixture(scope='function') +# Sets the session ID directly +@pytest.fixture(scope="function") def login_user_via_session(app, client, test_user): with client.session_transaction() as session: - session['_user_id'] = test_user.id - + session["_user_id"] = test_user.id + yield client - - client.get('/logout') + + client.get("/logout") --- tests/functional/test_pages.py +++ tests/functional/test_pages.py @@ -4,14 +4,14 @@ def test_request_home(client): response = client.get("/", follow_redirects=True) assert response.status_code == 200 - assert b'Login' in response.data + assert b"Login" in response.data def test_request_home_authenticated(login_user_via_session): resp = login_user_via_session.get("/") - assert b'/user/chee' in resp.data - assert b'Logout' in resp.data - assert b'Say something' in resp.data + assert b"/user/chee" in resp.data + assert b"Logout" in resp.data + assert b"Say something" in resp.data # The login_user is for logging in @@ -21,20 +21,17 @@ # Use the FlaskLoginCLient session_transaction block to retrieve the saved session user ID # Can also replace the session['_user_id'] directly to simulate a logged in user... - # + # with login_user.session_transaction() as session: - session_id = int(session['_user_id']) + session_id = int(session["_user_id"]) with app.app_context(): session_user = Database.session.scalar( Database.select(User).where(User.id == session_id) ) - new_user.set_password('MyNewPassword') - post = Post( - message='A test post', - author=new_user - ) + new_user.set_password("MyNewPassword") + post = Post(message="A test post", author=new_user) Database.session.add(new_user) Database.session.add(post) Database.session.commit() @@ -57,4 +54,4 @@ def test_not_found(client): response = client.get("/nan") assert response.status_code == 404 - assert b"Page not found" in response.data \ No newline at end of file + assert b"Page not found" in response.data --- tests/functional/test_posts.py +++ tests/functional/test_posts.py @@ -5,7 +5,7 @@ """ response = client.get("/posts/new", follow_redirects=True) assert response.status_code == 200 - assert b'Please log in to access this page' in response.data + assert b"Please log in to access this page" in response.data def test_get_new_posts_authenticated(login_user): @@ -25,45 +25,42 @@ Test /posts/create without valid session user Should redirect to login page """ - response = client.post("/posts/create", - follow_redirects=True) + response = client.post("/posts/create", follow_redirects=True) assert response.status_code == 200 assert len(response.history) == 1 - assert b'Please log in to access this page' in response.data + assert b"Please log in to access this page" in response.data def test_post_create_authenticated(login_user): - response = login_user.post("/posts/create", - data={"message": "Test Message"}, - follow_redirects=True) + response = login_user.post( + "/posts/create", data={"message": "Test Message"}, follow_redirects=True + ) assert response.status_code == 200 assert len(response.history) == 1 - assert b'Thanks for posting' in response.data - assert b'Test Message' in response.data + assert b"Thanks for posting" in response.data + assert b"Test Message" in response.data def test_post_create_invalid_authenticated(login_user): - response = login_user.post("/posts/create", - data={"message": None}, - follow_redirects=True) + response = login_user.post( + "/posts/create", data={"message": None}, follow_redirects=True + ) assert response.status_code == 200 - assert b'Message cannot be blank' in response.data + assert b"Message cannot be blank" in response.data def test_post_list(client): - response = client.get("/posts", - follow_redirects=True) + response = client.get("/posts", follow_redirects=True) assert response.status_code == 200 assert len(response.history) == 1 - assert b'Please log in to access this page' + assert b"Please log in to access this page" def test_post_list_authenticated(login_user): - login_user.post("/posts/create", - data={"message": "Test Message New"}) - + login_user.post("/posts/create", data={"message": "Test Message New"}) + response = login_user.get("/posts") assert response.status_code == 200 - assert b'Posts' in response.data - assert b'Test Message New' in response.data + assert b"Posts" in response.data + assert b"Test Message New" in response.data assert b'chee said' in response.data --- tests/functional/test_users.py +++ tests/functional/test_users.py @@ -1,156 +1,213 @@ def test_get_login(client): resp = client.get("/login") assert resp.status_code == 200 - assert b'Sign In' in resp.data - assert b'Email' in resp.data - assert b'Password' in resp.data - assert b'Click to Register' in resp.data + assert b"Sign In" in resp.data + assert b"Email" in resp.data + assert b"Password" in resp.data + assert b"Click to Register" in resp.data def test_post_login(client): - resp = client.post("/login", - data={"email": "test@example.com", "password": "testing"}, - follow_redirects=True) + resp = client.post( + "/login", + data={"email": "test@example.com", "password": "testing"}, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'Hi, chee' in resp.data - assert b'Logout' in resp.data + assert b"Hi, chee" in resp.data + assert b"Logout" in resp.data client.get("/logout") def test_post_login_invalid_password(client): - resp = client.post("/login", - data={"email": "test@example.com", "password": "wrong"}, - follow_redirects=True) - + resp = client.post( + "/login", + data={"email": "test@example.com", "password": "wrong"}, + follow_redirects=True, + ) + assert resp.status_code == 200 - assert b'Invalid username or password' in resp.data - assert b'Sign In' in resp.data + assert b"Invalid username or password" in resp.data + assert b"Sign In" in resp.data def test_post_login_invalid_email(client): - resp = client.post("/login", - data={"email": "none@example.com", "password": "testing"}, - follow_redirects=True) + resp = client.post( + "/login", + data={"email": "none@example.com", "password": "testing"}, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'Invalid username or password' in resp.data - assert b'Sign In' in resp.data + assert b"Invalid username or password" in resp.data + assert b"Sign In" in resp.data def test_post_login_if_authenticated(login_user): - resp = login_user.post("login", - data={"email": "test@example.com", "password": "testing"}, - follow_redirects=True) + resp = login_user.post( + "login", + data={"email": "test@example.com", "password": "testing"}, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'Logout' in resp.data + assert b"Logout" in resp.data def test_get_logout(login_user): - resp = login_user.get("/logout", - follow_redirects=True) + resp = login_user.get("/logout", follow_redirects=True) assert resp.status_code == 200 - assert b'Login' in resp.data + assert b"Login" in resp.data def test_get_register(client): resp = client.get("/register") assert resp.status_code == 200 - assert b'Register' in resp.data - assert b'Username' in resp.data - assert b'Password' in resp.data - assert b'Repeat Password' in resp.data + assert b"Register" in resp.data + assert b"Username" in resp.data + assert b"Password" in resp.data + assert b"Repeat Password" in resp.data def test_post_register(client): - resp = client.post("/register", - data={"username": "second user", "email": "user@example.com", "password": "testing", "password2": "testing"}, - follow_redirects=True) + resp = client.post( + "/register", + data={ + "username": "second user", + "email": "user@example.com", + "password": "testing", + "password2": "testing", + }, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'You have registered successfully' in resp.data - assert b'Sign In' in resp.data + assert b"You have registered successfully" in resp.data + assert b"Sign In" in resp.data - resp = client.post("/login", - data={"email": "user@example.com", "password": "testing"}, - follow_redirects=True) + resp = client.post( + "/login", + data={"email": "user@example.com", "password": "testing"}, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'Hi, second user' in resp.data - assert b'Logout' in resp.data + assert b"Hi, second user" in resp.data + assert b"Logout" in resp.data client.get("/logout") def test_post_register_missing_username(client): - resp = client.post("/register", - data={"email": "user22@example.com", "password": "testing", "password2": "testing"}, - follow_redirects=True) - assert b'Register' in resp.data - assert b'This field is required' in resp.data + resp = client.post( + "/register", + data={ + "email": "user22@example.com", + "password": "testing", + "password2": "testing", + }, + follow_redirects=True, + ) + assert b"Register" in resp.data + assert b"This field is required" in resp.data def test_post_register_invalid_email(client): - resp = client.post("/register", - data={"username": "test user", "email": "none", "password": "testing", "password2": "testing"}, - follow_redirects=True) - assert b'Register' in resp.data - assert b'Invalid email address' in resp.data + resp = client.post( + "/register", + data={ + "username": "test user", + "email": "none", + "password": "testing", + "password2": "testing", + }, + follow_redirects=True, + ) + assert b"Register" in resp.data + assert b"Invalid email address" in resp.data def test_post_register_exists_username(client): - resp = client.post("/register", - data={"username": "second user", "email": "user22@example.com", "password": "testing", "password2": "testing"}, - follow_redirects=True) - assert b'Register' in resp.data - assert b'Please use a different username' in resp.data + resp = client.post( + "/register", + data={ + "username": "second user", + "email": "user22@example.com", + "password": "testing", + "password2": "testing", + }, + follow_redirects=True, + ) + assert b"Register" in resp.data + assert b"Please use a different username" in resp.data def test_post_register_exists_email(client): - resp = client.post("/register", - data={"username": "third user", "email": "user@example.com", "password": "testing", "password2": "testing"}, - follow_redirects=True) - assert b'Register' in resp.data - assert b'Please use a different email address' in resp.data + resp = client.post( + "/register", + data={ + "username": "third user", + "email": "user@example.com", + "password": "testing", + "password2": "testing", + }, + follow_redirects=True, + ) + assert b"Register" in resp.data + assert b"Please use a different email address" in resp.data def test_post_register_missing_password(client): - resp = client.post("/register", - data={"username": "third user", "email": "usertest@example.com"}, - follow_redirects=True) - assert b'Register' in resp.data - assert b'This field is required' in resp.data + resp = client.post( + "/register", + data={"username": "third user", "email": "usertest@example.com"}, + follow_redirects=True, + ) + assert b"Register" in resp.data + assert b"This field is required" in resp.data def test_post_register_mismatch_password(client): - resp = client.post("/register", - data={"username": "third user", "email": "usertest@example.com", "password": "testing", "password2": "no match"}, - follow_redirects=True) - assert b'Register' in resp.data - assert b'Field must be equal to password' in resp.data + resp = client.post( + "/register", + data={ + "username": "third user", + "email": "usertest@example.com", + "password": "testing", + "password2": "no match", + }, + follow_redirects=True, + ) + assert b"Register" in resp.data + assert b"Field must be equal to password" in resp.data def test_reset_password_request_when_authenticated(login_user): - resp = login_user.get('/reset_password_request', follow_redirects=True) + resp = login_user.get("/reset_password_request", follow_redirects=True) assert resp.status_code == 200 - assert b'You are already logged in' in resp.data + assert b"You are already logged in" in resp.data def test_reset_password_request(client): - resp = client.post("/reset_password_request", - data={"email": "test@example.com"}, - follow_redirects=True) + resp = client.post( + "/reset_password_request", + data={"email": "test@example.com"}, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'Check your email for instructions on how to reset password' in resp.data + assert b"Check your email for instructions on how to reset password" in resp.data def test_reset_password_request_no_user(client): - resp = client.post("/reset_password_request", - data={"email": "none@example.com"}, - follow_redirects=True) + resp = client.post( + "/reset_password_request", + data={"email": "none@example.com"}, + follow_redirects=True, + ) assert resp.status_code == 200 - assert b'User does not exist' in resp.data + assert b"User does not exist" in resp.data def test_reset_password_when_authenticated(login_user): - resp = login_user.get('/reset_password/token', follow_redirects=True) + resp = login_user.get("/reset_password/token", follow_redirects=True) assert resp.status_code == 200 - assert b'You are already logged in' in resp.data + assert b"You are already logged in" in resp.data + -# TODO: Need to mock out jwt and User.verify_reset_password_token to continue testinng the reset password route \ No newline at end of file +# TODO: Need to mock out jwt and User.verify_reset_password_token to continue testinng the reset password route --- tests/unit/test_models.py +++ tests/unit/test_models.py @@ -16,16 +16,16 @@ def test_set_password(new_user): - new_user.set_password('MyNewPassword') - assert new_user.password_hash != 'MyNewPassword' - assert new_user.check_password('MyNewPassword') - assert not new_user.check_password('Wrong') + new_user.set_password("MyNewPassword") + assert new_user.password_hash != "MyNewPassword" + assert new_user.check_password("MyNewPassword") + assert not new_user.check_password("Wrong") def test_has_many_posts(app, new_user): with app.app_context(): - new_user.set_password('MyNewPassword') - post = Post(message='A test post', author=new_user) + new_user.set_password("MyNewPassword") + post = Post(message="A test post", author=new_user) post2 = Post(message="A second post", author=new_user) Database.session.add(new_user) @@ -40,10 +40,10 @@ def test_user_avatar(new_user): - hashed = md5(new_user.email.lower().encode('utf-8')).hexdigest() + hashed = md5(new_user.email.lower().encode("utf-8")).hexdigest() profile_pic = new_user.avatar(20) assert hashed in profile_pic - assert '20' in profile_pic + assert "20" in profile_pic def test_user_following(app, new_user, second_user): @@ -52,7 +52,7 @@ Database.session.add(second_user) Database.session.commit() new_user.follow(second_user) - + assert new_user.is_following(second_user) assert new_user.following_count() == 1 assert second_user.followers_count() == 1 @@ -104,12 +104,13 @@ # Tests that no user is returned as jwt.decode failed assert User.verify_reset_password_token(token) is None - mock_logger.assert_called_once_with("Failure in JWT decode of password reset token") + mock_logger.assert_called_once_with( + "Failure in JWT decode of password reset token" + ) - def test_post(new_user): post = Post(message="Test Post", author=new_user, id=123) assert post.author == new_user assert post.message == "Test Post" - assert post.__repr__() == f"" \ No newline at end of file + assert post.__repr__() == f"" ```