--- board/__init__.py
+++ board/__init__.py
@@ -18,7 +18,7 @@
def get_locale():
- return request.accept_languages.best_match(['en', 'es'])
+ return request.accept_languages.best_match(["en", "es"])
def create_app():
@@ -38,18 +38,16 @@
# Setting up translation
Babel(app, locale_selector=get_locale)
-
# Sets up Cloudwatch Logs Embedded Metrics
@metric_scope
def my_handler(path, method, duration, metrics):
- metrics.put_dimensions({'Path': path})
- metrics.put_metric('Latency', duration, 'Milliseconds')
- metrics.set_property('Method', method)
- metrics.set_property('Path', path)
- metrics.set_namespace('flaskappv22')
+ metrics.put_dimensions({"Path": path})
+ metrics.put_metric("Latency", duration, "Milliseconds")
+ metrics.set_property("Method", method)
+ metrics.set_property("Path", path)
+ metrics.set_namespace("flaskappv22")
metrics.set_timestamp(datetime.datetime.now())
-
# Setup logging
@app.before_request
@@ -63,8 +61,8 @@
"url": request.url,
"path": request.path,
"client_ip": client_ip,
- "user_agent": request.headers.get("user-agent")
- }
+ "user_agent": request.headers.get("user-agent"),
+ },
)
@after_this_request
@@ -75,7 +73,7 @@
log_level = logging.ERROR
elif response.status_code >= 400:
log_level = logging.WARNING
-
+
app.logger.log(
log_level,
f"{request.method} to {request.path} completed with status {response.status_code}",
@@ -84,8 +82,8 @@
"url": request.url,
"path": request.path,
"status_code": response.status_code,
- "latency": duration
- }
+ "latency": duration,
+ },
)
if os.getenv("ENV_TYPE") == "prod":
@@ -93,38 +91,41 @@
return response
-
# Set up moment js
@app.before_request
def moment_before_request():
g.locale = str(get_locale())
-
+
# moment = Moment(app)
Moment(app)
# Set up flask-mail
- app.config['MAIL_SERVER'] = os.getenv('MAIL_SERVER')
- app.config['MAIL_PORT'] = os.getenv('MAIL_PORT')
+ app.config["MAIL_SERVER"] = os.getenv("MAIL_SERVER")
+ app.config["MAIL_PORT"] = os.getenv("MAIL_PORT")
Mailer.init_app(app)
login.login_manager.init_app(app)
login.login_manager.login_view = "users.login"
- login.login_message = _l('Please login to access this page.')
+ login.login_message = _l("Please login to access this page.")
if os.environ.get("TESTING"):
- app.config["SQLALCHEMY_DATABASE_URI"] = f"postgresql://{os.environ.get('RDS_TEST_USERNAME')}:{os.environ.get('RDS_TEST_PASSWORD')}@{os.environ.get('RDS_HOSTNAME')}:{os.environ.get('RDS_TEST_PORT')}/{os.environ.get('RDS_TEST_DB_NAME')}"
+ app.config["SQLALCHEMY_DATABASE_URI"] = (
+ f"postgresql://{os.environ.get('RDS_TEST_USERNAME')}:{os.environ.get('RDS_TEST_PASSWORD')}@{os.environ.get('RDS_HOSTNAME')}:{os.environ.get('RDS_TEST_PORT')}/{os.environ.get('RDS_TEST_DB_NAME')}"
+ )
else:
- app.config["SQLALCHEMY_DATABASE_URI"] = f"postgresql://{os.environ.get('RDS_USERNAME')}:{os.environ.get('RDS_PASSWORD')}@{os.environ.get('RDS_HOSTNAME')}:{os.environ.get('RDS_PORT')}/{os.environ.get('RDS_DB_NAME')}"
+ app.config["SQLALCHEMY_DATABASE_URI"] = (
+ f"postgresql://{os.environ.get('RDS_USERNAME')}:{os.environ.get('RDS_PASSWORD')}@{os.environ.get('RDS_HOSTNAME')}:{os.environ.get('RDS_PORT')}/{os.environ.get('RDS_DB_NAME')}"
+ )
# Set pagination value
- app.config['POSTS_PER_PAGE'] = int(os.getenv("POSTS_PER_PAGE", 20))
+ app.config["POSTS_PER_PAGE"] = int(os.getenv("POSTS_PER_PAGE", 20))
database.Database.init_app(app)
# Adds migration commands via Flask-migrate but doesn't run the actual migration...
Migrate(app, database.Database)
# Migrate.init_app(app, database.Database)
-
+
app.register_blueprint(pages.bp)
app.register_blueprint(posts.bp)
app.register_blueprint(users.bp)
@@ -132,7 +133,7 @@
app.register_error_handler(404, errors.page_not_found)
app.register_error_handler(500, errors.internal_error)
app.register_error_handler(Exception, errors.unhandled_exception_handler)
-
+
# app.logger.info("Microblog startup")
return app
--- board/cli.py
+++ board/cli.py
@@ -30,14 +30,43 @@
try:
with open(messages_path, "w") as _tmp:
- result = subprocess.run([pybabel_bin, "extract", "-F", "babel.cfg", "-k", "_l", "-o", messages_path, "."], capture_output=True, text=True, check=True)
+ result = subprocess.run(
+ [
+ pybabel_bin,
+ "extract",
+ "-F",
+ "babel.cfg",
+ "-k",
+ "_l",
+ "-o",
+ messages_path,
+ ".",
+ ],
+ capture_output=True,
+ text=True,
+ check=True,
+ )
print(result.stdout)
print(result.stderr)
if result.returncode != 0:
raise RuntimeError("extract command failed")
- result = subprocess.run([pybabel_bin, "init", "-i", messages_path, "-d", "board.translations", "-l", lang], capture_output=True, text=True, check=True)
+ result = subprocess.run(
+ [
+ pybabel_bin,
+ "init",
+ "-i",
+ messages_path,
+ "-d",
+ "board.translations",
+ "-l",
+ lang,
+ ],
+ capture_output=True,
+ text=True,
+ check=True,
+ )
print(result.stdout)
print(result.stderr)
@@ -56,16 +85,43 @@
try:
with open(messages_path, "w") as _tmp:
- result = subprocess.run([pybabel_bin, "extract", "-F", "babel.cfg", "-k", "_l", "-o", messages_path, "."], capture_output=True, text=True, check=True)
+ result = subprocess.run(
+ [
+ pybabel_bin,
+ "extract",
+ "-F",
+ "babel.cfg",
+ "-k",
+ "_l",
+ "-o",
+ messages_path,
+ ".",
+ ],
+ capture_output=True,
+ text=True,
+ check=True,
+ )
print(result.stdout)
print(result.stderr)
if result.returncode != 0:
raise RuntimeError("Babel extract command failed")
-
- result = subprocess.run([pybabel_bin, "update", "-i", messages_path, "-d", "board/translations"], capture_output=True, text=True, check=True)
+ result = subprocess.run(
+ [
+ pybabel_bin,
+ "update",
+ "-i",
+ messages_path,
+ "-d",
+ "board/translations",
+ ],
+ capture_output=True,
+ text=True,
+ check=True,
+ )
+
print(result.stdout)
print(result.stderr)
@@ -82,7 +138,12 @@
Compile all languages
"""
- result = subprocess.run([pybabel_bin, "compile", "-d", "board/translations"], capture_output=True, text=True, check=True)
+ result = subprocess.run(
+ [pybabel_bin, "compile", "-d", "board/translations"],
+ capture_output=True,
+ text=True,
+ check=True,
+ )
print(result.stdout)
print(result.stderr)
--- board/database.py
+++ board/database.py
@@ -25,84 +25,84 @@
# Users followers table
followers = Table(
- 'followers',
+ "followers",
Base.metadata,
- Column('follower_id',
- Integer(),
- ForeignKey('users.id'),
- primary_key=True,
- index=True),
- Column('followed_id',
- Integer(),
- ForeignKey('users.id'),
- primary_key=True,
- index=True)
+ Column(
+ "follower_id", Integer(), ForeignKey("users.id"), primary_key=True, index=True
+ ),
+ Column(
+ "followed_id", Integer(), ForeignKey("users.id"), primary_key=True, index=True
+ ),
)
class User(UserMixin, Database.Model):
- __tablename__ = 'users'
+ __tablename__ = "users"
id = mapped_column(Integer(), primary_key=True, autoincrement=True)
- username = mapped_column(String(length=256), unique=True, index=True, nullable=False)
+ username = mapped_column(
+ String(length=256), unique=True, index=True, nullable=False
+ )
email = mapped_column(String(), unique=True, index=True, nullable=False)
password_hash = mapped_column(String(length=256))
about_me = mapped_column(String(length=140))
- last_seen = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
+ last_seen = mapped_column(
+ DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
+ )
# Posts
- posts: WriteOnlyMapped['Post'] = relationship('Post', back_populates='author')
+ posts: WriteOnlyMapped["Post"] = relationship("Post", back_populates="author")
# Followers
- following: WriteOnlyMapped['User'] = relationship(
+ following: WriteOnlyMapped["User"] = relationship(
secondary=followers,
primaryjoin=(followers.c.follower_id == id),
secondaryjoin=(followers.c.followed_id == id),
- back_populates='followers')
-
- followers: WriteOnlyMapped['User'] = relationship(
+ back_populates="followers",
+ )
+
+ followers: WriteOnlyMapped["User"] = relationship(
secondary=followers,
primaryjoin=(followers.c.followed_id == id),
secondaryjoin=(followers.c.follower_id == id),
- back_populates='following')
+ back_populates="following",
+ )
def set_password(self, password):
self.password_hash = generate_password_hash(password)
-
+
def check_password(self, password):
return check_password_hash(self.password_hash, password)
-
+
def __repr__(self):
return f"<User: {self.email}>"
-
+
def avatar(self, size):
- digest = hashlib.new("md5", self.email.lower().encode('utf-8'), usedforsecurity=False).hexdigest()
- return f'https://www.gravatar.com/avatar/{digest}?d=identicon&s={size}'
-
+ digest = hashlib.new(
+ "md5", self.email.lower().encode("utf-8"), usedforsecurity=False
+ ).hexdigest()
+ return f"https://www.gravatar.com/avatar/{digest}?d=identicon&s={size}"
+
def follow(self, user):
if not self.is_following(user):
self.following.add(user)
-
+
def unfollow(self, user):
if self.is_following(user):
self.following.remove(user)
-
+
def is_following(self, user):
query = self.following.select().where(User.id == user.id)
return Database.session.scalar(query) is not None
-
+
def followers_count(self):
- query = Select(func.count()).select_from(
- self.followers.select().subquery()
- )
+ query = Select(func.count()).select_from(self.followers.select().subquery())
return Database.session.scalar(query)
-
+
def following_count(self):
- query = Select(func.count()).select_from(
- self.following.select().subquery()
- )
+ query = Select(func.count()).select_from(self.following.select().subquery())
return Database.session.scalar(query)
-
+
def following_posts(self):
Author = so.aliased(User)
Follower = so.aliased(User)
@@ -111,45 +111,49 @@
Select(Post)
.join(Post.author.of_type(Author))
.join(Author.followers.of_type(Follower), isouter=True)
- .where(sa.or_(
- Follower.id == self.id,
- # Author.id == self.id,
- ))
+ .where(
+ sa.or_(
+ Follower.id == self.id,
+ # Author.id == self.id,
+ )
+ )
.group_by(Post)
.order_by(Post.created.desc())
)
-
+
def get_password_reset_token(self, expires_in=600):
return jwt.encode(
- {'reset_password': self.id, 'exp': time() + expires_in},
+ {"reset_password": self.id, "exp": time() + expires_in},
key=os.getenv("APP_SECRET"),
- algorithm='HS256'
+ algorithm="HS256",
)
-
+
@staticmethod
def verify_reset_password_token(token):
try:
- id = jwt.decode(
- token,
- key=os.getenv('APP_SECRET'),
- algorithms=['HS256']
- )['reset_password']
+ id = jwt.decode(token, key=os.getenv("APP_SECRET"), algorithms=["HS256"])[
+ "reset_password"
+ ]
except Exception as _:
- current_app.logger.exception("Failure in JWT decode of password reset token")
+ current_app.logger.exception(
+ "Failure in JWT decode of password reset token"
+ )
return
-
+
return Database.session.get(User, id)
class Post(Database.Model):
- __tablename__ = 'posts'
+ __tablename__ = "posts"
id = mapped_column(Integer(), primary_key=True, autoincrement=True)
message = mapped_column(Text(), nullable=False)
- created = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
+ created = mapped_column(
+ DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
+ )
language = mapped_column(String(length=5), nullable=True)
user_id = mapped_column(ForeignKey(User.id), index=True)
- author = relationship('User', back_populates='posts')
+ author = relationship("User", back_populates="posts")
def __repr__(self):
- return f"<Post: {self.id}>"
\ No newline at end of file
+ return f"<Post: {self.id}>"
--- board/email.py
+++ board/email.py
@@ -6,11 +6,7 @@
def send_email(subject, sender, recipients, text_body, html_body):
- msg = Message(
- subject,
- sender=sender,
- recipients=recipients
- )
+ msg = Message(subject, sender=sender, recipients=recipients)
msg.body = text_body
msg.html = html_body
Mailer.send(msg)
@@ -18,10 +14,10 @@
def send_password_reset_email(user):
token = user.get_password_reset_token()
- send_email('[Microblog] Reset Your Password',
- sender='[email protected]',
- recipients=[user.email],
- text_body=render_template('email/reset_password.txt',
- user=user, token=token),
- html_body=render_template('email/reset_password.html',
- user=user, token=token))
\ No newline at end of file
+ send_email(
+ "[Microblog] Reset Your Password",
+ sender="[email protected]",
+ recipients=[user.email],
+ text_body=render_template("email/reset_password.txt", user=user, token=token),
+ html_body=render_template("email/reset_password.html", user=user, token=token),
+ )
--- board/errors.py
+++ board/errors.py
@@ -18,11 +18,9 @@
current_app.logger.exception("unhandled exception during request processing")
response = e.get_response()
- response.data = json.dumps({
- "code": e.code,
- "name": e.name,
- "description": e.description
- })
+ response.data = json.dumps(
+ {"code": e.code, "name": e.name, "description": e.description}
+ )
response.content_type = "application/json"
return response
--- board/form.py
+++ board/form.py
@@ -6,36 +6,44 @@
class LoginForm(FlaskForm):
- email = StringField(_l('Email'), validators=[DataRequired(), Email()])
- password = PasswordField(_l('Password'), validators=[DataRequired()])
- remember_me = BooleanField(_l('Remember Me'))
- submit = SubmitField(_l('Sign In'))
+ email = StringField(_l("Email"), validators=[DataRequired(), Email()])
+ password = PasswordField(_l("Password"), validators=[DataRequired()])
+ remember_me = BooleanField(_l("Remember Me"))
+ submit = SubmitField(_l("Sign In"))
class RegistrationForm(FlaskForm):
- username = StringField('Username', validators=[DataRequired()])
- email = StringField('Email', validators=[DataRequired(), Email()])
- password = PasswordField('Password', validators=[DataRequired()])
- password2 = PasswordField('Repeat Password', validators=[DataRequired(), EqualTo('password')])
- submit = SubmitField('Register')
+ username = StringField("Username", validators=[DataRequired()])
+ email = StringField("Email", validators=[DataRequired(), Email()])
+ password = PasswordField("Password", validators=[DataRequired()])
+ password2 = PasswordField(
+ "Repeat Password", validators=[DataRequired(), EqualTo("password")]
+ )
+ submit = SubmitField("Register")
def validate_username(self, username):
- user = database.Database.session.scalar(database.Database.select(database.User).where(
- database.User.username == username.data))
+ user = database.Database.session.scalar(
+ database.Database.select(database.User).where(
+ database.User.username == username.data
+ )
+ )
if user is not None:
- raise ValidationError('Please use a different username.')
+ raise ValidationError("Please use a different username.")
def validate_email(self, email):
- user = database.Database.session.scalar(database.Database.select(database.User).where(
- database.User.email == email.data))
+ user = database.Database.session.scalar(
+ database.Database.select(database.User).where(
+ database.User.email == email.data
+ )
+ )
if user is not None:
- raise ValidationError('Please use a different email address.')
-
+ raise ValidationError("Please use a different email address.")
+
class EditProfileForm(FlaskForm):
- username = StringField('Username', validators=[DataRequired()])
- about_me = TextAreaField('About Me', validators=[Length(min=0, max=140)])
- submit = SubmitField('Submit')
+ username = StringField("Username", validators=[DataRequired()])
+ about_me = TextAreaField("About Me", validators=[Length(min=0, max=140)])
+ submit = SubmitField("Submit")
def __init__(self, original_username, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -43,29 +51,35 @@
def validate_username(self, username):
if username.data != self.original_username:
- user = database.Database.session.scalar(database.Database.select(database.User).where(
- database.User.username == username.data))
+ user = database.Database.session.scalar(
+ database.Database.select(database.User).where(
+ database.User.username == username.data
+ )
+ )
if user is not None:
- raise ValidationError('Please use a different username.')
+ raise ValidationError("Please use a different username.")
class EmptyForm(FlaskForm):
- submit = SubmitField('Submit')
+ submit = SubmitField("Submit")
class PostForm(FlaskForm):
- message = TextAreaField('Say something', validators=[DataRequired(), Length(min=1, max=140)])
- submit = SubmitField('Submit')
+ message = TextAreaField(
+ "Say something", validators=[DataRequired(), Length(min=1, max=140)]
+ )
+ submit = SubmitField("Submit")
class ResetPasswordRequestForm(FlaskForm):
- email = StringField('Email', validators=[DataRequired(), Email()])
- submit = SubmitField('Submit')
+ email = StringField("Email", validators=[DataRequired(), Email()])
+ submit = SubmitField("Submit")
class ResetPasswordForm(FlaskForm):
- password = PasswordField('Password', validators=[DataRequired()])
+ password = PasswordField("Password", validators=[DataRequired()])
password2 = PasswordField(
- 'Repeat Password', validators=[DataRequired(), EqualTo('password')])
- submit = SubmitField('Request Password Reset')
+ "Repeat Password", validators=[DataRequired(), EqualTo("password")]
+ )
+ submit = SubmitField("Request Password Reset")
--- board/log_context.py
+++ board/log_context.py
@@ -21,7 +21,7 @@
context = _log_context.get()
for key, value in context.items():
setattr(record, key, value)
-
+
return True
--- board/login.py
+++ board/login.py
@@ -7,4 +7,4 @@
@login_manager.user_loader
def load_user(id):
- return database.Database.session.get(database.User, int(id))
\ No newline at end of file
+ return database.Database.session.get(database.User, int(id))
--- board/pages.py
+++ board/pages.py
@@ -1,4 +1,12 @@
-from flask import Blueprint, render_template, flash, request, redirect, url_for, current_app
+from flask import (
+ Blueprint,
+ render_template,
+ flash,
+ request,
+ redirect,
+ url_for,
+ current_app,
+)
from flask_login import login_required, current_user
from langdetect import detect, LangDetectException
from board.form import PostForm
@@ -16,51 +24,61 @@
try:
language = detect(form.message.data)
except LangDetectException:
- language = ''
+ language = ""
- post = Post(
- message=form.message.data,
- language=language,
- author=current_user
- )
+ post = Post(message=form.message.data, language=language, author=current_user)
Database.session.add(post)
Database.session.commit()
- flash('Your post is now live!')
- return redirect(url_for('pages.home'))
-
+ flash("Your post is now live!")
+ return redirect(url_for("pages.home"))
+
# Show all posts of users you're currently following
- page = request.args.get('page', 1, type=int)
+ page = request.args.get("page", 1, type=int)
# posts = Database.session.scalars(
# current_user.following_posts()
# ).all()
- posts = Database.paginate(current_user.following_posts(),
- page=page,
- per_page=current_app.config['POSTS_PER_PAGE'],
- error_out=False)
-
- next_url = url_for('pages.home', page=posts.next_num) if posts.has_next else None
+ posts = Database.paginate(
+ current_user.following_posts(),
+ page=page,
+ per_page=current_app.config["POSTS_PER_PAGE"],
+ error_out=False,
+ )
+
+ next_url = url_for("pages.home", page=posts.next_num) if posts.has_next else None
- prev_url = url_for('pages.home', page=posts.prev_num) if posts.has_prev else None
+ prev_url = url_for("pages.home", page=posts.prev_num) if posts.has_prev else None
- return render_template("pages/home.html", title="Home Page", form=form, posts=posts.items, next_url=next_url, prev_url=prev_url)
+ return render_template(
+ "pages/home.html",
+ title="Home Page",
+ form=form,
+ posts=posts.items,
+ next_url=next_url,
+ prev_url=prev_url,
+ )
@bp.route("/explore")
@login_required
def explore():
- page = request.args.get('page', 1, type=int)
+ page = request.args.get("page", 1, type=int)
query = Database.select(Post).order_by(Post.created.desc())
# posts = Database.session.scalars(query).all()
- posts = Database.paginate(query,
- page=page,
- per_page=current_app.config['POSTS_PER_PAGE'],
- error_out=False)
-
- next_url = url_for('pages.explore', page=posts.next_num) if posts.has_next else None
+ posts = Database.paginate(
+ query, page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False
+ )
+
+ next_url = url_for("pages.explore", page=posts.next_num) if posts.has_next else None
- prev_url = url_for('pages.explore', page=posts.prev_num) if posts.has_prev else None
+ prev_url = url_for("pages.explore", page=posts.prev_num) if posts.has_prev else None
- return render_template("pages/home.html", title="Explore", posts=posts.items, next_url=next_url, prev_url=prev_url)
+ return render_template(
+ "pages/home.html",
+ title="Explore",
+ posts=posts.items,
+ next_url=next_url,
+ prev_url=prev_url,
+ )
@bp.get("/about")
--- board/posts.py
+++ board/posts.py
@@ -23,18 +23,16 @@
try:
language = detect(form.message.data)
except LangDetectException:
- language = ''
+ language = ""
new_post = database.Post(
- message=form.message.data,
- language=language,
- user_id=current_user.id
+ message=form.message.data, language=language, user_id=current_user.id
)
database.Database.session.add(new_post)
database.Database.session.commit()
current_app.logger.info(f"New post by author {current_user.email}")
flash(f"Thanks for posting, {current_user.email}", category="success")
- return redirect(url_for('posts.posts'))
+ return redirect(url_for("posts.posts"))
else:
flash("Message cannot be blank", category="error")
return render_template("posts/new.html", title="Add Post")
@@ -46,7 +44,9 @@
# TODO: How to call current_user.posts ???
current_app.logger.info(f"Viewing all posts by {current_user.email}")
posts = database.Database.session.execute(
- database.Database.select(database.Post).order_by(database.Post.created.desc()).where(database.Post.user_id == current_user.id )
+ database.Database.select(database.Post)
+ .order_by(database.Post.created.desc())
+ .where(database.Post.user_id == current_user.id)
).scalars()
- return render_template("posts/posts.html", title="All posts", posts=posts)
\ No newline at end of file
+ return render_template("posts/posts.html", title="All posts", posts=posts)
--- board/translate.py
+++ board/translate.py
@@ -7,8 +7,7 @@
def translate(text, source_lang, dest_lang):
bedrock_client = boto3.client(
- "bedrock-runtime",
- region_name=os.getenv("AWS_REGION", "us-east-1")
+ "bedrock-runtime", region_name=os.getenv("AWS_REGION", "us-east-1")
)
# TODO: Add logging config to Cloudwatch Logs here:
@@ -33,30 +32,17 @@
"""
req = {
- "messages": [
- {
- "role": "user",
- "content": [
- {
- "text": prompt
- }
- ]
- }
- ],
- "inferenceConfig": {
- "temperature": 0.1,
- "topP": 0.9,
- "maxTokens": 10240
- }
+ "messages": [{"role": "user", "content": [{"text": prompt}]}],
+ "inferenceConfig": {"temperature": 0.1, "topP": 0.9, "maxTokens": 10240},
}
bedrock_response = bedrock_client.invoke_model(
- modelId = "us.amazon.nova-micro-v1:0",
+ modelId="us.amazon.nova-micro-v1:0",
body=json.dumps(req),
- contentType="application/json"
+ contentType="application/json",
)
resp = json.loads(bedrock_response["body"].read())
- translation = resp["output"]["message"]["content"][0]['text']
+ translation = resp["output"]["message"]["content"][0]["text"]
return translation
--- board/translations.py
+++ board/translations.py
@@ -11,5 +11,5 @@
def get_translation():
data = request.get_json()
return {
- 'text': translate(data['text'], data['source_language'], data['dest_language'])
+ "text": translate(data["text"], data["source_language"], data["dest_language"])
}
--- board/users.py
+++ board/users.py
@@ -1,11 +1,26 @@
from urllib.parse import urlsplit
from datetime import datetime, timezone
-from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app
+from flask import (
+ Blueprint,
+ render_template,
+ request,
+ redirect,
+ url_for,
+ flash,
+ current_app,
+)
from flask_login import current_user, login_user, logout_user, login_required
import sqlalchemy as sa
from sqlalchemy.sql import func
from board.database import Database, User, Post
-from board.form import LoginForm, RegistrationForm, EditProfileForm, EmptyForm, ResetPasswordRequestForm, ResetPasswordForm
+from board.form import (
+ LoginForm,
+ RegistrationForm,
+ EditProfileForm,
+ EmptyForm,
+ ResetPasswordRequestForm,
+ ResetPasswordForm,
+)
from board.email import send_password_reset_email
from board.log_context import add_to_log_context
@@ -13,11 +28,11 @@
bp = Blueprint("users", __name__)
-@bp.route("/login", methods=['GET', 'POST'])
+@bp.route("/login", methods=["GET", "POST"])
def login():
if current_user.is_authenticated:
- flash('User already logged in')
- return redirect(url_for('pages.home'))
+ flash("User already logged in")
+ return redirect(url_for("pages.home"))
form = LoginForm()
@@ -26,16 +41,16 @@
Database.select(User).where(User.email == form.email.data)
)
if user is None or not user.check_password(form.password.data):
- flash('Invalid username or password')
- return redirect(url_for('users.login'))
+ flash("Invalid username or password")
+ return redirect(url_for("users.login"))
login_user(user, remember=form.remember_me.data)
with add_to_log_context(user_email=form.email.data):
current_app.logger.info(f"User {current_user.email} has logged in.")
-
- next_page = request.args.get('next')
- if not next_page or urlsplit(next_page).netloc != '':
- next_page = url_for('pages.home')
+
+ next_page = request.args.get("next")
+ if not next_page or urlsplit(next_page).netloc != "":
+ next_page = url_for("pages.home")
return redirect(next_page)
return render_template("users/login.html", form=form, title="Login")
@@ -48,28 +63,25 @@
with add_to_log_context():
current_app.logger.info("User has logged out.")
- return redirect(url_for('pages.home'))
+ return redirect(url_for("pages.home"))
-@bp.route("/register", methods=['GET', 'POST'])
+@bp.route("/register", methods=["GET", "POST"])
def register():
if current_user.is_authenticated:
- flash('You are already signed in')
- return redirect(url_for('pages.home'))
+ flash("You are already signed in")
+ return redirect(url_for("pages.home"))
form = RegistrationForm()
if form.validate_on_submit():
- user = User(
- username=form.username.data,
- email=form.email.data
- )
+ user = User(username=form.username.data, email=form.email.data)
user.set_password(form.password.data)
Database.session.add(user)
Database.session.commit()
- flash('You have registered successfully')
- return redirect(url_for('users.login'))
-
- return render_template('users/registration.html', form=form, title="Register")
+ flash("You have registered successfully")
+ return redirect(url_for("users.login"))
+
+ return render_template("users/registration.html", form=form, title="Register")
@bp.route("/user/<username>")
@@ -81,30 +93,42 @@
# )
user = Database.first_or_404(
- sa.select(User)
- .where(func.lower(User.username) == func.lower(username))
+ sa.select(User).where(func.lower(User.username) == func.lower(username))
)
print(f"USERNAME: {username}")
print(f"FOUND USER: {user}")
if user is None:
- flash('User does not exist')
- return redirect(url_for('pages.home'))
+ flash("User does not exist")
+ return redirect(url_for("pages.home"))
- page = request.args.get('page', 1, type=int)
+ page = request.args.get("page", 1, type=int)
query = user.posts.select().order_by(Post.created.desc())
posts = Database.paginate(
- query,
- page=page,
- per_page=current_app.config['POSTS_PER_PAGE'],
- error_out=False
+ query, page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False
)
- next_url = url_for('users.user', username=user.username, page=posts.next_num) if posts.has_next else None
+ next_url = (
+ url_for("users.user", username=user.username, page=posts.next_num)
+ if posts.has_next
+ else None
+ )
- prev_url = url_for('users.user', username=user.username, page=posts.prev_num) if posts.has_prev else None
+ prev_url = (
+ url_for("users.user", username=user.username, page=posts.prev_num)
+ if posts.has_prev
+ else None
+ )
- return render_template("users/user.html", user=user, posts=posts, form=form, next_url=next_url, prev_url=prev_url, title="Profile")
+ return render_template(
+ "users/user.html",
+ user=user,
+ posts=posts,
+ form=form,
+ next_url=next_url,
+ prev_url=prev_url,
+ title="Profile",
+ )
@bp.before_request
@@ -122,16 +146,16 @@
current_user.username = form.username.data
current_user.about_me = form.about_me.data
Database.session.commit()
- flash('Your profile changes have been saved')
- return redirect(url_for('users.edit_profile'))
- elif request.method == 'GET':
+ flash("Your profile changes have been saved")
+ return redirect(url_for("users.edit_profile"))
+ elif request.method == "GET":
form.username.data = current_user.username
form.about_me.data = current_user.about_me
-
+
return render_template("users/edit_profile.html", form=form, title="Edit Profile")
-@bp.route("/follow/<username>", methods=['POST'])
+@bp.route("/follow/<username>", methods=["POST"])
@login_required
def follow(username):
form = EmptyForm()
@@ -142,21 +166,21 @@
if user is None:
flash(f"User {username} not found")
- return redirect(url_for('pages.home'))
-
+ return redirect(url_for("pages.home"))
+
if user == current_user:
- flash('You cannot follow yourself')
- return redirect(url_for('users.user', username=username))
-
+ flash("You cannot follow yourself")
+ return redirect(url_for("users.user", username=username))
+
current_user.follow(user)
Database.session.commit()
flash(f"You are following {username}")
- return redirect(url_for('users.user', username=username))
+ return redirect(url_for("users.user", username=username))
else:
- return redirect(url_for('pages.home'))
+ return redirect(url_for("pages.home"))
-@bp.route("/unfollow/<username>", methods=['POST'])
+@bp.route("/unfollow/<username>", methods=["POST"])
@login_required
def unfollow(username):
form = EmptyForm()
@@ -167,26 +191,26 @@
if user is None:
flash(f"User {username} not found")
- return redirect(url_for('pages.home'))
-
+ return redirect(url_for("pages.home"))
+
if user == current_user:
- flash('You cannot unfollow yourself')
- return redirect(url_for('users.user', username=username))
-
+ flash("You cannot unfollow yourself")
+ return redirect(url_for("users.user", username=username))
+
current_user.unfollow(user)
Database.session.commit()
flash(f"You are not following {username}")
- return redirect(url_for('users.user', username=username))
+ return redirect(url_for("users.user", username=username))
else:
- return redirect(url_for('pages.home'))
-
+ return redirect(url_for("pages.home"))
+
-@bp.route('/reset_password_request', methods=['GET', 'POST'])
+@bp.route("/reset_password_request", methods=["GET", "POST"])
def reset_password_request():
if current_user.is_authenticated:
- flash('You are already logged in')
- return redirect(url_for('pages.home'))
-
+ flash("You are already logged in")
+ return redirect(url_for("pages.home"))
+
form = ResetPasswordRequestForm()
if form.validate_on_submit():
user = Database.session.scalar(
@@ -195,30 +219,34 @@
if user:
send_password_reset_email(user)
- flash('Check your email for instructions on how to reset password')
- return redirect(url_for('users.login'))
+ flash("Check your email for instructions on how to reset password")
+ return redirect(url_for("users.login"))
else:
- flash('User does not exist!')
-
- return render_template('users/reset_password_request.html', form=form, title="Reset Password")
+ flash("User does not exist!")
+
+ return render_template(
+ "users/reset_password_request.html", form=form, title="Reset Password"
+ )
-@bp.route('/reset_password/<token>', methods=["GET", "POST"])
+@bp.route("/reset_password/<token>", methods=["GET", "POST"])
def reset_password(token):
if current_user.is_authenticated:
- flash('You are already logged in')
- return redirect(url_for('pages.home'))
-
+ flash("You are already logged in")
+ return redirect(url_for("pages.home"))
+
user = User.verify_reset_password_token(token)
if not user:
- flash('User not found for password reset')
- return redirect(url_for('pages.home'))
+ flash("User not found for password reset")
+ return redirect(url_for("pages.home"))
form = ResetPasswordForm()
if form.validate_on_submit():
user.set_password(form.password.data)
Database.session.commit()
- flash('Your password has been reset')
- return redirect(url_for('users.login'))
-
- return render_template('users/reset_password.html', form=form, token=token, title="Reset Password")
\ No newline at end of file
+ flash("Your password has been reset")
+ return redirect(url_for("users.login"))
+
+ return render_template(
+ "users/reset_password.html", form=form, token=token, title="Reset Password"
+ )
--- gunicorn.conf.py
+++ gunicorn.conf.py
@@ -3,9 +3,11 @@
import multiprocessing
-bind = ':5000'
+bind = ":5000"
workers = multiprocessing.cpu_count()
-if os.getenv('AUTO_RELOAD'):
+if os.getenv("AUTO_RELOAD"):
reload = True
- reload_extra_files = glob('board/templates/**/*.html', recursive=True) + glob('board/static/*.css', recursive=True)
\ No newline at end of file
+ reload_extra_files = glob("board/templates/**/*.html", recursive=True) + glob(
+ "board/static/*.css", recursive=True
+ )
--- migrations/env.py
+++ migrations/env.py
@@ -12,32 +12,31 @@
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
-logger = logging.getLogger('alembic.env')
+logger = logging.getLogger("alembic.env")
def get_engine():
try:
# this works with Flask-SQLAlchemy<3 and Alchemical
- return current_app.extensions['migrate'].db.get_engine()
+ return current_app.extensions["migrate"].db.get_engine()
except (TypeError, AttributeError):
# this works with Flask-SQLAlchemy>=3
- return current_app.extensions['migrate'].db.engine
+ return current_app.extensions["migrate"].db.engine
def get_engine_url():
try:
- return get_engine().url.render_as_string(hide_password=False).replace(
- '%', '%%')
+ return get_engine().url.render_as_string(hide_password=False).replace("%", "%%")
except AttributeError:
- return str(get_engine().url).replace('%', '%%')
+ return str(get_engine().url).replace("%", "%%")
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
-config.set_main_option('sqlalchemy.url', get_engine_url())
-target_db = current_app.extensions['migrate'].db
+config.set_main_option("sqlalchemy.url", get_engine_url())
+target_db = current_app.extensions["migrate"].db
# other values from the config, defined by the needs of env.py,
# can be acquired:
@@ -46,7 +45,7 @@
def get_metadata():
- if hasattr(target_db, 'metadatas'):
+ if hasattr(target_db, "metadatas"):
return target_db.metadatas[None]
return target_db.metadata
@@ -65,8 +64,7 @@
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
- url=url, target_metadata=get_metadata(), literal_binds=True,
- compare_type=True
+ url=url, target_metadata=get_metadata(), literal_binds=True, compare_type=True
)
with context.begin_transaction():
@@ -85,13 +83,13 @@
# when there are no changes to the schema
# reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
def process_revision_directives(context, revision, directives):
- if getattr(config.cmd_opts, 'autogenerate', False):
+ if getattr(config.cmd_opts, "autogenerate", False):
script = directives[0]
if script.upgrade_ops.is_empty():
directives[:] = []
- logger.info('No changes in schema detected.')
+ logger.info("No changes in schema detected.")
- conf_args = current_app.extensions['migrate'].configure_args
+ conf_args = current_app.extensions["migrate"].configure_args
if conf_args.get("process_revision_directives") is None:
conf_args["process_revision_directives"] = process_revision_directives
@@ -99,9 +97,7 @@
with connectable.connect() as connection:
context.configure(
- connection=connection,
- target_metadata=get_metadata(),
- **conf_args
+ connection=connection, target_metadata=get_metadata(), **conf_args
)
with context.begin_transaction():
--- migrations/versions/343d75cb7e92_add_new_user_fields.py
+++ migrations/versions/343d75cb7e92_add_new_user_fields.py
@@ -5,30 +5,33 @@
Create Date: 2025-06-15 12:56:02.953010
"""
+
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
-revision = '343d75cb7e92'
-down_revision = '91d28e4d9bcd'
+revision = "343d75cb7e92"
+down_revision = "91d28e4d9bcd"
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
- with op.batch_alter_table('users', schema=None) as batch_op:
- batch_op.add_column(sa.Column('about_me', sa.String(length=140), nullable=True))
- batch_op.add_column(sa.Column('last_seen', sa.DateTime(timezone=True), nullable=True))
+ with op.batch_alter_table("users", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("about_me", sa.String(length=140), nullable=True))
+ batch_op.add_column(
+ sa.Column("last_seen", sa.DateTime(timezone=True), nullable=True)
+ )
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
- with op.batch_alter_table('users', schema=None) as batch_op:
- batch_op.drop_column('last_seen')
- batch_op.drop_column('about_me')
+ with op.batch_alter_table("users", schema=None) as batch_op:
+ batch_op.drop_column("last_seen")
+ batch_op.drop_column("about_me")
# ### end Alembic commands ###
--- migrations/versions/63b1599550fc_add_username_constraints.py
+++ migrations/versions/63b1599550fc_add_username_constraints.py
@@ -5,28 +5,31 @@
Create Date: 2025-06-15 13:43:31.537580
"""
+
from alembic import op
-import sqlalchemy as sa # noqa: F401
+import sqlalchemy as sa # noqa: F401
# revision identifiers, used by Alembic.
-revision = '63b1599550fc'
-down_revision = '343d75cb7e92'
+revision = "63b1599550fc"
+down_revision = "343d75cb7e92"
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
- with op.batch_alter_table('users', schema=None) as batch_op:
- batch_op.create_index(batch_op.f('ix_users_username'), ['username'], unique=True)
+ with op.batch_alter_table("users", schema=None) as batch_op:
+ batch_op.create_index(
+ batch_op.f("ix_users_username"), ["username"], unique=True
+ )
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
- with op.batch_alter_table('users', schema=None) as batch_op:
- batch_op.drop_index(batch_op.f('ix_users_username'))
+ with op.batch_alter_table("users", schema=None) as batch_op:
+ batch_op.drop_index(batch_op.f("ix_users_username"))
# ### end Alembic commands ###
--- migrations/versions/82d5e7b23e28_add_language_to_post.py
+++ migrations/versions/82d5e7b23e28_add_language_to_post.py
@@ -5,28 +5,29 @@
Create Date: 2025-08-01 15:19:47.077103
"""
+
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
-revision = '82d5e7b23e28'
-down_revision = 'e93ccd60891f'
+revision = "82d5e7b23e28"
+down_revision = "e93ccd60891f"
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
- with op.batch_alter_table('posts', schema=None) as batch_op:
- batch_op.add_column(sa.Column('language', sa.String(length=5), nullable=True))
+ with op.batch_alter_table("posts", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("language", sa.String(length=5), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
- with op.batch_alter_table('posts', schema=None) as batch_op:
- batch_op.drop_column('language')
+ with op.batch_alter_table("posts", schema=None) as batch_op:
+ batch_op.drop_column("language")
# ### end Alembic commands ###
--- migrations/versions/91d28e4d9bcd_init.py
+++ migrations/versions/91d28e4d9bcd_init.py
@@ -1,16 +1,17 @@
"""Setup Database
Revision ID: 91d28e4d9bcd
-Revises:
+Revises:
Create Date: 2025-06-13 14:00:11.591660
"""
+
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
-revision = '91d28e4d9bcd'
+revision = "91d28e4d9bcd"
down_revision = None
branch_labels = None
depends_on = None
@@ -18,38 +19,43 @@
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
- op.create_table('users',
- sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
- sa.Column('username', sa.String(length=256), nullable=False),
- sa.Column('email', sa.String(), nullable=False),
- sa.Column('password_hash', sa.String(length=256), nullable=True),
- sa.PrimaryKeyConstraint('id')
+ op.create_table(
+ "users",
+ sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
+ sa.Column("username", sa.String(length=256), nullable=False),
+ sa.Column("email", sa.String(), nullable=False),
+ sa.Column("password_hash", sa.String(length=256), nullable=True),
+ sa.PrimaryKeyConstraint("id"),
)
- with op.batch_alter_table('users', schema=None) as batch_op:
- batch_op.create_index(batch_op.f('ix_users_email'), ['email'], unique=True)
+ with op.batch_alter_table("users", schema=None) as batch_op:
+ batch_op.create_index(batch_op.f("ix_users_email"), ["email"], unique=True)
- op.create_table('posts',
- sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
- sa.Column('message', sa.Text(), nullable=False),
- sa.Column('created', sa.DateTime(timezone=True), nullable=True),
- sa.Column('user_id', sa.Integer(), nullable=True),
- sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
- sa.PrimaryKeyConstraint('id')
+ op.create_table(
+ "posts",
+ sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
+ sa.Column("message", sa.Text(), nullable=False),
+ sa.Column("created", sa.DateTime(timezone=True), nullable=True),
+ sa.Column("user_id", sa.Integer(), nullable=True),
+ sa.ForeignKeyConstraint(
+ ["user_id"],
+ ["users.id"],
+ ),
+ sa.PrimaryKeyConstraint("id"),
)
- with op.batch_alter_table('posts', schema=None) as batch_op:
- batch_op.create_index(batch_op.f('ix_posts_user_id'), ['user_id'], unique=False)
+ with op.batch_alter_table("posts", schema=None) as batch_op:
+ batch_op.create_index(batch_op.f("ix_posts_user_id"), ["user_id"], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
- with op.batch_alter_table('posts', schema=None) as batch_op:
- batch_op.drop_index(batch_op.f('ix_posts_user_id'))
+ with op.batch_alter_table("posts", schema=None) as batch_op:
+ batch_op.drop_index(batch_op.f("ix_posts_user_id"))
- op.drop_table('posts')
- with op.batch_alter_table('users', schema=None) as batch_op:
- batch_op.drop_index(batch_op.f('ix_users_email'))
+ op.drop_table("posts")
+ with op.batch_alter_table("users", schema=None) as batch_op:
+ batch_op.drop_index(batch_op.f("ix_users_email"))
- op.drop_table('users')
+ op.drop_table("users")
# ### end Alembic commands ###
--- migrations/versions/e93ccd60891f_add_followers.py
+++ migrations/versions/e93ccd60891f_add_followers.py
@@ -5,38 +5,50 @@
Create Date: 2025-06-16 16:27:23.142554
"""
+
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
-revision = 'e93ccd60891f'
-down_revision = '63b1599550fc'
+revision = "e93ccd60891f"
+down_revision = "63b1599550fc"
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
- op.create_table('followers',
- sa.Column('follower_id', sa.Integer(), nullable=False),
- sa.Column('followed_id', sa.Integer(), nullable=False),
- sa.ForeignKeyConstraint(['followed_id'], ['users.id'], ),
- sa.ForeignKeyConstraint(['follower_id'], ['users.id'], ),
- sa.PrimaryKeyConstraint('follower_id', 'followed_id')
+ op.create_table(
+ "followers",
+ sa.Column("follower_id", sa.Integer(), nullable=False),
+ sa.Column("followed_id", sa.Integer(), nullable=False),
+ sa.ForeignKeyConstraint(
+ ["followed_id"],
+ ["users.id"],
+ ),
+ sa.ForeignKeyConstraint(
+ ["follower_id"],
+ ["users.id"],
+ ),
+ sa.PrimaryKeyConstraint("follower_id", "followed_id"),
)
- with op.batch_alter_table('followers', schema=None) as batch_op:
- batch_op.create_index(batch_op.f('ix_followers_followed_id'), ['followed_id'], unique=False)
- batch_op.create_index(batch_op.f('ix_followers_follower_id'), ['follower_id'], unique=False)
+ with op.batch_alter_table("followers", schema=None) as batch_op:
+ batch_op.create_index(
+ batch_op.f("ix_followers_followed_id"), ["followed_id"], unique=False
+ )
+ batch_op.create_index(
+ batch_op.f("ix_followers_follower_id"), ["follower_id"], unique=False
+ )
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
- with op.batch_alter_table('followers', schema=None) as batch_op:
- batch_op.drop_index(batch_op.f('ix_followers_follower_id'))
- batch_op.drop_index(batch_op.f('ix_followers_followed_id'))
+ with op.batch_alter_table("followers", schema=None) as batch_op:
+ batch_op.drop_index(batch_op.f("ix_followers_follower_id"))
+ batch_op.drop_index(batch_op.f("ix_followers_followed_id"))
- op.drop_table('followers')
+ op.drop_table("followers")
# ### end Alembic commands ###
--- test_bedrock.py
+++ test_bedrock.py
@@ -25,29 +25,16 @@
"""
req = {
- "messages": [
- {
- "role": "user",
- "content": [
- {
- "text": prompt
- }
- ]
- }
- ],
- "inferenceConfig": {
- "temperature": 0.1,
- "topP": 0.9,
- "maxTokens": 10240
- }
+ "messages": [{"role": "user", "content": [{"text": prompt}]}],
+ "inferenceConfig": {"temperature": 0.1, "topP": 0.9, "maxTokens": 10240},
}
bedrock_response = bedrock_client.invoke_model(
- modelId = "us.amazon.nova-micro-v1:0",
+ modelId="us.amazon.nova-micro-v1:0",
body=json.dumps(req),
- contentType="application/json"
+ contentType="application/json",
)
resp = json.loads(bedrock_response["body"].read())
- translation = resp["output"]["message"]["content"][0]['text']
+ translation = resp["output"]["message"]["content"][0]["text"]
print(translation)
--- tests/conftest.py
+++ tests/conftest.py
@@ -4,13 +4,15 @@
from board.database import Database, User
-@pytest.fixture(scope='session')
+@pytest.fixture(scope="session")
def app():
app = create_app()
- app.config.update({
- "TESTING": True,
- "WTF_CSRF_ENABLED": False # needed to disable CSRF protection in forms else tests will fail
- })
+ app.config.update(
+ {
+ "TESTING": True,
+ "WTF_CSRF_ENABLED": False, # needed to disable CSRF protection in forms else tests will fail
+ }
+ )
# Other setup can go here
app.register_blueprint(pages.bp, name="testpages")
@@ -22,7 +24,7 @@
username="chee",
email="[email protected]",
)
-
+
user.set_password("testing")
Database.session.add(user)
Database.session.commit()
@@ -37,48 +39,48 @@
Database.session.commit()
-@pytest.fixture(scope='session')
+@pytest.fixture(scope="session")
def client(app):
return app.test_client()
-@pytest.fixture(scope='session')
+@pytest.fixture(scope="session")
def new_user():
- user = User(email='[email protected]',username='New User')
+ user = User(email="[email protected]", username="New User")
return user
-@pytest.fixture(scope='session')
+@pytest.fixture(scope="session")
def second_user():
- user = User(email='[email protected]', username='Second User')
+ user = User(email="[email protected]", username="Second User")
return user
-@pytest.fixture(scope='session')
+@pytest.fixture(scope="session")
def test_user(app):
with app.app_context():
- user = Database.session.scalar(
+ user = Database.session.scalar(
Database.select(User).where(User.email == "[email protected]")
)
return user
-@pytest.fixture(scope='function')
+@pytest.fixture(scope="function")
def login_user(app, test_user):
app.test_client_class = FlaskLoginClient
with app.test_client(user=test_user) as client:
yield client
- client.get('/logout')
+ client.get("/logout")
-# Sets the session ID directly
-@pytest.fixture(scope='function')
+# Sets the session ID directly
+@pytest.fixture(scope="function")
def login_user_via_session(app, client, test_user):
with client.session_transaction() as session:
- session['_user_id'] = test_user.id
-
+ session["_user_id"] = test_user.id
+
yield client
-
- client.get('/logout')
+
+ client.get("/logout")
--- tests/functional/test_pages.py
+++ tests/functional/test_pages.py
@@ -4,14 +4,14 @@
def test_request_home(client):
response = client.get("/", follow_redirects=True)
assert response.status_code == 200
- assert b'Login' in response.data
+ assert b"Login" in response.data
def test_request_home_authenticated(login_user_via_session):
resp = login_user_via_session.get("/")
- assert b'/user/chee' in resp.data
- assert b'Logout' in resp.data
- assert b'Say something' in resp.data
+ assert b"/user/chee" in resp.data
+ assert b"Logout" in resp.data
+ assert b"Say something" in resp.data
# The login_user is for logging in
@@ -21,20 +21,17 @@
# Use the FlaskLoginCLient session_transaction block to retrieve the saved session user ID
# Can also replace the session['_user_id'] directly to simulate a logged in user...
- #
+ #
with login_user.session_transaction() as session:
- session_id = int(session['_user_id'])
+ session_id = int(session["_user_id"])
with app.app_context():
session_user = Database.session.scalar(
Database.select(User).where(User.id == session_id)
)
- new_user.set_password('MyNewPassword')
- post = Post(
- message='A test post',
- author=new_user
- )
+ new_user.set_password("MyNewPassword")
+ post = Post(message="A test post", author=new_user)
Database.session.add(new_user)
Database.session.add(post)
Database.session.commit()
@@ -57,4 +54,4 @@
def test_not_found(client):
response = client.get("/nan")
assert response.status_code == 404
- assert b"Page not found" in response.data
\ No newline at end of file
+ assert b"Page not found" in response.data
--- tests/functional/test_posts.py
+++ tests/functional/test_posts.py
@@ -5,7 +5,7 @@
"""
response = client.get("/posts/new", follow_redirects=True)
assert response.status_code == 200
- assert b'Please log in to access this page' in response.data
+ assert b"Please log in to access this page" in response.data
def test_get_new_posts_authenticated(login_user):
@@ -25,45 +25,42 @@
Test /posts/create without valid session user
Should redirect to login page
"""
- response = client.post("/posts/create",
- follow_redirects=True)
+ response = client.post("/posts/create", follow_redirects=True)
assert response.status_code == 200
assert len(response.history) == 1
- assert b'Please log in to access this page' in response.data
+ assert b"Please log in to access this page" in response.data
def test_post_create_authenticated(login_user):
- response = login_user.post("/posts/create",
- data={"message": "Test Message"},
- follow_redirects=True)
+ response = login_user.post(
+ "/posts/create", data={"message": "Test Message"}, follow_redirects=True
+ )
assert response.status_code == 200
assert len(response.history) == 1
- assert b'Thanks for posting' in response.data
- assert b'Test Message' in response.data
+ assert b"Thanks for posting" in response.data
+ assert b"Test Message" in response.data
def test_post_create_invalid_authenticated(login_user):
- response = login_user.post("/posts/create",
- data={"message": None},
- follow_redirects=True)
+ response = login_user.post(
+ "/posts/create", data={"message": None}, follow_redirects=True
+ )
assert response.status_code == 200
- assert b'Message cannot be blank' in response.data
+ assert b"Message cannot be blank" in response.data
def test_post_list(client):
- response = client.get("/posts",
- follow_redirects=True)
+ response = client.get("/posts", follow_redirects=True)
assert response.status_code == 200
assert len(response.history) == 1
- assert b'Please log in to access this page'
+ assert b"Please log in to access this page"
def test_post_list_authenticated(login_user):
- login_user.post("/posts/create",
- data={"message": "Test Message New"})
-
+ login_user.post("/posts/create", data={"message": "Test Message New"})
+
response = login_user.get("/posts")
assert response.status_code == 200
- assert b'Posts' in response.data
- assert b'Test Message New' in response.data
+ assert b"Posts" in response.data
+ assert b"Test Message New" in response.data
assert b'<a href="/user/chee">chee</a> said' in response.data
--- tests/functional/test_users.py
+++ tests/functional/test_users.py
@@ -1,156 +1,213 @@
def test_get_login(client):
resp = client.get("/login")
assert resp.status_code == 200
- assert b'Sign In' in resp.data
- assert b'Email' in resp.data
- assert b'Password' in resp.data
- assert b'Click to Register' in resp.data
+ assert b"Sign In" in resp.data
+ assert b"Email" in resp.data
+ assert b"Password" in resp.data
+ assert b"Click to Register" in resp.data
def test_post_login(client):
- resp = client.post("/login",
- data={"email": "[email protected]", "password": "testing"},
- follow_redirects=True)
+ resp = client.post(
+ "/login",
+ data={"email": "[email protected]", "password": "testing"},
+ follow_redirects=True,
+ )
assert resp.status_code == 200
- assert b'Hi, chee' in resp.data
- assert b'Logout' in resp.data
+ assert b"Hi, chee" in resp.data
+ assert b"Logout" in resp.data
client.get("/logout")
def test_post_login_invalid_password(client):
- resp = client.post("/login",
- data={"email": "[email protected]", "password": "wrong"},
- follow_redirects=True)
-
+ resp = client.post(
+ "/login",
+ data={"email": "[email protected]", "password": "wrong"},
+ follow_redirects=True,
+ )
+
assert resp.status_code == 200
- assert b'Invalid username or password' in resp.data
- assert b'Sign In' in resp.data
+ assert b"Invalid username or password" in resp.data
+ assert b"Sign In" in resp.data
def test_post_login_invalid_email(client):
- resp = client.post("/login",
- data={"email": "[email protected]", "password": "testing"},
- follow_redirects=True)
+ resp = client.post(
+ "/login",
+ data={"email": "[email protected]", "password": "testing"},
+ follow_redirects=True,
+ )
assert resp.status_code == 200
- assert b'Invalid username or password' in resp.data
- assert b'Sign In' in resp.data
+ assert b"Invalid username or password" in resp.data
+ assert b"Sign In" in resp.data
def test_post_login_if_authenticated(login_user):
- resp = login_user.post("login",
- data={"email": "[email protected]", "password": "testing"},
- follow_redirects=True)
+ resp = login_user.post(
+ "login",
+ data={"email": "[email protected]", "password": "testing"},
+ follow_redirects=True,
+ )
assert resp.status_code == 200
- assert b'Logout' in resp.data
+ assert b"Logout" in resp.data
def test_get_logout(login_user):
- resp = login_user.get("/logout",
- follow_redirects=True)
+ resp = login_user.get("/logout", follow_redirects=True)
assert resp.status_code == 200
- assert b'Login' in resp.data
+ assert b"Login" in resp.data
def test_get_register(client):
resp = client.get("/register")
assert resp.status_code == 200
- assert b'Register' in resp.data
- assert b'Username' in resp.data
- assert b'Password' in resp.data
- assert b'Repeat Password' in resp.data
+ assert b"Register" in resp.data
+ assert b"Username" in resp.data
+ assert b"Password" in resp.data
+ assert b"Repeat Password" in resp.data
def test_post_register(client):
- resp = client.post("/register",
- data={"username": "second user", "email": "[email protected]", "password": "testing", "password2": "testing"},
- follow_redirects=True)
+ resp = client.post(
+ "/register",
+ data={
+ "username": "second user",
+ "email": "[email protected]",
+ "password": "testing",
+ "password2": "testing",
+ },
+ follow_redirects=True,
+ )
assert resp.status_code == 200
- assert b'You have registered successfully' in resp.data
- assert b'Sign In' in resp.data
+ assert b"You have registered successfully" in resp.data
+ assert b"Sign In" in resp.data
- resp = client.post("/login",
- data={"email": "[email protected]", "password": "testing"},
- follow_redirects=True)
+ resp = client.post(
+ "/login",
+ data={"email": "[email protected]", "password": "testing"},
+ follow_redirects=True,
+ )
assert resp.status_code == 200
- assert b'Hi, second user' in resp.data
- assert b'Logout' in resp.data
+ assert b"Hi, second user" in resp.data
+ assert b"Logout" in resp.data
client.get("/logout")
def test_post_register_missing_username(client):
- resp = client.post("/register",
- data={"email": "[email protected]", "password": "testing", "password2": "testing"},
- follow_redirects=True)
- assert b'Register' in resp.data
- assert b'This field is required' in resp.data
+ resp = client.post(
+ "/register",
+ data={
+ "email": "[email protected]",
+ "password": "testing",
+ "password2": "testing",
+ },
+ follow_redirects=True,
+ )
+ assert b"Register" in resp.data
+ assert b"This field is required" in resp.data
def test_post_register_invalid_email(client):
- resp = client.post("/register",
- data={"username": "test user", "email": "none", "password": "testing", "password2": "testing"},
- follow_redirects=True)
- assert b'Register' in resp.data
- assert b'Invalid email address' in resp.data
+ resp = client.post(
+ "/register",
+ data={
+ "username": "test user",
+ "email": "none",
+ "password": "testing",
+ "password2": "testing",
+ },
+ follow_redirects=True,
+ )
+ assert b"Register" in resp.data
+ assert b"Invalid email address" in resp.data
def test_post_register_exists_username(client):
- resp = client.post("/register",
- data={"username": "second user", "email": "[email protected]", "password": "testing", "password2": "testing"},
- follow_redirects=True)
- assert b'Register' in resp.data
- assert b'Please use a different username' in resp.data
+ resp = client.post(
+ "/register",
+ data={
+ "username": "second user",
+ "email": "[email protected]",
+ "password": "testing",
+ "password2": "testing",
+ },
+ follow_redirects=True,
+ )
+ assert b"Register" in resp.data
+ assert b"Please use a different username" in resp.data
def test_post_register_exists_email(client):
- resp = client.post("/register",
- data={"username": "third user", "email": "[email protected]", "password": "testing", "password2": "testing"},
- follow_redirects=True)
- assert b'Register' in resp.data
- assert b'Please use a different email address' in resp.data
+ resp = client.post(
+ "/register",
+ data={
+ "username": "third user",
+ "email": "[email protected]",
+ "password": "testing",
+ "password2": "testing",
+ },
+ follow_redirects=True,
+ )
+ assert b"Register" in resp.data
+ assert b"Please use a different email address" in resp.data
def test_post_register_missing_password(client):
- resp = client.post("/register",
- data={"username": "third user", "email": "[email protected]"},
- follow_redirects=True)
- assert b'Register' in resp.data
- assert b'This field is required' in resp.data
+ resp = client.post(
+ "/register",
+ data={"username": "third user", "email": "[email protected]"},
+ follow_redirects=True,
+ )
+ assert b"Register" in resp.data
+ assert b"This field is required" in resp.data
def test_post_register_mismatch_password(client):
- resp = client.post("/register",
- data={"username": "third user", "email": "[email protected]", "password": "testing", "password2": "no match"},
- follow_redirects=True)
- assert b'Register' in resp.data
- assert b'Field must be equal to password' in resp.data
+ resp = client.post(
+ "/register",
+ data={
+ "username": "third user",
+ "email": "[email protected]",
+ "password": "testing",
+ "password2": "no match",
+ },
+ follow_redirects=True,
+ )
+ assert b"Register" in resp.data
+ assert b"Field must be equal to password" in resp.data
def test_reset_password_request_when_authenticated(login_user):
- resp = login_user.get('/reset_password_request', follow_redirects=True)
+ resp = login_user.get("/reset_password_request", follow_redirects=True)
assert resp.status_code == 200
- assert b'You are already logged in' in resp.data
+ assert b"You are already logged in" in resp.data
def test_reset_password_request(client):
- resp = client.post("/reset_password_request",
- data={"email": "[email protected]"},
- follow_redirects=True)
+ resp = client.post(
+ "/reset_password_request",
+ data={"email": "[email protected]"},
+ follow_redirects=True,
+ )
assert resp.status_code == 200
- assert b'Check your email for instructions on how to reset password' in resp.data
+ assert b"Check your email for instructions on how to reset password" in resp.data
def test_reset_password_request_no_user(client):
- resp = client.post("/reset_password_request",
- data={"email": "[email protected]"},
- follow_redirects=True)
+ resp = client.post(
+ "/reset_password_request",
+ data={"email": "[email protected]"},
+ follow_redirects=True,
+ )
assert resp.status_code == 200
- assert b'User does not exist' in resp.data
+ assert b"User does not exist" in resp.data
def test_reset_password_when_authenticated(login_user):
- resp = login_user.get('/reset_password/token', follow_redirects=True)
+ resp = login_user.get("/reset_password/token", follow_redirects=True)
assert resp.status_code == 200
- assert b'You are already logged in' in resp.data
+ assert b"You are already logged in" in resp.data
+
-# TODO: Need to mock out jwt and User.verify_reset_password_token to continue testinng the reset password route
\ No newline at end of file
+# TODO: Need to mock out jwt and User.verify_reset_password_token to continue testinng the reset password route
--- tests/unit/test_models.py
+++ tests/unit/test_models.py
@@ -16,16 +16,16 @@
def test_set_password(new_user):
- new_user.set_password('MyNewPassword')
- assert new_user.password_hash != 'MyNewPassword'
- assert new_user.check_password('MyNewPassword')
- assert not new_user.check_password('Wrong')
+ new_user.set_password("MyNewPassword")
+ assert new_user.password_hash != "MyNewPassword"
+ assert new_user.check_password("MyNewPassword")
+ assert not new_user.check_password("Wrong")
def test_has_many_posts(app, new_user):
with app.app_context():
- new_user.set_password('MyNewPassword')
- post = Post(message='A test post', author=new_user)
+ new_user.set_password("MyNewPassword")
+ post = Post(message="A test post", author=new_user)
post2 = Post(message="A second post", author=new_user)
Database.session.add(new_user)
@@ -40,10 +40,10 @@
def test_user_avatar(new_user):
- hashed = md5(new_user.email.lower().encode('utf-8')).hexdigest()
+ hashed = md5(new_user.email.lower().encode("utf-8")).hexdigest()
profile_pic = new_user.avatar(20)
assert hashed in profile_pic
- assert '20' in profile_pic
+ assert "20" in profile_pic
def test_user_following(app, new_user, second_user):
@@ -52,7 +52,7 @@
Database.session.add(second_user)
Database.session.commit()
new_user.follow(second_user)
-
+
assert new_user.is_following(second_user)
assert new_user.following_count() == 1
assert second_user.followers_count() == 1
@@ -104,12 +104,13 @@
# Tests that no user is returned as jwt.decode failed
assert User.verify_reset_password_token(token) is None
- mock_logger.assert_called_once_with("Failure in JWT decode of password reset token")
+ mock_logger.assert_called_once_with(
+ "Failure in JWT decode of password reset token"
+ )
-
def test_post(new_user):
post = Post(message="Test Post", author=new_user, id=123)
assert post.author == new_user
assert post.message == "Test Post"
- assert post.__repr__() == f"<Post: {post.id}>"
\ No newline at end of file
+ assert post.__repr__() == f"<Post: {post.id}>"
Created
August 18, 2025 18:38
-
-
Save cheeyeo/6179f43f5eb18f6b23f4522718f7d221 to your computer and use it in GitHub Desktop.
Uusing ```diff``` in github markdown
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment