Among many other changes, we switch to using SQLAlchemy's connection pool, which means we no longer need to guard against multiple threads working on the database. All db functions now receive a connection as their first argument; this allows the caller to control transaction and rollback behavior.
243 lines
6.6 KiB
Python
243 lines
6.6 KiB
Python
from datetime import datetime
|
|
|
|
import pytest
|
|
from starlette.testclient import TestClient
|
|
|
|
from unwind import config, create_app, db, imdb, models
|
|
|
|
# Application instance shared by every test client and URL lookup in this module.
app = create_app()
|
|
|
|
|
|
@pytest.fixture(scope="module")
def unauthorized_client() -> TestClient:
    """Return a module-scoped test client that carries no credentials.

    See https://www.starlette.io/testclient/ for the client API.
    """
    anonymous = TestClient(app)
    return anonymous
|
|
|
|
|
|
@pytest.fixture(scope="module")
def authorized_client() -> TestClient:
    """Return a module-scoped test client with user credentials attached.

    The client's ``auth`` attribute is set to the pair
    ``("user1", "secret1")``.
    """
    user_client = TestClient(app)
    user_client.auth = ("user1", "secret1")
    return user_client
|
|
|
|
|
|
@pytest.fixture(scope="module")
def admin_client() -> TestClient:
    """Return a module-scoped test client that sends a bearer token.

    The token is the first value found in ``config.api_credentials``.

    Raises:
        RuntimeError: If ``config.api_credentials`` holds no values.
    """
    client = TestClient(app)
    try:
        # Any configured token will do; take the first one.
        token = next(iter(config.api_credentials.values()))
    except StopIteration:
        # ``from None`` keeps the traceback free of the internal StopIteration.
        raise RuntimeError("No bearer tokens configured.") from None
    client.headers = {"Authorization": f"Bearer {token}"}
    return client
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_ratings_for_group(
    conn: db.Connection, unauthorized_client: TestClient
):
    """Check the ``get_ratings_for_group`` route using a client without credentials.

    The route is expected to answer 404 until the group has been stored,
    then return an empty list, then the aggregated rating once a movie and
    a rating exist. Afterwards the individual query filters and the
    ``exact`` flag are exercised.
    """

    def assert_ok(response, expected):
        # Every successful lookup shares the same two checks.
        assert response.status_code == 200
        assert response.json() == expected

    member = models.User(
        imdb_id="ur12345678",
        name="user-1",
        secret="secret-1",
        groups=[],
    )
    circle = models.Group(
        name="group-1",
        users=[models.GroupUser(id=str(member.id), name=member.name)],
    )
    member.groups = [models.UserGroup(id=str(circle.id), access="r")]
    url = app.url_path_for("get_ratings_for_group", group_id=str(circle.id))

    missing = unauthorized_client.get(url)
    assert missing.status_code == 404, "Group does not exist (yet)"

    await db.add(conn, member)
    await db.add(conn, circle)

    # The group exists now, but nothing has been rated yet.
    assert_ok(unauthorized_client.get(url), [])

    film = models.Movie(
        title="test movie",
        release_year=2013,
        media_type="Movie",
        imdb_id="tt12345678",
        genres={"genre-1"},
    )
    await db.add(conn, film)

    vote = models.Rating(
        movie_id=film.id, user_id=member.id, score=66, rating_date=datetime.now()
    )
    await db.add(conn, vote)

    aggregate = {
        "canonical_title": film.title,
        "imdb_score": film.imdb_score,
        "imdb_votes": film.imdb_votes,
        "link": imdb.movie_url(film.imdb_id),
        "media_type": film.media_type,
        "original_title": film.original_title,
        "user_scores": [vote.score],
        "year": film.release_year,
    }

    assert_ok(unauthorized_client.get(url), [aggregate])

    # Each filter on its own must still match the single stored movie.
    single_filters = {
        "imdb_id": film.imdb_id,
        "unwind_id": str(film.id),
        "title": film.title,
        "media_type": film.media_type,
        "year": film.release_year,
    }
    for key, value in single_filters.items():
        assert_ok(unauthorized_client.get(url, params={key: value}), [aggregate])

    assert_ok(unauthorized_client.get(url, params={"title": "no such thing"}), [])

    # Test "exact" query param.
    exact_cases = [
        ({"title": "test movie", "exact": "true"}, [aggregate]),
        ({"title": "te mo", "exact": "false"}, [aggregate]),
        ({"title": "te mo", "exact": "true"}, []),
    ]
    for query, expected in exact_cases:
        assert_ok(unauthorized_client.get(url, params=query), expected)

    # XXX Test "ignore_tv_episodes" query param.
    # XXX Test "include_unrated" query param.
    # XXX Test "per_page" query param.
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_movies(
    conn: db.Connection,
    unauthorized_client: TestClient,
    authorized_client: TestClient,
):
    """Check the ``list_movies`` route for access control and lookups.

    A client without credentials is expected to get 403. The authorized
    client is expected to get an empty list first and, after a movie has
    been stored, to find it via ``include_unrated``, ``imdb_id`` and
    ``unwind_id``.
    """
    url = app.url_path_for("list_movies")

    denied = unauthorized_client.get(url)
    assert denied.status_code == 403

    listing = authorized_client.get(url)
    assert listing.status_code == 200
    assert listing.json() == []

    film = models.Movie(
        title="test movie",
        release_year=2013,
        media_type="Movie",
        imdb_id="tt12345678",
        genres={"genre-1"},
    )
    await db.add(conn, film)

    listing = authorized_client.get(url, params={"include_unrated": 1})
    assert listing.status_code == 200
    assert listing.json() == [{**models.asplain(film), "user_scores": []}]

    # Lookups by id return this aggregate shape rather than the plain movie.
    expected_entry = {
        "canonical_title": film.title,
        "imdb_score": film.imdb_score,
        "imdb_votes": film.imdb_votes,
        "link": imdb.movie_url(film.imdb_id),
        "media_type": film.media_type,
        "original_title": film.original_title,
        "user_scores": [],
        "year": film.release_year,
    }

    id_queries = (
        {"imdb_id": film.imdb_id},
        {"unwind_id": str(film.id)},
    )
    for query in id_queries:
        listing = authorized_client.get(url, params=query)
        assert listing.status_code == 200
        assert listing.json() == [expected_entry]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_users(
    conn: db.Connection,
    unauthorized_client: TestClient,
    authorized_client: TestClient,
    admin_client: TestClient,
):
    """Check the ``list_users`` route for access control and content.

    Both the credential-less and the user-level client are expected to get
    403. The admin client is expected to get an empty list first and the
    stored user afterwards.
    """
    url = app.url_path_for("list_users")

    # Neither of the non-admin clients may list users.
    for client in (unauthorized_client, authorized_client):
        rejected = client.get(url)
        assert rejected.status_code == 403

    listing = admin_client.get(url)
    assert listing.status_code == 200
    assert listing.json() == []

    account = models.User(
        imdb_id="ur12345678",
        name="user-1",
        secret="secret-1",
        groups=[],
    )
    await db.add(conn, account)

    expected_entry = {
        "groups": account.groups,
        "id": account.id,
        "imdb_id": account.imdb_id,
        "name": account.name,
        "secret": account.secret,
    }

    listing = admin_client.get(url)
    assert listing.status_code == 200
    assert listing.json() == [expected_entry]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_groups(
    conn: db.Connection,
    unauthorized_client: TestClient,
    authorized_client: TestClient,
    admin_client: TestClient,
):
    """Check the ``list_groups`` route for access control and content.

    Both the credential-less and the user-level client are expected to get
    403. The admin client is expected to get an empty list first and the
    stored group afterwards.
    """
    url = app.url_path_for("list_groups")

    # Neither of the non-admin clients may list groups.
    for client in (unauthorized_client, authorized_client):
        rejected = client.get(url)
        assert rejected.status_code == 403

    listing = admin_client.get(url)
    assert listing.status_code == 200
    assert listing.json() == []

    circle = models.Group(
        name="group-1",
        users=[models.GroupUser(id="123", name="itsa-me")],
    )
    await db.add(conn, circle)

    expected_entry = {
        "users": circle.users,
        "id": circle.id,
        "name": circle.name,
    }

    listing = admin_client.get(url)
    assert listing.status_code == 200
    assert listing.json() == [expected_entry]
|