unwind/alembic/env.py

104 lines
2.8 KiB
Python
Raw Normal View History

import asyncio
from logging.config import fileConfig
import sqlalchemy as sa
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from unwind import db, models
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
def is_different_type(
context,
inspected_column: sa.Column,
metadata_column: sa.Column,
inspected_type: sa.types.TypeEngine,
metadata_type: sa.types.TypeEngine,
) -> bool | None:
# We used "TEXT" in our manual SQL, which in SQLite is the same as VARCHAR, but
# for SQLAlchemy/Alembic looks different.
equiv_types = [(sa.TEXT, sa.String)]
for types in equiv_types:
if isinstance(inspected_type, types) and isinstance(metadata_type, types):
return False
return None # defer to default compare implementation
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
context.configure(
url=db._connection_uri(),
target_metadata=models.metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
compare_type=is_different_type,
render_as_batch=True,
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
context.configure(
connection=connection,
target_metadata=models.metadata,
compare_type=is_different_type,
render_as_batch=True,
)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
url=db._connection_uri(),
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online_async() -> None:
"""Run migrations in 'online' mode."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online_async()