Files
AutoMood/app.py
jerem 018add739a Infos du groupe dans les réglages, injectées au prompt IA
Nouveaux champs (nom, style, description, lien) en réglages, transmis au
prompt système de génération de messages dans les deux modes (générer et
peaufiner). La consigne de format reste en dernière position, non éditable.
Bloc omis si aucun champ rempli : prompt identique à l'ancien.
2026-06-13 23:33:30 +02:00

490 lines
18 KiB
Python

"""Serveur local AutoMood : interface web + API + stockage CSV."""
import csv
import json
import os
import threading
import webbrowser
from datetime import date, datetime
from pathlib import Path
from flask import Flask, Response, jsonify, request, send_file, stream_with_context
import excel
import extractor
import ia
import scraper
import trajet
DOSSIER = Path(__file__).parent
CSV_PATH = DOSSIER / "prospects.csv"
CONFIG_PATH = DOSSIER / "config.json"
BACKUP_DIR = DOSSIER / "backups"
JOURNAL_PATH = DOSSIER / "scrape.log"
MAX_BACKUPS = 30
COLONNES = [
"Nom du prospect", "Statut", "Département", "Ville", "Code Postal", "Adresse",
"Date d'ajout", "Date de contact", "Nom de contact",
"Téléphone", "Email", "Infos du lieu", "Type", "Lien Facebook", "Notes",
"Trajet distance km", "Trajet durée min", "Trajet km péage",
"Trajet adresse départ", "Trajet adresse arrivée",
]
STATUT_DEFAUT = "À contacter"
CONFIG_DEFAUT = {
"adresse_depart": "",
"conso_l_100km": 6.5,
"prix_carburant": 1.90,
"cout_peage_km": 0.10,
"delai_relance_jours": 7,
"modele_message": (
"Bonjour,\n\n"
"Je me permets de vous contacter au sujet de « {nom} »{ville}. "
"Nous organisons des concerts et serions ravis d'étudier la possibilité "
"d'un événement musical dans votre établissement.\n\n"
"Seriez-vous disponible pour en échanger ?\n\n"
"Bien cordialement,"
),
"ia_modele": ia.MODELE_DEFAUT,
"groupe_nom": "",
"groupe_style": "",
"groupe_description": "",
"groupe_lien": "",
}
app = Flask(__name__, static_folder="static")
verrou_scrape = threading.Lock()
verrou_csv = threading.Lock()
# Sérialise les calculs de trajet (un appel Nominatim/OSRM à la fois, ~1 req/s).
verrou_trajet = threading.Lock()
def lire_prospects():
if not CSV_PATH.exists():
ecrire_prospects([])
return []
with open(CSV_PATH, encoding="utf-8-sig", newline="") as f:
lecteur = csv.DictReader(f, delimiter=";")
return [{col: (ligne.get(col) or "") for col in COLONNES} for ligne in lecteur]
def _sauvegarder_csv():
"""Copie le CSV actuel dans backups/ avant qu'il ne soit écrasé.
Ne garde que les MAX_BACKUPS plus récents et saute la copie si rien n'a
changé depuis la dernière sauvegarde (évite les doublons inutiles).
"""
if not CSV_PATH.exists():
return
try:
contenu = CSV_PATH.read_bytes()
BACKUP_DIR.mkdir(exist_ok=True)
sauvegardes = sorted(BACKUP_DIR.glob("prospects-*.csv"))
if sauvegardes and sauvegardes[-1].read_bytes() == contenu:
return # identique à la dernière sauvegarde
horodatage = datetime.now().strftime("%Y%m%d-%H%M%S-%f")
(BACKUP_DIR / f"prospects-{horodatage}.csv").write_bytes(contenu)
for vieux in sauvegardes[:-(MAX_BACKUPS - 1)]:
vieux.unlink(missing_ok=True)
except Exception:
pass # une sauvegarde ratée ne doit jamais bloquer l'écriture
def ecrire_prospects(lignes):
_sauvegarder_csv()
tmp = CSV_PATH.with_suffix(".csv.tmp")
with open(tmp, "w", encoding="utf-8-sig", newline="") as f:
ecrivain = csv.DictWriter(f, fieldnames=COLONNES, delimiter=";", extrasaction="ignore")
ecrivain.writeheader()
ecrivain.writerows(lignes)
os.replace(tmp, CSV_PATH)
def journaliser(url, statut, message=""):
"""Ajoute une ligne au journal de scraping (date, statut, url, message)."""
horodatage = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
ligne = "\t".join((horodatage, statut, url or "", message or "")).rstrip() + "\n"
try:
with open(JOURNAL_PATH, "a", encoding="utf-8") as f:
f.write(ligne)
except Exception:
pass
def lire_config():
config = dict(CONFIG_DEFAUT)
if CONFIG_PATH.exists():
try:
with open(CONFIG_PATH, encoding="utf-8") as f:
config.update({k: v for k, v in json.load(f).items() if k in CONFIG_DEFAUT})
except Exception:
pass # config illisible : on retombe sur les valeurs par défaut
return config
def ecrire_config(config):
tmp = CONFIG_PATH.with_suffix(".json.tmp")
with open(tmp, "w", encoding="utf-8", newline="") as f:
json.dump(config, f, ensure_ascii=False, indent=2)
os.replace(tmp, CONFIG_PATH)
def _adresse_arrivee(p):
"""Reconstruit l'adresse d'arrivée d'un prospect (Adresse, Code Postal, Ville)."""
return ", ".join(
x.strip() for x in (p.get("Adresse"), p.get("Code Postal"), p.get("Ville")) if x and x.strip())
def _tarifs(config):
"""(conso L/100km, prix €/L, péage €/km) depuis la config, avec replis."""
return (
float(config.get("conso_l_100km") or 0) or CONFIG_DEFAUT["conso_l_100km"],
float(config.get("prix_carburant") or 0) or CONFIG_DEFAUT["prix_carburant"],
float(config.get("cout_peage_km") or 0), # 0 = pas de péage compté
)
def _ecrire_colonnes_trajet(ligne, depart, arrivee, res):
"""Mémorise dans une ligne les données géo + adresses utilisées pour le calcul."""
ligne["Trajet distance km"] = str(res["distance_km"])
ligne["Trajet durée min"] = str(res["duree_min"])
ligne["Trajet km péage"] = str(res["km_peage"])
ligne["Trajet adresse départ"] = depart
ligne["Trajet adresse arrivée"] = arrivee
def _auto_trajet(ligne, config):
"""Calcule et mémorise le trajet d'un prospect fraîchement ajouté (best-effort).
Conçu pour tourner dans un thread daemon : un échec réseau ne doit jamais
affecter le scrape. `verrou_trajet` sérialise les appels Nominatim/OSRM.
"""
try:
depart = (config.get("adresse_depart") or "").strip()
arrivee = _adresse_arrivee(ligne)
lien = (ligne.get("Lien Facebook") or "").strip()
if not depart or not arrivee:
return
with verrou_trajet:
res = trajet.calculer(depart, arrivee, *_tarifs(config))
with verrou_csv:
lignes = lire_prospects()
cible = next(
(l for l in lignes if (l.get("Lien Facebook") or "").strip() == lien), None
) if lien else None
if cible is not None:
_ecrire_colonnes_trajet(cible, depart, arrivee, res)
ecrire_prospects(lignes)
except Exception:
pass # best-effort : le trajet sera calculé au clic si besoin
@app.get("/")
def accueil():
return app.send_static_file("index.html")
@app.post("/api/login")
def api_login():
if not verrou_scrape.acquire(blocking=False):
return jsonify({"error": "occupe", "message": "Une autre opération est en cours, patientez."}), 429
try:
if scraper.connexion():
return jsonify({"ok": True, "message": "Connexion Facebook enregistrée."})
return jsonify({
"error": "login_timeout",
"message": "Connexion non détectée dans le temps imparti. Réessayez.",
}), 408
except Exception as e:
return jsonify({"error": "erreur", "message": f"Échec de la connexion : {e}"}), 500
finally:
verrou_scrape.release()
@app.post("/api/scrape")
def api_scrape():
url = (request.get_json(silent=True) or {}).get("url", "").strip()
if not url:
return jsonify({"error": "url_manquante", "message": "Collez un lien Facebook."}), 400
if not verrou_scrape.acquire(blocking=False):
return jsonify({"error": "occupe", "message": "Une analyse est déjà en cours, patientez."}), 429
try:
resultat = scraper.scrape(url)
champs = extractor.extraire(resultat["titre"], resultat["texte"], resultat["url"])
trouves = [c for c in ("Nom du prospect", "Téléphone", "Email", "Ville", "Adresse") if champs.get(c)]
journaliser(url, "ok" if champs.get("Nom du prospect") else "vide",
("trouvé : " + ", ".join(trouves)) if trouves else "aucun champ extrait")
return jsonify(champs)
except scraper.ErreurScrape as e:
journaliser(url, e.code, str(e))
statuts = {"login_required": 409, "page_introuvable": 404, "url_invalide": 400, "redirection": 422}
return jsonify({"error": e.code, "message": str(e)}), statuts.get(e.code, 500)
except Exception as e:
journaliser(url, "erreur", str(e))
return jsonify({"error": "erreur", "message": f"Échec de l'analyse : {e}"}), 500
finally:
verrou_scrape.release()
@app.get("/api/prospects")
def api_lister():
lignes = lire_prospects()
return jsonify([{"index": i, **ligne} for i, ligne in enumerate(lignes)])
@app.post("/api/prospects")
def api_ajouter():
donnees = request.get_json(silent=True) or {}
ligne = {col: str(donnees.get(col, "")).strip() for col in COLONNES}
if not ligne["Date d'ajout"]:
ligne["Date d'ajout"] = date.today().strftime("%d/%m/%Y")
if not ligne["Statut"]:
ligne["Statut"] = STATUT_DEFAUT
with verrou_csv:
lignes = lire_prospects()
lignes.append(ligne)
ecrire_prospects(lignes)
threading.Thread(target=_auto_trajet, args=(ligne, lire_config()), daemon=True).start()
return jsonify({"ok": True, "index": len(lignes) - 1})
@app.put("/api/prospects/<int:idx>")
def api_modifier(idx):
donnees = request.get_json(silent=True) or {}
with verrou_csv:
lignes = lire_prospects()
if not 0 <= idx < len(lignes):
return jsonify({"error": "introuvable", "message": "Ligne inexistante."}), 404
for col in COLONNES:
if col in donnees:
lignes[idx][col] = str(donnees[col]).strip()
ecrire_prospects(lignes)
return jsonify({"ok": True})
@app.delete("/api/prospects/<int:idx>")
def api_supprimer(idx):
with verrou_csv:
lignes = lire_prospects()
if not 0 <= idx < len(lignes):
return jsonify({"error": "introuvable", "message": "Ligne inexistante."}), 404
lignes.pop(idx)
ecrire_prospects(lignes)
return jsonify({"ok": True})
@app.get("/api/export")
def api_export():
lire_prospects() # crée le fichier si absent
return send_file(CSV_PATH, as_attachment=True, download_name="prospects.csv")
@app.get("/api/export.xlsx")
def api_export_xlsx():
contenu = excel.construire_xlsx(COLONNES, lire_prospects())
return Response(
contenu,
mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": "attachment; filename=prospects.xlsx"},
)
@app.get("/api/logs")
def api_logs():
if not JOURNAL_PATH.exists():
return jsonify([])
with open(JOURNAL_PATH, encoding="utf-8") as f:
lignes = f.readlines()[-100:]
entrees = []
for ligne in reversed(lignes): # le plus récent en premier
parts = ligne.rstrip("\n").split("\t")
if len(parts) >= 3:
entrees.append({
"date": parts[0], "statut": parts[1], "url": parts[2],
"message": parts[3] if len(parts) > 3 else "",
})
return jsonify(entrees)
@app.delete("/api/logs")
def api_logs_vider():
try:
JOURNAL_PATH.unlink(missing_ok=True)
except Exception:
pass
return jsonify({"ok": True})
def _ajouter_resultat_lot(resultat):
"""Transforme un résultat de scrape_lot en prospect ajouté au CSV (avec dédoublonnage).
Retourne le compte-rendu (dict) à renvoyer au navigateur pour la ligne traitée.
"""
url = resultat["url"]
if not resultat["ok"]:
journaliser(url, resultat.get("code", "erreur"), resultat.get("message", "Échec."))
return {"url": url, "statut": "erreur", "message": resultat.get("message", "Échec.")}
champs = extractor.extraire(
resultat["resultat"]["titre"], resultat["resultat"]["texte"], resultat["resultat"]["url"])
with verrou_csv:
lignes = lire_prospects()
lien = (champs.get("Lien Facebook") or "").strip()
if lien and any((l.get("Lien Facebook") or "").strip() == lien for l in lignes):
journaliser(url, "doublon", champs["Nom du prospect"])
return {"url": url, "statut": "doublon", "nom": champs["Nom du prospect"]}
champs["Statut"] = STATUT_DEFAUT
ligne = {col: champs.get(col, "") for col in COLONNES}
lignes.append(ligne)
ecrire_prospects(lignes)
journaliser(url, "ajoute", champs["Nom du prospect"])
threading.Thread(target=_auto_trajet, args=(ligne, lire_config()), daemon=True).start()
return {"url": url, "statut": "ajoute", "nom": champs["Nom du prospect"]}
@app.post("/api/scrape-lot")
def api_scrape_lot():
donnees = request.get_json(silent=True) or {}
urls = [u.strip() for u in (donnees.get("urls") or []) if u and u.strip()]
if not urls:
return jsonify({"error": "vide", "message": "Collez au moins un lien Facebook."}), 400
if not verrou_scrape.acquire(blocking=False):
return jsonify({"error": "occupe", "message": "Une analyse est déjà en cours, patientez."}), 429
def generer():
# Flux NDJSON : une ligne JSON par lien, envoyée dès qu'il est traité.
try:
for resultat in scraper.scrape_lot(urls):
yield json.dumps(_ajouter_resultat_lot(resultat), ensure_ascii=False) + "\n"
except Exception as e:
yield json.dumps({"statut": "erreur", "message": f"Échec global : {e}"}, ensure_ascii=False) + "\n"
finally:
verrou_scrape.release()
return Response(stream_with_context(generer()), mimetype="application/x-ndjson")
@app.get("/api/config")
def api_config_lire():
return jsonify(lire_config())
@app.put("/api/config")
def api_config_ecrire():
donnees = request.get_json(silent=True) or {}
config = lire_config()
for cle in ("adresse_depart", "modele_message", "ia_modele",
"groupe_nom", "groupe_style", "groupe_description", "groupe_lien"):
if cle in donnees:
config[cle] = str(donnees[cle])
for cle in ("conso_l_100km", "prix_carburant", "cout_peage_km"):
if cle in donnees:
try:
config[cle] = float(str(donnees[cle]).replace(",", "."))
except (TypeError, ValueError):
pass # valeur non numérique : on garde l'ancienne
if "delai_relance_jours" in donnees:
try:
config["delai_relance_jours"] = max(0, int(float(str(donnees["delai_relance_jours"]).replace(",", "."))))
except (TypeError, ValueError):
pass
ecrire_config(config)
return jsonify({"ok": True, "config": config})
@app.get("/api/ia/status")
def api_ia_status():
return jsonify(ia.statut_login())
@app.post("/api/ia/login")
def api_ia_login():
try:
return jsonify(ia.lancer_login())
except Exception as e:
return jsonify({"error": "login", "message": f"Échec du démarrage de la connexion : {e}"}), 502
@app.post("/api/message")
def api_message():
donnees = request.get_json(silent=True) or {}
prospect = donnees.get("prospect") or {}
mode = donnees.get("mode") or "generer"
if not prospect.get("Nom du prospect"):
return jsonify({"error": "prospect_vide", "message": "Prospect sans nom."}), 400
config = lire_config()
try:
message = ia.generer_message(
prospect, config.get("modele_message", ""), mode,
nom_modele=config.get("ia_modele") or ia.MODELE_DEFAUT,
groupe={
"nom": config.get("groupe_nom", ""),
"style": config.get("groupe_style", ""),
"description": config.get("groupe_description", ""),
"lien": config.get("groupe_lien", ""),
},
)
return jsonify({"message": message})
except ia.IANonConnecte as e:
return jsonify({"error": "non_connecte", "message": str(e)}), 401
except ia.IAErreur as e:
return jsonify({"error": "ia", "message": str(e)}), 502
except Exception as e:
return jsonify({"error": "erreur", "message": f"Échec de la génération : {e}"}), 500
@app.get("/api/distance/<int:idx>")
def api_distance(idx):
lignes = lire_prospects()
if not 0 <= idx < len(lignes):
return jsonify({"error": "introuvable", "message": "Ligne inexistante."}), 404
p = lignes[idx]
config = lire_config()
depart = (config.get("adresse_depart") or "").strip()
if not depart:
return jsonify({
"error": "pas_de_depart",
"message": "Renseignez d'abord votre adresse de départ dans les Paramètres.",
}), 400
arrivee = _adresse_arrivee(p)
if not arrivee:
return jsonify({
"error": "pas_d_adresse",
"message": "Ce prospect n'a pas d'adresse ou de ville exploitable.",
}), 400
conso, prix, peage = _tarifs(config)
# Recalcul local : les données géo en cache restent valides tant que les deux
# adresses n'ont pas changé. On réapplique simplement les prix courants.
if (p.get("Trajet distance km")
and p.get("Trajet adresse départ") == depart
and p.get("Trajet adresse arrivée") == arrivee):
try:
return jsonify({**trajet.couts(
float(p["Trajet distance km"]),
float(p.get("Trajet durée min") or 0),
float(p.get("Trajet km péage") or 0),
conso, prix, peage,
), "source": "local"})
except (TypeError, ValueError):
pass # données en cache illisibles : on bascule sur un recalcul complet
# Recalcul complet : appels Nominatim/OSRM, puis mémorisation des données géo.
try:
resultat = trajet.calculer(depart, arrivee, conso, prix, peage)
except trajet.ErreurTrajet as e:
return jsonify({"error": "trajet", "message": str(e)}), 502
except Exception as e:
return jsonify({"error": "erreur", "message": f"Échec du calcul : {e}"}), 500
with verrou_csv:
lignes = lire_prospects()
if 0 <= idx < len(lignes):
_ecrire_colonnes_trajet(lignes[idx], depart, arrivee, resultat)
ecrire_prospects(lignes)
return jsonify({**resultat, "source": "api"})
if __name__ == "__main__":
lire_prospects()
threading.Timer(1.0, webbrowser.open, args=["http://127.0.0.1:5000"]).start()
app.run(host="127.0.0.1", port=5000)