Skip to content

Instantly share code, notes, and snippets.

@naveen1337
Created November 3, 2024 17:25
Show Gist options
  • Select an option

  • Save naveen1337/ca51b3d42ee895f63af1a28f11cade88 to your computer and use it in GitHub Desktop.

Select an option

Save naveen1337/ca51b3d42ee895f63af1a28f11cade88 to your computer and use it in GitHub Desktop.

Revisions

  1. naveen1337 created this gist Nov 3, 2024.
    565 changes: 565 additions & 0 deletions psycopg_crud_sharable_functions.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,565 @@
    from configs.db_connection import release_conn
    from pydantic import BaseModel
    from psycopg2 import sql, extras
    import logging
    from constants.function_debug_codes import db_query_codes
    from constants.app_constants import __ENV__
    import traceback


    async def get_rows(
    cursor, tbl, select, where_col, where_value, name=None, inc_del=False, limit=None
    ):
    """it will return rows in List
    * if no records found it will return err_text as "NOT_FOUND"
    """
    # TODO: Check is there function expect [] as error response
    debug_code = db_query_codes["get_rows"]
    try:
    query = sql.SQL("SELECT {fields} FROM {tbl} WHERE {column} = {value}").format(
    fields=sql.SQL(",").join(map(sql.Identifier, select)),
    tbl=sql.Identifier(tbl),
    column=sql.Identifier(where_col),
    value=sql.Literal(where_value),
    )
    qs = query.as_string(cursor)
    # print(qs)
    if inc_del is False:
    qs += " AND deleted_at is null"
    # return qs
    cursor.execute(qs)

    db_res = cursor.fetchall()

    if len(db_res) == 0:
    return None, "NOT_FOUND"

    if limit == None:
    # db_res = cursor.fetchall()
    return db_res,None

    if limit == 1:
    # db_res = cursor.fetchall()
    # print("db_res",db_res)
    return db_res[0],None

    if limit and limit != 1:
    db_res = cursor.fetchmany(limit)
    return db_res,None

    # if len(db_res) > 0:
    # return db_res, None

    return None, "unhandled get_rows"
    except Exception:
    logging.error(traceback.format_exc())
    return None, debug_code


    async def get_rows_two_where_and(
    cursor,
    tbl,
    select,
    where_col_1,
    where_value_1,
    where_col_2,
    where_value_2,
    name=None,
    inc_del=False,
    limit=None,
    ):
    """it will return rows in List
    * if no records found it will return err_text as "NOT_FOUND"
    """
    # TODO: Check is there function expect [] as error response
    debug_code = db_query_codes["get_rows_two_where_and"]
    try:
    query = sql.SQL(
    "SELECT {fields} FROM {tbl} WHERE {where_col_1} = {where_value_1} AND {where_col_2} = {where_value_2} "
    ).format(
    fields=sql.SQL(",").join(map(sql.Identifier, select)),
    tbl=sql.Identifier(tbl),
    where_col_1=sql.Identifier(where_col_1),
    where_value_1=sql.Literal(where_value_1),
    where_col_2=sql.Identifier(where_col_2),
    where_value_2=sql.Literal(where_value_2),
    )
    qs = query.as_string(cursor)
    if inc_del is False:
    qs += " AND deleted_at is null"
    # return qs
    cursor.execute(qs)
    db_res = cursor.fetchall()
    if len(db_res) == 0:
    return None, "NOT_FOUND"
    if limit == 1:
    return db_res[0], None
    if len(db_res) > 0:
    return db_res, None
    return None, "unhandled get_rows"
    except Exception:
    logging.error(traceback.format_exc())
    return None, debug_code


    async def get_rows_where_in(
    cursor, tbl, select, where_col, where_in_value, name=None, inc_del=False
    ):
    debug_code = db_query_codes["get_rows_where_in"]

    try:
    query = sql.SQL("SELECT {fields} FROM {tbl} WHERE {column} IN {value}").format(
    fields=sql.SQL(",").join(map(sql.Identifier, select)),
    tbl=sql.Identifier(tbl),
    column=sql.Identifier(where_col),
    value=sql.Literal(where_in_value),
    )
    qs = query.as_string(cursor)

    if inc_del is False:
    qs += " AND deleted_at is null"

    cursor.execute(qs)
    db_res = cursor.fetchall()

    if len(db_res) == 0:
    return None, "NOT_FOUND"

    if len(db_res) > 0:
    return db_res, None

    return None, "unhandled get_rows_where_in"
    except Exception:
    logging.error(traceback.format_exc())
    return None, get_rows_where_in


    async def update_row(
    cursor: any,
    tbl: str,
    column: str,
    value: str,
    where_col: str,
    where_value: any,
    name=None,
    returning=None,
    limit=None,
    ):
    debug_code = db_query_codes["update_row"]

    try:
    query = sql.SQL(
    "UPDATE {tbl} SET {column} = {value} WHERE {where_col} = {where_value}"
    ).format(
    tbl=sql.Identifier(tbl),
    column=sql.Identifier(column),
    value=sql.Literal(value),
    where_col=sql.Identifier(where_col),
    where_value=sql.Literal(where_value),
    )
    qs = query.as_string(cursor)

    if returning:
    qs += f""" RETURNING {', '.join(returning)} """


    cursor.execute(qs)

    if returning:
    db_res = cursor.fetchall()
    if (len(db_res) == 0):
    return None,"NO_UPDATES_MADE"
    if limit == 1:
    return db_res[0], None
    return db_res, None
    else:
    return True, None

    except Exception:
    logging.error(traceback.format_exc())
    return None,debug_code


    async def get_rows_no_where(
    cursor, select: list(), tbl: str, name=None, inc_del=False, limit=None
    ):
    """
    Args:
    cursor* : db cursor
    select* : list
    tbl*: Strig
    """
    debug_code = db_query_codes["get_rows_no_where"]

    try:
    query = sql.SQL("SELECT {fields} FROM {table}").format(
    fields=sql.SQL(",").join(map(sql.Identifier, select)),
    table=sql.Identifier(tbl),
    )
    qs = query.as_string(cursor)
    if inc_del is False:
    qs += " WHERE deleted_at is null"
    # return qs
    cursor.execute(qs)
    db_res = cursor.fetchall()

    if len(db_res) == 0:
    return None, "NOT_FOUND"

    if limit == 1:
    return db_res[0], None

    if len(db_res) > 0:
    return db_res, None
    except Exception:
    logging.error(traceback.format_exc())
    return None, debug_code


    async def get_rows_custom_query(cursor, query):
    """takes a sql query and execute and returns all the response.
    Args:
    cursor: a database cursor
    query: String :: a sql string
    Returns: List
    list of object from database
    """
    debug_code = db_query_codes["get_rows_custom_query"]

    try:
    # print(query)
    cursor.execute(query)
    db_res = cursor.fetchall()
    # print(db_res)
    if len(db_res) == 0:
    return None, "NOT_FOUND"
    return db_res, None
    except Exception:
    logging.error(traceback.format_exc())
    return None,debug_code


    async def is_row_exists_by_value(cursor, tbl, column, value, inc_del=False):
    debug_code = db_query_codes["is_row_exists_by_value"]
    try:
    query = sql.SQL(
    "SELECT EXISTS (SELECT from {tbl} WHERE {column}= {value})"
    ).format(
    tbl=sql.Identifier(tbl),
    column=sql.Identifier(column),
    value=sql.Literal(value),
    )
    qs = query.as_string(cursor)
    if inc_del is False:
    qs += " AND WHERE deleted_at is null"
    cursor.execute(qs)
    db_res = cursor.fetchall()
    if db_res[0]["exists"] is True:
    return True, None
    if db_res[0]["exists"] is False:
    return False, None
    return None, "unhandled"
    except Exception:
    logging.error(traceback.format_exc())
    return None, debug_code

    async def search_rows_by_value(
    cursor, tbl, select, column, value, inc_del=False,limit=100
    ):
    debug_code = db_query_codes["search_rows_by_value"]
    input_value = value.replace(" ","&")
    try:
    query = sql.SQL(
    """
    SELECT {fields}
    FROM {tbl}
    WHERE {column} @@ (
    SELECT to_tsquery('english', {value})
    )
    """
    ).format(
    tbl=sql.Identifier(tbl),
    fields=sql.SQL(",").join(map(sql.Identifier, select)),
    column=sql.Identifier(column),
    value=sql.Literal(f"{input_value}:*"),
    )
    qs = query.as_string(cursor)


    if inc_del is False:
    qs += " AND deleted_at is null"

    cursor.execute(qs)
    db_res = cursor.fetchmany(limit)

    if len(db_res) == 0:
    return None, "NOT_FOUND"

    return db_res, None

    except Exception:
    logging.error(traceback.format_exc())
    return None, debug_code

    async def search_rows_by_value_with_where(
    cursor, tbl, select, column, value, where_col, where_value, inc_del=False,limit=100
    ):
    debug_code = db_query_codes["search_rows_by_value_with_where"]
    input_value = value.replace(" ","&")
    try:
    query = sql.SQL(
    """
    SELECT {fields}
    FROM {tbl}
    WHERE {column} @@ (
    SELECT to_tsquery('english', {value})
    )
    AND {where_col} = {where_value}
    """
    ).format(
    tbl=sql.Identifier(tbl),
    fields=sql.SQL(",").join(map(sql.Identifier, select)),
    column=sql.Identifier(column),
    value=sql.Literal(f"{input_value}:*"),
    where_col=sql.Identifier(where_col),
    where_value=sql.Literal(where_value),
    )
    qs = query.as_string(cursor)


    if inc_del is False:
    qs += " AND deleted_at is null"

    cursor.execute(qs)
    db_res = cursor.fetchmany(limit)

    if len(db_res) == 0:
    return None, "NOT_FOUND"

    return db_res, None

    except Exception:
    logging.error(traceback.format_exc())
    return None, debug_code


    async def count_rows(cursor, tbl, column, inc_del=False):
    debug_code = db_query_codes["count_rows"]
    try:
    query = sql.SQL("SELECT COUNT({column}) from {tbl}").format(
    tbl=sql.Identifier(tbl),
    column=sql.Identifier(column),
    )
    qs = query.as_string(cursor)

    if inc_del is False:
    qs += " WHERE deleted_at is null"

    cursor.execute(qs)
    db_res = cursor.fetchall()

    return db_res[0]["count"], None

    except Exception:
    logging.error(traceback.format_exc())
    return None, debug_code


    async def count_rows_by_condition(cursor, tbl,column, where_col, where_value, inc_del=False):
    debug_code = db_query_codes["count_rows_by_condition"]

    try:
    query = sql.SQL(
    "SELECT COUNT({column}) from {tbl} WHERE {where_col} = {where_value}"
    ).format(
    tbl=sql.Identifier(tbl),
    column=sql.Identifier(column),
    where_col=sql.Identifier(where_col),
    where_value=sql.Literal(where_value),
    )
    qs = query.as_string(cursor)

    if inc_del is False:
    qs += " AND deleted_at is null"

    cursor.execute(qs)
    db_res = cursor.fetchall()

    return db_res[0]["count"], None

    except Exception:
    logging.error(traceback.format_exc())
    return None, debug_code



    async def update_many_columns(
    cursor, tbl, new_data, where_col, where_value,limit=None, returning=None
    ):
    debug_code = db_query_codes["update_many_columns"]

    try:
    query = sql.SQL("UPDATE {tbl} SET {new_data} WHERE {where_col} = {where_value}").format(
    tbl=sql.Identifier(tbl),
    where_col=sql.Identifier(where_col),
    where_value=sql.Literal(where_value),
    new_data=sql.SQL(", ").join(
    sql.Composed(
    [sql.Identifier(key), sql.SQL(" = "), sql.Placeholder(key)]
    )
    for key in new_data.keys()
    ),
    )

    qs = query.as_string(cursor)

    if returning:
    qs += f""" RETURNING {', '.join(returning)} """

    cursor.execute(qs, new_data)

    db_res = None

    if returning:
    db_res = cursor.fetchall()
    if (len(db_res) == 0):
    return None,"NO_UPDATES_MADE"

    if limit == 1:
    return db_res[0], None

    if len(db_res) > 0:
    return db_res, None
    else:
    return True,None
    # db_res = cursor.fetchall()
    # if (len(db_res) == 0):
    # return None,"NO_UPDATES_MADE"

    # if len(db_res) > 0:
    # return True, None

    except Exception:
    logging.error(traceback.format_exc())
    return None, debug_code


    async def insert_row(
    cursor,
    tbl: str,
    fields: list,
    values: list,
    name=None,
    returning=None,
    ):
    """
    Args:
    cursor = a databse cursor.
    tbl = STRING, Table name.
    fields = TUPLE.
    values = TUPLE.
    returning = LIST : list of returning type
    """
    debug_code = db_query_codes["insert_row"]

    try:
    query = sql.SQL("INSERT INTO {tbl} ({fields}) values ({values})").format(
    tbl=sql.Identifier(tbl),
    fields=sql.SQL(", ").join(map(sql.Identifier, fields)),
    values=sql.SQL(", ").join(sql.Placeholder() * len(values)),
    )
    qs = query.as_string(cursor)

    if returning:
    qs += f""" RETURNING {', '.join(returning)} """

    cursor.execute(qs, values)

    if returning:
    new_row = dict(cursor.fetchone())
    return new_row, None
    return True, None
    except Exception:
    logging.error(traceback.format_exc())
    return None, debug_code


    async def delete_row(
    cursor: any,
    tbl: str,
    where_col: str,
    where_value: str,
    limit=None,
    returning=None,
    ):
    debug_code = db_query_codes["delete_row"]

    try:
    query = sql.SQL("DELETE FROM {tbl} WHERE {where_col} = {where_value}").format(
    tbl=sql.Identifier(tbl),
    where_col=sql.Identifier(where_col),
    where_value=sql.Literal(where_value),
    )
    qs = query.as_string(cursor)

    if returning:
    qs += f""" RETURNING {', '.join(returning)} """

    cursor.execute(qs)

    if returning:
    res_row = cursor.fetchall()

    if len(res_row) == 0:
    return None, "NOT_FOUND"

    if limit == 1:
    return res_row[0], None

    if len(res_row) > 0:
    return res_row, None
    return True, None
    except Exception:
    logging.error(traceback.format_exc())
    return None, debug_code


    async def insert_many_rows(cursor, tbl, fields, values, returning=None):
    debug_code = db_query_codes["insert_many_rows"]

    try:
    qs = sql.SQL("INSERT INTO {tbl} ({fields}) values %s").format(
    tbl=sql.Identifier(tbl),
    fields=sql.SQL(", ").join(map(sql.Identifier, fields)),
    )
    query = qs.as_string(cursor)

    if returning:
    query += f""" RETURNING {', '.join(returning)} """

    insert_res = extras.execute_values(
    cursor, query, values, template=None, page_size=100, fetch=True if returning else False
    )
    if returning:
    return insert_res, None
    return True, None
    except Exception:
    logging.error(traceback.format_exc())
    return None, debug_code




    # Should be disable in Production [[[CAUTION]]]
    async def run_sql_query(cursor,sql,fetch:bool):
    try:
    cursor.execute(sql)
    if fetch:
    db_res = cursor.fetchall()
    return db_res, None
    else:
    return True,None
    except Exception:
    logging.error(traceback.format_exc())
    return None, "error on run_sql_query"