Skip to content

Instantly share code, notes, and snippets.

@naveen1337
Created November 3, 2024 17:25
Show Gist options
  • Select an option

  • Save naveen1337/ca51b3d42ee895f63af1a28f11cade88 to your computer and use it in GitHub Desktop.

Select an option

Save naveen1337/ca51b3d42ee895f63af1a28f11cade88 to your computer and use it in GitHub Desktop.
from configs.db_connection import release_conn
from pydantic import BaseModel
from psycopg2 import sql, extras
import logging
from constants.function_debug_codes import db_query_codes
from constants.app_constants import __ENV__
import traceback
async def get_rows(
    cursor, tbl, select, where_col, where_value, name=None, inc_del=False, limit=None
):
    """Fetch rows from ``tbl`` where ``where_col`` equals ``where_value``.

    Args:
        cursor: database cursor used to render and execute the query.
        tbl: table name.
        select: iterable of column names to select.
        where_col: column used in the equality filter.
        where_value: value the column is compared against.
        name: unused; kept so existing callers keep working.
        inc_del: when it is exactly ``False`` (the default) the query also
            requires ``deleted_at is null``.
        limit: ``None`` returns every matching row as a list, ``1`` returns
            the first row itself (not wrapped in a list), and an integer
            greater than 1 returns at most that many rows as a list.

    Returns:
        ``(result, None)`` on success, otherwise ``(None, err_text)``.
        ``err_text`` is ``"NOT_FOUND"`` when no row matches,
        ``"unhandled get_rows"`` for an unsupported ``limit`` (for example
        ``0``), and ``db_query_codes["get_rows"]`` when anything raises.
    """
    # TODO: Check is there function expect [] as error response
    debug_code = db_query_codes["get_rows"]
    try:
        query = sql.SQL("SELECT {fields} FROM {tbl} WHERE {column} = {value}").format(
            fields=sql.SQL(",").join(map(sql.Identifier, select)),
            tbl=sql.Identifier(tbl),
            column=sql.Identifier(where_col),
            value=sql.Literal(where_value),
        )
        qs = query.as_string(cursor)
        if inc_del is False:
            qs += " AND deleted_at is null"
        cursor.execute(qs)
        db_res = cursor.fetchall()
        if len(db_res) == 0:
            return None, "NOT_FOUND"
        if limit is None:
            return db_res, None
        if limit == 1:
            return db_res[0], None
        if isinstance(limit, int) and limit > 1:
            # fetchall() above already consumed the whole result set, so a
            # second fetchmany() would always return []; slice the rows we
            # already hold instead.
            return db_res[:limit], None
        return None, "unhandled get_rows"
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def get_rows_two_where_and(
    cursor,
    tbl,
    select,
    where_col_1,
    where_value_1,
    where_col_2,
    where_value_2,
    name=None,
    inc_del=False,
    limit=None,
):
    """Fetch rows from ``tbl`` that satisfy two equality filters joined by AND.

    Args:
        cursor: database cursor used to render and execute the query.
        tbl: table name.
        select: iterable of column names to select.
        where_col_1, where_value_1: first ``column = value`` condition.
        where_col_2, where_value_2: second ``column = value`` condition.
        name: unused; kept so existing callers keep working.
        inc_del: when it is exactly ``False`` (the default) the query also
            requires ``deleted_at is null``.
        limit: ``1`` returns the first row itself (not wrapped in a list);
            an integer greater than 1 returns at most that many rows; any
            other value returns every matching row.

    Returns:
        ``(result, None)`` on success, ``(None, "NOT_FOUND")`` when no row
        matches, or ``(None, db_query_codes["get_rows_two_where_and"])``
        when anything raises.
    """
    # TODO: Check is there function expect [] as error response
    debug_code = db_query_codes["get_rows_two_where_and"]
    try:
        query = sql.SQL(
            "SELECT {fields} FROM {tbl} WHERE {where_col_1} = {where_value_1} AND {where_col_2} = {where_value_2} "
        ).format(
            fields=sql.SQL(",").join(map(sql.Identifier, select)),
            tbl=sql.Identifier(tbl),
            where_col_1=sql.Identifier(where_col_1),
            where_value_1=sql.Literal(where_value_1),
            where_col_2=sql.Identifier(where_col_2),
            where_value_2=sql.Literal(where_value_2),
        )
        qs = query.as_string(cursor)
        if inc_del is False:
            qs += " AND deleted_at is null"
        cursor.execute(qs)
        db_res = cursor.fetchall()
        if len(db_res) == 0:
            return None, "NOT_FOUND"
        if limit == 1:
            return db_res[0], None
        if isinstance(limit, int) and limit > 1:
            # Previously any limit other than 1 was silently ignored.
            return db_res[:limit], None
        return db_res, None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def get_rows_where_in(
    cursor, tbl, select, where_col, where_in_value, name=None, inc_del=False
):
    """Fetch rows from ``tbl`` whose ``where_col`` is one of ``where_in_value``.

    Args:
        cursor: database cursor used to render and execute the query.
        tbl: table name.
        select: iterable of column names to select.
        where_col: column tested with ``IN``.
        where_in_value: candidate values. A tuple is used as-is; a list, set
            or frozenset is converted to a tuple first.
        name: unused; kept so existing callers keep working.
        inc_del: when it is exactly ``False`` (the default) the query also
            requires ``deleted_at is null``.

    Returns:
        ``(rows, None)`` on success, ``(None, "NOT_FOUND")`` when no row
        matches (including an empty ``where_in_value``), or
        ``(None, db_query_codes["get_rows_where_in"])`` when anything raises.
    """
    debug_code = db_query_codes["get_rows_where_in"]
    try:
        # psycopg2 renders a tuple as "(a, b, ...)", which is what IN needs,
        # whereas a list is rendered as ARRAY[...] and is rejected after IN.
        if isinstance(where_in_value, (list, set, frozenset)):
            where_in_value = tuple(where_in_value)
        if isinstance(where_in_value, tuple) and not where_in_value:
            # "IN ()" is a syntax error; nothing can match an empty set anyway.
            return None, "NOT_FOUND"
        query = sql.SQL("SELECT {fields} FROM {tbl} WHERE {column} IN {value}").format(
            fields=sql.SQL(",").join(map(sql.Identifier, select)),
            tbl=sql.Identifier(tbl),
            column=sql.Identifier(where_col),
            value=sql.Literal(where_in_value),
        )
        qs = query.as_string(cursor)
        if inc_del is False:
            qs += " AND deleted_at is null"
        cursor.execute(qs)
        db_res = cursor.fetchall()
        if len(db_res) == 0:
            return None, "NOT_FOUND"
        return db_res, None
    except Exception:
        logging.error(traceback.format_exc())
        # Return the debug code, not the function object itself.
        return None, debug_code
