//! フォルダ監視(計画 folder_monitor)。notify + debouncer で Create/Modify/Remove を検知し DB に反映する。
//! debouncer-mini は Any/AnyContinuous のみ区別するため、パスがファイルとして存在すれば取り込み、存在しなければインデックスから削除する。
//! モニター追加時は既存ファイルを走査し、未インデックスのものだけ取り込む(初期スキャン)。
use crate::db;
use crate::mcp::tools::items::{delete_document_by_path, ingest_file_path};
use crate::mcp::types::{AppState, WatcherConfig};
use notify_debouncer_mini::{new_debouncer, DebounceEventResult};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Handle;
/// File extensions ingested when the watcher config does not list any of its own.
/// Matching against these is case-insensitive (see `is_watched_file`).
pub const DEFAULT_WATCH_EXTENSIONS: &[&str] = &[
    "txt", "md", "json", "html", "css", "js", "mjs", "ts", "rs",
];
/// Returns `true` when the extension of `path` equals one of `extensions`, compared
/// ASCII-case-insensitively.
///
/// An empty `extensions` list matches nothing. So does a path with no extension, or one whose
/// extension is not valid UTF-8.
fn is_watched_file(path: &Path, extensions: &[String]) -> bool {
    let ext = match path.extension().and_then(|os_ext| os_ext.to_str()) {
        Some(ext) => ext,
        None => return false,
    };
    extensions
        .iter()
        .any(|allowed| ext.eq_ignore_ascii_case(allowed))
}
/// Applies one debounced filesystem event for `path` to the document index.
///
/// Paths whose extension is not listed in `extensions` are ignored. Otherwise:
/// - if `path` currently exists as a regular file, it is (re)ingested;
/// - if nothing exists at `path` any more, its document is removed from the index;
/// - if something else exists there (e.g. a directory named `vendor.js`), nothing happens.
///   Only regular files are ingested, so such a path normally has no document, and a delete
///   would just be a spurious DB call plus a misleading status/log line.
///
/// `handle` is used to `block_on` the async ingest/delete, because this is called from the
/// debouncer callback on a plain `std` thread. While that work runs,
/// `state.watch_ingestion_status` holds a progress message ("取込中"/"削除中" + file name).
/// The message is cleared afterwards regardless of the outcome. Failures are logged, not
/// propagated.
fn process_event(state: &AppState, path: &Path, handle: &Handle, extensions: &[String]) {
    if !is_watched_file(path, extensions) {
        return;
    }
    let path_str = path.to_string_lossy().to_string();
    let display_name = path
        .file_name()
        .map(|n| n.to_string_lossy().into_owned())
        .unwrap_or_else(|| path_str.clone());
    // Best-effort progress reporting: if the lock is poisoned the status is simply not updated.
    let set_status = |status: Option<String>| {
        if let Ok(mut guard) = state.watch_ingestion_status.write() {
            match status {
                Some(message) => *guard = message,
                None => guard.clear(),
            }
        }
    };
    if path.is_file() {
        set_status(Some(format!("取込中: {}", display_name)));
        match handle.block_on(ingest_file_path(state, path)) {
            Ok(_) => log::info!("[watch] ingested: {}", path_str),
            Err(e) => log::warn!("[watch] ingest {:?}: {}", path, e),
        }
        set_status(None);
    } else if !path.exists() {
        set_status(Some(format!("削除中: {}", display_name)));
        match handle.block_on(delete_document_by_path(state, &path_str)) {
            Ok(_) => log::info!("[watch] deleted from index: {}", path_str),
            Err(e) => log::warn!("[watch] delete {:?}: {}", path, e),
        }
        set_status(None);
    }
}
/// Recursively walks `dir` and ingests every file with a watched extension (per `extensions`)
/// that has no document yet for its path. This is the initial scan run when a monitor is added.
///
/// Subdirectories are found with `Path::is_dir`, which follows symlinks. Each directory is
/// scanned at most once, keyed by its canonical path. A symlink pointing back at an ancestor
/// therefore cannot recurse until the stack overflows, and a directory reachable through
/// several links is not ingested repeatedly under different path strings.
///
/// Errors are logged (or, for individual directory entries, skipped) and the scan continues.
/// This covers unreadable directories, failed DB lookups and failed ingests.
fn initial_scan_directory(state: &AppState, dir: &Path, extensions: &[String], handle: &Handle) {
    /// Scans `dir` unless its canonical path was already visited, recursing into subdirectories.
    fn scan_dir(
        state: &AppState,
        dir: &Path,
        extensions: &[String],
        handle: &Handle,
        visited: &mut std::collections::HashSet<PathBuf>,
    ) {
        // If canonicalization fails (e.g. permissions), key on the literal path; read_dir below
        // will then report the actual error.
        let key = std::fs::canonicalize(dir).unwrap_or_else(|_| dir.to_path_buf());
        if !visited.insert(key) {
            return;
        }
        let read_dir = match std::fs::read_dir(dir) {
            Ok(r) => r,
            Err(e) => {
                log::warn!("[watch] initial_scan read_dir {:?}: {}", dir, e);
                return;
            }
        };
        // Entries that fail to read are skipped.
        for entry in read_dir.flatten() {
            let path = entry.path();
            if path.is_dir() {
                scan_dir(state, &path, extensions, handle, visited);
            } else if path.is_file() && is_watched_file(&path, extensions) {
                ingest_if_unindexed(state, &path, handle);
            }
        }
    }

    /// Ingests `path` only when the DB has no document for it yet.
    /// Progress is reported through `state.watch_ingestion_status`.
    fn ingest_if_unindexed(state: &AppState, path: &Path, handle: &Handle) {
        let path_str = path.to_string_lossy().to_string();
        match handle.block_on(db::get_document_id_by_path(&state.db_pool, &path_str)) {
            Ok(Some(_)) => { /* already indexed */ }
            Ok(None) => {
                let display_name = path
                    .file_name()
                    .map(|n| n.to_string_lossy().into_owned())
                    .unwrap_or_else(|| path_str.clone());
                if let Ok(mut guard) = state.watch_ingestion_status.write() {
                    *guard = format!("取込中: {}", display_name);
                }
                if let Err(e) = handle.block_on(ingest_file_path(state, path)) {
                    log::warn!("[watch] initial_scan ingest {:?}: {}", path, e);
                } else {
                    log::info!("[watch] initial_scan ingested: {}", path_str);
                }
                if let Ok(mut guard) = state.watch_ingestion_status.write() {
                    guard.clear();
                }
            }
            Err(e) => log::warn!("[watch] initial_scan get_doc_id {:?}: {}", path, e),
        }
    }

    let mut visited = std::collections::HashSet::new();
    scan_dir(state, dir, extensions, handle, &mut visited);
}
/// 指定ディレクトリを再帰監視し、イベントを DB に反映する。別スレッドでデバウンスを動かす。
/// `handle`: Tokio ランタイムの Handle(std スレッド内では Handle::current() が使えないため呼び出し元から渡す)。
/// `rx` から WatcherConfig(パス一覧・取込対象拡張子)を受け取り監視を開始。設定保存で新しい config が送られると再起動する(Phase 3)。
pub fn spawn_folder_watcher(handle: Handle, state: Arc<AppState>, rx: std::sync::mpsc::Receiver<WatcherConfig>) {
use std::sync::mpsc::RecvTimeoutError;
std::thread::spawn(move || {
let state = state.clone();
let mut config = match rx.recv() {
Ok(c) => c,
Err(_) => return,
};
loop {
if config.paths.is_empty() {
log::info!("[watch] no paths, waiting for next config");
match rx.recv() {
Ok(c) => config = c,
Err(_) => break,
}
continue;
}
let extensions = if config.extensions.is_empty() {
DEFAULT_WATCH_EXTENSIONS.iter().map(|s| s.to_string()).collect::<Vec<_>>()
} else {
config.extensions.clone()
};
let state_for_debouncer = state.clone();
let handle_for_cb = handle.clone();
let extensions_for_cb = extensions.clone();
let mut debouncer = match new_debouncer(Duration::from_secs(2), move |res: DebounceEventResult| {
match res {
Ok(events) => {
for e in events {
process_event(state_for_debouncer.as_ref(), &e.path, &handle_for_cb, &extensions_for_cb);
}
}
Err(e) => {
log::warn!("[watch] debouncer error: {:?}", e);
}
}
}) {
Ok(d) => d,
Err(e) => {
log::error!("[watch] failed to create debouncer: {}", e);
match rx.recv() {
Ok(c) => config = c,
Err(_) => break,
}
continue;
}
};
for path in &config.paths {
if path.is_dir() {
if let Err(e) = debouncer.watcher().watch(path, notify::RecursiveMode::Recursive) {
log::warn!("[watch] watch {:?}: {}", path, e);
} else {
log::info!("[watch] watching: {}", path.display());
}
} else {
log::warn!("[watch] skip (not a directory): {}", path.display());
}
}
// モニター追加時: 既存ファイルのうち未インデックスのものを取り込む
for path in &config.paths {
if path.is_dir() {
log::info!("[watch] initial_scan: {}", path.display());
initial_scan_directory(state.as_ref(), path, &extensions, &handle);
}
}
// 設定保存で新しい config が送られるまで待つ(recv_timeout で 1 秒ごとにチェック)
loop {
match rx.recv_timeout(Duration::from_secs(1)) {
Ok(new_config) => {
config = new_config;
log::info!("[watch] restarting with new config");
break;
}
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => return,
}
}
}
});
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// Builds the owned extension list that `is_watched_file` takes.
    fn owned(list: &[&str]) -> Vec<String> {
        list.iter().map(|s| s.to_string()).collect()
    }

    #[test]
    fn is_watched_file_matches_configured_extensions() {
        let exts = owned(&["txt", "md"]);
        assert!(is_watched_file(Path::new("/a/b/c.txt"), &exts));
        assert!(is_watched_file(Path::new("file.md"), &exts));
        assert!(!is_watched_file(Path::new("/a/b/c.rs"), &exts));
        assert!(!is_watched_file(Path::new("/a/b/noext"), &exts));
    }

    #[test]
    fn is_watched_file_extension_case_insensitive() {
        let exts = owned(&["TXT"]);
        assert!(is_watched_file(Path::new("a.TXT"), &exts));
        assert!(is_watched_file(Path::new("a.txt"), &exts));
    }

    #[test]
    fn is_watched_file_empty_extensions_returns_false() {
        let exts: Vec<String> = vec![];
        assert!(!is_watched_file(Path::new("/a/b/c.txt"), &exts));
    }

    #[test]
    fn is_watched_file_dotfile_without_other_dot_has_no_extension() {
        // `Path::extension` treats a leading dot as part of the file name, so a file named
        // ".md" has no extension and must not be picked up, while ".notes.md" does have one.
        let exts = owned(&["md"]);
        assert!(!is_watched_file(Path::new("/a/.md"), &exts));
        assert!(is_watched_file(Path::new("/a/.notes.md"), &exts));
    }

    #[test]
    fn is_watched_file_only_considers_final_component() {
        // A watched-looking directory name must not make its extension-less children match.
        let exts = owned(&["md"]);
        assert!(!is_watched_file(Path::new("/a/notes.md/readme"), &exts));
        assert!(is_watched_file(Path::new("/a/notes/readme.md"), &exts));
    }
}