Created
November 3, 2024 17:25
-
-
Save naveen1337/ca51b3d42ee895f63af1a28f11cade88 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from configs.db_connection import release_conn | |
| from pydantic import BaseModel | |
| from psycopg2 import sql, extras | |
| import logging | |
| from constants.function_debug_codes import db_query_codes | |
| from constants.app_constants import __ENV__ | |
| import traceback | |
async def get_rows(
    cursor, tbl, select, where_col, where_value, name=None, inc_del=False, limit=None
):
    """Select rows from ``tbl`` where ``where_col`` equals ``where_value``.

    Args:
        cursor: database cursor used to render and execute the query.
        tbl: table name, quoted as an SQL identifier.
        select: iterable of column names to select.
        where_col: column name used in the equality filter.
        where_value: value compared with ``where_col`` (rendered as a literal).
        name: not used by this function; kept for existing callers.
        inc_del: when exactly ``False`` (default) the query additionally
            requires ``deleted_at is null``.
        limit: ``None`` returns every fetched row, ``1`` returns the first row
            itself (not wrapped in a list), a value greater than 1 returns at
            most that many rows.

    Returns:
        ``(result, err)``. ``err`` is ``None`` on success, ``"NOT_FOUND"`` when
        no row matched, ``"unhandled get_rows"`` for a ``limit`` that is none
        of the cases above, or the ``get_rows`` debug code when an exception
        was raised (the traceback is logged).
    """
    # TODO: Check is there function expect [] as error response
    debug_code = db_query_codes["get_rows"]
    try:
        query = sql.SQL("SELECT {fields} FROM {tbl} WHERE {column} = {value}").format(
            fields=sql.SQL(",").join(map(sql.Identifier, select)),
            tbl=sql.Identifier(tbl),
            column=sql.Identifier(where_col),
            value=sql.Literal(where_value),
        )
        qs = query.as_string(cursor)
        if inc_del is False:
            qs += " AND deleted_at is null"
        cursor.execute(qs)
        db_res = cursor.fetchall()
        if not db_res:
            return None, "NOT_FOUND"
        if limit is None:
            return db_res, None
        if limit == 1:
            return db_res[0], None
        if limit > 1:
            # fetchall() has already consumed the result set, so a further
            # fetchmany() would return nothing; slice the fetched rows instead.
            return db_res[:limit], None
        return None, "unhandled get_rows"
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def get_rows_two_where_and(
    cursor,
    tbl,
    select,
    where_col_1,
    where_value_1,
    where_col_2,
    where_value_2,
    name=None,
    inc_del=False,
    limit=None,
):
    """Select rows from ``tbl`` matching two equality conditions joined by AND.

    Args:
        cursor: database cursor used to render and execute the query.
        tbl: table name, quoted as an SQL identifier.
        select: iterable of column names to select.
        where_col_1, where_col_2: column names of the two filters.
        where_value_1, where_value_2: values compared with the respective
            column (rendered as literals).
        name: not used by this function; kept for existing callers.
        inc_del: when exactly ``False`` (default) the query additionally
            requires ``deleted_at is null``.
        limit: ``1`` returns the first row itself (not wrapped in a list); an
            integer greater than 1 returns at most that many rows; any other
            value returns every fetched row.

    Returns:
        ``(result, err)``. ``err`` is ``None`` on success, ``"NOT_FOUND"`` when
        no row matched, or the ``get_rows_two_where_and`` debug code when an
        exception was raised (the traceback is logged).
    """
    # TODO: Check is there function expect [] as error response
    debug_code = db_query_codes["get_rows_two_where_and"]
    try:
        query = sql.SQL(
            "SELECT {fields} FROM {tbl} WHERE {where_col_1} = {where_value_1} AND {where_col_2} = {where_value_2} "
        ).format(
            fields=sql.SQL(",").join(map(sql.Identifier, select)),
            tbl=sql.Identifier(tbl),
            where_col_1=sql.Identifier(where_col_1),
            where_value_1=sql.Literal(where_value_1),
            where_col_2=sql.Identifier(where_col_2),
            where_value_2=sql.Literal(where_value_2),
        )
        qs = query.as_string(cursor)
        if inc_del is False:
            qs += " AND deleted_at is null"
        cursor.execute(qs)
        db_res = cursor.fetchall()
        if not db_res:
            return None, "NOT_FOUND"
        if limit == 1:
            return db_res[0], None
        if isinstance(limit, int) and limit > 1:
            # Previously any limit other than 1 was silently ignored.
            return db_res[:limit], None
        return db_res, None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def get_rows_where_in(
    cursor, tbl, select, where_col, where_in_value, name=None, inc_del=False
):
    """Select rows from ``tbl`` whose ``where_col`` is IN ``where_in_value``.

    Args:
        cursor: database cursor used to render and execute the query.
        tbl: table name, quoted as an SQL identifier.
        select: iterable of column names to select.
        where_col: column name tested with ``IN``.
        where_in_value: collection of candidate values. Lists and sets are
            converted to a tuple before being rendered as a literal.
        name: not used by this function; kept for existing callers.
        inc_del: when exactly ``False`` (default) the query additionally
            requires ``deleted_at is null``.

    Returns:
        ``(rows, None)`` on success, ``(None, "NOT_FOUND")`` when nothing
        matched or the candidate collection is empty, or
        ``(None, debug_code)`` when an exception was raised (the traceback is
        logged).
    """
    debug_code = db_query_codes["get_rows_where_in"]
    try:
        # psycopg2 renders tuples as "(a, b, c)" (valid after IN) but lists as
        # "ARRAY[a, b, c]" (not valid after IN), so normalise to a tuple.
        if isinstance(where_in_value, (list, set, frozenset)):
            where_in_value = tuple(where_in_value)
        # "IN ()" is a syntax error in SQL; nothing can match an empty set.
        if isinstance(where_in_value, tuple) and not where_in_value:
            return None, "NOT_FOUND"
        query = sql.SQL("SELECT {fields} FROM {tbl} WHERE {column} IN {value}").format(
            fields=sql.SQL(",").join(map(sql.Identifier, select)),
            tbl=sql.Identifier(tbl),
            column=sql.Identifier(where_col),
            value=sql.Literal(where_in_value),
        )
        qs = query.as_string(cursor)
        if inc_del is False:
            qs += " AND deleted_at is null"
        cursor.execute(qs)
        db_res = cursor.fetchall()
        if not db_res:
            return None, "NOT_FOUND"
        return db_res, None
    except Exception:
        logging.error(traceback.format_exc())
        # Return the debug code, not the function object itself.
        return None, debug_code
