Project-Image-Uploader/scripts/batch_uploader.py
matthias.lotz 6332b82c6a Feature Request: admin session security
- replace bearer auth with session+CSRF flow and add admin user directory

- update frontend moderation flow, force password change gate, and new CLI

- refresh changelog/docs/feature plan + ensure swagger dev experience
2025-11-23 21:18:42 +01:00

825 lines
31 KiB
Python
Raw Blame History

#!/usr/bin/env python3
"""
Batch Image Uploader für Image-Uploader System
=============================================
Automatisches Parsen von Verzeichnissen und Upload von Bildern
mit strukturierten Metadaten an das Image-Uploader Backend.
Features:
- Rekursives Verzeichnis-Scanning nach Bildern
- Metadaten-Extraktion aus Verzeichnis-/Dateinamen
- Batch-Upload an das Backend mit Session-Authentifizierung
- Fortschritts-Tracking und Error-Handling
- EXIF-Daten Unterstützung (optional)
Usage:
python batch_uploader.py /path/to/images --titel "Meine Sammlung"
"""
import os
import sys
import json
import requests
import argparse
from pathlib import Path
from typing import Any, List, Dict, Optional, Tuple
import mimetypes
from PIL import Image, ExifTags
from PIL.ExifTags import TAGS
import re
from datetime import datetime
import logging
# Konfiguration
DEFAULT_BACKEND_URL = "http://localhost:5000"
SUPPORTED_FORMATS = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff', '.tif'}
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
def load_social_media_consents(input_value: Optional[str]) -> List[Dict[str, Any]]:
"""Lädt Social-Media-Consents aus JSON-String oder Datei"""
if not input_value:
return []
potential_path = Path(input_value)
try:
if potential_path.exists() and potential_path.is_file():
content = potential_path.read_text(encoding='utf-8')
else:
content = input_value
data = json.loads(content)
except (OSError, json.JSONDecodeError) as exc:
raise ValueError(f"Ungültige Social-Media-Consents: {exc}") from exc
if isinstance(data, dict):
# Ein einzelnes Consent-Objekt erlauben
return [data]
if not isinstance(data, list):
raise ValueError("Social-Media-Consents müssen Liste oder Objekt sein")
return data
class ImageMetadataExtractor:
"""Extrahiert Metadaten aus Bildern und Verzeichnissen
Erwartet Struktur: Photos/Jahr/Name/Projekt/dateiname.endung
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
def parse_structured_path(self, file_path: Path) -> Dict[str, str]:
"""
Parst strukturierten Pfad: Photos/Jahr/Name/Projekt/dateiname.endung
Returns:
Dict mit 'jahr', 'name', 'projekt', 'dateiname' oder None wenn nicht parsbar
"""
parts = file_path.parts
# Mindestens 4 Verzeichnisse + Datei erforderlich
if len(parts) < 5:
self.logger.debug(f"Pfad zu kurz für Struktur-Parsing: {file_path}")
return {}
try:
# Rückwärts vom Ende her parsen
dateiname = file_path.stem
projekt = parts[-2] # Projekt-Verzeichnis
name = parts[-3] # Name-Verzeichnis
jahr = parts[-4] # Jahr-Verzeichnis
# Jahr validieren (4-stellige Zahl)
if not re.match(r'^(19|20)\d{2}$', jahr):
self.logger.debug(f"Ungültiges Jahr in Pfad: {jahr}")
# Versuche Jahr aus anderen Teilen zu extrahieren
jahr = self.extract_year_from_path(file_path) or ''
return {
'jahr': jahr,
'name': name,
'projekt': projekt,
'dateiname': dateiname
}
except (IndexError, ValueError) as e:
self.logger.debug(f"Struktur-Parsing fehlgeschlagen für {file_path}: {e}")
return {}
def extract_year_from_path(self, file_path: Path) -> Optional[str]:
"""Extrahiert Jahr aus Pfad oder Dateinamen (Fallback)"""
year_pattern = r'\b(19|20)\d{2}\b'
# Zuerst im Verzeichnisnamen
for part in file_path.parts:
match = re.search(year_pattern, part)
if match:
return match.group()
# Dann im Dateinamen
match = re.search(year_pattern, file_path.stem)
if match:
return match.group()
return None
def extract_exif_date(self, image_path: Path) -> Optional[str]:
"""Extrahiert Aufnahmedatum aus EXIF-Daten"""
try:
with Image.open(image_path) as img:
exifdata = img.getexif()
if exifdata:
for tag_id in exifdata:
tag = TAGS.get(tag_id, tag_id)
if tag in ['DateTime', 'DateTimeOriginal', 'DateTimeDigitized']:
date_str = exifdata.get(tag_id)
if date_str:
# Format: "2023:12:25 10:30:00" -> "2023"
return date_str.split(':')[0]
except Exception as e:
self.logger.warning(f"EXIF-Extraktion fehlgeschlagen für {image_path}: {e}")
return None
def read_readme_description(self, directory: Path) -> Optional[str]:
"""
Liest Beschreibung aus README.md im Verzeichnis.
Extrahiert den Text nach '## Beschreibung' bis zur nächsten Überschrift, --- oder 500 Zeichen.
"""
readme_files = ['README.md', 'readme.md', 'Readme.md', 'README.txt', 'readme.txt']
for readme_name in readme_files:
readme_path = directory / readme_name
if readme_path.exists() and readme_path.is_file():
try:
with open(readme_path, 'r', encoding='utf-8') as f:
content = f.read()
# Suche nach '## Beschreibung'
beschr_start = re.search(r'^\s*##\s*Beschreibung\s*$', content, re.MULTILINE)
if beschr_start:
start_idx = beschr_start.end()
rest = content[start_idx:]
# Suche nach nächster Überschrift oder --- nach '## Beschreibung'
next_header = re.search(r'^\s*#+\s+', rest, re.MULTILINE)
next_sep = re.search(r'^\s*-{3,}\s*$', rest, re.MULTILINE)
# Finde das früheste Ende (nächste Überschrift oder ---)
end_idx = None
if next_header and next_sep:
end_idx = min(next_header.start(), next_sep.start())
elif next_header:
end_idx = next_header.start()
elif next_sep:
end_idx = next_sep.start()
if end_idx is not None:
beschreibung = rest[:end_idx]
else:
beschreibung = rest
# Auf max. 500 Zeichen kürzen und Whitespace säubern
beschreibung = beschreibung.strip()
beschreibung = beschreibung[:500]
if beschreibung:
self.logger.debug(f"README Beschreibung gefunden in {directory}: {beschreibung[:50]}...")
return beschreibung
except Exception as e:
self.logger.warning(f"Fehler beim Lesen von {readme_path}: {e}")
continue
return None
def extract_title_from_structured_path(self, path_info: Dict[str, str]) -> str:
"""Extrahiert Titel aus strukturiertem Pfad"""
if 'projekt' in path_info and path_info['projekt']:
# Projekt-Name als Titel verwenden
title = path_info['projekt']
# Unterstriche/Bindestriche durch Leerzeichen ersetzen
title = re.sub(r'[_-]+', ' ', title)
return title.title()
return "Unbenannt"
def extract_description_from_structured_path(self, file_path: Path, path_info: Dict[str, str]) -> str:
"""
Generiert Beschreibung aus strukturiertem Pfad
Priorität:
1. README.md im Projekt-Verzeichnis
2. README.md im Name-Verzeichnis
3. Vollständiger Verzeichnisname als Fallback
"""
# 1. README.md im Projekt-Verzeichnis suchen
projekt_dir = file_path.parent
readme_description = self.read_readme_description(projekt_dir)
if readme_description:
return readme_description
# 2. README.md im Name-Verzeichnis suchen
if len(file_path.parts) >= 3:
name_dir = file_path.parents[1] # Zwei Ebenen nach oben
readme_description = self.read_readme_description(name_dir)
if readme_description:
return readme_description
# 3. Fallback: Vollständiger Verzeichnisname
if path_info:
parts = []
if 'jahr' in path_info and path_info['jahr']:
parts.append(f"Jahr: {path_info['jahr']}")
if 'name' in path_info and path_info['name']:
parts.append(f"Name: {path_info['name']}")
if 'projekt' in path_info and path_info['projekt']:
parts.append(f"Projekt: {path_info['projekt']}")
if parts:
return " | ".join(parts)
# Final Fallback: Pfad-basiert
return f"Aus: {''.join(file_path.parts[-3:-1])}"
class BatchUploader:
"""Haupt-Klasse für Batch-Upload"""
def __init__(self, backend_url: str = DEFAULT_BACKEND_URL,
username: Optional[str] = None,
password: Optional[str] = None):
self.base_url = backend_url.rstrip('/')
if self.base_url.endswith('/api'):
self.base_url = self.base_url[:-4]
self.api_base_url = f"{self.base_url}/api"
self.metadata_extractor = ImageMetadataExtractor()
self.logger = logging.getLogger(__name__)
# Session für Connection-Reuse
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Batch-Uploader/2.0',
'Accept': 'application/json'
})
self.username = username
self.password = password
self.csrf_token: Optional[str] = None
def _api_url(self, path: str) -> str:
path = path.lstrip('/')
return f"{self.api_base_url}/{path}"
def _auth_required(self) -> bool:
return bool(self.username and self.password)
def has_credentials(self) -> bool:
return self._auth_required()
def ensure_admin_session(self) -> None:
"""Public wrapper für Authentifizierung"""
self._ensure_authenticated()
def _ensure_authenticated(self) -> None:
if self.csrf_token:
return
if not self._auth_required():
raise ValueError("Admin-Benutzername und Passwort erforderlich für Upload")
login_url = f"{self.base_url}/auth/login"
self.logger.info("🔐 Melde Admin-Session an...")
response = self.session.post(
login_url,
json={'username': self.username, 'password': self.password},
timeout=20
)
if response.status_code != 200:
raise RuntimeError(
f"Login fehlgeschlagen ({response.status_code}): {response.text}"
)
try:
data = response.json()
except ValueError as exc:
raise RuntimeError("Login-Antwort konnte nicht gelesen werden") from exc
self.csrf_token = data.get('csrfToken')
if not self.csrf_token:
self._refresh_csrf_token()
else:
self.logger.debug("CSRF-Token aus Login-Response übernommen")
def _refresh_csrf_token(self) -> None:
csrf_url = f"{self.base_url}/auth/csrf-token"
response = self.session.get(csrf_url, timeout=10)
if response.status_code != 200:
raise RuntimeError(
f"CSRF-Token konnte nicht geladen werden ({response.status_code})"
)
try:
data = response.json()
except ValueError as exc:
raise RuntimeError("CSRF-Antwort konnte nicht gelesen werden") from exc
token = data.get('csrfToken')
if not token:
raise RuntimeError("Antwort enthielt kein csrfToken")
self.csrf_token = token
self.logger.debug("CSRF-Token aktualisiert")
def _authorized_headers(self) -> Dict[str, str]:
if not self.csrf_token:
self._ensure_authenticated()
if not self.csrf_token:
raise RuntimeError("Kein CSRF-Token verfügbar")
return {'X-CSRF-Token': self.csrf_token}
def scan_directory(self, directory: Path, recursive: bool = True) -> List[Path]:
"""Scannt Verzeichnis nach unterstützten Bildern"""
images = []
if not directory.exists() or not directory.is_dir():
raise ValueError(f"Verzeichnis nicht gefunden: {directory}")
pattern = "**/*" if recursive else "*"
for file_path in directory.glob(pattern):
if (file_path.is_file() and
file_path.suffix.lower() in SUPPORTED_FORMATS and
file_path.stat().st_size <= MAX_FILE_SIZE):
images.append(file_path)
return sorted(images)
def prepare_image_metadata(self, image_path: Path,
default_titel: Optional[str] = None,
default_name: Optional[str] = None) -> Dict:
"""
Bereitet Metadaten für ein Bild vor
Erwartet Struktur: Photos/Jahr/Name/Projekt/dateiname.endung
"""
# Strukturierten Pfad parsen
path_info = self.metadata_extractor.parse_structured_path(image_path)
# Jahr bestimmen (Struktur > EXIF > Pfad > Aktuell)
jahr = None
if path_info and 'jahr' in path_info and path_info['jahr']:
jahr = path_info['jahr']
else:
# Fallback: EXIF oder Pfad-Pattern
jahr = (self.metadata_extractor.extract_exif_date(image_path) or
self.metadata_extractor.extract_year_from_path(image_path) or
str(datetime.now().year))
# Titel bestimmen (Parameter > Struktur)
titel = None
if default_titel:
titel = default_titel
elif path_info:
titel = self.metadata_extractor.extract_title_from_structured_path(path_info)
else:
# Fallback für unstrukturierte Pfade
titel = "Unbenannt"
# Beschreibung generieren (README > Pfad-Info)
beschreibung = self.metadata_extractor.extract_description_from_structured_path(
image_path, path_info
)
# Name bestimmen (Parameter > Struktur > Leer)
name = ""
if default_name:
name = default_name
elif path_info and 'name' in path_info and path_info['name']:
name = path_info['name']
return {
'jahr': jahr,
'titel': titel,
'beschreibung': beschreibung,
'name': name
}
def upload_batch(self, images: List[Path],
default_titel: Optional[str] = None,
default_name: Optional[str] = None,
consents: Optional[Dict[str, Any]] = None,
dry_run: bool = False) -> Dict:
"""
Uploaded Bilder gruppiert nach PROJEKTEN (Jahr/Name/Projekt)
KORREKT: Jedes Projekt wird eine eigene Gruppe!
"""
total_images = len(images)
if not images:
return {'total': 0, 'successful': 0, 'failed': 0, 'failed_files': []}
consents_payload = consents.copy() if consents else {
'workshopConsent': True,
'socialMediaConsents': []
}
if not consents_payload.get('workshopConsent'):
raise ValueError('workshopConsent ist erforderlich für Batch-Uploads')
# 1. Bilder nach Projekten gruppieren
project_groups = {}
for image_path in images:
path_info = self.metadata_extractor.parse_structured_path(image_path)
# Eindeutigen Projekt-Key erstellen
if path_info and all(k in path_info for k in ['jahr', 'name', 'projekt']):
project_key = f"{path_info['jahr']}/{path_info['name']}/{path_info['projekt']}"
else:
# Fallback für unstrukturierte Pfade
project_key = f"unbekannt/{image_path.parent.name}"
if project_key not in project_groups:
project_groups[project_key] = []
project_groups[project_key].append(image_path)
self.logger.info(f"📁 {len(project_groups)} Projekte gefunden mit {total_images} Bildern")
# Dry-Run oder Upload
if dry_run:
self.logger.info("🔍 Dry-Run Mode - Kein Upload")
self.logger.info(f"Würde {len(images)} Bilder uploaden")
# README-Dateien erstellen im Dry-Run Mode
self.logger.info("📄 Erstelle README-Dateien in Projekt-Verzeichnissen...")
readme_count = self.create_readme_files(images)
self.logger.info(f"{readme_count} README-Dateien erstellt")
# 2. Jedes Projekt als eigene Gruppe uploaden
total_successful = 0
total_failed = 0
failed_files = []
if not dry_run:
self._ensure_authenticated()
for project_key, project_images in project_groups.items():
self.logger.info(f"🚀 Upload Projekt '{project_key}': {len(project_images)} Bilder")
# Metadaten für dieses Projekt
group_metadata = self.prepare_image_metadata(project_images[0], default_titel, default_name)
# Backend erwartet year/title/description/name
backend_metadata = {
'year': int(group_metadata.get('jahr', datetime.now().year)),
'title': group_metadata.get('titel', project_key.split('/')[-1]), # Projekt-Name als Titel
'description': group_metadata.get('beschreibung', f"Projekt: {project_key}"),
'name': group_metadata.get('name', '')
}
# Dieses Projekt uploaden
if not dry_run:
try:
files = []
# Alle Dateien für den Upload vorbereiten
for image_path in project_images:
files.append(('images', (
image_path.name,
open(image_path, 'rb'),
mimetypes.guess_type(str(image_path))[0] or 'image/jpeg'
)))
# Ein Upload-Request pro Projekt
payload = {
'metadata': json.dumps(backend_metadata),
'consents': json.dumps(consents_payload)
}
response = self.session.post(
self._api_url('/upload/batch'),
files=files,
data=payload,
headers=self._authorized_headers(),
timeout=120
)
# Files schließen
for _, file_tuple in files:
if hasattr(file_tuple[1], 'close'):
file_tuple[1].close()
if response.status_code == 200:
self.logger.info(f"✅ Projekt '{project_key}': {len(project_images)} Bilder")
total_successful += len(project_images)
else:
self.logger.error(f"❌ Projekt '{project_key}' Fehler: {response.status_code}")
total_failed += len(project_images)
failed_files.extend([str(img) for img in project_images])
except Exception as e:
self.logger.error(f"💥 Projekt '{project_key}' Fehler: {e}")
total_failed += len(project_images)
failed_files.extend([str(img) for img in project_images])
self.logger.info(f"<EFBFBD> Upload abgeschlossen: {len(project_groups)} Gruppen erstellt")
return {
'total': total_images,
'successful': total_successful,
'failed': total_failed,
'failed_files': failed_files,
'project_groups': project_groups
}
def create_readme_files(self, images: List[Path]) -> int:
"""
Erstellt README.md Dateien in jedem Projekt-Verzeichnis (nur bei --dry-run)
Returns:
Anzahl der erstellten README-Dateien
"""
created_count = 0
project_dirs = set()
# Sammle alle Projekt-Verzeichnisse
for image_path in images:
path_info = self.metadata_extractor.parse_structured_path(image_path)
if path_info and all(k in path_info for k in ['jahr', 'name', 'projekt']):
# Projekt-Verzeichnis ist das parent des Bildes
project_dir = image_path.parent
if project_dir not in project_dirs:
project_dirs.add(project_dir)
# Erstelle README.md in jedem Projekt-Verzeichnis
for project_dir in project_dirs:
readme_path = project_dir / 'README.md'
# Prüfe ob README bereits existiert
if readme_path.exists():
self.logger.debug(f"README existiert bereits: {readme_path}")
continue
# Extrahiere Metadaten aus Pfad
path_info = self.metadata_extractor.parse_structured_path(project_dir / 'dummy.jpg')
if not path_info:
continue
# README-Inhalt generieren
readme_content = self._generate_readme_content(path_info)
try:
readme_path.write_text(readme_content, encoding='utf-8')
self.logger.info(f"✅ README erstellt: {readme_path}")
created_count += 1
except Exception as e:
self.logger.error(f"❌ Fehler beim Erstellen von {readme_path}: {e}")
return created_count
def _generate_readme_content(self, path_info: Dict[str, str]) -> str:
"""Generiert README-Inhalt basierend auf Pfad-Informationen"""
jahr = path_info.get('jahr', 'YYYY')
name = path_info.get('name', 'Name')
projekt = path_info.get('projekt', 'Projekt')
# Projekt-Titel formatieren
projekt_titel = projekt.replace('_', ' ').replace('-', ' ').title()
content = f"""# {projekt_titel}
## Projekt-Details
**Jahr:** {jahr}
**Ersteller:** {name}
**Projekt:** {projekt_titel}
## Beschreibung
[TODO: Beschreibung des Projekts hier einfügen]
---
*Diese README wurde automatisch generiert und kann bearbeitet werden.*
"""
return content
def print_upload_summary(self, project_groups: Dict[str, List[Path]]) -> None:
"""Druckt eine Übersicht aller erstellten Slideshows"""
if not project_groups:
self.logger.info("📋 Keine Projekte gefunden")
return
self.logger.info("")
self.logger.info("=" * 80)
self.logger.info(f"📋 SLIDESHOW ÜBERSICHT - {len(project_groups)} Projekte erstellt")
self.logger.info("=" * 80)
# Sortiere Projekte nach Jahr, dann Name, dann Projekt
sorted_projects = sorted(project_groups.items(), key=lambda x: x[0])
current_year = None
current_name = None
for project_key, images in sorted_projects:
parts = project_key.split('/')
if len(parts) == 3:
jahr, name, projekt = parts
else:
jahr, name, projekt = "?", "?", project_key
# Gruppierung nach Jahr
if current_year != jahr:
if current_year is not None:
self.logger.info("")
self.logger.info(f"📅 {jahr}")
self.logger.info("-" * 50)
current_year = jahr
current_name = None
# Gruppierung nach Name (innerhalb des Jahres)
if current_name != name:
if current_name is not None:
self.logger.info("")
self.logger.info(f"👤 {name}")
current_name = name
# Projekt-Details
self.logger.info(f" 📺 {projekt.ljust(30)} - {len(images):3d} Bilder")
self.logger.info("")
self.logger.info("=" * 80)
total_images = sum(len(images) for images in project_groups.values())
self.logger.info(f"📊 GESAMT: {len(project_groups)} Slideshows mit {total_images} Bildern")
self.logger.info("=" * 80)
def test_connection(self) -> bool:
"""Testet Verbindung zum Backend (mit optionaler Auth)"""
try:
response = self.session.get(self._api_url('/groups'), timeout=10)
return response.status_code == 200
except Exception as e:
self.logger.error(f"Verbindungstest fehlgeschlagen: {e}")
return False
def setup_logging(verbose: bool = False):
"""Konfiguriert Logging"""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
def main():
"""Hauptfunktion"""
parser = argparse.ArgumentParser(
description="Batch Image Uploader für Image-Uploader System",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Beispiele:
python batch_uploader.py /home/user/photos --titel "Urlaubsbilder"
python batch_uploader.py ./images --name "Max Mustermann" --no-recursive
python batch_uploader.py /photos --backend http://myserver:5000 --dry-run --verbose
"""
)
parser.add_argument('directory',
help='Verzeichnis mit Bildern zum Upload')
parser.add_argument('--titel',
help='Standard-Titel für alle Bilder')
parser.add_argument('--name',
help='Standard-Name für alle Bilder')
parser.add_argument('--backend',
default=DEFAULT_BACKEND_URL,
help=f'Backend URL (Standard: {DEFAULT_BACKEND_URL})')
parser.add_argument('--user', '--username', dest='username',
help='Admin-Benutzername für Session-Login (erforderlich für Upload)')
parser.add_argument('--password',
help='Admin-Passwort für Session-Login (erforderlich für Upload)')
parser.add_argument('--no-recursive',
action='store_true',
help='Nicht rekursiv in Unterverzeichnisse')
parser.add_argument('--dry-run',
action='store_true',
help='Nur Analyse, kein Upload')
consent_group = parser.add_mutually_exclusive_group()
consent_group.add_argument('--workshop-consent', dest='workshop_consent',
action='store_true', default=True,
help='Zustimmung zur Anzeige in der Werkstatt (Standard)')
consent_group.add_argument('--no-workshop-consent', dest='workshop_consent',
action='store_false',
help='Keine Zustimmung zur Anzeige in der Werkstatt')
parser.add_argument('--social-media-consents',
help='JSON (String oder Datei) mit Social-Media-Consents')
parser.add_argument('--verbose', '-v',
action='store_true',
help='Verbose Output')
args = parser.parse_args()
# Logging Setup
setup_logging(args.verbose)
logger = logging.getLogger(__name__)
try:
try:
social_media_consents = load_social_media_consents(args.social_media_consents)
except ValueError as exc:
logger.error(str(exc))
return 1
# Verzeichnis validieren
directory = Path(args.directory).resolve()
uploader = BatchUploader(args.backend, args.username, args.password)
consents_config = {
'workshopConsent': args.workshop_consent,
'socialMediaConsents': social_media_consents
}
if not args.dry_run and not uploader.has_credentials():
logger.error("Für Uploads werden Admin-Credentials benötigt (--user / --password)")
return 1
# Verbindung testen (nur bei echtem Upload)
if not args.dry_run:
logger.info(f"Teste Verbindung zu {args.backend}...")
if not uploader.test_connection():
logger.error("❌ Backend nicht erreichbar!")
return 1
logger.info("✅ Backend erreichbar")
uploader.ensure_admin_session()
logger.info("✅ Admin-Session aktiv")
else:
logger.info("🔍 Dry-Run Mode - Überspringe Verbindungstest")
# Bilder scannen
logger.info(f"Scanne Verzeichnis: {directory}")
recursive = not args.no_recursive
images = uploader.scan_directory(directory, recursive)
if not images:
logger.warning("Keine unterstützten Bilder gefunden!")
return 0
logger.info(f"📁 {len(images)} Bilder gefunden")
# Beispiel-Metadaten zeigen
if args.verbose and images:
sample_image = images[0]
sample_metadata = uploader.prepare_image_metadata(
sample_image, args.titel, args.name
)
logger.debug(f"Beispiel-Metadaten für {sample_image.name}:")
for key, value in sample_metadata.items():
logger.debug(f" {key}: {value}")
# Upload starten
logger.info(f"🚀 Starte Upload...")
result = uploader.upload_batch(
images,
args.titel,
args.name,
consents_config,
args.dry_run
)
# Ergebnis
if not args.dry_run:
logger.info("📊 Upload abgeschlossen:")
logger.info(f" ✅ Erfolgreich: {result['successful']}")
logger.info(f" ❌ Fehlgeschlagen: {result['failed']}")
if result['failed_files'] and args.verbose:
logger.info("Fehlgeschlagene Dateien:")
for failed_file in result['failed_files']:
logger.info(f" - {failed_file}")
# Upload-Übersicht anzeigen (nach erfolgreichem Upload)
if 'project_groups' in result and result['project_groups']:
uploader.print_upload_summary(result['project_groups'])
return 0 if result['failed'] == 0 else 1
except Exception as e:
logger.error(f"💥 Unerwarteter Fehler: {e}")
if args.verbose:
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())