Files
mealie/mealie/repos/repository_users.py
Hayden b3c41a4bd0 security: implement user lockout (#1552)
* add data-types required for login security

* implement user lockout checking at login

* cleanup legacy patterns

* expose passwords in test_user

* test user lockout after bad attempts

* test user service

* bump alembic version

* save increment to database

* add locked_at to datetime transformer on import

* do proper test cleanup

* implement scheduled task

* spelling

* document env variables

* implement context manager for session

* use context manager

* implement reset script

* cleanup generator

* run generator

* implement API endpoint for resetting locked users

* add button to reset all locked users

* add info when account is locked

* use ignore instead of expect-error
2022-08-13 13:18:12 -08:00

47 lines
1.6 KiB
Python

import random
import shutil
from pydantic import UUID4
from mealie.assets import users as users_assets
from mealie.schema.user.user import PrivateUser, User
from .repository_generic import RepositoryGeneric
class RepositoryUsers(RepositoryGeneric[PrivateUser, User]):
def update_password(self, id, password: str):
entry = self._query_one(match_value=id)
entry.update_password(password)
self.session.commit()
return self.schema.from_orm(entry)
def create(self, user: PrivateUser | dict): # type: ignore
new_user = super().create(user)
# Select Random Image
all_images = [
users_assets.img_random_1,
users_assets.img_random_2,
users_assets.img_random_3,
]
random_image = random.choice(all_images)
shutil.copy(random_image, new_user.directory() / "profile.webp")
return new_user
def delete(self, value: str | UUID4, match_key: str | None = None) -> User:
entry = super().delete(value, match_key)
# Delete the user's directory
shutil.rmtree(PrivateUser.get_directory(value))
return entry # type: ignore
def get_by_username(self, username: str) -> PrivateUser | None:
dbuser = self.session.query(User).filter(User.username == username).one_or_none()
return None if dbuser is None else self.schema.from_orm(dbuser)
def get_locked_users(self) -> list[PrivateUser]:
results = self.session.query(User).filter(User.locked_at != None).all() # noqa E711
return [self.schema.from_orm(x) for x in results]