-
Notifications
You must be signed in to change notification settings - Fork 28
Description
I'm looking for a solution which may be documented and I haven't seen it yet or I am finding some bug when trying to initialize the versioning_manager or related to #39
My simplified app example I'm working with is as follows and is mostly a copy of the FastAPI documentation's SQL integration guide found here with some modifications for async database access via SQLAlchemy 1.4 based on this
What I notice about the error is the error does not come directly from the .init(Base) call but when the Base.metadata.create_all is called the error is raised. The error does not occur and I do not see any Activity table built or schemas in the database if I move init_versioning_manager after create_db_and_tables.
I would greatly appreciate help with this as I'm no longer sure what is wrong or if this is functionality that is not yet supported with async SQLAlchemy 1.4 or something else entirely. I also would love to use this project since I can not use SQLAlchemy-Continuum anylonger with SLQA1.4 since I'm now using the core api and not the orm so the events used by continuum do not work with the core api and this project seemed like my solution.
sqlalchemy.exc.InvalidRequestError
INFO: Started server process [24408]
INFO: Waiting for application startup.
ERROR: Traceback (most recent call last):
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/starlette/routing.py", line 621, in lifespan
async with self.lifespan_context(app):
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/starlette/routing.py", line 518, in __aenter__
await self._router.startup()
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/starlette/routing.py", line 598, in startup
await handler()
File "/home/michael/personal/test-audit/app/main.py", line 15, in on_startup
await create_db_and_tables()
File "/home/michael/personal/test-audit/app/database.py", line 21, in create_db_and_tables
await conn.run_sync(Base.metadata.create_all)
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/ext/asyncio/engine.py", line 559, in run_sync
return await greenlet_spawn(fn, conn, *arg, **kw)
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 136, in greenlet_spawn
result = context.switch(value)
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/schema.py", line 4785, in create_all
bind._run_ddl_visitor(
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2113, in _run_ddl_visitor
visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/visitors.py", line 524, in traverse_single
return meth(obj, **kw)
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 849, in visit_metadata
self.traverse_single(
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/visitors.py", line 524, in traverse_single
return meth(obj, **kw)
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 915, in visit_table
table.dispatch.after_create(
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/event/attr.py", line 343, in __call__
fn(*args, **kw)
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/postgresql_audit/base.py", line 262, in create_audit_table
StatementExecutor(sql)(target, bind, **kwargs)
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/postgresql_audit/base.py", line 38, in __call__
tx = bind.begin()
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/future/engine.py", line 144, in begin
return super(Connection, self).begin()
File "/home/michael/.cache/pypoetry/virtualenvs/test-audit-1r1WU4Sj-py3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 766, in begin
raise exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: This connection has already initialized a SQLAlchemy Transaction() object via begin() or autobegin; can't call begin() here unless rollback() or commit() is called first.
ERROR: Application startup failed. Exiting.
.
├── docker-compose.yml
├── main.py
├── app
│ ├── crud.py
│ ├── database.py
│ ├── main.py
│ ├── models.py
│ └── schemas.py
└── scripts
└── init.sh
[tool.poetry.dependencies]
python = "^3.8"
PostgreSQL-Audit = "^0.13.0"
fastapi = "^0.74.1"
uvicorn = "^0.17.5"
asyncpg = "^0.25.0"
main.py
import uvicorn
if __name__ == "__main__":
uvicorn.run("app.main:app", host="0.0.0.0", port=5001, log_level="debug")docker-compose.yml
version: "3.9"
services:
db:
image: postgres:13
environment:
- POSTGRES_USER=db_user
- POSTGRES_PASSWORD=db_pass
- PGDATA=/var/lib/postgresql/data/pgdata
ports:
- "5432:5432"
volumes:
- ./scripts:/docker-entrypoint-initdb.d
admin:
image: adminer
restart: always
ports:
- 8081:8080
depends_on:
- dbscripts/init.sh
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" <<EOF
CREATE EXTENSION IF NOT EXISTS btree_gist;
EOFapp/crud.py
import abc
from typing import Generic, List, Type, TypeVar
from sqlalchemy import select
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from . import models, schemas
IN_SCHEMA = TypeVar("IN_SCHEMA", bound=BaseModel)
SCHEMA = TypeVar("SCHEMA", bound=BaseModel)
TABLE = TypeVar("TABLE")
class CRUDBase(Generic[TABLE, IN_SCHEMA, SCHEMA], metaclass=abc.ABCMeta):
def __init__(self, db_session: AsyncSession, *args, **kwargs) -> None:
self._db_session: AsyncSession = db_session
@property
@abc.abstractmethod
def _table(self) -> Type[TABLE]:
...
@property
@abc.abstractmethod
def _schema(self) -> Type[SCHEMA]:
...
async def create(self, in_schema: IN_SCHEMA) -> SCHEMA:
item = self._table(**in_schema.dict())
self._db_session.add(item)
await self._db_session.commit()
return self._schema.from_orm(item)
async def get_by_id(self, item_id: int) -> SCHEMA:
query = (
select(self._table)
.where(self._table.id == item_id)
)
(item,) = (await self._db_session.execute(query)).one()
return self._schema.from_orm(item)
async def get_multi(self) -> List[SCHEMA]:
query = select(self._table)
results = await self._db_session.execute(query)
return (self._schema.from_orm(item) for item in results.scalars())
async def remove(self, item_id: int) -> SCHEMA:
item = await self._get_one(item_id)
await self._db_session.delete(item)
await self._db_session.commit()
return self._schema.from_orm(item)
class CRUDItem(CRUDBase[models.Item, schemas.ItemCreate, schemas.Item]):
@property
def _in_schema(self) -> Type[schemas.ItemCreate]:
return schemas.ItemCreate
@property
def _schema(self) -> Type[schemas.Item]:
return schemas.Item
@property
def _table(self) -> Type[models.Item]:
return models.Itemapp/database.py
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "postgresql+asyncpg://db_user:db_pass@localhost:5432/db_user"
engine = create_async_engine(SQLALCHEMY_DATABASE_URL)
async_session_maker = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base()
# please don't mind the exact implementation here... I've tried many different ways to call `versioning_manager.init(Base)` and this is just the last one I attempted.
async def init_versioning_manager():
from postgresql_audit import versioning_manager
async with async_session_maker() as session:
versioning_manager.init(Base)
await session.commit()
async def create_db_and_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
await session.commit()app/main.py
from typing import List
from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio.session import AsyncSession
from . import crud, schemas
from .database import create_db_and_tables, get_async_session, init_versioning_manager
app = FastAPI()
@app.on_event("startup")
async def on_startup():
# Not needed if you setup a migration system like Alembic
await init_versioning_manager()
await create_db_and_tables()
@app.post("/items/", response_model=schemas.Item)
async def create_item(
db: AsyncSession = Depends(get_async_session),
*,
item: schemas.ItemCreate,
):
item_crud = crud.CRUDItem(db)
item = await item_crud.create(in_schema=item)
return item
@app.get("/items/", response_model=List[schemas.Item])
async def read_items(db: AsyncSession = Depends(get_async_session)):
item_crud = crud.CRUDItem(db)
items = await item_crud.get_multi()
return itemsapp/models.py
from sqlalchemy import Column, Integer, String
from .database import Base
class Item(Base):
__tablename__ = "items"
__versioned__ = {}
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String)
description = Column(String)app/schemas.py
from typing import List, Optional
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Optional[str] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
class Config:
orm_mode = True