add secret to User & add user REST routes

This commit is contained in:
ducklet 2021-08-04 17:31:59 +02:00
parent 9bb433eb0c
commit f7913c30c8
4 changed files with 118 additions and 3 deletions

View file

@ -256,6 +256,12 @@ async def update(item):
await shared_connection().execute(query=query, values=values) await shared_connection().execute(query=query, values=values)
async def remove(item):
values = asplain(item, fields_={"id"})
query = f"DELETE FROM {item._table} WHERE id=:id"
await shared_connection().execute(query=query, values=values)
async def add_or_update_user(user: User): async def add_or_update_user(user: User):
db_user = await get(User, imdb_id=user.imdb_id) db_user = await get(User, imdb_id=user.imdb_id)
if not db_user: if not db_user:

View file

@ -69,12 +69,15 @@ def optional_fields(o):
yield f yield f
def asplain(o) -> dict[str, Any]: def asplain(o, *, fields_: set = None) -> dict[str, Any]:
validate(o) validate(o)
d = {} d = {}
for f in fields(o): for f in fields(o):
if fields_ is not None and f.name not in fields_:
continue
target = f.type target = f.type
# XXX this doesn't properly support any kind of nested types # XXX this doesn't properly support any kind of nested types
if (otype := optional_type(f.type)) is not None: if (otype := optional_type(f.type)) is not None:
@ -278,6 +281,7 @@ class User:
id: ULID = field(default_factory=ULID) id: ULID = field(default_factory=ULID)
imdb_id: str = None imdb_id: str = None
name: str = None # canonical user name name: str = None # canonical user name
secret: str = None
@dataclass @dataclass

View file

@ -0,0 +1,22 @@
-- add secret to users
CREATE TABLE _migrate_users (
id TEXT PRIMARY KEY NOT NULL,
imdb_id TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
secret TEXT NOT NULL
);;
INSERT INTO _migrate_users
SELECT
id,
imdb_id,
name,
'' AS secret
FROM users
WHERE true;;
DROP TABLE users;;
ALTER TABLE _migrate_users
RENAME TO users;;

View file

@ -27,7 +27,7 @@ from .db import close_connection_pool, find_ratings, open_connection_pool
from .middleware.responsetime import ResponseTimeMiddleware from .middleware.responsetime import ResponseTimeMiddleware
from .models import Group, Movie, User, asplain from .models import Group, Movie, User, asplain
from .types import ULID from .types import ULID
from .utils import b64encode, phc_compare, phc_scrypt from .utils import b64decode, b64encode, phc_compare, phc_scrypt
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -308,7 +308,90 @@ async def list_users(request):
@requires(["authenticated", "admin"]) @requires(["authenticated", "admin"])
async def add_user(request): async def add_user(request):
not_implemented() name, imdb_id = await json_from_body(request, ["name", "imdb_id"])
# XXX restrict name
# XXX check if imdb_id is well-formed
secret = secrets.token_bytes()
user = User(name=name, imdb_id=imdb_id, secret=phc_scrypt(secret))
await db.add(user)
return JSONResponse(
{
"secret": b64encode(secret),
"user": asplain(user),
}
)
@route("/users/{user_id}")
@requires(["authenticated", "admin"])
async def show_user(request):
user_id = as_ulid(request.path_params["user_id"])
user = await db.get(User, id=str(user_id))
if not user:
return not_found()
return JSONResponse(asplain(user))
@route("/users/{user_id}", methods=["DELETE"])
@requires(["authenticated", "admin"])
async def remove_user(request):
user_id = as_ulid(request.path_params["user_id"])
user = await db.get(User, id=str(user_id))
if not user:
return not_found()
async with db.shared_connection().transaction():
# XXX remove user refs from groups and ratings
await db.remove(user)
return JSONResponse(asplain(user))
@route("/users/{user_id}", methods=["PATCH"])
@requires(["authenticated"])
async def modify_user(request):
user_id = as_ulid(request.path_params["user_id"])
user = await db.get(User, id=str(user_id))
if not user:
return not_found()
is_admin, is_owner = auth(request, user.secret)
if not (is_admin or is_owner):
return forbidden()
data = await json_from_body(request)
if is_admin and "name" in data:
# XXX restrict name
user.name = data["name"]
if is_admin and "imdb_id" in data:
# XXX check if imdb_id is well-formed
user.imdb_id = data["imdb_id"]
if "secret" in data:
try:
secret = b64decode(data["secret"])
except:
raise HTTPException(422, f"Invalid secret.")
user.secret = phc_scrypt(secret)
await db.update(user)
return JSONResponse(asplain(user))
@route("/users/{user_id}/ratings") @route("/users/{user_id}/ratings")