Project-Image-Uploader/scripts/batch_uploader.py

682 lines
26 KiB
Python
Raw Permalink Blame History

#!/usr/bin/env python3
"""
Batch Image Uploader für Image-Uploader System
=============================================
Automatisches Parsen von Verzeichnissen und Upload von Bildern
mit strukturierten Metadaten an das Image-Uploader Backend.
Features:
- Rekursives Verzeichnis-Scanning nach Bildern
- Metadaten-Extraktion aus Verzeichnis-/Dateinamen
- Batch-Upload an das Backen self.logger.info(f"📊 Upload abgeschlossen: {len(project_groups)} Gruppen erstellt")
return {
'total': total_images,
'successful': total_successful,
'failed': total_failed,
'failed_files': failed_files,
'project_groups': project_groups # Für Übersicht am Ende
}Progress-Tracking und Error-Handling
- EXIF-Daten Unterstützung (optional)
Usage:
python batch_uploader.py /path/to/images --titel "Meine Sammlung"
"""
import os
import sys
import json
import requests
import argparse
from pathlib import Path
from typing import List, Dict, Optional, Tuple
import mimetypes
from PIL import Image, ExifTags
from PIL.ExifTags import TAGS
import re
from datetime import datetime
import logging
# Konfiguration
#DEFAULT_BACKEND_URL = "https://deinprojekt.lan.hobbyhimmel.de/api"
DEFAULT_BACKEND_URL = "http://localhost/api"
SUPPORTED_FORMATS = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff', '.tif'}
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
class ImageMetadataExtractor:
"""Extrahiert Metadaten aus Bildern und Verzeichnissen
Erwartet Struktur: Photos/Jahr/Name/Projekt/dateiname.endung
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
def parse_structured_path(self, file_path: Path) -> Dict[str, str]:
"""
Parst strukturierten Pfad: Photos/Jahr/Name/Projekt/dateiname.endung
Returns:
Dict mit 'jahr', 'name', 'projekt', 'dateiname' oder None wenn nicht parsbar
"""
parts = file_path.parts
# Mindestens 4 Verzeichnisse + Datei erforderlich
if len(parts) < 5:
self.logger.debug(f"Pfad zu kurz für Struktur-Parsing: {file_path}")
return {}
try:
# Rückwärts vom Ende her parsen
dateiname = file_path.stem
projekt = parts[-2] # Projekt-Verzeichnis
name = parts[-3] # Name-Verzeichnis
jahr = parts[-4] # Jahr-Verzeichnis
# Jahr validieren (4-stellige Zahl)
if not re.match(r'^(19|20)\d{2}$', jahr):
self.logger.debug(f"Ungültiges Jahr in Pfad: {jahr}")
# Versuche Jahr aus anderen Teilen zu extrahieren
jahr = self.extract_year_from_path(file_path)
return {
'jahr': jahr,
'name': name,
'projekt': projekt,
'dateiname': dateiname
}
except (IndexError, ValueError) as e:
self.logger.debug(f"Struktur-Parsing fehlgeschlagen für {file_path}: {e}")
return {}
def extract_year_from_path(self, file_path: Path) -> Optional[str]:
"""Extrahiert Jahr aus Pfad oder Dateinamen (Fallback)"""
year_pattern = r'\b(19|20)\d{2}\b'
# Zuerst im Verzeichnisnamen
for part in file_path.parts:
match = re.search(year_pattern, part)
if match:
return match.group()
# Dann im Dateinamen
match = re.search(year_pattern, file_path.stem)
if match:
return match.group()
return None
def extract_exif_date(self, image_path: Path) -> Optional[str]:
"""Extrahiert Aufnahmedatum aus EXIF-Daten"""
try:
with Image.open(image_path) as img:
exifdata = img.getexif()
if exifdata:
for tag_id in exifdata:
tag = TAGS.get(tag_id, tag_id)
if tag in ['DateTime', 'DateTimeOriginal', 'DateTimeDigitized']:
date_str = exifdata.get(tag_id)
if date_str:
# Format: "2023:12:25 10:30:00" -> "2023"
return date_str.split(':')[0]
except Exception as e:
self.logger.warning(f"EXIF-Extraktion fehlgeschlagen für {image_path}: {e}")
return None
def read_readme_description(self, directory: Path) -> Optional[str]:
"""
Liest Beschreibung aus README.md im Verzeichnis.
Extrahiert den Text nach '## Beschreibung' bis zur nächsten Überschrift, --- oder 500 Zeichen.
"""
readme_files = ['README.md', 'readme.md', 'Readme.md', 'README.txt', 'readme.txt']
for readme_name in readme_files:
readme_path = directory / readme_name
if readme_path.exists() and readme_path.is_file():
try:
with open(readme_path, 'r', encoding='utf-8') as f:
content = f.read()
# Suche nach '## Beschreibung'
beschr_start = re.search(r'^\s*##\s*Beschreibung\s*$', content, re.MULTILINE)
if beschr_start:
start_idx = beschr_start.end()
rest = content[start_idx:]
# Suche nach nächster Überschrift oder --- nach '## Beschreibung'
next_header = re.search(r'^\s*#+\s+', rest, re.MULTILINE)
next_sep = re.search(r'^\s*-{3,}\s*$', rest, re.MULTILINE)
# Finde das früheste Ende (nächste Überschrift oder ---)
end_idx = None
if next_header and next_sep:
end_idx = min(next_header.start(), next_sep.start())
elif next_header:
end_idx = next_header.start()
elif next_sep:
end_idx = next_sep.start()
if end_idx is not None:
beschreibung = rest[:end_idx]
else:
beschreibung = rest
# Auf max. 500 Zeichen kürzen und Whitespace säubern
beschreibung = beschreibung.strip()
beschreibung = beschreibung[:500]
if beschreibung:
self.logger.debug(f"README Beschreibung gefunden in {directory}: {beschreibung[:50]}...")
return beschreibung
except Exception as e:
self.logger.warning(f"Fehler beim Lesen von {readme_path}: {e}")
continue
return None
def extract_title_from_structured_path(self, path_info: Dict[str, str]) -> str:
"""Extrahiert Titel aus strukturiertem Pfad"""
if 'projekt' in path_info and path_info['projekt']:
# Projekt-Name als Titel verwenden
title = path_info['projekt']
# Unterstriche/Bindestriche durch Leerzeichen ersetzen
title = re.sub(r'[_-]+', ' ', title)
return title.title()
return "Unbenannt"
def extract_description_from_structured_path(self, file_path: Path, path_info: Dict[str, str]) -> str:
"""
Generiert Beschreibung aus strukturiertem Pfad
Priorität:
1. README.md im Projekt-Verzeichnis
2. README.md im Name-Verzeichnis
3. Vollständiger Verzeichnisname als Fallback
"""
# 1. README.md im Projekt-Verzeichnis suchen
projekt_dir = file_path.parent
readme_description = self.read_readme_description(projekt_dir)
if readme_description:
return readme_description
# 2. README.md im Name-Verzeichnis suchen
if len(file_path.parts) >= 3:
name_dir = file_path.parents[1] # Zwei Ebenen nach oben
readme_description = self.read_readme_description(name_dir)
if readme_description:
return readme_description
# 3. Fallback: Vollständiger Verzeichnisname
if path_info:
parts = []
if 'jahr' in path_info and path_info['jahr']:
parts.append(f"Jahr: {path_info['jahr']}")
if 'name' in path_info and path_info['name']:
parts.append(f"Name: {path_info['name']}")
if 'projekt' in path_info and path_info['projekt']:
parts.append(f"Projekt: {path_info['projekt']}")
if parts:
return " | ".join(parts)
# Final Fallback: Pfad-basiert
return f"Aus: {''.join(file_path.parts[-3:-1])}"
class BatchUploader:
"""Haupt-Klasse für Batch-Upload"""
def __init__(self, backend_url: str = DEFAULT_BACKEND_URL, user: str = None, password: str = None):
self.backend_url = backend_url.rstrip('/')
self.metadata_extractor = ImageMetadataExtractor()
self.logger = logging.getLogger(__name__)
# Session für Connection-Reuse
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Batch-Uploader/1.0'
})
self.auth = (user, password) if user and password else None
def scan_directory(self, directory: Path, recursive: bool = True) -> List[Path]:
"""Scannt Verzeichnis nach unterstützten Bildern"""
images = []
if not directory.exists() or not directory.is_dir():
raise ValueError(f"Verzeichnis nicht gefunden: {directory}")
pattern = "**/*" if recursive else "*"
for file_path in directory.glob(pattern):
if (file_path.is_file() and
file_path.suffix.lower() in SUPPORTED_FORMATS and
file_path.stat().st_size <= MAX_FILE_SIZE):
images.append(file_path)
return sorted(images)
def prepare_image_metadata(self, image_path: Path,
default_titel: Optional[str] = None,
default_name: Optional[str] = None) -> Dict:
"""
Bereitet Metadaten für ein Bild vor
Erwartet Struktur: Photos/Jahr/Name/Projekt/dateiname.endung
"""
# Strukturierten Pfad parsen
path_info = self.metadata_extractor.parse_structured_path(image_path)
# Jahr bestimmen (Struktur > EXIF > Pfad > Aktuell)
jahr = None
if path_info and 'jahr' in path_info and path_info['jahr']:
jahr = path_info['jahr']
else:
# Fallback: EXIF oder Pfad-Pattern
jahr = (self.metadata_extractor.extract_exif_date(image_path) or
self.metadata_extractor.extract_year_from_path(image_path) or
str(datetime.now().year))
# Titel bestimmen (Parameter > Struktur)
titel = None
if default_titel:
titel = default_titel
elif path_info:
titel = self.metadata_extractor.extract_title_from_structured_path(path_info)
else:
# Fallback für unstrukturierte Pfade
titel = "Unbenannt"
# Beschreibung generieren (README > Pfad-Info)
beschreibung = self.metadata_extractor.extract_description_from_structured_path(
image_path, path_info
)
# Name bestimmen (Parameter > Struktur > Leer)
name = ""
if default_name:
name = default_name
elif path_info and 'name' in path_info and path_info['name']:
name = path_info['name']
return {
'jahr': jahr,
'titel': titel,
'beschreibung': beschreibung,
'name': name
}
def upload_batch(self, images: List[Path],
default_titel: Optional[str] = None,
default_name: Optional[str] = None,
dry_run: bool = False) -> Dict:
"""
Uploaded Bilder gruppiert nach PROJEKTEN (Jahr/Name/Projekt)
KORREKT: Jedes Projekt wird eine eigene Gruppe!
"""
total_images = len(images)
if not images:
return {'total': 0, 'successful': 0, 'failed': 0, 'failed_files': []}
# 1. Bilder nach Projekten gruppieren
project_groups = {}
for image_path in images:
path_info = self.metadata_extractor.parse_structured_path(image_path)
# Eindeutigen Projekt-Key erstellen
if path_info and all(k in path_info for k in ['jahr', 'name', 'projekt']):
project_key = f"{path_info['jahr']}/{path_info['name']}/{path_info['projekt']}"
else:
# Fallback für unstrukturierte Pfade
project_key = f"unbekannt/{image_path.parent.name}"
if project_key not in project_groups:
project_groups[project_key] = []
project_groups[project_key].append(image_path)
self.logger.info(f"📁 {len(project_groups)} Projekte gefunden mit {total_images} Bildern")
# Dry-Run oder Upload
if dry_run:
self.logger.info("🔍 Dry-Run Mode - Kein Upload")
self.logger.info(f"Würde {len(images)} Bilder uploaden")
# README-Dateien erstellen im Dry-Run Mode
self.logger.info("📄 Erstelle README-Dateien in Projekt-Verzeichnissen...")
readme_count = self.create_readme_files(images)
self.logger.info(f"{readme_count} README-Dateien erstellt")
# 2. Jedes Projekt als eigene Gruppe uploaden
total_successful = 0
total_failed = 0
failed_files = []
for project_key, project_images in project_groups.items():
self.logger.info(f"🚀 Upload Projekt '{project_key}': {len(project_images)} Bilder")
# Metadaten für dieses Projekt
group_metadata = self.prepare_image_metadata(project_images[0], default_titel, default_name)
# Backend erwartet year/title/description/name
backend_metadata = {
'year': int(group_metadata.get('jahr', datetime.now().year)),
'title': group_metadata.get('titel', project_key.split('/')[-1]), # Projekt-Name als Titel
'description': group_metadata.get('beschreibung', f"Projekt: {project_key}"),
'name': group_metadata.get('name', '')
}
# Dieses Projekt uploaden
if not dry_run:
try:
files = []
# Alle Dateien für den Upload vorbereiten
for image_path in project_images:
files.append(('images', (
image_path.name,
open(image_path, 'rb'),
mimetypes.guess_type(str(image_path))[0] or 'image/jpeg'
)))
# Ein Upload-Request pro Projekt
response = self.session.post(
f"{self.backend_url}/upload/batch",
files=files,
data={'metadata': json.dumps(backend_metadata)},
timeout=120,
auth=self.auth
)
# Files schließen
for _, file_tuple in files:
if hasattr(file_tuple[1], 'close'):
file_tuple[1].close()
if response.status_code == 200:
self.logger.info(f"✅ Projekt '{project_key}': {len(project_images)} Bilder")
total_successful += len(project_images)
else:
self.logger.error(f"❌ Projekt '{project_key}' Fehler: {response.status_code}")
total_failed += len(project_images)
failed_files.extend([str(img) for img in project_images])
except Exception as e:
self.logger.error(f"💥 Projekt '{project_key}' Fehler: {e}")
total_failed += len(project_images)
failed_files.extend([str(img) for img in project_images])
self.logger.info(f"<EFBFBD> Upload abgeschlossen: {len(project_groups)} Gruppen erstellt")
return {
'total': total_images,
'successful': total_successful,
'failed': total_failed,
'failed_files': failed_files,
'project_groups': project_groups
}
def create_readme_files(self, images: List[Path]) -> int:
"""
Erstellt README.md Dateien in jedem Projekt-Verzeichnis (nur bei --dry-run)
Returns:
Anzahl der erstellten README-Dateien
"""
created_count = 0
project_dirs = set()
# Sammle alle Projekt-Verzeichnisse
for image_path in images:
path_info = self.metadata_extractor.parse_structured_path(image_path)
if path_info and all(k in path_info for k in ['jahr', 'name', 'projekt']):
# Projekt-Verzeichnis ist das parent des Bildes
project_dir = image_path.parent
if project_dir not in project_dirs:
project_dirs.add(project_dir)
# Erstelle README.md in jedem Projekt-Verzeichnis
for project_dir in project_dirs:
readme_path = project_dir / 'README.md'
# Prüfe ob README bereits existiert
if readme_path.exists():
self.logger.debug(f"README existiert bereits: {readme_path}")
continue
# Extrahiere Metadaten aus Pfad
path_info = self.metadata_extractor.parse_structured_path(project_dir / 'dummy.jpg')
if not path_info:
continue
# README-Inhalt generieren
readme_content = self._generate_readme_content(path_info)
try:
readme_path.write_text(readme_content, encoding='utf-8')
self.logger.info(f"✅ README erstellt: {readme_path}")
created_count += 1
except Exception as e:
self.logger.error(f"❌ Fehler beim Erstellen von {readme_path}: {e}")
return created_count
def _generate_readme_content(self, path_info: Dict[str, str]) -> str:
"""Generiert README-Inhalt basierend auf Pfad-Informationen"""
jahr = path_info.get('jahr', 'YYYY')
name = path_info.get('name', 'Name')
projekt = path_info.get('projekt', 'Projekt')
# Projekt-Titel formatieren
projekt_titel = projekt.replace('_', ' ').replace('-', ' ').title()
content = f"""# {projekt_titel}
## Projekt-Details
**Jahr:** {jahr}
**Ersteller:** {name}
**Projekt:** {projekt_titel}
## Beschreibung
[TODO: Beschreibung des Projekts hier einfügen]
---
*Diese README wurde automatisch generiert und kann bearbeitet werden.*
"""
return content
def print_upload_summary(self, project_groups: Dict[str, List[Path]]) -> None:
"""Druckt eine Übersicht aller erstellten Slideshows"""
if not project_groups:
self.logger.info("📋 Keine Projekte gefunden")
return
self.logger.info("")
self.logger.info("=" * 80)
self.logger.info(f"📋 SLIDESHOW ÜBERSICHT - {len(project_groups)} Projekte erstellt")
self.logger.info("=" * 80)
# Sortiere Projekte nach Jahr, dann Name, dann Projekt
sorted_projects = sorted(project_groups.items(), key=lambda x: x[0])
current_year = None
current_name = None
for project_key, images in sorted_projects:
parts = project_key.split('/')
if len(parts) == 3:
jahr, name, projekt = parts
else:
jahr, name, projekt = "?", "?", project_key
# Gruppierung nach Jahr
if current_year != jahr:
if current_year is not None:
self.logger.info("")
self.logger.info(f"📅 {jahr}")
self.logger.info("-" * 50)
current_year = jahr
current_name = None
# Gruppierung nach Name (innerhalb des Jahres)
if current_name != name:
if current_name is not None:
self.logger.info("")
self.logger.info(f"👤 {name}")
current_name = name
# Projekt-Details
self.logger.info(f" 📺 {projekt.ljust(30)} - {len(images):3d} Bilder")
self.logger.info("")
self.logger.info("=" * 80)
total_images = sum(len(images) for images in project_groups.values())
self.logger.info(f"📊 GESAMT: {len(project_groups)} Slideshows mit {total_images} Bildern")
self.logger.info("=" * 80)
def test_connection(self) -> bool:
"""Testet Verbindung zum Backend (mit optionaler Auth)"""
try:
response = self.session.get(f"{self.backend_url}/groups", timeout=10, auth=self.auth)
return response.status_code == 200
except Exception as e:
self.logger.error(f"Verbindungstest fehlgeschlagen: {e}")
return False
def setup_logging(verbose: bool = False):
"""Konfiguriert Logging"""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
def main():
"""Hauptfunktion"""
parser = argparse.ArgumentParser(
description="Batch Image Uploader für Image-Uploader System",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Beispiele:
python batch_uploader.py /home/user/photos --titel "Urlaubsbilder"
python batch_uploader.py ./images --name "Max Mustermann" --no-recursive
python batch_uploader.py /photos --backend http://myserver:5000 --dry-run --verbose
"""
)
parser.add_argument('directory',
help='Verzeichnis mit Bildern zum Upload')
parser.add_argument('--titel',
help='Standard-Titel für alle Bilder')
parser.add_argument('--name',
help='Standard-Name für alle Bilder')
parser.add_argument('--backend',
default=DEFAULT_BACKEND_URL,
help=f'Backend URL (Standard: {DEFAULT_BACKEND_URL})')
parser.add_argument('--user',
help='HTTP Basic Auth Benutzername (optional)')
parser.add_argument('--password',
help='HTTP Basic Auth Passwort (optional)')
parser.add_argument('--no-recursive',
action='store_true',
help='Nicht rekursiv in Unterverzeichnisse')
parser.add_argument('--dry-run',
action='store_true',
help='Nur Analyse, kein Upload')
parser.add_argument('--verbose', '-v',
action='store_true',
help='Verbose Output')
args = parser.parse_args()
# Logging Setup
setup_logging(args.verbose)
logger = logging.getLogger(__name__)
try:
# Verzeichnis validieren
directory = Path(args.directory).resolve()
uploader = BatchUploader(args.backend, args.user, args.password)
# Verbindung testen (nur bei echtem Upload)
if not args.dry_run:
logger.info(f"Teste Verbindung zu {args.backend}...")
if not uploader.test_connection():
logger.error("❌ Backend nicht erreichbar!")
return 1
logger.info("✅ Backend erreichbar")
else:
logger.info("🔍 Dry-Run Mode - Überspringe Verbindungstest")
# Bilder scannen
logger.info(f"Scanne Verzeichnis: {directory}")
recursive = not args.no_recursive
images = uploader.scan_directory(directory, recursive)
if not images:
logger.warning("Keine unterstützten Bilder gefunden!")
return 0
logger.info(f"📁 {len(images)} Bilder gefunden")
# Beispiel-Metadaten zeigen
if args.verbose and images:
sample_image = images[0]
sample_metadata = uploader.prepare_image_metadata(
sample_image, args.titel, args.name
)
logger.debug(f"Beispiel-Metadaten für {sample_image.name}:")
for key, value in sample_metadata.items():
logger.debug(f" {key}: {value}")
# Upload starten
logger.info(f"🚀 Starte Upload...")
result = uploader.upload_batch(
images,
args.titel,
args.name,
args.dry_run
)
# Ergebnis
if not args.dry_run:
logger.info("📊 Upload abgeschlossen:")
logger.info(f" ✅ Erfolgreich: {result['successful']}")
logger.info(f" ❌ Fehlgeschlagen: {result['failed']}")
if result['failed_files'] and args.verbose:
logger.info("Fehlgeschlagene Dateien:")
for failed_file in result['failed_files']:
logger.info(f" - {failed_file}")
# Upload-Übersicht anzeigen (nach erfolgreichem Upload)
if 'project_groups' in result and result['project_groups']:
uploader.print_upload_summary(result['project_groups'])
return 0 if result['failed'] == 0 else 1
except Exception as e:
logger.error(f"💥 Unerwarteter Fehler: {e}")
if args.verbose:
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())