Feature/event notifications (#399)

* additional server events

* sort 'recent recipes' by updated

* remove duplicate code

* fixes #396

* set color

* consolidate tag/category pages

* set colors

* list unorganized recipes

* cleanup old code

* remove flash message, switch to global snackbar

* cancel to close

* cleanup

* notifications first pass

* test notification

* complete notification feature

* use background tasks

* add url param

* update documentation

Co-authored-by: hay-kot <hay-kot@pm.me>
This commit is contained in:
Hayden
2021-05-08 18:29:31 -08:00
committed by GitHub
parent 8923c1ecf8
commit 14b6ab7ec7
49 changed files with 875 additions and 355 deletions

View File

@@ -1,23 +1,60 @@
import apprise
from mealie.db.database import db
from mealie.db.db_setup import create_session
from mealie.schema.events import Event, EventCategory
from sqlalchemy.orm.session import Session
def save_event(title, text, category, session: Session):
def test_notification(notification_url, event=None) -> bool:
if event is None:
event = Event(
title="Test Notification",
text="This is a test message from the Mealie API server",
category=EventCategory.general.value,
)
post_notifications(event, [notification_url], hard_fail=True)
def post_notifications(event: Event, notification_urls=list[str], hard_fail=False, attachment=None):
asset = apprise.AppriseAsset(async_mode=False)
apobj = apprise.Apprise(asset=asset)
for dest in notification_urls:
status = apobj.add(dest)
if not status and hard_fail:
raise Exception("Apprise URL Add Failed")
print(attachment)
apobj.notify(
body=event.text,
title=event.title,
attach=str(attachment),
)
def save_event(title, text, category, session: Session, attachment=None):
event = Event(title=title, text=text, category=category)
session = session or create_session()
db.events.create(session, event.dict())
notification_objects = db.event_notifications.get(session=session, match_value=True, match_key=category, limit=9999)
notification_urls = [x.notification_url for x in notification_objects]
post_notifications(event, notification_urls, attachment=attachment)
def create_general_event(title, text, session=None):
category = EventCategory.general
save_event(title=title, text=text, category=category, session=session)
def create_recipe_event(title, text, session=None):
def create_recipe_event(title, text, session=None, attachment=None):
category = EventCategory.recipe
save_event(title=title, text=text, category=category, session=session)
save_event(title=title, text=text, category=category, session=session, attachment=attachment)
def create_backup_event(title, text, session=None):