unwind/tests/test_db.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

411 lines
12 KiB
Python
Raw Normal View History

2021-12-19 19:30:08 +01:00
from datetime import datetime
2023-02-04 18:15:14 +01:00
2021-12-19 19:30:08 +01:00
import pytest
from unwind import db, models, web_models
2023-03-19 22:59:08 +01:00
_movie_imdb_id = 1234567
def a_movie(**kwds) -> models.Movie:
global _movie_imdb_id
_movie_imdb_id += 1
args = {
"title": "test movie",
"release_year": 2013,
"media_type": "Movie",
"imdb_id": f"tt{_movie_imdb_id}",
"genres": {"genre-1"},
} | kwds
return models.Movie(**args)
2021-12-19 19:30:08 +01:00
@pytest.mark.asyncio
async def test_current_patch_level(shared_conn: db.Database):
async with shared_conn.transaction(force_rollback=True):
patch_level = "some-patch-level"
assert patch_level != await db.current_patch_level(shared_conn)
await db.set_current_patch_level(shared_conn, patch_level)
assert patch_level == await db.current_patch_level(shared_conn)
2023-03-20 21:37:50 +01:00
@pytest.mark.asyncio
async def test_get(shared_conn: db.Database):
async with shared_conn.transaction(force_rollback=True):
m1 = a_movie()
await db.add(m1)
m2 = a_movie(release_year=m1.release_year + 1)
await db.add(m2)
assert None == await db.get(models.Movie)
assert None == await db.get(models.Movie, id="blerp")
assert m1 == await db.get(models.Movie, id=str(m1.id))
assert m2 == await db.get(models.Movie, release_year=m2.release_year)
assert None == await db.get(
models.Movie, id=str(m1.id), release_year=m2.release_year
)
assert m2 == await db.get(
models.Movie, id=str(m2.id), release_year=m2.release_year
)
assert m1 == await db.get(
models.Movie, media_type=m1.media_type, order_by=("release_year", "asc")
)
assert m2 == await db.get(
models.Movie, media_type=m1.media_type, order_by=("release_year", "desc")
)
@pytest.mark.asyncio
async def test_get_all(shared_conn: db.Database):
async with shared_conn.transaction(force_rollback=True):
2023-03-19 22:59:08 +01:00
m1 = a_movie()
await db.add(m1)
2023-03-19 22:59:08 +01:00
m2 = a_movie(release_year=m1.release_year)
await db.add(m2)
2023-03-19 22:59:08 +01:00
m3 = a_movie(release_year=m1.release_year + 1)
await db.add(m3)
assert [] == list(await db.get_all(models.Movie, id="blerp"))
assert [m1] == list(await db.get_all(models.Movie, id=str(m1.id)))
2023-03-19 23:14:59 +01:00
assert [m1, m2] == list(
await db.get_all(models.Movie, release_year=m1.release_year)
)
assert [m1, m2, m3] == list(await db.get_all(models.Movie))
2023-03-19 23:14:59 +01:00
@pytest.mark.asyncio
async def test_get_many(shared_conn: db.Database):
async with shared_conn.transaction(force_rollback=True):
m1 = a_movie()
await db.add(m1)
m2 = a_movie(release_year=m1.release_year)
await db.add(m2)
m3 = a_movie(release_year=m1.release_year + 1)
await db.add(m3)
assert [] == list(await db.get_many(models.Movie)), "selected nothing"
assert [m1] == list(await db.get_many(models.Movie, id=[str(m1.id)]))
assert [m1] == list(await db.get_many(models.Movie, id={str(m1.id)}))
assert [m1, m2] == list(
await db.get_many(models.Movie, release_year=[m1.release_year])
)
assert [m1, m2, m3] == list(
await db.get_many(
models.Movie, release_year=[m1.release_year, m3.release_year]
)
)
@pytest.mark.asyncio
async def test_add_and_get(shared_conn: db.Database):
2021-12-19 19:30:08 +01:00
async with shared_conn.transaction(force_rollback=True):
2023-03-19 22:59:08 +01:00
m1 = a_movie()
2021-12-19 19:30:08 +01:00
await db.add(m1)
2023-03-19 22:59:08 +01:00
m2 = a_movie()
2021-12-19 19:30:08 +01:00
await db.add(m2)
assert m1 == await db.get(models.Movie, id=str(m1.id))
assert m2 == await db.get(models.Movie, id=str(m2.id))
@pytest.mark.asyncio
async def test_update(shared_conn: db.Database):
async with shared_conn.transaction(force_rollback=True):
m = a_movie()
await db.add(m)
assert m == await db.get(models.Movie, id=str(m.id))
m.title += "something else"
assert m != await db.get(models.Movie, id=str(m.id))
await db.update(m)
assert m == await db.get(models.Movie, id=str(m.id))
@pytest.mark.asyncio
async def test_remove(shared_conn: db.Database):
async with shared_conn.transaction(force_rollback=True):
m1 = a_movie()
await db.add(m1)
assert m1 == await db.get(models.Movie, id=str(m1.id))
await db.remove(m1)
assert None == await db.get(models.Movie, id=str(m1.id))
@pytest.mark.asyncio
async def test_find_ratings(shared_conn: db.Database):
2021-12-19 19:30:08 +01:00
async with shared_conn.transaction(force_rollback=True):
2023-03-19 22:59:08 +01:00
m1 = a_movie(
2021-12-19 19:30:08 +01:00
title="test movie",
release_year=2013,
genres={"genre-1"},
)
await db.add(m1)
2023-03-19 22:59:08 +01:00
m2 = a_movie(
2021-12-19 19:30:08 +01:00
title="it's anöther Movie, Part 2",
release_year=2015,
genres={"genre-2"},
)
await db.add(m2)
2023-03-19 22:59:08 +01:00
m3 = a_movie(
2021-12-19 19:30:08 +01:00
title="movie it's, Part 3",
2023-03-19 22:59:08 +01:00
release_year=m2.release_year,
genres=m2.genres,
2021-12-19 19:30:08 +01:00
)
await db.add(m3)
u1 = models.User(
imdb_id="u00001",
name="User1",
secret="secret1",
)
await db.add(u1)
u2 = models.User(
imdb_id="u00002",
name="User2",
secret="secret2",
)
await db.add(u2)
r1 = models.Rating(
movie_id=m2.id,
movie=m2,
user_id=u1.id,
user=u1,
score=66,
rating_date=datetime.now(),
)
await db.add(r1)
r2 = models.Rating(
movie_id=m2.id,
movie=m2,
user_id=u2.id,
user=u2,
score=77,
rating_date=datetime.now(),
)
await db.add(r2)
# ---
rows = await db.find_ratings(
title=m1.title,
media_type=m1.media_type,
exact=True,
ignore_tv_episodes=True,
include_unrated=True,
yearcomp=("=", m1.release_year),
limit_rows=3,
user_ids=[],
)
ratings = (web_models.Rating(**r) for r in rows)
assert (web_models.RatingAggregate.from_movie(m1),) == tuple(
web_models.aggregate_ratings(ratings, user_ids=[])
)
rows = await db.find_ratings(title="movie", include_unrated=False)
ratings = tuple(web_models.Rating(**r) for r in rows)
assert (
web_models.Rating.from_movie(m2, rating=r1),
web_models.Rating.from_movie(m2, rating=r2),
) == ratings
rows = await db.find_ratings(title="movie", include_unrated=True)
ratings = tuple(web_models.Rating(**r) for r in rows)
assert (
web_models.Rating.from_movie(m1),
web_models.Rating.from_movie(m2, rating=r1),
web_models.Rating.from_movie(m2, rating=r2),
web_models.Rating.from_movie(m3),
) == ratings
aggr = web_models.aggregate_ratings(ratings, user_ids=[])
assert tuple(
web_models.RatingAggregate.from_movie(m) for m in [m1, m2, m3]
) == tuple(aggr)
aggr = web_models.aggregate_ratings(ratings, user_ids=[str(u1.id)])
assert (
web_models.RatingAggregate.from_movie(m1),
web_models.RatingAggregate.from_movie(m2, ratings=[r1]),
web_models.RatingAggregate.from_movie(m3),
) == tuple(aggr)
aggr = web_models.aggregate_ratings(ratings, user_ids=[str(u1.id), str(u2.id)])
assert (
web_models.RatingAggregate.from_movie(m1),
web_models.RatingAggregate.from_movie(m2, ratings=[r1, r2]),
web_models.RatingAggregate.from_movie(m3),
) == tuple(aggr)
rows = await db.find_ratings(title="movie", include_unrated=True)
ratings = (web_models.Rating(**r) for r in rows)
aggr = web_models.aggregate_ratings(ratings, user_ids=[])
assert tuple(
web_models.RatingAggregate.from_movie(m) for m in [m1, m2, m3]
) == tuple(aggr)
rows = await db.find_ratings(title="test", include_unrated=True)
ratings = tuple(web_models.Rating(**r) for r in rows)
assert (web_models.Rating.from_movie(m1),) == ratings
@pytest.mark.asyncio
async def test_ratings_for_movies(shared_conn: db.Database):
async with shared_conn.transaction(force_rollback=True):
m1 = a_movie()
await db.add(m1)
m2 = a_movie()
await db.add(m2)
u1 = models.User(
imdb_id="u00001",
name="User1",
secret="secret1",
)
await db.add(u1)
u2 = models.User(
imdb_id="u00002",
name="User2",
secret="secret2",
)
await db.add(u2)
r1 = models.Rating(
movie_id=m2.id,
movie=m2,
user_id=u1.id,
user=u1,
score=66,
rating_date=datetime.now(),
)
await db.add(r1)
# ---
movie_ids = [m1.id]
user_ids = []
assert tuple() == tuple(
await db.ratings_for_movies(movie_ids=movie_ids, user_ids=user_ids)
)
movie_ids = [m2.id]
user_ids = []
assert (r1,) == tuple(
await db.ratings_for_movies(movie_ids=movie_ids, user_ids=user_ids)
)
movie_ids = [m2.id]
user_ids = [u2.id]
assert tuple() == tuple(
await db.ratings_for_movies(movie_ids=movie_ids, user_ids=user_ids)
)
movie_ids = [m2.id]
user_ids = [u1.id]
assert (r1,) == tuple(
await db.ratings_for_movies(movie_ids=movie_ids, user_ids=user_ids)
)
movie_ids = [m1.id, m2.id]
user_ids = [u1.id, u2.id]
assert (r1,) == tuple(
await db.ratings_for_movies(movie_ids=movie_ids, user_ids=user_ids)
)
2023-03-28 21:49:02 +02:00
@pytest.mark.asyncio
async def test_find_movies(shared_conn: db.Database):
async with shared_conn.transaction(force_rollback=True):
m1 = a_movie(title="movie one")
await db.add(m1)
m2 = a_movie(title="movie two", imdb_score=33, release_year=m1.release_year + 1)
await db.add(m2)
u1 = models.User(
imdb_id="u00001",
name="User1",
secret="secret1",
)
await db.add(u1)
u2 = models.User(
imdb_id="u00002",
name="User2",
secret="secret2",
)
await db.add(u2)
r1 = models.Rating(
movie_id=m2.id,
movie=m2,
user_id=u1.id,
user=u1,
score=66,
rating_date=datetime.now(),
)
await db.add(r1)
# ---
assert () == tuple(await db.find_movies(title=m1.title, include_unrated=False))
assert ((m1, []),) == tuple(
await db.find_movies(title=m1.title, include_unrated=True)
)
assert ((m1, []),) == tuple(
await db.find_movies(title="mo on", exact=False, include_unrated=True)
)
assert ((m1, []),) == tuple(
await db.find_movies(title="movie one", exact=True, include_unrated=True)
)
assert () == tuple(
await db.find_movies(title="mo on", exact=True, include_unrated=True)
)
assert ((m2, []),) == tuple(
await db.find_movies(title="movie", exact=False, include_unrated=False)
)
assert ((m2, []), (m1, [])) == tuple(
await db.find_movies(title="movie", exact=False, include_unrated=True)
)
assert ((m1, []),) == tuple(
await db.find_movies(include_unrated=True, yearcomp=("=", m1.release_year))
)
assert ((m2, []),) == tuple(
await db.find_movies(include_unrated=True, yearcomp=("=", m2.release_year))
)
assert ((m1, []),) == tuple(
await db.find_movies(include_unrated=True, yearcomp=("<", m2.release_year))
)
assert ((m2, []),) == tuple(
await db.find_movies(include_unrated=True, yearcomp=(">", m1.release_year))
)
assert ((m2, []), (m1, [])) == tuple(await db.find_movies(include_unrated=True))
assert ((m2, []),) == tuple(
await db.find_movies(include_unrated=True, limit_rows=1)
)
assert ((m1, []),) == tuple(
await db.find_movies(include_unrated=True, skip_rows=1)
)
assert ((m2, [r1]), (m1, [])) == tuple(
await db.find_movies(include_unrated=True, user_ids=[u1.id, u2.id])
)