Newer
Older
TelosDB / src / backend / src / mcp / watch.rs
//! フォルダ監視(計画 folder_monitor)。notify + debouncer で Create/Modify/Remove を検知し DB に反映する。
//! debouncer-mini は Any/AnyContinuous のみ区別するため、パスがファイルとして存在すれば取り込み、存在しなければインデックスから削除する。
//! モニター追加時は既存ファイルを走査し、未インデックスのものだけ取り込む(初期スキャン)。

use crate::db;
use crate::mcp::tools::items::{delete_document_by_path, ingest_file_path};
use crate::mcp::types::{AppState, WatcherConfig};
use notify_debouncer_mini::{new_debouncer, DebounceEventResult};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Handle;

/// Default set of file extensions to ingest, used when the watcher configuration
/// does not list any extensions of its own.
pub const DEFAULT_WATCH_EXTENSIONS: &[&str] = &[
    "txt", "md", "json", "html", "css", "js", "mjs", "ts", "rs",
];

/// Returns `true` when `path` has an extension that matches one of `extensions`,
/// compared ASCII-case-insensitively.
///
/// Paths without an extension, with a non-UTF-8 extension, or checked against an
/// empty `extensions` list never match.
fn is_watched_file(path: &Path, extensions: &[String]) -> bool {
    let ext = match path.extension().and_then(|os| os.to_str()) {
        Some(ext) => ext,
        None => return false,
    };
    // `any` over an empty slice is `false`, which covers the "no extensions configured" case.
    extensions
        .iter()
        .any(|candidate| candidate.eq_ignore_ascii_case(ext))
}

/// Applies one debounced filesystem event for `path` to the index.
///
/// Paths whose extension is not in `extensions` are ignored. If `path` currently
/// is a file it is ingested via `ingest_file_path`; otherwise the document stored
/// under that path is removed via `delete_document_by_path`. Both async calls are
/// driven to completion on `handle`. While work is in progress a message is written
/// to `state.watch_ingestion_status`, which is cleared afterwards. Failures are
/// logged and never propagated.
fn process_event(state: &AppState, path: &Path, handle: &Handle, extensions: &[String]) {
    if !is_watched_file(path, extensions) {
        return;
    }

    let path_str = path.to_string_lossy().to_string();
    let display_name = match path.file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => path_str.clone(),
    };

    // Status updates are best-effort: a failed `write()` on the lock is simply skipped.
    let publish_status = |message: String| {
        if let Ok(mut status) = state.watch_ingestion_status.write() {
            *status = message;
        }
    };
    let reset_status = || {
        if let Ok(mut status) = state.watch_ingestion_status.write() {
            status.clear();
        }
    };

    // The debouncer does not say what kind of change happened, so the current
    // on-disk state decides: not a file (any more) means "remove from the index".
    if !path.is_file() {
        publish_status(format!("削除中: {}", display_name));
        match handle.block_on(delete_document_by_path(state, &path_str)) {
            Ok(_) => log::info!("[watch] deleted from index: {}", path_str),
            Err(e) => log::warn!("[watch] delete {:?}: {}", path, e),
        }
        reset_status();
        return;
    }

    publish_status(format!("取込中: {}", display_name));
    match handle.block_on(ingest_file_path(state, path)) {
        Ok(_) => log::info!("[watch] ingested: {}", path_str),
        Err(e) => log::warn!("[watch] ingest {:?}: {}", path, e),
    }
    reset_status();
}

/// Recursively walks `dir` and ingests every watched file that is not indexed yet
/// (the initial scan performed when a directory starts being monitored).
///
/// For each file whose extension is in `extensions`, `db::get_document_id_by_path`
/// is asked whether a document already exists for that path; only paths without one
/// are passed to `ingest_file_path`. Async calls are driven to completion on
/// `handle`, and `state.watch_ingestion_status` carries a progress message while a
/// file is being ingested.
///
/// Every failure (unreadable directory, unreadable entry, DB lookup, ingestion) is
/// logged and the scan carries on with the remaining entries.
fn initial_scan_directory(state: &AppState, dir: &Path, extensions: &[String], handle: &Handle) {
    let read_dir = match std::fs::read_dir(dir) {
        Ok(r) => r,
        Err(e) => {
            log::warn!("[watch] initial_scan read_dir {:?}: {}", dir, e);
            return;
        }
    };
    for entry in read_dir {
        let entry = match entry {
            Ok(entry) => entry,
            Err(e) => {
                // Report unreadable entries instead of skipping them without a trace.
                log::warn!("[watch] initial_scan read entry in {:?}: {}", dir, e);
                continue;
            }
        };
        let path: PathBuf = entry.path();
        if path.is_dir() {
            initial_scan_directory(state, &path, extensions, handle);
            continue;
        }
        // Cheap extension test first so unwatched files do not cost an extra stat call.
        if !(is_watched_file(&path, extensions) && path.is_file()) {
            continue;
        }

        let path_str = path.to_string_lossy().to_string();
        let display_name = path
            .file_name()
            .map(|n| n.to_string_lossy().into_owned())
            .unwrap_or_else(|| path_str.clone());
        match handle.block_on(db::get_document_id_by_path(&state.db_pool, &path_str)) {
            Ok(Some(_)) => { /* already indexed: nothing to do */ }
            Ok(None) => {
                if let Ok(mut guard) = state.watch_ingestion_status.write() {
                    *guard = format!("取込中: {}", display_name);
                }
                if let Err(e) = handle.block_on(ingest_file_path(state, &path)) {
                    log::warn!("[watch] initial_scan ingest {:?}: {}", path, e);
                } else {
                    log::info!("[watch] initial_scan ingested: {}", path_str);
                }
                if let Ok(mut guard) = state.watch_ingestion_status.write() {
                    guard.clear();
                }
            }
            Err(e) => log::warn!("[watch] initial_scan get_doc_id {:?}: {}", path, e),
        }
    }
}

