unwind/alembic/env.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

109 lines
3 KiB
Python
Raw Permalink Normal View History

import asyncio
from logging.config import fileConfig
import sqlalchemy as sa
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from unwind import db, models
# The Alembic Config object, which provides access to the values within the
# .ini file in use.
config = context.config

# Interpret the config file for Python logging (this sets up the loggers).
# `fileConfig` disables all already-existing loggers by default. Since this
# module may also be run from inside another script (see
# `run_migrations_online`), leave that script's loggers enabled.
if config.config_file_name is not None:
    fileConfig(config.config_file_name, disable_existing_loggers=False)
def is_different_type(
    context,
    inspected_column: sa.Column,
    metadata_column: sa.Column,
    inspected_type: sa.types.TypeEngine,
    metadata_type: sa.types.TypeEngine,
) -> bool | None:
    """Type comparison hook, passed as `compare_type` to `context.configure`.

    Only `inspected_type` and `metadata_type` are examined; the remaining
    parameters are accepted but unused.

    Returns False (i.e. "not different") when both types are instances of the
    same group of equivalent types, and None otherwise, which defers to the
    default compare implementation.
    """
    # We used "TEXT" in our manual SQL, which in SQLite is the same as VARCHAR,
    # but for SQLAlchemy/Alembic looks different.
    equivalent_groups = ((sa.TEXT, sa.String),)

    same_group = any(
        isinstance(inspected_type, group) and isinstance(metadata_type, group)
        for group in equivalent_groups
    )
    if same_group:
        return False

    # No opinion: defer to the default compare implementation.
    return None
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    The context is configured with just a URL rather than an Engine (an
    Engine would be acceptable here as well). Because no Engine is created,
    no DBAPI needs to be available.

    Calls to context.execute() here emit the given string to the script
    output.
    """
    offline_options = {
        "url": db._connection_uri(),
        "target_metadata": models.metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
        "compare_type": is_different_type,
        "render_as_batch": True,
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
    """Configure the Alembic context for `connection` and run the migrations.

    The migrations are run inside the transaction block opened by
    `context.begin_transaction()`.
    """
    online_options = {
        "connection": connection,
        "target_metadata": models.metadata,
        "compare_type": is_different_type,
        "render_as_batch": True,
    }
    context.configure(**online_options)

    with context.begin_transaction():
        context.run_migrations()
async def run_async_migrations() -> None:
    """Create an async Engine and run the migrations on one of its connections.

    The Engine is built from the "sqlalchemy."-prefixed options of the active
    ini section, with the URL taken from `db._connection_uri()` and
    `NullPool` as its pool class. It is disposed once the migrations have run.
    """
    ini_section = config.get_section(config.config_ini_section, {})
    engine = async_engine_from_config(
        ini_section,
        prefix="sqlalchemy.",
        url=db._connection_uri(),
        poolclass=pool.NullPool,
    )

    # `do_run_migrations` is synchronous, so hand it over via `run_sync`.
    async with engine.connect() as async_conn:
        await async_conn.run_sync(do_run_migrations)

    await engine.dispose()
def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    If another script passed in a (sync) connection through
    `config.attributes["connection"]`, the migrations run on that connection.
    Otherwise the async migration runner is started with `asyncio.run`.
    """
    passed_in = config.attributes.get("connection")

    # Nothing usable was passed in: create our own engine and connection.
    if not passed_in or not isinstance(passed_in, sa.Connection):
        asyncio.run(run_async_migrations())
        return

    do_run_migrations(passed_in)
# Entry point: pick the migration runner matching the mode Alembic reports.
if not context.is_offline_mode():
    run_migrations_online()
else:
    run_migrations_offline()