async def update_row(
    cursor: any,
    tbl: str,
    column: str,
    value: str,
    where_col: str,
    where_value: any,
    name=None,
    returning=None,
    limit=None,
):
    """Set a single column on the rows where ``where_col`` equals ``where_value``.

    Args:
        cursor: database cursor used to render and execute the statement.
        tbl: table name, quoted as an SQL identifier.
        column: column to assign, quoted as an SQL identifier.
        value: new value (rendered as a literal).
        where_col: column name used in the equality filter.
        where_value: value compared with ``where_col`` (rendered as a literal).
        name: not used by this function.
        returning: optional list of strings joined verbatim into a
            ``RETURNING`` clause.
        limit: with ``returning`` set, ``1`` returns the first returned row
            instead of the whole list.

    Returns:
        ``(True, None)`` when no ``returning`` was requested; otherwise
        ``(rows_or_row, None)``, or ``(None, "NO_UPDATES_MADE")`` when the
        statement returned no rows. On an exception the traceback is logged
        and ``(None, debug_code)`` is returned.
    """
    debug_code = db_query_codes["update_row"]
    try:
        template = sql.SQL(
            "UPDATE {tbl} SET {column} = {value} WHERE {where_col} = {where_value}"
        )
        statement = template.format(
            tbl=sql.Identifier(tbl),
            column=sql.Identifier(column),
            value=sql.Literal(value),
            where_col=sql.Identifier(where_col),
            where_value=sql.Literal(where_value),
        ).as_string(cursor)
        if returning:
            statement += f""" RETURNING {', '.join(returning)} """
        cursor.execute(statement)

        # Without RETURNING there is nothing to fetch.
        if not returning:
            return True, None

        updated = cursor.fetchall()
        if not updated:
            return None, "NO_UPDATES_MADE"
        return (updated[0] if limit == 1 else updated), None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def get_rows_no_where(
    cursor, select: list, tbl: str, name=None, inc_del=False, limit=None
):
    """Select the given columns from every row of ``tbl``.

    Args:
        cursor: database cursor used to render and execute the query.
        select: list of column names to select.
        tbl: table name, quoted as an SQL identifier.
        name: not used by this function; kept for existing callers.
        inc_del: when exactly ``False`` (default) only rows with
            ``deleted_at is null`` are selected.
        limit: ``1`` returns the first row itself (not wrapped in a list); an
            integer greater than 1 returns at most that many rows; any other
            value returns every fetched row.

    Returns:
        ``(result, err)``. ``err`` is ``None`` on success, ``"NOT_FOUND"`` when
        the query returned no rows, or the ``get_rows_no_where`` debug code
        when an exception was raised (the traceback is logged).
    """
    debug_code = db_query_codes["get_rows_no_where"]
    try:
        query = sql.SQL("SELECT {fields} FROM {table}").format(
            fields=sql.SQL(",").join(map(sql.Identifier, select)),
            table=sql.Identifier(tbl),
        )
        qs = query.as_string(cursor)
        if inc_del is False:
            qs += " WHERE deleted_at is null"
        cursor.execute(qs)
        db_res = cursor.fetchall()
        if not db_res:
            return None, "NOT_FOUND"
        if limit == 1:
            return db_res[0], None
        if isinstance(limit, int) and limit > 1:
            # Previously any limit other than 1 was silently ignored.
            return db_res[:limit], None
        return db_res, None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def get_rows_custom_query(cursor, query):
    """Execute a caller-supplied SQL statement and return everything it yields.

    Args:
        cursor: a database cursor.
        query: the SQL to execute; passed to ``cursor.execute`` unchanged.

    Returns:
        ``(rows, None)`` when at least one row came back,
        ``(None, "NOT_FOUND")`` when the result is empty, or
        ``(None, debug_code)`` when an exception was raised (the traceback is
        logged).
    """
    debug_code = db_query_codes["get_rows_custom_query"]
    try:
        cursor.execute(query)
        fetched = cursor.fetchall()
        if fetched:
            return fetched, None
        return None, "NOT_FOUND"
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def is_row_exists_by_value(cursor, tbl, column, value, inc_del=False):
    """Report whether ``tbl`` has at least one row where ``column`` equals ``value``.

    Args:
        cursor: database cursor used to render and execute the query.
            Rows are read by key (``row["exists"]``), so this assumes a
            mapping-style cursor such as RealDictCursor — TODO confirm.
        tbl: table name, quoted as an SQL identifier.
        column: column name used in the equality filter.
        value: value compared with ``column`` (rendered as a literal).
        inc_del: when exactly ``False`` (default) only rows with
            ``deleted_at is null`` are considered.

    Returns:
        ``(True, None)`` or ``(False, None)`` according to the EXISTS result,
        ``(None, "unhandled")`` if the result is neither, or
        ``(None, debug_code)`` when an exception was raised (the traceback is
        logged).
    """
    debug_code = db_query_codes["is_row_exists_by_value"]
    try:
        # The soft-delete filter must live inside the subquery. Appending it
        # after the closing parenthesis (and with a second WHERE keyword)
        # produced invalid SQL on the default inc_del=False path.
        soft_delete = sql.SQL(" AND deleted_at is null" if inc_del is False else "")
        query = sql.SQL(
            "SELECT EXISTS (SELECT 1 FROM {tbl} WHERE {column} = {value}{soft_delete})"
        ).format(
            tbl=sql.Identifier(tbl),
            column=sql.Identifier(column),
            value=sql.Literal(value),
            soft_delete=soft_delete,
        )
        cursor.execute(query.as_string(cursor))
        db_res = cursor.fetchall()
        exists = db_res[0]["exists"]
        if exists is True:
            return True, None
        if exists is False:
            return False, None
        return None, "unhandled"
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def search_rows_by_value(
    cursor, tbl, select, column, value, inc_del=False, limit=100
):
    """Full-text search ``tbl`` by matching ``column`` against a tsquery.

    The search text is split on whitespace, the words are joined with ``&``
    and ``:*`` is appended, so every word must match and the last one matches
    as a prefix.

    Args:
        cursor: database cursor used to render and execute the query.
        tbl: table name, quoted as an SQL identifier.
        select: iterable of column names to select.
        column: column matched with ``@@`` against ``to_tsquery('english', ...)``.
        value: search text.
        inc_del: when exactly ``False`` (default) the query additionally
            requires ``deleted_at is null``.
        limit: maximum number of rows fetched from the cursor.

    Returns:
        ``(rows, None)`` on success, ``(None, "NOT_FOUND")`` when nothing
        matched or the search text is blank, or ``(None, debug_code)`` when an
        exception was raised (the traceback is logged).
    """
    debug_code = db_query_codes["search_rows_by_value"]
    try:
        # Splitting on whitespace (rather than replacing each space with "&")
        # avoids empty operands such as "a&&b" or "&a", which are tsquery
        # syntax errors. It is done inside the try so that a non-string value
        # yields the error tuple instead of raising.
        terms = value.split()
        if not terms:
            return None, "NOT_FOUND"
        input_value = "&".join(terms)
        query = sql.SQL(
            """
            SELECT {fields}
            FROM {tbl}
            WHERE {column} @@ (
                SELECT to_tsquery('english', {value})
            )
            """
        ).format(
            tbl=sql.Identifier(tbl),
            fields=sql.SQL(",").join(map(sql.Identifier, select)),
            column=sql.Identifier(column),
            value=sql.Literal(f"{input_value}:*"),
        )
        qs = query.as_string(cursor)
        if inc_del is False:
            qs += " AND deleted_at is null"
        cursor.execute(qs)
        db_res = cursor.fetchmany(limit)
        if not db_res:
            return None, "NOT_FOUND"
        return db_res, None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def search_rows_by_value_with_where(
    cursor, tbl, select, column, value, where_col, where_value, inc_del=False, limit=100
):
    """Full-text search ``tbl`` with an additional equality filter.

    The search text is split on whitespace, the words are joined with ``&``
    and ``:*`` is appended, so every word must match and the last one matches
    as a prefix.

    Args:
        cursor: database cursor used to render and execute the query.
        tbl: table name, quoted as an SQL identifier.
        select: iterable of column names to select.
        column: column matched with ``@@`` against ``to_tsquery('english', ...)``.
        value: search text.
        where_col: column name of the extra equality filter.
        where_value: value compared with ``where_col`` (rendered as a literal).
        inc_del: when exactly ``False`` (default) the query additionally
            requires ``deleted_at is null``.
        limit: maximum number of rows fetched from the cursor.

    Returns:
        ``(rows, None)`` on success, ``(None, "NOT_FOUND")`` when nothing
        matched or the search text is blank, or ``(None, debug_code)`` when an
        exception was raised (the traceback is logged).
    """
    debug_code = db_query_codes["search_rows_by_value_with_where"]
    try:
        # Splitting on whitespace (rather than replacing each space with "&")
        # avoids empty operands such as "a&&b" or "&a", which are tsquery
        # syntax errors. It is done inside the try so that a non-string value
        # yields the error tuple instead of raising.
        terms = value.split()
        if not terms:
            return None, "NOT_FOUND"
        input_value = "&".join(terms)
        query = sql.SQL(
            """
            SELECT {fields}
            FROM {tbl}
            WHERE {column} @@ (
                SELECT to_tsquery('english', {value})
            )
            AND {where_col} = {where_value}
            """
        ).format(
            tbl=sql.Identifier(tbl),
            fields=sql.SQL(",").join(map(sql.Identifier, select)),
            column=sql.Identifier(column),
            value=sql.Literal(f"{input_value}:*"),
            where_col=sql.Identifier(where_col),
            where_value=sql.Literal(where_value),
        )
        qs = query.as_string(cursor)
        if inc_del is False:
            qs += " AND deleted_at is null"
        cursor.execute(qs)
        db_res = cursor.fetchmany(limit)
        if not db_res:
            return None, "NOT_FOUND"
        return db_res, None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def count_rows(cursor, tbl, column, inc_del=False):
    """Return ``COUNT(column)`` over ``tbl``.

    Args:
        cursor: database cursor used to render and execute the query. The
            result is read as ``row["count"]``, so this assumes a
            mapping-style cursor — TODO confirm.
        tbl: table name, quoted as an SQL identifier.
        column: column passed to ``COUNT``, quoted as an SQL identifier.
        inc_del: when exactly ``False`` (default) only rows with
            ``deleted_at is null`` are counted.

    Returns:
        ``(count, None)`` on success, or ``(None, debug_code)`` when an
        exception was raised (the traceback is logged).
    """
    debug_code = db_query_codes["count_rows"]
    try:
        statement = (
            sql.SQL("SELECT COUNT({column}) from {tbl}")
            .format(tbl=sql.Identifier(tbl), column=sql.Identifier(column))
            .as_string(cursor)
        )
        if inc_del is False:
            statement = statement + " WHERE deleted_at is null"
        cursor.execute(statement)
        first_row = cursor.fetchall()[0]
        return first_row["count"], None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def count_rows_by_condition(cursor, tbl,column, where_col, where_value, inc_del=False):
    """Return ``COUNT(column)`` over the rows where ``where_col`` equals ``where_value``.

    Args:
        cursor: database cursor used to render and execute the query. The
            result is read as ``row["count"]``, so this assumes a
            mapping-style cursor — TODO confirm.
        tbl: table name, quoted as an SQL identifier.
        column: column passed to ``COUNT``, quoted as an SQL identifier.
        where_col: column name used in the equality filter.
        where_value: value compared with ``where_col`` (rendered as a literal).
        inc_del: when exactly ``False`` (default) the query additionally
            requires ``deleted_at is null``.

    Returns:
        ``(count, None)`` on success, or ``(None, debug_code)`` when an
        exception was raised (the traceback is logged).
    """
    debug_code = db_query_codes["count_rows_by_condition"]
    try:
        template = sql.SQL(
            "SELECT COUNT({column}) from {tbl} WHERE {where_col} = {where_value}"
        )
        statement = template.format(
            tbl=sql.Identifier(tbl),
            column=sql.Identifier(column),
            where_col=sql.Identifier(where_col),
            where_value=sql.Literal(where_value),
        ).as_string(cursor)
        if inc_del is False:
            statement = statement + " AND deleted_at is null"
        cursor.execute(statement)
        first_row = cursor.fetchall()[0]
        return first_row["count"], None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def update_many_columns(
    cursor, tbl, new_data, where_col, where_value, limit=None, returning=None
):
    """Update several columns on the rows where ``where_col`` equals ``where_value``.

    Args:
        cursor: database cursor used to render and execute the statement.
        tbl: table name, quoted as an SQL identifier.
        new_data: mapping of column name to new value; each key becomes a
            ``"col" = %(col)s`` assignment bound from the mapping.
        where_col: column name used in the equality filter.
        where_value: value compared with ``where_col``.
        limit: with ``returning`` set, ``1`` returns the first returned row
            instead of the whole list.
        returning: optional list of strings joined verbatim into a
            ``RETURNING`` clause.

    Returns:
        ``(True, None)`` when no ``returning`` was requested; otherwise
        ``(rows_or_row, None)``. ``(None, "NO_UPDATES_MADE")`` is returned
        when ``new_data`` is empty or the statement returned no rows. On an
        exception the traceback is logged and ``(None, debug_code)`` is
        returned.
    """
    debug_code = db_query_codes["update_many_columns"]
    try:
        # An empty SET list is invalid SQL; report it without hitting the DB.
        if not new_data:
            return None, "NO_UPDATES_MADE"

        # The statement is executed with parameters, so any "%" inside an
        # inlined literal would be read as a placeholder. Bind the WHERE value
        # as a parameter too, under a key that cannot clash with a column.
        where_key = "_where_value"
        while where_key in new_data:
            where_key = "_" + where_key
        params = dict(new_data)
        params[where_key] = where_value

        query = sql.SQL("UPDATE {tbl} SET {new_data} WHERE {where_col} = {where_value}").format(
            tbl=sql.Identifier(tbl),
            where_col=sql.Identifier(where_col),
            where_value=sql.Placeholder(where_key),
            new_data=sql.SQL(", ").join(
                sql.Composed(
                    [sql.Identifier(key), sql.SQL(" = "), sql.Placeholder(key)]
                )
                for key in new_data
            ),
        )
        qs = query.as_string(cursor)
        if returning:
            qs += f""" RETURNING {', '.join(returning)} """
        cursor.execute(qs, params)
        if not returning:
            return True, None
        db_res = cursor.fetchall()
        if not db_res:
            return None, "NO_UPDATES_MADE"
        if limit == 1:
            return db_res[0], None
        return db_res, None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def insert_row(
    cursor,
    tbl: str,
    fields: list,
    values: list,
    name=None,
    returning=None,
):
    """Insert one row into ``tbl``.

    Args:
        cursor: a database cursor.
        tbl: table name, quoted as an SQL identifier.
        fields: column names, each quoted as an SQL identifier.
        values: values bound positionally, one placeholder per element.
        name: not used by this function.
        returning: optional list of strings joined verbatim into a
            ``RETURNING`` clause.

    Returns:
        ``(dict_of_first_returned_row, None)`` when ``returning`` is given,
        ``(True, None)`` otherwise, or ``(None, debug_code)`` when an
        exception was raised (the traceback is logged).
    """
    debug_code = db_query_codes["insert_row"]
    try:
        column_list = sql.SQL(", ").join(map(sql.Identifier, fields))
        placeholder_list = sql.SQL(", ").join(sql.Placeholder() * len(values))
        statement = (
            sql.SQL("INSERT INTO {tbl} ({fields}) values ({values})")
            .format(
                tbl=sql.Identifier(tbl),
                fields=column_list,
                values=placeholder_list,
            )
            .as_string(cursor)
        )
        if returning:
            statement += f""" RETURNING {', '.join(returning)} """
        cursor.execute(statement, values)
        if not returning:
            return True, None
        inserted = dict(cursor.fetchone())
        return inserted, None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def delete_row(
    cursor: any,
    tbl: str,
    where_col: str,
    where_value: str,
    limit=None,
    returning=None,
):
    """Delete the rows of ``tbl`` where ``where_col`` equals ``where_value``.

    Args:
        cursor: database cursor used to render and execute the statement.
        tbl: table name, quoted as an SQL identifier.
        where_col: column name used in the equality filter.
        where_value: value compared with ``where_col`` (rendered as a literal).
        limit: with ``returning`` set, ``1`` returns the first returned row
            instead of the whole list.
        returning: optional list of strings joined verbatim into a
            ``RETURNING`` clause.

    Returns:
        ``(True, None)`` when no ``returning`` was requested; otherwise
        ``(rows_or_row, None)``, or ``(None, "NOT_FOUND")`` when the statement
        returned no rows. On an exception the traceback is logged and
        ``(None, debug_code)`` is returned.
    """
    debug_code = db_query_codes["delete_row"]
    try:
        statement = (
            sql.SQL("DELETE FROM {tbl} WHERE {where_col} = {where_value}")
            .format(
                tbl=sql.Identifier(tbl),
                where_col=sql.Identifier(where_col),
                where_value=sql.Literal(where_value),
            )
            .as_string(cursor)
        )
        if returning:
            statement += f""" RETURNING {', '.join(returning)} """
        cursor.execute(statement)

        # Without RETURNING there is nothing to fetch.
        if not returning:
            return True, None

        removed = cursor.fetchall()
        if not removed:
            return None, "NOT_FOUND"
        return (removed[0] if limit == 1 else removed), None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
