Skip to main content

FastAPI psycopg3 API

Command Line

Terminal
python -m venv venv
. venv/bin/activate # mac and linux
. venv/Scripts/activate # windows
pip install fastapi
pip install "uvicorn[standard]"
pip install psycopg
pip install psycopg_pool
psql -U postgres -c "create database fastapi_psycopg3"
psql -U postgres -d fastapi_psycopg3 -f v1_create_todos_table.sql

Routers

app/routers/todos_v1.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from psycopg.rows import class_row

from app.db import get_conn

router = APIRouter(prefix="/v1/todos")


class ToDo(BaseModel):
id: int | None
name: str
completed: bool


@router.post("")
def create_todo(todo: ToDo):
with get_conn() as conn:
conn.execute(
"insert into todos (name, completed) values (%s, %s)",
[todo.name, todo.completed],
)


@router.get("")
def get_todos():
with get_conn() as conn, conn.cursor(row_factory=class_row(ToDo)) as cur:
records = cur.execute("select * from todos").fetchall()
return records


@router.get("/{id}")
def get_todo(id: int):
with get_conn() as conn, conn.cursor(row_factory=class_row(ToDo)) as cur:
record = cur.execute("select * from todos where id=%s", [id]).fetchone()
if not record:
raise HTTPException(404)
return record


@router.put("/{id}")
def update_todo(id: int, todo: ToDo):
with get_conn() as conn, conn.cursor(row_factory=class_row(ToDo)) as cur:
record = cur.execute(
"update todos set name=%s, completed=%s where id=%s returning *",
[todo.name, todo.completed, id],
).fetchone()
return record


@router.delete("/{id}")
def delete_todo(id: int):
with get_conn() as conn:
conn.execute("delete from todos where id=%s", [id])

app/routers/todos_v2.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from psycopg.rows import class_row

from app.db import get_pool

router = APIRouter(prefix="/v2/todos")

pool = get_pool()


class ToDo(BaseModel):
id: int | None
name: str
completed: bool


@router.post("")
def create_todo(todo: ToDo):
with pool.connection() as conn:
conn.execute(
"insert into todos (name, completed) values (%s, %s)",
[todo.name, todo.completed],
)


@router.get("")
def get_todos():
with pool.connection() as conn, conn.cursor(row_factory=class_row(ToDo)) as cur:
records = cur.execute("select * from todos").fetchall()
return records


@router.get("/{id}")
def get_todo(id: int):
with pool.connection() as conn, conn.cursor(row_factory=class_row(ToDo)) as cur:
record = cur.execute("select * from todos where id=%s", [id]).fetchone()
if not record:
raise HTTPException(404)
return record


@router.put("/{id}")
def update_todo(id: int, todo: ToDo):
with pool.connection() as conn, conn.cursor(row_factory=class_row(ToDo)) as cur:
record = cur.execute(
"update todos set name=%s, completed=%s where id=%s returning *",
[todo.name, todo.completed, id],
).fetchone()
return record


@router.delete("/{id}")
def delete_todo(id: int):
with pool.connection() as conn:
conn.execute("delete from todos where id=%s", [id])

app/routers/todos_v3.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from psycopg.rows import class_row

from app.db import get_async_pool

router = APIRouter(prefix="/v3/todos")

pool = get_async_pool()


class ToDo(BaseModel):
id: int | None
name: str
completed: bool


@router.post("")
async def create_todo(todo: ToDo):
async with pool.connection() as conn:
await conn.execute(
"insert into todos (name, completed) values (%s, %s)",
[todo.name, todo.completed],
)


@router.get("")
async def get_todos():
async with pool.connection() as conn, conn.cursor(
row_factory=class_row(ToDo)
) as cur:
await cur.execute("select * from todos")
records = await cur.fetchall()
return records


@router.get("/{id}")
async def get_todo(id: int):
async with pool.connection() as conn, conn.cursor(
row_factory=class_row(ToDo)
) as cur:
await cur.execute("select * from todos where id=%s", [id])
record = await cur.fetchone()
if not record:
raise HTTPException(404)
return record


@router.put("/{id}")
async def update_todo(id: int, todo: ToDo):
async with pool.connection() as conn, conn.cursor(
row_factory=class_row(ToDo)
) as cur:
await cur.execute(
"update todos set name=%s, completed=%s where id=%s returning *",
[todo.name, todo.completed, id],
)
record = await cur.fetchone()
return record


@router.delete("/{id}")
async def delete_todo(id: int):
async with pool.connection() as conn:
await conn.execute("delete from todos where id=%s", [id])

Config

app/config.py
from functools import lru_cache

from pydantic import BaseSettings

# uncomment to see psycopg.pool logs
# import logging
# logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# logging.getLogger("psycopg.pool").setLevel(logging.INFO)


class Settings(BaseSettings):
db_user: str
db_password: str
db_host: str
db_port: str
db_name: str

class Config:
env_file = ".env"


@lru_cache()
def get_settings():
return Settings()

DB

app/db.py
from functools import lru_cache

import psycopg
from psycopg_pool import ConnectionPool, AsyncConnectionPool

from app.config import get_settings

settings = get_settings()

conninfo = f"user={settings.db_user} password={settings.db_password} host={settings.db_host} port={settings.db_port} dbname={settings.db_name}"


def get_conn():
return psycopg.connect(conninfo=conninfo)


@lru_cache()
def get_pool():
return ConnectionPool(conninfo=conninfo)


@lru_cache()
def get_async_pool():
return AsyncConnectionPool(conninfo=conninfo)

Main

app/main.py
import asyncio
from fastapi import FastAPI

from app.routers import todos_v1, todos_v2, todos_v3
from app.db import get_pool, get_async_pool

app = FastAPI()

pool = get_pool()
async_pool = get_async_pool()

app.include_router(todos_v1.router)
app.include_router(todos_v2.router)
app.include_router(todos_v3.router)


async def check_connections():
while True:
await asyncio.sleep(600)
print("check connections")
pool.check()


async def check_async_connections():
while True:
await asyncio.sleep(600)
print("check async connections")
await async_pool.check()


@app.on_event("startup")
def startup():
asyncio.create_task(check_connections())
asyncio.create_task(check_async_connections())


@app.get("/")
def read_root():
return {"Hello": "World"}

Env

.env
DB_USER=postgres
DB_PASSWORD=postgres
DB_HOST=localhost
DB_PORT=5432
DB_NAME=fastapi_psycopg3

References