Skip to content

Instantly share code, notes, and snippets.

@geeksambhu
Last active June 26, 2025 20:00
Show Gist options
  • Select an option

  • Save geeksambhu/c22ffd60a9528379b825dd5016f615fe to your computer and use it in GitHub Desktop.

Select an option

Save geeksambhu/c22ffd60a9528379b825dd5016f615fe to your computer and use it in GitHub Desktop.

Revisions

  1. geeksambhu renamed this gist Jun 26, 2025. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. geeksambhu revised this gist Jun 26, 2025. 1 changed file with 117 additions and 30 deletions.
    147 changes: 117 additions & 30 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -1,30 +1,117 @@
    class JobExecution(BaseModel):
    job_execution_id: int
    version: Optional[int] = None
    job_instance_id: Optional[int] = None
    create_time: datetime = datetime.utcnow()
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    status: Optional[str] = None
    exit_code: Optional[str] = None
    exit_message: Optional[str] = None


    class StepExecution(BaseModel):
    job_execution_id: int
    step_execution_id: int
    version: Optional[int] = None
    step_name: str
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    status: Optional[str] = None
    commit_count: int = 0
    read_count: int = 0
    filter_count: int = 0
    write_count: int = 0
    read_skip_count: int = 0
    write_skip_count: int = 0
    process_skip_count: int = 0
    rollback_count: int = 0
    exit_code: Optional[str] = None
    exit_message: Optional[str] = None
    from __future__ import annotations
    from typing import Any, Dict, Iterable, Type, TypeVar, Protocol

    from boto3.dynamodb.conditions import Attr, Key
    from pydantic import BaseModel, ValidationError

    T = TypeVar("T", bound=BaseModel)


    class TableLike(Protocol):
    def put_item(self, **kwargs): ...
    def get_item(self, **kwargs): ...
    def delete_item(self, **kwargs): ...
    def update_item(self, **kwargs): ...
    def scan(self, **kwargs): ...
    def query(self, **kwargs): ...


    class DynamoRepo:
    """
    A clean, injected DynamoDB repository with Pydantic validation.

    - insert/upsert use put_item (entire object)
    - update uses SET expressions (partial)
    """

    def __init__(
    self,
    table: TableLike,
    model_cls: Type[T],
    *,
    pk_attr: str,
    sk_attr: str | None = None,
    ):
    self.table = table
    self.model_cls = model_cls
    self.pk_attr = pk_attr
    self.sk_attr = sk_attr

    def _key(self, d: Dict[str, Any]) -> Dict[str, Any]:
    key = {self.pk_attr: d[self.pk_attr]}
    if self.sk_attr:
    key[self.sk_attr] = d[self.sk_attr]
    return key

    def _validate(self, payload: dict | BaseModel) -> dict:
    if isinstance(payload, BaseModel):
    return payload.dict()
    try:
    return self.model_cls(**payload).dict()
    except ValidationError as e:
    raise ValueError(f"Validation failed: {e}") from None

    def insert(self, data: dict | T) -> None:
    item = self._validate(data)
    self.table.put_item(Item=item)

    upsert = insert

    def delete(self, pk: Any, sk: Any | None = None) -> None:
    key = {self.pk_attr: pk}
    if self.sk_attr:
    key[self.sk_attr] = sk
    self.table.delete_item(Key=key)

    def update(self, pk: Any, sk: Any | None = None, **changes: Any) -> None:
    if not changes:
    return

    # Validate merged model to avoid invalid writes
    existing = self.select(pk, sk)
    if existing is None:
    raise KeyError("Item not found")

    merged = existing.dict()
    merged.update(changes)
    self._validate(merged) # Ensure valid update

    # SET expression-style update
    expr, names, values = [], {}, {}
    for i, (k, v) in enumerate(changes.items()):
    names[f"#k{i}"] = k
    values[f":v{i}"] = v
    expr.append(f"#k{i} = :v{i}")

    update_expression = "SET " + ", ".join(expr)

    key = {self.pk_attr: pk}
    if self.sk_attr:
    key[self.sk_attr] = sk

    self.table.update_item(
    Key=key,
    UpdateExpression=update_expression,
    ExpressionAttributeNames=names,
    ExpressionAttributeValues=values,
    )

    def select(self, pk: Any, sk: Any | None = None) -> T | None:
    key = {self.pk_attr: pk}
    if self.sk_attr:
    key[self.sk_attr] = sk
    resp = self.table.get_item(Key=key)
    return None if "Item" not in resp else self.model_cls(**resp["Item"])

    def count(self, attr: str, value: Any) -> int:
    resp = self.table.scan(
    FilterExpression=Attr(attr).eq(value),
    Select="COUNT",
    )
    return resp["Count"]

    def select_partition(self, pk: Any) -> Iterable[T]:
    resp = self.table.query(
    KeyConditionExpression=Key(self.pk_attr).eq(pk)
    )
    return (self.model_cls(**i) for i in resp.get("Items", []))
  3. geeksambhu created this gist Jun 26, 2025.
    30 changes: 30 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,30 @@
    class JobExecution(BaseModel):
    job_execution_id: int
    version: Optional[int] = None
    job_instance_id: Optional[int] = None
    create_time: datetime = datetime.utcnow()
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    status: Optional[str] = None
    exit_code: Optional[str] = None
    exit_message: Optional[str] = None


    class StepExecution(BaseModel):
    job_execution_id: int
    step_execution_id: int
    version: Optional[int] = None
    step_name: str
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    status: Optional[str] = None
    commit_count: int = 0
    read_count: int = 0
    filter_count: int = 0
    write_count: int = 0
    read_skip_count: int = 0
    write_skip_count: int = 0
    process_skip_count: int = 0
    rollback_count: int = 0
    exit_code: Optional[str] = None
    exit_message: Optional[str] = None