async def insert_many_rows(cursor, tbl, fields, values, returning=None):
    """Insert several rows into ``tbl`` via ``extras.execute_values``.

    Args:
        cursor: database cursor used to render and execute the statement.
        tbl: table name, quoted as an SQL identifier.
        fields: column names, each quoted as an SQL identifier.
        values: sequence of row value sequences handed to ``execute_values``.
        returning: optional list of strings joined verbatim into a
            ``RETURNING`` clause; when set, the returned rows are fetched.

    Returns:
        ``(returned_rows, None)`` when ``returning`` is given,
        ``(True, None)`` otherwise, or ``(None, debug_code)`` when an
        exception was raised (the traceback is logged).
    """
    debug_code = db_query_codes["insert_many_rows"]
    try:
        column_list = sql.SQL(", ").join(map(sql.Identifier, fields))
        statement = (
            sql.SQL("INSERT INTO {tbl} ({fields}) values %s")
            .format(tbl=sql.Identifier(tbl), fields=column_list)
            .as_string(cursor)
        )
        if returning:
            statement += f""" RETURNING {', '.join(returning)} """
        inserted = extras.execute_values(
            cursor,
            statement,
            values,
            template=None,
            page_size=100,
            fetch=bool(returning),
        )
        if not returning:
            return True, None
        return inserted, None
    except Exception:
        logging.error(traceback.format_exc())
        return None, debug_code
| # Should be disabled in production [[[CAUTION]]] | |
async def run_sql_query(cursor,sql,fetch:bool):
    """Execute an arbitrary SQL statement on ``cursor``.

    Args:
        cursor: database cursor; ``sql`` is passed to ``cursor.execute`` unchanged.
        sql: the statement to run.
        fetch: when truthy, return everything ``cursor.fetchall()`` yields.

    Returns:
        ``(rows, None)`` when ``fetch`` is truthy, ``(True, None)`` otherwise,
        or ``(None, "error on run_sql_query")`` when an exception was raised
        (the traceback is logged).
    """
    try:
        cursor.execute(sql)
        if not fetch:
            return True, None
        return cursor.fetchall(), None
    except Exception:
        logging.error(traceback.format_exc())
        return None, "error on run_sql_query"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment