code refactoring

This commit is contained in:
sHa
2024-04-08 09:06:26 +03:00
parent 13475a7db4
commit f2a4978cf2
13 changed files with 269 additions and 183 deletions

View File

@@ -1,3 +1,4 @@
MATTERMOST_ACCESS_TOKEN=
MATTERMOST_SERVER_URL=https://my-mattermost.host
MUSIC_APP=autodetect # or apple_music # or spotify
SOURCE=autodetect # or apple_music or spotify or random

View File

@@ -3,19 +3,67 @@ from appscript import app # type: ignore
class AppleMusic:
def __init__(self):
self.music_app = app("Music")
self.__music_app = app("Music")
self.__current_track = None
self.__player_position = None
self.get_current_track_info()
def get_current_track_info(self) -> tuple:
@property
def text(self) -> str:
return f"{self.name} - {self.artist}"
@property
def emoji(self) -> dict:
return {
"name": "headphones",
"name_with_colons": ":headphones:",
"icon": "🎧",
}
@property
def name(self) -> str:
if self.__current_track is not None:
return str(self.__current_track.name.get())
return ""
@property
def artist(self) -> str:
if self.__current_track is not None:
return str(self.__current_track.artist.get())
return ""
@property
def album(self) -> str:
if self.__current_track is not None:
return str(self.__current_track.album.get())
return ""
@property
def duration(self) -> int:
if self.__current_track is not None:
return int(self.__current_track.duration.get())
return 0
@property
def elapsed_time(self) -> float:
if self.__player_position is not None:
return self.__player_position
return 0
@property
def remaining_time(self) -> float:
return self.duration - self.elapsed_time
def get_current_track_info(self):
try:
current_track = self.music_app.current_track.get()
current_position = self.music_app.player_position.get()
track_duration = current_track.duration.get()
return (
current_track.name.get(),
current_track.artist.get(),
track_duration,
current_position,
)
self.__current_track = self.__music_app.current_track.get()
self.__player_position = self.__music_app.player_position.get()
except Exception as e:
print(f"Failed to get current track info: {e}")
return None, None, None, None
def get(self) -> dict:
return {
name: getattr(self, name)
for name in dir(self)
if not name.startswith("_") and not callable(getattr(self, name))
}

19
connectors/mattermost.py Normal file
View File

@@ -0,0 +1,19 @@
import requests
class MattermostConnector:
def __init__(self, connection_config) -> None:
self.url = connection_config.get("url", "https://mattermost.com")
self.token = connection_config.get("token", "")
self.connect(connection_config.get("user_id", "me"))
def connect(self, user_id):
self.headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json",
}
self.url = f"{self.url}/api/v4/users/{user_id}/status/custom"
def send(self, data):
response = requests.put(self.url, headers=self.headers, json=data)
if response.status_code != 200:
raise Exception(f"Failed to set Mattermost status: {response.content!r}")

View File

@@ -3,25 +3,35 @@ import random
import emoji
class Random:
class RandomConnector:
def __init__(self):
self.faker = Faker()
self.__faker = Faker()
def get_random_activity(self) -> tuple:
activity = self.faker.bs().capitalize() # Generate a random activity
emoji = self.get_random_emoji_name() # Get a random emoji
duration = random.randint(5, 60) # Duration in minutes
return activity, emoji, duration
@property
def text(self) -> str:
return self.__faker.bs().capitalize()
def get_random_emoji_name(self) -> str:
@property
def emoji(self) -> dict:
emoji_names = list(emoji.get_aliases_unicode_dict())
single_char_emoji_names = [
name for name in emoji_names if len(emoji.emojize(name)) == 1
]
random_emoji_name = random.choice(single_char_emoji_names)
return random_emoji_name
emoji_name = random.choice(single_char_emoji_names)
return {
"name": emoji_name.replace(":", ""),
"name_with_colons": emoji_name,
"icon": emoji.emojize(emoji_name),
}
def get_random_emoji(self) -> str:
random_emoji = emoji.emojize(self.get_random_emoji_name())
@property
def duration(self) -> int:
"""Return a random duration between 5 and 60 minutes in seconds"""
return random.randint(5, 60) * 60
return random_emoji
def get(self) -> dict:
return {
name: getattr(self, name)
for name in dir(self)
if not name.startswith("_") and not callable(getattr(self, name))
}

View File

