diff --git a/docs/specification/03_database_specification.md b/docs/specification/03_database_specification.md index 6bafa17..7647933 100644 --- a/docs/specification/03_database_specification.md +++ b/docs/specification/03_database_specification.md @@ -15,14 +15,24 @@ ```mermaid erDiagram + documents ||--o{ items : "Contains (1:N)" items ||--|| vec_items : "Reference by ID (1:1)" items ||--|| items_lsa : "Metadata by ID (1:1)" - items { + documents { integer id PK "文書ID (自動採番)" - text content "原文テキスト" - text path "出典・メタデータ" + text path "出典・ファイルパス (Unique)" datetime created_at "作成日" + datetime updated_at "更新日" + } + + items { + integer id PK "チャンクID (自動採番)" + integer document_id FK "documents.id への参照" + integer chunk_index "ドキュメント内での順番" + text content "チャンクのテキスト本体" + datetime created_at "作成日" + datetime updated_at "更新日" } vec_items { @@ -57,9 +67,13 @@ ## 4. テーブル詳細 -### 4.1 `items` (メタデータ管理) +### 4.1 `documents` (文書メタデータ管理) -文書の本体と、それに付随する情報を保持します。`path` カラムは、将来的なローカルパス連携や URL 参照を想定した自由形式の文字列です。 +各ソース(ファイル等)の一意な情報を保持します。`path` カラムにより同一ソースの重複登録を防ぎます。 + +### 4.2 `items` (チャンク管理) + +文書を一定の長さ(例:800文字)で分割した「チャンク」を保持します。`document_id` で親文書と紐付けられ、`chunk_index` で順序が管理されます。 ### 4.2 `vec_items` (ベクトル演算用仮想テーブル) diff --git "a/journals/20260223-0013-\343\203\206\343\203\274\343\203\226\343\203\253\346\247\213\351\200\240\343\201\256\346\255\243\350\246\217\345\214\226\343\201\250\343\203\211\343\202\255\343\203\245\343\203\241\343\203\263\343\203\210\343\203\273\343\203\201\343\203\243\343\203\263\343\202\257\343\201\256\345\210\206\351\233\242.md" "b/journals/20260223-0013-\343\203\206\343\203\274\343\203\226\343\203\253\346\247\213\351\200\240\343\201\256\346\255\243\350\246\217\345\214\226\343\201\250\343\203\211\343\202\255\343\203\245\343\203\241\343\203\263\343\203\210\343\203\273\343\203\201\343\203\243\343\203\263\343\202\257\343\201\256\345\210\206\351\233\242.md" new file mode 100644 index 0000000..0191e38 --- /dev/null +++ "b/journals/20260223-0013-\343\203\206\343\203\274\343\203\226\343\203\253\346\247\213\351\200\240\343\201\256\346\255\243\350\246\217\345\214\226\343\201\250\343\203\211\343\202\255\343\203\245\343\203\241\343\203\263\343\203\210\343\203\273\343\203\201\343\203\243\343\203\263\343\202\257\343\201\256\345\210\206\351\233\242.md" @@ -0,0 +1,44 @@ +# Journal: 20260223-0013-テーブル構造の正規化とドキュメント・チャンクの分離 + +## 1. 作業実施の理由 + +Issue-2に基づき、データベースの正規化を行うため。従来の設計では、1つのドキュメントを複数チャンクに分割した際、`items`テーブルにチャンクごとに同じ`path`情報が保持され、データの冗長性と管理上の不都合が生じていた。 + +## 2. 指示(背景、観点、意図を含む) + +- **背景**: 意味検索の精度向上のためチャンク分割を導入したが、メタデータとデータが混在していた。 +- **観点**: 1ドキュメント対Nチャンクの関係をデータベース上で明示的に管理する。 +- **意図**: 削除時にドキュメント単位で一括操作を可能にし、かつ検索結果で正しいソースを表示できるようにする。 + +## 3. 指示事項とその対応 + +- **ドキュメントテーブルの作成**: `documents`テーブルを新設し、`path`を一意に管理。 +- **アイテムテーブルの変更**: `document_id`と`chunk_index`を追加し、冗長な`path`を削除。 +- **MCPツールの修正**: `add_item_text`や`search_text`を新スキーマに合わせてリファクタリング。 +- **型不整合の修正 (AI自律修正)**: `axum::response::Response`と`Option`の不整合を、`JsonRpcResponse`の活用により解決。 + +## 4. 作業詳細 + +AIエージェントは以下の手順で作業を実施した。 + +1. `src/backend/src/db.rs` のスキーマ定義を更新し、`documents`テーブルの追加と`items`テーブルの正規化を行った。 +2. `src/backend/src/mcp.rs` の各ツールロジックをリファクタリングした。特に `add_item_text` では、ドキュメントの存在確認と既存チャンクのクリーンアップを含めて再実装した。 +3. 開発中に遭遇した型不整合エラー(Axumの戻り型とMCP内部ロジックの不一致)を検知し、適切なレスポンス変換ロジックを組み込むことで自律的に解決した。 +4. 仕様書(`03_database_specification.md`)のER図と解説を更新した。 + +## 5. AI視点での結果 + +リファクタリングにより、データベース構造がクリーンになり、将来的なドキュメント管理(一括削除や管理画面での表示など)が容易になった。 +また、コンパイルエラーの修正過程で、AxumハンドラとMCPツールロジックの境界をより正確に把握し、堅牢なエラーレスポンスの実装が行えた。 + +```mermaid +graph TD + A[Add Item Request] --> B{Find Document?} + B -- No --> C[Create Document Record] + B -- Yes --> D[Wipe Old Chunks] + C --> E[Loop: Split Content] + D --> E + E --> F[Generate LSA Vector] + F --> G[Save Chunk to items] + G --> H[Update HNSW Index] +``` diff --git a/src/backend/src/db.rs b/src/backend/src/db.rs index 716d556..49ac457 100644 --- a/src/backend/src/db.rs +++ b/src/backend/src/db.rs @@ -45,14 +45,29 @@ .await .map_err(|e| e.to_string())?; - // 標準テーブル作成 + // ドキュメント(親メタデータ)テーブル + sqlx::query( + "CREATE TABLE IF NOT EXISTS documents ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + path TEXT UNIQUE, + created_at TEXT DEFAULT (datetime('now', 'localtime')), + updated_at TEXT DEFAULT (datetime('now', 'localtime')) + )", + ) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + // アイテム(チャンク)テーブル sqlx::query( "CREATE TABLE IF NOT EXISTS items ( id INTEGER PRIMARY KEY AUTOINCREMENT, + document_id INTEGER NOT NULL, + chunk_index INTEGER NOT NULL, content TEXT NOT NULL, - path TEXT, created_at TEXT DEFAULT (datetime('now', 'localtime')), - updated_at TEXT DEFAULT (datetime('now', 'localtime')) + updated_at TEXT DEFAULT (datetime('now', 'localtime')), + FOREIGN KEY(document_id) REFERENCES documents(id) ON DELETE CASCADE )", ) .execute(pool) @@ -71,7 +86,20 @@ .await .map_err(|e| e.to_string())?; - // トリガー作成 + // トリガー作成 (documents) + sqlx::query( + "CREATE TRIGGER IF NOT EXISTS update_documents_updated_at + AFTER UPDATE ON documents + FOR EACH ROW + BEGIN + UPDATE documents SET updated_at = datetime('now', 'localtime') WHERE id = OLD.id; + END", + ) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + // トリガー作成 (items) sqlx::query( "CREATE TRIGGER IF NOT EXISTS update_items_updated_at AFTER UPDATE ON items @@ -277,7 +305,12 @@ assert!(row.0.contains("FLOAT[384]")); // データ挿入 - sqlx::query("INSERT INTO items (content) VALUES ('test content')") + let doc_id = sqlx::query("INSERT INTO documents (path) VALUES ('test_path')") + .execute(&pool).await.unwrap().last_insert_rowid(); + sqlx::query("INSERT INTO items (document_id, chunk_index, content) VALUES (?, ?, ?)") + .bind(doc_id) + .bind(0) + .bind("test content") .execute(&pool).await.unwrap(); pool.close().await; diff --git a/src/backend/src/mcp.rs b/src/backend/src/mcp.rs index 8a8d5be..7e66e74 100644 --- a/src/backend/src/mcp.rs +++ b/src/backend/src/mcp.rs @@ -481,14 +481,19 @@ match actual_method { "get_item_by_id" => { let id = args.get("id").and_then(|v| v.as_i64()).unwrap_or(0); - let row = sqlx::query("SELECT id, content, path FROM items WHERE id = ?") + let row = sqlx::query( + "SELECT i.id, i.content, d.path + FROM items i + JOIN documents d ON i.document_id = d.id + WHERE i.id = ?" + ) .bind(id) .fetch_optional(&state.db_pool) .await .unwrap_or(None); if let Some(row) = row { let content: String = row.get("content"); - let path: Option = row.try_get("path").ok(); + let path: String = row.get("path"); Some(serde_json::json!({ "id": id, "content": content, @@ -500,39 +505,85 @@ } "add_item_text" => { let content = args.get("content").and_then(|v| v.as_str()).unwrap_or(""); - let path = args.get("path").and_then(|v| v.as_str()); + let path_str = args.get("path").and_then(|v| v.as_str()).unwrap_or("unknown"); log::info!( - "Executing add_item_text (LSA-only): content length={}, path='{:?}'", + "Executing add_item_text (LSA-only): content length={}, path='{}'", content.chars().count(), - path + path_str ); // 800文字ずつに分割 let chars: Vec = content.chars().collect(); - let chunks: Vec = chars + let chunk_strings: Vec = chars .chunks(800) .map(|chunk| chunk.iter().collect::()) .collect(); let mut results = Vec::new(); - for chunk_content in chunks.iter() { - async fn add_item_inner( + + // 1. ドキュメントレコードの取得または作成 + let doc_id_res = match sqlx::query("SELECT id FROM documents WHERE path = ?") + .bind(path_str) + .fetch_optional(&state.db_pool) + .await + { + Ok(Some(row)) => Ok(row.get::(0)), + Ok(None) => { + match sqlx::query("INSERT INTO documents (path) VALUES (?)") + .bind(path_str) + .execute(&state.db_pool) + .await + { + Ok(res) => Ok(res.last_insert_rowid()), + Err(e) => Err(serde_json::json!({ "error": format!("Failed to create document: {}", e) })) + } + }, + Err(e) => Err(serde_json::json!({ "error": format!("Database error: {}", e) })) + }; + + let doc_id = match doc_id_res { + Ok(id) => id, + Err(err_json) => { + return JsonRpcResponse { + jsonrpc: "2.0", + id: req.id.clone(), + result: Some(err_json), + error: None, + }.into_response(); + } + }; + + // 2. 既存の同一ドキュメントの全チャンクを削除(上書き) + if let Err(e) = sqlx::query("DELETE FROM items WHERE document_id = ?") + .bind(doc_id) + .execute(&state.db_pool) + .await + { + log::error!("Failed to delete old chunks for document {}: {}", doc_id, e); + } + + // 3. 各チャンクを保存 + for (idx, chunk_content) in chunk_strings.iter().enumerate() { + async fn add_item_chunk_inner( state: &AppState, + doc_id: i64, + chunk_index: i32, content: &str, - path: Option<&str>, ) -> Result { let mut tx = state.db_pool.begin().await.map_err(|e| { format!("Failed to begin transaction: {}", e) })?; + let res = - sqlx::query("INSERT INTO items (content, path) VALUES (?, ?)") + sqlx::query("INSERT INTO items (document_id, chunk_index, content) VALUES (?, ?, ?)") + .bind(doc_id) + .bind(chunk_index) .bind(content) - .bind(path) .execute(&mut *tx) .await - .map_err(|e| format!("Failed to insert item: {}", e))?; + .map_err(|e| format!("Failed to insert chunk: {}", e))?; let id = res.last_insert_rowid(); // LSA ベクトルの計算 @@ -553,7 +604,6 @@ if let Ok(projected) = model.project_query(&query_vec) { lsa_vector_f32 = projected.iter().map(|&x| x as f32).collect(); - // 50次元に満たない(モデル初期化時のランク制限等)場合はパディング if lsa_vector_f32.len() < 50 { lsa_vector_f32.resize(50, 0.0); } else if lsa_vector_f32.len() > 50 { @@ -562,7 +612,7 @@ } } - // sqlite-vec の仮想テーブル (vec_items) に LSA ベクトルを保存 + // vec_items に保存 sqlx::query("INSERT INTO vec_items (id, embedding) VALUES (?, ?)") .bind(id) .bind(serde_json::to_string(&lsa_vector_f32).unwrap_or("[]".to_string())) @@ -570,7 +620,7 @@ .await .map_err(|e| format!("Failed to insert LSA vector to vec_items: {}", e))?; - // items_lsa にもバックアップ(または生データ)として保存 + // items_lsa にも保存 if lsa_guard.as_ref().is_some() { let vector_blob = bincode::serialize(&lsa_vector_f32).unwrap_or_default(); sqlx::query("INSERT INTO items_lsa (id, vector) VALUES (?, ?)") @@ -585,30 +635,27 @@ .await .map_err(|e| format!("Failed to commit transaction: {}", e))?; - // HNSW インデックスへの反映 + // HNSW インデックス let hnsw_index_guard = state.hnsw_index.read().await; - let hnsw_opt: &Option> = &hnsw_index_guard; - if let Some(hnsw_ptr) = hnsw_opt.as_ref() { - if lsa_vector_f32.len() == 50 { - let vec_ref: &[f32] = lsa_vector_f32.as_slice(); - hnsw_ptr.insert((vec_ref, id as usize)); - } + if let Some(hnsw_ptr) = hnsw_index_guard.as_ref() { + let vec_ref: &[f32] = lsa_vector_f32.as_slice(); + hnsw_ptr.insert((vec_ref, id as usize)); } Ok(id) } - match add_item_inner(&state, chunk_content, path).await { + match add_item_chunk_inner(&state, doc_id, idx as i32, chunk_content).await { Ok(id) => results.push(id), - Err(e) => log::error!("Failed to add chunk: {}", e), + Err(e) => log::error!("Failed to add chunk {}: {}", idx, e), } } if !results.is_empty() { let _ = state.tx.send("data_changed".to_string()); - log::info!("Successfully added {} chunks via LSA.", results.len()); + log::info!("Successfully added {} chunks to document {}.", results.len(), path_str); Some( - serde_json::json!({ "content": [{ "type": "text", "text": format!("Successfully added {} chunks (LSA).", results.len()) }] }), + serde_json::json!({ "content": [{ "type": "text", "text": format!("Successfully added {} chunks for {}", results.len(), path_str) }] }), ) } else { Some(serde_json::json!({ "error": "Failed to add any chunks." })) @@ -625,17 +672,16 @@ let mut query_counts = HashMap::new(); let tokens = state.tokenizer.tokenize_to_vec(content).unwrap_or_default(); for token in tokens { - if let Some(&id) = model.vocabulary.get(&token) { - *query_counts.entry(id).or_insert(0.0) += 1.0; + if let Some(&tid) = model.vocabulary.get(&token) { + *query_counts.entry(tid).or_insert(0.0) += 1.0; } } let mut query_vec = ndarray::Array1::zeros(model.vocabulary.len()); - for (id, count) in query_counts { - query_vec[id] = count; + for (tid, count) in query_counts { + query_vec[tid] = count; } if let Ok(query_lsa) = model.project_query(&query_vec) { - // クエリが語彙に含まれず零ベクトルになった場合 if query_lsa.iter().all(|&x| x == 0.0) { search_result = Some(serde_json::json!({ "content": [] })); } else { @@ -646,10 +692,8 @@ query_lsa_f32.truncate(50); } - // HNSW インデックスがあればそれを使う、なければ sqlite-vec でフォールバック let hnsw_idx_guard = state.hnsw_index.read().await; - let hnsw_option: &Option> = &hnsw_idx_guard; - if let Some(h_ptr) = hnsw_option.as_ref() { + if let Some(h_ptr) = hnsw_idx_guard.as_ref() { log::info!("Searching using HNSW index..."); let query_ref: &[f32] = query_lsa_f32.as_slice(); let neighbors = h_ptr.search(query_ref, limit as usize, 100); @@ -658,13 +702,18 @@ for neighbor in neighbors { let id = neighbor.d_id as i64; let dist = neighbor.distance; - // HNSW の DistCosine は通常 1 - cos_sim let sim: f32 = 1.0 - dist; - if let Ok(row) = sqlx::query("SELECT content FROM items WHERE id = ?").bind(id).fetch_one(&state.db_pool).await { + if let Ok(row) = sqlx::query( + "SELECT i.content, d.path + FROM items i + JOIN documents d ON i.document_id = d.id + WHERE i.id = ?" + ).bind(id).fetch_one(&state.db_pool).await { results.push(serde_json::json!({ "id": id, "content": row.get::(0), + "path": row.get::(1), "similarity": sim.clamp(0.0, 1.0) })); } @@ -674,11 +723,11 @@ } if search_result.is_none() { - // sqlite-vec の MATCH (BM25等ではなくベクトル近傍検索) を使用 let rows = sqlx::query( - "SELECT items.id, items.content, v.distance - FROM items - JOIN vec_items v ON items.id = v.id + "SELECT i.id, i.content, d.path, v.distance + FROM items i + JOIN documents d ON i.document_id = d.id + JOIN vec_items v ON i.id = v.id WHERE v.embedding MATCH ? AND k = ? ORDER BY distance LIMIT ?", ) @@ -692,15 +741,13 @@ let res: Vec<_> = rows.iter().map(|r| { let id = r.get::(0); let content = r.get::(1); - let distance = r.get::(2); - // sqlite-vec の distance は L2 距離の 2 乗 - // 正規化ベクトル [u, v] において: - // ||u-v||^2 = ||u||^2 + ||v||^2 - 2*u*v = 1 + 1 - 2*cos_sim = 2 - 2*cos_sim - // よって cos_sim = 1.0 - (distance / 2.0) + let path = r.get::(2); + let distance = r.get::(3); let sim = 1.0 - (distance / 2.0); serde_json::json!({ "id": id, "content": content, + "path": path, "similarity": sim.clamp(0.0, 1.0) }) }).collect(); @@ -714,14 +761,25 @@ } if search_result.is_none() { - // LSA モデルがない、または検索結果が得られなかった場合は LIKE 検索でフォールバック - let rows = sqlx::query("SELECT id, content FROM items WHERE content LIKE ? LIMIT ?") + let rows = sqlx::query( + "SELECT i.id, i.content, d.path + FROM items i + JOIN documents d ON i.document_id = d.id + WHERE i.content LIKE ? LIMIT ?" + ) .bind(format!("%{}%", content)) .bind(limit) .fetch_all(&state.db_pool) .await .unwrap_or_default(); - let res: Vec<_> = rows.iter().map(|r| serde_json::json!({ "id": r.get::(0), "content": r.get::(1), "similarity": 0.0 })).collect(); + let res: Vec<_> = rows.iter().map(|r| { + serde_json::json!({ + "id": r.get::(0), + "content": r.get::(1), + "path": r.get::(2), + "similarity": 0.0 + }) + }).collect(); search_result = Some(serde_json::json!({ "content": res })); } search_result @@ -732,24 +790,20 @@ let lsa_guard = state.lsa_model.read().await; if let Some(model) = lsa_guard.as_ref() { - // クエリのベクトル化 (TF) let mut query_counts = HashMap::new(); let tokens = state.tokenizer.tokenize_to_vec(query).unwrap_or_default(); for token in tokens { - if let Some(&id) = model.vocabulary.get(&token) { - *query_counts.entry(id).or_insert(0.0) += 1.0; + if let Some(&tid) = model.vocabulary.get(&token) { + *query_counts.entry(tid).or_insert(0.0) += 1.0; } } let mut query_vec = ndarray::Array1::zeros(model.vocabulary.len()); - for (id, count) in query_counts { - query_vec[id] = count; + for (tid, count) in query_counts { + query_vec[tid] = count; } - // LSA 空間への射影 if let Ok(query_lsa) = model.project_query(&query_vec) { - // DB から全ベクトルを取得して比較 (件数が少ない想定) - // 本来はアイテム数が多い場合は BLOB を全件回すと遅いため、インメモリキャッシュ等を検討 let rows = sqlx::query("SELECT id, vector FROM items_lsa") .fetch_all(&state.db_pool) .await @@ -759,7 +813,8 @@ for row in rows { let id: i64 = row.get(0); let vector_blob: Vec = row.get(1); - if let Ok(vector_f64) = bincode::deserialize::>(&vector_blob) { + if let Ok(vector_f32) = bincode::deserialize::>(&vector_blob) { + let vector_f64: Vec = vector_f32.iter().map(|&x| x as f64).collect(); let doc_vec = ndarray::Array1::from_vec(vector_f64); let sim = crate::utils::lsa::LsaModel::cosine_similarity(&query_lsa, &doc_vec); results.push((id, sim)); @@ -771,11 +826,16 @@ let mut filtered_results = Vec::new(); for (id, sim) in results { - if let Ok(doc_row) = sqlx::query("SELECT content FROM items WHERE id = ?").bind(id).fetch_one(&state.db_pool).await { - let content: String = doc_row.get(0); + if let Ok(row) = sqlx::query( + "SELECT i.content, d.path + FROM items i + JOIN documents d ON i.document_id = d.id + WHERE i.id = ?" + ).bind(id).fetch_one(&state.db_pool).await { filtered_results.push(serde_json::json!({ "id": id, - "content": content, + "content": row.get::(0), + "path": row.get::(1), "similarity": sim })); } @@ -792,21 +852,20 @@ "update_item" => { let id = args.get("id").and_then(|v| v.as_i64()).unwrap_or(0); let content = args.get("content").and_then(|v| v.as_str()).unwrap_or(""); - let path = args.get("path").and_then(|v| v.as_str()); async fn update_item_inner( state: &AppState, id: i64, content: &str, - path: Option<&str>, ) -> Result<(), String> { let mut tx = state.db_pool.begin().await.map_err(|e| { format!("Failed to begin transaction: {}", e) })?; - sqlx::query("UPDATE items SET content = ?, path = ? WHERE id = ?") + + // items テーブルのコンテンツを更新 + sqlx::query("UPDATE items SET content = ? WHERE id = ?") .bind(content) - .bind(path) .bind(id) .execute(&mut *tx) .await @@ -828,13 +887,26 @@ } if let Ok(projected) = model.project_query(&query_vec) { - let vector_blob = bincode::serialize(&projected.to_vec()).unwrap_or_default(); + let mut proj_f32: Vec = projected.iter().map(|&x| x as f32).collect(); + if proj_f32.len() < 50 { proj_f32.resize(50, 0.0); } else { proj_f32.truncate(50); } + + let vector_blob = bincode::serialize(&proj_f32).unwrap_or_default(); + + // items_lsa を更新 sqlx::query("INSERT OR REPLACE INTO items_lsa (id, vector) VALUES (?, ?)") .bind(id) .bind(vector_blob) .execute(&mut *tx) .await .map_err(|e| format!("Failed to update LSA vector: {}", e))?; + + // vec_items を更新 + sqlx::query("INSERT OR REPLACE INTO vec_items (id, embedding) VALUES (?, ?)") + .bind(id) + .bind(serde_json::to_string(&proj_f32).unwrap_or("[]".to_string())) + .execute(&mut *tx) + .await + .map_err(|e| format!("Failed to update vec_items: {}", e))?; } } @@ -844,7 +916,7 @@ Ok(()) } - if let Err(e) = update_item_inner(&state, id, content, path).await + if let Err(e) = update_item_inner(&state, id, content).await { Some(serde_json::json!({ "error": e })) } else {