/**
 * RSS reading engine.
 * Fetches the active feeds, parses their articles, applies the automation rules,
 * and inserts the new articles into veille_items or aap_items.
 */
import { XMLParser } from "fast-xml-parser";
import * as crypto from "crypto";
import { getDb } from "./db";
import {
  rssFeeds,
  veilleItems,
  aapItems,
  type RssFeed,
} from "../drizzle/schema";
import { eq } from "drizzle-orm";

// ─── Internal types ───────────────────────────────────────────────────────────

/** One article extracted from an RSS 2.0 `<item>` or an Atom `<entry>`. */
interface RssItem {
  title: string;
  description?: string;
  link?: string;
  pubDate?: string;
  guid?: string;
}

type TypeVeille = "reglementaire" | "concurrentielle" | "technologique" | "generale";

type CategorieAap = "Handicap" | "PA" | "Enfance" | "Précarité" | "Sanitaire" | "Autre";

/** Keyword-based classification rule attached to a feed. */
interface AutoRule {
  keyword: string;
  typeVeille?: TypeVeille;
  categorieAap?: CategorieAap;
}

/** Outcome of processing a single feed. */
interface FetchResult {
  feedId: number;
  feedName: string;
  status: "ok" | "error";
  newItems: number;
  skippedItems: number;
  error?: string;
}

// ─── Utilities ────────────────────────────────────────────────────────────────

/** Timeout applied to each feed download, in milliseconds. */
const FETCH_TIMEOUT_MS = 15000;

/** Entities decoded by `stripHtml`, keyed by the text between `&` and `;`. */
const HTML_ENTITIES: Record<string, string> = {
  amp: "&",
  lt: "<",
  gt: ">",
  quot: '"',
  apos: "'",
  "#39": "'",
};

/** Returns true for non-null objects (arrays included). */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

/** Extracts a readable message from anything that was thrown. */
function errorMessage(err: unknown): string {
  if (isRecord(err) && typeof err["message"] === "string") {
    return err["message"];
  }
  return String(err);
}

/**
 * Computes the deduplication hash of a text.
 *
 * @returns The SHA-256 digest as a 64-character hexadecimal string.
 */
function dedupHash(text: string): string {
  return crypto.createHash("sha256").update(text).digest("hex");
}

/**
 * Parses a feed date string.
 *
 * @returns The parsed date, or null when the input is missing or not a valid date.
 */
function parseDate(dateStr?: string): Date | null {
  if (!dateStr) return null;
  const parsed = new Date(dateStr);
  return Number.isNaN(parsed.getTime()) ? null : parsed;
}

/**
 * Removes HTML tags and decodes the basic HTML entities
 * (`&amp;`, `&lt;`, `&gt;`, `&quot;`, `&apos;`, `&#39;`).
 *
 * Entities are decoded in a single pass so that `&amp;lt;` becomes `&lt;`
 * and not `<`.
 *
 * NOTE: the result is plain text and may contain `<` or `>` characters after
 * decoding; it must be escaped if it is ever rendered as HTML.
 */
