feat: Introduce DefaultExtractor for fallback values and refactor extraction logic in MediaExtractor

This commit is contained in:
sHa
2025-12-26 19:10:49 +00:00
parent aec31bae9e
commit 2dce807984
5 changed files with 204 additions and 130 deletions

View File

@@ -3,137 +3,160 @@ from .extractors.filename_extractor import FilenameExtractor
from .extractors.metadata_extractor import MetadataExtractor from .extractors.metadata_extractor import MetadataExtractor
from .extractors.mediainfo_extractor import MediaInfoExtractor from .extractors.mediainfo_extractor import MediaInfoExtractor
from .extractors.fileinfo_extractor import FileInfoExtractor from .extractors.fileinfo_extractor import FileInfoExtractor
from .extractors.default_extractor import DefaultExtractor
class MediaExtractor: class MediaExtractor:
"""Class to extract various metadata from media files using specialized extractors""" """Class to extract various metadata from media files using specialized extractors"""
def __init__(self, file_path: Path): def __init__(self, file_path: Path):
self.file_path = file_path
self.filename_extractor = FilenameExtractor(file_path) self.filename_extractor = FilenameExtractor(file_path)
self.metadata_extractor = MetadataExtractor(file_path) self.metadata_extractor = MetadataExtractor(file_path)
self.mediainfo_extractor = MediaInfoExtractor(file_path) self.mediainfo_extractor = MediaInfoExtractor(file_path)
self.fileinfo_extractor = FileInfoExtractor(file_path) self.fileinfo_extractor = FileInfoExtractor(file_path)
self.default_extractor = DefaultExtractor()
# Define sources for each data type # Extractor mapping
self._sources = { self._extractors = {
'title': [ "Metadata": self.metadata_extractor,
('Metadata', lambda: self.metadata_extractor.extract_title()), "Filename": self.filename_extractor,
('Filename', lambda: self.filename_extractor.extract_title()) "MediaInfo": self.mediainfo_extractor,
], "FileInfo": self.fileinfo_extractor,
'year': [ "Default": self.default_extractor,
('Filename', lambda: self.filename_extractor.extract_year())
],
'source': [
('Filename', lambda: self.filename_extractor.extract_source())
],
'frame_class': [
('MediaInfo', lambda: self.mediainfo_extractor.extract_frame_class()),
('Filename', lambda: self.filename_extractor.extract_frame_class())
],
'resolution': [
('MediaInfo', lambda: self.mediainfo_extractor.extract_resolution())
],
'aspect_ratio': [
('MediaInfo', lambda: self.mediainfo_extractor.extract_aspect_ratio())
],
'hdr': [
('MediaInfo', lambda: self.mediainfo_extractor.extract_hdr()),
('Filename', lambda: self.filename_extractor.extract_hdr())
],
'movie_db': [
('Filename', lambda: self.filename_extractor.extract_movie_db())
],
'audio_langs': [
('MediaInfo', lambda: self.mediainfo_extractor.extract_audio_langs()),
('Filename', lambda: self.filename_extractor.extract_audio_langs())
],
'meta_type': [
('Metadata', lambda: self.metadata_extractor.extract_meta_type())
],
'file_size': [
('FileInfo', lambda: self.fileinfo_extractor.extract_size())
],
'modification_time': [
('FileInfo', lambda: self.fileinfo_extractor.extract_modification_time())
],
'file_name': [
('FileInfo', lambda: self.fileinfo_extractor.extract_file_name())
],
'file_path': [
('FileInfo', lambda: self.fileinfo_extractor.extract_file_path())
],
'extension': [
('FileInfo', lambda: self.fileinfo_extractor.extract_extension())
],
'video_tracks': [
('MediaInfo', lambda: self.mediainfo_extractor.extract_video_tracks())
],
'audio_tracks': [
('MediaInfo', lambda: self.mediainfo_extractor.extract_audio_tracks())
],
'subtitle_tracks': [
('MediaInfo', lambda: self.mediainfo_extractor.extract_subtitle_tracks())
],
} }
# Conditions for when a value is considered valid # Define sources and conditions for each data type
self._conditions = { self._data = {
'title': lambda x: x is not None, "title": {
'year': lambda x: x is not None, "sources": [
'source': lambda x: x is not None, ("Metadata", "extract_title"),
'frame_class': lambda x: x and x != 'Unclassified', ("Filename", "extract_title"),
'resolution': lambda x: x is not None, ("Default", "extract_title"),
'aspect_ratio': lambda x: x is not None, ],
'hdr': lambda x: x is not None, },
'movie_db': lambda x: x is not None, "year": {
'audio_langs': lambda x: x is not None, "sources": [
'tracks': lambda x: x is not None and any(x.get(k, []) for k in ['video_tracks', 'audio_tracks', 'subtitle_tracks']), ("Filename", "extract_year"),
'video_tracks': lambda x: x is not None and len(x) > 0, ("Default", "extract_year"),
'audio_tracks': lambda x: x is not None and len(x) > 0, ],
'subtitle_tracks': lambda x: x is not None and len(x) > 0, },
"source": {
"sources": [
("Filename", "extract_source"),
("Default", "extract_source"),
],
},
"frame_class": {
"sources": [
("MediaInfo", "extract_frame_class"),
("Filename", "extract_frame_class"),
("Default", "extract_frame_class"),
],
},
"resolution": {
"sources": [
("MediaInfo", "extract_resolution"),
("Default", "extract_resolution"),
],
},
"hdr": {
"sources": [
("MediaInfo", "extract_hdr"),
("Filename", "extract_hdr"),
("Default", "extract_hdr"),
],
},
"movie_db": {
"sources": [
("Filename", "extract_movie_db"),
("Default", "extract_movie_db"),
],
},
"audio_langs": {
"sources": [
("MediaInfo", "extract_audio_langs"),
("Filename", "extract_audio_langs"),
("Default", "extract_audio_langs"),
],
},
"meta_type": {
"sources": [
("Metadata", "extract_meta_type"),
("Default", "extract_meta_type"),
],
},
"file_size": {
"sources": [
("FileInfo", "extract_size"),
("Default", "extract_size"),
],
},
"modification_time": {
"sources": [
("FileInfo", "extract_modification_time"),
("Default", "extract_modification_time"),
],
},
"file_name": {
"sources": [
("FileInfo", "extract_file_name"),
("Default", "extract_file_name"),
],
},
"file_path": {
"sources": [
("FileInfo", "extract_file_path"),
("Default", "extract_file_path"),
],
},
"extension": {
"sources": [
("FileInfo", "extract_extension"),
("Default", "extract_extension"),
],
},
"video_tracks": {
"sources": [
("MediaInfo", "extract_video_tracks"),
("Default", "extract_video_tracks"),
],
},
"audio_tracks": {
"sources": [
("MediaInfo", "extract_audio_tracks"),
("Default", "extract_audio_tracks"),
],
},
"subtitle_tracks": {
"sources": [
("MediaInfo", "extract_subtitle_tracks"),
("Default", "extract_subtitle_tracks"),
],
},
} }
def get(self, key: str, source: str | None = None): def get(self, key: str, source: str | None = None):
"""Get extracted data by key, optionally from specific source""" """Get extracted data by key, optionally from specific source"""
if key in self._sources:
condition = self._conditions.get(key, lambda x: x is not None)
if source: if source:
for src, func in self._sources[key]: # Specific source requested - find the extractor and call the method directly
if src.lower() == source.lower(): for extractor_name, extractor in self._extractors.items():
val = func() if extractor_name.lower() == source.lower():
return val if condition(val) else None method = f"extract_{key}"
return None # Source not found for this key, return None if hasattr(extractor, method):
else: return getattr(extractor, method)()
# Use fallback: return first valid value
for src, func in self._sources[key]:
val = func()
if condition(val):
return val
return None return None
else:
# Key not in _sources, try to call extract_<key> on extractors
extract_method = f'extract_{key}'
extractors = [
('MediaInfo', self.mediainfo_extractor),
('Metadata', self.metadata_extractor),
('Filename', self.filename_extractor),
('FileInfo', self.fileinfo_extractor)
]
if source: # Fallback mode - try sources in order
for src_name, extractor in extractors: if key in self._data:
if src_name.lower() == source.lower(): sources = self._data[key]["sources"]
if hasattr(extractor, extract_method):
val = getattr(extractor, extract_method)()
return val
return None
else: else:
# Try all extractors in order # Try extractors in order for unconfigured keys
for src_name, extractor in extractors: sources = [(name, f"extract_{key}") for name in ["MediaInfo", "Metadata", "Filename", "FileInfo"]]
if hasattr(extractor, extract_method):
val = getattr(extractor, extract_method)() # Try each source in order until a non-None value is found
for src, method in sources:
if src in self._extractors and hasattr(self._extractors[src], method):
val = getattr(self._extractors[src], method)()
if val is not None: if val is not None:
return val return val
return None return None

View File

@@ -0,0 +1,50 @@
class DefaultExtractor:
    """Extractor that provides default fallback values.

    Every ``extract_*`` method takes no arguments and returns a constant.
    ``None`` means "no default available"; track lists default to a new
    empty list on each call so callers can mutate the result safely.
    """

    def extract_title(self) -> str:
        """Return the placeholder title used when no title is available."""
        return "Unknown Title"

    def extract_year(self) -> None:
        """Return None: there is no default year."""
        return None

    def extract_source(self) -> None:
        """Return None: there is no default source."""
        return None

    def extract_resolution(self) -> None:
        """Return None: there is no default resolution."""
        return None

    def extract_hdr(self) -> None:
        """Return None: there is no default HDR value."""
        return None

    def extract_movie_db(self) -> None:
        """Return None: there is no default movie database reference."""
        return None

    def extract_audio_langs(self) -> None:
        """Return None: there are no default audio languages."""
        return None

    def extract_meta_type(self) -> None:
        """Return None: there is no default meta type."""
        return None

    def extract_size(self) -> None:
        """Return None: there is no default file size."""
        return None

    def extract_modification_time(self) -> None:
        """Return None: there is no default modification time."""
        return None

    def extract_file_name(self) -> None:
        """Return None: there is no default file name."""
        return None

    def extract_file_path(self) -> None:
        """Return None: there is no default file path."""
        return None

    def extract_extension(self) -> None:
        """Return None: there is no default file extension.

        Defined explicitly so that a source table naming
        ``extract_extension`` on this extractor resolves to a real method
        instead of being skipped by an attribute-existence check.
        """
        return None

    def extract_frame_class(self) -> None:
        """Return None: there is no default frame class."""
        return None

    def extract_video_tracks(self) -> list:
        """Return a new empty list: no video tracks by default."""
        return []

    def extract_audio_tracks(self) -> list:
        """Return a new empty list: no audio tracks by default."""
        return []

    def extract_subtitle_tracks(self) -> list:
        """Return a new empty list: no subtitle tracks by default."""
        return []

