Created
November 3, 2024 17:25
-
-
Save naveen1337/ca51b3d42ee895f63af1a28f11cade88 to your computer and use it in GitHub Desktop.
Revisions
-
naveen1337 created this gist
Nov 3, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,565 @@ from configs.db_connection import release_conn from pydantic import BaseModel from psycopg2 import sql, extras import logging from constants.function_debug_codes import db_query_codes from constants.app_constants import __ENV__ import traceback async def get_rows( cursor, tbl, select, where_col, where_value, name=None, inc_del=False, limit=None ): """it will return rows in List * if no records found it will return err_text as "NOT_FOUND" """ # TODO: Check is there function expect [] as error response debug_code = db_query_codes["get_rows"] try: query = sql.SQL("SELECT {fields} FROM {tbl} WHERE {column} = {value}").format( fields=sql.SQL(",").join(map(sql.Identifier, select)), tbl=sql.Identifier(tbl), column=sql.Identifier(where_col), value=sql.Literal(where_value), ) qs = query.as_string(cursor) # print(qs) if inc_del is False: qs += " AND deleted_at is null" # return qs cursor.execute(qs) db_res = cursor.fetchall() if len(db_res) == 0: return None, "NOT_FOUND" if limit == None: # db_res = cursor.fetchall() return db_res,None if limit == 1: # db_res = cursor.fetchall() # print("db_res",db_res) return db_res[0],None if limit and limit != 1: db_res = cursor.fetchmany(limit) return db_res,None # if len(db_res) > 0: # return db_res, None return None, "unhandled get_rows" except Exception: logging.error(traceback.format_exc()) return None, debug_code async def get_rows_two_where_and( cursor, tbl, select, where_col_1, where_value_1, where_col_2, where_value_2, name=None, inc_del=False, limit=None, ): """it will return rows in List * if no records found it will return err_text as "NOT_FOUND" """ # TODO: Check is there function expect [] as error response debug_code = db_query_codes["get_rows_two_where_and"] try: query = sql.SQL( "SELECT {fields} FROM {tbl} WHERE {where_col_1} = {where_value_1} AND {where_col_2} = {where_value_2} " ).format( fields=sql.SQL(",").join(map(sql.Identifier, select)), tbl=sql.Identifier(tbl), where_col_1=sql.Identifier(where_col_1), where_value_1=sql.Literal(where_value_1), where_col_2=sql.Identifier(where_col_2), where_value_2=sql.Literal(where_value_2), ) qs = query.as_string(cursor) if inc_del is False: qs += " AND deleted_at is null" # return qs cursor.execute(qs) db_res = cursor.fetchall() if len(db_res) == 0: return None, "NOT_FOUND" if limit == 1: return db_res[0], None if len(db_res) > 0: return db_res, None return None, "unhandled get_rows" except Exception: logging.error(traceback.format_exc()) return None, debug_code async def get_rows_where_in( cursor, tbl, select, where_col, where_in_value, name=None, inc_del=False ): debug_code = db_query_codes["get_rows_where_in"] try: query = sql.SQL("SELECT {fields} FROM {tbl} WHERE {column} IN {value}").format( fields=sql.SQL(",").join(map(sql.Identifier, select)), tbl=sql.Identifier(tbl), column=sql.Identifier(where_col), value=sql.Literal(where_in_value), ) qs = query.as_string(cursor) if inc_del is False: qs += " AND deleted_at is null" cursor.execute(qs) db_res = cursor.fetchall() if len(db_res) == 0: return None, "NOT_FOUND" if len(db_res) > 0: return db_res, None return None, "unhandled get_rows_where_in" except Exception: logging.error(traceback.format_exc()) return None, get_rows_where_in async def update_row( cursor: any, tbl: str, column: str, value: str, where_col: str, where_value: any, name=None, returning=None, limit=None, ): debug_code = db_query_codes["update_row"] try: query = sql.SQL( "UPDATE {tbl} SET {column} = {value} WHERE {where_col} = {where_value}" ).format( tbl=sql.Identifier(tbl), column=sql.Identifier(column), value=sql.Literal(value), where_col=sql.Identifier(where_col), where_value=sql.Literal(where_value), ) qs = query.as_string(cursor) if returning: qs += f""" RETURNING {', '.join(returning)} """ cursor.execute(qs) if returning: db_res = cursor.fetchall() if (len(db_res) == 0): return None,"NO_UPDATES_MADE" if limit == 1: return db_res[0], None return db_res, None else: return True, None except Exception: logging.error(traceback.format_exc()) return None,debug_code async def get_rows_no_where( cursor, select: list(), tbl: str, name=None, inc_del=False, limit=None ): """ Args: cursor* : db cursor select* : list tbl*: Strig """ debug_code = db_query_codes["get_rows_no_where"] try: query = sql.SQL("SELECT {fields} FROM {table}").format( fields=sql.SQL(",").join(map(sql.Identifier, select)), table=sql.Identifier(tbl), ) qs = query.as_string(cursor) if inc_del is False: qs += " WHERE deleted_at is null" # return qs cursor.execute(qs) db_res = cursor.fetchall() if len(db_res) == 0: return None, "NOT_FOUND" if limit == 1: return db_res[0], None if len(db_res) > 0: return db_res, None except Exception: logging.error(traceback.format_exc()) return None, debug_code async def get_rows_custom_query(cursor, query): """takes a sql query and execute and returns all the response. Args: cursor: a database cursor query: String :: a sql string Returns: List list of object from database """ debug_code = db_query_codes["get_rows_custom_query"] try: # print(query) cursor.execute(query) db_res = cursor.fetchall() # print(db_res) if len(db_res) == 0: return None, "NOT_FOUND" return db_res, None except Exception: logging.error(traceback.format_exc()) return None,debug_code async def is_row_exists_by_value(cursor, tbl, column, value, inc_del=False): debug_code = db_query_codes["is_row_exists_by_value"] try: query = sql.SQL( "SELECT EXISTS (SELECT from {tbl} WHERE {column}= {value})" ).format( tbl=sql.Identifier(tbl), column=sql.Identifier(column), value=sql.Literal(value), ) qs = query.as_string(cursor) if inc_del is False: qs += " AND WHERE deleted_at is null" cursor.execute(qs) db_res = cursor.fetchall() if db_res[0]["exists"] is True: return True, None if db_res[0]["exists"] is False: return False, None return None, "unhandled" except Exception: logging.error(traceback.format_exc()) return None, debug_code async def search_rows_by_value( cursor, tbl, select, column, value, inc_del=False,limit=100 ): debug_code = db_query_codes["search_rows_by_value"] input_value = value.replace(" ","&") try: query = sql.SQL( """ SELECT {fields} FROM {tbl} WHERE {column} @@ ( SELECT to_tsquery('english', {value}) ) """ ).format( tbl=sql.Identifier(tbl), fields=sql.SQL(",").join(map(sql.Identifier, select)), column=sql.Identifier(column), value=sql.Literal(f"{input_value}:*"), ) qs = query.as_string(cursor) if inc_del is False: qs += " AND deleted_at is null" cursor.execute(qs) db_res = cursor.fetchmany(limit) if len(db_res) == 0: return None, "NOT_FOUND" return db_res, None except Exception: logging.error(traceback.format_exc()) return None, debug_code async def search_rows_by_value_with_where( cursor, tbl, select, column, value, where_col, where_value, inc_del=False,limit=100 ): debug_code = db_query_codes["search_rows_by_value_with_where"] input_value = value.replace(" ","&") try: query = sql.SQL( """ SELECT {fields} FROM {tbl} WHERE {column} @@ ( SELECT to_tsquery('english', {value}) ) AND {where_col} = {where_value} """ ).format( tbl=sql.Identifier(tbl), fields=sql.SQL(",").join(map(sql.Identifier, select)), column=sql.Identifier(column), value=sql.Literal(f"{input_value}:*"), where_col=sql.Identifier(where_col), where_value=sql.Literal(where_value), ) qs = query.as_string(cursor) if inc_del is False: qs += " AND deleted_at is null" cursor.execute(qs) db_res = cursor.fetchmany(limit) if len(db_res) == 0: return None, "NOT_FOUND" return db_res, None except Exception: logging.error(traceback.format_exc()) return None, debug_code async def count_rows(cursor, tbl, column, inc_del=False): debug_code = db_query_codes["count_rows"] try: query = sql.SQL("SELECT COUNT({column}) from {tbl}").format( tbl=sql.Identifier(tbl), column=sql.Identifier(column), ) qs = query.as_string(cursor) if inc_del is False: qs += " WHERE deleted_at is null" cursor.execute(qs) db_res = cursor.fetchall() return db_res[0]["count"], None except Exception: logging.error(traceback.format_exc()) return None, debug_code async def count_rows_by_condition(cursor, tbl,column, where_col, where_value, inc_del=False): debug_code = db_query_codes["count_rows_by_condition"] try: query = sql.SQL( "SELECT COUNT({column}) from {tbl} WHERE {where_col} = {where_value}" ).format( tbl=sql.Identifier(tbl), column=sql.Identifier(column), where_col=sql.Identifier(where_col), where_value=sql.Literal(where_value), ) qs = query.as_string(cursor) if inc_del is False: qs += " AND deleted_at is null" cursor.execute(qs) db_res = cursor.fetchall() return db_res[0]["count"], None except Exception: logging.error(traceback.format_exc()) return None, debug_code async def update_many_columns( cursor, tbl, new_data, where_col, where_value,limit=None, returning=None ): debug_code = db_query_codes["update_many_columns"] try: query = sql.SQL("UPDATE {tbl} SET {new_data} WHERE {where_col} = {where_value}").format( tbl=sql.Identifier(tbl), where_col=sql.Identifier(where_col), where_value=sql.Literal(where_value), new_data=sql.SQL(", ").join( sql.Composed( [sql.Identifier(key), sql.SQL(" = "), sql.Placeholder(key)] ) for key in new_data.keys() ), ) qs = query.as_string(cursor) if returning: qs += f""" RETURNING {', '.join(returning)} """ cursor.execute(qs, new_data) db_res = None if returning: db_res = cursor.fetchall() if (len(db_res) == 0): return None,"NO_UPDATES_MADE" if limit == 1: return db_res[0], None if len(db_res) > 0: return db_res, None else: return True,None # db_res = cursor.fetchall() # if (len(db_res) == 0): # return None,"NO_UPDATES_MADE" # if len(db_res) > 0: # return True, None except Exception: logging.error(traceback.format_exc()) return None, debug_code async def insert_row( cursor, tbl: str, fields: list, values: list, name=None, returning=None, ): """ Args: cursor = a databse cursor. tbl = STRING, Table name. fields = TUPLE. values = TUPLE. returning = LIST : list of returning type """ debug_code = db_query_codes["insert_row"] try: query = sql.SQL("INSERT INTO {tbl} ({fields}) values ({values})").format( tbl=sql.Identifier(tbl), fields=sql.SQL(", ").join(map(sql.Identifier, fields)), values=sql.SQL(", ").join(sql.Placeholder() * len(values)), ) qs = query.as_string(cursor) if returning: qs += f""" RETURNING {', '.join(returning)} """ cursor.execute(qs, values) if returning: new_row = dict(cursor.fetchone()) return new_row, None return True, None except Exception: logging.error(traceback.format_exc()) return None, debug_code async def delete_row( cursor: any, tbl: str, where_col: str, where_value: str, limit=None, returning=None, ): debug_code = db_query_codes["delete_row"] try: query = sql.SQL("DELETE FROM {tbl} WHERE {where_col} = {where_value}").format( tbl=sql.Identifier(tbl), where_col=sql.Identifier(where_col), where_value=sql.Literal(where_value), ) qs = query.as_string(cursor) if returning: qs += f""" RETURNING {', '.join(returning)} """ cursor.execute(qs) if returning: res_row = cursor.fetchall() if len(res_row) == 0: return None, "NOT_FOUND" if limit == 1: return res_row[0], None if len(res_row) > 0: return res_row, None return True, None except Exception: logging.error(traceback.format_exc()) return None, debug_code async def insert_many_rows(cursor, tbl, fields, values, returning=None): debug_code = db_query_codes["insert_many_rows"] try: qs = sql.SQL("INSERT INTO {tbl} ({fields}) values %s").format( tbl=sql.Identifier(tbl), fields=sql.SQL(", ").join(map(sql.Identifier, fields)), ) query = qs.as_string(cursor) if returning: query += f""" RETURNING {', '.join(returning)} """ insert_res = extras.execute_values( cursor, query, values, template=None, page_size=100, fetch=True if returning else False ) if returning: return insert_res, None return True, None except Exception: logging.error(traceback.format_exc()) return None, debug_code # Should be disable in Production [[[CAUTION]]] async def run_sql_query(cursor,sql,fetch:bool): try: cursor.execute(sql) if fetch: db_res = cursor.fetchall() return db_res, None else: return True,None except Exception: logging.error(traceback.format_exc()) return None, "error on run_sql_query"