Files
playing_now_2_mm/processors/status_send.py

60 lines
2.2 KiB
Python

import os
import time
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
from connectors.mattermost import MattermostConnector
from connectors.gitlab import GitlabConnector
# Runs at import time, before StatusSend looks up its connector settings with os.getenv.
# NOTE(review): presumably loads variables from a local .env file (python-dotenv default) — confirm.
load_dotenv()
class StatusSend:
    """Send a status (emoji, text, optional expiry) to one configured destination.

    The destination name selects a connector, which is built once at
    construction time. When the destination is not recognised the connector
    is ``None`` and ``set_status`` / ``clear_status`` do nothing.
    """

    def __init__(self, destination: str | None = None) -> None:
        """Store the destination and build the matching connector.

        Args:
            destination: ``"mattermost"`` or ``"gitlab"``. Any other value,
                including ``None``, leaves ``self.connector`` as ``None``.
        """
        self.destination: str | None = destination
        self.connector: MattermostConnector | GitlabConnector | None = self.get_connector()

    def get_connector(self) -> MattermostConnector | GitlabConnector | None:
        """Build the connector for ``self.destination``.

        The connector receives a dict with ``"url"`` and ``"token"`` keys read
        from the ``MATTERMOST_*`` or ``GITLAB_*`` environment variables; a
        variable that is not set is passed through as ``None``.

        Returns:
            A connector instance, or ``None`` (after printing a notice) when
            the destination is not recognised.
        """
        if self.destination == "mattermost":
            conf = {
                "url": os.getenv("MATTERMOST_SERVER_URL"),
                "token": os.getenv("MATTERMOST_ACCESS_TOKEN"),
            }
            return MattermostConnector(conf)

        if self.destination == "gitlab":
            conf = {
                "url": os.getenv("GITLAB_SERVER_URL"),
                "token": os.getenv("GITLAB_ACCESS_TOKEN"),
            }
            return GitlabConnector(conf)

        print("Invalid source")
        return None

    def set_status(
        self,
        text: str,
        emoji: dict | str,
        duration: float | None = None,
        **kwargs,
    ) -> None:
        """Send a status to the connector and print a one-line summary.

        Args:
            text: Status text.
            emoji: Either a dict, whose ``"name"`` entry is sent and whose
                ``"icon"`` entry is printed, or a plain value that is used
                for both.
            duration: Seconds from now (UTC) until the status expires.
                ``None`` sends ``"expires_at": None``.
            **kwargs: ``remaining_time`` (number) is rounded and shown in the
                printed summary only; other keys are ignored.
        """
        if not self.connector:
            return

        # Start from "no expiry" so the payload can still be built when the
        # caller omits duration.
        expires_at = None
        if duration is not None:
            expires_at = datetime.now(timezone.utc) + timedelta(seconds=duration)

        if isinstance(emoji, dict):
            emoji_name = emoji.get("name")
            emoji_icon = emoji.get("icon")
        else:
            # A non-dict emoji has no separate icon; show the value itself.
            emoji_name = emoji_icon = emoji

        data = {
            "emoji": emoji_name,
            "text": text,
            "expires_at": expires_at.isoformat() if expires_at is not None else None,
        }

        now = time.strftime("%H:%M:%S", time.localtime())
        # "or 0" also covers an explicit remaining_time=None.
        remaining_time = round(kwargs.get("remaining_time") or 0)
        print(
            f"{now} Setting status to {self.connector.NAME} of {emoji_icon} {text} "
            f"⏱️ for {remaining_time} seconds"
        )

        try:
            self.connector.send(data=data)
        except Exception as e:
            # Best effort: the failure is printed and swallowed so the caller
            # keeps running.
            print(e)

    def clear_status(self) -> None:
        """Send empty emoji, text and expiry values to the connector, if any."""
        if not self.connector:
            return

        try:
            self.connector.send(data={"emoji": "", "text": "", "expires_at": ""})
        except Exception as e:
            # Best effort, same as set_status: print the failure and continue.
            print(e)