diff --git a/README.md b/README.md index 0173220..4981823 100644 --- a/README.md +++ b/README.md @@ -17,9 +17,12 @@ - **完全な知識管理 (CRUD)**: `add_item_text`, `update_item`, `delete_item` を通じて、AIエージェントが自律的に知識を蓄積・修正・削除可能。 - **セマンティック検索 (Vector Search)**: `sqlite-vec` + `llama-server` (Gemma-3) による高度な意味検索。 +- **動的次元検知 & 自動同期**: モデル変更時にベクトルの次元数(Gemma-3なら640等)を自動検知し、不足しているベクトルをバックグラウンドで自動生成(ヒーリング)します。 - **サイドカー自動管理**: `llama-server` の起動・ヘルスチェック・ログ転送をRust側で制御。 - **MCP SSE サーバー**: LM Studio, Claude Desktop 等の外部ツールからプラグインとして即座に利用可能。 - **Premium UI**: ガラスモーフィズムを採用した洗練されたデスクトップUX。 +- **常駐モード (Resident Mode)**: システムトレイに常駐し、閉じてもMCPサーバーが待機し続けます。 +- **リアルタイム・アクティビティログ**: MCP経由のツール呼び出しをUI上でリアルタイムに監視可能。 - **堅牢なロギング**: ローテーション機能付きログ出力。`llama-server` の詳細な内部ログもキャプチャします。 --- @@ -47,6 +50,11 @@ bun run tauri dev ``` +### 4. 操作方法 + +- **ウィンドウの最小化 / 閉じ**: 「×」ボタンで閉じるとシステムトレイ(画面右下)に格納されます。 +- **右クリックメニュー**: システムトレイのアイコンを右クリックして「終了」を選ぶと、アプリを完全に終了できます。 + --- ## システム構造 diff --git a/src-tauri/build.rs b/src-tauri/build.rs index 98be475..05526d0 100644 --- a/src-tauri/build.rs +++ b/src-tauri/build.rs @@ -1,6 +1,12 @@ fn main() { tauri_build::build(); + // Allow skipping DLL copy in userland/dev environments where copying may require elevated permissions. 
+ if std::env::var("SKIP_DLL_COPY").is_ok() { + println!("cargo:warning=[DLL-COPY] SKIP_DLL_COPY set; skipping DLL copy steps."); + return; + } + // MinGW-free DLL handling: Copy vec0.dll from node_modules let out_dir = std::env::var("OUT_DIR").expect("OUT_DIR not set"); let out_path = std::path::Path::new(&out_dir); @@ -9,7 +15,7 @@ // Traverse up until we find "debug" or "release" directory that is a direct child of "target" let mut debug_dir = out_path.to_path_buf(); let mut found = false; - for _ in 0..10 { + for _ in 0..10 { if let Some(file_name) = debug_dir.file_name().and_then(|n| n.to_str()) { if file_name == "debug" || file_name == "release" { if let Some(parent) = debug_dir.parent() { @@ -24,20 +30,23 @@ break; } } - + if !found { println!("cargo:warning=[DLL-COPY] Could not find target/debug root from {:?}. Using default ancestor logic.", out_path); // Fallback: 3 levels up from OUT_DIR is usually target/debug/build/app/out -> target/debug if let Some(ancestor) = out_path.ancestors().nth(3) { - debug_dir = ancestor.to_path_buf(); + debug_dir = ancestor.to_path_buf(); } } else { - println!("cargo:warning=[DLL-COPY] Found output root: {:?}", debug_dir); + println!( + "cargo:warning=[DLL-COPY] Found output root: {:?}", + debug_dir + ); } - + let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").expect("MANIFEST_DIR not set"); let manifest_path = std::path::Path::new(&manifest_dir); - + let copy_to_output = |src: &std::path::Path, name: &str| { // Copy to target/debug (for execution) and target/debug/deps (for tests/build) let dests = vec![debug_dir.join(name), debug_dir.join("deps").join(name)]; @@ -47,7 +56,10 @@ } match std::fs::copy(src, &dest) { Ok(_) => println!("cargo:warning=[DLL-COPY] Copied {:?} to {:?}", src, dest), - Err(e) => println!("cargo:warning=[DLL-COPY] Failed to copy {:?} to {:?}: {}", src, dest, e), + Err(e) => println!( + "cargo:warning=[DLL-COPY] Failed to copy {:?} to {:?}: {}", + src, dest, e + ), } } }; @@ -57,7 +69,10 @@ if 
vec0_src.exists() { copy_to_output(&vec0_src, "vec0.dll"); } else { - println!("cargo:warning=[DLL-COPY] vec0.dll not found in node_modules at {:?}", vec0_src); + println!( + "cargo:warning=[DLL-COPY] vec0.dll not found in node_modules at {:?}", + vec0_src + ); } // 2. Copy all DLLs from bin/ to support sidecar execution during development @@ -74,6 +89,9 @@ } } } else { - println!("cargo:warning=[DLL-COPY] bin directory not found at {:?}", bin_dir); + println!( + "cargo:warning=[DLL-COPY] bin directory not found at {:?}", + bin_dir + ); } } diff --git a/src-tauri/src/db.rs b/src-tauri/src/db.rs index e24b113..dba379d 100644 --- a/src-tauri/src/db.rs +++ b/src-tauri/src/db.rs @@ -1,64 +1,286 @@ -use rusqlite::{Connection, Result}; +use sqlx::Row; +use sqlx::SqlitePool; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; +use std::borrow::Cow; use std::path::Path; -use std::fs; +use std::str::FromStr; -pub fn init_db(db_path: &Path, extension_path: &Path) -> Result { - // Ensure parent directory exists +/// データベースを初期化し、コネクションプールを返す。 +/// 埋め込み次元が変更されている場合は vec_items テーブルを再作成する。 +pub async fn initialize_database( + db_path: &Path, + extension_path: &Path, + dimension: usize, +) -> Result { + // ディレクトリの作成 if let Some(parent) = db_path.parent() { if !parent.exists() { - fs::create_dir_all(parent).map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?; + std::fs::create_dir_all(parent).map_err(|e| e.to_string())?; } } - let conn = Connection::open(db_path)?; + let db_path_str = db_path.to_str().ok_or("Invalid DB path")?; + let ext_path_str = extension_path.to_str().ok_or("Invalid extension path")?; - // Load sqlite-vec extension - // IMPORTANT: This relies on vec0.dll being in a place where standard LoadLibrary can find it - // OR we pass the absolute path. - // Since build.rs copies it to the same dir as the exe, we can try loading by name "vec0" or full path. - // If we use full path, we need to know where the exe is. 
- // But passing extension_path allows flexibility from main.rs. - unsafe { - conn.load_extension_enable()?; - conn.load_extension(extension_path, None)?; - conn.load_extension_disable()?; - } + let opts = SqliteConnectOptions::from_str(&format!("sqlite://{}?mode=rwc", db_path_str)) + .map_err(|e| e.to_string())? + .extension(ext_path_str.to_owned()); - // Initialize Schema - conn.execute_batch( - "PRAGMA journal_mode = WAL; - CREATE TABLE IF NOT EXISTS items ( + let pool = SqlitePoolOptions::new() + .max_connections(5) + .connect_with(opts) + .await + .map_err(|e| e.to_string())?; + + // スキーマ初期化 + init_schema(&pool, dimension).await?; + + Ok(pool) +} + +async fn init_schema(pool: &SqlitePool, dimension: usize) -> Result<(), String> { + // PRAGMA設定 + sqlx::query("PRAGMA journal_mode = WAL") + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + // 標準テーブル作成 + sqlx::query( + "CREATE TABLE IF NOT EXISTS items ( id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT NOT NULL, path TEXT, created_at TEXT DEFAULT (datetime('now', 'localtime')), updated_at TEXT DEFAULT (datetime('now', 'localtime')) - ); - CREATE TRIGGER IF NOT EXISTS update_items_updated_at + )", + ) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + // トリガー作成 + sqlx::query( + "CREATE TRIGGER IF NOT EXISTS update_items_updated_at AFTER UPDATE ON items FOR EACH ROW BEGIN UPDATE items SET updated_at = datetime('now', 'localtime') WHERE id = OLD.id; - END; - CREATE VIRTUAL TABLE IF NOT EXISTS vec_items USING vec0( - id INTEGER PRIMARY KEY, - embedding FLOAT[640] - );" - )?; + END", + ) + .execute(pool) + .await + .map_err(|e| e.to_string())?; - Ok(conn) + // vec_items の次元数チェックと初期化 + check_and_init_vector_table(pool, dimension).await?; + + Ok(()) } -use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; -use std::str::FromStr; -use std::borrow::Cow; +async fn check_and_init_vector_table(pool: &SqlitePool, dimension: usize) -> Result<(), String> { + // 現在のテーブル定義を確認 + let row = 
sqlx::query("SELECT sql FROM sqlite_master WHERE type='table' AND name='vec_items'") + .fetch_optional(pool) + .await + .map_err(|e| e.to_string())?; -pub async fn init_pool(db_path: &str, extension_path: impl Into>) -> Result { + let should_recreate = if let Some(row) = row { + let sql: String = row.get(0); + // "FLOAT[640]" のような文字列が含まれているかチェック + let expected = format!("FLOAT[{}]", dimension); + if !sql.contains(&expected) { + log::info!("Dimension mismatch detected (expected {}). Rebuilding vec_items...", dimension); + true + } else { + false + } + } else { + true + }; + + if should_recreate { + let mut tx = pool.begin().await.map_err(|e| e.to_string())?; + + sqlx::query("DROP TABLE IF EXISTS vec_items") + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + + let create_sql = format!( + "CREATE VIRTUAL TABLE vec_items USING vec0(id INTEGER PRIMARY KEY, embedding FLOAT[{}])", + dimension + ); + sqlx::query(&create_sql) + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + + // 既存の items があればベクトルを再生成する必要があるが、 + // ここでは空のテーブル作成に留める。 + // (/v1/embeddings が必要になるため、再生成は別のユーティリティで行うのが安全) + + tx.commit().await.map_err(|e| e.to_string())?; + } + + Ok(()) +} + +/// 全データのベクトルを再生成する +pub async fn rebuild_vector_data( + pool: &SqlitePool, + dimension: usize, + embed_fn: F, +) -> Result<(), String> +where + F: Fn(String) -> Fut, + Fut: std::future::Future, String>>, +{ + // 強制的に再作成 + { + let mut tx = pool.begin().await.map_err(|e| e.to_string())?; + sqlx::query("DROP TABLE IF EXISTS vec_items") + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + let create_sql = format!( + "CREATE VIRTUAL TABLE vec_items USING vec0(id INTEGER PRIMARY KEY, embedding FLOAT[{}])", + dimension + ); + sqlx::query(&create_sql) + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + tx.commit().await.map_err(|e| e.to_string())?; + } + + let rows = sqlx::query("SELECT id, content FROM items") + .fetch_all(pool) + .await + .map_err(|e| e.to_string())?; + + for 
row in rows { + let id: i64 = row.get("id"); + let content: String = row.get("content"); + let emb = embed_fn(content).await?; + + sqlx::query("INSERT INTO vec_items (id, embedding) VALUES (?, ?)") + .bind(id) + .bind(serde_json::to_string(&emb).unwrap_or_default()) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + } + + Ok(()) +} + +/// itemsテーブルにあってvec_itemsテーブルにないデータを同期する(ヒーリング) +pub async fn sync_vectors( + pool: &SqlitePool, + embed_fn: F, +) -> Result +where + F: Fn(String) -> Fut, + Fut: std::future::Future, String>>, +{ + // vec_itemsに存在しないIDを抽出 + let rows = sqlx::query( + "SELECT i.id, i.content + FROM items i + LEFT JOIN vec_items v ON i.id = v.id + WHERE v.id IS NULL" + ) + .fetch_all(pool) + .await + .map_err(|e| e.to_string())?; + + let count = rows.len(); + if count == 0 { + return Ok(0); + } + + log::info!("Healing {} missing vectors...", count); + + for row in rows { + let id: i64 = row.get("id"); + let content: String = row.get("content"); + + match embed_fn(content).await { + Ok(emb) => { + sqlx::query("INSERT INTO vec_items (id, embedding) VALUES (?, ?)") + .bind(id) + .bind(serde_json::to_string(&emb).unwrap_or_default()) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + } + Err(e) => { + log::error!("Failed to generate embedding for item {}: {}", id, e); + } + } + } + + Ok(count) +} + +// 下位互換性(既存の依存箇所が多いため、一時的に定義を残すか、順次書き換える) +pub async fn init_pool( + db_path: &str, + extension_path: impl Into>, +) -> Result { let opts = SqliteConnectOptions::from_str(&format!("sqlite://{}?mode=rwc", db_path))? 
.extension(extension_path); - let pool = SqlitePoolOptions::new() - .connect_with(opts) - .await?; - Ok(pool) + SqlitePoolOptions::new().connect_with(opts).await +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + use tempfile::tempdir; + + #[tokio::test] + async fn test_dimension_change_and_rebuild() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("test_telos.db"); + + // DLLのパス取得(ビルドディレクトリにあることが前提) + // テスト環境では $OUT_DIR や $CARGO_MANIFEST_DIR 基準で探す + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + let ext_path = Path::new(manifest_dir).join("../node_modules/sqlite-vec-windows-x64/vec0.dll"); + + if !ext_path.exists() { + println!("Skipping test: vec0.dll not found at {:?}", ext_path); + return; + } + + // 1. 最初は次元数 384 で初期化 + let pool = initialize_database(&db_path, &ext_path, 384).await.unwrap(); + + // テーブル定義の確認 + let row: (String,) = sqlx::query_as("SELECT sql FROM sqlite_master WHERE name='vec_items'") + .fetch_one(&pool).await.unwrap(); + assert!(row.0.contains("FLOAT[384]")); + + // データ挿入 + sqlx::query("INSERT INTO items (content) VALUES ('test content')") + .execute(&pool).await.unwrap(); + + pool.close().await; + + // 2. 
次は次元数 768 で再初期化(不整合検知 -> 再作成) + let pool2 = initialize_database(&db_path, &ext_path, 768).await.unwrap(); + let row2: (String,) = sqlx::query_as("SELECT sql FROM sqlite_master WHERE name='vec_items'") + .fetch_one(&pool2).await.unwrap(); + assert!(row2.0.contains("FLOAT[768]")); + + // items テーブルは維持されていることを確認 + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM items") + .fetch_one(&pool2).await.unwrap(); + assert_eq!(count.0, 1); + + pool2.close().await; + } } diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 5f85a4b..f0e790b 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -1,11 +1,46 @@ -mod db; +// 使用中モデル名をグローバルで保持 +#[allow(dead_code)] +static MODEL_NAME: &str = "gemma-3-270m-it-Q4_K_M.gguf"; +// モデル名を返すAPI +#[tauri::command] +#[allow(dead_code)] +fn get_model_name() -> String { + MODEL_NAME.to_string() +} +// Read last N lines from the log file for UI consumption +#[tauri::command] +fn read_logs(limit: Option) -> Result { + let log_dir = if cfg!(debug_assertions) { + std::env::current_dir() + .map_err(|e| e.to_string())? 
+ .join("logs") + } else { + dirs::data_dir() + .unwrap_or_else(|| std::env::current_dir().unwrap()) + .join("com.telosdb.app") + .join("logs") + }; + + let log_file = log_dir.join("telos.log"); + if !log_file.exists() { + return Ok(String::new()); + } + + let s = std::fs::read_to_string(&log_file).map_err(|e| e.to_string())?; + let lines: Vec<&str> = s.lines().collect(); + let n = limit.unwrap_or(200); + let start = if lines.len() > n { lines.len() - n } else { 0 }; + Ok(lines[start..].join("\n")) +} +pub mod db; mod mcp; -use tauri::Manager; -use tauri_plugin_shell::ShellExt; -use tauri_plugin_shell::process::{CommandEvent, CommandChild}; use std::sync::{Arc, Mutex}; - +use tauri::Manager; +use tauri::menu::{Menu, MenuItem}; +use tauri::tray::{TrayIconBuilder, TrayIconEvent}; +use tauri_plugin_shell::process::{CommandChild, CommandEvent}; +use tauri_plugin_shell::ShellExt; #[allow(dead_code)] struct AppState { @@ -21,12 +56,50 @@ let log_dir = if cfg!(debug_assertions) { std::env::current_dir().unwrap().join("logs") } else { - dirs::data_dir().unwrap_or_else(|| std::env::current_dir().unwrap()).join("com.telosdb.app").join("logs") + dirs::data_dir() + .unwrap_or_else(|| std::env::current_dir().unwrap()) + .join("com.telosdb.app") + .join("logs") }; std::fs::create_dir_all(&log_dir).ok(); - let log_file_path = log_dir.join("telos.log"); + + // Prune old telos logs, keep latest 7 files + if let Ok(entries) = std::fs::read_dir(&log_dir) { + let mut logs: Vec<_> = entries + .filter_map(|e| e.ok()) + .filter_map(|de| { + let p = de.path(); + if let Some(name) = p.file_name().and_then(|n| n.to_str()) { + if name.starts_with("telos") && name.ends_with(".log") { + return Some(p); + } + } + None + }) + .collect(); + + logs.sort_by_key(|p| std::fs::metadata(p).and_then(|m| m.modified()).ok()); + while logs.len() > 7 { + let old = logs.remove(0); + let _ = std::fs::remove_file(old); + } + } tauri_plugin_log::Builder::default() + .level(log::LevelFilter::Info) + 
.filter(|metadata| { + if metadata.target().starts_with("reqwest") + || metadata.target().starts_with("hyper") + || metadata.target().starts_with("h2") + || metadata.target().starts_with("tracing") + || metadata.target().starts_with("html5ever") + || metadata.target().starts_with("selectors") + { + metadata.level() <= log::Level::Warn + } else { + true + } + }) .targets([ tauri_plugin_log::Target::new(tauri_plugin_log::TargetKind::Stdout), tauri_plugin_log::Target::new(tauri_plugin_log::TargetKind::LogDir { @@ -38,106 +111,224 @@ .max_file_size(10 * 1024 * 1024) .build() }) - + .invoke_handler(tauri::generate_handler![get_model_name, read_logs]) .setup({ let llama_child = llama_child.clone(); move |app| { // Resolve paths - let app_data_dir = app.path().app_data_dir().expect("failed to get app data dir"); + let app_data_dir = app + .path() + .app_data_dir() + .expect("failed to get app data dir"); let db_path = app_data_dir.join("telos.db"); // llama-serverの起動をTauriのsidecar APIで行う let resource_dir = app.path().resource_dir().unwrap_or_default(); let bin_dir = resource_dir.join("bin"); let model_path = bin_dir.join("gemma-3-270m-it-Q4_K_M.gguf"); - + // vec0.dll はビルド時に実行ルート(resource_dir)にコピーされる let mut vec0_path = resource_dir.join("vec0.dll"); - + // 開発時 (target/debug) かつ bin にある場合のフォールバック if !vec0_path.exists() && bin_dir.join("vec0.dll").exists() { vec0_path = bin_dir.join("vec0.dll"); } + // Tray Menu + let quit_i = MenuItem::with_id(app, "quit", "終了", true, None::<&str>)?; + let show_i = MenuItem::with_id(app, "show", "表示", true, None::<&str>)?; + let menu = Menu::with_items(app, &[&show_i, &quit_i])?; + + let llama_child_tray = llama_child.clone(); + let _tray = TrayIconBuilder::new() + .icon(app.default_window_icon().unwrap().clone()) + .menu(&menu) + .show_menu_on_left_click(false) + .on_menu_event(move |app, event| { + match event.id.as_ref() { + "quit" => { + if let Some(child) = llama_child_tray.lock().unwrap().take() { + let _ = child.kill(); + } + 
app.exit(0); + } + "show" => { + if let Some(window) = app.get_webview_window("main") { + let _ = window.show().unwrap(); + let _ = window.set_focus().unwrap(); + } + } + _ => {} + } + }) + .on_tray_icon_event(|tray, event| { + if let TrayIconEvent::Click { + button: tauri::tray::MouseButton::Left, + .. + } = event + { + let app = tray.app_handle(); + if let Some(window) = app.get_webview_window("main") { + if window.is_visible().unwrap_or(false) { + let _ = window.hide().unwrap(); + } else { + let _ = window.show().unwrap(); + let _ = window.set_focus().unwrap(); + } + } + } + }) + .build(app)?; + log::info!("Initializing TelosDB at {:?}", db_path); log::info!("Bin directory: {:?}", bin_dir); - log::info!("Model path: {:?}", model_path); + log::info!("Model path (Gemma-3): {:?}", model_path); log::info!("vec0.dll path: {:?}", vec0_path); if !vec0_path.exists() { - log::error!("vec0.dll NOT FOUND at {:?}. Vector search and DB init will fail.", vec0_path); + log::error!( + "vec0.dll NOT FOUND at {:?}. 
Vector search and DB init will fail.", + vec0_path + ); } // llama-server自動起動(Tauri sidecar API使用) if model_path.exists() { - let (mut rx, child) = app.shell() + let (mut rx, child) = app + .shell() .sidecar("llama-server") .expect("failed to create sidecar") - .args(["--model", model_path.to_str().unwrap(), "--port", "8080", "--embedding", "--parallel", "1"]) + .args([ + "--model", + model_path.to_str().unwrap(), + "--port", + "8080", + "--embedding", + "--pooling", + "mean", + ]) .spawn() .expect("failed to spawn sidecar"); - log::info!("llama-server sidecar started"); + log::info!("llama-server sidecar started (Qwen3-4B)"); *llama_child.lock().unwrap() = Some(child); std::thread::spawn(move || { while let Some(event) = rx.blocking_recv() { match event { - CommandEvent::Stdout(line) => log::info!("llama-server: {}", String::from_utf8_lossy(&line)), - CommandEvent::Stderr(line) => log::error!("llama-server: {}", String::from_utf8_lossy(&line)), + CommandEvent::Stdout(line) => { + log::info!("llama-server: {}", String::from_utf8_lossy(&line)) + } + CommandEvent::Stderr(line) => { + let s = String::from_utf8_lossy(&line); + if !s.contains("post_embedding") { // 埋め込みリクエスト時のノイズを軽減 + log::error!("llama-server: {}", s) + } + } _ => {} } } }); - } else { - log::error!("gemma-3-270m-it-Q4_K_M.gguf not found at {:?}", model_path); - } - // DB初期化 - match db::init_db(&db_path, &vec0_path) { - Ok(_) => log::info!("Database schema initialized."), - Err(e) => log::error!("Database schema init failed: {:?}", e), - } + // llama-serverの起動待ちと次元の動的取得 + let pool = tauri::async_runtime::block_on(async { + let client = reqwest::Client::new(); + let mut dimension = 768; // デフォルト値 - // Async Pool - let pool: sqlx::SqlitePool = tauri::async_runtime::block_on(async { - match db::init_pool(db_path.to_str().unwrap(), vec0_path.to_str().unwrap().to_owned()).await { - Ok(pool) => { - log::info!("App State managed with SQLx pool."); - pool - }, - Err(e) => { - log::error!("Failed to create SQLx 
pool: {:?}", e); - panic!("Failed to create SQLx pool"); + log::info!("Waiting for llama-server to be ready..."); + for _ in 0..30 { + if let Ok(resp) = client.get("http://127.0.0.1:8080/health").send().await { + if resp.status().is_success() { + log::info!("llama-server is healthy."); + + // ダミーの埋め込みリクエストで次元を取得 + let payload = serde_json::json!({ + "input": ["dim_check"], + "model": "default" + }); + if let Ok(resp) = client.post("http://127.0.0.1:8080/v1/embeddings").json(&payload).send().await { + if let Ok(json) = resp.json::().await { + if let Some(emb) = json["data"][0]["embedding"].as_array() { + dimension = emb.len(); + log::info!("Detected embedding dimension: {}", dimension); + } + } + } + break; + } + } + tokio::time::sleep(std::time::Duration::from_millis(500)).await; } - } - }); - app.manage(AppState { db_pool: pool.clone() }); - // MCP Server - let db_path_str = db_path.to_str().unwrap().to_owned(); - let vec0_path_str = vec0_path.to_str().unwrap().to_owned(); - - use tokio::sync::RwLock; - let llama_status = Arc::new(RwLock::new("unknown".to_string())); - tauri::async_runtime::spawn({ - let llama_status = llama_status.clone(); - async move { - mcp::run_server(3001, &db_path_str, &vec0_path_str, llama_status).await; - } - }); + // DB初期化(一本化されたロジック) + match db::initialize_database(&db_path, &vec0_path, dimension).await { + Ok(pool) => { + log::info!("Database initialized (dim={}).", dimension); + + // ベクトルの不整合をチェックして修復(ヒーリング) + let pool_clone = pool.clone(); + let client_clone = client.clone(); + tauri::async_runtime::spawn(async move { + let res = db::sync_vectors(&pool_clone, |content| { + let c = client_clone.clone(); + async move { + let payload = serde_json::json!({ + "input": [content], + "model": "default" + }); + let resp = c.post("http://127.0.0.1:8080/v1/embeddings") + .json(&payload) + .send() + .await + .map_err(|e| e.to_string())?; + + let json = resp.json::().await.map_err(|e| e.to_string())?; + let emb = 
json["data"][0]["embedding"].as_array().ok_or("No embedding in response")?; + Ok(emb.iter().map(|v| v.as_f64().unwrap_or(0.0) as f32).collect()) + } + }).await; + match res { + Ok(0) => log::info!("Vector synchronization complete: All items already have embeddings."), + Ok(count) => log::info!("Vector synchronization complete: {} missing embeddings healed.", count), + Err(e) => log::error!("Vector synchronization failed: {}", e), + } + }); + + pool + } + Err(e) => { + log::error!("Database initialization failed: {}", e); + panic!("DB Init Error: {}", e); + } + } + }); + + app.manage(AppState { + db_pool: pool.clone(), + }); + + // MCP Server (Poolを直接渡すように修正) + use tokio::sync::RwLock; + let llama_status = Arc::new(RwLock::new("unknown".to_string())); + tauri::async_runtime::spawn({ + let llama_status = llama_status.clone(); + let pool = pool.clone(); + async move { + mcp::run_server(3001, pool, llama_status, MODEL_NAME.to_string()).await; + } + }); + } else { + log::error!("{} not found at {:?}", MODEL_NAME, model_path); + } Ok(()) } }) - .on_window_event({ - let llama_child = llama_child.clone(); - move |_app_handle, event| { - if let tauri::WindowEvent::CloseRequested { .. } = event { - // llama-serverプロセスをkill - if let Some(child) = llama_child.lock().unwrap().take() { - let _ = child.kill(); - } - } + .on_window_event(|window, event| { + if let tauri::WindowEvent::CloseRequested { api, .. 
} = event { + // Prevent window from closing, just hide it + api.prevent_close(); + let _ = window.hide().unwrap(); } }) .run(tauri::generate_context!()) diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index ad5fe83..69c3a72 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -2,5 +2,5 @@ #![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] fn main() { - app_lib::run(); + app_lib::run(); } diff --git a/src-tauri/src/mcp.rs b/src-tauri/src/mcp.rs index 398aea0..a520efa 100644 --- a/src-tauri/src/mcp.rs +++ b/src-tauri/src/mcp.rs @@ -1,35 +1,43 @@ +// use crate::db; use axum::{ - extract::{State, Query}, - response::{sse::{Event, Sse}, IntoResponse}, + extract::{Query, State}, + response::{ + sse::{Event, Sse}, + IntoResponse, + }, routing::{get, post}, - Router, Json, + Json, Router, }; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::{RwLock, mpsc}; -use serde::{Deserialize, Serialize}; -use std::convert::Infallible; -use tokio::sync::broadcast; use futures::stream::Stream; -use tokio_stream::StreamExt; -use tower_http::cors::{Any, CorsLayer}; -use crate::db; +use serde::{Deserialize, Serialize}; +use chrono::Utc; use sqlx::Row; +use std::collections::HashMap; +use std::convert::Infallible; +use std::sync::Arc; +use tokio::sync::broadcast; +use tokio::sync::{mpsc, RwLock}; +use tower_http::cors::{Any, CorsLayer}; #[derive(Clone)] pub struct AppState { pub db_pool: sqlx::SqlitePool, pub tx: broadcast::Sender, pub llama_status: Arc>, + pub model_name: String, // MCP sessions map pub sessions: Arc>>>, } -pub async fn run_server(port: u16, db_path: &str, vec0_path: &str, llama_status: Arc>) { - let db_pool = db::init_pool(db_path, vec0_path.to_owned()).await.expect("DB pool init failed"); +pub async fn run_server( + port: u16, + db_pool: sqlx::SqlitePool, + llama_status: Arc>, + model_name: String, +) { let (tx, _rx) = broadcast::channel(100); let sessions = Arc::new(RwLock::new(HashMap::new())); - + // 
llama-server status monitor let llama_status_clone = llama_status.clone(); tokio::spawn(async move { @@ -51,7 +59,13 @@ } }); - let app_state = AppState { db_pool, tx, llama_status: llama_status.clone(), sessions }; + let app_state = AppState { + db_pool, + tx, + llama_status: llama_status.clone(), + model_name, + sessions, + }; let cors = CorsLayer::new() .allow_origin(Any) @@ -63,6 +77,7 @@ .route("/messages", post(mcp_messages_handler)) .route("/llama_status", get(llama_status_handler)) .route("/doc_count", get(doc_count_handler)) + .route("/model_name", get(model_name_handler)) .layer(cors) .with_state(app_state); @@ -87,6 +102,11 @@ Json(serde_json::json!({ "count": count })) } +async fn model_name_handler(State(state): State) -> impl IntoResponse { + Json(serde_json::json!({ "model_name": state.model_name })) +} + +#[allow(dead_code)] #[derive(Deserialize)] struct SseQuery { session_id: Option, @@ -99,9 +119,9 @@ // Generate a simple session ID let session_id = uuid::Uuid::new_v4().to_string(); let (tx, rx) = mpsc::unbounded_channel::(); - + log::info!("New MCP SSE Session: {}", session_id); - + // Register session state.sessions.write().await.insert(session_id.clone(), tx); @@ -114,12 +134,18 @@ let global_rx = state.tx.subscribe(); let stream = futures::stream::unfold( - (rx, Some(endpoint_event), session_id_for_close, sessions_for_close, global_rx), + ( + rx, + Some(endpoint_event), + session_id_for_close, + sessions_for_close, + global_rx, + ), |(mut rx, mut initial, sid, smap, mut grx)| async move { if let Some(event) = initial.take() { return Some((Ok(event), (rx, None, sid, smap, grx))); } - + tokio::select! 
{ Some(msg) = rx.recv() => { Some((Ok(Event::default().event("message").data(msg)), (rx, None, sid, smap, grx))) @@ -140,7 +166,7 @@ Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default()) } -#[derive(Deserialize)] +#[derive(Serialize, Deserialize)] struct JsonRpcRequest { jsonrpc: String, method: String, @@ -163,21 +189,41 @@ session_id: Option, } +impl IntoResponse for JsonRpcResponse { + fn into_response(self) -> axum::response::Response { + Json(self).into_response() + } +} + async fn get_embedding(content: &str) -> Result, String> { + let payload = serde_json::json!({ + "input": [content], + "model": "default" + }); + log::info!("Sending embedding request: {}", payload); + let client = reqwest::Client::new(); - let resp = client.post("http://127.0.0.1:8080/embedding") - .json(&serde_json::json!({ "content": content })) + let resp = client + .post("http://127.0.0.1:8080/v1/embeddings") + .json(&payload) .send() .await .map_err(|e| e.to_string())?; - let json: serde_json::Value = resp.json().await.map_err(|e| e.to_string())?; - let embedding = json["embedding"].as_array() - .ok_or("No embedding field in llama-server response")? + let body_text = resp.text().await.map_err(|e| e.to_string())?; + log::info!("Received embedding response: {}", body_text); + + let json: serde_json::Value = serde_json::from_str(&body_text).map_err(|e| e.to_string())?; + + // Parse OpenAI-compatible response: {"data": [{"embedding": [...]}]} + let emb_value = json["data"][0]["embedding"].as_array(); + + let embedding = emb_value + .ok_or_else(|| format!("No embedding found in llama-server response: {}", json))? 
.iter() .map(|v| v.as_f64().unwrap_or(0.0) as f32) .collect(); - + Ok(embedding) } @@ -189,12 +235,37 @@ let method = req.method.as_str(); log::info!("MCP Request: {} (Session: {:?})", method, query.session_id); - let result = match method { - "initialize" => Some(serde_json::json!({ - "protocolVersion": "2024-11-05", - "capabilities": { "tools": {} }, - "serverInfo": { "name": "TelosDB", "version": "0.1.0" } - })), + // 受信データを構造化JSONで出力(timestamp と source を含む) + let structured = serde_json::json!({ + "timestamp": Utc::now().to_rfc3339(), + "source": "mcp", + "session": query.session_id, + "method": method, + "id": req.id, + "params": req.params, + }); + log::info!("{}", serde_json::to_string(&structured).unwrap_or_else(|_| "{\"error\":\"serialize_failed\"}".to_string())); + + let result: Option = match method { + "initialize" => { + let client_version = req.params.as_ref() + .and_then(|p| p.get("protocolVersion")) + .and_then(|v| v.as_str()) + .unwrap_or("2024-11-05"); + + log::info!("MCP Handshake: Client requested protocol version {}", client_version); + + Some(serde_json::json!({ + "protocolVersion": client_version, + "capabilities": { + "tools": { "listChanged": false }, + "resources": { "listChanged": false, "subscribe": false }, + "prompts": { "listChanged": false }, + "logging": {} + }, + "serverInfo": { "name": "TelosDB", "version": "0.1.0" } + })) + }, "notifications/initialized" => None, "tools/list" => Some(serde_json::json!({ "tools": [ @@ -245,47 +316,118 @@ }, "required": ["id"] } + }, + { + "name": "get_item_by_id", + "description": "Get text content by item ID.", + "inputSchema": { + "type": "object", + "properties": { + "id": { "type": "integer" } + }, + "required": ["id"] + } } ] })), - "search_text" | "tools/call" => { + "search_text" | "tools/call" | "add_item_text" | "update_item" | "delete_item" | "get_item_by_id" => { let p = req.params.clone().unwrap_or_default(); let (actual_method, args) = if method == "tools/call" { - 
(p.get("name").and_then(|v| v.as_str()).unwrap_or(""), p.get("arguments").cloned().unwrap_or_default()) + ( + p.get("name").and_then(|v| v.as_str()).unwrap_or(""), + p.get("arguments").cloned().unwrap_or_default(), + ) } else { (method, p) }; + // UIへの通知(ツール呼び出し開始) + let _ = state.tx.send(format!("mcp:call:{}", actual_method)); + match actual_method { + "get_item_by_id" => { + let id = args.get("id").and_then(|v| v.as_i64()).unwrap_or(0); + let row = sqlx::query("SELECT id, content, path FROM items WHERE id = ?") + .bind(id) + .fetch_optional(&state.db_pool) + .await + .unwrap_or(None); + if let Some(row) = row { + let content: String = row.get("content"); + let path: Option = row.try_get("path").ok(); + Some(serde_json::json!({ + "id": id, + "content": content, + "path": path + })) + } else { + Some(serde_json::json!({ "error": format!("Item not found: {}", id) })) + } + } "add_item_text" => { let content = args.get("content").and_then(|v| v.as_str()).unwrap_or(""); let path = args.get("path").and_then(|v| v.as_str()); - + + log::info!( + "Executing add_item_text: content='{}', path='{:?}'", + content, + path + ); + match get_embedding(content).await { Ok(emb) => { - let mut tx = state.db_pool.begin().await.unwrap(); - let res = sqlx::query("INSERT INTO items (content, path) VALUES (?, ?)") - .bind(content) - .bind(path) - .execute(&mut *tx) - .await - .unwrap(); - let id = res.last_insert_rowid(); - - sqlx::query("INSERT INTO vec_items (id, embedding) VALUES (?, ?)") - .bind(id) - .bind(serde_json::to_string(&emb).unwrap()) - .execute(&mut *tx) - .await - .unwrap(); - - tx.commit().await.unwrap(); - let _ = state.tx.send("data_changed".to_string()); - Some(serde_json::json!({ "content": [{ "type": "text", "text": format!("Successfully added item with ID: {}", id) }] })) + async fn add_item_inner( + state: &AppState, + content: &str, + path: Option<&str>, + emb: Vec, + ) -> Result { + let mut tx = + state.db_pool.begin().await.map_err(|e| { + format!("Failed to 
begin transaction: {}", e) + })?; + let res = + sqlx::query("INSERT INTO items (content, path) VALUES (?, ?)") + .bind(content) + .bind(path) + .execute(&mut *tx) + .await + .map_err(|e| format!("Failed to insert item: {}", e))?; + let id = res.last_insert_rowid(); + + sqlx::query("INSERT INTO vec_items (id, embedding) VALUES (?, ?)") + .bind(id) + .bind(serde_json::to_string(&emb).unwrap_or("[]".to_string())) + .execute(&mut *tx) + .await + .map_err(|e| format!("Failed to insert vector: {}", e))?; + + tx.commit() + .await + .map_err(|e| format!("Failed to commit transaction: {}", e))?; + Ok(id) + } + + match add_item_inner(&state, content, path, emb).await { + Ok(id) => { + let _ = state.tx.send("data_changed".to_string()); + log::info!("Successfully added item ID: {}", id); + Some( + serde_json::json!({ "content": [{ "type": "text", "text": format!("Successfully added item with ID: {}", id) }] }), + ) + } + Err(e) => { + log::error!("Failed to add item: {}", e); + Some(serde_json::json!({ "error": e })) + } + } } - Err(e) => Some(serde_json::json!({ "error": format!("Embedding failed: {}", e) })) + Err(e) => { + log::error!("Embedding failed in add_item_text: {}", e); + Some(serde_json::json!({ "error": format!("Embedding failed: {}", e) })) + } } - }, + } "search_text" => { let content = args.get("content").and_then(|v| v.as_str()).unwrap_or(""); let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(10) as u32; @@ -297,45 +439,93 @@ FROM items JOIN vec_items v ON items.id = v.id WHERE v.embedding MATCH ? AND k = ? - ORDER BY distance LIMIT ?" 
+ ORDER BY distance LIMIT ?", ) - .bind(serde_json::to_string(&emb).unwrap()) + .bind(serde_json::to_string(&emb).unwrap_or("[]".to_string())) .bind(limit) .bind(limit) .fetch_all(&state.db_pool) .await .unwrap_or_default(); + log::info!("Search query: '{}'", content); + log::info!("Embedding (first 5): {:?}", &emb[..5.min(emb.len())]); + + // Log results for debugging regardless of output format + for r in &rows { + let id = r.get::(0); + let d = r.get::(2); + log::info!("Result ID: {}, Distance: {}", id, d); + } + let is_mcp_output = method == "tools/call"; if is_mcp_output { - let txt = if rows.is_empty() { "No results.".to_string() } else { - rows.iter().map(|r| format!("[ID: {}, Distance: {:.4}]\n{}", r.get::(0), r.get::(2), r.get::(1))).collect::>().join("\n\n---\n\n") + let txt = if rows.is_empty() { + "No results.".to_string() + } else { + rows.iter() + .map(|r| { + format!( + "[ID: {}, Distance: {:.4}]\n{}", + r.get::(0), + r.get::(2), + r.get::(1) + ) + }) + .collect::>() + .join("\n\n---\n\n") }; - Some(serde_json::json!({ "content": [{ "type": "text", "text": txt }] })) + Some( + serde_json::json!({ "content": [{ "type": "text", "text": txt }] }), + ) } else { - let res: Vec<_> = rows.iter().map(|r| serde_json::json!({ - "id": r.get::(0), - "content": r.get::(1), - "distance": r.get::(2) - })).collect(); + let res: Vec<_> = rows + .iter() + .map(|r| { + serde_json::json!({ + "id": r.get::(0), + "content": r.get::(1), + "distance": r.get::(2) + }) + }) + .collect(); Some(serde_json::json!({ "content": res })) } } Err(e) => { + log::warn!( + "Embedding failed in search_text, falling back to LIKE: {}", + e + ); // Fallback to LIKE if llama-server is not running - let rows = sqlx::query("SELECT id, content FROM items WHERE content LIKE ? 
LIMIT ?") - .bind(format!("%{}%", content)) - .bind(limit) - .fetch_all(&state.db_pool) - .await - .unwrap_or_default(); - - let txt = format!("(Fallback SEARCH due to embedding error: {})\n\n", e); - let results = rows.iter().map(|r| format!("ID: {}, Content: {}", r.get::(0), r.get::(1))).collect::>().join("\n\n"); - Some(serde_json::json!({ "content": [{ "type": "text", "text": txt + &results }] })) + let rows = sqlx::query( + "SELECT id, content FROM items WHERE content LIKE ? LIMIT ?", + ) + .bind(format!("%{}%", content)) + .bind(limit) + .fetch_all(&state.db_pool) + .await + .unwrap_or_default(); + + let txt = + format!("(Fallback SEARCH due to embedding error: {})\n\n", e); + let results = rows + .iter() + .map(|r| { + format!( + "ID: {}, Content: {}", + r.get::(0), + r.get::(1) + ) + }) + .collect::>() + .join("\n\n"); + Some( + serde_json::json!({ "content": [{ "type": "text", "text": txt + &results }] }), + ) } } - }, + } "update_item" => { let id = args.get("id").and_then(|v| v.as_i64()).unwrap_or(0); let content = args.get("content").and_then(|v| v.as_str()).unwrap_or(""); @@ -343,50 +533,111 @@ match get_embedding(content).await { Ok(emb) => { - let mut tx = state.db_pool.begin().await.unwrap(); - sqlx::query("UPDATE items SET content = ?, path = ? WHERE id = ?") - .bind(content) - .bind(path) - .bind(id) - .execute(&mut *tx) - .await - .unwrap(); - - sqlx::query("UPDATE vec_items SET embedding = ? 
WHERE id = ?") - .bind(serde_json::to_string(&emb).unwrap()) - .bind(id) - .execute(&mut *tx) - .await - .unwrap(); - - tx.commit().await.unwrap(); - let _ = state.tx.send("data_changed".to_string()); - Some(serde_json::json!({ "content": [{ "type": "text", "text": format!("Successfully updated item {}", id) }] })) + async fn update_item_inner( + state: &AppState, + id: i64, + content: &str, + path: Option<&str>, + emb: Vec, + ) -> Result<(), String> { + let mut tx = + state.db_pool.begin().await.map_err(|e| { + format!("Failed to begin transaction: {}", e) + })?; + sqlx::query("UPDATE items SET content = ?, path = ? WHERE id = ?") + .bind(content) + .bind(path) + .bind(id) + .execute(&mut *tx) + .await + .map_err(|e| format!("Failed to update item: {}", e))?; + + sqlx::query("UPDATE vec_items SET embedding = ? WHERE id = ?") + .bind(serde_json::to_string(&emb).unwrap_or("[]".to_string())) + .bind(id) + .execute(&mut *tx) + .await + .map_err(|e| format!("Failed to update vector: {}", e))?; + + tx.commit() + .await + .map_err(|e| format!("Failed to commit transaction: {}", e))?; + Ok(()) + } + + if let Err(e) = update_item_inner(&state, id, content, path, emb).await + { + Some(serde_json::json!({ "error": e })) + } else { + let _ = state.tx.send("data_changed".to_string()); + Some( + serde_json::json!({ "content": [{ "type": "text", "text": format!("Successfully updated item {}", id) }] }), + ) + } } - Err(e) => Some(serde_json::json!({ "error": format!("Embedding failed: {}", e) })) + Err(e) => { + Some(serde_json::json!({ "error": format!("Embedding failed: {}", e) })) + } } - }, + } "delete_item" => { let id = args.get("id").and_then(|v| v.as_i64()).unwrap_or(0); - let mut tx = state.db_pool.begin().await.unwrap(); - sqlx::query("DELETE FROM items WHERE id = ?").bind(id).execute(&mut *tx).await.unwrap(); - sqlx::query("DELETE FROM vec_items WHERE id = ?").bind(id).execute(&mut *tx).await.unwrap(); - tx.commit().await.unwrap(); - let _ = 
state.tx.send("data_changed".to_string()); - Some(serde_json::json!({ "content": [{ "type": "text", "text": format!("Successfully deleted item {}", id) }] })) - }, + + async fn delete_item_inner(state: &AppState, id: i64) -> Result<(), String> { + let mut tx = state + .db_pool + .begin() + .await + .map_err(|e| format!("Failed to begin transaction: {}", e))?; + sqlx::query("DELETE FROM items WHERE id = ?") + .bind(id) + .execute(&mut *tx) + .await + .map_err(|e| format!("Failed to delete item: {}", e))?; + sqlx::query("DELETE FROM vec_items WHERE id = ?") + .bind(id) + .execute(&mut *tx) + .await + .map_err(|e| format!("Failed to delete vector: {}", e))?; + tx.commit() + .await + .map_err(|e| format!("Failed to commit transaction: {}", e))?; + Ok(()) + } + + if let Err(e) = delete_item_inner(&state, id).await { + Some(serde_json::json!({ "error": e })) + } else { + let _ = state.tx.send("data_changed".to_string()); + Some( + serde_json::json!({ "content": [{ "type": "text", "text": format!("Successfully deleted item {}", id) }] }), + ) + } + } _ => Some(serde_json::json!({ "error": "Unknown tool" })), } - }, + } _ => Some(serde_json::json!({ "error": "Not implemented" })), }; + // Notifications (id == null) MUST NOT receive a response + if req.id.is_none() || req.id.as_ref().map_or(false, |v| v.is_null()) { + log::info!("MCP Notification received: {} (No response sent)", method); + return axum::http::StatusCode::NO_CONTENT.into_response(); + } + if let Some(id_val) = req.id { - let resp = JsonRpcResponse { jsonrpc: "2.0", result, error: None, id: Some(id_val) }; + let resp = JsonRpcResponse { + jsonrpc: "2.0", + result, + error: None, + id: Some(id_val), + }; if let Some(sid) = query.session_id { // MCP Client (SSE Mode) let resp_str = serde_json::to_string(&resp).unwrap(); + log::info!("Sending MCP Response (Session: {}, ID: {:?}): {}", sid, resp.id, resp_str); let sessions = state.sessions.read().await; if let Some(tx) = sessions.get(&sid) { let _ = 
tx.send(resp_str); @@ -394,7 +645,7 @@ axum::http::StatusCode::ACCEPTED.into_response() } else { // App UI (Direct Mode) - Json(resp).into_response() + resp.into_response() } } else { axum::http::StatusCode::NO_CONTENT.into_response() diff --git a/src/index.html b/src/index.html index 246eec7..54cfc4a 100644 --- a/src/index.html +++ b/src/index.html @@ -8,259 +8,7 @@ - + @@ -270,14 +18,15 @@

TelosDB

Local Semantic Search Engine

+
判定中...
- + 判定中...
- + -- docs
@@ -295,22 +44,36 @@ +
+
+ MCP Activity Log + Listening... +
+
+ +
Waiting for MCP calls...
+
+
+
- +
-

mcp.json

+

     
+ +
+