Add log ingestion tool for loading signature logs into SQLite
- Parse signature messages from log files extracting app info, device details, and feature flags (autofill, touchID, offline login, etc.) - Support both plain .log and gzip compressed .log.gz files - File discovery by date range (YYYY/mm/dd directory structure) - Batch inserts for performance with large files (10GB+ per day) - Index on session_id and version for efficient queries - Extensible parser architecture via MessageParser trait - Parallel file processing for multi-day ingestion Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
166
src/db.rs
Normal file
166
src/db.rs
Normal file
@@ -0,0 +1,166 @@
|
||||
use anyhow::Result;
|
||||
use rusqlite::{params, Connection, Transaction};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::parser::SignatureEntry;
|
||||
|
||||
pub struct Database {
|
||||
conn: Connection,
|
||||
}
|
||||
|
||||
impl Database {
|
||||
pub fn new(path: &str) -> Result<Self> {
|
||||
let conn = Connection::open(path)?;
|
||||
|
||||
// Enable WAL mode for better concurrent read/write performance
|
||||
conn.pragma_update(None, "journal_mode", "WAL")?;
|
||||
|
||||
let db = Self { conn };
|
||||
db.init_schema()?;
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
fn init_schema(&self) -> Result<()> {
|
||||
self.conn.execute_batch(
|
||||
r#"
|
||||
-- Lookup tables for low-cardinality text columns
|
||||
CREATE TABLE IF NOT EXISTS apps (
|
||||
id INTEGER PRIMARY KEY,
|
||||
name TEXT NOT NULL UNIQUE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS versions (
|
||||
id INTEGER PRIMARY KEY,
|
||||
name TEXT NOT NULL UNIQUE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS models (
|
||||
id INTEGER PRIMARY KEY,
|
||||
name TEXT NOT NULL UNIQUE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS devices (
|
||||
id INTEGER PRIMARY KEY,
|
||||
name TEXT NOT NULL UNIQUE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS os_versions (
|
||||
id INTEGER PRIMARY KEY,
|
||||
name TEXT NOT NULL UNIQUE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS app_names (
|
||||
id INTEGER PRIMARY KEY,
|
||||
name TEXT NOT NULL UNIQUE
|
||||
);
|
||||
|
||||
-- Main table with normalized foreign keys and integer timestamp
|
||||
CREATE TABLE IF NOT EXISTS signature_entries (
|
||||
id INTEGER PRIMARY KEY,
|
||||
session_id TEXT NOT NULL,
|
||||
timestamp_ms INTEGER NOT NULL,
|
||||
app_id INTEGER NOT NULL REFERENCES apps(id),
|
||||
version_id INTEGER NOT NULL REFERENCES versions(id),
|
||||
offline_login_usage INTEGER,
|
||||
is_password_autofill_enabled INTEGER,
|
||||
camera_roll_usage INTEGER,
|
||||
os_id INTEGER REFERENCES os_versions(id),
|
||||
app_name_id INTEGER REFERENCES app_names(id),
|
||||
touch_id INTEGER,
|
||||
is_offline_login_enabled INTEGER,
|
||||
model_id INTEGER REFERENCES models(id),
|
||||
device_id INTEGER REFERENCES devices(id),
|
||||
password_autofill_usage INTEGER
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_session_id ON signature_entries(session_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_timestamp ON signature_entries(timestamp_ms);
|
||||
CREATE INDEX IF NOT EXISTS idx_version ON signature_entries(version_id);
|
||||
"#,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn begin_transaction(&mut self) -> Result<Transaction<'_>> {
|
||||
Ok(self.conn.transaction()?)
|
||||
}
|
||||
|
||||
pub fn insert_signature_batch(tx: &Transaction<'_>, entries: &[SignatureEntry]) -> Result<()> {
|
||||
// Build lookup caches for this batch
|
||||
let mut app_cache: HashMap<String, i64> = HashMap::new();
|
||||
let mut version_cache: HashMap<String, i64> = HashMap::new();
|
||||
let mut model_cache: HashMap<String, i64> = HashMap::new();
|
||||
let mut device_cache: HashMap<String, i64> = HashMap::new();
|
||||
let mut os_cache: HashMap<String, i64> = HashMap::new();
|
||||
let mut app_name_cache: HashMap<String, i64> = HashMap::new();
|
||||
|
||||
let mut insert_stmt = tx.prepare_cached(
|
||||
r#"
|
||||
INSERT INTO signature_entries (
|
||||
id, session_id, timestamp_ms, app_id, version_id,
|
||||
offline_login_usage, is_password_autofill_enabled, camera_roll_usage,
|
||||
os_id, app_name_id, touch_id, is_offline_login_enabled,
|
||||
model_id, device_id, password_autofill_usage
|
||||
) VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
"#,
|
||||
)?;
|
||||
|
||||
for entry in entries {
|
||||
let app_id = get_or_insert_lookup(tx, &mut app_cache, "apps", &entry.app)?;
|
||||
let version_id = get_or_insert_lookup(tx, &mut version_cache, "versions", &entry.version)?;
|
||||
let model_id = entry.model.as_ref().map(|v| get_or_insert_lookup(tx, &mut model_cache, "models", v)).transpose()?;
|
||||
let device_id = entry.device.as_ref().map(|v| get_or_insert_lookup(tx, &mut device_cache, "devices", v)).transpose()?;
|
||||
let os_id = entry.os.as_ref().map(|v| get_or_insert_lookup(tx, &mut os_cache, "os_versions", v)).transpose()?;
|
||||
let app_name_id = entry.app_name.as_ref().map(|v| get_or_insert_lookup(tx, &mut app_name_cache, "app_names", v)).transpose()?;
|
||||
|
||||
insert_stmt.execute(params![
|
||||
entry.session_id,
|
||||
entry.timestamp_ms,
|
||||
app_id,
|
||||
version_id,
|
||||
entry.offline_login_usage,
|
||||
entry.is_password_autofill_enabled.map(|b| b as i32),
|
||||
entry.camera_roll_usage,
|
||||
os_id,
|
||||
app_name_id,
|
||||
entry.touch_id.map(|b| b as i32),
|
||||
entry.is_offline_login_enabled.map(|b| b as i32),
|
||||
model_id,
|
||||
device_id,
|
||||
entry.password_autofill_usage,
|
||||
])?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Get or insert a value into a lookup table, using a cache to minimize DB queries
|
||||
fn get_or_insert_lookup(
|
||||
tx: &Transaction<'_>,
|
||||
cache: &mut HashMap<String, i64>,
|
||||
table: &str,
|
||||
value: &str,
|
||||
) -> Result<i64> {
|
||||
if let Some(&id) = cache.get(value) {
|
||||
return Ok(id);
|
||||
}
|
||||
|
||||
// Try to find existing entry
|
||||
let query = format!("SELECT id FROM {} WHERE name = ?", table);
|
||||
let existing: Option<i64> = tx
|
||||
.query_row(&query, params![value], |row| row.get(0))
|
||||
.ok();
|
||||
|
||||
if let Some(id) = existing {
|
||||
cache.insert(value.to_string(), id);
|
||||
return Ok(id);
|
||||
}
|
||||
|
||||
// Insert new entry
|
||||
let insert = format!("INSERT INTO {} (name) VALUES (?)", table);
|
||||
tx.execute(&insert, params![value])?;
|
||||
let id = tx.last_insert_rowid();
|
||||
cache.insert(value.to_string(), id);
|
||||
Ok(id)
|
||||
}
|
||||
Reference in New Issue
Block a user