@@ -2,29 +2,57 @@ import osascript # type: ignore
class Spotify:
def get_current_track_info(self) -> tuple:
def __init__(self):
self.__request_prefix = 'tell application "Spotify" to'
self.__request_current_track = "of current track as string"
self.name: str = ""
self.artist: str = ""
self.album: str = ""
self.duration: int = 0
self.elapsed_time: float = 0
self.get_current_track_info()
@property
def text(self) -> str:
return f"{self.name} - {self.artist}"
@property
def emoji(self) -> dict:
return {
"name": "headphones",
"name_with_colons": ":headphones:",
"icon": "🎧",
}
@property
def remaining_time(self) -> float:
return self.duration - self.elapsed_time
def get_current_track_info(self):
try:
name_code = 'tell application "Spotify" to name of current track as string'
name_code = f"{self.__request_prefix} name {self.__request_current_track}"
artist_code = (
'tell application "Spotify" to artist of current track as string'
f"{self.__request_prefix} artist {self.__request_current_track}"
)
album_code = f"{self.__request_prefix} album {self.__request_current_track}"
duration_code = (
'tell application "Spotify" to duration of current track as string'
f"{self.__request_prefix} duration {self.__request_current_track}"
)
elapsed_time_code = (
'tell application "Spotify" to player position as string'
elapsed_time_code = f"{self.__request_prefix} player position as string"
self.name = osascript.osascript(name_code)[1]
self.artist = osascript.osascript(artist_code)[1]
self.album = osascript.osascript(album_code)[1]
self.duration = round(int(osascript.osascript(duration_code)[1]) / 1000)
self.elapsed_time = float(
osascript.osascript(elapsed_time_code)[1].replace(",", ".")
)
name = osascript.osascript(name_code)[1]
artist = osascript.osascript(artist_code)[1]
duration = (
int(osascript.osascript(duration_code)[1]) / 1000
) # Convert duration from ms to s
elapsed_time = (
float(osascript.osascript(elapsed_time_code)[1].replace(',','.'))
) # Elapsed time in seconds
return name, artist, duration, elapsed_time
except Exception as e:
print(f"Failed to get current track info: {e}")
return None, None, None, None
def get(self) -> dict:
return {
name: getattr(self, name)
for name in dir(self)
if not name.startswith("_") and not callable(getattr(self, name))
}

57
main.py Normal file
View File

@@ -0,0 +1,57 @@
from dotenv import load_dotenv
from processors.status_send import StatusSend
from processors.music import MusicProcessor
from processors.text import TextProcessor
import argparse
import time
import os
load_dotenv()
SLEEP_TIME = 3
def get_status(source: str | None = None) -> dict:
if source == "random":
return TextProcessor(source=source).get_status()
if source in ["spotify", "apple_music", "autodetect"]:
track = MusicProcessor(source=source).get_status()
return track
return {}
def send_user_status(**kwargs) -> bool:
try:
now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(
f"{now} Setting Mattermost status to {kwargs.get('emoji',{}).get('icon','')} {kwargs.get('text')} ⏱️ for {kwargs.get('duration')} seconds"
)
StatusSend().set_status(**kwargs)
return True
except Exception as e:
print(e)
return False
def main(source: str = "autodetect"):
status_curr = {"status": None}
while True:
status = get_status(source)
if status.get("text", "") != status_curr.get("text", ""):
status_result = send_user_status(**status)
status_curr = status
if not status_result:
continue
time.sleep(status.get("duration") or SLEEP_TIME)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
# Source can be "autodetect", "spotify", "apple_music", "random"
parser.add_argument(
"--source", help="source to use for connector", default="autodetect"
)
args = parser.parse_args()
if args.source == "env":
args.source = os.getenv("SOURCE", "autodetect")
main(args.source)

View File

@@ -1,49 +0,0 @@
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
import os
import requests
import emoji
load_dotenv()
class Mattermost:
def __init__(self, user_id="me"):
host = os.getenv("MATTERMOST_SERVER_URL", "http://localhost")
access_token = os.getenv("MATTERMOST_ACCESS_TOKEN", "")
self.user_id = user_id
self.url = f"{host}/api/v4/users/{self.user_id}/status/custom"
self.headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
}
def set_status(self, status, emoji_name, expires_at=None):
emoji_icon = emoji.emojize(f":{emoji_name}:", language="alias")
print(f"Setting Mattermost status to {emoji_icon} {status} until {expires_at}")
data = {
"emoji": emoji_name,
"text": status,
"expires_at": expires_at.isoformat() if expires_at else None,
}
response = requests.put(self.url, headers=self.headers, json=data)
if response.status_code != 200:
print(f"Failed to set Mattermost status: {response.content}")
raise Exception(f"Failed to set Mattermost status: {response.content}")
def clear_status(self):
self.set_status("", "", None)
def set_now_playing(self, track, artist, duration):
expires_at = (
datetime.now(timezone.utc) + timedelta(seconds=duration)
).astimezone()
status = f"{track} - {artist}"
self.set_status(status, "headphones", expires_at)
def get_status(self):
response = requests.get(self.url, headers=self.headers)
if response.status_code != 200:
print(f"Failed to get Mattermost status: {response.content}")
raise Exception(f"Failed to get Mattermost status: {response.content}")
return response.json()

View File