/// Spawns a background thread that recursively watches the configured directories
/// and mirrors file changes into the DB.
///
/// * `handle` - Tokio runtime handle used to run async work from the watcher
///   threads. It is passed in by the caller because `Handle::current()` cannot be
///   used from inside a plain std thread.
/// * `state` - shared application state handed to event processing and the
///   initial scan.
/// * `rx` - channel delivering [`WatcherConfig`] values (paths to watch and
///   extensions to ingest). The first value starts the watcher; each later value
///   drops the current debouncer and starts over with the new configuration. The
///   thread exits once the sending side is disconnected.
///
/// For every configuration with at least one path the thread:
/// 1. creates a debouncer (2 second window) whose callback feeds each event path
///    to `process_event`,
/// 2. registers every configured path that is a directory for recursive watching,
/// 3. runs `initial_scan_directory` on those directories to pick up files that
///    are not indexed yet,
/// 4. blocks until the next configuration arrives.
///
/// An empty extension list falls back to [`DEFAULT_WATCH_EXTENSIONS`].
pub fn spawn_folder_watcher(handle: Handle, state: Arc<AppState>, rx: std::sync::mpsc::Receiver<WatcherConfig>) {
    std::thread::spawn(move || {
        let mut config = match rx.recv() {
            Ok(c) => c,
            Err(_) => return,
        };
        loop {
            if config.paths.is_empty() {
                log::info!("[watch] no paths, waiting for next config");
                match rx.recv() {
                    Ok(c) => config = c,
                    Err(_) => break,
                }
                continue;
            }
            let extensions = if config.extensions.is_empty() {
                DEFAULT_WATCH_EXTENSIONS.iter().map(|s| s.to_string()).collect::<Vec<_>>()
            } else {
                config.extensions.clone()
            };

            // The callback is `move`, so it needs its own copies of everything it uses.
            let state_for_debouncer = state.clone();
            let handle_for_cb = handle.clone();
            let extensions_for_cb = extensions.clone();
            let mut debouncer = match new_debouncer(Duration::from_secs(2), move |res: DebounceEventResult| {
                match res {
                    Ok(events) => {
                        for event in events {
                            process_event(state_for_debouncer.as_ref(), &event.path, &handle_for_cb, &extensions_for_cb);
                        }
                    }
                    Err(e) => {
                        log::warn!("[watch] debouncer error: {:?}", e);
                    }
                }
            }) {
                Ok(d) => d,
                Err(e) => {
                    log::error!("[watch] failed to create debouncer: {}", e);
                    match rx.recv() {
                        Ok(c) => config = c,
                        Err(_) => break,
                    }
                    continue;
                }
            };

            for path in &config.paths {
                if !path.is_dir() {
                    log::warn!("[watch] skip (not a directory): {}", path.display());
                    continue;
                }
                match debouncer.watcher().watch(path, notify::RecursiveMode::Recursive) {
                    Ok(()) => log::info!("[watch] watching: {}", path.display()),
                    Err(e) => log::warn!("[watch] watch {:?}: {}", path, e),
                }
            }

            // On (re)start: ingest existing files that are not indexed yet.
            for path in &config.paths {
                if path.is_dir() {
                    log::info!("[watch] initial_scan: {}", path.display());
                    initial_scan_directory(state.as_ref(), path, &extensions, &handle);
                }
            }

            // Keep `debouncer` alive until a new configuration arrives. A blocking
            // `recv` returns only on a message or when the sender is gone, so no
            // periodic polling is needed.
            match rx.recv() {
                Ok(new_config) => {
                    config = new_config;
                    log::info!("[watch] restarting with new config");
                }
                Err(_) => return,
            }
        }
    });
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// Builds an owned extension list from string literals.
    fn ext_list(items: &[&str]) -> Vec<String> {
        items.iter().map(|s| s.to_string()).collect()
    }

    /// Configured extensions match; other or missing extensions do not.
    #[test]
    fn is_watched_file_matches_configured_extensions() {
        let exts = ext_list(&["txt", "md"]);
        for accepted in ["/a/b/c.txt", "file.md"].iter() {
            assert!(is_watched_file(Path::new(*accepted), &exts));
        }
        for rejected in ["/a/b/c.rs", "/a/b/noext"].iter() {
            assert!(!is_watched_file(Path::new(*rejected), &exts));
        }
    }

    /// Extension comparison ignores ASCII case.
    #[test]
    fn is_watched_file_extension_case_insensitive() {
        let exts = ext_list(&["TXT"]);
        for name in ["a.TXT", "a.txt"].iter() {
            assert!(is_watched_file(Path::new(*name), &exts));
        }
    }

    /// With no extensions configured nothing is considered watched.
    #[test]
    fn is_watched_file_empty_extensions_returns_false() {
        let exts = ext_list(&[]);
        assert!(!is_watched_file(Path::new("/a/b/c.txt"), &exts));
    }
}