async def update_row(
    cursor: any,
    tbl: str,
    column: str,
    value: str,
    where_col: str,
    where_value: any,
    name=None,
    returning=None,
    limit=None,
):
    """Set a single column on the rows of ``tbl`` matching ``where_col = where_value``.

    Args:
        cursor: database cursor used to render and execute the statement.
        tbl: table name.
        column: column to assign.
        value: new value for ``column``.
        where_col: column used in the equality filter.
        where_value: value the filter column is compared against.
        name: unused.
        returning: optional iterable of column names for a RETURNING clause.
        limit: with ``returning``, ``1`` yields the first returned row
            instead of the whole list.

    Returns:
        Without ``returning``: ``(True, None)``.
        With ``returning``: ``(rows, None)`` / ``(row, None)``, or
        ``(None, "NO_UPDATES_MADE")`` when the statement returned no rows.
        ``(None, db_query_codes["update_row"])`` when anything raises.
    """
    debug_code = db_query_codes["update_row"]
    try:
        statement = (
            sql.SQL(
                "UPDATE {tbl} SET {column} = {value} WHERE {where_col} = {where_value}"
            )
            .format(
                tbl=sql.Identifier(tbl),
                column=sql.Identifier(column),
                value=sql.Literal(value),
                where_col=sql.Identifier(where_col),
                where_value=sql.Literal(where_value),
            )
            .as_string(cursor)
        )
        if returning:
            # NOTE: RETURNING names are spliced in verbatim (unquoted), so
            # they must come from trusted code, never from user input.
            statement += " RETURNING " + ", ".join(returning) + " "
        cursor.execute(statement)
        if not returning:
            return True, None
        updated = cursor.fetchall()
        if len(updated) == 0:
            return None, "NO_UPDATES_MADE"
        return (updated[0] if limit == 1 else updated), None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def get_rows_no_where(
    cursor, select: list, tbl: str, name=None, inc_del=False, limit=None
):
    """Fetch rows from ``tbl`` without any caller-supplied filter.

    Args:
        cursor: database cursor used to render and execute the query.
        select: list of column names to select.
        tbl: table name.
        name: unused; kept so existing callers keep working.
        inc_del: when it is exactly ``False`` (the default) only rows with
            ``deleted_at is null`` are selected.
        limit: ``1`` returns the first row itself (not wrapped in a list);
            an integer greater than 1 returns at most that many rows; any
            other value returns every row.

    Returns:
        ``(result, None)`` on success, ``(None, "NOT_FOUND")`` when the
        query yields no rows, or
        ``(None, db_query_codes["get_rows_no_where"])`` when anything raises.
    """
    debug_code = db_query_codes["get_rows_no_where"]
    try:
        query = sql.SQL("SELECT {fields} FROM {table}").format(
            fields=sql.SQL(",").join(map(sql.Identifier, select)),
            table=sql.Identifier(tbl),
        )
        qs = query.as_string(cursor)
        if inc_del is False:
            qs += " WHERE deleted_at is null"
        cursor.execute(qs)
        db_res = cursor.fetchall()
        if len(db_res) == 0:
            return None, "NOT_FOUND"
        if limit == 1:
            return db_res[0], None
        if isinstance(limit, int) and limit > 1:
            # Previously any limit other than 1 was silently ignored.
            return db_res[:limit], None
        return db_res, None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def get_rows_custom_query(cursor, query):
    """Execute a caller-supplied SQL statement and return all rows it yields.

    Args:
        cursor: a database cursor.
        query: the SQL to execute; it is passed to ``cursor.execute``
            unchanged.

    Returns:
        ``(rows, None)`` when at least one row came back,
        ``(None, "NOT_FOUND")`` when the result is empty, or
        ``(None, db_query_codes["get_rows_custom_query"])`` when anything
        raises.
    """
    debug_code = db_query_codes["get_rows_custom_query"]
    try:
        cursor.execute(query)
        rows = cursor.fetchall()
        return (None, "NOT_FOUND") if len(rows) == 0 else (rows, None)
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def is_row_exists_by_value(cursor, tbl, column, value, inc_del=False):
    """Report whether ``tbl`` holds at least one row with ``column = value``.

    Args:
        cursor: database cursor used to render and execute the query. Rows
            are read as ``row["exists"]``, so this assumes a dict-style
            cursor -- TODO confirm against the connection setup.
        tbl: table name.
        column: column used in the equality filter.
        value: value the column is compared against.
        inc_del: when it is exactly ``False`` (the default) rows must also
            have ``deleted_at is null`` to count.

    Returns:
        ``(True, None)`` or ``(False, None)`` on success,
        ``(None, "unhandled")`` if the database returned something other
        than a boolean, or
        ``(None, db_query_codes["is_row_exists_by_value"])`` when anything
        raises.
    """
    debug_code = db_query_codes["is_row_exists_by_value"]
    try:
        # The soft-delete condition has to live inside the sub-select. The
        # old code appended " AND WHERE deleted_at is null" after the closing
        # parenthesis, which is invalid SQL and made every default call fail.
        query = sql.SQL(
            "SELECT EXISTS (SELECT from {tbl} WHERE {column}= {value}{not_deleted})"
        ).format(
            tbl=sql.Identifier(tbl),
            column=sql.Identifier(column),
            value=sql.Literal(value),
            not_deleted=sql.SQL(
                " AND deleted_at is null" if inc_del is False else ""
            ),
        )
        cursor.execute(query.as_string(cursor))
        db_res = cursor.fetchall()
        exists = db_res[0]["exists"]
        if exists is True:
            return True, None
        if exists is False:
            return False, None
        return None, "unhandled"
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def search_rows_by_value(
    cursor, tbl, select, column, value, inc_del=False,limit=100
):
    """Full-text search ``column`` of ``tbl`` for the words in ``value``.

    The words of ``value`` are joined with ``&`` and ``:*`` is appended once
    after the last word; the result is passed to
    ``to_tsquery('english', ...)`` and matched with ``@@``.

    Args:
        cursor: database cursor used to render and execute the query.
        tbl: table name.
        select: iterable of column names to select.
        column: column matched against the text-search query.
        value: search text; words are separated by any whitespace.
        inc_del: when it is exactly ``False`` (the default) the query also
            requires ``deleted_at is null``.
        limit: maximum number of rows fetched from the cursor.

    Returns:
        ``(rows, None)`` on success, ``(None, "NOT_FOUND")`` when nothing
        matches or ``value`` contains no words, or
        ``(None, db_query_codes["search_rows_by_value"])`` when anything
        raises.
    """
    debug_code = db_query_codes["search_rows_by_value"]
    try:
        # split() collapses repeated/leading/trailing whitespace; a plain
        # replace(" ", "&") turned "a  b" into "a&&b", which to_tsquery
        # rejects as a syntax error.
        terms = value.split()
        if not terms:
            # A bare ":*" is not a valid tsquery; nothing to search for.
            return None, "NOT_FOUND"
        input_value = "&".join(terms)
        query = sql.SQL(
            """
            SELECT {fields}
            FROM {tbl}
            WHERE {column} @@ (
                SELECT to_tsquery('english', {value})
            )
            """
        ).format(
            tbl=sql.Identifier(tbl),
            fields=sql.SQL(",").join(map(sql.Identifier, select)),
            column=sql.Identifier(column),
            value=sql.Literal(f"{input_value}:*"),
        )
        qs = query.as_string(cursor)
        if inc_del is False:
            qs += " AND deleted_at is null"
        cursor.execute(qs)
        db_res = cursor.fetchmany(limit)
        if len(db_res) == 0:
            return None, "NOT_FOUND"
        return db_res, None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def search_rows_by_value_with_where(
    cursor, tbl, select, column, value, where_col, where_value, inc_del=False,limit=100
):
    """Full-text search ``column`` of ``tbl``, restricted by an equality filter.

    The words of ``value`` are joined with ``&`` and ``:*`` is appended once
    after the last word; the result is passed to
    ``to_tsquery('english', ...)`` and matched with ``@@``. Rows must also
    satisfy ``where_col = where_value``.

    Args:
        cursor: database cursor used to render and execute the query.
        tbl: table name.
        select: iterable of column names to select.
        column: column matched against the text-search query.
        value: search text; words are separated by any whitespace.
        where_col: column used in the extra equality filter.
        where_value: value the filter column is compared against.
        inc_del: when it is exactly ``False`` (the default) the query also
            requires ``deleted_at is null``.
        limit: maximum number of rows fetched from the cursor.

    Returns:
        ``(rows, None)`` on success, ``(None, "NOT_FOUND")`` when nothing
        matches or ``value`` contains no words, or
        ``(None, db_query_codes["search_rows_by_value_with_where"])`` when
        anything raises.
    """
    debug_code = db_query_codes["search_rows_by_value_with_where"]
    try:
        # split() collapses repeated/leading/trailing whitespace; a plain
        # replace(" ", "&") turned "a  b" into "a&&b", which to_tsquery
        # rejects as a syntax error.
        terms = value.split()
        if not terms:
            # A bare ":*" is not a valid tsquery; nothing to search for.
            return None, "NOT_FOUND"
        input_value = "&".join(terms)
        query = sql.SQL(
            """
            SELECT {fields}
            FROM {tbl}
            WHERE {column} @@ (
                SELECT to_tsquery('english', {value})
            )
            AND {where_col} = {where_value}
            """
        ).format(
            tbl=sql.Identifier(tbl),
            fields=sql.SQL(",").join(map(sql.Identifier, select)),
            column=sql.Identifier(column),
            value=sql.Literal(f"{input_value}:*"),
            where_col=sql.Identifier(where_col),
            where_value=sql.Literal(where_value),
        )
        qs = query.as_string(cursor)
        if inc_del is False:
            qs += " AND deleted_at is null"
        cursor.execute(qs)
        db_res = cursor.fetchmany(limit)
        if len(db_res) == 0:
            return None, "NOT_FOUND"
        return db_res, None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def count_rows(cursor, tbl, column, inc_del=False):
    """Run ``SELECT COUNT(column)`` over ``tbl`` and return the count.

    Args:
        cursor: database cursor used to render and execute the query. The
            result is read as ``row["count"]``, so this assumes a dict-style
            cursor -- TODO confirm against the connection setup.
        tbl: table name.
        column: column passed to ``COUNT``.
        inc_del: when it is exactly ``False`` (the default) only rows with
            ``deleted_at is null`` are counted.

    Returns:
        ``(count, None)`` on success, or
        ``(None, db_query_codes["count_rows"])`` when anything raises.
    """
    debug_code = db_query_codes["count_rows"]
    try:
        statement = sql.SQL("SELECT COUNT({column}) from {tbl}").format(
            column=sql.Identifier(column),
            tbl=sql.Identifier(tbl),
        )
        if inc_del is False:
            statement = statement + sql.SQL(" WHERE deleted_at is null")
        cursor.execute(statement.as_string(cursor))
        first_row = cursor.fetchall()[0]
        return first_row["count"], None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def count_rows_by_condition(cursor, tbl,column, where_col, where_value, inc_del=False):
    """Run ``SELECT COUNT(column)`` over the rows of ``tbl`` matching a filter.

    Args:
        cursor: database cursor used to render and execute the query. The
            result is read as ``row["count"]``, so this assumes a dict-style
            cursor -- TODO confirm against the connection setup.
        tbl: table name.
        column: column passed to ``COUNT``.
        where_col: column used in the equality filter.
        where_value: value the filter column is compared against.
        inc_del: when it is exactly ``False`` (the default) only rows with
            ``deleted_at is null`` are counted.

    Returns:
        ``(count, None)`` on success, or
        ``(None, db_query_codes["count_rows_by_condition"])`` when anything
        raises.
    """
    debug_code = db_query_codes["count_rows_by_condition"]
    try:
        statement = sql.SQL(
            "SELECT COUNT({column}) from {tbl} WHERE {where_col} = {where_value}"
        ).format(
            column=sql.Identifier(column),
            tbl=sql.Identifier(tbl),
            where_col=sql.Identifier(where_col),
            where_value=sql.Literal(where_value),
        )
        if inc_del is False:
            statement = statement + sql.SQL(" AND deleted_at is null")
        cursor.execute(statement.as_string(cursor))
        first_row = cursor.fetchall()[0]
        return first_row["count"], None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def update_many_columns(
    cursor, tbl, new_data, where_col, where_value,limit=None, returning=None
):
    """Update several columns on the rows of ``tbl`` matching ``where_col = where_value``.

    Args:
        cursor: database cursor used to render and execute the statement.
        tbl: table name.
        new_data: mapping of column name to new value. Each key becomes a
            ``"col" = %(col)s`` assignment and the mapping itself is passed
            to ``cursor.execute`` as the parameters.
        where_col: column used in the equality filter.
        where_value: value the filter column is compared against.
        limit: with ``returning``, ``1`` yields the first returned row
            instead of the whole list.
        returning: optional iterable of column names for a RETURNING clause.

    Returns:
        Without ``returning``: ``(True, None)``.
        With ``returning``: ``(rows, None)`` / ``(row, None)``, or
        ``(None, "NO_UPDATES_MADE")`` when the statement returned no rows.
        ``(None, db_query_codes["update_many_columns"])`` when anything
        raises.
    """
    debug_code = db_query_codes["update_many_columns"]
    try:
        statement = (
            sql.SQL("UPDATE {tbl} SET {new_data} WHERE {where_col} = {where_value}")
            .format(
                tbl=sql.Identifier(tbl),
                where_col=sql.Identifier(where_col),
                where_value=sql.Literal(where_value),
                new_data=sql.SQL(", ").join(
                    sql.Composed(
                        [sql.Identifier(key), sql.SQL(" = "), sql.Placeholder(key)]
                    )
                    for key in new_data.keys()
                ),
            )
            .as_string(cursor)
        )
        if returning:
            # NOTE: RETURNING names are spliced in verbatim (unquoted), so
            # they must come from trusted code, never from user input.
            statement += " RETURNING " + ", ".join(returning) + " "
        cursor.execute(statement, new_data)
        if not returning:
            return True, None
        updated = cursor.fetchall()
        if len(updated) == 0:
            return None, "NO_UPDATES_MADE"
        return (updated[0] if limit == 1 else updated), None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def insert_row(
    cursor,
    tbl: str,
    fields: list,
    values: list,
    name=None,
    returning=None,
):
    """Insert one row into ``tbl``.

    Args:
        cursor: a database cursor.
        tbl: table name.
        fields: column names, in the same order as ``values``.
        values: values to insert; passed to ``cursor.execute`` as positional
            parameters, one placeholder per value.
        name: unused.
        returning: optional iterable of column names for a RETURNING clause.

    Returns:
        ``(True, None)`` without ``returning``; with ``returning``,
        ``(row_dict, None)`` where ``row_dict`` is ``dict(cursor.fetchone())``.
        ``(None, db_query_codes["insert_row"])`` when anything raises.
    """
    debug_code = db_query_codes["insert_row"]
    try:
        column_list = sql.SQL(", ").join(map(sql.Identifier, fields))
        placeholders = sql.SQL(", ").join(sql.Placeholder() * len(values))
        statement = (
            sql.SQL("INSERT INTO {tbl} ({fields}) values ({values})")
            .format(
                tbl=sql.Identifier(tbl),
                fields=column_list,
                values=placeholders,
            )
            .as_string(cursor)
        )
        if returning:
            # NOTE: RETURNING names are spliced in verbatim (unquoted), so
            # they must come from trusted code, never from user input.
            statement += " RETURNING " + ", ".join(returning) + " "
        cursor.execute(statement, values)
        if not returning:
            return True, None
        return dict(cursor.fetchone()), None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def delete_row(
    cursor: any,
    tbl: str,
    where_col: str,
    where_value: str,
    limit=None,
    returning=None,
):
    """Delete the rows of ``tbl`` matching ``where_col = where_value``.

    This issues a real ``DELETE``; it does not touch ``deleted_at``.

    Args:
        cursor: database cursor used to render and execute the statement.
        tbl: table name.
        where_col: column used in the equality filter.
        where_value: value the filter column is compared against.
        limit: with ``returning``, ``1`` yields the first returned row
            instead of the whole list.
        returning: optional iterable of column names for a RETURNING clause.

    Returns:
        Without ``returning``: ``(True, None)``.
        With ``returning``: ``(rows, None)`` / ``(row, None)``, or
        ``(None, "NOT_FOUND")`` when the statement returned no rows.
        ``(None, db_query_codes["delete_row"])`` when anything raises.
    """
    debug_code = db_query_codes["delete_row"]
    try:
        statement = (
            sql.SQL("DELETE FROM {tbl} WHERE {where_col} = {where_value}")
            .format(
                tbl=sql.Identifier(tbl),
                where_col=sql.Identifier(where_col),
                where_value=sql.Literal(where_value),
            )
            .as_string(cursor)
        )
        if returning:
            # NOTE: RETURNING names are spliced in verbatim (unquoted), so
            # they must come from trusted code, never from user input.
            statement += " RETURNING " + ", ".join(returning) + " "
        cursor.execute(statement)
        if not returning:
            return True, None
        removed = cursor.fetchall()
        if len(removed) == 0:
            return None, "NOT_FOUND"
        return (removed[0] if limit == 1 else removed), None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def insert_many_rows(cursor, tbl, fields, values, returning=None):
    """Insert several rows into ``tbl`` through ``extras.execute_values``.

    Args:
        cursor: database cursor used to render and execute the statement.
        tbl: table name.
        fields: column names, in the order used by each entry of ``values``.
        values: sequence of rows handed to ``execute_values`` as its
            argument list.
        returning: optional iterable of column names for a RETURNING clause;
            when given, the rows fetched by ``execute_values`` are returned.

    Returns:
        ``(True, None)`` without ``returning``, ``(rows, None)`` with it, or
        ``(None, db_query_codes["insert_many_rows"])`` when anything raises.
    """
    debug_code = db_query_codes["insert_many_rows"]
    try:
        statement = (
            sql.SQL("INSERT INTO {tbl} ({fields}) values %s")
            .format(
                tbl=sql.Identifier(tbl),
                fields=sql.SQL(", ").join(map(sql.Identifier, fields)),
            )
            .as_string(cursor)
        )
        if returning:
            # NOTE: RETURNING names are spliced in verbatim (unquoted), so
            # they must come from trusted code, never from user input.
            statement += " RETURNING " + ", ".join(returning) + " "
        inserted = extras.execute_values(
            cursor,
            statement,
            values,
            template=None,
            page_size=100,
            fetch=bool(returning),
        )
        return (inserted if returning else True), None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
# CAUTION: executes arbitrary SQL as-is; should be disabled in production.
async def run_sql_query(cursor, sql, fetch: bool):
    """Execute a raw SQL statement and optionally fetch its rows.

    Args:
        cursor: a database cursor.
        sql: the statement to execute, passed to ``cursor.execute``
            unchanged. Inside this function the name shadows the
            module-level ``sql`` import.
        fetch: when truthy, return ``cursor.fetchall()``; otherwise return
            ``True``.

    Returns:
        ``(rows, None)`` or ``(True, None)`` on success, or
        ``(None, "error on run_sql_query")`` when anything raises.
    """
    try:
        cursor.execute(sql)
        outcome = cursor.fetchall() if fetch else True
    except Exception:
        logging.error(traceback.format_exc())
        return None, "error on run_sql_query"
    return outcome, None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment