diff --git a/src/backend/src/db/migration.rs b/src/backend/src/db/migration.rs new file mode 100644 index 0000000..6f85d2e --- /dev/null +++ b/src/backend/src/db/migration.rs @@ -0,0 +1,127 @@ +use sqlx::SqlitePool; +use sqlx::Row; + +/// 全てのマイグレーションを実行する +pub async fn run_migrations(pool: &SqlitePool) -> Result<(), String> { + // v0.2.5 -> v0.3.0 + migrate_025_to_030(pool).await?; + + // 今後、バージョンが増えるごとにここに追加 + // migrate_030_to_040(pool).await?; + + Ok(()) +} + +async fn migrate_025_to_030(pool: &SqlitePool) -> Result<(), String> { + // 1. internal_metadata テーブルが存在するか確認 + let row = sqlx::query("SELECT 1 FROM sqlite_master WHERE type='table' AND name='internal_metadata'") + .fetch_optional(pool) + .await + .map_err(|e| e.to_string())?; + + if row.is_some() { + // すでにメタデータテーブルがある場合は、0.3.0以降とみなす + return Ok(()); + } + + // 2. items テーブルの構造を確認(v0.2.5以前の判定) + let rows = sqlx::query("PRAGMA table_info(items)") + .fetch_all(pool) + .await + .map_err(|e| e.to_string())?; + + if rows.is_empty() { + // テーブル自体がない(新規導入)の場合はマイグレーション不要 + return Ok(()); + } + + let has_path = rows.iter().any(|row| { + let name: String = row.get(1); + name == "path" + }); + + if !has_path { + return Ok(()); + } + + log::info!("Migrating database from v0.2.5 to v0.3.0 (Dedicated Migrator)..."); + + // 外部キー制約を一時的に無効化 + sqlx::query("PRAGMA foreign_keys = OFF") + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + let mut tx = pool.begin().await.map_err(|e| e.to_string())?; + + // documents テーブルを作成 + sqlx::query( + "CREATE TABLE IF NOT EXISTS documents ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + path TEXT UNIQUE, + mime TEXT, + created_at TEXT DEFAULT (datetime('now', 'localtime')), + updated_at TEXT DEFAULT (datetime('now', 'localtime')) + )", + ) + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + + sqlx::query("INSERT OR IGNORE INTO documents (path) SELECT DISTINCT path FROM items") + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + + sqlx::query( + "CREATE TABLE items_new ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + document_id INTEGER NOT NULL, + chunk_index INTEGER NOT NULL, + content TEXT NOT NULL, + created_at TEXT, + updated_at TEXT, + FOREIGN KEY(document_id) REFERENCES documents(id) ON DELETE CASCADE + )", + ) + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + + sqlx::query( + "INSERT INTO items_new (id, document_id, chunk_index, content, created_at, updated_at) + SELECT + i.id, + d.id, + (SELECT COUNT(*) FROM items i2 WHERE i2.path = i.path AND i2.id < i.id), + i.content, + i.created_at, + i.updated_at + FROM items i + JOIN documents d ON i.path = d.path", + ) + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + + sqlx::query("DROP TABLE items") + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + + sqlx::query("ALTER TABLE items_new RENAME TO items") + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + + tx.commit().await.map_err(|e| e.to_string())?; + + sqlx::query("PRAGMA foreign_keys = ON") + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + log::info!("Migration v0.2.5 to v0.3.0 completed successfully."); + + Ok(()) +} diff --git a/src/backend/src/db/mod.rs b/src/backend/src/db/mod.rs new file mode 100644 index 0000000..e46af45 --- /dev/null +++ b/src/backend/src/db/mod.rs @@ -0,0 +1,355 @@ +use sqlx::Row; +use sqlx::SqlitePool; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; +use std::borrow::Cow; +use std::path::Path; +use std::str::FromStr; + +pub mod migration; + +/// データベースを初期化し、コネクションプールを返す。 +/// 埋め込み次元が変更されている場合は vec_items テーブルを再作成する。 +pub async fn initialize_database( + db_path: &Path, + extension_path: &Path, + dimension: usize, +) -> Result { + // ディレクトリの作成 + if let Some(parent) = db_path.parent() { + if !parent.exists() { + std::fs::create_dir_all(parent).map_err(|e| e.to_string())?; + } + } + + let db_path_str = db_path.to_str().ok_or("Invalid DB path")?; + let ext_path_str = extension_path.to_str().ok_or("Invalid extension path")?; + + let opts = SqliteConnectOptions::from_str(&format!("sqlite://{}?mode=rwc", db_path_str)) + .map_err(|e| e.to_string())? + .extension(ext_path_str.to_owned()); + + let pool = SqlitePoolOptions::new() + .max_connections(5) + .connect_with(opts) + .await + .map_err(|e| e.to_string())?; + + // スキーマ初期化 + init_schema(&pool, dimension).await?; + + Ok(pool) +} + +async fn init_schema(pool: &SqlitePool, dimension: usize) -> Result<(), String> { + // PRAGMA設定 + sqlx::query("PRAGMA journal_mode = WAL") + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + // マイグレーションの実行 + migration::run_migrations(pool).await?; + + // 内部管理(バージョン等)テーブル + sqlx::query( + "CREATE TABLE IF NOT EXISTS internal_metadata ( + key TEXT PRIMARY KEY, + value TEXT + )", + ) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + // バージョン情報の書き込み (0.3.0) + sqlx::query("INSERT OR REPLACE INTO internal_metadata (key, value) VALUES ('version', '0.3.0')") + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + // ドキュメント(親メタデータ)テーブル + sqlx::query( + "CREATE TABLE IF NOT EXISTS documents ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + path TEXT UNIQUE, + mime TEXT, + created_at TEXT DEFAULT (datetime('now', 'localtime')), + updated_at TEXT DEFAULT (datetime('now', 'localtime')) + )", + ) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + // アイテム(チャンク)テーブル + sqlx::query( + "CREATE TABLE IF NOT EXISTS items ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + document_id INTEGER NOT NULL, + chunk_index INTEGER NOT NULL, + content TEXT NOT NULL, + created_at TEXT DEFAULT (datetime('now', 'localtime')), + updated_at TEXT DEFAULT (datetime('now', 'localtime')), + FOREIGN KEY(document_id) REFERENCES documents(id) ON DELETE CASCADE + )", + ) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + // LSA ベクトル保存テーブル (意味検索用) + sqlx::query( + "CREATE TABLE IF NOT EXISTS items_lsa ( + id INTEGER PRIMARY KEY, + vector BLOB NOT NULL, + FOREIGN KEY(id) REFERENCES items(id) ON DELETE CASCADE + )", + ) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + // トリガー作成 (documents) + sqlx::query( + "CREATE TRIGGER IF NOT EXISTS update_documents_updated_at + AFTER UPDATE ON documents + FOR EACH ROW + BEGIN + UPDATE documents SET updated_at = datetime('now', 'localtime') WHERE id = OLD.id; + END", + ) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + // トリガー作成 (items) + sqlx::query( + "CREATE TRIGGER IF NOT EXISTS update_items_updated_at + AFTER UPDATE ON items + FOR EACH ROW + BEGIN + UPDATE items SET updated_at = datetime('now', 'localtime') WHERE id = OLD.id; + END", + ) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + // vec_items の次元数チェックと初期化 + check_and_init_vector_table(pool, dimension).await?; + + Ok(()) +} + + +async fn check_and_init_vector_table(pool: &SqlitePool, dimension: usize) -> Result<(), String> { + // 現在のテーブル定義を確認 + let row = sqlx::query("SELECT sql FROM sqlite_master WHERE type='table' AND name='vec_items'") + .fetch_optional(pool) + .await + .map_err(|e| e.to_string())?; + + let should_recreate = if let Some(row) = row { + let sql: String = row.get(0); + // "FLOAT[640]" のような文字列が含まれているかチェック + let expected = format!("FLOAT[{}]", dimension); + if !sql.contains(&expected) { + log::info!("Dimension mismatch detected (expected {}). Rebuilding vec_items...", dimension); + true + } else { + false + } + } else { + true + }; + + if should_recreate { + let mut tx = pool.begin().await.map_err(|e| e.to_string())?; + + sqlx::query("DROP TABLE IF EXISTS vec_items") + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + + let create_sql = format!( + "CREATE VIRTUAL TABLE vec_items USING vec0(id INTEGER PRIMARY KEY, embedding FLOAT[{}])", + dimension + ); + sqlx::query(&create_sql) + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + + // 既存の items があればベクトルを再生成する必要があるが、 + // ここでは空のテーブル作成に留める。 + // (/v1/embeddings が必要になるため、再生成は別のユーティリティで行うのが安全) + + tx.commit().await.map_err(|e| e.to_string())?; + } + + Ok(()) +} + +/// 全データのベクトルを再生成する +pub async fn rebuild_vector_data( + pool: &SqlitePool, + dimension: usize, + embed_fn: F, +) -> Result<(), String> +where + F: Fn(String) -> Fut, + Fut: std::future::Future, String>>, +{ + // 強制的に再作成 + { + let mut tx = pool.begin().await.map_err(|e| e.to_string())?; + sqlx::query("DROP TABLE IF EXISTS vec_items") + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + let create_sql = format!( + "CREATE VIRTUAL TABLE vec_items USING vec0(id INTEGER PRIMARY KEY, embedding FLOAT[{}])", + dimension + ); + sqlx::query(&create_sql) + .execute(&mut *tx) + .await + .map_err(|e| e.to_string())?; + tx.commit().await.map_err(|e| e.to_string())?; + } + + let rows = sqlx::query("SELECT id, content FROM items") + .fetch_all(pool) + .await + .map_err(|e| e.to_string())?; + + for row in rows { + let id: i64 = row.get("id"); + let content: String = row.get("content"); + let emb = embed_fn(content).await?; + + sqlx::query("INSERT INTO vec_items (id, embedding) VALUES (?, ?)") + .bind(id) + .bind(serde_json::to_string(&emb).unwrap_or_default()) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + } + + Ok(()) +} + +/// itemsテーブルにあってvec_itemsテーブルにないデータを同期する(ヒーリング) +pub async fn sync_vectors( + pool: &SqlitePool, + embed_fn: F, +) -> Result +where + F: Fn(String) -> Fut, + Fut: std::future::Future, String>>, +{ + // vec_itemsに存在しないIDを抽出 + let rows = sqlx::query( + "SELECT i.id, i.content + FROM items i + LEFT JOIN vec_items v ON i.id = v.id + WHERE v.id IS NULL" + ) + .fetch_all(pool) + .await + .map_err(|e| e.to_string())?; + + let count = rows.len(); + if count == 0 { + return Ok(0); + } + + log::info!("Healing {} missing vectors...", count); + + for row in rows { + let id: i64 = row.get("id"); + let content: String = row.get("content"); + + match embed_fn(content).await { + Ok(emb) => { + sqlx::query("INSERT INTO vec_items (id, embedding) VALUES (?, ?)") + .bind(id) + .bind(serde_json::to_string(&emb).unwrap_or_default()) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + } + Err(e) => { + log::error!("Failed to generate embedding for item {}: {}", id, e); + } + } + } + + Ok(count) +} + +// 下位互換性(既存の依存箇所が多いため、一時的に定義を残すか、順次書き換える) +pub async fn init_pool( + db_path: &str, + extension_path: impl Into>, +) -> Result { + let opts = SqliteConnectOptions::from_str(&format!("sqlite://{}?mode=rwc", db_path))? + .extension(extension_path); + + SqlitePoolOptions::new().connect_with(opts).await +} + +#[cfg(test)] +mod tests { + use super::*; + // use std::fs; + use tempfile::tempdir; + + #[tokio::test] + async fn test_dimension_change_and_rebuild() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("test_telos.db"); + + // DLLのパス取得(ビルドディレクトリにあることが前提) + // テスト環境では $OUT_DIR や $CARGO_MANIFEST_DIR 基準で探す + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + let ext_path = Path::new(manifest_dir).join("../node_modules/sqlite-vec-windows-x64/vec0.dll"); + + if !ext_path.exists() { + println!("Skipping test: vec0.dll not found at {:?}", ext_path); + return; + } + + // 1. 最初は次元数 384 で初期化 + let pool = initialize_database(&db_path, &ext_path, 384).await.unwrap(); + + // テーブル定義の確認 + let row: (String,) = sqlx::query_as("SELECT sql FROM sqlite_master WHERE name='vec_items'") + .fetch_one(&pool).await.unwrap(); + assert!(row.0.contains("FLOAT[384]")); + + // データ挿入 + let doc_id = sqlx::query("INSERT INTO documents (path) VALUES ('test_path')") + .execute(&pool).await.unwrap().last_insert_rowid(); + sqlx::query("INSERT INTO items (document_id, chunk_index, content) VALUES (?, ?, ?)") + .bind(doc_id) + .bind(0) + .bind("test content") + .execute(&pool).await.unwrap(); + + pool.close().await; + + // 2. 次は次元数 768 で再初期化(不整合検知 -> 再作成) + let pool2 = initialize_database(&db_path, &ext_path, 768).await.unwrap(); + let row2: (String,) = sqlx::query_as("SELECT sql FROM sqlite_master WHERE name='vec_items'") + .fetch_one(&pool2).await.unwrap(); + assert!(row2.0.contains("FLOAT[768]")); + + // items テーブルは維持されていることを確認 + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM items") + .fetch_one(&pool2).await.unwrap(); + assert_eq!(count.0, 1); + + pool2.close().await; + } +}