unwind/tests/test_db.py
ducklet 4981de4a04 remove databases, use SQLAlechemy 2.0 instead
Among the many changes we switch to using SQLAlchemy's connection pool,
which means we are no longer required to guard against multiple threads
working on the database.
All db funcs now receive a connection to use as their first argument,
this allows the caller to control transaction & rollback behavior.
2023-11-27 23:24:35 +01:00

419 lines
11 KiB
Python

from datetime import datetime
import pytest
from unwind import db, models, web_models
_movie_imdb_id = 1230000
def a_movie(**kwds) -> models.Movie:
global _movie_imdb_id
_movie_imdb_id += 1
args = {
"title": "test movie",
"release_year": 2013,
"media_type": "Movie",
"imdb_id": f"tt{_movie_imdb_id}",
"genres": {"genre-1"},
} | kwds
return models.Movie(**args)
@pytest.mark.asyncio
async def test_current_patch_level(conn: db.Connection):
patch_level = "some-patch-level"
assert patch_level != await db.current_patch_level(conn)
await db.set_current_patch_level(conn, patch_level)
assert patch_level == await db.current_patch_level(conn)
@pytest.mark.asyncio
async def test_get(conn: db.Connection):
m1 = a_movie()
await db.add(conn, m1)
m2 = a_movie(release_year=m1.release_year + 1)
await db.add(conn, m2)
assert None is await db.get(conn, models.Movie)
assert None is await db.get(conn, models.Movie, id="blerp")
assert m1 == await db.get(conn, models.Movie, id=str(m1.id))
assert m2 == await db.get(conn, models.Movie, release_year=m2.release_year)
assert None is await db.get(
conn, models.Movie, id=str(m1.id), release_year=m2.release_year
)
assert m2 == await db.get(
conn, models.Movie, id=str(m2.id), release_year=m2.release_year
)
assert m1 == await db.get(
conn,
models.Movie,
media_type=m1.media_type,
order_by=(models.movies.c.release_year, "asc"),
)
assert m2 == await db.get(
conn,
models.Movie,
media_type=m1.media_type,
order_by=(models.movies.c.release_year, "desc"),
)
@pytest.mark.asyncio
async def test_get_all(conn: db.Connection):
m1 = a_movie()
await db.add(conn, m1)
m2 = a_movie(release_year=m1.release_year)
await db.add(conn, m2)
m3 = a_movie(release_year=m1.release_year + 1)
await db.add(conn, m3)
assert [] == list(await db.get_all(conn, models.Movie, id="blerp"))
assert [m1] == list(await db.get_all(conn, models.Movie, id=str(m1.id)))
assert [m1, m2] == list(
await db.get_all(conn, models.Movie, release_year=m1.release_year)
)
assert [m1, m2, m3] == list(await db.get_all(conn, models.Movie))
@pytest.mark.asyncio
async def test_get_many(conn: db.Connection):
m1 = a_movie()
await db.add(conn, m1)
m2 = a_movie(release_year=m1.release_year)
await db.add(conn, m2)
m3 = a_movie(release_year=m1.release_year + 1)
await db.add(conn, m3)
assert [] == list(await db.get_many(conn, models.Movie)), "selected nothing"
assert [m1] == list(await db.get_many(conn, models.Movie, id=[str(m1.id)]))
assert [m1] == list(await db.get_many(conn, models.Movie, id={str(m1.id)}))
assert [m1, m2] == list(
await db.get_many(conn, models.Movie, release_year=[m1.release_year])
)
assert [m1, m2, m3] == list(
await db.get_many(
conn, models.Movie, release_year=[m1.release_year, m3.release_year]
)
)
@pytest.mark.asyncio
async def test_add_and_get(conn: db.Connection):
m1 = a_movie()
await db.add(conn, m1)
m2 = a_movie()
await db.add(conn, m2)
assert m1 == await db.get(conn, models.Movie, id=str(m1.id))
assert m2 == await db.get(conn, models.Movie, id=str(m2.id))
@pytest.mark.asyncio
async def test_update(conn: db.Connection):
m = a_movie()
await db.add(conn, m)
assert m == await db.get(conn, models.Movie, id=str(m.id))
m.title += "something else"
assert m != await db.get(conn, models.Movie, id=str(m.id))
await db.update(conn, m)
assert m == await db.get(conn, models.Movie, id=str(m.id))
@pytest.mark.asyncio
async def test_remove(conn: db.Connection):
m1 = a_movie()
await db.add(conn, m1)
assert m1 == await db.get(conn, models.Movie, id=str(m1.id))
await db.remove(conn, m1)
assert None is await db.get(conn, models.Movie, id=str(m1.id))
@pytest.mark.asyncio
async def test_find_ratings(conn: db.Connection):
m1 = a_movie(
title="test movie",
release_year=2013,
genres={"genre-1"},
)
await db.add(conn, m1)
m2 = a_movie(
title="it's anöther Movie, Part 2",
release_year=2015,
genres={"genre-2"},
)
await db.add(conn, m2)
m3 = a_movie(
title="movie it's, Part 3",
release_year=m2.release_year,
genres=m2.genres,
)
await db.add(conn, m3)
u1 = models.User(
imdb_id="u00001",
name="User1",
secret="secret1",
)
await db.add(conn, u1)
u2 = models.User(
imdb_id="u00002",
name="User2",
secret="secret2",
)
await db.add(conn, u2)
r1 = models.Rating(
movie_id=m2.id,
movie=m2,
user_id=u1.id,
user=u1,
score=66,
rating_date=datetime.now(),
)
await db.add(conn, r1)
r2 = models.Rating(
movie_id=m2.id,
movie=m2,
user_id=u2.id,
user=u2,
score=77,
rating_date=datetime.now(),
)
await db.add(conn, r2)
# ---
rows = await db.find_ratings(
conn,
title=m1.title,
media_type=m1.media_type,
exact=True,
ignore_tv_episodes=True,
include_unrated=True,
yearcomp=("=", m1.release_year),
limit_rows=3,
user_ids=[],
)
ratings = (web_models.Rating(**r) for r in rows)
assert (web_models.RatingAggregate.from_movie(m1),) == tuple(
web_models.aggregate_ratings(ratings, user_ids=[])
)
rows = await db.find_ratings(conn, title="movie", include_unrated=False)
ratings = tuple(web_models.Rating(**r) for r in rows)
assert (
web_models.Rating.from_movie(m2, rating=r1),
web_models.Rating.from_movie(m2, rating=r2),
) == ratings
rows = await db.find_ratings(conn, title="movie", include_unrated=True)
ratings = tuple(web_models.Rating(**r) for r in rows)
assert (
web_models.Rating.from_movie(m1),
web_models.Rating.from_movie(m2, rating=r1),
web_models.Rating.from_movie(m2, rating=r2),
web_models.Rating.from_movie(m3),
) == ratings
aggr = web_models.aggregate_ratings(ratings, user_ids=[])
assert tuple(
web_models.RatingAggregate.from_movie(m) for m in [m1, m2, m3]
) == tuple(aggr)
aggr = web_models.aggregate_ratings(ratings, user_ids=[str(u1.id)])
assert (
web_models.RatingAggregate.from_movie(m1),
web_models.RatingAggregate.from_movie(m2, ratings=[r1]),
web_models.RatingAggregate.from_movie(m3),
) == tuple(aggr)
aggr = web_models.aggregate_ratings(ratings, user_ids=[str(u1.id), str(u2.id)])
assert (
web_models.RatingAggregate.from_movie(m1),
web_models.RatingAggregate.from_movie(m2, ratings=[r1, r2]),
web_models.RatingAggregate.from_movie(m3),
) == tuple(aggr)
rows = await db.find_ratings(conn, title="movie", include_unrated=True)
ratings = (web_models.Rating(**r) for r in rows)
aggr = web_models.aggregate_ratings(ratings, user_ids=[])
assert tuple(
web_models.RatingAggregate.from_movie(m) for m in [m1, m2, m3]
) == tuple(aggr)
rows = await db.find_ratings(conn, title="test", include_unrated=True)
ratings = tuple(web_models.Rating(**r) for r in rows)
assert (web_models.Rating.from_movie(m1),) == ratings
@pytest.mark.asyncio
async def test_ratings_for_movies(conn: db.Connection):
m1 = a_movie()
await db.add(conn, m1)
m2 = a_movie()
await db.add(conn, m2)
u1 = models.User(
imdb_id="u00001",
name="User1",
secret="secret1",
)
await db.add(conn, u1)
u2 = models.User(
imdb_id="u00002",
name="User2",
secret="secret2",
)
await db.add(conn, u2)
r1 = models.Rating(
movie_id=m2.id,
movie=m2,
user_id=u1.id,
user=u1,
score=66,
rating_date=datetime.now(),
)
await db.add(conn, r1)
# ---
movie_ids = [m1.id]
user_ids = []
assert tuple() == tuple(
await db.ratings_for_movies(conn, movie_ids=movie_ids, user_ids=user_ids)
)
movie_ids = [m2.id]
user_ids = []
assert (r1,) == tuple(
await db.ratings_for_movies(conn, movie_ids=movie_ids, user_ids=user_ids)
)
movie_ids = [m2.id]
user_ids = [u2.id]
assert tuple() == tuple(
await db.ratings_for_movies(conn, movie_ids=movie_ids, user_ids=user_ids)
)
movie_ids = [m2.id]
user_ids = [u1.id]
assert (r1,) == tuple(
await db.ratings_for_movies(conn, movie_ids=movie_ids, user_ids=user_ids)
)
movie_ids = [m1.id, m2.id]
user_ids = [u1.id, u2.id]
assert (r1,) == tuple(
await db.ratings_for_movies(conn, movie_ids=movie_ids, user_ids=user_ids)
)
@pytest.mark.asyncio
async def test_find_movies(conn: db.Connection):
m1 = a_movie(title="movie one")
await db.add(conn, m1)
m2 = a_movie(title="movie two", imdb_score=33, release_year=m1.release_year + 1)
await db.add(conn, m2)
u1 = models.User(
imdb_id="u00001",
name="User1",
secret="secret1",
)
await db.add(conn, u1)
u2 = models.User(
imdb_id="u00002",
name="User2",
secret="secret2",
)
await db.add(conn, u2)
r1 = models.Rating(
movie_id=m2.id,
movie=m2,
user_id=u1.id,
user=u1,
score=66,
rating_date=datetime.now(),
)
await db.add(conn, r1)
# ---
assert () == tuple(
await db.find_movies(conn, title=m1.title, include_unrated=False)
)
assert ((m1, []),) == tuple(
await db.find_movies(conn, title=m1.title, include_unrated=True)
)
assert ((m1, []),) == tuple(
await db.find_movies(conn, title="mo on", exact=False, include_unrated=True)
)
assert ((m1, []),) == tuple(
await db.find_movies(conn, title="movie one", exact=True, include_unrated=True)
)
assert () == tuple(
await db.find_movies(conn, title="mo on", exact=True, include_unrated=True)
)
assert ((m2, []),) == tuple(
await db.find_movies(conn, title="movie", exact=False, include_unrated=False)
)
assert ((m2, []), (m1, [])) == tuple(
await db.find_movies(conn, title="movie", exact=False, include_unrated=True)
)
assert ((m1, []),) == tuple(
await db.find_movies(
conn, include_unrated=True, yearcomp=("=", m1.release_year)
)
)
assert ((m2, []),) == tuple(
await db.find_movies(
conn, include_unrated=True, yearcomp=("=", m2.release_year)
)
)
assert ((m1, []),) == tuple(
await db.find_movies(
conn, include_unrated=True, yearcomp=("<", m2.release_year)
)
)
assert ((m2, []),) == tuple(
await db.find_movies(
conn, include_unrated=True, yearcomp=(">", m1.release_year)
)
)
assert ((m2, []), (m1, [])) == tuple(
await db.find_movies(conn, include_unrated=True)
)
assert ((m2, []),) == tuple(
await db.find_movies(conn, include_unrated=True, limit_rows=1)
)
assert ((m1, []),) == tuple(
await db.find_movies(conn, include_unrated=True, skip_rows=1)
)
assert ((m2, [r1]), (m1, [])) == tuple(
await db.find_movies(conn, include_unrated=True, user_ids=[u1.id, u2.id])
)