Add ConversionService for AVI to MKV remux with metadata preservation

- Implemented a new service to convert AVI files to MKV format while preserving metadata.
- Added methods for validating AVI files, detecting subtitle files, and mapping audio languages.
- Built ffmpeg command for fast remuxing without re-encoding.
- Included error handling and logging for conversion processes.
This commit is contained in:
sHa
2026-01-03 14:29:30 +00:00
parent 917d25b360
commit 6fee7d9f63
8 changed files with 632 additions and 26 deletions

BIN
dist/renamer-0.6.6-py3-none-any.whl vendored Normal file

Binary file not shown.

View File

@@ -1,6 +1,6 @@
[project]
name = "renamer"
version = "0.6.5"
version = "0.6.6"
description = "Terminal-based media file renamer and metadata viewer"
readme = "README.md"
requires-python = ">=3.11"

View File

@@ -12,13 +12,14 @@ import logging
import os
from .constants import MEDIA_TYPES
from .screens import OpenScreen, HelpScreen, RenameConfirmScreen, SettingsScreen
from .screens import OpenScreen, HelpScreen, RenameConfirmScreen, SettingsScreen, ConvertConfirmScreen
from .extractors.extractor import MediaExtractor
from .views import MediaPanelView, ProposedFilenameView
from .formatters.text_formatter import TextFormatter
from .formatters.catalog_formatter import CatalogFormatter
from .settings import Settings
from .cache import Cache, CacheManager
from .services.conversion_service import ConversionService
# Set up logging conditionally
@@ -68,6 +69,7 @@ class AppCommandProvider(Provider):
("scan", "Scan Directory", "Scan current directory for media files (s)"),
("refresh", "Refresh File", "Refresh metadata for selected file (f)"),
("rename", "Rename File", "Rename the selected file (r)"),
("convert", "Convert AVI to MKV", "Convert AVI file to MKV container with metadata (c)"),
("toggle_mode", "Toggle Display Mode", "Switch between technical and catalog view (m)"),
("expand", "Toggle Tree Expansion", "Expand or collapse all tree nodes (p)"),
("settings", "Settings", "Open settings screen (Ctrl+S)"),
@@ -102,6 +104,7 @@ class RenamerApp(App):
("s", "scan", "Scan"),
("f", "refresh", "Refresh"),
("r", "rename", "Rename"),
("c", "convert", "Convert AVI→MKV"),
("p", "expand", "Toggle Tree"),
("m", "toggle_mode", "Toggle Mode"),
("h", "help", "Help"),
@@ -352,6 +355,45 @@ By Category:"""
else:
self.notify("Proposed name is the same as current name; no rename needed.", severity="information", timeout=3)
async def action_convert(self):
"""Convert AVI file to MKV with metadata preservation."""
tree = self.query_one("#file_tree", Tree)
node = tree.cursor_node
if not (node and node.data and isinstance(node.data, Path) and node.data.is_file()):
self.notify("Please select a file first", severity="warning", timeout=3)
return
file_path = node.data
conversion_service = ConversionService()
# Check if file can be converted
if not conversion_service.can_convert(file_path):
self.notify("Only AVI files can be converted to MKV", severity="error", timeout=3)
return
# Create extractor for metadata
try:
extractor = MediaExtractor(file_path)
except Exception as e:
self.notify(f"Failed to read file metadata: {e}", severity="error", timeout=5)
return
# Get audio track count and map languages
audio_tracks = extractor.get('audio_tracks', 'MediaInfo') or []
if not audio_tracks:
self.notify("No audio tracks found in file", severity="error", timeout=3)
return
audio_languages = conversion_service.map_audio_languages(extractor, len(audio_tracks))
subtitle_files = conversion_service.find_subtitle_files(file_path)
mkv_path = file_path.with_suffix('.mkv')
# Show confirmation screen (conversion happens in screen's on_button_pressed)
self.push_screen(
ConvertConfirmScreen(file_path, mkv_path, audio_languages, subtitle_files, extractor)
)
async def action_expand(self):
tree = self.query_one("#file_tree", Tree)
if self.tree_expanded:
@@ -413,6 +455,63 @@ By Category:"""
else:
logging.info("Not refreshing details, cursor not on renamed file")
def add_file_to_tree(self, file_path: Path):
"""Add a new file to the tree in the correct position.
Args:
file_path: Path to the new file to add
"""
logging.info(f"add_file_to_tree called with file_path={file_path}")
tree = self.query_one("#file_tree", Tree)
parent_dir = file_path.parent
# Find the parent directory node
def find_node(node):
if node.data == parent_dir:
return node
for child in node.children:
found = find_node(child)
if found:
return found
return None
parent_node = find_node(tree.root)
if parent_node:
logging.info(f"Found parent node for {parent_dir}, adding file {file_path.name}")
# Add the new file node in alphabetically sorted position
new_node = None
inserted = False
for i, child in enumerate(parent_node.children):
if child.data and isinstance(child.data, Path):
# Compare filenames for sorting
if child.data.name > file_path.name:
# Insert before this child
new_node = parent_node.add(escape(file_path.name), data=file_path, before=i)
inserted = True
logging.info(f"Inserted file before {child.data.name}")
break
# If not inserted, add at the end
if not inserted:
new_node = parent_node.add(escape(file_path.name), data=file_path)
logging.info(f"Added file at end of directory")
# Select the new node and show its details
if new_node:
tree.select_node(new_node)
logging.info(f"Selected new node: {new_node.data}")
# Refresh the details panel for the new file
self._start_loading_animation()
threading.Thread(
target=self._extract_and_show_details, args=(file_path,)
).start()
else:
logging.warning(f"No parent node found for {parent_dir}")
def on_key(self, event):
if event.key == "right":
tree = self.query_one("#file_tree", Tree)

View File

@@ -342,4 +342,175 @@ Configure application settings.
self.app.notify("Settings saved!", severity="information", timeout=2) # type: ignore
except ValueError:
self.app.notify("Invalid TTL values. Please enter numbers only.", severity="error", timeout=3) # type: ignore
self.app.notify("Invalid TTL values. Please enter numbers only.", severity="error", timeout=3) # type: ignore
class ConvertConfirmScreen(Screen):
"""Confirmation screen for AVI to MKV conversion."""
CSS = """
#convert_content {
text-align: center;
}
Button:focus {
background: $primary;
}
#buttons {
align: center middle;
}
#conversion_details {
text-align: left;
margin: 1 2;
padding: 1 2;
border: solid $primary;
}
#warning_content {
text-align: center;
margin-bottom: 1;
margin-top: 1;
}
"""
def __init__(
self,
avi_path: Path,
mkv_path: Path,
audio_languages: list,
subtitle_files: list,
extractor
):
super().__init__()
self.avi_path = avi_path
self.mkv_path = mkv_path
self.audio_languages = audio_languages
self.subtitle_files = subtitle_files
self.extractor = extractor
def compose(self):
from .formatters.text_formatter import TextFormatter
title_text = f"{TextFormatter.bold(TextFormatter.yellow('AVI → MKV CONVERSION'))}"
# Build details
details_lines = [
f"{TextFormatter.bold('Source:')} {TextFormatter.cyan(escape(self.avi_path.name))}",
f"{TextFormatter.bold('Output:')} {TextFormatter.green(escape(self.mkv_path.name))}",
"",
f"{TextFormatter.bold('Audio Languages:')}",
]
# Add audio language mapping
for i, lang in enumerate(self.audio_languages):
if lang:
details_lines.append(f" Track {i+1}: {TextFormatter.green(lang)}")
else:
details_lines.append(f" Track {i+1}: {TextFormatter.grey('(no language)')}")
# Add subtitle info
if self.subtitle_files:
details_lines.append("")
details_lines.append(f"{TextFormatter.bold('Subtitles to include:')}")
for sub_file in self.subtitle_files:
details_lines.append(f"{TextFormatter.blue(escape(sub_file.name))}")
else:
details_lines.append("")
details_lines.append(f"{TextFormatter.grey('No subtitle files found')}")
details_text = "\n".join(details_lines)
warning_text = f"""
{TextFormatter.bold(TextFormatter.red("Fast remux - streams will be copied without re-encoding"))}
{TextFormatter.yellow("This operation may take a few seconds to minutes depending on file size")}
Do you want to proceed with conversion?
""".strip()
with Center():
with Vertical():
yield Static(title_text, id="convert_content", markup=True)
yield Static(details_text, id="conversion_details", markup=True)
yield Static(warning_text, id="warning_content", markup=True)
with Horizontal(id="buttons"):
yield Button("Convert (y)", id="convert", variant="success")
yield Button("Cancel (n)", id="cancel", variant="error")
def on_mount(self):
self.set_focus(self.query_one("#convert"))
def _handle_conversion_success(self, mkv_path, message):
"""Handle successful conversion - called on main thread."""
import logging
try:
logging.info(f"_handle_conversion_success called: {mkv_path}")
self.app.notify(f"{message}", severity="information", timeout=5) # type: ignore
logging.info(f"Adding file to tree: {mkv_path}")
self.app.add_file_to_tree(mkv_path) # type: ignore
logging.info("Conversion success handler completed")
except Exception as e:
logging.error(f"Error in _handle_conversion_success: {e}", exc_info=True)
def _handle_conversion_error(self, message):
"""Handle conversion error - called on main thread."""
import logging
try:
logging.info(f"_handle_conversion_error called: {message}")
self.app.notify(f"{message}", severity="error", timeout=10) # type: ignore
logging.info("Conversion error handler completed")
except Exception as e:
logging.error(f"Error in _handle_conversion_error: {e}", exc_info=True)
def on_button_pressed(self, event):
if event.button.id == "convert":
# Start conversion
self.app.notify("Starting conversion...", severity="information", timeout=2) # type: ignore
def do_conversion():
from .services.conversion_service import ConversionService
import threading
import logging
conversion_service = ConversionService()
logging.info(f"Starting conversion of {self.avi_path}")
success, message = conversion_service.convert_avi_to_mkv(
self.avi_path,
extractor=self.extractor
)
logging.info(f"Conversion result: success={success}, message={message}")
# Schedule UI updates on the main thread using set_timer
mkv_path = self.avi_path.with_suffix('.mkv')
if success:
logging.info(f"Conversion successful, scheduling UI update for {mkv_path}")
# Use app.set_timer to schedule callback on main thread
self.app.set_timer(
0.1, # Small delay to ensure main thread processes it
lambda: self._handle_conversion_success(mkv_path, message)
) # type: ignore
else:
logging.error(f"Conversion failed: {message}")
self.app.set_timer(
0.1,
lambda: self._handle_conversion_error(message)
) # type: ignore
# Run conversion in background thread
import threading
threading.Thread(target=do_conversion, daemon=True).start()
# Close the screen
self.app.pop_screen() # type: ignore
else:
# Cancel
self.app.pop_screen() # type: ignore
def on_key(self, event):
if event.key == "y":
# Simulate convert button press
convert_button = self.query_one("#convert")
self.on_button_pressed(type('Event', (), {'button': convert_button})())
elif event.key == "n" or event.key == "escape":
self.app.pop_screen() # type: ignore

View File

@@ -0,0 +1,317 @@
"""Conversion service for AVI to MKV remux with metadata preservation.
This service manages the process of converting AVI files to MKV container:
- Fast stream copy (no re-encoding)
- Audio language detection and mapping from filename
- Subtitle file detection and inclusion
- Metadata preservation from multiple sources
- Track order matching
"""
import logging
import subprocess
from pathlib import Path
from typing import Optional, List, Dict, Tuple
from renamer.extractors.extractor import MediaExtractor
logger = logging.getLogger(__name__)
class ConversionService:
"""Service for converting AVI files to MKV with metadata preservation.
This service handles:
- Validating AVI files for conversion
- Detecting nearby subtitle files
- Mapping audio languages from filename to tracks
- Building ffmpeg command for fast remux
- Executing conversion with progress
Example:
service = ConversionService()
# Check if file can be converted
if service.can_convert(Path("/media/movie.avi")):
success, message = service.convert_avi_to_mkv(
Path("/media/movie.avi"),
extractor=media_extractor
)
"""
# Supported subtitle extensions
SUBTITLE_EXTENSIONS = {'.srt', '.ass', '.ssa', '.sub', '.idx'}
def __init__(self):
"""Initialize the conversion service."""
logger.debug("ConversionService initialized")
def can_convert(self, file_path: Path) -> bool:
"""Check if a file can be converted (is AVI).
Args:
file_path: Path to the file to check
Returns:
True if file is AVI and can be converted
"""
if not file_path.exists() or not file_path.is_file():
return False
return file_path.suffix.lower() == '.avi'
def find_subtitle_files(self, avi_path: Path) -> List[Path]:
"""Find subtitle files near the AVI file.
Looks for subtitle files with the same basename in the same directory.
Args:
avi_path: Path to the AVI file
Returns:
List of Path objects for found subtitle files
Example:
>>> service.find_subtitle_files(Path("/media/movie.avi"))
[Path("/media/movie.srt"), Path("/media/movie.eng.srt")]
"""
subtitle_files = []
base_name = avi_path.stem # filename without extension
directory = avi_path.parent
# Look for files with same base name and subtitle extensions
for sub_ext in self.SUBTITLE_EXTENSIONS:
# Exact match: movie.srt
exact_match = directory / f"{base_name}{sub_ext}"
if exact_match.exists():
subtitle_files.append(exact_match)
# Pattern match: movie.eng.srt, movie.ukr.srt, etc.
pattern_files = list(directory.glob(f"{base_name}.*{sub_ext}"))
for sub_file in pattern_files:
if sub_file not in subtitle_files:
subtitle_files.append(sub_file)
logger.debug(f"Found {len(subtitle_files)} subtitle files for {avi_path.name}")
return subtitle_files
def map_audio_languages(
self,
extractor: MediaExtractor,
audio_track_count: int
) -> List[Optional[str]]:
"""Map audio languages from filename to track indices.
Extracts audio language list from filename and maps them to tracks
in order. If filename has fewer languages than tracks, remaining
tracks get None.
Args:
extractor: MediaExtractor with filename data
audio_track_count: Number of audio tracks in the file
Returns:
List of language codes (or None) for each audio track
Example:
>>> langs = service.map_audio_languages(extractor, 2)
>>> print(langs)
['ukr', 'eng']
"""
# Get audio_langs from filename extractor
audio_langs_str = extractor.get('audio_langs', 'Filename')
if not audio_langs_str:
logger.debug("No audio languages found in filename")
return [None] * audio_track_count
# Split by comma and clean
langs = [lang.strip().lower() for lang in audio_langs_str.split(',')]
# Map to tracks (pad with None if needed)
result = []
for i in range(audio_track_count):
if i < len(langs):
result.append(langs[i])
else:
result.append(None)
logger.debug(f"Mapped audio languages: {result}")
return result
def build_ffmpeg_command(
self,
avi_path: Path,
mkv_path: Path,
audio_languages: List[Optional[str]],
subtitle_files: List[Path]
) -> List[str]:
"""Build ffmpeg command for AVI to MKV conversion.
Creates a command that:
- Copies video and audio streams (no re-encoding)
- Sets audio language metadata
- Includes external subtitle files
- Sets MKV title from filename
Args:
avi_path: Source AVI file
mkv_path: Destination MKV file
audio_languages: Language codes for each audio track
subtitle_files: List of subtitle files to include
Returns:
List of command arguments for subprocess
"""
cmd = ['ffmpeg']
# Add flags to fix timestamp issues in AVI files
cmd.extend(['-fflags', '+genpts'])
# Input file
cmd.extend(['-i', str(avi_path)])
# Add subtitle files as inputs
for sub_file in subtitle_files:
cmd.extend(['-i', str(sub_file)])
# Map video stream
cmd.extend(['-map', '0:v:0'])
# Map all audio streams
cmd.extend(['-map', '0:a'])
# Map subtitle streams
for i in range(len(subtitle_files)):
cmd.extend(['-map', f'{i+1}:s:0'])
# Copy codecs (no re-encoding)
cmd.extend(['-c', 'copy'])
# Set audio language metadata
for i, lang in enumerate(audio_languages):
if lang:
cmd.extend([f'-metadata:s:a:{i}', f'language={lang}'])
# Set title metadata from filename
title = avi_path.stem
cmd.extend(['-metadata', f'title={title}'])
# Output file
cmd.append(str(mkv_path))
logger.debug(f"Built ffmpeg command: {' '.join(cmd)}")
return cmd
def convert_avi_to_mkv(
self,
avi_path: Path,
extractor: Optional[MediaExtractor] = None,
output_path: Optional[Path] = None,
dry_run: bool = False
) -> Tuple[bool, str]:
"""Convert AVI file to MKV with metadata preservation.
Args:
avi_path: Source AVI file path
extractor: Optional MediaExtractor (creates new if None)
output_path: Optional output path (defaults to same name with .mkv)
dry_run: If True, build command but don't execute
Returns:
Tuple of (success, message)
Example:
>>> success, msg = service.convert_avi_to_mkv(
... Path("/media/movie.avi")
... )
>>> print(msg)
"""
# Validate input
if not self.can_convert(avi_path):
error_msg = f"File is not AVI or doesn't exist: {avi_path}"
logger.error(error_msg)
return False, error_msg
# Create extractor if needed
if extractor is None:
try:
extractor = MediaExtractor(avi_path)
except Exception as e:
error_msg = f"Failed to create extractor: {e}"
logger.error(error_msg)
return False, error_msg
# Determine output path
if output_path is None:
output_path = avi_path.with_suffix('.mkv')
# Check if output already exists
if output_path.exists():
error_msg = f"Output file already exists: {output_path.name}"
logger.warning(error_msg)
return False, error_msg
# Get audio track count from MediaInfo
audio_tracks = extractor.get('audio_tracks', 'MediaInfo') or []
audio_track_count = len(audio_tracks)
if audio_track_count == 0:
error_msg = "No audio tracks found in file"
logger.error(error_msg)
return False, error_msg
# Map audio languages
audio_languages = self.map_audio_languages(extractor, audio_track_count)
# Find subtitle files
subtitle_files = self.find_subtitle_files(avi_path)
# Build ffmpeg command
cmd = self.build_ffmpeg_command(
avi_path,
output_path,
audio_languages,
subtitle_files
)
# Dry run mode
if dry_run:
cmd_str = ' '.join(cmd)
info_msg = f"Would convert: {avi_path.name}{output_path.name}\n"
info_msg += f"Audio languages: {audio_languages}\n"
info_msg += f"Subtitles: {[s.name for s in subtitle_files]}\n"
info_msg += f"Command: {cmd_str}"
logger.info(info_msg)
return True, info_msg
# Execute conversion
try:
logger.info(f"Starting conversion: {avi_path.name}{output_path.name}")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
success_msg = f"Converted successfully: {avi_path.name}{output_path.name}"
logger.info(success_msg)
return True, success_msg
except subprocess.CalledProcessError as e:
error_msg = f"ffmpeg error: {e.stderr}"
logger.error(error_msg)
return False, error_msg
except FileNotFoundError:
error_msg = "ffmpeg not found. Please install ffmpeg."
logger.error(error_msg)
return False, error_msg
except Exception as e:
error_msg = f"Conversion failed: {e}"
logger.error(error_msg)
return False, error_msg

View File

@@ -17,8 +17,8 @@ class MediaPanelView:
"""Return formatted file info panel string"""
return "\n".join(
[
self.fileinfo_section(),
self.selected_section(),
self.fileinfo_section(),
self.tmdb_section(),
self.tracksinfo_section(),
self.filename_section(),
@@ -27,6 +27,25 @@ class MediaPanelView:
]
)
@conditional_decorators.wrap("", "\n")
def selected_section(self) -> str:
"""Return formatted selected data"""
return "\n".join(
[
self._props.title("Media Info Summary"),
self._props.media_title,
self._props.media_year,
self._props.media_duration,
self._props.media_file_size,
self._props.selected_frame_class,
self._props.selected_source,
self._props.selected_special_info,
self._props.selected_audio_langs,
self._props.selected_database_info,
self._props.selected_order,
]
)
@conditional_decorators.wrap("", "\n")
def fileinfo_section(self) -> str:
"""Return formatted file info"""
@@ -41,24 +60,6 @@ class MediaPanelView:
]
)
@conditional_decorators.wrap("", "\n")
def selected_section(self) -> str:
"""Return formatted selected data"""
return "\n".join(
[
self._props.title("Selected Data"),
self._props.selected_order,
self._props.selected_title,
self._props.selected_year,
self._props.selected_special_info,
self._props.selected_source,
self._props.selected_frame_class,
self._props.selected_hdr,
self._props.selected_audio_langs,
self._props.selected_database_info,
]
)
@conditional_decorators.wrap("", "\n")
def tmdb_section(self) -> str:
"""Return formatted TMDB data"""

View File

@@ -328,9 +328,18 @@ class MediaPanelProperties:
return self._extractor.get("movie_db", "Filename")
# ============================================================
# Selected Data Properties
# Joined Data Properties
# ============================================================
@property
@text_decorators.blue()
@conditional_decorators.wrap("Duration: ")
@text_decorators.yellow()
@duration_decorators.duration_full()
def media_duration(self) -> str:
"""Get media duration from best available source."""
return self._extractor.get("duration")
@property
@text_decorators.blue()
@conditional_decorators.wrap("Order: ")
@@ -345,7 +354,7 @@ class MediaPanelProperties:
@conditional_decorators.wrap("Title: ")
@text_decorators.yellow()
@conditional_decorators.default("<None>")
def selected_title(self) -> str:
def media_title(self) -> str:
"""Get selected title formatted with label."""
return self._extractor.get("title")
@@ -354,10 +363,19 @@ class MediaPanelProperties:
@conditional_decorators.wrap("Year: ")
@text_decorators.yellow()
@conditional_decorators.default("<None>")
def selected_year(self) -> str:
def media_year(self) -> str:
"""Get selected year formatted with label."""
return self._extractor.get("year")
@property
@text_decorators.blue()
@conditional_decorators.wrap("File size: ")
@text_decorators.green()
@size_decorators.size_full()
def media_file_size(self) -> str:
"""Get media file size formatted with label."""
return self._extractor.get("file_size")
@property
@text_decorators.blue()
@conditional_decorators.wrap("Special info: ")

2
uv.lock generated
View File

@@ -462,7 +462,7 @@ wheels = [
[[package]]
name = "renamer"
version = "0.6.5"
version = "0.6.6"
source = { editable = "." }
dependencies = [
{ name = "langcodes" },