mirror of
https://github.com/shadoll/playing_now_2_mm.git
synced 2025-12-20 00:25:51 +00:00
Add random status generator
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -2,4 +2,5 @@
|
||||
.venv
|
||||
__pycache__
|
||||
.mypy_cache
|
||||
.pytest_cache
|
||||
.pytest_cache
|
||||
.DS_Store
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from appscript import app # type: ignore
|
||||
from appscript import app # type: ignore
|
||||
|
||||
|
||||
class AppleMusic:
|
||||
def __init__(self):
|
||||
@@ -7,11 +8,14 @@ class AppleMusic:
|
||||
def get_current_track_info(self) -> tuple:
|
||||
try:
|
||||
current_track = self.music_app.current_track.get()
|
||||
current_position = self.music_app.player_position.get()
|
||||
track_duration = current_track.duration.get()
|
||||
return (
|
||||
current_track.name.get(),
|
||||
current_track.artist.get(),
|
||||
current_track.duration.get(),
|
||||
track_duration,
|
||||
current_position,
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Failed to get current track info: {e}")
|
||||
return None, None, None
|
||||
return None, None, None, None
|
||||
|
||||
27
connectors/random.py
Normal file
27
connectors/random.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from faker import Faker
|
||||
import random
|
||||
import emoji
|
||||
|
||||
|
||||
class Random:
|
||||
def __init__(self):
|
||||
self.faker = Faker()
|
||||
|
||||
def get_random_activity(self) -> tuple:
|
||||
activity = self.faker.bs().capitalize() # Generate a random activity
|
||||
emoji = self.get_random_emoji_name() # Get a random emoji
|
||||
duration = random.randint(5, 60) # Duration in minutes
|
||||
return activity, emoji, duration
|
||||
|
||||
def get_random_emoji_name(self) -> str:
|
||||
emoji_names = list(emoji.get_aliases_unicode_dict())
|
||||
single_char_emoji_names = [
|
||||
name for name in emoji_names if len(emoji.emojize(name)) == 1
|
||||
]
|
||||
random_emoji_name = random.choice(single_char_emoji_names)
|
||||
return random_emoji_name
|
||||
|
||||
def get_random_emoji(self) -> str:
|
||||
random_emoji = emoji.emojize(self.get_random_emoji_name())
|
||||
|
||||
return random_emoji
|
||||
@@ -1,17 +1,30 @@
|
||||
import osascript # type: ignore
|
||||
import osascript # type: ignore
|
||||
|
||||
|
||||
class Spotify:
|
||||
def get_current_track_info(self) -> tuple:
|
||||
try:
|
||||
name_code = 'tell application "Spotify" to name of current track as string'
|
||||
artist_code = 'tell application "Spotify" to artist of current track as string'
|
||||
duration_code = 'tell application "Spotify" to duration of current track as string'
|
||||
artist_code = (
|
||||
'tell application "Spotify" to artist of current track as string'
|
||||
)
|
||||
duration_code = (
|
||||
'tell application "Spotify" to duration of current track as string'
|
||||
)
|
||||
elapsed_time_code = (
|
||||
'tell application "Spotify" to player position as string'
|
||||
)
|
||||
|
||||
name = osascript.osascript(name_code)[1]
|
||||
artist = osascript.osascript(artist_code)[1]
|
||||
duration = int(osascript.osascript(duration_code)[1]) / 1000 # Convert duration from ms to s
|
||||
duration = (
|
||||
int(osascript.osascript(duration_code)[1]) / 1000
|
||||
) # Convert duration from ms to s
|
||||
elapsed_time = (
|
||||
float(osascript.osascript(elapsed_time_code)[1].replace(',','.'))
|
||||
) # Elapsed time in seconds
|
||||
|
||||
return name, artist, duration
|
||||
return name, artist, duration, elapsed_time
|
||||
except Exception as e:
|
||||
print(f"Failed to get current track info: {e}")
|
||||
return None, None, None
|
||||
return None, None, None, None
|
||||
|
||||
@@ -2,6 +2,7 @@ from datetime import datetime, timedelta, timezone
|
||||
from dotenv import load_dotenv
|
||||
import os
|
||||
import requests
|
||||
import emoji
|
||||
|
||||
load_dotenv()
|
||||
|
||||
@@ -17,23 +18,28 @@ class Mattermost:
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
def set_status(self, status, emoji, expires_at=None):
|
||||
def set_status(self, status, emoji_name, expires_at=None):
|
||||
emoji_icon = emoji.emojize(f":{emoji_name}:", language="alias")
|
||||
print(f"Setting Mattermost status to {emoji_icon} {status} until {expires_at}")
|
||||
data = {
|
||||
"emoji": emoji,
|
||||
"emoji": emoji_name,
|
||||
"text": status,
|
||||
"expires_at": expires_at,
|
||||
"expires_at": expires_at.isoformat() if expires_at else None,
|
||||
}
|
||||
response = requests.put(self.url, headers=self.headers, json=data)
|
||||
if response.status_code != 200:
|
||||
print(f"Failed to set Mattermost status: {response.content}")
|
||||
raise Exception(f"Failed to set Mattermost status: {response.content}")
|
||||
|
||||
def set_now_playing(self, track, artist, duration):
|
||||
expires_at = (datetime.now(timezone.utc) + timedelta(seconds=duration)).astimezone()
|
||||
status = f"{track} - {artist}"
|
||||
print(f"Setting Mattermost status to {status} until {expires_at}")
|
||||
self.set_status(status, "headphones", expires_at.isoformat())
|
||||
def clear_status(self):
|
||||
self.set_status("", "", None)
|
||||
|
||||
def set_now_playing(self, track, artist, duration):
|
||||
expires_at = (
|
||||
datetime.now(timezone.utc) + timedelta(seconds=duration)
|
||||
).astimezone()
|
||||
status = f"{track} - {artist}"
|
||||
self.set_status(status, "headphones", expires_at)
|
||||
|
||||
def get_status(self):
|
||||
response = requests.get(self.url, headers=self.headers)
|
||||
|
||||
74
music_app.py
74
music_app.py
@@ -1,37 +1,65 @@
|
||||
from datetime import datetime
|
||||
import time
|
||||
from music import Music
|
||||
from mattermost import Mattermost
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from dotenv import load_dotenv
|
||||
from mattermost import Mattermost
|
||||
from processors.music import MusicProcessor
|
||||
from processors.text import TextProcessor
|
||||
import argparse
|
||||
import time
|
||||
|
||||
load_dotenv()
|
||||
|
||||
SLEEP_TIME = 3
|
||||
|
||||
def playing_now() -> tuple:
|
||||
music = Music()
|
||||
return music.get_current_track_info()
|
||||
|
||||
def get_status(source: str | None = None) -> dict:
|
||||
if source == "random":
|
||||
activity, emoji, duration = TextProcessor(source=source).get_satus()
|
||||
return {
|
||||
"status": activity,
|
||||
"emoji": emoji.replace(":", ""),
|
||||
"expires_at": datetime.now(timezone.utc) + timedelta(minutes=duration),
|
||||
"ending_time": duration * 60,
|
||||
}
|
||||
if source == "music":
|
||||
track, artist, duration, elapsed_time = (
|
||||
MusicProcessor().get_current_track_info()
|
||||
)
|
||||
if track and artist and duration:
|
||||
now = datetime.now(timezone.utc)
|
||||
print(f"{now} 🎧 {track} - {artist}")
|
||||
expires_at = (
|
||||
now + timedelta(seconds=duration) - timedelta(seconds=elapsed_time)
|
||||
).astimezone()
|
||||
return {
|
||||
"status": f"{track} - {artist}",
|
||||
"emoji": "headphones",
|
||||
"expires_at": expires_at,
|
||||
"ending_time": duration - elapsed_time,
|
||||
}
|
||||
return {
|
||||
"status": None,
|
||||
"emoji": None,
|
||||
"expires_at": None,
|
||||
"ending_time": None,
|
||||
}
|
||||
|
||||
|
||||
def set_now_playing(name, artist, duration):
|
||||
now = datetime.now().strftime("%H:%M:%S")
|
||||
duration = int(duration) if duration else 0
|
||||
print(f"{now} 🎧 {name} - {artist} ⏱️ {duration} seconds")
|
||||
if name and artist and duration:
|
||||
Mattermost().set_now_playing(name, artist, duration)
|
||||
def send_user_status(status, emoji, expires_at=None, **kwargs):
|
||||
Mattermost().set_status(status, emoji, expires_at=expires_at)
|
||||
|
||||
|
||||
def main():
|
||||
name_curr, artist_curr, duration_curr = playing_now()
|
||||
set_now_playing(name_curr, artist_curr, duration_curr)
|
||||
|
||||
def main(source: str | None = "music"):
|
||||
status_curr = {"status": None}
|
||||
while True:
|
||||
name, artist, duration = playing_now()
|
||||
if name != name_curr:
|
||||
set_now_playing(name, artist, duration)
|
||||
name_curr = name
|
||||
time.sleep(SLEEP_TIME)
|
||||
status = get_status(source)
|
||||
if status.get("status") != status_curr.get("status"):
|
||||
send_user_status(**status)
|
||||
status_curr = status
|
||||
time.sleep(status.get("ending_time") or SLEEP_TIME)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--source", help="source to use for connector", default="music")
|
||||
args = parser.parse_args()
|
||||
main(args.source)
|
||||
|
||||
@@ -4,26 +4,29 @@ from connectors.spotify import Spotify
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
class Music:
|
||||
|
||||
class MusicProcessor:
|
||||
def __init__(self):
|
||||
load_dotenv()
|
||||
self.music_app = os.getenv('MUSIC_APP', 'autodetect')
|
||||
if self.music_app == 'autodetect':
|
||||
self.music_app = os.getenv("MUSIC_APP", "autodetect")
|
||||
if self.music_app == "autodetect":
|
||||
self.music_app = self.get_current_music_player()
|
||||
self.connector = self.get_connector()
|
||||
self.connector: Spotify | AppleMusic | None = self.get_connector()
|
||||
|
||||
def get_connector(self):
|
||||
if self.music_app == 'spotify':
|
||||
return Spotify()
|
||||
elif self.music_app == 'apple_music':
|
||||
return AppleMusic()
|
||||
else:
|
||||
raise ValueError(f'Invalid music app: {self.music_app}')
|
||||
def get_connector(self) -> Spotify | AppleMusic | None:
|
||||
match self.music_app:
|
||||
case "spotify":
|
||||
return Spotify()
|
||||
case "apple_music":
|
||||
return AppleMusic()
|
||||
case _:
|
||||
print("Active music player not found")
|
||||
return None
|
||||
|
||||
def get_current_track_info(self) -> tuple:
|
||||
if self.connector:
|
||||
return self.connector.get_current_track_info()
|
||||
return None, None, None
|
||||
return None, None, None, None
|
||||
|
||||
@staticmethod
|
||||
def get_current_music_player():
|
||||
@@ -42,9 +45,9 @@ class Music:
|
||||
.strip()
|
||||
)
|
||||
if spotify_status == "true":
|
||||
player = "Spotify"
|
||||
player = "spotify"
|
||||
elif apple_music_status == "true":
|
||||
player = "Apple Music"
|
||||
player = "apple_music"
|
||||
else:
|
||||
player = None
|
||||
|
||||
18
processors/text.py
Normal file
18
processors/text.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from connectors.random import Random
|
||||
|
||||
|
||||
class TextProcessor:
|
||||
def __init__(self, source: str | None = None):
|
||||
self.source: str | None = source
|
||||
self.connector = self.get_connector()
|
||||
|
||||
def get_connector(self):
|
||||
if self.source == "random":
|
||||
return Random()
|
||||
else:
|
||||
raise ValueError("Invalid source")
|
||||
|
||||
def get_satus(self) -> tuple:
|
||||
if self.connector:
|
||||
return self.connector.get_random_activity()
|
||||
return None, None, None
|
||||
@@ -2,3 +2,5 @@ python-dotenv
|
||||
requests
|
||||
appscript
|
||||
osascript
|
||||
faker
|
||||
emoji
|
||||
Reference in New Issue
Block a user