neue-maschine-mail/verteiler.py

420 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import click
import time
import logging
import xmlrpc.client
import yaml
import os
import smtplib
import ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from datetime import datetime
from dateutil.relativedelta import relativedelta
import csv
from collections import defaultdict
# Lege das Output-Verzeichnis an
os.makedirs('logs', exist_ok=True)
# Erstelle Logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # Ermöglicht alle Levels
# Format definieren
log_format = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
# Handler für Konsole
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(log_format)
# Handler für Datei
log_filename = datetime.now().strftime('logs/%Y-%m-%d-%H-%M-%S_verteiler.log')
file_handler = logging.FileHandler(log_filename, encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(log_format)
# Handler zum Logger hinzufügen
logger.addHandler(console_handler)
logger.addHandler(file_handler)
BATCH_SIZE = 20
BATCH_WAIT_SECONDS = 30 * 60 # 30 Minuten
PRODUCT_NAMES = ["Formatkreissäge (1 Minute)", "Hobel (1 Minute)"]
TRAINING_PRODUCTS = ["Einweisung FKS", "Einweisung Hobel"]
EMAIL_SUBJECT = "Neue Maschinen in der Werkstatt - Einweisungsvideos erforderlich"
EMAIL_BODY_TEMPLATE = """
<html>
<body style="font-family: sans-serif; font-size: 14px; line-height: 1.6;">
<p>Hallo {name},</p>
<p>
wir freuen uns, dass du in den letzten zwei Jahren unsere Formatkreissäge oder unseren Abricht-/Dickenhobel genutzt hast. Wir haben tolle Nachrichten für dich!
</p>
<p>
Vielleicht hast du es schon mitbekommen: Vor knapp zwei Monaten haben wir die FormatkreissägeAbricht-/Dickenhobel-Kombi durch zwei neue Maschinen ersetzt.
<strong>Voraussichtlich ab dem 15.06.</strong> stehen die neue Formatkreissäge und der neue Abricht-/Dickenhobel wieder zur Nutzung bereit.
Da du bereits für mindestens eine der beiden Maschinen eine Einweisung erhalten hast, ist <strong>für diese keine erneute Einweisung erforderlich</strong>.
</p>
<p>
Wir bitten dich jedoch, dir vor der nächsten Nutzung die jeweiligen Einweisungsvideos auf YouTube anzuschauen.
Diese ersetzen keine vollständige Einweisung, sind aber für uns eine wichtige Sicherheitsmaßnahme bei der Umstellung auf die neuen Maschinen:
</p>
<ul>
<li><a href="https://www.youtube.com/playlist?list=PL3oeDnj7aMvkyCnmMH6uDlFCV7unT0r3P" target="_blank">Videoserie zur Formatkreissäge</a></li>
<li><a href="https://www.youtube.com/playlist?list=PL3oeDnj7aMvlG6T1xC38x1lTsXap154zU" target="_blank">Videoserie zum Abricht-/Dickenhobel</a></li>
</ul>
<p>
Vor der Nutzung werden wir dich fragen, ob du die Videos angesehen hast. Auf dieser Vertrauensbasis wird deine Freigabe im System aktiviert.
Hattest du bereits eine Einweisung für die neuen Maschinen, entfällt diese Abfrage für dich und du musst dir die Videos auch nicht anschauen.
Es schadet aber auch nicht :).
</p>
<p>
Über unsere Webseite informieren wir dich durch einen roten Hinweisbanner, ab wann die Maschinen wieder nutzbar sind.
</p>
<p>
Bitte beachte, dass die Maschinen nur von Personen genutzt werden dürfen, die eine Einweisung erhalten haben.
Solltest du für eine der beiden Maschinen noch keine Einweisung erhalten haben, melde dich gerne bei uns, um einen Termin zu vereinbaren.
</p>
<hr>
<p>
Diese E-Mail ist eine einmalige Information aus gegebenem Anlass.
Wir versenden in der Regel keinen Newsletter oder Werbung. Vielen Dank für dein Verständnis.
</p>
<p>
Mit werkstattfreundlichen Grüßen<br>
Dein Hobbyhimmel-Team
</p>
<hr>
<p style="font-size: 12px; color: gray;">
Hinweis gemäß Art. 6 Abs. 1 lit. f DSGVO:<br>
Diese Information erhältst du, weil du in den letzten zwei Jahren entsprechende Maschinen in unserer Werkstatt genutzt hast.
Die Kontaktaufnahme erfolgt im Rahmen unseres berechtigten Interesses, dich über sicherheitsrelevante Änderungen zu informieren.
</p>
</body>
</html>
"""
def test_odoo_api_connection(odoo_api: dict):
"""
Testet die Verbindung zur Odoo API und authentifiziert den Benutzer.
Args:
odoo_api (dict): Ein Dictionary mit den Odoo API-Zugangsdaten.
Returns:
tuple: Ein Tuple mit URL, Datenbankname, Benutzer-ID und dem Models-Proxy.
"""
try:
url = odoo_api['odoo-api']['url']
db = odoo_api['odoo-api']['db']
username = odoo_api['odoo-api']['username']
password = odoo_api['odoo-api']['password']
common = xmlrpc.client.ServerProxy(f'{url}/xmlrpc/2/common')
uid = common.authenticate(db, username, password, {})
if not uid:
logger.error("❌ Fehler: Authentifizierung fehlgeschlagen. Bitte überprüfe die Zugangsdaten in secrets.yml.")
return None, None, None, None
models = xmlrpc.client.ServerProxy(f'{url}/xmlrpc/2/object')
logger.info("✅ Verbindung zur Odoo-API erfolgreich hergestellt.")
return url, db, uid, models
except Exception as e:
logger.error(f"❌ Fehler bei der Verbindung zur Odoo-API: {e}")
return None, None, None, None
def load_odoo_api_credentials() -> dict:
"""
Lädt die Odoo API-Zugangsdaten aus der Datei 'secrets.yml'.
Returns:
dict: Ein Dictionary mit den Odoo API-Zugangsdaten.
"""
script_dir = os.path.dirname(os.path.realpath(__file__))
filepath = os.path.join(script_dir, 'secrets.yml')
try:
with open(filepath, 'r') as file:
return yaml.safe_load(file)
except FileNotFoundError:
logger.error(f"Datei {filepath} nicht gefunden.")
return None
def send_email(secrets, email, subject, body):
smtp_server = secrets['mail']['smtp_server']
smtp_port = secrets['mail']['smtp_port']
email_sender = secrets['mail']['email_sender']
email_password = secrets['mail']['email_password']
email_receiver = email
msg = MIMEMultipart()
msg['From'] = email_sender
msg['To'] = email_receiver
msg['Subject'] = subject
msg.attach(MIMEText(body, 'html'))
signature = """
<p><strong><span style="font-size: 14px;"><span style="color: #99cc00;">HOBBY</span>HIMMEL</span> - </strong>Die Offene Werkstatt<br />Siemensstrasse 140<br />70469 Stuttgart - Feuerbach<br /><br />0172 / 77 88&nbsp;0 44<br /><a href="mailto:info@hobbyhimmel.de" target="_blank" rel="nofollow noopener">info@hobbyhimmel.de</a><br /><br /><a href="http://www.hobbyhimmel.de/" target="_blank" rel="nofollow noopener">hobbyhimmel.de</a>&nbsp;|&nbsp;<a href="https://facebook.com/hobbyhimmel.stuttgart" target="_blank" rel="nofollow noopener">facebook.com/hobbyhimmel.stuttgart</a>&nbsp;|&nbsp;<a href="https://www.instagram.com/explore/locations/1024077988/hobbyhimmel-die-offene-werkstatt/" target="_blank" rel="nofollow noopener">instagram.de/hobbyhimmel</a><br />VVOW e. V. - Verein zur Verbreitung Offener Werkst&auml;tten |&nbsp;<a href="http://www.vvow.de/" target="_blank" rel="nofollow noopener">vvow.de</a></p>
"""
msg.attach(MIMEText(signature, 'html'))
try:
context = ssl.create_default_context()
with smtplib.SMTP_SSL(smtp_server, smtp_port, context=context) as server:
server.login(email_sender, email_password)
server.sendmail(email_sender, email_receiver, msg.as_string())
logger.info(f"📧 E-Mail gesendet an: {email_receiver}")
except Exception as e:
logger.error(f"❌ Fehler beim Senden der E-Mail an {email_receiver}: {e}")
def get_training_product_sales_count(models, db, uid, password, years):
since = datetime.now() - relativedelta(years=years)
since_str = since.strftime('%Y-%m-%d %H:%M:%S')
domain = [('name', 'in', TRAINING_PRODUCTS)]
templates = models.execute_kw(db, uid, password, 'product.template', 'search_read', [domain], {'fields': ['id', 'name']})
template_map = {t['id']: t['name'] for t in templates}
template_ids = list(template_map.keys())
product_ids = models.execute_kw(db, uid, password, 'product.product', 'search', [[('product_tmpl_id', 'in', template_ids)]])
if not product_ids:
logger.warning("⚠️ Keine Einweisungsprodukte gefunden.")
return
domain = [
('product_id', 'in', product_ids),
('create_date', '>=', since_str),
]
lines = models.execute_kw(db, uid, password, 'pos.order.line', 'search_read', [domain], {'fields': ['product_id']})
counter = defaultdict(int)
product_names = models.execute_kw(db, uid, password, 'product.product', 'read', [product_ids], {'fields': ['id', 'name']})
name_map = {p['id']: p['name'] for p in product_names}
for line in lines:
product_id = line['product_id'][0]
name = name_map.get(product_id, 'Unbekannt')
counter[name] += 1
os.makedirs('output', exist_ok=True)
with open('output/einweisungen_verkauft.csv', 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['Produkt', 'Anzahl'])
for name, count in counter.items():
writer.writerow([name, count])
logger.info(f"🛠️ Produkt: {name}, Anzahl: {count}")
logger.info(f"📈 Verkaufszahlen gespeichert unter: output/einweisungen_verkauft.csv")
def get_relevant_product_ids(models, db, uid, password):
domain = [('name', 'in', PRODUCT_NAMES)]
product_templates = models.execute_kw(db, uid, password, 'product.template', 'search_read', [domain], {'fields': ['id']})
template_ids = [pt['id'] for pt in product_templates]
if not template_ids:
logger.warning("⚠️ Keine Produkte mit den gewünschten Namen gefunden.")
return []
product_ids = models.execute_kw(db, uid, password, 'product.product', 'search', [[('product_tmpl_id', 'in', template_ids)]])
logger.info(f"🔍 {len(product_ids)} Produktvarianten zu {PRODUCT_NAMES} gefunden.")
return product_ids
def get_partners_from_pos_orders(models, db, uid, password, product_ids, years=2):
since = datetime.now() - relativedelta(years=years)
since_str = since.strftime('%Y-%m-%d %H:%M:%S')
order_line_domain = [
('product_id', 'in', product_ids),
('create_date', '>=', since_str),
]
order_lines = models.execute_kw(db, uid, password, 'pos.order.line', 'search_read', [order_line_domain], {'fields': ['order_id'], 'limit': 100000})
order_ids = list({line['order_id'][0] for line in order_lines if line['order_id']})
logger.info(f"🧾 {len(order_ids)} POS-Bestellungen mit relevanten Produkten seit {since_str} gefunden.")
if not order_ids:
return []
orders = models.execute_kw(db, uid, password, 'pos.order', 'read', [order_ids], {'fields': ['partner_id']})
partner_ids = list({order['partner_id'][0] for order in orders if order['partner_id']})
logger.info(f"👥 {len(partner_ids)} eindeutige Partner identifiziert.")
return partner_ids
def get_partner_emails(models, db, uid, password, partner_ids):
partners = models.execute_kw(db, uid, password, 'res.partner', 'read', [partner_ids], {'fields': ['name', 'email']})
return [p for p in partners if p.get('email')]
def log_detailed_usage(models, db, uid, password, product_ids, years=2):
since = datetime.now() - relativedelta(years=years)
since_str = since.strftime('%Y-%m-%d %H:%M:%S')
domain = [
('product_id', 'in', product_ids),
('create_date', '>=', since_str),
]
order_lines = models.execute_kw(db, uid, password, 'pos.order.line', 'search_read', [domain], {'fields': ['order_id', 'product_id', 'create_date']})
order_ids = list({line['order_id'][0] for line in order_lines if line['order_id']})
orders = models.execute_kw(db, uid, password, 'pos.order', 'read', [order_ids], {'fields': ['id', 'partner_id']})
order_partner_map = {o['id']: o['partner_id'][0] if o['partner_id'] else None for o in orders}
product_map = models.execute_kw(db, uid, password, 'product.product', 'read', [product_ids], {'fields': ['id', 'name']})
product_names = {p['id']: p['name'] for p in product_map}
partner_ids = list(set(order_partner_map.values()) - {None})
partners = models.execute_kw(db, uid, password, 'res.partner', 'read', [partner_ids], {'fields': ['id', 'name']})
partner_names = {p['id']: p['name'] for p in partners}
usage_summary = defaultdict(list)
for line in order_lines:
order_field = line.get('order_id')
if not order_field or not isinstance(order_field, (list, tuple)) or not order_field[0]:
continue # ⛔ Überspringen, wenn kein valides order_id-Feld vorhanden
order_id = order_field[0]
partner_id = order_partner_map.get(order_id)
if not partner_id:
continue
partner_name = partner_names.get(partner_id, 'Unbekannt')
product_name = product_names.get(line['product_id'][0], 'Unbekannt')
date = line['create_date']
usage_summary[partner_name].append((product_name, date))
os.makedirs('output', exist_ok=True)
with open('output/nutzer_maschinennutzung.csv', 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['Partner', 'Produkt', 'Datum'])
for partner, entries in usage_summary.items():
for product, date in entries:
writer.writerow([partner, product, date])
logger.info(f"📝 Nutzungs-Log gespeichert unter: output/nutzer_maschinennutzung.csv")
return usage_summary
def get_all_employees(models, db, uid, password):
employees = models.execute_kw(db, uid, password, 'hr.employee', 'search_read', [[('user_id', '!=', False)]], {'fields': ['id', 'name', 'user_id']})
employee_partner_ids = set()
os.makedirs('output', exist_ok=True)
with open('output/alle_mitarbeiter.csv', 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['Mitarbeiter', 'Partner-ID'])
for emp in employees:
user_id = emp['user_id'][0] if emp['user_id'] else None
if not user_id:
continue
user_data = models.execute_kw(db, uid, password, 'res.users', 'read', [user_id], {'fields': ['partner_id']})
if user_data and 'partner_id' in user_data[0]:
partner_id = user_data[0]['partner_id'][0]
employee_partner_ids.add(partner_id)
writer.writerow([emp['name'], partner_id])
logger.info(f"🧑‍🤝‍🧑 Mitarbeiterliste gespeichert unter: output/alle_mitarbeiter.csv")
return employee_partner_ids
@click.command()
@click.option('--years', default=2, help='Anzahl der Jahre für die Nutzungsauswertung (Standard: 2 Jahre).')
@click.option('--send', is_flag=True, help='Wenn gesetzt, wird sofort eine E-Mail versendet.')
@click.option('--group', type=click.Choice(['testmail', 'thekenheld', 'nutzer']), default='testmail', show_default=True, help='Gruppe für E-Mail-Versand: testmail, thekenheld oder nutzer.')
def main(years, send, group):
secrets = load_odoo_api_credentials()
if not secrets:
return
url, db, uid, models = test_odoo_api_connection(secrets)
if not uid:
return
product_ids = get_relevant_product_ids(models, db, uid, secrets['odoo-api']['password'])
if not product_ids:
return
_ = log_detailed_usage(models, db, uid, secrets['odoo-api']['password'], product_ids, years)
_ = get_training_product_sales_count(models, db, uid, secrets['odoo-api']['password'], years)
partner_ids = get_partners_from_pos_orders(models, db, uid, secrets['odoo-api']['password'], product_ids, years)
if not partner_ids:
return
partners = get_partner_emails(models, db, uid, secrets['odoo-api']['password'], partner_ids)
employee_partner_ids = get_all_employees(models, db, uid, secrets['odoo-api']['password'])
count = 0
count_hero = 0
for i, partner in enumerate(partners):
is_employee = partner['id'] in employee_partner_ids
if group == 'thekenheld':
name = partner['name']
email = partner['email']
message = EMAIL_BODY_TEMPLATE.format(name=name)
if is_employee:
count_hero += 1
if send:
send_email(secrets, email, subject=EMAIL_SUBJECT, body=message)
logger.info(f"📧 (Thekenheld) E-Mail an {email} gesendet.")
else:
logger.info(f"📧 (Thekenheld) E-Mail an {email} würde gesendet werden.")
elif group == 'nutzer':
name = partner['name']
email = partner['email']
message = EMAIL_BODY_TEMPLATE.format(name=name)
if not is_employee:
count += 1
if send:
send_email(secrets, email, subject=EMAIL_SUBJECT, body=message)
logger.info(f"📧 (Nutzer) E-Mail an {email} gesendet.")
else:
logger.info(f"📧 (Nutzer) E-Mail an {email} würde gesendet werden.")
elif group == 'testmail':
name = 'Matthias Lotz'
email = secrets['mail']['email_receiver']
message = EMAIL_BODY_TEMPLATE.format(name=name)
send_email(secrets, email, subject=EMAIL_SUBJECT, body=message)
logger.info(f"📧 (Testmail) Test-E-Mail an {email} gesendet.")
break
else: # debug
logger.debug(f"Hier solltest du nicht landen. Gruppe: {group}")
logger.info(f"📋 Email an Thekenhelden: {count_hero}")
logger.info(f"📊 Email an Nutzer: {count} Partner")
logger.info("🏁 Fertig.")
if __name__ == '__main__':
main()