"""Create Synology Photo Station thumbnails for media the NAS handles poorly.

The script walks a directory tree, picks out video, PSD and Blender files and
writes the JPEG thumbnails Photo Station expects into an ``eaDir_tmp/<file>``
directory next to each source file. Work is spread over a process pool.
"""
import sys
import os
import re
import argparse
import errno
import time
import subprocess
import io
import struct
from multiprocessing import Pool, Value

from PIL import Image, ImageDraw, ImageFont

# Formats Synology does not thumbnail well; common images (jpg, png, gif, ...)
# are deliberately excluded because the NAS handles them fine.
VIDEO_EXTS = ('.mp4', '.webm', '.avi', '.mkv', '.mov', '.wmv', '.flv',
              '.m4v', '.ts')
PSD_EXTS = ('.psd',)
BLEND_EXTS = ('.blend',)
VALID_EXTS = VIDEO_EXTS + PSD_EXTS + BLEND_EXTS

# Name prefixes of files this script generates; never treated as sources.
GENERATED_PREFIXES = ('SYNOPHOTO_THUMB', 'SYNOVIDEO_VIDEO_SCREENSHOT')

# (file name, maximum edge length in pixels) for every thumbnail written.
THUMB_SIZES = (
    ('SYNOPHOTO_THUMB_XL.jpg', 1280),
    ('SYNOPHOTO_THUMB_B.jpg', 640),
    ('SYNOPHOTO_THUMB_M.jpg', 320),
    ('SYNOPHOTO_THUMB_PREVIEW.jpg', 160),
    ('SYNOPHOTO_THUMB_S.jpg', 120),
)
VIDEO_SCREENSHOT = ('SYNOVIDEO_VIDEO_SCREENSHOT.jpg', 1280)

# Pillow modes that can be written as JPEG without conversion.
JPEG_MODES = ('RGB', 'L', 'CMYK')

# Seek positions tried in order when grabbing a video frame; the second entry
# covers clips shorter than five seconds.
FFMPEG_SEEK_POSITIONS = ('00:00:05', '00:00:00')


class State(object):
    """Progress counter and start time shared between pool workers."""

    def __init__(self):
        self.counter = Value('i', 0)
        # Wall-clock time: time.process_time() is per-process CPU time, so a
        # worker's reading cannot be compared with a value taken in the parent.
        self.start_ticks = Value('d', time.time())

    def increment(self, n=1):
        """Add ``n`` to the shared counter under its lock."""
        with self.counter.get_lock():
            self.counter.value += n

    @property
    def value(self):
        """Number of files processed so far."""
        return self.counter.value

    @property
    def start(self):
        """Wall-clock timestamp (seconds since the epoch) of creation."""
        return self.start_ticks.value


def init(args):
    """Pool initializer: publish the shared State as the worker's global."""
    global state
    state = args


def main():
    """Parse arguments and thumbnail every matching file in parallel."""
    args = parse_args()
    state = State()
    files = find_files(args.directory)

    with Pool(processes=args.processes, initializer=init,
              initargs=(state, )) as pool:
        pool.map(process_file, files)

    print("{0} files processed in total.".format(state.value))


def parse_args():
    """Build the command-line parser and return the parsed arguments."""
    parser = argparse.ArgumentParser(
        description="Create thumbnails for Synology Photo Station.")
    parser.add_argument("--directory", required=True,
                        help="Directory to generate thumbnails for. "
                             "Subdirectories will always be processed.")
    parser.add_argument("--processes", type=int, default=4,
                        help="Number of worker processes (default: 4).")
    return parser.parse_args()


def find_files(dir):
    """Yield paths under ``dir`` whose extension is in ``VALID_EXTS``.

    The extension test is case-insensitive. Files whose name starts with one
    of ``GENERATED_PREFIXES`` are skipped.
    """
    for root, dirs, files in os.walk(dir):
        for name in files:
            if name.startswith(GENERATED_PREFIXES):
                continue
            if name.lower().endswith(VALID_EXTS):
                yield os.path.join(root, name)


def print_progress():
    """Bump the shared counter and report throughput every tenth file."""
    global state
    state.increment(1)
    processed = state.value
    if processed % 10 == 0:
        elapsed = time.time() - state.start
        rate = processed / elapsed if elapsed > 0 else float(processed)
        print("{0} files processed so far, averaging {1:.2f} files per second."
              .format(processed, rate))


def process_file(file_path):
    """Generate thumbnails for one file, dispatching on its extension.

    Thumbnails are written to ``<parent>/eaDir_tmp/<filename>/``. Any error
    is reported and swallowed so that one bad file does not abort the whole
    pool run; progress is counted either way.
    """
    print(file_path)
    parent_dir, filename = os.path.split(file_path)
    thumb_dir = os.path.join(parent_dir, 'eaDir_tmp', filename)
    file_ext = os.path.splitext(filename)[1].lower()

    try:
        ensure_directory_exists(thumb_dir)
        if file_ext in VIDEO_EXTS:
            create_video_thumbnails(file_path, thumb_dir)
        elif file_ext in PSD_EXTS:
            create_psd_thumbnails(file_path, thumb_dir)
        elif file_ext in BLEND_EXTS:
            create_blend_thumbnails(file_path, thumb_dir)
        else:
            # Standard image processing
            create_thumbnails(file_path, thumb_dir)
    except Exception as e:
        print(f"Error processing {file_path}: {e}")

    print_progress()


def ensure_directory_exists(path):
    """Create ``path`` and any missing parents; an existing directory is fine."""
    os.makedirs(path, exist_ok=True)


def create_video_thumbnails(source_path, dest_dir):
    """Generate video thumbnails from Thumbs.db if possible, else via FFmpeg."""
    # The Thumbs.db lookup is much cheaper than decoding the video.
    windows_thumb = extract_windows_thumbnail(source_path)
    if windows_thumb:
        generate_synology_thumbnails(windows_thumb, dest_dir,
                                     include_video_screenshot=True)
        return

    print(f"Windows thumbnail extraction failed for {source_path}, using FFmpeg...")
    create_video_thumbnails_ffmpeg(source_path, dest_dir)


def _ffmpeg_extract_frame(source_path, thumb_path, size, seek):
    """Run FFmpeg once to write a single scaled frame; return the result."""
    cmd = [
        'ffmpeg', '-i', source_path,
        '-ss', seek,
        '-vframes', '1',
        '-vf', f'scale={size}:{size}:force_original_aspect_ratio=decrease',
        '-y',  # Overwrite output file
        thumb_path
    ]
    # errors='replace' keeps undecodable bytes in FFmpeg's output from raising.
    return subprocess.run(cmd, capture_output=True, text=True,
                          errors='replace')


def create_video_thumbnails_ffmpeg(source_path, dest_dir):
    """Generate video thumbnails using FFmpeg (fallback method).

    A frame is taken five seconds into the video; if FFmpeg succeeds but
    writes no file (the clip is shorter than that), the first frame is used
    instead. Thumbnails that already exist are left alone. Generation stops
    entirely when the ``ffmpeg`` executable cannot be found.
    """
    to_generate = (VIDEO_SCREENSHOT,) + THUMB_SIZES

    for thumb_name, thumb_size in to_generate:
        thumb_path = os.path.join(dest_dir, thumb_name)
        if os.path.exists(thumb_path):
            continue

        try:
            for seek in FFMPEG_SEEK_POSITIONS:
                result = _ffmpeg_extract_frame(source_path, thumb_path,
                                               thumb_size, seek)
                if result.returncode != 0:
                    print(f"Warning: FFmpeg failed for {source_path}: {result.stderr}")
                    break
                if os.path.exists(thumb_path):
                    break
        except FileNotFoundError:
            print("Warning: FFmpeg not found. Video thumbnails will be skipped.")
            print("Install FFmpeg: https://ffmpeg.org/download.html")
            break
        except Exception as e:
            print(f"Error processing video {source_path}: {e}")


def create_psd_thumbnails(source_path, dest_dir):
    """Generate PSD thumbnails from Thumbs.db if possible, else via psd-tools."""
    windows_thumb = extract_windows_thumbnail(source_path)
    if windows_thumb:
        generate_synology_thumbnails(windows_thumb, dest_dir)
        return

    print(f"Windows thumbnail extraction failed for {source_path}, using psd-tools...")
    create_psd_thumbnails_direct(source_path, dest_dir)


def create_psd_thumbnails_direct(source_path, dest_dir):
    """Generate PSD thumbnails by compositing the file with psd-tools.

    Prints a warning and returns if psd-tools is not installed; any other
    failure is reported and swallowed.
    """
    try:
        from psd_tools import PSDImage
    except ImportError:
        print("Warning: psd-tools not installed. Install with: pip install psd-tools")
        return

    try:
        psd = PSDImage.open(source_path)
        pil_image = psd.composite()
        if pil_image is None:
            print(f"Error processing PSD {source_path}: nothing to composite")
            return
        # Shared writer: every size is derived from the full composite rather
        # than from an image already shrunk in place.
        generate_synology_thumbnails(pil_image, dest_dir)
    except Exception as e:
        print(f"Error processing PSD {source_path}: {e}")


def generate_synology_thumbnails(image, dest_dir, include_video_screenshot=False):
    """Write every Synology thumbnail size for ``image`` into ``dest_dir``.

    Args:
        image: PIL image to scale; it is not modified.
        dest_dir: Existing directory receiving the JPEG files.
        include_video_screenshot: Also write ``SYNOVIDEO_VIDEO_SCREENSHOT.jpg``.

    Files that already exist are skipped.
    """
    targets = list(THUMB_SIZES)
    if include_video_screenshot:
        targets.insert(0, VIDEO_SCREENSHOT)

    pending = []
    for thumb_name, thumb_size in targets:
        thumb_path = os.path.join(dest_dir, thumb_name)
        if not os.path.exists(thumb_path):
            pending.append((thumb_path, thumb_size))
    if not pending:
        return

    # JPEG cannot store alpha or palette data; convert once, before scaling,
    # so palette images are resampled in full colour.
    if image.mode not in JPEG_MODES:
        image = image.convert('RGB')

    for thumb_path, thumb_size in pending:
        # thumbnail() works in place, so scale a copy of the full image.
        img_copy = image.copy()
        img_copy.thumbnail((thumb_size, thumb_size), Image.LANCZOS)
        img_copy.save(thumb_path, 'JPEG', quality=85)


def extract_blend_thumbnail(blend_path):
    """Return a thumbnail for a .blend file, or None.

    Tries the adjacent Thumbs.db first, then parses the .blend file itself.
    """
    # extract_windows_thumbnail() never raises; it returns None on failure.
    temp_thumb = extract_windows_thumbnail(blend_path)
    if temp_thumb:
        return temp_thumb

    try:
        return extract_blend_thumbnail_direct(blend_path)
    except Exception as e:
        print(f"Failed to extract .blend thumbnail: {e}")
        return None


def extract_windows_thumbnail(file_path):
    """Return the thumbnail for ``file_path`` from an adjacent Thumbs.db.

    Returns None when there is no Thumbs.db in the same directory, when it
    holds no stream for the file, or on any error.
    """
    try:
        directory = os.path.dirname(file_path)
        filename = os.path.basename(file_path)
        thumbs_db_path = os.path.join(directory, 'Thumbs.db')

        if not os.path.exists(thumbs_db_path):
            return None

        return extract_from_thumbs_db(thumbs_db_path, filename)
    except Exception:
        return None


def extract_from_thumbs_db(thumbs_db_path, target_filename):
    """Return the thumbnail stored for ``target_filename`` in a Thumbs.db.

    Only a stream whose own name equals the file name (case-insensitively) is
    accepted. Substring matches and "first stream that looks like a
    thumbnail" fallbacks are deliberately not used: they hand back another
    file's picture.
    NOTE(review): streams named by number or hash cannot be attributed to a
    file from the stream name alone, so such databases yield None here and
    the caller falls back to its slower method.

    Returns a PIL image, or None if nothing matched, olefile is missing, or
    the database could not be read.
    """
    try:
        import olefile
    except ImportError:
        print("Warning: olefile not installed. Install with: pip install olefile")
        return None

    try:
        if not olefile.isOleFile(thumbs_db_path):
            return None

        wanted = target_filename.lower()
        ole = olefile.OleFileIO(thumbs_db_path)
        try:
            for stream_name in ole.listdir():
                # listdir() yields each entry as a list of path components.
                if isinstance(stream_name, list):
                    leaf = stream_name[-1]
                else:
                    leaf = stream_name
                if leaf.lower() != wanted:
                    continue
                img = extract_thumbnail_from_stream(ole, stream_name)
                if img:
                    return img
            return None
        finally:
            # The returned image reads from an in-memory buffer, so the OLE
            # file can be closed on every path.
            ole.close()
    except Exception as e:
        print(f"Debug: Thumbs.db extraction error: {e}")
        return None


def _open_image_bytes(data):
    """Return ``Image.open`` on ``data``, or None if it is empty or unreadable."""
    if not data:
        return None
    try:
        return Image.open(io.BytesIO(data))
    except Exception:
        return None


