"""Tests for the HTTP endpoints exposed by the unwind app."""

from datetime import UTC, datetime

import pytest
from starlette.testclient import TestClient

from unwind import config, create_app, db, imdb, models

app = create_app()


@pytest.fixture(scope="module")
def unauthorized_client() -> TestClient:
    """Return a test client that carries no credentials."""
    # https://www.starlette.io/testclient/
    return TestClient(app)


@pytest.fixture(scope="module")
def authorized_client() -> TestClient:
    """Return a test client whose `auth` is the user1/secret1 pair."""
    authed = TestClient(app)
    authed.auth = ("user1", "secret1")
    return authed


@pytest.fixture(scope="module")
def admin_client() -> TestClient:
    """Return a test client that sends the first configured bearer token.

    Raises:
        RuntimeError: If `config.api_credentials` holds no tokens.
    """
    admin = TestClient(app)
    tokens = list(config.api_credentials.values())
    if not tokens:
        raise RuntimeError("No bearer tokens configured.")
    admin.headers = {"Authorization": f"Bearer {tokens[0]}"}
    return admin


@pytest.mark.asyncio
async def test_get_ratings_for_group(
    conn: db.Connection, unauthorized_client: TestClient
):
    """Exercise the group ratings endpoint, its filters and `exact`.

    The endpoint is queried without credentials throughout: first for a
    group that is not stored yet, then for a stored group without ratings,
    and finally with one rated movie and various query parameters.
    """
    member = models.User(
        imdb_id="ur12345678",
        name="user-1",
        secret="secret-1",  # noqa: S106
        groups=[],
    )
    group = models.Group(
        name="group-1",
        users=[models.GroupUser(id=str(member.id), name=member.name)],
    )
    member.groups = [models.UserGroup(id=str(group.id), access="r")]

    path = app.url_path_for("get_ratings_for_group", group_id=str(group.id))

    # Nothing has been stored yet, so the group cannot be found.
    response = unauthorized_client.get(path)
    assert response.status_code == 404, "Group does not exist (yet)"

    await db.add(conn, member)
    await db.add(conn, group)

    # The group exists now, but nobody has rated anything.
    response = unauthorized_client.get(path)
    assert response.status_code == 200
    assert response.json() == []

    movie = models.Movie(
        title="test movie",
        release_year=2013,
        media_type="Movie",
        imdb_id="tt12345678",
        genres={"genre-1"},
    )
    await db.add(conn, movie)

    rating = models.Rating(
        movie_id=movie.id,
        user_id=member.id,
        score=66,
        rating_date=datetime.now(tz=UTC),
    )
    await db.add(conn, rating)

    # The JSON payload expected for the single rated movie.
    rating_aggregate = {
        "canonical_title": movie.title,
        "imdb_score": movie.imdb_score,
        "imdb_votes": movie.imdb_votes,
        "link": imdb.movie_url(movie.imdb_id),
        "media_type": movie.media_type,
        "original_title": movie.original_title,
        "user_scores": [rating.score],
        "year": movie.release_year,
    }

    response = unauthorized_client.get(path)
    assert response.status_code == 200
    assert response.json() == [rating_aggregate]

    # Every filter on its own must still match the movie.
    filters = {
        "imdb_id": movie.imdb_id,
        "unwind_id": str(movie.id),
        "title": movie.title,
        "media_type": movie.media_type,
        "year": movie.release_year,
    }
    for param_name, param_value in filters.items():
        response = unauthorized_client.get(path, params={param_name: param_value})
        assert response.status_code == 200
        assert response.json() == [rating_aggregate]

    response = unauthorized_client.get(path, params={"title": "no such thing"})
    assert response.status_code == 200
    assert response.json() == []

    # Test "exact" query param.
    exact_cases = (
        ("test movie", "true", [rating_aggregate]),
        ("te mo", "false", [rating_aggregate]),
        ("te mo", "true", []),
    )
    for title, exact, expected in exact_cases:
        response = unauthorized_client.get(
            path, params={"title": title, "exact": exact}
        )
        assert response.status_code == 200
        assert response.json() == expected

    # XXX Test "ignore_tv_episodes" query param.
    # XXX Test "include_unrated" query param.
    # XXX Test "per_page" query param.