@@ -1,65 +0,0 @@
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
from mattermost import Mattermost
from processors.music import MusicProcessor
from processors.text import TextProcessor
import argparse
import time
load_dotenv()
SLEEP_TIME = 3
def get_status(source: str | None = None) -> dict:
if source == "random":
activity, emoji, duration = TextProcessor(source=source).get_satus()
return {
"status": activity,
"emoji": emoji.replace(":", ""),
"expires_at": datetime.now(timezone.utc) + timedelta(minutes=duration),
"ending_time": duration * 60,
}
if source == "music":
track, artist, duration, elapsed_time = (
MusicProcessor().get_current_track_info()
)
if track and artist and duration:
now = datetime.now(timezone.utc)
print(f"{now} 🎧 {track} - {artist}")
expires_at = (
now + timedelta(seconds=duration) - timedelta(seconds=elapsed_time)
).astimezone()
return {
"status": f"{track} - {artist}",
"emoji": "headphones",
"expires_at": expires_at,
"ending_time": duration - elapsed_time,
}
return {
"status": None,
"emoji": None,
"expires_at": None,
"ending_time": None,
}
def send_user_status(status, emoji, expires_at=None, **kwargs):
Mattermost().set_status(status, emoji, expires_at=expires_at)
def main(source: str | None = "music"):
status_curr = {"status": None}
while True:
status = get_status(source)
if status.get("status") != status_curr.get("status"):
send_user_status(**status)
status_curr = status
time.sleep(status.get("ending_time") or SLEEP_TIME)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--source", help="source to use for connector", default="music")
args = parser.parse_args()
main(args.source)

View File

@@ -1,20 +1,18 @@
import subprocess
from connectors.apple_music import AppleMusic
from connectors.spotify import Spotify
import os
from dotenv import load_dotenv
class MusicProcessor:
def __init__(self):
load_dotenv()
self.music_app = os.getenv("MUSIC_APP", "autodetect")
if self.music_app == "autodetect":
self.music_app = self.get_current_music_player()
def __init__(self, source: str | None = None):
self.source: str | None = source
self.connector: Spotify | AppleMusic | None = self.get_connector()
def get_connector(self) -> Spotify | AppleMusic | None:
match self.music_app:
if self.source == "autodetect":
self.source = self.get_current_music_player()
match self.source:
case "spotify":
return Spotify()
case "apple_music":
@@ -23,10 +21,10 @@ class MusicProcessor:
print("Active music player not found")
return None
def get_current_track_info(self) -> tuple:
def get_status(self) -> dict:
if self.connector:
return self.connector.get_current_track_info()
return None, None, None, None
return self.connector.get()
return {}
@staticmethod
def get_current_music_player():
@@ -50,6 +48,4 @@ class MusicProcessor:
player = "apple_music"
else:
player = None
# print(f"Detected 📀 player: {player}")
return player

38
processors/status_send.py Normal file
View File

@@ -0,0 +1,38 @@
import os
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
from connectors.mattermost import MattermostConnector
load_dotenv()
class StatusSend:
def __init__(self):
conf = {
"url": os.getenv("MATTERMOST_SERVER_URL"),
"token": os.getenv("MATTERMOST_ACCESS_TOKEN"),
}
self.connector = MattermostConnector(conf)
def set_status(self, text, emoji, duration=None, **kwargs):
if duration is not None:
expires_at = datetime.now(timezone.utc) + timedelta(seconds=duration)
if isinstance(emoji, dict):
emoji_name = emoji.get("name")
else:
emoji_name = emoji
data = {
"emoji": emoji_name,
"text": text,
"expires_at": expires_at.isoformat() if expires_at else None,
}
try:
self.connector.send(data=data)
except Exception as e:
print(e)
def clear_status(self):
try:
self.connector.send(data={"emoji": "", "text": "", "expires_at": ""})
except Exception as e:
print(e)

View File

@@ -1,18 +1,19 @@
from connectors.random import Random
from connectors.random import RandomConnector
class TextProcessor:
def __init__(self, source: str | None = None):
self.source: str | None = source
self.connector = self.get_connector()
self.connector: RandomConnector | None = self.get_connector()
def get_connector(self):
def get_connector(self) -> RandomConnector | None:
if self.source == "random":
return Random()
return RandomConnector()
else:
raise ValueError("Invalid source")
print("Invalid source")
return None
def get_satus(self) -> tuple:
def get_status(self) -> dict:
if self.connector:
return self.connector.get_random_activity()
return None, None, None
return self.connector.get()
return {}

View File

@@ -14,16 +14,18 @@ This is a Python application that fetches the currently playing track from eithe
- Mattermost
## Environment Variables
`MUSIC_APP` - This variable determines which music service the application will fetch the currently playing track from. It can be set to `autodetect` to automatically detect the running music application.
`SOURCE` - This variable determines which music service the application will fetch the currently playing track from. It can be set to `autodetect` to automatically detect the running music application.
`MATTERMOST_SERVER_URL` variable represents the URL of the Mattermost server.
`MATTERMOST_ACCESS_TOKEN` the access token for the Mattermost API, which is obtained by generating a personal access token from the Mattermost user settings and is used to authenticate and authorize API requests to the Mattermost server.
## How to Run
1. Clone the repository
2. Install the dependencies with `pip install -r requirements.txt`
3. Set Mattermost token and host in `.env`
4. Set the `MUSIC_APP` environment variable to your preferred music service
5. Run the application with `python music_app.py`
4. Set the `SOURCE` environment variable to your preferred music service
5. Run the application with `python main.py`
## Note
The application runs in an infinite loop, constantly checking for changes in the currently playing track and updating your Mattermost status accordingly.

View File

@@ -3,4 +3,4 @@ requests
appscript
osascript
faker
emoji
emoji