Last active
June 26, 2025 20:00
-
-
Save geeksambhu/c22ffd60a9528379b825dd5016f615fe to your computer and use it in GitHub Desktop.
Revisions
-
geeksambhu renamed this gist
Jun 26, 2025 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
geeksambhu revised this gist
Jun 26, 2025 . 1 changed file with 117 additions and 30 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,30 +1,117 @@ from __future__ import annotations from typing import Any, Dict, Iterable, Type, TypeVar, Protocol from boto3.dynamodb.conditions import Attr, Key from pydantic import BaseModel, ValidationError T = TypeVar("T", bound=BaseModel) class TableLike(Protocol): def put_item(self, **kwargs): ... def get_item(self, **kwargs): ... def delete_item(self, **kwargs): ... def update_item(self, **kwargs): ... def scan(self, **kwargs): ... def query(self, **kwargs): ... class DynamoRepo: """ A clean, injected DynamoDB repository with Pydantic validation. - insert/upsert use put_item (entire object) - update uses SET expressions (partial) """ def __init__( self, table: TableLike, model_cls: Type[T], *, pk_attr: str, sk_attr: str | None = None, ): self.table = table self.model_cls = model_cls self.pk_attr = pk_attr self.sk_attr = sk_attr def _key(self, d: Dict[str, Any]) -> Dict[str, Any]: key = {self.pk_attr: d[self.pk_attr]} if self.sk_attr: key[self.sk_attr] = d[self.sk_attr] return key def _validate(self, payload: dict | BaseModel) -> dict: if isinstance(payload, BaseModel): return payload.dict() try: return self.model_cls(**payload).dict() except ValidationError as e: raise ValueError(f"Validation failed: {e}") from None def insert(self, data: dict | T) -> None: item = self._validate(data) self.table.put_item(Item=item) upsert = insert def delete(self, pk: Any, sk: Any | None = None) -> None: key = {self.pk_attr: pk} if self.sk_attr: key[self.sk_attr] = sk self.table.delete_item(Key=key) def update(self, pk: Any, sk: Any | None = None, **changes: Any) -> None: if not changes: return # Validate merged model to avoid invalid writes existing = self.select(pk, sk) if existing is None: raise KeyError("Item not found") merged = existing.dict() merged.update(changes) self._validate(merged) # Ensure valid update # SET expression-style update expr, names, values = [], {}, {} for i, (k, v) in enumerate(changes.items()): names[f"#k{i}"] = k values[f":v{i}"] = v expr.append(f"#k{i} = :v{i}") update_expression = "SET " + ", ".join(expr) key = {self.pk_attr: pk} if self.sk_attr: key[self.sk_attr] = sk self.table.update_item( Key=key, UpdateExpression=update_expression, ExpressionAttributeNames=names, ExpressionAttributeValues=values, ) def select(self, pk: Any, sk: Any | None = None) -> T | None: key = {self.pk_attr: pk} if self.sk_attr: key[self.sk_attr] = sk resp = self.table.get_item(Key=key) return None if "Item" not in resp else self.model_cls(**resp["Item"]) def count(self, attr: str, value: Any) -> int: resp = self.table.scan( FilterExpression=Attr(attr).eq(value), Select="COUNT", ) return resp["Count"] def select_partition(self, pk: Any) -> Iterable[T]: resp = self.table.query( KeyConditionExpression=Key(self.pk_attr).eq(pk) ) return (self.model_cls(**i) for i in resp.get("Items", [])) -
geeksambhu created this gist
Jun 26, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,30 @@ class JobExecution(BaseModel): job_execution_id: int version: Optional[int] = None job_instance_id: Optional[int] = None create_time: datetime = datetime.utcnow() start_time: Optional[datetime] = None end_time: Optional[datetime] = None status: Optional[str] = None exit_code: Optional[str] = None exit_message: Optional[str] = None class StepExecution(BaseModel): job_execution_id: int step_execution_id: int version: Optional[int] = None step_name: str start_time: Optional[datetime] = None end_time: Optional[datetime] = None status: Optional[str] = None commit_count: int = 0 read_count: int = 0 filter_count: int = 0 write_count: int = 0 read_skip_count: int = 0 write_skip_count: int = 0 process_skip_count: int = 0 rollback_count: int = 0 exit_code: Optional[str] = None exit_message: Optional[str] = None