add basic auth for non-admin users
This commit is contained in:
parent
f7913c30c8
commit
1413ba896f
1 changed files with 26 additions and 13 deletions
|
|
@ -31,10 +31,15 @@ from .utils import b64decode, b64encode, phc_compare, phc_scrypt
|
|||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# XXX we probably don't need a group secret anymore, if group access is managed
|
||||
# on a user level; a group secret would be a separate user with full group
|
||||
# access
|
||||
|
||||
class BearingUser(BaseUser):
|
||||
def __init__(self, token):
|
||||
self.token = token
|
||||
|
||||
class AuthedUser(BaseUser):
|
||||
def __init__(self, username: str, secret: str):
|
||||
self.username = username
|
||||
self.secret = secret
|
||||
|
||||
|
||||
class BearerAuthBackend(AuthenticationBackend):
|
||||
|
|
@ -45,25 +50,33 @@ class BearerAuthBackend(AuthenticationBackend):
|
|||
if "Authorization" not in request.headers:
|
||||
return
|
||||
|
||||
# XXX should we remove the auth header after reading, for security reasons?
|
||||
|
||||
auth = request.headers["Authorization"]
|
||||
try:
|
||||
scheme, token = auth.split()
|
||||
scheme, credentials = auth.split()
|
||||
except ValueError:
|
||||
raise AuthenticationError("Invalid auth credentials")
|
||||
|
||||
if scheme.lower() != "bearer":
|
||||
return
|
||||
|
||||
roles = []
|
||||
|
||||
is_admin = token in self.admin_tokens
|
||||
|
||||
if is_admin:
|
||||
user = SimpleUser(self.admin_tokens[token])
|
||||
if scheme.lower() == "bearer":
|
||||
is_admin = credentials in self.admin_tokens
|
||||
if not is_admin:
|
||||
return
|
||||
name = self.admin_tokens[credentials]
|
||||
user = SimpleUser(name)
|
||||
roles.append("admin")
|
||||
|
||||
elif scheme.lower() == "basic":
|
||||
try:
|
||||
name, secret = b64decode(credentials).decode().split(":")
|
||||
except:
|
||||
raise AuthenticationError("Invalid auth credentials")
|
||||
user = AuthedUser(name, secret)
|
||||
|
||||
else:
|
||||
user = BearingUser(token)
|
||||
return
|
||||
|
||||
return AuthCredentials(["authenticated", *roles]), user
|
||||
|
||||
|
|
@ -143,7 +156,7 @@ async def json_from_body(request, keys: list[str] = None):
|
|||
|
||||
def auth(request, secret: str = None):
|
||||
is_admin = "admin" in request.auth.scopes
|
||||
is_owner = secret and phc_compare(secret=request.user.token, phc_string=secret)
|
||||
is_owner = secret and phc_compare(secret=request.user.secret, phc_string=secret)
|
||||
return is_admin, bool(is_owner)
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue