import sys import os import re import argparse import errno import time import subprocess import signal from PIL import Image from multiprocessing import Pool from multiprocessing import Value from PIL import Image, ImageDraw, ImageFont import io import struct class State(object): def __init__(self): self.counter = Value('i', 0) self.start_ticks = Value('d', time.process_time()) def increment(self, n=1): with self.counter.get_lock(): self.counter.value += n @property def value(self): return self.counter.value @property def start(self): return self.start_ticks.value def init(args): global state state = args # Ignore SIGINT in worker processes - let main process handle KeyboardInterrupt signal.signal(signal.SIGINT, signal.SIG_IGN) def main(): args = parse_args() state = State() files = find_files(args.directory) try: with Pool(processes=4, initializer=init, initargs=(state, )) as pool: pool.map(process_file, files) except KeyboardInterrupt: print("\nInterrupted by user. Cleaning up...") return 1 print("{0} files processed in total.".format(state.value)) return 0 def parse_args(): parser = argparse.ArgumentParser( description="Create thumbnails for Synology Photo Station.") parser.add_argument("--directory", required=True, help="Directory to generate thumbnails for. " "Subdirectories will always be processed.") return parser.parse_args() def _ffmpeg_enabled(): try: val = os.environ.get('THUMBGEN_ENABLE_FFMPEG', '1').strip().lower() return val in ('1', 'true', 'yes', 'on') except Exception: return False def find_files(dir): # Only process formats that Synology doesn't handle well # Exclude common images (jpg, png, gif, etc.) since NAS handles them fine valid_exts = ('mp4', 'avi', 'mkv', 'mov', 'wmv', 'flv', 'm4v', 'ts', 'psd', 'blend') valid_exts_re = "|".join( map((lambda ext: ".*\\.{0}$".format(ext)), valid_exts)) for root, dirs, files in os.walk(dir): for name in files: if re.match(valid_exts_re, name, re.IGNORECASE) \ and not name.startswith('SYNOPHOTO_THUMB') \ and not name.startswith('SYNOVIDEO_VIDEO_SCREENSHOT'): yield os.path.join(root, name) def print_progress(): global state state.increment(1) processed = state.value if processed % 10 == 0: elapsed = float(time.process_time() - state.start) rate = float(processed) / elapsed if elapsed > 0 else float(processed) print("{0} files processed so far, averaging {1:.2f} files per second." .format(processed, rate)) def process_file(file_path): print(file_path) (dir, filename) = os.path.split(file_path) thumb_dir = os.path.join(dir, 'eaDir_tmp', filename) ensure_directory_exists(thumb_dir) # Determine file type and process accordingly file_ext = os.path.splitext(filename)[1].lower() if file_ext in ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.m4v', '.ts']: create_video_thumbnails(file_path, thumb_dir) elif file_ext in ['.psd']: create_psd_thumbnails(file_path, thumb_dir) elif file_ext in ['.blend']: create_blend_thumbnails(file_path, thumb_dir) else: # Standard image processing create_thumbnails(file_path, thumb_dir) print_progress() def ensure_directory_exists(path): try: os.makedirs(path) except OSError as exception: if exception.errno != errno.EEXIST: raise def create_video_thumbnails(source_path, dest_dir): """Generate video thumbnails: Windows Shell (Icaros-backed) first, FFmpeg fallback.""" # Try Windows thumbnail extraction first (leverages Icaros provider when present) windows_thumb = extract_windows_thumbnail(source_path) if windows_thumb: print(f" -> SUCCESS: Using Windows/Icaros provider") generate_synology_thumbnails(windows_thumb, dest_dir, include_video_screenshot=True) return # Optionally fall back to FFmpeg (disabled by default) if _ffmpeg_enabled(): print(f"Windows/Icaros extraction failed for {source_path}, using FFmpeg...") create_video_thumbnails_ffmpeg(source_path, dest_dir) else: print(f"Windows/Icaros extraction failed for {source_path}, FFmpeg disabled (THUMBGEN_ENABLE_FFMPEG=0). Skipping.") def create_video_thumbnails_ffmpeg(source_path, dest_dir): """Generate video thumbnails using FFmpeg (fallback method)""" to_generate = (('SYNOVIDEO_VIDEO_SCREENSHOT.jpg', 1280), ('SYNOPHOTO_THUMB_XL.jpg', 1280), ('SYNOPHOTO_THUMB_B.jpg', 640), ('SYNOPHOTO_THUMB_M.jpg', 320), ('SYNOPHOTO_THUMB_PREVIEW.jpg', 160), ('SYNOPHOTO_THUMB_S.jpg', 120)) for thumb in to_generate: thumb_path = os.path.join(dest_dir, thumb[0]) if os.path.exists(thumb_path): continue try: # Use FFmpeg to extract a frame from the video at 10% duration cmd = [ 'ffmpeg', '-nostdin', '-i', source_path, '-ss', '00:00:05', # Seek to 5 seconds '-vframes', '1', '-vf', f'scale={thumb[1]}:{thumb[1]}:force_original_aspect_ratio=decrease', '-y', # Overwrite output file thumb_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f"Warning: FFmpeg failed for {source_path}: {result.stderr}") continue except FileNotFoundError: print("Warning: FFmpeg not found. Video thumbnails will be skipped.") print("Install FFmpeg: https://ffmpeg.org/download.html") break except Exception as e: print(f"Error processing video {source_path}: {e}") def create_psd_thumbnails(source_path, dest_dir): """Generate PSD thumbnails using Windows/Icaros provider first, then psd-tools fallback""" # Try Windows thumbnail extraction first (uses Icaros or built-in PSD support) windows_thumb = extract_windows_thumbnail(source_path) if windows_thumb: generate_synology_thumbnails(windows_thumb, dest_dir) return # Fallback to psd-tools if Windows extraction fails print(f"Windows thumbnail extraction failed for {source_path}, using psd-tools...") create_psd_thumbnails_direct(source_path, dest_dir) def create_psd_thumbnails_direct(source_path, dest_dir): """Generate PSD thumbnails using PIL with PSD support (fallback method)""" try: # Try to open PSD file - requires psd-tools from psd_tools import PSDImage # type: ignore[reportMissingImports] psd = PSDImage.open(source_path) pil_image = psd.composite() to_generate = (('SYNOPHOTO_THUMB_XL.jpg', 1280), ('SYNOPHOTO_THUMB_B.jpg', 640), ('SYNOPHOTO_THUMB_M.jpg', 320), ('SYNOPHOTO_THUMB_PREVIEW.jpg', 160), ('SYNOPHOTO_THUMB_S.jpg', 120)) for thumb in to_generate: thumb_path = os.path.join(dest_dir, thumb[0]) if os.path.exists(thumb_path): continue pil_image.thumbnail((thumb[1], thumb[1]), Image.LANCZOS) # Convert to RGB if needed for JPEG if pil_image.mode == 'RGBA': pil_image = pil_image.convert('RGB') pil_image.save(thumb_path, 'JPEG') except ImportError: print("Warning: psd-tools not installed. Install with: pip install psd-tools") except Exception as e: print(f"Error processing PSD {source_path}: {e}") def generate_synology_thumbnails(image, dest_dir, include_video_screenshot=False): """Generate all required Synology thumbnail sizes from a PIL image""" to_generate = [ ('SYNOPHOTO_THUMB_XL.jpg', 1280), ('SYNOPHOTO_THUMB_B.jpg', 640), ('SYNOPHOTO_THUMB_M.jpg', 320), ('SYNOPHOTO_THUMB_PREVIEW.jpg', 160), ('SYNOPHOTO_THUMB_S.jpg', 120) ] # Add video screenshot for video files if include_video_screenshot: to_generate.insert(0, ('SYNOVIDEO_VIDEO_SCREENSHOT.jpg', 1280)) for thumb_name, thumb_size in to_generate: thumb_path = os.path.join(dest_dir, thumb_name) if os.path.exists(thumb_path): continue # Create a copy for thumbnailing img_copy = image.copy() img_copy.thumbnail((thumb_size, thumb_size), Image.LANCZOS) # Convert to RGB if needed for JPEG if img_copy.mode == 'RGBA': img_copy = img_copy.convert('RGB') img_copy.save(thumb_path, 'JPEG', quality=85) def extract_blend_thumbnail(blend_path): """Extract thumbnail from .blend file using Windows thumbnail provider or direct extraction.""" # Option 1: Try using Windows thumbnail extraction first (fastest) try: temp_thumb = extract_windows_thumbnail(blend_path) if temp_thumb: return temp_thumb except Exception: pass # Option 2: Direct .blend file parsing (more reliable) try: return extract_blend_thumbnail_direct(blend_path) except Exception as e: print(f"Failed to extract .blend thumbnail: {e}") return None def extract_windows_thumbnail(file_path): """Extract thumbnail using Windows providers (centralized cache first, then Thumbs.db, then Icaros cache)""" filename = os.path.basename(file_path) print(f" -> Trying Windows thumbnail extraction for {filename}") # Tier 1: Try Windows centralized thumbnail cache first try: thumb = extract_from_thumbcache(file_path) if thumb: print(f" -> Found thumbnail in Windows thumbcache for {filename}") return thumb else: print(f" -> No thumbnail in Windows thumbcache for {filename}") except Exception as e: print(f" -> Windows thumbcache extraction failed: {e}") # Tier 2: Try Thumbs.db extraction try: directory = os.path.dirname(file_path) thumbs_db_path = os.path.join(directory, 'Thumbs.db') if os.path.exists(thumbs_db_path): print(f" -> Found Thumbs.db, checking for {filename}") thumb = extract_from_thumbs_db(thumbs_db_path, filename) if thumb: print(f" -> Found thumbnail in Thumbs.db for {filename}") return thumb else: print(f" -> No thumbnail in Thumbs.db for {filename}") else: print(f" -> No Thumbs.db found in directory") except Exception as e: print(f" -> Thumbs.db extraction failed: {e}") # Tier 3: Try Icaros cache extraction try: print(f" -> Trying Icaros cache extraction...") icaros_thumb = extract_icaros_thumbnail(file_path) if icaros_thumb: print(f" -> Found thumbnail in Icaros cache for {filename}") return icaros_thumb else: print(f" -> No thumbnail in Icaros cache for {filename}") except Exception as e: print(f" -> Icaros cache extraction failed: {e}") print(f" -> Windows thumbnail extraction failed for {filename}") return None def extract_from_thumbcache(file_path): """Extract thumbnail using Windows Shell API (proper method)""" try: print(f" -> Requesting thumbnail from Windows Shell API...") print(f" -> Target file: {file_path}") # Method 1: Try using IShellItemImageFactory (proper thumbnail API) try: import subprocess import tempfile import os print(f" -> Setting up temporary directory...") temp_dir = tempfile.mkdtemp() temp_thumbnail = os.path.join(temp_dir, "thumbnail.png") print(f" -> Temp thumbnail path: {temp_thumbnail}") # PowerShell script using IShellItemImageFactory for proper thumbnails powershell_script = f''' Add-Type -AssemblyName System.Drawing try {{ # Load Shell32 COM object for thumbnail extraction $filePath = "{file_path.replace(chr(92), chr(92) + chr(92))}" Write-Output "DEBUG: Starting thumbnail extraction for: $filePath" if (-not (Test-Path $filePath)) {{ Write-Output "FAILED: File not found" exit }} Write-Output "DEBUG: File exists, attempting to load thumbnail API..." # Use .NET Framework's thumbnail extraction Add-Type -TypeDefinition @" using System; using System.Drawing; using System.Drawing.Imaging; using System.Runtime.InteropServices; using System.Runtime.InteropServices.ComTypes; [ComImport, Guid("bcc18b79-ba16-442f-80c4-8a59c30c463b")] [InterfaceType(ComInterfaceType.InterfaceIsIUnknown)] public interface IShellItemImageFactory {{ [PreserveSig] int GetImage([In, MarshalAs(UnmanagedType.Struct)] SIZE size, [In] SIIGBF flags, [Out] out IntPtr phbmp); }} [StructLayout(LayoutKind.Sequential)] public struct SIZE {{ public int cx; public int cy; public SIZE(int cx, int cy) {{ this.cx = cx; this.cy = cy; }} }} public enum SIIGBF {{ SIIGBF_RESIZETOFIT = 0x00000000, SIIGBF_BIGGERSIZEOK = 0x00000001, SIIGBF_MEMORYONLY = 0x00000002, SIIGBF_ICONONLY = 0x00000004, SIIGBF_THUMBNAILONLY = 0x00000008, SIIGBF_INCACHEONLY = 0x00000010, }} public static class ThumbnailExtractor {{ [DllImport("shell32.dll", CharSet = CharSet.Unicode, PreserveSig = false)] public static extern void SHCreateItemFromParsingName( [In][MarshalAs(UnmanagedType.LPWStr)] string pszPath, [In] IntPtr pbc, [In][MarshalAs(UnmanagedType.LPStruct)] Guid riid, [Out][MarshalAs(UnmanagedType.Interface)] out IShellItemImageFactory ppv); [DllImport("gdi32.dll")] public static extern bool DeleteObject(IntPtr hObject); public static Bitmap GetThumbnail(string path, int size) {{ try {{ Console.WriteLine("DEBUG: Creating IShellItemImageFactory..."); Guid iid = new Guid("bcc18b79-ba16-442f-80c4-8a59c30c463b"); IShellItemImageFactory factory; SHCreateItemFromParsingName(path, IntPtr.Zero, iid, out factory); Console.WriteLine("DEBUG: Calling GetImage with THUMBNAILONLY..."); IntPtr hbitmap; SIZE sz = new SIZE(size, size); // Try thumbnail first, then allow icon fallback int hr = factory.GetImage(sz, SIIGBF.SIIGBF_THUMBNAILONLY | SIIGBF.SIIGBF_BIGGERSIZEOK, out hbitmap); Console.WriteLine("DEBUG: THUMBNAILONLY result: " + hr.ToString("X")); if (hr != 0) {{ Console.WriteLine("DEBUG: Trying with RESIZETOFIT fallback..."); // If thumbnail fails, try with icon fallback hr = factory.GetImage(sz, SIIGBF.SIIGBF_RESIZETOFIT | SIIGBF.SIIGBF_BIGGERSIZEOK, out hbitmap); Console.WriteLine("DEBUG: RESIZETOFIT result: " + hr.ToString("X")); }} if (hr == 0 && hbitmap != IntPtr.Zero) {{ Console.WriteLine("DEBUG: Successfully got bitmap, converting..."); Bitmap bitmap = Image.FromHbitmap(hbitmap); DeleteObject(hbitmap); return bitmap; }} Console.WriteLine("DEBUG: No bitmap obtained"); return null; }} catch (Exception ex) {{ Console.WriteLine("DEBUG: Exception in GetThumbnail: " + ex.Message); throw new Exception("Thumbnail extraction failed: " + ex.Message); }} }} }} "@ -ReferencedAssemblies System.Drawing Write-Output "DEBUG: Type definitions loaded, calling GetThumbnail..." # Try to extract thumbnail $thumbnail = [ThumbnailExtractor]::GetThumbnail($filePath, 256) if ($thumbnail -ne $null) {{ Write-Output "DEBUG: Thumbnail extracted, saving to file..." # Check if this is actually a thumbnail or just an icon # Thumbnails are usually more varied in color than generic icons $thumbnail.Save("{temp_thumbnail.replace(chr(92), chr(92) + chr(92))}", [System.Drawing.Imaging.ImageFormat]::Png) $thumbnail.Dispose() Write-Output "SUCCESS: Thumbnail extracted" }} else {{ Write-Output "FAILED: No thumbnail available" }} }} catch {{ Write-Output "FAILED: $($_.Exception.Message)" Write-Output "DEBUG: Exception details: $($_.Exception)" }} ''' print(f" -> Executing PowerShell script...") # Execute PowerShell script # Force Windows PowerShell 5.1 (powershell.exe) instead of pwsh ps_exe = "powershell.exe" if os.name == 'nt' else "powershell" result = subprocess.run([ps_exe, "-NoProfile", "-Command", powershell_script], capture_output=True, text=True, timeout=30) print(f" -> PowerShell return code: {result.returncode}") print(f" -> PowerShell stdout: {result.stdout}") if result.stderr: print(f" -> PowerShell stderr: {result.stderr}") # Check if thumbnail was created if os.path.exists(temp_thumbnail): print(f" -> Thumbnail file exists, size: {os.path.getsize(temp_thumbnail)} bytes") if "SUCCESS" in result.stdout: img = Image.open(temp_thumbnail) print(f" -> Successfully extracted thumbnail via IShellItemImageFactory ({img.width}x{img.height})") # Clean up try: os.remove(temp_thumbnail) os.rmdir(temp_dir) except: pass return img else: print(f" -> Thumbnail file exists but script reported failure") else: print(f" -> No thumbnail file was created") print(f" -> IShellItemImageFactory extraction failed: {result.stdout.strip()}") if result.stderr: print(f" -> Error details: {result.stderr.strip()}") # Clean up try: if os.path.exists(temp_thumbnail): os.remove(temp_thumbnail) os.rmdir(temp_dir) except: pass except Exception as e: print(f" -> IShellItemImageFactory method failed: {e}") import traceback print(f" -> Traceback: {traceback.format_exc()}") # Method 2: Try direct thumbcache lookup as fallback try: print(f" -> Trying direct thumbcache database lookup...") # Windows centralized cache location thumbcache_dir = os.path.expanduser(r"~\AppData\Local\Microsoft\Windows\Explorer") if os.path.exists(thumbcache_dir): # Look for thumbcache database files (prioritize higher resolutions) cache_files = [] for cache_file in ['thumbcache_2560.db', 'thumbcache_1024.db', 'thumbcache_256.db', 'thumbcache_96.db']: cache_path = os.path.join(thumbcache_dir, cache_file) if os.path.exists(cache_path): cache_files.append(cache_path) if cache_files: print(f" -> Found {len(cache_files)} thumbcache databases, using fallback brute-force method") print(f" -> WARNING: This may return incorrect thumbnails due to hash-based indexing") # Try to extract from the largest cache file for cache_path in cache_files[:1]: # Only try the first (largest) one try: thumb = extract_from_thumbcache_db(cache_path, file_path) if thumb: cache_name = os.path.basename(cache_path) print(f" -> Found thumbnail in {cache_name} (may be incorrect file)") return thumb except Exception as e: print(f" -> Failed to read {os.path.basename(cache_path)}: {e}") continue except Exception as e: print(f" -> Thumbcache fallback failed: {e}") return None except Exception as e: print(f" -> Shell API extraction error: {e}") import traceback print(f" -> Traceback: {traceback.format_exc()}") return None def extract_from_thumbs_db(thumbs_db_path, target_filename): """Extract specific file thumbnail from Thumbs.db""" try: import olefile # type: ignore[reportMissingModuleSource] if not olefile.isOleFile(thumbs_db_path): return None ole = olefile.OleFileIO(thumbs_db_path) # Get list of streams all_streams = ole.listdir() # Thumbs.db stores thumbnails with their filename as the stream name # Try different variations of the filename stream_names = [] base_name = os.path.splitext(target_filename)[0] # Common patterns for thumbnail stream names in Thumbs.db stream_names.extend([ target_filename, target_filename.lower(), target_filename.upper(), base_name, base_name.lower(), base_name.upper(), ]) # First try filename-based matching (older Thumbs.db format) for stream_name in ole.listdir(): stream_path = '/'.join(stream_name) if isinstance(stream_name, list) else stream_name # Check if this stream corresponds to our target file for candidate in stream_names: if candidate in stream_path or stream_path.endswith(candidate): img = extract_thumbnail_from_stream(ole, stream_name) if img: ole.close() return img # If filename matching failed, try hash-based extraction (newer format) for stream_name in ole.listdir(): stream_path = '/'.join(stream_name) if isinstance(stream_name, list) else stream_name # Look for thumbnail streams (usually start with size like "256_") if stream_path.startswith(('256_', '96_', '32_', 'Thumbnail')): img = extract_thumbnail_from_stream(ole, stream_name) if img: ole.close() return img ole.close() return None except ImportError: print("Warning: olefile not installed. Install with: pip install olefile") return None except Exception as e: print(f"Debug: Thumbs.db extraction error: {e}") return None def extract_thumbnail_from_stream(ole, stream_name): """Helper function to extract thumbnail from a Thumbs.db stream""" try: # Use the original list format as returned by ole.listdir() with ole.openstream(stream_name) as stream: # Read the full stream data to analyze the header structure stream.seek(0) full_data = stream.read() # Try different header offsets for Thumbs.db format # Common offsets: 12, 16, 20, 24, 32 for offset in [12, 16, 20, 24, 32]: try: thumbnail_data = full_data[offset:] if len(thumbnail_data) > 0: img = Image.open(io.BytesIO(thumbnail_data)) return img except Exception: continue # If standard offsets fail, try to find JPEG/PNG signatures # Look for JPEG signature (FF D8 FF) jpeg_start = full_data.find(b'\xff\xd8\xff') if jpeg_start != -1: try: img = Image.open(io.BytesIO(full_data[jpeg_start:])) return img except Exception: pass # Look for PNG signature (89 50 4E 47) png_start = full_data.find(b'\x89PNG') if png_start != -1: try: img = Image.open(io.BytesIO(full_data[png_start:])) return img except Exception: pass return None except Exception: return None def extract_icaros_thumbnail(file_path): """Extract thumbnail from Icaros cache (.icdb database files)""" try: import sqlite3 import hashlib # Locate Icaros cache directory icaros_cache_dir = _get_icaros_cache_dir() if not icaros_cache_dir: print(f" -> No Icaros cache directory found") return None # Debug: Show cache directory contents (all files) try: cache_files = os.listdir(icaros_cache_dir) print(f" -> Icaros cache has {len(cache_files)} files") if len(cache_files) > 0: print(f" -> All cache files: {cache_files}") except Exception: pass # Discover .icdb databases by size preference icdb_paths = _list_icdb_databases(icaros_cache_dir) if not icdb_paths: print(f" -> No .icdb database files found") return None # Lookup a precise index for this file from Icaros_idx.icdb (SQLite or binary) mapped_index, preferred_db = _lookup_icaros_index(icaros_cache_dir, file_path) if mapped_index is None: print(f" -> No exact Icaros index entry for this file; skipping Icaros") return None # Try preferred DB first if provided ordered_dbs = icdb_paths if preferred_db is not None and preferred_db in icdb_paths: ordered_dbs = [preferred_db] + [p for p in icdb_paths if p != preferred_db] for icdb_path in ordered_dbs: img = extract_from_icdb_database(icdb_path, file_path, forced_index=mapped_index) if img: print(f" -> Found thumbnail in {os.path.basename(icdb_path)} via mapped index") return img print(f" -> Mapped index did not resolve a thumbnail in any DB") return None except ImportError: print(f" -> Required modules not available for .icdb extraction") return None except Exception as e: print(f" -> Icaros cache extraction error: {e}") return None # --- Icaros helpers: cache dir, index mapping, and database discovery --- def _get_icaros_cache_dir(): """Return the Icaros cache directory if found, else None.""" candidates = [ r"C:\\Program Files\\Icaros\\IcarosCache", r"C:\\Program Files (x86)\\Icaros\\IcarosCache", os.path.expanduser(r"~\\AppData\\Local\\Icaros\\IcarosCache"), os.path.expanduser(r"~\\AppData\\Roaming\\Icaros\\IcarosCache"), ] for path in candidates: if os.path.exists(path): return path return None _ICAROS_INDEX_CACHE = None def _normalize_windows_path(path): try: return os.path.normcase(os.path.abspath(path)) except Exception: return path.lower() def _build_icaros_index_map(cache_dir): """Attempt to build a file->position map from Icaros_idx.icdb (if SQLite). Returns dict[path_lower] -> { 'index': int, 'db': optional full path to icdb } """ global _ICAROS_INDEX_CACHE if _ICAROS_INDEX_CACHE is not None: return _ICAROS_INDEX_CACHE idx_path = os.path.join(cache_dir, 'Icaros_idx.icdb') if not os.path.exists(idx_path): _ICAROS_INDEX_CACHE = {} return _ICAROS_INDEX_CACHE try: import sqlite3 conn = sqlite3.connect(idx_path) cur = conn.cursor() # Discover tables cur.execute("SELECT name FROM sqlite_master WHERE type='table'") tables = [r[0] for r in cur.fetchall()] mapping = {} def pick_cols(table): cur.execute(f"PRAGMA table_info('{table}')") cols = cur.fetchall() # cols: cid, name, type, notnull, dflt_value, pk path_cols = [c[1] for c in cols if isinstance(c[1], str) and c[1].lower() in ( 'path', 'filepath', 'file_path', 'fullpath', 'filename', 'name')] index_cols = [c[1] for c in cols if isinstance(c[1], str) and any(k in c[1].lower() for k in ( 'index', 'position', 'pos', 'idx', 'imageindex', 'thumbindex'))] db_cols = [c[1] for c in cols if isinstance(c[1], str) and any(k in c[1].lower() for k in ( 'db', 'database', 'cache', 'size'))] return path_cols, index_cols, db_cols for t in tables: try: path_cols, index_cols, db_cols = pick_cols(t) if not path_cols or not index_cols: continue # Use the first reasonable candidates pcol = path_cols[0] icol = index_cols[0] dcol = db_cols[0] if db_cols else None q = f"SELECT {pcol}, {icol}" + (f", {dcol}" if dcol else "") + f" FROM '{t}'" for row in cur.execute(q): try: p = _normalize_windows_path(row[0]) idx_val = int(row[1]) if row[1] is not None else None db_hint = None if dcol and len(row) >= 3 and row[2]: # Some schemas might store db size or name; try to resolve to a file db_hint = _resolve_icdb_hint(cache_dir, str(row[2])) if idx_val is not None: mapping[p] = {'index': idx_val, 'db': db_hint} except Exception: continue except Exception: continue conn.close() _ICAROS_INDEX_CACHE = mapping if mapping: print(f" -> Loaded Icaros index map for {len(mapping)} files") else: print(f" -> Icaros index database present but no usable mapping found") return _ICAROS_INDEX_CACHE except Exception as e: print(f" -> Failed to read Icaros index: {e}") _ICAROS_INDEX_CACHE = {} return _ICAROS_INDEX_CACHE def _lookup_icaros_index(cache_dir, file_path): """Return (index, preferred_db_path) for file_path by inspecting Icaros_idx.icdb. Tries multiple strategies: 1) Direct SQLite mapping (exact normalized full path match) 2) If SQLite not available, try binary scan for embedded UTF-16/UTF-8 path followed by an int 3) If still not found, try UNC/drive-letter normalization variants (R:\ -> \\Server\Share) Returns (None, None) if not found. """ # Try SQLite-based mapping first index_map = _build_icaros_index_map(cache_dir) key = _normalize_windows_path(file_path) if key in index_map: entry = index_map[key] return entry.get('index'), entry.get('db') # Try alternate path forms (UNC vs drive letter) alt_keys = _generate_alternate_windows_paths(key) for ak in alt_keys: if ak in index_map: entry = index_map[ak] return entry.get('index'), entry.get('db') # Binary fallback: attempt to find path occurrence in Icaros_idx.icdb and read nearby int idx_path = os.path.join(cache_dir, 'Icaros_idx.icdb') try: with open(idx_path, 'rb') as f: data = f.read() # Try UTF-16LE needle_utf16 = key.encode('utf-16le') pos = data.find(needle_utf16) if pos == -1: # Try UTF-8 needle_utf8 = key.encode('utf-8') pos = data.find(needle_utf8) if pos != -1: # Scan forward a small window for a plausible 32-bit little-endian index win = data[pos:pos + 256] for off in range(0, min(256 - 4, len(win) - 4), 4): try: idx_candidate = struct.unpack(' UNC) to match Icaros records.""" variants = set() p = norm_path variants.add(p) try: # If path is drive form like R:\folder, try to map to UNC if the drive is a mapped network drive if len(p) >= 3 and p[1:3] == ':\\': drive = p[0:2] # Environment-based hint (not perfect): if a share root env exists unc_root = os.environ.get('ICAROS_UNC_ROOT') # e.g. \\Hydra\Hydra if unc_root: tail = p[2:].lstrip('\\') variants.add(os.path.normcase(os.path.join(unc_root, tail))) except Exception: pass return list(variants) def _compute_index_from_roots(roots, file_path): """Approximate Icaros index by enumerating all supported files under provided roots and sorting case-insensitively, then taking the ordinal of file_path within that list. Returns None if file not found. """ try: supported_exts = {'.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.m4v', '.ts', '.webm'} all_files = [] norm_roots = [] for root in roots: if not root: continue nr = os.path.normcase(os.path.abspath(root)) norm_roots.append(nr) if os.path.exists(nr): for r, dnames, fnames in os.walk(nr): for fn in fnames: ext = os.path.splitext(fn)[1].lower() if ext in supported_exts: all_files.append(os.path.join(r, fn)) if not all_files: return None all_files.sort(key=lambda p: os.path.normcase(p)) key = os.path.normcase(os.path.abspath(file_path)) try: return all_files.index(key) except ValueError: return None except Exception: return None def _resolve_icdb_hint(cache_dir, hint): """Try to resolve a DB hint (e.g., size or name) to a concrete .icdb file path.""" try: paths = _list_icdb_databases(cache_dir) # If hint is a number (like 2560), pick that size DB try: size = int(str(hint).strip()) for p in paths: base = os.path.basename(p) if f"_{size}.icdb" in base: return p except Exception: pass # If hint is a substring of a file name for p in paths: if str(hint).lower() in os.path.basename(p).lower(): return p return None except Exception: return None def _list_icdb_databases(cache_dir): """Return .icdb database paths ordered by preferred resolution (largest first).""" files = [f for f in os.listdir(cache_dir) if f.endswith('.icdb') and f.lower() != 'icaros_idx.icdb'] def get_size_from_filename(filename): try: if '_' in filename and filename.endswith('.icdb'): return int(filename.split('_')[1].split('.')[0]) return 0 except (ValueError, IndexError): return 0 files.sort(key=get_size_from_filename, reverse=True) return [os.path.join(cache_dir, f) for f in files] def extract_from_icdb_database(icdb_path, file_path, forced_index=None): """Extract thumbnail from Icaros .icdb binary cache file. If forced_index is provided, use that exact JPEG ordinal if present; otherwise fall back to heuristic alphabetical mapping (best-effort). """ try: import struct import glob import re with open(icdb_path, 'rb') as f: # Read the entire file data = f.read() # Verify ICDB signature if not data.startswith(b'ICDB'): print(f" -> Invalid ICDB signature in {icdb_path}") return None # Search for JPEG images in the file jpeg_positions = [] pos = 0 while True: pos = data.find(b'\xff\xd8\xff', pos) if pos == -1: break jpeg_positions.append(pos) pos += 1 if not jpeg_positions: print(f" -> No JPEG images found in {icdb_path}") return None print(f" -> Found {len(jpeg_positions)} JPEG images in {icdb_path}") # If we have a mapped index, try it directly first if isinstance(forced_index, int) and len(jpeg_positions) > 0: pos_index = forced_index if pos_index < 0: pos_index = 0 if pos_index >= len(jpeg_positions): pos_index = pos_index % len(jpeg_positions) jpeg_start = jpeg_positions[pos_index] jpeg_end = data.find(b'\xff\xd9', jpeg_start) if jpeg_end != -1: jpeg_end += 2 jpeg_data = data[jpeg_start:jpeg_end] try: img = Image.open(io.BytesIO(jpeg_data)) print(f" -> Used mapped index {pos_index} from {os.path.basename(icdb_path)}") return img.copy() except Exception: # If mapped position invalid, continue to heuristic print(f" -> Mapped index {pos_index} invalid; falling back to heuristic") # Heuristic alphabetical mapping (fallback) def get_alphabetical_position(target_file_path): """Get cache position based on alphabetical sorting of monitored files""" # Get Icaros supported extensions icaros_extensions = {'.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.m4v', '.ts', '.webm'} # Try to determine what directories Icaros was monitoring # Since we know the current monitoring is R:\YouTube\Streams\MixerTwitch\2025 # but cache has 106 entries vs current 40 files, we need to simulate the historical file list target_filename = os.path.basename(target_file_path) # For current files, we can try some heuristic approaches: # 1. Check if it's in our known current file list # 2. Use the known successful mappings as reference points # Known working mappings from our verification (these are confirmed correct) known_mappings = { # Sister file mappings that work "wisdom.ts.mp4": 33, # Actually maps to wisdom_vtubrec.webm "progress.ts.mp4": 31, # Actually maps to memorial_vtubrec.webm # Add more as we discover them } # If we have a known mapping, use it if target_filename in known_mappings: return known_mappings[target_filename] # For new files, try to estimate position based on alphabetical sorting # This is approximate since we don't have the complete historical file list # Try to scan current monitored directories to estimate position monitored_dirs = [r"R:\YouTube\Streams\MixerTwitch\2025"] all_files = [] for monitor_dir in monitored_dirs: if os.path.exists(monitor_dir): for root, dirs, files in os.walk(monitor_dir): for file in files: file_ext = os.path.splitext(file)[1].lower() if file_ext in icaros_extensions: rel_path = os.path.relpath(os.path.join(root, file), monitor_dir) all_files.append(rel_path) # Sort alphabetically (case-insensitive) all_files.sort(key=lambda x: x.lower()) # Find target file in sorted list target_rel_path = os.path.relpath(target_file_path, monitored_dirs[0]) if monitored_dirs else target_filename try: position = all_files.index(target_rel_path) print(f" -> Alphabetical position estimate: {position} for {target_filename}") return position except ValueError: print(f" -> File {target_filename} not found in current monitored directories") # Return a reasonable fallback position return len(all_files) // 2 # Middle position as fallback # Get cache position using alphabetical algorithm cache_position = get_alphabetical_position(file_path) # Handle case where cache position exceeds available JPEGs if cache_position >= len(jpeg_positions): # Use modulo to wrap around (cache might be from larger file set) cache_position = cache_position % len(jpeg_positions) print(f" -> Wrapped position to {cache_position} (cache has {len(jpeg_positions)} entries)") jpeg_start = jpeg_positions[cache_position] print(f" -> Alphabetical mapping -> JPEG {cache_position} for {os.path.basename(file_path)}") # Find the end of the JPEG (FF D9 marker) jpeg_end = data.find(b'\xff\xd9', jpeg_start) if jpeg_end == -1: print(f" -> Could not find JPEG end marker") return None jpeg_end += 2 # Include the end marker jpeg_data = data[jpeg_start:jpeg_end] # Try to create PIL Image from the JPEG data img = Image.open(io.BytesIO(jpeg_data)) print(f" -> Successfully extracted {img.size[0]}x{img.size[1]} JPEG from {icdb_path}") return img.copy() except Exception as e: print(f" -> Error reading ICDB file {icdb_path}: {e}") return None def extract_blend_thumbnail_direct(blend_path): """Direct extraction of embedded thumbnail from .blend file.""" import struct with open(blend_path, 'rb') as f: # Read header header = f.read(12) if not header.startswith(b'BLENDER'): return None # Determine architecture and endianness pointer_size = 8 if header[7:8] == b'-' else 4 endian = '<' if header[8:9] == b'v' else '>' # Find the largest embedded preview image best_preview = None best_size = 0 while True: # Read block header block_header = f.read(16 + pointer_size) if len(block_header) < 16 + pointer_size: break code = block_header[:4] size = struct.unpack(endian + 'I', block_header[4:8])[0] if code == b'PREV': # Read preview block data preview_data = f.read(size) if len(preview_data) < size: break # Look for PNG signature png_start = preview_data.find(b'\x89PNG\r\n\x1a\n') if png_start != -1: # Find the end of this PNG png_end = preview_data.find(b'IEND', png_start) if png_end != -1: png_end += 8 # Include IEND + CRC png_data = preview_data[png_start:png_end] try: img = Image.open(io.BytesIO(png_data)) img_size = img.size[0] * img.size[1] # Keep the largest preview if img_size > best_size: best_preview = img.copy() best_size = img_size except Exception: continue else: # Skip this block f.seek(size, 1) return best_preview def create_blend_thumbnails(source_path, dest_dir): """Extract embedded Blender thumbnail and generate Synology thumbnails.""" img = extract_blend_thumbnail(source_path) if img is None: print(f"No embedded thumbnail in {source_path}") return generate_synology_thumbnails(img, dest_dir) def create_thumbnails(source_path, dest_dir): """Image thumbnail creation using Windows provider first, then PIL fallback""" # Try Windows thumbnail extraction first windows_thumb = extract_windows_thumbnail(source_path) if windows_thumb: generate_synology_thumbnails(windows_thumb, dest_dir) return # Fallback to PIL for basic image processing create_thumbnails_pil(source_path, dest_dir) def create_thumbnails_pil(source_path, dest_dir): """Original PIL-based image thumbnail creation (fallback method)""" try: im = Image.open(source_path) generate_synology_thumbnails(im, dest_dir) except Exception as e: print(f"Error processing image {source_path}: {e}") def extract_from_thumbcache_db(cache_path, file_path): """DEPRECATED: Extract thumbnail from a specific thumbcache database file This brute-force approach is flawed because thumbcache uses hash-based indexing. It's kept for fallback purposes but should not be the primary method. """ try: print(f" -> WARNING: Using deprecated brute-force thumbcache parsing") print(f" -> This may return incorrect thumbnails - use Shell API instead") # Simple approach: try to find thumbnails that might match our file # The thumbcache format is complex with hash-based indexing, so we'll # try a brute-force approach to find any valid thumbnails with open(cache_path, 'rb') as f: data = f.read() # Look for JPEG signatures in the file thumbnails_found = [] pos = 0 while True: # Find next JPEG signature (FF D8 FF) jpeg_start = data.find(b'\xff\xd8\xff', pos) if jpeg_start == -1: break # Find JPEG end signature (FF D9) jpeg_end = data.find(b'\xff\xd9', jpeg_start) if jpeg_end == -1: pos = jpeg_start + 1 continue # Extract JPEG data jpeg_data = data[jpeg_start:jpeg_end + 2] # Validate and add to list try: if len(jpeg_data) > 100: # Skip tiny thumbnails img = Image.open(io.BytesIO(jpeg_data)) # Only consider reasonably sized thumbnails if img.width >= 32 and img.height >= 32: thumbnails_found.append((img, img.width * img.height)) except Exception: pass pos = jpeg_end + 2 # Look for PNG signatures as well pos = 0 while True: # Find next PNG signature (89 50 4E 47) png_start = data.find(b'\x89PNG', pos) if png_start == -1: break # Find PNG end (IEND + CRC) png_end = data.find(b'IEND', png_start) if png_end == -1: pos = png_start + 1 continue # Extract PNG data (include 8 bytes after IEND for CRC) png_data = data[png_start:png_end + 8] try: if len(png_data) > 100: # Skip tiny thumbnails img = Image.open(io.BytesIO(png_data)) if img.width >= 32 and img.height >= 32: thumbnails_found.append((img, img.width * img.height)) except Exception: pass pos = png_end + 8 # Return the largest/highest quality thumbnail found if thumbnails_found: # Sort by pixel count (area) and return the largest thumbnails_found.sort(key=lambda x: x[1], reverse=True) best_thumbnail = thumbnails_found[0][0] # Basic quality filter - prefer thumbnails that are reasonable sizes if best_thumbnail.width >= 96 or best_thumbnail.height >= 96: return best_thumbnail return None except Exception as e: print(f" -> Error reading thumbcache database: {e}") return None if __name__ == "__main__": sys.exit(main())