create dataclasses for output data

This commit is contained in:
ducklet 2021-12-19 19:13:49 +01:00
parent 146e6ff6b8
commit e49ea603ee
3 changed files with 96 additions and 36 deletions

View file

@ -4,6 +4,7 @@ import pytest
from unwind import create_app
from unwind import db, models, imdb
# https://pypi.org/project/pytest-asyncio/
pytestmark = pytest.mark.asyncio
app = create_app()
@ -15,6 +16,7 @@ async def test_app():
async with conn.transaction(force_rollback=True):
# https://www.starlette.io/testclient/
client = TestClient(app)
response = client.get("/api/v1/movies")
assert response.status_code == 403
@ -36,10 +38,9 @@ async def test_app():
response = client.get("/api/v1/movies", params={"include_unrated": 1})
assert response.status_code == 200
assert response.json() == [{**db.asplain(m), "user_scores": []}]
assert response.json() == [{**models.asplain(m), "user_scores": []}]
m_plain = {
"unwind_id": m.id,
"canonical_title": m.title,
"imdb_score": m.imdb_score,
"imdb_votes": m.imdb_votes,

View file

@ -22,7 +22,7 @@ from starlette.middleware.gzip import GZipMiddleware
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
from . import config, db, imdb, imdb_import
from . import config, db, imdb, imdb_import, web_models
from .db import close_connection_pool, find_movies, find_ratings, open_connection_pool
from .middleware.responsetime import ResponseTimeMiddleware
from .models import Group, Movie, User, asplain
@ -210,26 +210,11 @@ async def get_ratings_for_group(request):
limit_rows=as_int(params.get("per_page"), max=10, default=5),
user_ids=user_ids,
)
ratings = (web_models.Rating(**r) for r in rows)
aggr = {}
for r in rows:
mov = aggr.setdefault(
r["movie_imdb_id"],
{
"canonical_title": r["canonical_title"],
"original_title": r["original_title"],
"year": r["release_year"],
"link": imdb.movie_url(r["movie_imdb_id"]),
"user_scores": [],
"imdb_score": r["imdb_score"],
"imdb_votes": r["imdb_votes"],
"media_type": r["media_type"],
},
)
if r["user_score"] is not None and r["user_id"] in user_ids:
mov["user_scores"].append(r["user_score"])
aggr = web_models.aggregate_ratings(ratings, user_ids)
resp = tuple(aggr.values())
resp = tuple(asplain(r) for r in aggr)
return JSONResponse(resp)
@ -288,22 +273,11 @@ async def list_movies(request):
if imdb_id or unwind_id:
# XXX missing support for user_ids and user_scores
movies = [await db.get(Movie, id=unwind_id, imdb_id=imdb_id)]
movies = (
[m] if (m := await db.get(Movie, id=unwind_id, imdb_id=imdb_id)) else []
)
resp = [
{
"unwind_id": m["id"],
"canonical_title": m["title"],
"imdb_score": m["imdb_score"],
"imdb_votes": m["imdb_votes"],
"link": imdb.movie_url(m["imdb_id"]),
"media_type": m["media_type"],
"original_title": m["original_title"],
"user_scores": [],
"year": m["release_year"],
}
for m in map(asplain, movies)
]
resp = [asplain(web_models.RatingAggregate.from_movie(m)) for m in movies]
else:
per_page = as_int(params.get("per_page"), max=1000, default=5)

85
unwind/web_models.py Normal file
View file

@ -0,0 +1,85 @@
from dataclasses import dataclass
from typing import Container, Iterable, Optional
from . import imdb, models
URL = str
Score100 = int # [0, 100]
@dataclass
class Rating:
canonical_title: str
imdb_score: Optional[Score100]
imdb_votes: Optional[int]
media_type: str
movie_imdb_id: str
original_title: Optional[str]
release_year: int
user_id: Optional[str]
user_score: Optional[Score100]
@classmethod
def from_movie(cls, movie: models.Movie, *, rating: models.Rating = None):
return cls(
canonical_title=movie.title,
imdb_score=movie.imdb_score,
imdb_votes=movie.imdb_votes,
media_type=movie.media_type,
movie_imdb_id=movie.imdb_id,
original_title=movie.original_title,
release_year=movie.release_year,
user_id=str(rating.user_id) if rating else None,
user_score=rating.score if rating else None,
)
@dataclass
class RatingAggregate:
canonical_title: str
imdb_score: Optional[Score100]
imdb_votes: Optional[int]
link: URL
media_type: str
original_title: Optional[str]
user_scores: list[Score100]
year: int
@classmethod
def from_movie(cls, movie: models.Movie, *, ratings: Iterable[models.Rating] = []):
return cls(
canonical_title=movie.title,
imdb_score=movie.imdb_score,
imdb_votes=movie.imdb_votes,
link=imdb.movie_url(movie.imdb_id),
media_type=movie.media_type,
original_title=movie.original_title,
user_scores=[r.score for r in ratings],
year=movie.release_year,
)
def aggregate_ratings(
ratings: Iterable[Rating], user_ids: Container[str]
) -> Iterable[RatingAggregate]:
aggr: dict[str, RatingAggregate] = {}
for r in ratings:
mov = aggr.setdefault(
r.movie_imdb_id,
RatingAggregate(
canonical_title=r.canonical_title,
imdb_score=r.imdb_score,
imdb_votes=r.imdb_votes,
link=imdb.movie_url(r.movie_imdb_id),
media_type=r.media_type,
original_title=r.original_title,
user_scores=[],
year=r.release_year,
),
)
# XXX do we need this? why don't we just get the ratings we're supposed to aggregate?
if r.user_score is not None and r.user_id in user_ids:
mov.user_scores.append(r.user_score)
return aggr.values()