"""Tests for ``db.add``, ``db.get``, ``db.get_all`` and ``db.find_ratings``."""

from datetime import datetime

import pytest

from unwind import db, models, web_models

# Numeric part of the most recently issued fake IMDb id; `a_movie` bumps it
# on every call.
_movie_imdb_id = 1234567


def a_movie(**kwds) -> models.Movie:
    """Build a `models.Movie` filled with default test values.

    Each call advances the module-level counter and derives the default
    ``imdb_id`` from it.  Keyword arguments take precedence over the
    defaults (including ``imdb_id``).
    """
    global _movie_imdb_id
    _movie_imdb_id += 1

    defaults = {
        "title": "test movie",
        "release_year": 2013,
        "media_type": "Movie",
        "imdb_id": f"tt{_movie_imdb_id}",
        "genres": {"genre-1"},
    }
    return models.Movie(**(defaults | kwds))


@pytest.mark.asyncio
async def test_get_all(shared_conn: db.Database):
    """Check `db.get_all` with an unknown id, a known id, a year and no filter."""
    async with shared_conn.transaction(force_rollback=True):
        first = a_movie()
        await db.add(first)

        same_year = a_movie(release_year=first.release_year)
        await db.add(same_year)

        next_year = a_movie(release_year=first.release_year + 1)
        await db.add(next_year)

        # An id that was never stored yields nothing.
        unknown = list(await db.get_all(models.Movie, id="blerp"))
        assert [] == unknown

        by_id = list(await db.get_all(models.Movie, id=str(first.id)))
        assert [first] == by_id

        by_year = list(await db.get_all(models.Movie, release_year=2013))
        assert [first, same_year] == by_year

        everything = list(await db.get_all(models.Movie))
        assert [first, same_year, next_year] == everything


@pytest.mark.asyncio
async def test_add_and_get(shared_conn: db.Database):
    """Check that movies stored via `db.add` come back equal from `db.get`."""
    async with shared_conn.transaction(force_rollback=True):
        first = a_movie()
        await db.add(first)

        second = a_movie()
        await db.add(second)

        fetched_first = await db.get(models.Movie, id=str(first.id))
        assert first == fetched_first

        fetched_second = await db.get(models.Movie, id=str(second.id))
        assert second == fetched_second


@pytest.mark.asyncio
async def test_find_ratings(shared_conn: db.Database):
    """Exercise `db.find_ratings` together with `web_models.aggregate_ratings`.

    Fixture data: three movies, two users, and one rating per user, both
    for the second movie.
    """
    async with shared_conn.transaction(force_rollback=True):
        # --- fixture data -------------------------------------------------
        m1 = a_movie(
            title="test movie",
            release_year=2013,
            genres={"genre-1"},
        )
        await db.add(m1)

        m2 = a_movie(
            title="it's anöther Movie, Part 2",
            release_year=2015,
            genres={"genre-2"},
        )
        await db.add(m2)

        m3 = a_movie(
            title="movie it's, Part 3",
            release_year=m2.release_year,
            genres=m2.genres,
        )
        await db.add(m3)

        u1 = models.User(imdb_id="u00001", name="User1", secret="secret1")
        await db.add(u1)

        u2 = models.User(imdb_id="u00002", name="User2", secret="secret2")
        await db.add(u2)

        r1 = models.Rating(
            movie_id=m2.id,
            movie=m2,
            user_id=u1.id,
            user=u1,
            score=66,
            rating_date=datetime.now(),
        )
        await db.add(r1)

        r2 = models.Rating(
            movie_id=m2.id,
            movie=m2,
            user_id=u2.id,
            user=u2,
            score=77,
            rating_date=datetime.now(),
        )
        await db.add(r2)

        # --- exact lookup of the unrated first movie ----------------------
        rows = await db.find_ratings(
            title=m1.title,
            media_type=m1.media_type,
            exact=True,
            ignore_tv_episodes=True,
            include_unrated=True,
            yearcomp=("=", m1.release_year),
            limit_rows=3,
            user_ids=[],
        )
        ratings = (web_models.Rating(**row) for row in rows)
        aggregated = tuple(web_models.aggregate_ratings(ratings, user_ids=[]))
        assert (web_models.RatingAggregate.from_movie(m1),) == aggregated

        # --- title search, rated movies only ------------------------------
        rows = await db.find_ratings(title="movie", include_unrated=False)
        ratings = tuple(web_models.Rating(**row) for row in rows)
        rated_only = (
            web_models.Rating.from_movie(m2, rating=r1),
            web_models.Rating.from_movie(m2, rating=r2),
        )
        assert rated_only == ratings

        # --- title search, unrated movies included ------------------------
        rows = await db.find_ratings(title="movie", include_unrated=True)
        ratings = tuple(web_models.Rating(**row) for row in rows)
        with_unrated = (
            web_models.Rating.from_movie(m1),
            web_models.Rating.from_movie(m2, rating=r1),
            web_models.Rating.from_movie(m2, rating=r2),
            web_models.Rating.from_movie(m3),
        )
        assert with_unrated == ratings

        # Aggregating the same rows for no users, one user, and both users.
        aggr = web_models.aggregate_ratings(ratings, user_ids=[])
        per_movie = tuple(
            web_models.RatingAggregate.from_movie(movie) for movie in [m1, m2, m3]
        )
        assert per_movie == tuple(aggr)

        aggr = web_models.aggregate_ratings(ratings, user_ids=[str(u1.id)])
        for_first_user = (
            web_models.RatingAggregate.from_movie(m1),
            web_models.RatingAggregate.from_movie(m2, ratings=[r1]),
            web_models.RatingAggregate.from_movie(m3),
        )
        assert for_first_user == tuple(aggr)

        aggr = web_models.aggregate_ratings(
            ratings, user_ids=[str(u1.id), str(u2.id)]
        )
        for_both_users = (
            web_models.RatingAggregate.from_movie(m1),
            web_models.RatingAggregate.from_movie(m2, ratings=[r1, r2]),
            web_models.RatingAggregate.from_movie(m3),
        )
        assert for_both_users == tuple(aggr)

        # --- same search again, this time feeding a lazy generator --------
        rows = await db.find_ratings(title="movie", include_unrated=True)
        ratings = (web_models.Rating(**row) for row in rows)
        aggr = web_models.aggregate_ratings(ratings, user_ids=[])
        per_movie = tuple(
            web_models.RatingAggregate.from_movie(movie) for movie in [m1, m2, m3]
        )
        assert per_movie == tuple(aggr)

        # --- a title fragment that only the first movie contains ----------
        rows = await db.find_ratings(title="test", include_unrated=True)
        ratings = tuple(web_models.Rating(**row) for row in rows)
        assert (web_models.Rating.from_movie(m1),) == ratings