unwind/tests/test_web.py
ducklet 4981de4a04 remove databases, use SQLAlechemy 2.0 instead
Among the many changes we switch to using SQLAlchemy's connection pool,
which means we are no longer required to guard against multiple threads
working on the database.
All db funcs now receive a connection to use as their first argument,
this allows the caller to control transaction & rollback behavior.
2023-11-27 23:24:35 +01:00

243 lines
6.6 KiB
Python

from datetime import datetime
import pytest
from starlette.testclient import TestClient
from unwind import config, create_app, db, imdb, models
app = create_app()
@pytest.fixture(scope="module")
def unauthorized_client() -> TestClient:
# https://www.starlette.io/testclient/
return TestClient(app)
@pytest.fixture(scope="module")
def authorized_client() -> TestClient:
client = TestClient(app)
client.auth = "user1", "secret1"
return client
@pytest.fixture(scope="module")
def admin_client() -> TestClient:
client = TestClient(app)
for token in config.api_credentials.values():
break
else:
raise RuntimeError("No bearer tokens configured.")
client.headers = {"Authorization": f"Bearer {token}"}
return client
@pytest.mark.asyncio
async def test_get_ratings_for_group(
conn: db.Connection, unauthorized_client: TestClient
):
user = models.User(
imdb_id="ur12345678",
name="user-1",
secret="secret-1",
groups=[],
)
group = models.Group(
name="group-1",
users=[models.GroupUser(id=str(user.id), name=user.name)],
)
user.groups = [models.UserGroup(id=str(group.id), access="r")]
path = app.url_path_for("get_ratings_for_group", group_id=str(group.id))
resp = unauthorized_client.get(path)
assert resp.status_code == 404, "Group does not exist (yet)"
await db.add(conn, user)
await db.add(conn, group)
resp = unauthorized_client.get(path)
assert resp.status_code == 200
assert resp.json() == []
movie = models.Movie(
title="test movie",
release_year=2013,
media_type="Movie",
imdb_id="tt12345678",
genres={"genre-1"},
)
await db.add(conn, movie)
rating = models.Rating(
movie_id=movie.id, user_id=user.id, score=66, rating_date=datetime.now()
)
await db.add(conn, rating)
rating_aggregate = {
"canonical_title": movie.title,
"imdb_score": movie.imdb_score,
"imdb_votes": movie.imdb_votes,
"link": imdb.movie_url(movie.imdb_id),
"media_type": movie.media_type,
"original_title": movie.original_title,
"user_scores": [rating.score],
"year": movie.release_year,
}
resp = unauthorized_client.get(path)
assert resp.status_code == 200
assert resp.json() == [rating_aggregate]
filters = {
"imdb_id": movie.imdb_id,
"unwind_id": str(movie.id),
"title": movie.title,
"media_type": movie.media_type,
"year": movie.release_year,
}
for k, v in filters.items():
resp = unauthorized_client.get(path, params={k: v})
assert resp.status_code == 200
assert resp.json() == [rating_aggregate]
resp = unauthorized_client.get(path, params={"title": "no such thing"})
assert resp.status_code == 200
assert resp.json() == []
# Test "exact" query param.
resp = unauthorized_client.get(
path, params={"title": "test movie", "exact": "true"}
)
assert resp.status_code == 200
assert resp.json() == [rating_aggregate]
resp = unauthorized_client.get(path, params={"title": "te mo", "exact": "false"})
assert resp.status_code == 200
assert resp.json() == [rating_aggregate]
resp = unauthorized_client.get(path, params={"title": "te mo", "exact": "true"})
assert resp.status_code == 200
assert resp.json() == []
# XXX Test "ignore_tv_episodes" query param.
# XXX Test "include_unrated" query param.
# XXX Test "per_page" query param.
@pytest.mark.asyncio
async def test_list_movies(
conn: db.Connection,
unauthorized_client: TestClient,
authorized_client: TestClient,
):
path = app.url_path_for("list_movies")
response = unauthorized_client.get(path)
assert response.status_code == 403
response = authorized_client.get(path)
assert response.status_code == 200
assert response.json() == []
m = models.Movie(
title="test movie",
release_year=2013,
media_type="Movie",
imdb_id="tt12345678",
genres={"genre-1"},
)
await db.add(conn, m)
response = authorized_client.get(path, params={"include_unrated": 1})
assert response.status_code == 200
assert response.json() == [{**models.asplain(m), "user_scores": []}]
m_plain = {
"canonical_title": m.title,
"imdb_score": m.imdb_score,
"imdb_votes": m.imdb_votes,
"link": imdb.movie_url(m.imdb_id),
"media_type": m.media_type,
"original_title": m.original_title,
"user_scores": [],
"year": m.release_year,
}
response = authorized_client.get(path, params={"imdb_id": m.imdb_id})
assert response.status_code == 200
assert response.json() == [m_plain]
response = authorized_client.get(path, params={"unwind_id": str(m.id)})
assert response.status_code == 200
assert response.json() == [m_plain]
@pytest.mark.asyncio
async def test_list_users(
conn: db.Connection,
unauthorized_client: TestClient,
authorized_client: TestClient,
admin_client: TestClient,
):
path = app.url_path_for("list_users")
response = unauthorized_client.get(path)
assert response.status_code == 403
response = authorized_client.get(path)
assert response.status_code == 403
response = admin_client.get(path)
assert response.status_code == 200
assert response.json() == []
m = models.User(
imdb_id="ur12345678",
name="user-1",
secret="secret-1",
groups=[],
)
await db.add(conn, m)
m_plain = {
"groups": m.groups,
"id": m.id,
"imdb_id": m.imdb_id,
"name": m.name,
"secret": m.secret,
}
response = admin_client.get(path)
assert response.status_code == 200
assert response.json() == [m_plain]
@pytest.mark.asyncio
async def test_list_groups(
conn: db.Connection,
unauthorized_client: TestClient,
authorized_client: TestClient,
admin_client: TestClient,
):
path = app.url_path_for("list_groups")
response = unauthorized_client.get(path)
assert response.status_code == 403
response = authorized_client.get(path)
assert response.status_code == 403
response = admin_client.get(path)
assert response.status_code == 200
assert response.json() == []
m = models.Group(
name="group-1",
users=[models.GroupUser(id="123", name="itsa-me")],
)
await db.add(conn, m)
m_plain = {
"users": m.users,
"id": m.id,
"name": m.name,
}
response = admin_client.get(path)
assert response.status_code == 200
assert response.json() == [m_plain]