function stripHtml(html: string): string {
  return html
    .replace(/<[^>]*>/g, "")
    .replace(
      /&(amp|lt|gt|quot|apos|#39);/g,
      (match: string, name: string) => HTML_ENTITIES[name] ?? match
    )
    .trim();
}

/**
 * Applies the automation rules to the title + description of an article.
 *
 * Matching is a case-insensitive substring search. Rules whose keyword is
 * missing, not a string, or blank are skipped: an empty keyword would match
 * every article and shadow all the rules that follow it.
 *
 * @returns The first matching rule, or null if no rule matches.
 */
function applyAutoRules(
  title: string,
  description: string,
  rules: AutoRule[]
): AutoRule | null {
  const haystack = (title + " " + description).toLowerCase();
  for (const rule of rules) {
    // Rules come from stored JSON, so their shape is not guaranteed at runtime.
    const keyword: unknown = (rule as Partial<AutoRule> | null)?.keyword;
    if (typeof keyword !== "string" || keyword.trim() === "") continue;
    // The keyword is matched untrimmed so that deliberate padding (" pa ") still works.
    if (haystack.includes(keyword.toLowerCase())) {
      return rule;
    }
  }
  return null;
}

// ─── RSS/Atom parsing ─────────────────────────────────────────────────────────

/** Reads a property of a parsed XML node; undefined when the parent is not an object. */
function childNode(parent: unknown, key: string): unknown {
  return isRecord(parent) ? parent[key] : undefined;
}

/** Normalises "absent | single value | array" into an array. */
function toArray(value: unknown): unknown[] {
  if (Array.isArray(value)) return value;
  return value ? [value] : [];
}

/**
 * Returns the text content of a parsed XML node.
 *
 * - primitives are converted with `String`;
 * - element objects yield their `#text` child (the parser's text node name);
 * - arrays (repeated elements) yield the text of their first element;
 * - null/undefined, and element objects without text, yield undefined so the
 *   caller can fall back with `??` instead of getting "[object Object]".
 */
function nodeText(node: unknown): string | undefined {
  if (node == null) return undefined;
  if (Array.isArray(node)) return nodeText(node[0]);
  if (isRecord(node)) {
    const text = node["#text"];
    return text == null ? undefined : String(text);
  }
  return String(node);
}

/** Converts a parsed RSS 2.0 `<item>` into an RssItem. */
function parseRssEntry(raw: unknown): RssItem {
  const item = isRecord(raw) ? raw : {};
  return {
    title: nodeText(item["title"]) ?? "",
    description: nodeText(item["description"]) ?? "",
    link: nodeText(item["link"]) ?? nodeText(item["guid"]) ?? "",
    pubDate: nodeText(item["pubDate"]) ?? nodeText(item["dc:date"]) ?? "",
    guid: nodeText(item["guid"]) ?? nodeText(item["link"]) ?? "",
  };
}

/** Converts a parsed Atom `<entry>` into an RssItem. */
function parseAtomEntry(raw: unknown): RssItem {
  const entry = isRecord(raw) ? raw : {};
  const links = toArray(entry["link"]);
  // Prefer the rel="alternate" link; otherwise fall back to the first link.
  const altLink =
    links.find((l) => childNode(l, "@_rel") === "alternate") ?? links[0];
  const href = nodeText(childNode(altLink, "@_href"));
  return {
    title: nodeText(entry["title"]) ?? "",
    description: nodeText(entry["summary"]) ?? nodeText(entry["content"]) ?? "",
    link: href ?? "",
    pubDate: nodeText(entry["published"]) ?? nodeText(entry["updated"]) ?? "",
    guid: nodeText(entry["id"]) ?? href ?? "",
  };
}

/**
 * Downloads a feed and parses it as RSS 2.0 or Atom.
 *
 * @param url Address of the feed.
 * @returns The articles found in the feed (possibly empty).
 * @throws Error on a non-2xx HTTP status, on timeout (15 s), or when the
 *   document is neither RSS 2.0 nor Atom.
 */
async function fetchAndParseRss(url: string): Promise<RssItem[]> {
  const response = await fetch(url, {
    headers: {
      "User-Agent": "Mozilla/5.0 (compatible; VeilleBot/1.0; +https://itinova.fr)",
      "Accept": "application/rss+xml, application/xml, text/xml, */*",
    },
    signal: AbortSignal.timeout(FETCH_TIMEOUT_MS),
  });

  if (!response.ok) {
    throw new Error(`HTTP ${response.status} ${response.statusText}`);
  }

  const xml = await response.text();
  const parser = new XMLParser({
    ignoreAttributes: false,
    attributeNamePrefix: "@_",
    textNodeName: "#text",
    parseAttributeValue: true,
    trimValues: true,
  });
  const parsed: unknown = parser.parse(xml);

  // RSS 2.0 support
  const channel = childNode(childNode(parsed, "rss"), "channel");
  if (channel) {
    return toArray(childNode(channel, "item")).map((raw) => parseRssEntry(raw));
  }

  // Atom support
  const atomFeed = childNode(parsed, "feed");
  if (atomFeed) {
    return toArray(childNode(atomFeed, "entry")).map((raw) => parseAtomEntry(raw));
  }

  throw new Error("Format RSS/Atom non reconnu");
}

// ─── Processing of a single feed ──────────────────────────────────────────────

/**
 * Tells whether an error is a MySQL duplicate-key error.
 *
 * The error itself and its `cause` chain (a few levels) are inspected, in case
 * the driver error is wrapped by the ORM — NOTE(review): wrapping behaviour
 * depends on the installed drizzle-orm version; confirm.
 */
function isDuplicateEntryError(err: unknown): boolean {
  let current: unknown = err;
  for (let depth = 0; depth < 3 && isRecord(current); depth++) {
    if (current["code"] === "ER_DUP_ENTRY") return true;
    const message = current["message"];
    if (typeof message === "string" && message.includes("Duplicate entry")) {
      return true;
    }
    current = current["cause"];
  }
  return false;
}

/**
 * Runs an insert and treats a duplicate-key failure as "already present".
 *
 * @returns true when the row was inserted, false when it was a duplicate.
 * @throws Any error that is not a duplicate-key error.
 */
async function insertUnlessDuplicate(insert: () => Promise<void>): Promise<boolean> {
  try {
    await insert();
    return true;
  } catch (err: unknown) {
    // Duplicate (UNIQUE constraint on dedupKey) → ignored
    if (isDuplicateEntryError(err)) return false;
    throw err;
  }
}

/**
 * Fetches one feed, inserts its new articles and records the fetch status on
 * the feed row.
 *
 * Fetch, parse and insert failures do not throw: they are reported through
 * `status: "error"` and `error` in the result, and stored on the feed row.
 *
 * @param feed The feed to process.
 * @returns Counters and status for this feed.
 * @throws Error when the database is not available.
 */
async function processFeed(feed: RssFeed): Promise<FetchResult> {
  const db = await getDb();
  if (!db) throw new Error("Database not available");

  const result: FetchResult = {
    feedId: feed.id,
    feedName: feed.name,
    status: "ok",
    newItems: 0,
    skippedItems: 0,
  };

  try {
    const items = await fetchAndParseRss(feed.url);
    const rules: AutoRule[] = Array.isArray(feed.autoRules)
      ? (feed.autoRules as AutoRule[])
      : [];

    for (const item of items) {
      const title = stripHtml(item.title || "");
      const description = stripHtml(item.description || "");
      const link = item.link || item.guid || "";
      const pubDate = parseDate(item.pubDate);

      // Articles without a title are not stored.
      if (!title) {
        result.skippedItems++;
        continue;
      }

      // Deduplication key based on title + link
      const dedupKey = dedupHash(title + "|" + link);
      const matchedRule = applyAutoRules(title, description, rules);

      let inserted: boolean;
      if (feed.feedType === "veille") {
        // Watch type: matching rule first, then the feed default, then "generale".
        const typeVeille = (matchedRule?.typeVeille ??
          feed.defaultTypeVeille ??
          "generale") as TypeVeille;
        inserted = await insertUnlessDuplicate(async () => {
          await db.insert(veilleItems).values({
            dedupKey,
            titre: title,
            resume: description || null,
            source: feed.name,
            lien: link || null,
            typeVeille,
            datePublication: pubDate,
          });
        });
      } else if (feed.feedType === "aap") {
        // AAP category: matching rule first, then the feed default, then "Autre".
        const categorie = (matchedRule?.categorieAap ??
          feed.defaultCategorieAap ??
          "Autre") as CategorieAap;
        inserted = await insertUnlessDuplicate(async () => {
          await db.insert(aapItems).values({
            dedupKey,
            titre: title,
            categorie,
            lien: link || null,
            datePublication: pubDate,
          });
        });
      } else {
        // Other feed types: nothing is inserted and nothing is counted.
        continue;
      }

      if (inserted) {
        result.newItems++;
      } else {
        result.skippedItems++;
      }
    }

    // Update lastFetchedAt and lastFetchStatus
    await db
      .update(rssFeeds)
      .set({ lastFetchedAt: new Date(), lastFetchStatus: "ok", lastFetchError: null })
      .where(eq(rssFeeds.id, feed.id));
  } catch (err: unknown) {
    result.status = "error";
    result.error = errorMessage(err);

    // Record the error on the feed row (best effort: a failure here must not
    // mask the original error, so it is only logged).
    try {
      await db
        .update(rssFeeds)
        .set({
          lastFetchedAt: new Date(),
          lastFetchStatus: "error",
          lastFetchError: result.error,
        })
        .where(eq(rssFeeds.id, feed.id));
    } catch (updateErr: unknown) {
      console.error(
        `[RSS] Impossible d'enregistrer l'erreur du flux ${feed.name}: ${errorMessage(updateErr)}`
      );
    }
  }

  return result;
}

// ─── Main entry point ─────────────────────────────────────────────────────────

/** Aggregated report of one run over all active feeds. */
export interface RssFetchSummary {
  totalFeeds: number;
  successFeeds: number;
  errorFeeds: number;
  totalNewItems: number;
  totalSkippedItems: number;
  results: FetchResult[];
  executedAt: string;
}

/**
 * Processes every active feed, one after the other, and returns a summary.
 *
 * @returns Per-feed results plus aggregated counters; `executedAt` is the ISO
 *   timestamp taken when the summary is built.
 * @throws Error when the database is not available.
 */
export async function runRssFetch(): Promise<RssFetchSummary> {
  const db = await getDb();
  if (!db) throw new Error("Database not available");

  // Load all active feeds
  const feeds = await db.select().from(rssFeeds).where(eq(rssFeeds.isActive, true));

  const results: FetchResult[] = [];
  for (const feed of feeds) {
    console.log(`[RSS] Lecture du flux: ${feed.name} (${feed.url})`);
    const result = await processFeed(feed);
    results.push(result);
    console.log(`[RSS] ${feed.name}: ${result.newItems} nouveaux, ${result.skippedItems} doublons, statut: ${result.status}`);
  }

  const summary: RssFetchSummary = {
    totalFeeds: feeds.length,
    successFeeds: results.filter((r) => r.status === "ok").length,
    errorFeeds: results.filter((r) => r.status === "error").length,
    totalNewItems: results.reduce((acc, r) => acc + r.newItems, 0),
    totalSkippedItems: results.reduce((acc, r) => acc + r.skippedItems, 0),
    results,
    executedAt: new Date().toISOString(),
  };

  console.log(`[RSS] Terminé: ${summary.totalNewItems} nouveaux articles, ${summary.errorFeeds} erreurs`);
  return summary;
}