@pytest.mark.asyncio
async def test_list_movies(
    conn: db.Connection,
    unauthorized_client: TestClient,
    authorized_client: TestClient,
):
    """Check access control and the id filters of the movie listing."""
    path = app.url_path_for("list_movies")

    # Requests without credentials are rejected.
    resp = unauthorized_client.get(path)
    assert resp.status_code == 403

    # No movie has been added yet.
    resp = authorized_client.get(path)
    assert resp.status_code == 200
    assert resp.json() == []

    movie = models.Movie(
        title="test movie",
        release_year=2013,
        media_type="Movie",
        imdb_id="tt12345678",
        genres={"genre-1"},
    )
    await db.add(conn, movie)

    # With "include_unrated" the plain movie plus empty scores is expected.
    expected_unrated = dict(models.asplain(movie))
    expected_unrated["user_scores"] = []
    resp = authorized_client.get(path, params={"include_unrated": 1})
    assert resp.status_code == 200
    assert resp.json() == [expected_unrated]

    expected_movie = {
        "canonical_title": movie.title,
        "imdb_score": movie.imdb_score,
        "imdb_votes": movie.imdb_votes,
        "link": imdb.movie_url(movie.imdb_id),
        "media_type": movie.media_type,
        "original_title": movie.original_title,
        "user_scores": [],
        "year": movie.release_year,
    }

    resp = authorized_client.get(path, params={"imdb_id": movie.imdb_id})
    assert resp.status_code == 200
    assert resp.json() == [expected_movie]

    resp = authorized_client.get(path, params={"unwind_id": str(movie.id)})
    assert resp.status_code == 200
    assert resp.json() == [expected_movie]


@pytest.mark.asyncio
async def test_list_users(
    conn: db.Connection,
    unauthorized_client: TestClient,
    authorized_client: TestClient,
    admin_client: TestClient,
):
    """Check that only the admin client may list users."""
    path = app.url_path_for("list_users")

    # Neither the credential-less nor the regular client gets access.
    for client in (unauthorized_client, authorized_client):
        resp = client.get(path)
        assert resp.status_code == 403

    resp = admin_client.get(path)
    assert resp.status_code == 200
    assert resp.json() == []

    user = models.User(
        imdb_id="ur12345678",
        name="user-1",
        secret="secret-1",  # noqa: S106
        groups=[],
    )
    await db.add(conn, user)

    expected_user = {
        "groups": user.groups,
        "id": user.id,
        "imdb_id": user.imdb_id,
        "name": user.name,
        "secret": user.secret,
    }

    resp = admin_client.get(path)
    assert resp.status_code == 200
    assert resp.json() == [expected_user]


@pytest.mark.asyncio
async def test_list_groups(
    conn: db.Connection,
    unauthorized_client: TestClient,
    authorized_client: TestClient,
    admin_client: TestClient,
):
    """Check that only the admin client may list groups."""
    path = app.url_path_for("list_groups")

    # Neither the credential-less nor the regular client gets access.
    for client in (unauthorized_client, authorized_client):
        resp = client.get(path)
        assert resp.status_code == 403

    resp = admin_client.get(path)
    assert resp.status_code == 200
    assert resp.json() == []

    group = models.Group(
        name="group-1",
        users=[models.GroupUser(id="123", name="itsa-me")],
    )
    await db.add(conn, group)

    expected_group = {
        "users": group.users,
        "id": group.id,
        "name": group.name,
    }

    resp = admin_client.get(path)
    assert resp.status_code == 200
    assert resp.json() == [expected_group]