Files
AutoMood/app.py
jerem 1cf427a0f2 Ajout suivi prospection : statut, import en masse, message type, trajet+péage
- Statut de prospection (colonne CSV) avec badge coloré et filtre
- Import en masse de liens Facebook (streaming, dédoublonnage)
- Modèle de message de contact configurable + copie en un clic
- Estimation distance/carburant/péage via OpenStreetMap (Nominatim + OSRM)
- Section Paramètres + config.json (non versionné)
2026-06-13 15:28:25 +02:00

279 lines
9.9 KiB
Python

"""Serveur local AutoMood : interface web + API + stockage CSV."""
import csv
import json
import os
import threading
import webbrowser
from datetime import date
from pathlib import Path
from flask import Flask, Response, jsonify, request, send_file, stream_with_context
import extractor
import scraper
import trajet
DOSSIER = Path(__file__).parent
CSV_PATH = DOSSIER / "prospects.csv"
CONFIG_PATH = DOSSIER / "config.json"
COLONNES = [
"Nom du prospect", "Statut", "Département", "Ville", "Code Postal", "Adresse",
"Date d'ajout", "Date de contact", "Nom de contact",
"Téléphone", "Email", "Infos du lieu", "Type", "Lien Facebook",
]
STATUT_DEFAUT = "À contacter"
CONFIG_DEFAUT = {
"adresse_depart": "",
"conso_l_100km": 6.5,
"prix_carburant": 1.90,
"cout_peage_km": 0.10,
"modele_message": (
"Bonjour,\n\n"
"Je me permets de vous contacter au sujet de « {nom} »{ville}. "
"Nous organisons des concerts et serions ravis d'étudier la possibilité "
"d'un événement musical dans votre établissement.\n\n"
"Seriez-vous disponible pour en échanger ?\n\n"
"Bien cordialement,"
),
}
app = Flask(__name__, static_folder="static")
verrou_scrape = threading.Lock()
verrou_csv = threading.Lock()
def lire_prospects():
if not CSV_PATH.exists():
ecrire_prospects([])
return []
with open(CSV_PATH, encoding="utf-8-sig", newline="") as f:
lecteur = csv.DictReader(f, delimiter=";")
return [{col: (ligne.get(col) or "") for col in COLONNES} for ligne in lecteur]
def ecrire_prospects(lignes):
tmp = CSV_PATH.with_suffix(".csv.tmp")
with open(tmp, "w", encoding="utf-8-sig", newline="") as f:
ecrivain = csv.DictWriter(f, fieldnames=COLONNES, delimiter=";", extrasaction="ignore")
ecrivain.writeheader()
ecrivain.writerows(lignes)
os.replace(tmp, CSV_PATH)
def lire_config():
config = dict(CONFIG_DEFAUT)
if CONFIG_PATH.exists():
try:
with open(CONFIG_PATH, encoding="utf-8") as f:
config.update({k: v for k, v in json.load(f).items() if k in CONFIG_DEFAUT})
except Exception:
pass # config illisible : on retombe sur les valeurs par défaut
return config
def ecrire_config(config):
tmp = CONFIG_PATH.with_suffix(".json.tmp")
with open(tmp, "w", encoding="utf-8", newline="") as f:
json.dump(config, f, ensure_ascii=False, indent=2)
os.replace(tmp, CONFIG_PATH)
@app.get("/")
def accueil():
return app.send_static_file("index.html")
@app.post("/api/login")
def api_login():
if not verrou_scrape.acquire(blocking=False):
return jsonify({"error": "occupe", "message": "Une autre opération est en cours, patientez."}), 429
try:
if scraper.connexion():
return jsonify({"ok": True, "message": "Connexion Facebook enregistrée."})
return jsonify({
"error": "login_timeout",
"message": "Connexion non détectée dans le temps imparti. Réessayez.",
}), 408
except Exception as e:
return jsonify({"error": "erreur", "message": f"Échec de la connexion : {e}"}), 500
finally:
verrou_scrape.release()
@app.post("/api/scrape")
def api_scrape():
url = (request.get_json(silent=True) or {}).get("url", "").strip()
if not url:
return jsonify({"error": "url_manquante", "message": "Collez un lien Facebook."}), 400
if not verrou_scrape.acquire(blocking=False):
return jsonify({"error": "occupe", "message": "Une analyse est déjà en cours, patientez."}), 429
try:
resultat = scraper.scrape(url)
champs = extractor.extraire(resultat["titre"], resultat["texte"], resultat["url"])
return jsonify(champs)
except scraper.ErreurScrape as e:
statuts = {"login_required": 409, "page_introuvable": 404, "url_invalide": 400, "redirection": 422}
return jsonify({"error": e.code, "message": str(e)}), statuts.get(e.code, 500)
except Exception as e:
return jsonify({"error": "erreur", "message": f"Échec de l'analyse : {e}"}), 500
finally:
verrou_scrape.release()
@app.get("/api/prospects")
def api_lister():
lignes = lire_prospects()
return jsonify([{"index": i, **ligne} for i, ligne in enumerate(lignes)])
@app.post("/api/prospects")
def api_ajouter():
donnees = request.get_json(silent=True) or {}
ligne = {col: str(donnees.get(col, "")).strip() for col in COLONNES}
if not ligne["Date d'ajout"]:
ligne["Date d'ajout"] = date.today().strftime("%d/%m/%Y")
if not ligne["Statut"]:
ligne["Statut"] = STATUT_DEFAUT
with verrou_csv:
lignes = lire_prospects()
lignes.append(ligne)
ecrire_prospects(lignes)
return jsonify({"ok": True, "index": len(lignes) - 1})
@app.put("/api/prospects/<int:idx>")
def api_modifier(idx):
donnees = request.get_json(silent=True) or {}
with verrou_csv:
lignes = lire_prospects()
if not 0 <= idx < len(lignes):
return jsonify({"error": "introuvable", "message": "Ligne inexistante."}), 404
for col in COLONNES:
if col in donnees:
lignes[idx][col] = str(donnees[col]).strip()
ecrire_prospects(lignes)
return jsonify({"ok": True})
@app.delete("/api/prospects/<int:idx>")
def api_supprimer(idx):
with verrou_csv:
lignes = lire_prospects()
if not 0 <= idx < len(lignes):
return jsonify({"error": "introuvable", "message": "Ligne inexistante."}), 404
lignes.pop(idx)
ecrire_prospects(lignes)
return jsonify({"ok": True})
@app.get("/api/export")
def api_export():
lire_prospects() # crée le fichier si absent
return send_file(CSV_PATH, as_attachment=True, download_name="prospects.csv")
def _ajouter_resultat_lot(resultat):
"""Transforme un résultat de scrape_lot en prospect ajouté au CSV (avec dédoublonnage).
Retourne le compte-rendu (dict) à renvoyer au navigateur pour la ligne traitée.
"""
url = resultat["url"]
if not resultat["ok"]:
return {"url": url, "statut": "erreur", "message": resultat.get("message", "Échec.")}
champs = extractor.extraire(
resultat["resultat"]["titre"], resultat["resultat"]["texte"], resultat["resultat"]["url"])
with verrou_csv:
lignes = lire_prospects()
lien = (champs.get("Lien Facebook") or "").strip()
if lien and any((l.get("Lien Facebook") or "").strip() == lien for l in lignes):
return {"url": url, "statut": "doublon", "nom": champs["Nom du prospect"]}
champs["Statut"] = STATUT_DEFAUT
lignes.append({col: champs.get(col, "") for col in COLONNES})
ecrire_prospects(lignes)
return {"url": url, "statut": "ajoute", "nom": champs["Nom du prospect"]}
@app.post("/api/scrape-lot")
def api_scrape_lot():
donnees = request.get_json(silent=True) or {}
urls = [u.strip() for u in (donnees.get("urls") or []) if u and u.strip()]
if not urls:
return jsonify({"error": "vide", "message": "Collez au moins un lien Facebook."}), 400
if not verrou_scrape.acquire(blocking=False):
return jsonify({"error": "occupe", "message": "Une analyse est déjà en cours, patientez."}), 429
def generer():
# Flux NDJSON : une ligne JSON par lien, envoyée dès qu'il est traité.
try:
for resultat in scraper.scrape_lot(urls):
yield json.dumps(_ajouter_resultat_lot(resultat), ensure_ascii=False) + "\n"
except Exception as e:
yield json.dumps({"statut": "erreur", "message": f"Échec global : {e}"}, ensure_ascii=False) + "\n"
finally:
verrou_scrape.release()
return Response(stream_with_context(generer()), mimetype="application/x-ndjson")
@app.get("/api/config")
def api_config_lire():
return jsonify(lire_config())
@app.put("/api/config")
def api_config_ecrire():
donnees = request.get_json(silent=True) or {}
config = lire_config()
for cle in ("adresse_depart", "modele_message"):
if cle in donnees:
config[cle] = str(donnees[cle])
for cle in ("conso_l_100km", "prix_carburant", "cout_peage_km"):
if cle in donnees:
try:
config[cle] = float(str(donnees[cle]).replace(",", "."))
except (TypeError, ValueError):
pass # valeur non numérique : on garde l'ancienne
ecrire_config(config)
return jsonify({"ok": True, "config": config})
@app.get("/api/distance/<int:idx>")
def api_distance(idx):
lignes = lire_prospects()
if not 0 <= idx < len(lignes):
return jsonify({"error": "introuvable", "message": "Ligne inexistante."}), 404
p = lignes[idx]
config = lire_config()
depart = (config.get("adresse_depart") or "").strip()
if not depart:
return jsonify({
"error": "pas_de_depart",
"message": "Renseignez d'abord votre adresse de départ dans les Paramètres.",
}), 400
arrivee = ", ".join(x.strip() for x in (p.get("Adresse"), p.get("Code Postal"), p.get("Ville")) if x and x.strip())
if not arrivee:
return jsonify({
"error": "pas_d_adresse",
"message": "Ce prospect n'a pas d'adresse ou de ville exploitable.",
}), 400
try:
resultat = trajet.calculer(
depart, arrivee,
float(config.get("conso_l_100km") or 0) or CONFIG_DEFAUT["conso_l_100km"],
float(config.get("prix_carburant") or 0) or CONFIG_DEFAUT["prix_carburant"],
float(config.get("cout_peage_km") or 0), # 0 = pas de péage compté
)
return jsonify(resultat)
except trajet.ErreurTrajet as e:
return jsonify({"error": "trajet", "message": str(e)}), 502
except Exception as e:
return jsonify({"error": "erreur", "message": f"Échec du calcul : {e}"}), 500
if __name__ == "__main__":
lire_prospects()
threading.Timer(1.0, webbrowser.open, args=["http://127.0.0.1:5000"]).start()
app.run(host="127.0.0.1", port=5000)