Add singleton logging configuration for the renamer application

This commit introduces a new module, `logging_config.py`, that implements a singleton pattern for logging configuration. The logger is initialized exactly once and, depending on an environment variable, logs either to a file or to the console. This centralizes logging setup and keeps logging behavior consistent throughout the application.
Author: sHa
Date: 2026-01-05 14:54:03 +00:00
Parent: ad39632e91
Commit: 8031c97999
20 changed files with 350 additions and 109 deletions

dist/renamer-0.8.9-py3-none-any.whl (vendored, new binary file)
Binary file not shown.

pyproject.toml

@@ -1,6 +1,6 @@
 [project]
 name = "renamer"
-version = "0.8.8"
+version = "0.8.9"
 description = "Terminal-based media file renamer and metadata viewer"
 readme = "README.md"
 requires-python = ">=3.11"


@@ -9,8 +9,8 @@ from functools import partial
 import threading
 import time
 import logging
-import os
+from .logging_config import LoggerConfig  # Initialize logging singleton
 from .constants import MEDIA_TYPES
 from .screens import OpenScreen, HelpScreen, RenameConfirmScreen, SettingsScreen, ConvertConfirmScreen
 from .extractors.extractor import MediaExtractor
@@ -22,14 +22,6 @@ from .cache import Cache, CacheManager
 from .services.conversion_service import ConversionService
 
-# Set up logging conditionally
-if os.getenv('FORMATTER_LOG', '0') == '1':
-    logging.basicConfig(filename='formatter.log', level=logging.INFO,
-                        format='%(asctime)s - %(levelname)s - %(message)s')
-else:
-    logging.basicConfig(level=logging.INFO)  # Enable logging for debugging
-
 class CacheCommandProvider(Provider):
     """Command provider for cache management operations."""
@@ -431,6 +423,11 @@ class RenamerApp(App):
         try:
             if node.data.is_file():
+                # Invalidate cache for this file before re-extracting
+                cache = Cache()
+                invalidated = cache.invalidate_file(node.data)
+                logging.info(f"Refresh: invalidated {invalidated} cache entries for {node.data.name}")
                 self._start_loading_animation()
                 threading.Thread(
                     target=self._extract_and_show_details, args=(node.data,)

renamer/cache/core.py (vendored)

@@ -12,14 +12,30 @@ logger = logging.getLogger(__name__)
 class Cache:
-    """Thread-safe file-based cache with TTL support."""
+    """Thread-safe file-based cache with TTL support (Singleton)."""
+
+    _instance: Optional['Cache'] = None
+    _lock_init = threading.Lock()
+
+    def __new__(cls, cache_dir: Optional[Path] = None):
+        """Create or return singleton instance."""
+        if cls._instance is None:
+            with cls._lock_init:
+                if cls._instance is None:
+                    cls._instance = super().__new__(cls)
+                    cls._instance._initialized = False
+        return cls._instance
 
     def __init__(self, cache_dir: Optional[Path] = None):
-        """Initialize cache with optional custom directory.
+        """Initialize cache with optional custom directory (only once).
 
         Args:
             cache_dir: Optional cache directory path. Defaults to ~/.cache/renamer/
         """
+        # Only initialize once
+        if self._initialized:
+            return
+
         # Always use the default cache dir to avoid creating cache in scan dir
         if cache_dir is None:
             cache_dir = Path.home() / ".cache" / "renamer"
@@ -27,6 +43,7 @@ class Cache:
         self.cache_dir.mkdir(parents=True, exist_ok=True)
         self._memory_cache: Dict[str, Dict[str, Any]] = {}  # In-memory cache for faster access
         self._lock = threading.RLock()  # Reentrant lock for thread safety
+        self._initialized = True
 
     def _sanitize_key_component(self, component: str) -> str:
         """Sanitize a key component to prevent filesystem escaping.
@@ -85,14 +102,15 @@ class Cache:
         # Use .json extension for all cache files (simplifies logic)
         return cache_subdir / f"{key_hash}.json"
 
-    def get(self, key: str) -> Optional[Any]:
+    def get(self, key: str, default: Any = None) -> Any:
         """Get cached value if not expired (thread-safe).
 
         Args:
             key: Cache key
+            default: Value to return if key not found or expired
 
         Returns:
-            Cached value or None if not found/expired
+            Cached value or default if not found/expired
         """
         with self._lock:
             # Check memory cache first
@@ -108,7 +126,7 @@ class Cache:
             # Check file cache
             cache_file = self._get_cache_file(key)
             if not cache_file.exists():
-                return None
+                return default
 
             try:
                 with open(cache_file, 'r') as f:
@@ -118,7 +136,7 @@ class Cache:
                     # Expired, remove file
                     cache_file.unlink(missing_ok=True)
                     logger.debug(f"File cache expired for key: {key}, removed {cache_file}")
-                    return None
+                    return default
 
                 # Store in memory cache for faster future access
                 self._memory_cache[key] = data
@@ -128,11 +146,11 @@ class Cache:
                 # Corrupted JSON, remove file
                 logger.warning(f"Corrupted cache file {cache_file}: {e}")
                 cache_file.unlink(missing_ok=True)
-                return None
+                return default
             except IOError as e:
                 # File read error
                 logger.error(f"Failed to read cache file {cache_file}: {e}")
-                return None
+                return default
 
     def set(self, key: str, value: Any, ttl_seconds: int) -> None:
         """Set cached value with TTL (thread-safe).
@@ -177,6 +195,56 @@ class Cache:
             cache_file.unlink(missing_ok=True)
             logger.debug(f"Invalidated cache for key: {key}")
 
+    def invalidate_file(self, file_path: Path) -> int:
+        """Invalidate all cache entries for a specific file path.
+
+        This invalidates all extractor method caches for the given file by:
+        1. Clearing matching keys from memory cache
+        2. Removing matching keys from file cache
+
+        Args:
+            file_path: File path to invalidate cache for
+
+        Returns:
+            Number of cache entries invalidated
+        """
+        with self._lock:
+            # Generate the path hash used in cache keys
+            path_hash = hashlib.md5(str(file_path).encode()).hexdigest()[:12]
+            prefix = f"extractor_{path_hash}_"
+            invalidated_count = 0
+
+            # Remove from memory cache (easy - just check prefix)
+            keys_to_remove = [k for k in self._memory_cache.keys() if k.startswith(prefix)]
+            for key in keys_to_remove:
+                del self._memory_cache[key]
+                invalidated_count += 1
+                logger.debug(f"Invalidated memory cache for key: {key}")
+
+            # For file cache, we need to invalidate all known extractor methods
+            # List of all cached extractor methods
+            extractor_methods = [
+                'extract_title', 'extract_year', 'extract_source', 'extract_video_codec',
+                'extract_audio_codec', 'extract_frame_class', 'extract_hdr', 'extract_order',
+                'extract_special_info', 'extract_movie_db', 'extract_extension',
+                'extract_video_tracks', 'extract_audio_tracks', 'extract_subtitle_tracks',
+                'extract_interlaced', 'extract_size', 'extract_duration', 'extract_bitrate',
+                'extract_created', 'extract_modified'
+            ]
+
+            # Invalidate each possible cache key
+            for method in extractor_methods:
+                cache_key = f"extractor_{path_hash}_{method}"
+                cache_file = self._get_cache_file(cache_key)
+                if cache_file.exists():
+                    cache_file.unlink(missing_ok=True)
+                    invalidated_count += 1
+                    logger.debug(f"Invalidated file cache for key: {cache_key}")
+
+            logger.info(f"Invalidated {invalidated_count} cache entries for file: {file_path.name}")
+            return invalidated_count
+
     def get_image(self, key: str) -> Optional[Path]:
         """Get cached image path if not expired (thread-safe).


@@ -19,6 +19,9 @@ from .strategies import (
 logger = logging.getLogger(__name__)
 
+# Sentinel object to distinguish "not in cache" from "cached value is None"
+_CACHE_MISS = object()
+
 def cached(
     strategy: Optional[CacheKeyStrategy] = None,
@@ -78,10 +81,10 @@ def cached(
                 logger.warning(f"Failed to generate cache key: {e}, executing uncached")
                 return func(self, *args, **kwargs)
 
-            # Check cache
-            cached_value = cache.get(cache_key)
-            if cached_value is not None:
-                logger.debug(f"Cache hit for {func.__name__}: {cache_key}")
+            # Check cache (use sentinel to distinguish "not in cache" from "cached None")
+            cached_value = cache.get(cache_key, _CACHE_MISS)
+            if cached_value is not _CACHE_MISS:
+                logger.debug(f"Cache hit for {func.__name__}: {cache_key} (value={cached_value!r})")
                 return cached_value
 
             # Execute function
@@ -91,10 +94,9 @@ def cached(
             # Determine TTL
            actual_ttl = _determine_ttl(self, ttl)
 
-            # Cache result (only if not None)
-            if result is not None:
-                cache.set(cache_key, result, actual_ttl)
-                logger.debug(f"Cached {func.__name__}: {cache_key} (TTL: {actual_ttl}s)")
+            # Cache result (including None - None is valid data meaning "not found")
+            cache.set(cache_key, result, actual_ttl)
+            logger.debug(f"Cached {func.__name__}: {cache_key} (TTL: {actual_ttl}s, value={result!r})")
 
             return result
@@ -129,8 +131,9 @@
         if not file_path:
             raise ValueError(f"{instance.__class__.__name__} missing file_path attribute")
 
-        instance_id = str(id(instance))
-        return strategy.generate_key(file_path, func.__name__, instance_id)
+        # Cache by file_path + method_name only (no instance_id)
+        # This allows cache hits across different extractor instances for the same file
+        return strategy.generate_key(file_path, func.__name__)
 
     elif isinstance(strategy, APIRequestStrategy):
         # API pattern: expects service name in args or uses function name
@@ -246,10 +249,10 @@ def cached_api(service: str, ttl: Optional[int] = None):
             strategy = APIRequestStrategy()
             cache_key = strategy.generate_key(service, func.__name__, {'params': args_repr})
 
-            # Check cache
-            cached_value = cache.get(cache_key)
-            if cached_value is not None:
-                logger.debug(f"API cache hit for {service}.{func.__name__}")
+            # Check cache (use sentinel to distinguish "not in cache" from "cached None")
+            cached_value = cache.get(cache_key, _CACHE_MISS)
+            if cached_value is not _CACHE_MISS:
+                logger.debug(f"API cache hit for {service}.{func.__name__} (value={cached_value!r})")
                 return cached_value
 
             # Execute function
@@ -267,10 +270,9 @@
             else:
                 actual_ttl = 21600  # Default 6 hours
 
-            # Cache result (only if not None)
-            if result is not None:
-                cache.set(cache_key, result, actual_ttl)
-                logger.debug(f"API cached {service}.{func.__name__} (TTL: {actual_ttl}s)")
+            # Cache result (including None - None is valid data)
+            cache.set(cache_key, result, actual_ttl)
+            logger.debug(f"API cached {service}.{func.__name__} (TTL: {actual_ttl}s, value={result!r})")
 
             return result
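
Note: the sentinel-miss pattern in isolation — a plain dict stands in for the cache; this sketch is illustrative, not part of the commit.

    _CACHE_MISS = object()  # unique identity; no cached value can be `is`-identical to it

    store = {"title": None}  # None here is real cached data ("not found"), not a miss

    hit = store.get("title", _CACHE_MISS)
    assert hit is not _CACHE_MISS and hit is None  # cache hit whose cached value is None

    miss = store.get("year", _CACHE_MISS)
    assert miss is _CACHE_MISS  # genuine miss: compute and cache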


@@ -12,7 +12,7 @@ This package contains constants split into logical modules:
 """
 
 # Import from all constant modules
-from .media_constants import MEDIA_TYPES
+from .media_constants import MEDIA_TYPES, META_TYPE_TO_EXTENSIONS
 from .source_constants import SOURCE_DICT
 from .frame_constants import FRAME_CLASSES, NON_STANDARD_QUALITY_INDICATORS
 from .moviedb_constants import MOVIE_DB_DICT
@@ -24,6 +24,7 @@ from .cyrillic_constants import CYRILLIC_TO_ENGLISH
 __all__ = [
     # Media types
     'MEDIA_TYPES',
+    'META_TYPE_TO_EXTENSIONS',
     # Source types
     'SOURCE_DICT',
     # Frame classes


@@ -54,3 +54,13 @@ MEDIA_TYPES = {
         "mime": "video/mpeg",
     },
 }
+
+# Reverse mapping: meta_type -> list of extensions
+# Built once at module load instead of rebuilding in every extractor instance
+META_TYPE_TO_EXTENSIONS = {}
+for ext, info in MEDIA_TYPES.items():
+    meta_type = info.get('meta_type')
+    if meta_type:
+        if meta_type not in META_TYPE_TO_EXTENSIONS:
+            META_TYPE_TO_EXTENSIONS[meta_type] = []
+        META_TYPE_TO_EXTENSIONS[meta_type].append(ext)
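
Note: the same table could be built with collections.defaultdict; a sketch assuming MEDIA_TYPES maps extensions to dicts with an optional 'meta_type' key. The 'Matroska' entry below follows the extension-selection hunk further down; 'mkv' is illustrative.

    from collections import defaultdict

    META_TYPE_TO_EXTENSIONS = defaultdict(list)
    for ext, info in MEDIA_TYPES.items():
        if info.get('meta_type'):
            META_TYPE_TO_EXTENSIONS[info['meta_type']].append(ext)
    # `in` checks still behave like a plain dict, e.g.
    # META_TYPE_TO_EXTENSIONS.get('Matroska')  ->  ['mkv', 'mk3d']  (illustrative)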


@@ -45,14 +45,15 @@ class MediaExtractor:
         >>> tracks = extractor.get("video_tracks")
     """
 
-    def __init__(self, file_path: Path):
+    def __init__(self, file_path: Path, use_cache: bool = True):
         self.file_path = file_path
-        self.filename_extractor = FilenameExtractor(file_path)
-        self.metadata_extractor = MetadataExtractor(file_path)
-        self.mediainfo_extractor = MediaInfoExtractor(file_path)
-        self.fileinfo_extractor = FileInfoExtractor(file_path)
-        self.tmdb_extractor = TMDBExtractor(file_path)
+        # Initialize all extractors - they use singleton Cache internally
+        self.filename_extractor = FilenameExtractor(file_path, use_cache)
+        self.metadata_extractor = MetadataExtractor(file_path, use_cache)
+        self.mediainfo_extractor = MediaInfoExtractor(file_path, use_cache)
+        self.fileinfo_extractor = FileInfoExtractor(file_path, use_cache)
+        self.tmdb_extractor = TMDBExtractor(file_path, use_cache)
         self.default_extractor = DefaultExtractor()
 
         # Extractor mapping
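
Note: a usage sketch for the new use_cache flag (file name illustrative; module path per the app's "from .extractors.extractor import MediaExtractor").

    from pathlib import Path
    from renamer.extractors.extractor import MediaExtractor

    extractor = MediaExtractor(Path("Movie.2024.1080p.mkv"))                  # caching on (default)
    uncached = MediaExtractor(Path("Movie.2024.1080p.mkv"), use_cache=False)  # sub-extractors get cache=None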


@@ -6,15 +6,8 @@ file system metadata such as size, timestamps, paths, and extensions.
 from pathlib import Path
 import logging
-import os
-
-from ..cache import cached_method
-
-# Set up logging conditionally
-if os.getenv('FORMATTER_LOG', '0') == '1':
-    logging.basicConfig(filename='formatter.log', level=logging.INFO,
-                        format='%(asctime)s - %(levelname)s - %(message)s')
-else:
-    logging.basicConfig(level=logging.CRITICAL)  # Disable logging
+from ..cache import cached_method, Cache
+from ..logging_config import LoggerConfig  # Initialize logging singleton
 
 class FileInfoExtractor:
@@ -39,13 +32,17 @@ class FileInfoExtractor:
         >>> name = extractor.extract_file_name()  # Returns "movie.mkv"
     """
 
-    def __init__(self, file_path: Path):
+    def __init__(self, file_path: Path, use_cache: bool = True):
         """Initialize the FileInfoExtractor.
 
         Args:
             file_path: Path object pointing to the file to extract info from
+            use_cache: Whether to use caching (default: True)
         """
         self._file_path = file_path
+        self.file_path = file_path  # Expose for cache key generation
+        self.cache = Cache() if use_cache else None  # Singleton cache for @cached_method decorator
+        self.settings = None  # Will be set by Settings singleton if needed
         self._stat = file_path.stat()
         self._cache: dict[str, any] = {}  # Internal cache for method results


@@ -8,7 +8,7 @@ from ..constants import (
     is_valid_year,
     CYRILLIC_TO_ENGLISH
 )
-from ..cache import cached_method
+from ..cache import cached_method, Cache
 from ..utils.pattern_utils import PatternExtractor
 import langcodes
@@ -18,7 +18,7 @@ logger = logging.getLogger(__name__)
 class FilenameExtractor:
     """Class to extract information from filename"""
 
-    def __init__(self, file_path: Path | str):
+    def __init__(self, file_path: Path | str, use_cache: bool = True):
         if isinstance(file_path, str):
             self.file_path = Path(file_path)
             self.file_name = file_path
@@ -26,6 +26,9 @@ class FilenameExtractor:
             self.file_path = file_path
             self.file_name = file_path.name
 
+        self.cache = Cache() if use_cache else None  # Singleton cache for @cached_method decorator
+        self.settings = None  # Will be set by Settings singleton if needed
+
         # Initialize utility helper
         self._pattern_extractor = PatternExtractor()


@@ -1,8 +1,8 @@
 from pathlib import Path
 from pymediainfo import MediaInfo
 from collections import Counter
-from ..constants import FRAME_CLASSES, MEDIA_TYPES
-from ..cache import cached_method
+from ..constants import FRAME_CLASSES, META_TYPE_TO_EXTENSIONS
+from ..cache import cached_method, Cache
 import langcodes
 import logging
@@ -12,30 +12,25 @@ logger = logging.getLogger(__name__)
 class MediaInfoExtractor:
     """Class to extract information from MediaInfo"""
 
-    def __init__(self, file_path: Path):
+    def __init__(self, file_path: Path, use_cache: bool = True):
         self.file_path = file_path
+        self.cache = Cache() if use_cache else None  # Singleton cache for @cached_method decorator
+        self.settings = None  # Will be set by Settings singleton if needed
         self._cache = {}  # Internal cache for method results
-        try:
-            self.media_info = MediaInfo.parse(file_path)
+
+        # Parse media info - set to None on failure
+        self.media_info = MediaInfo.parse(file_path) if file_path.exists() else None
+
+        # Extract tracks
+        if self.media_info:
             self.video_tracks = [t for t in self.media_info.tracks if t.track_type == 'Video']
             self.audio_tracks = [t for t in self.media_info.tracks if t.track_type == 'Audio']
             self.sub_tracks = [t for t in self.media_info.tracks if t.track_type == 'Text']
-        except Exception as e:
-            logger.warning(f"Failed to parse media info for {file_path}: {e}")
-            self.media_info = None
+        else:
             self.video_tracks = []
             self.audio_tracks = []
             self.sub_tracks = []
-
-        # Build mapping from meta_type to extensions
-        self._format_to_extensions = {}
-        for ext, info in MEDIA_TYPES.items():
-            meta_type = info.get('meta_type')
-            if meta_type:
-                if meta_type not in self._format_to_extensions:
-                    self._format_to_extensions[meta_type] = []
-                self._format_to_extensions[meta_type].append(ext)
 
     def _get_frame_class_from_height(self, height: int) -> str | None:
         """Get frame class from video height, finding closest match if exact not found"""
         if not height:
@@ -83,16 +78,23 @@
         scan_type_attr = getattr(self.video_tracks[0], 'scan_type', None)
         interlaced = getattr(self.video_tracks[0], 'interlaced', None)
 
+        logger.debug(f"[{self.file_path.name}] Frame class detection - Resolution: {width}x{height}")
+        logger.debug(f"[{self.file_path.name}] scan_type attribute: {scan_type_attr!r} (type: {type(scan_type_attr).__name__})")
+        logger.debug(f"[{self.file_path.name}] interlaced attribute: {interlaced!r} (type: {type(interlaced).__name__})")
+
         # Determine scan type from available attributes
         # Check scan_type first (e.g., "Interlaced", "Progressive", "MBAFF")
         if scan_type_attr and isinstance(scan_type_attr, str):
             scan_type = 'i' if 'interlaced' in scan_type_attr.lower() else 'p'
+            logger.debug(f"[{self.file_path.name}] Using scan_type: {scan_type_attr!r} -> scan_type={scan_type!r}")
         # Then check interlaced flag (e.g., "Yes", "No")
         elif interlaced and isinstance(interlaced, str):
             scan_type = 'i' if interlaced.lower() in ['yes', 'true', '1'] else 'p'
+            logger.debug(f"[{self.file_path.name}] Using interlaced: {interlaced!r} -> scan_type={scan_type!r}")
         else:
             # Default to progressive if no information available
             scan_type = 'p'
+            logger.debug(f"[{self.file_path.name}] No scan type info, defaulting to progressive")
 
         # Calculate effective height for frame class determination
         aspect_ratio = 16 / 9
@@ -113,12 +115,15 @@
         if width_matches:
             # Choose the frame class with the smallest height difference
             width_matches.sort(key=lambda x: x[1])
-            return width_matches[0][0]
+            result = width_matches[0][0]
+            logger.debug(f"[{self.file_path.name}] Result (width match): {result!r}")
+            return result
 
         # If no width match, fall back to height-based matching
         # First try exact match with standard frame classes
         frame_class = f"{int(round(effective_height))}{scan_type}"
         if frame_class in FRAME_CLASSES:
+            logger.debug(f"[{self.file_path.name}] Result (exact height match): {frame_class!r}")
             return frame_class
 
         # Find closest standard height match
@@ -133,9 +138,11 @@
         # Return closest standard match if within reasonable distance (20 pixels)
         if closest_class and min_diff <= 20:
+            logger.debug(f"[{self.file_path.name}] Result (closest match, diff={min_diff}): {closest_class!r}")
            return closest_class
 
         # For non-standard resolutions, create a custom frame class
+        logger.debug(f"[{self.file_path.name}] Result (custom/non-standard): {frame_class!r}")
         return frame_class
 
     @cached_method()
@@ -265,8 +272,8 @@
         if not general_track:
             return None
         format_ = getattr(general_track, 'format', None)
-        if format_ in self._format_to_extensions:
-            exts = self._format_to_extensions[format_]
+        if format_ in META_TYPE_TO_EXTENSIONS:
+            exts = META_TYPE_TO_EXTENSIONS[format_]
             if format_ == 'Matroska':
                 if self.is_3d() and 'mk3d' in exts:
                     return 'mk3d'
@@ -283,3 +290,49 @@
             return None
         stereoscopic = getattr(self.video_tracks[0], 'stereoscopic', None)
         return stereoscopic if stereoscopic else None
+
+    @cached_method()
+    def extract_interlaced(self) -> bool | None:
+        """Determine if the video is interlaced.
+
+        Returns:
+            True: Video is interlaced
+            False: Video is progressive (explicitly set)
+            None: Information not available in MediaInfo
+        """
+        if not self.video_tracks:
+            logger.debug(f"[{self.file_path.name}] Interlaced detection: No video tracks")
+            return None
+
+        scan_type_attr = getattr(self.video_tracks[0], 'scan_type', None)
+        interlaced = getattr(self.video_tracks[0], 'interlaced', None)
+
+        logger.debug(f"[{self.file_path.name}] Interlaced detection:")
+        logger.debug(f"[{self.file_path.name}] scan_type: {scan_type_attr!r} (type: {type(scan_type_attr).__name__})")
+        logger.debug(f"[{self.file_path.name}] interlaced: {interlaced!r} (type: {type(interlaced).__name__})")
+
+        # Check scan_type attribute first (e.g., "Interlaced", "Progressive", "MBAFF")
+        if scan_type_attr and isinstance(scan_type_attr, str):
+            scan_lower = scan_type_attr.lower()
+            if 'interlaced' in scan_lower or 'mbaff' in scan_lower:
+                logger.debug(f"[{self.file_path.name}] Result: True (from scan_type={scan_type_attr!r})")
+                return True
+            elif 'progressive' in scan_lower:
+                logger.debug(f"[{self.file_path.name}] Result: False (from scan_type={scan_type_attr!r})")
+                return False
+            # If scan_type has some other value, fall through to check interlaced
+            logger.debug(f"[{self.file_path.name}] scan_type unrecognized, checking interlaced attribute")
+
+        # Check interlaced attribute (e.g., "Yes", "No")
+        if interlaced and isinstance(interlaced, str):
+            interlaced_lower = interlaced.lower()
+            if interlaced_lower in ['yes', 'true', '1']:
+                logger.debug(f"[{self.file_path.name}] Result: True (from interlaced={interlaced!r})")
+                return True
+            elif interlaced_lower in ['no', 'false', '0']:
+                logger.debug(f"[{self.file_path.name}] Result: False (from interlaced={interlaced!r})")
+                return False
+
+        # No information available
+        logger.debug(f"[{self.file_path.name}] Result: None (no information available)")
+        return None
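
Note: the decision ladder above, restated as a standalone function for quick testing — a sketch mirroring the method, not part of the commit.

    def is_interlaced(scan_type: str | None, interlaced: str | None) -> bool | None:
        if scan_type and isinstance(scan_type, str):
            s = scan_type.lower()
            if 'interlaced' in s or 'mbaff' in s:
                return True
            if 'progressive' in s:
                return False
            # unrecognized scan_type: fall through to the interlaced flag
        if interlaced and isinstance(interlaced, str):
            if interlaced.lower() in ['yes', 'true', '1']:
                return True
            if interlaced.lower() in ['no', 'false', '0']:
                return False
        return None

    assert is_interlaced('MBAFF', None) is True
    assert is_interlaced('Progressive', None) is False
    assert is_interlaced(None, 'Yes') is True
    assert is_interlaced(None, None) is None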


@@ -8,7 +8,7 @@ import mutagen
 import logging
 from pathlib import Path
 from ..constants import MEDIA_TYPES
-from ..cache import cached_method
+from ..cache import cached_method, Cache
 
 logger = logging.getLogger(__name__)
@@ -32,13 +32,16 @@ class MetadataExtractor:
         >>> duration = extractor.extract_duration()
     """
 
-    def __init__(self, file_path: Path):
+    def __init__(self, file_path: Path, use_cache: bool = True):
         """Initialize the MetadataExtractor.
 
         Args:
             file_path: Path object pointing to the media file
+            use_cache: Whether to use caching (default: True)
         """
         self.file_path = file_path
+        self.cache = Cache() if use_cache else None  # Singleton cache for @cached_method decorator
+        self.settings = None  # Will be set by Settings singleton if needed
         self._cache: dict[str, any] = {}  # Internal cache for method results
         try:
             self.info = mutagen.File(file_path)  # type: ignore


@@ -13,10 +13,11 @@ from ..settings import Settings
 class TMDBExtractor:
     """Class to extract TMDB movie information"""
 
-    def __init__(self, file_path: Path):
+    def __init__(self, file_path: Path, use_cache: bool = True):
         self.file_path = file_path
-        self.cache = Cache()
-        self.ttl_seconds = Settings().get("cache_ttl_extractors", 21600)
+        self.cache = Cache() if use_cache else None  # Singleton cache
+        self.settings = Settings()  # Singleton settings
+        self.ttl_seconds = self.settings.get("cache_ttl_extractors", 21600)
         self._movie_db_info = None
 
     def _get_cached_data(self, cache_key: str) -> Optional[Dict[str, Any]]:


@@ -15,20 +15,14 @@ class SpecialInfoFormatter:
         """Format database info dictionary or tuple/list into a string"""
         import logging
         import os
-        if os.getenv("FORMATTER_LOG"):
-            logging.info(f"format_database_info called with: {database_info!r} (type: {type(database_info)})")
         if isinstance(database_info, dict) and 'name' in database_info and 'id' in database_info:
             db_name = database_info['name']
             db_id = database_info['id']
             result = f"{db_name}id-{db_id}"
-            if os.getenv("FORMATTER_LOG"):
-                logging.info(f"Formatted dict to: {result!r}")
             return result
         elif isinstance(database_info, (tuple, list)) and len(database_info) == 2:
             db_name, db_id = database_info
             result = f"{db_name}id-{db_id}"
-            if os.getenv("FORMATTER_LOG"):
-                logging.info(f"Formatted tuple/list to: {result!r}")
             return result
         if os.getenv("FORMATTER_LOG"):
             logging.info("Returning None")

renamer/logging_config.py (new file)

@@ -0,0 +1,46 @@
+"""Singleton logging configuration for the renamer application.
+
+This module provides centralized logging configuration that is initialized
+once and used throughout the application.
+"""
+import logging
+import os
+import threading
+
+
+class LoggerConfig:
+    """Singleton logger configuration."""
+
+    _instance = None
+    _lock = threading.Lock()
+    _initialized = False
+
+    def __new__(cls):
+        """Create or return singleton instance."""
+        if cls._instance is None:
+            with cls._lock:
+                if cls._instance is None:
+                    cls._instance = super().__new__(cls)
+        return cls._instance
+
+    def __init__(self):
+        """Initialize logging configuration (only once)."""
+        if LoggerConfig._initialized:
+            return
+
+        # Check environment variable for formatter logging
+        if os.getenv('FORMATTER_LOG', '0') == '1':
+            logging.basicConfig(
+                filename='formatter.log',
+                level=logging.DEBUG,
+                format='%(asctime)s - %(levelname)s - %(message)s'
+            )
+        else:
+            logging.basicConfig(level=logging.INFO)
+
+        LoggerConfig._initialized = True
+
+
+# Initialize logging on import
+LoggerConfig()
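
Note: usage is implicit — the import side effect does the configuration. Since logging.basicConfig is a no-op once the root logger has handlers, repeated imports and repeated LoggerConfig() calls are both harmless. A sketch:

    # FORMATTER_LOG=1 -> DEBUG records appended to formatter.log
    # otherwise       -> INFO records to the default console handler
    from renamer.logging_config import LoggerConfig  # noqa: F401  (import configures logging once)
    import logging

    logging.getLogger(__name__).info("configured exactly once")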


@@ -11,6 +11,7 @@ This service manages the process of converting AVI/MPG/MPEG/WebM/MP4 files to MKV
 import logging
 import subprocess
 import platform
+import re
 from pathlib import Path
 from typing import Optional, List, Dict, Tuple
@@ -216,6 +217,34 @@ class ConversionService:
         logger.debug(f"Found {len(subtitle_files)} subtitle files for {video_path.name}")
         return subtitle_files
 
+    def _expand_lang_counts(self, lang_str: str) -> List[str]:
+        """Expand language string with counts to individual languages.
+
+        Handles formats like:
+        - "2ukr" -> ['ukr', 'ukr']
+        - "ukr" -> ['ukr']
+        - "3eng" -> ['eng', 'eng', 'eng']
+
+        Args:
+            lang_str: Language string possibly with numeric prefix
+
+        Returns:
+            List of expanded language codes
+
+        Example:
+            >>> service._expand_lang_counts("2ukr")
+            ['ukr', 'ukr']
+        """
+        # Match pattern: optional number + language code
+        match = re.match(r'^(\d+)?([a-z]{2,3})$', lang_str.lower())
+        if match:
+            count = int(match.group(1)) if match.group(1) else 1
+            lang = match.group(2)
+            return [lang] * count
+        else:
+            # No numeric prefix, return as-is
+            return [lang_str.lower()]
+
     def map_audio_languages(
         self,
@@ -227,6 +256,8 @@
         in order. If filename has fewer languages than tracks, remaining
         tracks get None.
 
+        Handles numeric prefixes like "2ukr,eng" -> ['ukr', 'ukr', 'eng']
+
         Args:
             extractor: MediaExtractor with filename data
             audio_track_count: Number of audio tracks in the file
@@ -235,9 +266,10 @@
             List of language codes (or None) for each audio track
 
         Example:
-            >>> langs = service.map_audio_languages(extractor, 2)
+            >>> langs = service.map_audio_languages(extractor, 3)
+            >>> # For filename with [2ukr,eng]
             >>> print(langs)
-            ['ukr', 'eng']
+            ['ukr', 'ukr', 'eng']
         """
         # Get audio_langs from filename extractor
         audio_langs_str = extractor.get('audio_langs', 'Filename')
@@ -246,8 +278,13 @@
             logger.debug("No audio languages found in filename")
             return [None] * audio_track_count
 
-        # Split by comma and clean
-        langs = [lang.strip().lower() for lang in audio_langs_str.split(',')]
+        # Split by comma and expand numeric prefixes
+        lang_parts = [lang.strip() for lang in audio_langs_str.split(',')]
+        langs = []
+        for part in lang_parts:
+            langs.extend(self._expand_lang_counts(part))
+
+        logger.debug(f"Expanded languages from '{audio_langs_str}' to: {langs}")
 
         # Map to tracks (pad with None if needed)
         result = []
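
Note: the expansion end to end, as a self-contained sketch of the regex logic added above (not part of the commit).

    import re

    def expand_lang_counts(lang_str: str) -> list[str]:
        match = re.match(r'^(\d+)?([a-z]{2,3})$', lang_str.lower())
        if match:
            count = int(match.group(1)) if match.group(1) else 1
            return [match.group(2)] * count
        return [lang_str.lower()]  # unmatched input passes through unchanged

    parts = [p.strip() for p in "2ukr,eng".split(',')]
    langs = [lang for p in parts for lang in expand_lang_counts(p)]
    print(langs)  # ['ukr', 'ukr', 'eng']; a 4-track file would then pad to ['ukr', 'ukr', 'eng', None]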


@@ -1,11 +1,12 @@
 import json
 import os
+import threading
 from pathlib import Path
-from typing import Dict, Any
+from typing import Dict, Any, Optional
 
 
 class Settings:
-    """Manages application settings stored in a JSON file."""
+    """Manages application settings stored in a JSON file (Singleton)."""
 
     DEFAULTS = {
         "mode": "technical",  # "technical" or "catalog"
@@ -17,12 +18,30 @@ class Settings:
         "cache_ttl_posters": 2592000,  # 30 days in seconds
     }
 
+    _instance: Optional['Settings'] = None
+    _lock = threading.Lock()
+
+    def __new__(cls, config_dir: Path | None = None):
+        """Create or return singleton instance."""
+        if cls._instance is None:
+            with cls._lock:
+                if cls._instance is None:
+                    cls._instance = super().__new__(cls)
+                    cls._instance._initialized = False
+        return cls._instance
+
     def __init__(self, config_dir: Path | None = None):
+        """Initialize settings (only once)."""
+        # Only initialize once
+        if self._initialized:
+            return
+
         if config_dir is None:
             config_dir = Path.home() / ".config" / "renamer"
         self.config_dir = config_dir
         self.config_file = self.config_dir / "config.json"
         self._settings = self.DEFAULTS.copy()
+        self._initialized = True
         self.load()
 
     def load(self) -> None:
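
Note: Settings now behaves like Cache — the first construction wins and later config_dir arguments are ignored. A sketch (import path assumed from "from ..settings import Settings" in the extractors):

    from pathlib import Path
    from renamer.settings import Settings

    s1 = Settings()
    s2 = Settings(config_dir=Path("/tmp/other"))  # ignored: already initialized
    assert s1 is s2
    print(s1.get("cache_ttl_extractors", 21600))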


@@ -126,6 +126,7 @@ class MediaPanelView:
             self._props.title("Media Info Extraction"),
             self._props.mediainfo_duration,
             self._props.mediainfo_frame_class,
+            self._props.mediainfo_interlace,
             self._props.mediainfo_resolution,
             self._props.mediainfo_aspect_ratio,
             self._props.mediainfo_hdr,


@@ -223,6 +223,14 @@ class MediaPanelProperties:
         """Get MediaInfo frame class formatted with label."""
         return self._extractor.get("frame_class", "MediaInfo")
 
+    @property
+    @conditional_decorators.wrap("Interlaced: ")
+    @text_decorators.colour(name="grey")
+    @conditional_decorators.default("Not extracted")
+    def mediainfo_interlace(self) -> str:
+        """Get MediaInfo interlace formatted with label."""
+        return self._extractor.get("interlaced", "MediaInfo")
+
     @property
     @conditional_decorators.wrap("Resolution: ")
     @text_decorators.colour(name="grey")

uv.lock (generated)

@@ -462,7 +462,7 @@ wheels = [
 [[package]]
 name = "renamer"
-version = "0.8.8"
+version = "0.8.9"
 source = { editable = "." }
 dependencies = [
     { name = "langcodes" },