Skip to content

Instantly share code, notes, and snippets.

@volodink
Created February 4, 2022 11:23
Show Gist options
  • Select an option

  • Save volodink/072ed55ec91a88c40b84df816d880be1 to your computer and use it in GitHub Desktop.

Select an option

Save volodink/072ed55ec91a88c40b84df816d880be1 to your computer and use it in GitHub Desktop.
mongo
from typing import List, Dict
from fastapi import FastAPI
import motor.motor_asyncio
from bson import ObjectId
from pydantic import BaseModel, Field
class PyObjectId(ObjectId):
    """ObjectId subclass exposing the hooks pydantic uses for custom types."""

    @classmethod
    def __get_validators__(cls):
        # pydantic iterates this generator to collect the validators to run.
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Return ``ObjectId(v)``, or raise ValueError if ``v`` is not valid."""
        if ObjectId.is_valid(v):
            return ObjectId(v)
        raise ValueError("Invalid objectid")

    @classmethod
    def __modify_schema__(cls, field_schema):
        # Describe this type as a plain string in the generated schema.
        field_schema.update(type="string")
# Embedded address record: two required string fields.
class AddressModel(BaseModel):
    street: str
    city: str
# Person document model; `id` is exposed under the alias "_id".
class PersonModel(BaseModel):
    # A new PyObjectId is built by the default factory when no value is given.
    id: PyObjectId = Field(alias="_id", default_factory=PyObjectId)
    firstName: str
    address: AddressModel
    phoneNumbers: List[str]

    class Config:
        # Encode ObjectId values with str() when producing JSON.
        json_encoders = {ObjectId: str}
        # Permit field types that pydantic has no built-in support for.
        arbitrary_types_allowed = True
        # Accept the field name `id` in addition to the alias `_id`.
        allow_population_by_field_name = True
# Async MongoDB client pointed at a local server.
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017")

# Dictionary-style access selects the same database and collection objects
# as attribute-style access.
db = client["instagram"]
persons = db["persons"]
print(persons)

# FastAPI application that the route decorators register against.
app = FastAPI()
# GET /{user}: return the person documents whose "firstName" equals `user`.
# Kept as comments rather than a docstring so the generated OpenAPI
# description of the route stays unchanged.
@app.get("/{user}", response_model=List[PersonModel])
async def root(user: str):
    query = {"firstName": user}
    cursor = db.persons.find(query)
    # to_list(1000) fetches at most 1000 matching documents.
    matches = await cursor.to_list(1000)
    return matches
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment