add auth using bearer tokens

These tokens are used to gain global admin rights, or to gain more
resource specific capabilities, like adding users to a group.
This commit is contained in:
ducklet 2021-07-08 14:38:08 +02:00
parent cd42b95e49
commit edb2fa5069

View file

@ -8,8 +8,8 @@ from starlette.authentication import (
AuthCredentials,
AuthenticationBackend,
AuthenticationError,
BaseUser,
SimpleUser,
UnauthenticatedUser,
requires,
)
from starlette.exceptions import HTTPException
@ -28,23 +28,40 @@ from .utils import b64encode, phc_compare, phc_scrypt
log = logging.getLogger(__name__)
class BasicAuthBackend(AuthenticationBackend):
class BearingUser(BaseUser):
def __init__(self, token):
self.token = token
class BearerAuthBackend(AuthenticationBackend):
def __init__(self, credentials: dict[str, str]):
self.admin_tokens = {v: k for k, v in credentials.items()}
async def authenticate(self, request):
if "Authorization" not in request.headers:
return
auth = request.headers["Authorization"]
try:
scheme, credentials = auth.split()
if scheme.lower() != "basic":
return
decoded = base64.b64decode(credentials).decode("ascii")
except (ValueError, UnicodeDecodeError, binascii.Error) as exc:
raise AuthenticationError("Invalid basic auth credentials")
scheme, token = auth.split()
except ValueError:
raise AuthenticationError("Invalid auth credentials")
username, _, password = decoded.partition(":")
# TODO: You'd want to verify the username and password here.
return AuthCredentials(["authenticated"]), SimpleUser(username)
if scheme.lower() != "bearer":
return
roles = []
is_admin = token in self.admin_tokens
if is_admin:
user = SimpleUser(self.admin_tokens[token])
roles.append("admin")
else:
user = BearingUser(token)
return AuthCredentials(["authenticated", *roles]), user
def imdb_url(imdb_id: str):
@ -210,7 +227,7 @@ async def add_group(request):
)
@requires(["authenticated", "admin"])
@requires(["authenticated"])
async def add_user_to_group(request):
group_id = as_ulid(request.path_params["group_id"])
group = await db.get(Group, id=str(group_id))
@ -251,6 +268,14 @@ async def add_user_to_group(request):
return JSONResponse(asplain(group))
async def http_exception(request, exc):
return JSONResponse({"error": exc.detail}, status_code=exc.status_code)
def auth_error(request, err):
return unauthorized(str(err))
def create_app():
if config.loglevel == "DEBUG":
logging.basicConfig(
@ -285,6 +310,11 @@ def create_app():
],
middleware=[
Middleware(ResponseTimeMiddleware, header_name="Unwind-Elapsed"),
Middleware(AuthenticationMiddleware, backend=BasicAuthBackend()),
Middleware(
AuthenticationMiddleware,
backend=BearerAuthBackend(config.api_credentials),
on_error=auth_error,
),
],
exception_handlers={HTTPException: http_exception},
)