View File

@@ -12,12 +12,12 @@ class FilenameExtractor:
self.file_path = file_path self.file_path = file_path
self.file_name = file_path.name self.file_name = file_path.name
def _get_frame_class_from_height(self, height: int) -> str: def _get_frame_class_from_height(self, height: int) -> str | None:
"""Get frame class from video height using FRAME_CLASSES constant""" """Get frame class from video height using FRAME_CLASSES constant"""
for frame_class, info in FRAME_CLASSES.items(): for frame_class, info in FRAME_CLASSES.items():
if height == info['nominal_height']: if height == info['nominal_height']:
return frame_class return frame_class
return 'Unclassified' return None
def extract_title(self) -> str | None: def extract_title(self) -> str | None:
"""Extract movie title from filename""" """Extract movie title from filename"""
@@ -126,9 +126,9 @@ class FilenameExtractor:
unclassified_indicators = ['SD', 'LQ', 'HD', 'QHD'] unclassified_indicators = ['SD', 'LQ', 'HD', 'QHD']
for indicator in unclassified_indicators: for indicator in unclassified_indicators:
if re.search(r'\b' + re.escape(indicator) + r'\b', self.file_name, re.IGNORECASE): if re.search(r'\b' + re.escape(indicator) + r'\b', self.file_name, re.IGNORECASE):
return 'Unclassified' return None
return 'Unclassified' return None
def extract_hdr(self) -> str | None: def extract_hdr(self) -> str | None:
"""Extract HDR information from filename""" """Extract HDR information from filename"""

View File

@@ -26,7 +26,7 @@ class MediaInfoExtractor:
for frame_class, info in FRAME_CLASSES.items(): for frame_class, info in FRAME_CLASSES.items():
if height == info['nominal_height']: if height == info['nominal_height']:
return frame_class return frame_class
return 'Unclassified' return None
def extract_duration(self) -> float | None: def extract_duration(self) -> float | None:
"""Extract duration from media info in seconds""" """Extract duration from media info in seconds"""
@@ -39,11 +39,11 @@ class MediaInfoExtractor:
def extract_frame_class(self) -> str | None: def extract_frame_class(self) -> str | None:
"""Extract frame class from media info (480p, 720p, 1080p, etc.)""" """Extract frame class from media info (480p, 720p, 1080p, etc.)"""
if not self.video_tracks: if not self.video_tracks:
return 'Unclassified' return None
height = getattr(self.video_tracks[0], 'height', None) height = getattr(self.video_tracks[0], 'height', None)
if height: if height:
return self._get_frame_class_from_height(height) return self._get_frame_class_from_height(height)
return 'Unclassified' return None
def extract_resolution(self) -> tuple[int, int] | None: def extract_resolution(self) -> tuple[int, int] | None:
"""Extract actual video resolution as (width, height) tuple from media info""" """Extract actual video resolution as (width, height) tuple from media info"""

View File

@@ -62,10 +62,11 @@ def test_extract_frame_class(filename):
# Print filename and extracted frame class clearly # Print filename and extracted frame class clearly
print(f"\nFilename: \033[1;36m{filename}\033[0m") print(f"\nFilename: \033[1;36m{filename}\033[0m")
print(f"Extracted frame_class: \033[1;32m{frame_class}\033[0m") print(f"Extracted frame_class: \033[1;32m{frame_class}\033[0m")
# Frame class should be a string # Frame class should be a string or None
assert isinstance(frame_class, str) assert frame_class is None or isinstance(frame_class, str)
# Should be one of the valid frame classes or 'Unclassified' # Should be one of the valid frame classes or None
valid_classes = set(FRAME_CLASSES.keys()) | {'Unclassified'} if frame_class is not None:
valid_classes = set(FRAME_CLASSES.keys())
assert frame_class in valid_classes assert frame_class in valid_classes