"""Serveur local AutoMood : interface web + API + stockage CSV.""" import csv import json import os import threading import webbrowser from datetime import date, datetime from pathlib import Path from flask import Flask, Response, jsonify, request, send_file, stream_with_context import excel import extractor import scraper import trajet DOSSIER = Path(__file__).parent CSV_PATH = DOSSIER / "prospects.csv" CONFIG_PATH = DOSSIER / "config.json" BACKUP_DIR = DOSSIER / "backups" JOURNAL_PATH = DOSSIER / "scrape.log" MAX_BACKUPS = 30 COLONNES = [ "Nom du prospect", "Statut", "Département", "Ville", "Code Postal", "Adresse", "Date d'ajout", "Date de contact", "Nom de contact", "Téléphone", "Email", "Infos du lieu", "Type", "Lien Facebook", "Notes", ] STATUT_DEFAUT = "À contacter" CONFIG_DEFAUT = { "adresse_depart": "", "conso_l_100km": 6.5, "prix_carburant": 1.90, "cout_peage_km": 0.10, "delai_relance_jours": 7, "modele_message": ( "Bonjour,\n\n" "Je me permets de vous contacter au sujet de « {nom} »{ville}. " "Nous organisons des concerts et serions ravis d'étudier la possibilité " "d'un événement musical dans votre établissement.\n\n" "Seriez-vous disponible pour en échanger ?\n\n" "Bien cordialement," ), } app = Flask(__name__, static_folder="static") verrou_scrape = threading.Lock() verrou_csv = threading.Lock() def lire_prospects(): if not CSV_PATH.exists(): ecrire_prospects([]) return [] with open(CSV_PATH, encoding="utf-8-sig", newline="") as f: lecteur = csv.DictReader(f, delimiter=";") return [{col: (ligne.get(col) or "") for col in COLONNES} for ligne in lecteur] def _sauvegarder_csv(): """Copie le CSV actuel dans backups/ avant qu'il ne soit écrasé. Ne garde que les MAX_BACKUPS plus récents et saute la copie si rien n'a changé depuis la dernière sauvegarde (évite les doublons inutiles). """ if not CSV_PATH.exists(): return try: contenu = CSV_PATH.read_bytes() BACKUP_DIR.mkdir(exist_ok=True) sauvegardes = sorted(BACKUP_DIR.glob("prospects-*.csv")) if sauvegardes and sauvegardes[-1].read_bytes() == contenu: return # identique à la dernière sauvegarde horodatage = datetime.now().strftime("%Y%m%d-%H%M%S-%f") (BACKUP_DIR / f"prospects-{horodatage}.csv").write_bytes(contenu) for vieux in sauvegardes[:-(MAX_BACKUPS - 1)]: vieux.unlink(missing_ok=True) except Exception: pass # une sauvegarde ratée ne doit jamais bloquer l'écriture def ecrire_prospects(lignes): _sauvegarder_csv() tmp = CSV_PATH.with_suffix(".csv.tmp") with open(tmp, "w", encoding="utf-8-sig", newline="") as f: ecrivain = csv.DictWriter(f, fieldnames=COLONNES, delimiter=";", extrasaction="ignore") ecrivain.writeheader() ecrivain.writerows(lignes) os.replace(tmp, CSV_PATH) def journaliser(url, statut, message=""): """Ajoute une ligne au journal de scraping (date, statut, url, message).""" horodatage = datetime.now().strftime("%d/%m/%Y %H:%M:%S") ligne = "\t".join((horodatage, statut, url or "", message or "")).rstrip() + "\n" try: with open(JOURNAL_PATH, "a", encoding="utf-8") as f: f.write(ligne) except Exception: pass def lire_config(): config = dict(CONFIG_DEFAUT) if CONFIG_PATH.exists(): try: with open(CONFIG_PATH, encoding="utf-8") as f: config.update({k: v for k, v in json.load(f).items() if k in CONFIG_DEFAUT}) except Exception: pass # config illisible : on retombe sur les valeurs par défaut return config def ecrire_config(config): tmp = CONFIG_PATH.with_suffix(".json.tmp") with open(tmp, "w", encoding="utf-8", newline="") as f: json.dump(config, f, ensure_ascii=False, indent=2) os.replace(tmp, CONFIG_PATH) @app.get("/") def accueil(): return app.send_static_file("index.html") @app.post("/api/login") def api_login(): if not verrou_scrape.acquire(blocking=False): return jsonify({"error": "occupe", "message": "Une autre opération est en cours, patientez."}), 429 try: if scraper.connexion(): return jsonify({"ok": True, "message": "Connexion Facebook enregistrée."}) return jsonify({ "error": "login_timeout", "message": "Connexion non détectée dans le temps imparti. Réessayez.", }), 408 except Exception as e: return jsonify({"error": "erreur", "message": f"Échec de la connexion : {e}"}), 500 finally: verrou_scrape.release() @app.post("/api/scrape") def api_scrape(): url = (request.get_json(silent=True) or {}).get("url", "").strip() if not url: return jsonify({"error": "url_manquante", "message": "Collez un lien Facebook."}), 400 if not verrou_scrape.acquire(blocking=False): return jsonify({"error": "occupe", "message": "Une analyse est déjà en cours, patientez."}), 429 try: resultat = scraper.scrape(url) champs = extractor.extraire(resultat["titre"], resultat["texte"], resultat["url"]) trouves = [c for c in ("Nom du prospect", "Téléphone", "Email", "Ville", "Adresse") if champs.get(c)] journaliser(url, "ok" if champs.get("Nom du prospect") else "vide", ("trouvé : " + ", ".join(trouves)) if trouves else "aucun champ extrait") return jsonify(champs) except scraper.ErreurScrape as e: journaliser(url, e.code, str(e)) statuts = {"login_required": 409, "page_introuvable": 404, "url_invalide": 400, "redirection": 422} return jsonify({"error": e.code, "message": str(e)}), statuts.get(e.code, 500) except Exception as e: journaliser(url, "erreur", str(e)) return jsonify({"error": "erreur", "message": f"Échec de l'analyse : {e}"}), 500 finally: verrou_scrape.release() @app.get("/api/prospects") def api_lister(): lignes = lire_prospects() return jsonify([{"index": i, **ligne} for i, ligne in enumerate(lignes)]) @app.post("/api/prospects") def api_ajouter(): donnees = request.get_json(silent=True) or {} ligne = {col: str(donnees.get(col, "")).strip() for col in COLONNES} if not ligne["Date d'ajout"]: ligne["Date d'ajout"] = date.today().strftime("%d/%m/%Y") if not ligne["Statut"]: ligne["Statut"] = STATUT_DEFAUT with verrou_csv: lignes = lire_prospects() lignes.append(ligne) ecrire_prospects(lignes) return jsonify({"ok": True, "index": len(lignes) - 1}) @app.put("/api/prospects/") def api_modifier(idx): donnees = request.get_json(silent=True) or {} with verrou_csv: lignes = lire_prospects() if not 0 <= idx < len(lignes): return jsonify({"error": "introuvable", "message": "Ligne inexistante."}), 404 for col in COLONNES: if col in donnees: lignes[idx][col] = str(donnees[col]).strip() ecrire_prospects(lignes) return jsonify({"ok": True}) @app.delete("/api/prospects/") def api_supprimer(idx): with verrou_csv: lignes = lire_prospects() if not 0 <= idx < len(lignes): return jsonify({"error": "introuvable", "message": "Ligne inexistante."}), 404 lignes.pop(idx) ecrire_prospects(lignes) return jsonify({"ok": True}) @app.get("/api/export") def api_export(): lire_prospects() # crée le fichier si absent return send_file(CSV_PATH, as_attachment=True, download_name="prospects.csv") @app.get("/api/export.xlsx") def api_export_xlsx(): contenu = excel.construire_xlsx(COLONNES, lire_prospects()) return Response( contenu, mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={"Content-Disposition": "attachment; filename=prospects.xlsx"}, ) @app.get("/api/logs") def api_logs(): if not JOURNAL_PATH.exists(): return jsonify([]) with open(JOURNAL_PATH, encoding="utf-8") as f: lignes = f.readlines()[-100:] entrees = [] for ligne in reversed(lignes): # le plus récent en premier parts = ligne.rstrip("\n").split("\t") if len(parts) >= 3: entrees.append({ "date": parts[0], "statut": parts[1], "url": parts[2], "message": parts[3] if len(parts) > 3 else "", }) return jsonify(entrees) @app.delete("/api/logs") def api_logs_vider(): try: JOURNAL_PATH.unlink(missing_ok=True) except Exception: pass return jsonify({"ok": True}) def _ajouter_resultat_lot(resultat): """Transforme un résultat de scrape_lot en prospect ajouté au CSV (avec dédoublonnage). Retourne le compte-rendu (dict) à renvoyer au navigateur pour la ligne traitée. """ url = resultat["url"] if not resultat["ok"]: journaliser(url, resultat.get("code", "erreur"), resultat.get("message", "Échec.")) return {"url": url, "statut": "erreur", "message": resultat.get("message", "Échec.")} champs = extractor.extraire( resultat["resultat"]["titre"], resultat["resultat"]["texte"], resultat["resultat"]["url"]) with verrou_csv: lignes = lire_prospects() lien = (champs.get("Lien Facebook") or "").strip() if lien and any((l.get("Lien Facebook") or "").strip() == lien for l in lignes): journaliser(url, "doublon", champs["Nom du prospect"]) return {"url": url, "statut": "doublon", "nom": champs["Nom du prospect"]} champs["Statut"] = STATUT_DEFAUT lignes.append({col: champs.get(col, "") for col in COLONNES}) ecrire_prospects(lignes) journaliser(url, "ajoute", champs["Nom du prospect"]) return {"url": url, "statut": "ajoute", "nom": champs["Nom du prospect"]} @app.post("/api/scrape-lot") def api_scrape_lot(): donnees = request.get_json(silent=True) or {} urls = [u.strip() for u in (donnees.get("urls") or []) if u and u.strip()] if not urls: return jsonify({"error": "vide", "message": "Collez au moins un lien Facebook."}), 400 if not verrou_scrape.acquire(blocking=False): return jsonify({"error": "occupe", "message": "Une analyse est déjà en cours, patientez."}), 429 def generer(): # Flux NDJSON : une ligne JSON par lien, envoyée dès qu'il est traité. try: for resultat in scraper.scrape_lot(urls): yield json.dumps(_ajouter_resultat_lot(resultat), ensure_ascii=False) + "\n" except Exception as e: yield json.dumps({"statut": "erreur", "message": f"Échec global : {e}"}, ensure_ascii=False) + "\n" finally: verrou_scrape.release() return Response(stream_with_context(generer()), mimetype="application/x-ndjson") @app.get("/api/config") def api_config_lire(): return jsonify(lire_config()) @app.put("/api/config") def api_config_ecrire(): donnees = request.get_json(silent=True) or {} config = lire_config() for cle in ("adresse_depart", "modele_message"): if cle in donnees: config[cle] = str(donnees[cle]) for cle in ("conso_l_100km", "prix_carburant", "cout_peage_km"): if cle in donnees: try: config[cle] = float(str(donnees[cle]).replace(",", ".")) except (TypeError, ValueError): pass # valeur non numérique : on garde l'ancienne if "delai_relance_jours" in donnees: try: config["delai_relance_jours"] = max(0, int(float(str(donnees["delai_relance_jours"]).replace(",", ".")))) except (TypeError, ValueError): pass ecrire_config(config) return jsonify({"ok": True, "config": config}) @app.get("/api/distance/") def api_distance(idx): lignes = lire_prospects() if not 0 <= idx < len(lignes): return jsonify({"error": "introuvable", "message": "Ligne inexistante."}), 404 p = lignes[idx] config = lire_config() depart = (config.get("adresse_depart") or "").strip() if not depart: return jsonify({ "error": "pas_de_depart", "message": "Renseignez d'abord votre adresse de départ dans les Paramètres.", }), 400 arrivee = ", ".join(x.strip() for x in (p.get("Adresse"), p.get("Code Postal"), p.get("Ville")) if x and x.strip()) if not arrivee: return jsonify({ "error": "pas_d_adresse", "message": "Ce prospect n'a pas d'adresse ou de ville exploitable.", }), 400 try: resultat = trajet.calculer( depart, arrivee, float(config.get("conso_l_100km") or 0) or CONFIG_DEFAUT["conso_l_100km"], float(config.get("prix_carburant") or 0) or CONFIG_DEFAUT["prix_carburant"], float(config.get("cout_peage_km") or 0), # 0 = pas de péage compté ) return jsonify(resultat) except trajet.ErreurTrajet as e: return jsonify({"error": "trajet", "message": str(e)}), 502 except Exception as e: return jsonify({"error": "erreur", "message": f"Échec du calcul : {e}"}), 500 if __name__ == "__main__": lire_prospects() threading.Timer(1.0, webbrowser.open, args=["http://127.0.0.1:5000"]).start() app.run(host="127.0.0.1", port=5000)