#!/usr/bin/env python3 """ Batch Image Uploader für Image-Uploader System ============================================= Automatisches Parsen von Verzeichnissen und Upload von Bildern mit strukturierten Metadaten an das Image-Uploader Backend. Features: - Rekursives Verzeichnis-Scanning nach Bildern - Metadaten-Extraktion aus Verzeichnis-/Dateinamen - Batch-Upload an das Backen self.logger.info(f"📊 Upload abgeschlossen: {len(project_groups)} Gruppen erstellt") return { 'total': total_images, 'successful': total_successful, 'failed': total_failed, 'failed_files': failed_files, 'project_groups': project_groups # Für Übersicht am Ende }Progress-Tracking und Error-Handling - EXIF-Daten Unterstützung (optional) Usage: python batch_uploader.py /path/to/images --titel "Meine Sammlung" """ import os import sys import json import requests import argparse from pathlib import Path from typing import List, Dict, Optional, Tuple import mimetypes from PIL import Image, ExifTags from PIL.ExifTags import TAGS import re from datetime import datetime import logging # Konfiguration #DEFAULT_BACKEND_URL = "https://deinprojekt.lan.hobbyhimmel.de/api" DEFAULT_BACKEND_URL = "http://localhost/api" SUPPORTED_FORMATS = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff', '.tif'} MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB class ImageMetadataExtractor: """Extrahiert Metadaten aus Bildern und Verzeichnissen Erwartet Struktur: Photos/Jahr/Name/Projekt/dateiname.endung """ def __init__(self): self.logger = logging.getLogger(__name__) def parse_structured_path(self, file_path: Path) -> Dict[str, str]: """ Parst strukturierten Pfad: Photos/Jahr/Name/Projekt/dateiname.endung Returns: Dict mit 'jahr', 'name', 'projekt', 'dateiname' oder None wenn nicht parsbar """ parts = file_path.parts # Mindestens 4 Verzeichnisse + Datei erforderlich if len(parts) < 5: self.logger.debug(f"Pfad zu kurz für Struktur-Parsing: {file_path}") return {} try: # Rückwärts vom Ende her parsen dateiname = file_path.stem projekt = parts[-2] # Projekt-Verzeichnis name = parts[-3] # Name-Verzeichnis jahr = parts[-4] # Jahr-Verzeichnis # Jahr validieren (4-stellige Zahl) if not re.match(r'^(19|20)\d{2}$', jahr): self.logger.debug(f"Ungültiges Jahr in Pfad: {jahr}") # Versuche Jahr aus anderen Teilen zu extrahieren jahr = self.extract_year_from_path(file_path) return { 'jahr': jahr, 'name': name, 'projekt': projekt, 'dateiname': dateiname } except (IndexError, ValueError) as e: self.logger.debug(f"Struktur-Parsing fehlgeschlagen für {file_path}: {e}") return {} def extract_year_from_path(self, file_path: Path) -> Optional[str]: """Extrahiert Jahr aus Pfad oder Dateinamen (Fallback)""" year_pattern = r'\b(19|20)\d{2}\b' # Zuerst im Verzeichnisnamen for part in file_path.parts: match = re.search(year_pattern, part) if match: return match.group() # Dann im Dateinamen match = re.search(year_pattern, file_path.stem) if match: return match.group() return None def extract_exif_date(self, image_path: Path) -> Optional[str]: """Extrahiert Aufnahmedatum aus EXIF-Daten""" try: with Image.open(image_path) as img: exifdata = img.getexif() if exifdata: for tag_id in exifdata: tag = TAGS.get(tag_id, tag_id) if tag in ['DateTime', 'DateTimeOriginal', 'DateTimeDigitized']: date_str = exifdata.get(tag_id) if date_str: # Format: "2023:12:25 10:30:00" -> "2023" return date_str.split(':')[0] except Exception as e: self.logger.warning(f"EXIF-Extraktion fehlgeschlagen für {image_path}: {e}") return None def read_readme_description(self, directory: Path) -> Optional[str]: """ Liest Beschreibung aus README.md im Verzeichnis. Extrahiert den Text nach '## Beschreibung' bis zur nächsten Überschrift, --- oder 500 Zeichen. """ readme_files = ['README.md', 'readme.md', 'Readme.md', 'README.txt', 'readme.txt'] for readme_name in readme_files: readme_path = directory / readme_name if readme_path.exists() and readme_path.is_file(): try: with open(readme_path, 'r', encoding='utf-8') as f: content = f.read() # Suche nach '## Beschreibung' beschr_start = re.search(r'^\s*##\s*Beschreibung\s*$', content, re.MULTILINE) if beschr_start: start_idx = beschr_start.end() rest = content[start_idx:] # Suche nach nächster Überschrift oder --- nach '## Beschreibung' next_header = re.search(r'^\s*#+\s+', rest, re.MULTILINE) next_sep = re.search(r'^\s*-{3,}\s*$', rest, re.MULTILINE) # Finde das früheste Ende (nächste Überschrift oder ---) end_idx = None if next_header and next_sep: end_idx = min(next_header.start(), next_sep.start()) elif next_header: end_idx = next_header.start() elif next_sep: end_idx = next_sep.start() if end_idx is not None: beschreibung = rest[:end_idx] else: beschreibung = rest # Auf max. 500 Zeichen kürzen und Whitespace säubern beschreibung = beschreibung.strip() beschreibung = beschreibung[:500] if beschreibung: self.logger.debug(f"README Beschreibung gefunden in {directory}: {beschreibung[:50]}...") return beschreibung except Exception as e: self.logger.warning(f"Fehler beim Lesen von {readme_path}: {e}") continue return None def extract_title_from_structured_path(self, path_info: Dict[str, str]) -> str: """Extrahiert Titel aus strukturiertem Pfad""" if 'projekt' in path_info and path_info['projekt']: # Projekt-Name als Titel verwenden title = path_info['projekt'] # Unterstriche/Bindestriche durch Leerzeichen ersetzen title = re.sub(r'[_-]+', ' ', title) return title.title() return "Unbenannt" def extract_description_from_structured_path(self, file_path: Path, path_info: Dict[str, str]) -> str: """ Generiert Beschreibung aus strukturiertem Pfad Priorität: 1. README.md im Projekt-Verzeichnis 2. README.md im Name-Verzeichnis 3. Vollständiger Verzeichnisname als Fallback """ # 1. README.md im Projekt-Verzeichnis suchen projekt_dir = file_path.parent readme_description = self.read_readme_description(projekt_dir) if readme_description: return readme_description # 2. README.md im Name-Verzeichnis suchen if len(file_path.parts) >= 3: name_dir = file_path.parents[1] # Zwei Ebenen nach oben readme_description = self.read_readme_description(name_dir) if readme_description: return readme_description # 3. Fallback: Vollständiger Verzeichnisname if path_info: parts = [] if 'jahr' in path_info and path_info['jahr']: parts.append(f"Jahr: {path_info['jahr']}") if 'name' in path_info and path_info['name']: parts.append(f"Name: {path_info['name']}") if 'projekt' in path_info and path_info['projekt']: parts.append(f"Projekt: {path_info['projekt']}") if parts: return " | ".join(parts) # Final Fallback: Pfad-basiert return f"Aus: {' → '.join(file_path.parts[-3:-1])}" class BatchUploader: """Haupt-Klasse für Batch-Upload""" def __init__(self, backend_url: str = DEFAULT_BACKEND_URL, user: str = None, password: str = None): self.backend_url = backend_url.rstrip('/') self.metadata_extractor = ImageMetadataExtractor() self.logger = logging.getLogger(__name__) # Session für Connection-Reuse self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Batch-Uploader/1.0' }) self.auth = (user, password) if user and password else None def scan_directory(self, directory: Path, recursive: bool = True) -> List[Path]: """Scannt Verzeichnis nach unterstützten Bildern""" images = [] if not directory.exists() or not directory.is_dir(): raise ValueError(f"Verzeichnis nicht gefunden: {directory}") pattern = "**/*" if recursive else "*" for file_path in directory.glob(pattern): if (file_path.is_file() and file_path.suffix.lower() in SUPPORTED_FORMATS and file_path.stat().st_size <= MAX_FILE_SIZE): images.append(file_path) return sorted(images) def prepare_image_metadata(self, image_path: Path, default_titel: Optional[str] = None, default_name: Optional[str] = None) -> Dict: """ Bereitet Metadaten für ein Bild vor Erwartet Struktur: Photos/Jahr/Name/Projekt/dateiname.endung """ # Strukturierten Pfad parsen path_info = self.metadata_extractor.parse_structured_path(image_path) # Jahr bestimmen (Struktur > EXIF > Pfad > Aktuell) jahr = None if path_info and 'jahr' in path_info and path_info['jahr']: jahr = path_info['jahr'] else: # Fallback: EXIF oder Pfad-Pattern jahr = (self.metadata_extractor.extract_exif_date(image_path) or self.metadata_extractor.extract_year_from_path(image_path) or str(datetime.now().year)) # Titel bestimmen (Parameter > Struktur) titel = None if default_titel: titel = default_titel elif path_info: titel = self.metadata_extractor.extract_title_from_structured_path(path_info) else: # Fallback für unstrukturierte Pfade titel = "Unbenannt" # Beschreibung generieren (README > Pfad-Info) beschreibung = self.metadata_extractor.extract_description_from_structured_path( image_path, path_info ) # Name bestimmen (Parameter > Struktur > Leer) name = "" if default_name: name = default_name elif path_info and 'name' in path_info and path_info['name']: name = path_info['name'] return { 'jahr': jahr, 'titel': titel, 'beschreibung': beschreibung, 'name': name } def upload_batch(self, images: List[Path], default_titel: Optional[str] = None, default_name: Optional[str] = None, dry_run: bool = False) -> Dict: """ Uploaded Bilder gruppiert nach PROJEKTEN (Jahr/Name/Projekt) KORREKT: Jedes Projekt wird eine eigene Gruppe! """ total_images = len(images) if not images: return {'total': 0, 'successful': 0, 'failed': 0, 'failed_files': []} # 1. Bilder nach Projekten gruppieren project_groups = {} for image_path in images: path_info = self.metadata_extractor.parse_structured_path(image_path) # Eindeutigen Projekt-Key erstellen if path_info and all(k in path_info for k in ['jahr', 'name', 'projekt']): project_key = f"{path_info['jahr']}/{path_info['name']}/{path_info['projekt']}" else: # Fallback für unstrukturierte Pfade project_key = f"unbekannt/{image_path.parent.name}" if project_key not in project_groups: project_groups[project_key] = [] project_groups[project_key].append(image_path) self.logger.info(f"📁 {len(project_groups)} Projekte gefunden mit {total_images} Bildern") # Dry-Run oder Upload if dry_run: self.logger.info("🔍 Dry-Run Mode - Kein Upload") self.logger.info(f"Würde {len(images)} Bilder uploaden") # README-Dateien erstellen im Dry-Run Mode self.logger.info("📄 Erstelle README-Dateien in Projekt-Verzeichnissen...") readme_count = self.create_readme_files(images) self.logger.info(f"✅ {readme_count} README-Dateien erstellt") # 2. Jedes Projekt als eigene Gruppe uploaden total_successful = 0 total_failed = 0 failed_files = [] for project_key, project_images in project_groups.items(): self.logger.info(f"🚀 Upload Projekt '{project_key}': {len(project_images)} Bilder") # Metadaten für dieses Projekt group_metadata = self.prepare_image_metadata(project_images[0], default_titel, default_name) # Backend erwartet year/title/description/name backend_metadata = { 'year': int(group_metadata.get('jahr', datetime.now().year)), 'title': group_metadata.get('titel', project_key.split('/')[-1]), # Projekt-Name als Titel 'description': group_metadata.get('beschreibung', f"Projekt: {project_key}"), 'name': group_metadata.get('name', '') } # Dieses Projekt uploaden if not dry_run: try: files = [] # Alle Dateien für den Upload vorbereiten for image_path in project_images: files.append(('images', ( image_path.name, open(image_path, 'rb'), mimetypes.guess_type(str(image_path))[0] or 'image/jpeg' ))) # Ein Upload-Request pro Projekt response = self.session.post( f"{self.backend_url}/upload/batch", files=files, data={'metadata': json.dumps(backend_metadata)}, timeout=120, auth=self.auth ) # Files schließen for _, file_tuple in files: if hasattr(file_tuple[1], 'close'): file_tuple[1].close() if response.status_code == 200: self.logger.info(f"✅ Projekt '{project_key}': {len(project_images)} Bilder") total_successful += len(project_images) else: self.logger.error(f"❌ Projekt '{project_key}' Fehler: {response.status_code}") total_failed += len(project_images) failed_files.extend([str(img) for img in project_images]) except Exception as e: self.logger.error(f"💥 Projekt '{project_key}' Fehler: {e}") total_failed += len(project_images) failed_files.extend([str(img) for img in project_images]) self.logger.info(f"� Upload abgeschlossen: {len(project_groups)} Gruppen erstellt") return { 'total': total_images, 'successful': total_successful, 'failed': total_failed, 'failed_files': failed_files, 'project_groups': project_groups } def create_readme_files(self, images: List[Path]) -> int: """ Erstellt README.md Dateien in jedem Projekt-Verzeichnis (nur bei --dry-run) Returns: Anzahl der erstellten README-Dateien """ created_count = 0 project_dirs = set() # Sammle alle Projekt-Verzeichnisse for image_path in images: path_info = self.metadata_extractor.parse_structured_path(image_path) if path_info and all(k in path_info for k in ['jahr', 'name', 'projekt']): # Projekt-Verzeichnis ist das parent des Bildes project_dir = image_path.parent if project_dir not in project_dirs: project_dirs.add(project_dir) # Erstelle README.md in jedem Projekt-Verzeichnis for project_dir in project_dirs: readme_path = project_dir / 'README.md' # Prüfe ob README bereits existiert if readme_path.exists(): self.logger.debug(f"README existiert bereits: {readme_path}") continue # Extrahiere Metadaten aus Pfad path_info = self.metadata_extractor.parse_structured_path(project_dir / 'dummy.jpg') if not path_info: continue # README-Inhalt generieren readme_content = self._generate_readme_content(path_info) try: readme_path.write_text(readme_content, encoding='utf-8') self.logger.info(f"✅ README erstellt: {readme_path}") created_count += 1 except Exception as e: self.logger.error(f"❌ Fehler beim Erstellen von {readme_path}: {e}") return created_count def _generate_readme_content(self, path_info: Dict[str, str]) -> str: """Generiert README-Inhalt basierend auf Pfad-Informationen""" jahr = path_info.get('jahr', 'YYYY') name = path_info.get('name', 'Name') projekt = path_info.get('projekt', 'Projekt') # Projekt-Titel formatieren projekt_titel = projekt.replace('_', ' ').replace('-', ' ').title() content = f"""# {projekt_titel} ## Projekt-Details **Jahr:** {jahr} **Ersteller:** {name} **Projekt:** {projekt_titel} ## Beschreibung [TODO: Beschreibung des Projekts hier einfügen] --- *Diese README wurde automatisch generiert und kann bearbeitet werden.* """ return content def print_upload_summary(self, project_groups: Dict[str, List[Path]]) -> None: """Druckt eine Übersicht aller erstellten Slideshows""" if not project_groups: self.logger.info("📋 Keine Projekte gefunden") return self.logger.info("") self.logger.info("=" * 80) self.logger.info(f"📋 SLIDESHOW ÜBERSICHT - {len(project_groups)} Projekte erstellt") self.logger.info("=" * 80) # Sortiere Projekte nach Jahr, dann Name, dann Projekt sorted_projects = sorted(project_groups.items(), key=lambda x: x[0]) current_year = None current_name = None for project_key, images in sorted_projects: parts = project_key.split('/') if len(parts) == 3: jahr, name, projekt = parts else: jahr, name, projekt = "?", "?", project_key # Gruppierung nach Jahr if current_year != jahr: if current_year is not None: self.logger.info("") self.logger.info(f"📅 {jahr}") self.logger.info("-" * 50) current_year = jahr current_name = None # Gruppierung nach Name (innerhalb des Jahres) if current_name != name: if current_name is not None: self.logger.info("") self.logger.info(f"👤 {name}") current_name = name # Projekt-Details self.logger.info(f" 📺 {projekt.ljust(30)} - {len(images):3d} Bilder") self.logger.info("") self.logger.info("=" * 80) total_images = sum(len(images) for images in project_groups.values()) self.logger.info(f"📊 GESAMT: {len(project_groups)} Slideshows mit {total_images} Bildern") self.logger.info("=" * 80) def test_connection(self) -> bool: """Testet Verbindung zum Backend (mit optionaler Auth)""" try: response = self.session.get(f"{self.backend_url}/groups", timeout=10, auth=self.auth) return response.status_code == 200 except Exception as e: self.logger.error(f"Verbindungstest fehlgeschlagen: {e}") return False def setup_logging(verbose: bool = False): """Konfiguriert Logging""" level = logging.DEBUG if verbose else logging.INFO logging.basicConfig( level=level, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S' ) def main(): """Hauptfunktion""" parser = argparse.ArgumentParser( description="Batch Image Uploader für Image-Uploader System", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Beispiele: python batch_uploader.py /home/user/photos --titel "Urlaubsbilder" python batch_uploader.py ./images --name "Max Mustermann" --no-recursive python batch_uploader.py /photos --backend http://myserver:5000 --dry-run --verbose """ ) parser.add_argument('directory', help='Verzeichnis mit Bildern zum Upload') parser.add_argument('--titel', help='Standard-Titel für alle Bilder') parser.add_argument('--name', help='Standard-Name für alle Bilder') parser.add_argument('--backend', default=DEFAULT_BACKEND_URL, help=f'Backend URL (Standard: {DEFAULT_BACKEND_URL})') parser.add_argument('--user', help='HTTP Basic Auth Benutzername (optional)') parser.add_argument('--password', help='HTTP Basic Auth Passwort (optional)') parser.add_argument('--no-recursive', action='store_true', help='Nicht rekursiv in Unterverzeichnisse') parser.add_argument('--dry-run', action='store_true', help='Nur Analyse, kein Upload') parser.add_argument('--verbose', '-v', action='store_true', help='Verbose Output') args = parser.parse_args() # Logging Setup setup_logging(args.verbose) logger = logging.getLogger(__name__) try: # Verzeichnis validieren directory = Path(args.directory).resolve() uploader = BatchUploader(args.backend, args.user, args.password) # Verbindung testen (nur bei echtem Upload) if not args.dry_run: logger.info(f"Teste Verbindung zu {args.backend}...") if not uploader.test_connection(): logger.error("❌ Backend nicht erreichbar!") return 1 logger.info("✅ Backend erreichbar") else: logger.info("🔍 Dry-Run Mode - Überspringe Verbindungstest") # Bilder scannen logger.info(f"Scanne Verzeichnis: {directory}") recursive = not args.no_recursive images = uploader.scan_directory(directory, recursive) if not images: logger.warning("Keine unterstützten Bilder gefunden!") return 0 logger.info(f"📁 {len(images)} Bilder gefunden") # Beispiel-Metadaten zeigen if args.verbose and images: sample_image = images[0] sample_metadata = uploader.prepare_image_metadata( sample_image, args.titel, args.name ) logger.debug(f"Beispiel-Metadaten für {sample_image.name}:") for key, value in sample_metadata.items(): logger.debug(f" {key}: {value}") # Upload starten logger.info(f"🚀 Starte Upload...") result = uploader.upload_batch( images, args.titel, args.name, args.dry_run ) # Ergebnis if not args.dry_run: logger.info("📊 Upload abgeschlossen:") logger.info(f" ✅ Erfolgreich: {result['successful']}") logger.info(f" ❌ Fehlgeschlagen: {result['failed']}") if result['failed_files'] and args.verbose: logger.info("Fehlgeschlagene Dateien:") for failed_file in result['failed_files']: logger.info(f" - {failed_file}") # Upload-Übersicht anzeigen (nach erfolgreichem Upload) if 'project_groups' in result and result['project_groups']: uploader.print_upload_summary(result['project_groups']) return 0 if result['failed'] == 0 else 1 except Exception as e: logger.error(f"💥 Unerwarteter Fehler: {e}") if args.verbose: import traceback traceback.print_exc() return 1 if __name__ == "__main__": sys.exit(main())