def extract_thumbnail_from_stream(ole, stream_name):
    """Decode the image held in one Thumbs.db stream, or return None.

    The image is looked for after a header of 12, 16, 20, 24 or 32 bytes and,
    failing that, at the first JPEG or PNG signature found in the stream.
    """
    try:
        # stream_name is passed through exactly as ole.listdir() returned it.
        with ole.openstream(stream_name) as stream:
            stream.seek(0)
            full_data = stream.read()

        for offset in (12, 16, 20, 24, 32):
            img = _open_image_bytes(full_data[offset:])
            if img is not None:
                return img

        # JPEG (FF D8 FF) first, then PNG (89 50 4E 47).
        for signature in (b'\xff\xd8\xff', b'\x89PNG'):
            start = full_data.find(signature)
            if start == -1:
                continue
            img = _open_image_bytes(full_data[start:])
            if img is not None:
                return img

        return None
    except Exception:
        return None


def _decode_blend_test_block(data, endian):
    """Decode a ``TEST`` block: two ints (width, height) then RGBA pixels.

    Rows are stored bottom-up, so they are reversed before building the
    image. Returns None if the payload size does not match the dimensions.
    """
    if len(data) < 8:
        return None
    width, height = struct.unpack_from(endian + 'ii', data, 0)
    pixels = data[8:]
    if width <= 0 or height <= 0 or len(pixels) != width * height * 4:
        return None
    row = width * 4
    flipped = b''.join(pixels[start:start + row]
                       for start in range(len(pixels) - row, -1, -row))
    return Image.frombytes('RGBA', (width, height), flipped)


def _decode_blend_png_block(data):
    """Return the first PNG embedded in ``data`` as a loaded image, or None."""
    png_start = data.find(b'\x89PNG\r\n\x1a\n')
    if png_start == -1:
        return None
    png_end = data.find(b'IEND', png_start)
    if png_end == -1:
        return None
    png_end += 8  # Include IEND + CRC
    try:
        # copy() forces the decode while the buffer is still at hand.
        return Image.open(io.BytesIO(data[png_start:png_end])).copy()
    except Exception:
        return None


def extract_blend_thumbnail_direct(blend_path):
    """Return the largest thumbnail embedded in a .blend file, or None.

    Walks the file's block list and decodes ``TEST`` blocks (raw RGBA
    thumbnail) and ``PREV`` blocks (embedded PNG). Files whose first bytes
    are not ``BLENDER`` (for example compressed .blend files) yield None.
    """
    with open(blend_path, 'rb') as f:
        header = f.read(12)
        if len(header) < 12 or not header.startswith(b'BLENDER'):
            return None

        # Header byte 7 gives the pointer size, byte 8 the endianness.
        pointer_size = 8 if header[7:8] == b'-' else 4
        endian = '<' if header[8:9] == b'v' else '>'
        # Block header: code(4) + size(4) + pointer + two more 4-byte fields.
        block_header_size = 16 + pointer_size

        best_preview = None
        best_size = 0

        while True:
            block_header = f.read(block_header_size)
            if len(block_header) < block_header_size:
                break

            code = block_header[:4]
            size = struct.unpack(endian + 'I', block_header[4:8])[0]

            if code not in (b'TEST', b'PREV'):
                f.seek(size, 1)
                continue

            block_data = f.read(size)
            if len(block_data) < size:
                break

            if code == b'TEST':
                img = _decode_blend_test_block(block_data, endian)
            else:
                img = _decode_blend_png_block(block_data)
            if img is None:
                continue

            img_size = img.size[0] * img.size[1]
            if img_size > best_size:
                best_preview = img
                best_size = img_size

        return best_preview


def create_blend_thumbnails(source_path, dest_dir):
    """Extract embedded Blender thumbnail and generate Synology thumbnails."""
    img = extract_blend_thumbnail(source_path)
    if img is None:
        print(f"No embedded thumbnail in {source_path}")
        return

    generate_synology_thumbnails(img, dest_dir)


def create_thumbnails(source_path, dest_dir):
    """Generate image thumbnails from Thumbs.db if possible, else via PIL."""
    windows_thumb = extract_windows_thumbnail(source_path)
    if windows_thumb:
        generate_synology_thumbnails(windows_thumb, dest_dir)
        return

    create_thumbnails_pil(source_path, dest_dir)


def create_thumbnails_pil(source_path, dest_dir):
    """Generate thumbnails by opening the image with PIL (fallback method)."""
    try:
        # The context manager closes the underlying file once we are done.
        with Image.open(source_path) as im:
            generate_synology_thumbnails(im, dest_dir)
    except Exception as e:
        print(f"Error processing image {source_path}: {e}")


if __name__ == "__main__":
    sys.exit(main())