Improved memory and disk usage, best practices. Review done by GPT 5.2, code written by Claude Opus 4.5
This commit is contained in:
109
src/db.rs
109
src/db.rs
@@ -1,5 +1,5 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use rusqlite::{params, Connection, Transaction};
|
use rusqlite::{Connection, Transaction, params};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use crate::parser::SignatureEntry;
|
use crate::parser::SignatureEntry;
|
||||||
@@ -12,8 +12,17 @@ impl Database {
|
|||||||
pub fn new(path: &str) -> Result<Self> {
|
pub fn new(path: &str) -> Result<Self> {
|
||||||
let conn = Connection::open(path)?;
|
let conn = Connection::open(path)?;
|
||||||
|
|
||||||
// Enable WAL mode for better concurrent read/write performance
|
// Production-optimized pragmas for bulk ingestion
|
||||||
|
// WAL mode for better concurrent read/write performance
|
||||||
conn.pragma_update(None, "journal_mode", "WAL")?;
|
conn.pragma_update(None, "journal_mode", "WAL")?;
|
||||||
|
// NORMAL synchronous is acceptable for ingestion pipelines (fsync on checkpoint only)
|
||||||
|
conn.pragma_update(None, "synchronous", "NORMAL")?;
|
||||||
|
// Bound WAL growth: checkpoint every 1000 pages (~4MB with default page size)
|
||||||
|
conn.pragma_update(None, "wal_autocheckpoint", 1000)?;
|
||||||
|
// Small bounded cache to limit memory usage (negative = KB)
|
||||||
|
conn.pragma_update(None, "cache_size", -8000)?; // 8MB cache
|
||||||
|
// Store temp tables in memory to reduce disk I/O
|
||||||
|
conn.pragma_update(None, "temp_store", "MEMORY")?;
|
||||||
|
|
||||||
let db = Self { conn };
|
let db = Self { conn };
|
||||||
db.init_schema()?;
|
db.init_schema()?;
|
||||||
@@ -21,6 +30,7 @@ impl Database {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn init_schema(&self) -> Result<()> {
|
fn init_schema(&self) -> Result<()> {
|
||||||
|
// Create tables without indexes - indexes will be created after ingestion
|
||||||
self.conn.execute_batch(
|
self.conn.execute_batch(
|
||||||
r#"
|
r#"
|
||||||
-- Lookup tables for low-cardinality text columns
|
-- Lookup tables for low-cardinality text columns
|
||||||
@@ -55,6 +65,7 @@ impl Database {
|
|||||||
);
|
);
|
||||||
|
|
||||||
-- Main table with normalized foreign keys and integer timestamp
|
-- Main table with normalized foreign keys and integer timestamp
|
||||||
|
-- Indexes are created AFTER ingestion for better bulk insert performance
|
||||||
CREATE TABLE IF NOT EXISTS signature_entries (
|
CREATE TABLE IF NOT EXISTS signature_entries (
|
||||||
id INTEGER PRIMARY KEY,
|
id INTEGER PRIMARY KEY,
|
||||||
session_id TEXT NOT NULL,
|
session_id TEXT NOT NULL,
|
||||||
@@ -72,12 +83,32 @@ impl Database {
|
|||||||
device_id INTEGER REFERENCES devices(id),
|
device_id INTEGER REFERENCES devices(id),
|
||||||
password_autofill_usage INTEGER
|
password_autofill_usage INTEGER
|
||||||
);
|
);
|
||||||
|
"#,
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create indexes and optimize database after ingestion is complete
|
||||||
|
pub fn finalize(&self) -> Result<()> {
|
||||||
|
eprintln!("Creating indexes...");
|
||||||
|
self.conn.execute_batch(
|
||||||
|
r#"
|
||||||
CREATE INDEX IF NOT EXISTS idx_session_id ON signature_entries(session_id);
|
CREATE INDEX IF NOT EXISTS idx_session_id ON signature_entries(session_id);
|
||||||
CREATE INDEX IF NOT EXISTS idx_timestamp ON signature_entries(timestamp_ms);
|
CREATE INDEX IF NOT EXISTS idx_timestamp ON signature_entries(timestamp_ms);
|
||||||
CREATE INDEX IF NOT EXISTS idx_version ON signature_entries(version_id);
|
CREATE INDEX IF NOT EXISTS idx_version ON signature_entries(version_id);
|
||||||
"#,
|
"#,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
// Run optimizer to analyze tables and update statistics
|
||||||
|
// Checkpoint and truncate WAL to reduce disk usage
|
||||||
|
eprintln!("Optimizing database and Checkpointing WAL...");
|
||||||
|
self.conn.execute_batch(
|
||||||
|
r#"
|
||||||
|
PRAGMA optimize;
|
||||||
|
PRAGMA wal_checkpoint(TRUNCATE);
|
||||||
|
"#,
|
||||||
|
)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -94,6 +125,26 @@ impl Database {
|
|||||||
let mut os_cache: HashMap<String, i64> = HashMap::new();
|
let mut os_cache: HashMap<String, i64> = HashMap::new();
|
||||||
let mut app_name_cache: HashMap<String, i64> = HashMap::new();
|
let mut app_name_cache: HashMap<String, i64> = HashMap::new();
|
||||||
|
|
||||||
|
// Prepare all lookup statements once (using INSERT ... ON CONFLICT ... RETURNING)
|
||||||
|
let mut apps_stmt = tx.prepare_cached(
|
||||||
|
"INSERT INTO apps (name) VALUES (?) ON CONFLICT(name) DO UPDATE SET name=excluded.name RETURNING id",
|
||||||
|
)?;
|
||||||
|
let mut versions_stmt = tx.prepare_cached(
|
||||||
|
"INSERT INTO versions (name) VALUES (?) ON CONFLICT(name) DO UPDATE SET name=excluded.name RETURNING id",
|
||||||
|
)?;
|
||||||
|
let mut models_stmt = tx.prepare_cached(
|
||||||
|
"INSERT INTO models (name) VALUES (?) ON CONFLICT(name) DO UPDATE SET name=excluded.name RETURNING id",
|
||||||
|
)?;
|
||||||
|
let mut devices_stmt = tx.prepare_cached(
|
||||||
|
"INSERT INTO devices (name) VALUES (?) ON CONFLICT(name) DO UPDATE SET name=excluded.name RETURNING id",
|
||||||
|
)?;
|
||||||
|
let mut os_stmt = tx.prepare_cached(
|
||||||
|
"INSERT INTO os_versions (name) VALUES (?) ON CONFLICT(name) DO UPDATE SET name=excluded.name RETURNING id",
|
||||||
|
)?;
|
||||||
|
let mut app_names_stmt = tx.prepare_cached(
|
||||||
|
"INSERT INTO app_names (name) VALUES (?) ON CONFLICT(name) DO UPDATE SET name=excluded.name RETURNING id",
|
||||||
|
)?;
|
||||||
|
|
||||||
let mut insert_stmt = tx.prepare_cached(
|
let mut insert_stmt = tx.prepare_cached(
|
||||||
r#"
|
r#"
|
||||||
INSERT INTO signature_entries (
|
INSERT INTO signature_entries (
|
||||||
@@ -106,12 +157,29 @@ impl Database {
|
|||||||
)?;
|
)?;
|
||||||
|
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
let app_id = get_or_insert_lookup(tx, &mut app_cache, "apps", &entry.app)?;
|
let app_id = get_or_insert_cached(&mut apps_stmt, &mut app_cache, &entry.app)?;
|
||||||
let version_id = get_or_insert_lookup(tx, &mut version_cache, "versions", &entry.version)?;
|
let version_id =
|
||||||
let model_id = entry.model.as_ref().map(|v| get_or_insert_lookup(tx, &mut model_cache, "models", v)).transpose()?;
|
get_or_insert_cached(&mut versions_stmt, &mut version_cache, &entry.version)?;
|
||||||
let device_id = entry.device.as_ref().map(|v| get_or_insert_lookup(tx, &mut device_cache, "devices", v)).transpose()?;
|
let model_id = entry
|
||||||
let os_id = entry.os.as_ref().map(|v| get_or_insert_lookup(tx, &mut os_cache, "os_versions", v)).transpose()?;
|
.model
|
||||||
let app_name_id = entry.app_name.as_ref().map(|v| get_or_insert_lookup(tx, &mut app_name_cache, "app_names", v)).transpose()?;
|
.as_ref()
|
||||||
|
.map(|v| get_or_insert_cached(&mut models_stmt, &mut model_cache, v))
|
||||||
|
.transpose()?;
|
||||||
|
let device_id = entry
|
||||||
|
.device
|
||||||
|
.as_ref()
|
||||||
|
.map(|v| get_or_insert_cached(&mut devices_stmt, &mut device_cache, v))
|
||||||
|
.transpose()?;
|
||||||
|
let os_id = entry
|
||||||
|
.os
|
||||||
|
.as_ref()
|
||||||
|
.map(|v| get_or_insert_cached(&mut os_stmt, &mut os_cache, v))
|
||||||
|
.transpose()?;
|
||||||
|
let app_name_id = entry
|
||||||
|
.app_name
|
||||||
|
.as_ref()
|
||||||
|
.map(|v| get_or_insert_cached(&mut app_names_stmt, &mut app_name_cache, v))
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
insert_stmt.execute(params![
|
insert_stmt.execute(params![
|
||||||
entry.session_id,
|
entry.session_id,
|
||||||
@@ -135,32 +203,19 @@ impl Database {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get or insert a value into a lookup table, using a cache to minimize DB queries
|
/// Get or insert a value using a prepared statement with RETURNING, with in-memory cache
|
||||||
fn get_or_insert_lookup(
|
fn get_or_insert_cached(
|
||||||
tx: &Transaction<'_>,
|
stmt: &mut rusqlite::CachedStatement<'_>,
|
||||||
cache: &mut HashMap<String, i64>,
|
cache: &mut HashMap<String, i64>,
|
||||||
table: &str,
|
|
||||||
value: &str,
|
value: &str,
|
||||||
) -> Result<i64> {
|
) -> Result<i64> {
|
||||||
|
// Check cache first
|
||||||
if let Some(&id) = cache.get(value) {
|
if let Some(&id) = cache.get(value) {
|
||||||
return Ok(id);
|
return Ok(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to find existing entry
|
// Use INSERT ... ON CONFLICT ... RETURNING to get id in one round-trip
|
||||||
let query = format!("SELECT id FROM {} WHERE name = ?", table);
|
let id: i64 = stmt.query_row(params![value], |row| row.get(0))?;
|
||||||
let existing: Option<i64> = tx
|
|
||||||
.query_row(&query, params![value], |row| row.get(0))
|
|
||||||
.ok();
|
|
||||||
|
|
||||||
if let Some(id) = existing {
|
|
||||||
cache.insert(value.to_string(), id);
|
|
||||||
return Ok(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Insert new entry
|
|
||||||
let insert = format!("INSERT INTO {} (name) VALUES (?)", table);
|
|
||||||
tx.execute(&insert, params![value])?;
|
|
||||||
let id = tx.last_insert_rowid();
|
|
||||||
cache.insert(value.to_string(), id);
|
cache.insert(value.to_string(), id);
|
||||||
Ok(id)
|
Ok(id)
|
||||||
}
|
}
|
||||||
|
|||||||
51
src/files.rs
51
src/files.rs
@@ -1,10 +1,41 @@
|
|||||||
use anyhow::{anyhow, Result};
|
use anyhow::{Result, anyhow};
|
||||||
use chrono::NaiveDate;
|
use chrono::NaiveDate;
|
||||||
use flate2::read::GzDecoder;
|
use flate2::read::GzDecoder;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{BufRead, BufReader};
|
use std::io::{BufRead, BufReader, Read};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
/// Enum-based reader to avoid Box<dyn BufRead> heap allocation and dynamic dispatch
|
||||||
|
pub enum LogReader {
|
||||||
|
Plain(BufReader<File>),
|
||||||
|
Gzip(BufReader<GzDecoder<File>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Read for LogReader {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
match self {
|
||||||
|
LogReader::Plain(r) => r.read(buf),
|
||||||
|
LogReader::Gzip(r) => r.read(buf),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BufRead for LogReader {
|
||||||
|
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
|
||||||
|
match self {
|
||||||
|
LogReader::Plain(r) => r.fill_buf(),
|
||||||
|
LogReader::Gzip(r) => r.fill_buf(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn consume(&mut self, amt: usize) {
|
||||||
|
match self {
|
||||||
|
LogReader::Plain(r) => r.consume(amt),
|
||||||
|
LogReader::Gzip(r) => r.consume(amt),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Discovers log files for a given date range
|
/// Discovers log files for a given date range
|
||||||
pub struct LogFileDiscovery {
|
pub struct LogFileDiscovery {
|
||||||
base_dir: PathBuf,
|
base_dir: PathBuf,
|
||||||
@@ -25,9 +56,7 @@ impl LogFileDiscovery {
|
|||||||
if let Some(log_file) = self.find_log_for_date(current)? {
|
if let Some(log_file) = self.find_log_for_date(current)? {
|
||||||
files.push(log_file);
|
files.push(log_file);
|
||||||
}
|
}
|
||||||
current = current
|
current = current.succ_opt().ok_or_else(|| anyhow!("Date overflow"))?;
|
||||||
.succ_opt()
|
|
||||||
.ok_or_else(|| anyhow!("Date overflow"))?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(files)
|
Ok(files)
|
||||||
@@ -72,26 +101,26 @@ pub struct LogFile {
|
|||||||
|
|
||||||
impl LogFile {
|
impl LogFile {
|
||||||
/// Returns a buffered reader for this log file, handling compression transparently
|
/// Returns a buffered reader for this log file, handling compression transparently
|
||||||
pub fn reader(&self) -> Result<Box<dyn BufRead>> {
|
pub fn reader(&self) -> Result<LogReader> {
|
||||||
let file = File::open(&self.path)?;
|
let file = File::open(&self.path)?;
|
||||||
|
|
||||||
if self.compressed {
|
if self.compressed {
|
||||||
let decoder = GzDecoder::new(file);
|
let decoder = GzDecoder::new(file);
|
||||||
Ok(Box::new(BufReader::new(decoder)))
|
Ok(LogReader::Gzip(BufReader::new(decoder)))
|
||||||
} else {
|
} else {
|
||||||
Ok(Box::new(BufReader::new(file)))
|
Ok(LogReader::Plain(BufReader::new(file)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// For reading a single file directly (e.g., for testing)
|
/// For reading a single file directly (e.g., for testing)
|
||||||
pub fn read_log_file(path: &str) -> Result<Box<dyn BufRead>> {
|
pub fn read_log_file(path: &str) -> Result<LogReader> {
|
||||||
let file = File::open(path)?;
|
let file = File::open(path)?;
|
||||||
|
|
||||||
if path.ends_with(".gz") {
|
if path.ends_with(".gz") {
|
||||||
let decoder = GzDecoder::new(file);
|
let decoder = GzDecoder::new(file);
|
||||||
Ok(Box::new(BufReader::new(decoder)))
|
Ok(LogReader::Gzip(BufReader::new(decoder)))
|
||||||
} else {
|
} else {
|
||||||
Ok(Box::new(BufReader::new(file)))
|
Ok(LogReader::Plain(BufReader::new(file)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
80
src/main.rs
80
src/main.rs
@@ -1,13 +1,13 @@
|
|||||||
use anyhow::{anyhow, Result};
|
use anyhow::{Result, anyhow};
|
||||||
use chrono::NaiveDate;
|
use chrono::NaiveDate;
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use crossbeam_channel::{bounded, Sender};
|
use crossbeam_channel::{Sender, bounded};
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::BufRead;
|
use std::io::BufRead;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
mod db;
|
mod db;
|
||||||
@@ -15,7 +15,7 @@ mod files;
|
|||||||
mod parser;
|
mod parser;
|
||||||
|
|
||||||
use db::Database;
|
use db::Database;
|
||||||
use files::{read_log_file, LogFile, LogFileDiscovery};
|
use files::{LogFile, LogFileDiscovery, LogReader, read_log_file};
|
||||||
use parser::{ParsedMessage, ParserRegistry, SignatureEntry};
|
use parser::{ParsedMessage, ParserRegistry, SignatureEntry};
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
#[derive(Parser, Debug)]
|
||||||
@@ -63,11 +63,15 @@ fn main() -> Result<()> {
|
|||||||
let args = Args::parse();
|
let args = Args::parse();
|
||||||
|
|
||||||
// Configure rayon thread pool if threads specified
|
// Configure rayon thread pool if threads specified
|
||||||
if args.threads > 0 {
|
if args.threads > 0
|
||||||
rayon::ThreadPoolBuilder::new()
|
&& let Err(e) = rayon::ThreadPoolBuilder::new()
|
||||||
.num_threads(args.threads)
|
.num_threads(args.threads)
|
||||||
.build_global()
|
.build_global()
|
||||||
.ok(); // Ignore error if pool already initialized
|
{
|
||||||
|
eprintln!(
|
||||||
|
"Warning: Could not configure thread pool ({}), using default",
|
||||||
|
e
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let use_parallel = args.threads != 1;
|
let use_parallel = args.threads != 1;
|
||||||
@@ -79,6 +83,7 @@ fn main() -> Result<()> {
|
|||||||
eprintln!("Processing single file: {}", file_path.display());
|
eprintln!("Processing single file: {}", file_path.display());
|
||||||
let reader = read_log_file(file_path.to_str().unwrap())?;
|
let reader = read_log_file(file_path.to_str().unwrap())?;
|
||||||
process_reader(reader, ®istry, &mut db, args.batch_size)?;
|
process_reader(reader, ®istry, &mut db, args.batch_size)?;
|
||||||
|
db.finalize()?;
|
||||||
} else {
|
} else {
|
||||||
// Process date range
|
// Process date range
|
||||||
let from = parse_date(
|
let from = parse_date(
|
||||||
@@ -142,12 +147,18 @@ fn process_files_sequential(
|
|||||||
let reader = log_file.reader()?;
|
let reader = log_file.reader()?;
|
||||||
process_reader(reader, ®istry, &mut db, batch_size)?;
|
process_reader(reader, ®istry, &mut db, batch_size)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
db.finalize()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_files_parallel(log_files: Vec<LogFile>, output: &str, batch_size: usize) -> Result<()> {
|
fn process_files_parallel(log_files: Vec<LogFile>, output: &str, batch_size: usize) -> Result<()> {
|
||||||
let num_threads = rayon::current_num_threads();
|
let num_threads = rayon::current_num_threads();
|
||||||
eprintln!("Processing {} files with {} threads", log_files.len(), num_threads);
|
eprintln!(
|
||||||
|
"Processing {} files with {} threads",
|
||||||
|
log_files.len(),
|
||||||
|
num_threads
|
||||||
|
);
|
||||||
|
|
||||||
// Channel for sending parsed entries to the DB writer
|
// Channel for sending parsed entries to the DB writer
|
||||||
// Buffer size: enough batches to keep workers busy without excessive memory
|
// Buffer size: enough batches to keep workers busy without excessive memory
|
||||||
@@ -158,6 +169,9 @@ fn process_files_parallel(log_files: Vec<LogFile>, output: &str, batch_size: usi
|
|||||||
let parsed_lines = Arc::new(AtomicU64::new(0));
|
let parsed_lines = Arc::new(AtomicU64::new(0));
|
||||||
let error_count = Arc::new(AtomicU64::new(0));
|
let error_count = Arc::new(AtomicU64::new(0));
|
||||||
|
|
||||||
|
// Shared parser registry - parsers are stateless, so we can share one instance
|
||||||
|
let registry = Arc::new(ParserRegistry::new());
|
||||||
|
|
||||||
// Spawn DB writer thread
|
// Spawn DB writer thread
|
||||||
let output_path = output.to_string();
|
let output_path = output.to_string();
|
||||||
let db_handle = thread::spawn(move || -> Result<()> {
|
let db_handle = thread::spawn(move || -> Result<()> {
|
||||||
@@ -169,19 +183,23 @@ fn process_files_parallel(log_files: Vec<LogFile>, output: &str, batch_size: usi
|
|||||||
tx.commit()?;
|
tx.commit()?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
db.finalize()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
|
|
||||||
// Process files in parallel
|
// Process files in parallel
|
||||||
let result: Result<()> = log_files
|
let result: Result<()> = log_files.into_par_iter().try_for_each(|log_file| {
|
||||||
.into_par_iter()
|
|
||||||
.try_for_each(|log_file| {
|
|
||||||
let file_path = log_file.path.display().to_string();
|
let file_path = log_file.path.display().to_string();
|
||||||
let compressed = if log_file.compressed { "compressed" } else { "plain" };
|
let compressed = if log_file.compressed {
|
||||||
|
"compressed"
|
||||||
|
} else {
|
||||||
|
"plain"
|
||||||
|
};
|
||||||
eprintln!("Starting: {} ({})", file_path, compressed);
|
eprintln!("Starting: {} ({})", file_path, compressed);
|
||||||
|
|
||||||
process_file_parallel(
|
process_file_parallel(
|
||||||
log_file,
|
log_file,
|
||||||
|
®istry,
|
||||||
&sender,
|
&sender,
|
||||||
batch_size,
|
batch_size,
|
||||||
&total_lines,
|
&total_lines,
|
||||||
@@ -214,25 +232,35 @@ fn process_files_parallel(log_files: Vec<LogFile>, output: &str, batch_size: usi
|
|||||||
|
|
||||||
fn process_file_parallel(
|
fn process_file_parallel(
|
||||||
log_file: LogFile,
|
log_file: LogFile,
|
||||||
|
registry: &Arc<ParserRegistry>,
|
||||||
sender: &Sender<Vec<SignatureEntry>>,
|
sender: &Sender<Vec<SignatureEntry>>,
|
||||||
batch_size: usize,
|
batch_size: usize,
|
||||||
total_lines: &AtomicU64,
|
total_lines: &AtomicU64,
|
||||||
parsed_lines: &AtomicU64,
|
parsed_lines: &AtomicU64,
|
||||||
error_count: &AtomicU64,
|
error_count: &AtomicU64,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let registry = ParserRegistry::new();
|
let mut reader = log_file.reader()?;
|
||||||
let reader = log_file.reader()?;
|
|
||||||
|
|
||||||
let mut batch: Vec<SignatureEntry> = Vec::with_capacity(batch_size);
|
let mut batch: Vec<SignatureEntry> = Vec::with_capacity(batch_size);
|
||||||
let mut file_lines = 0u64;
|
let mut file_lines = 0u64;
|
||||||
let mut file_parsed = 0u64;
|
let mut file_parsed = 0u64;
|
||||||
let mut file_errors = 0u64;
|
let mut file_errors = 0u64;
|
||||||
|
|
||||||
for line_result in reader.lines() {
|
// Reuse line buffer to avoid per-line allocations
|
||||||
let line = line_result?;
|
let mut line = String::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
line.clear();
|
||||||
|
let bytes_read = reader.read_line(&mut line)?;
|
||||||
|
if bytes_read == 0 {
|
||||||
|
break; // EOF
|
||||||
|
}
|
||||||
file_lines += 1;
|
file_lines += 1;
|
||||||
|
|
||||||
if let Some(parse_result) = registry.parse(&line) {
|
// Trim newline for parsing (without reallocating)
|
||||||
|
let line_trimmed = line.trim_end();
|
||||||
|
|
||||||
|
if let Some(parse_result) = registry.parse(line_trimmed) {
|
||||||
match parse_result {
|
match parse_result {
|
||||||
Ok(ParsedMessage::Signature(entry)) => {
|
Ok(ParsedMessage::Signature(entry)) => {
|
||||||
batch.push(entry);
|
batch.push(entry);
|
||||||
@@ -266,7 +294,7 @@ fn process_file_parallel(
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn process_reader(
|
fn process_reader(
|
||||||
reader: Box<dyn BufRead>,
|
mut reader: LogReader,
|
||||||
registry: &ParserRegistry,
|
registry: &ParserRegistry,
|
||||||
db: &mut Database,
|
db: &mut Database,
|
||||||
batch_size: usize,
|
batch_size: usize,
|
||||||
@@ -276,11 +304,21 @@ fn process_reader(
|
|||||||
let mut parsed_lines = 0u64;
|
let mut parsed_lines = 0u64;
|
||||||
let mut error_counts: HashMap<String, u64> = HashMap::new();
|
let mut error_counts: HashMap<String, u64> = HashMap::new();
|
||||||
|
|
||||||
for line_result in reader.lines() {
|
// Reuse line buffer to avoid per-line allocations
|
||||||
let line = line_result?;
|
let mut line = String::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
line.clear();
|
||||||
|
let bytes_read = reader.read_line(&mut line)?;
|
||||||
|
if bytes_read == 0 {
|
||||||
|
break; // EOF
|
||||||
|
}
|
||||||
total_lines += 1;
|
total_lines += 1;
|
||||||
|
|
||||||
if let Some(parse_result) = registry.parse(&line) {
|
// Trim newline for parsing (without reallocating)
|
||||||
|
let line_trimmed = line.trim_end();
|
||||||
|
|
||||||
|
if let Some(parse_result) = registry.parse(line_trimmed) {
|
||||||
match parse_result {
|
match parse_result {
|
||||||
Ok(ParsedMessage::Signature(entry)) => {
|
Ok(ParsedMessage::Signature(entry)) => {
|
||||||
signature_batch.push(entry);
|
signature_batch.push(entry);
|
||||||
|
|||||||
354
src/parser.rs
354
src/parser.rs
@@ -1,4 +1,4 @@
|
|||||||
use anyhow::{anyhow, Result};
|
use anyhow::{Result, anyhow};
|
||||||
use chrono::NaiveDateTime;
|
use chrono::NaiveDateTime;
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use std::sync::LazyLock;
|
use std::sync::LazyLock;
|
||||||
@@ -23,6 +23,45 @@ pub struct SignatureEntry {
|
|||||||
pub password_autofill_usage: Option<i64>,
|
pub password_autofill_usage: Option<i64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Parsed details without HashMap allocation - holds string slices into the original details string
|
||||||
|
struct ParsedDetails<'a> {
|
||||||
|
offline_login_usage: Option<&'a str>,
|
||||||
|
is_password_autofill_enabled: Option<&'a str>,
|
||||||
|
camera_roll_usage: Option<&'a str>,
|
||||||
|
os: Option<&'a str>,
|
||||||
|
app_name: Option<&'a str>,
|
||||||
|
touch_id: Option<&'a str>,
|
||||||
|
is_offline_login_enabled: Option<&'a str>,
|
||||||
|
model: Option<&'a str>,
|
||||||
|
device: Option<&'a str>,
|
||||||
|
password_autofill_usage: Option<&'a str>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> ParsedDetails<'a> {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
offline_login_usage: None,
|
||||||
|
is_password_autofill_enabled: None,
|
||||||
|
camera_roll_usage: None,
|
||||||
|
os: None,
|
||||||
|
app_name: None,
|
||||||
|
touch_id: None,
|
||||||
|
is_offline_login_enabled: None,
|
||||||
|
model: None,
|
||||||
|
device: None,
|
||||||
|
password_autofill_usage: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parsed mobile details without HashMap allocation
|
||||||
|
struct ParsedMobileDetails<'a> {
|
||||||
|
os: Option<&'a str>,
|
||||||
|
app_name: Option<&'a str>,
|
||||||
|
model: Option<&'a str>,
|
||||||
|
device: Option<&'a str>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Trait for parsing different message types from logs.
|
/// Trait for parsing different message types from logs.
|
||||||
/// Implement this trait to add support for new message formats.
|
/// Implement this trait to add support for new message formats.
|
||||||
pub trait MessageParser: Send + Sync {
|
pub trait MessageParser: Send + Sync {
|
||||||
@@ -39,8 +78,9 @@ pub enum ParsedMessage {
|
|||||||
|
|
||||||
static SESSION_ID_RE: LazyLock<Regex> =
|
static SESSION_ID_RE: LazyLock<Regex> =
|
||||||
LazyLock::new(|| Regex::new(r"sessionId=([^,\s]+)").unwrap());
|
LazyLock::new(|| Regex::new(r"sessionId=([^,\s]+)").unwrap());
|
||||||
static DATETIME_RE: LazyLock<Regex> =
|
static DATETIME_RE: LazyLock<Regex> = LazyLock::new(|| {
|
||||||
LazyLock::new(|| Regex::new(r#"dt="(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})(?:,(\d{3}))?"#).unwrap());
|
Regex::new(r#"dt="(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})(?:,(\d{3}))?"#).unwrap()
|
||||||
|
});
|
||||||
static CORRELATION_ID_RE: LazyLock<Regex> =
|
static CORRELATION_ID_RE: LazyLock<Regex> =
|
||||||
LazyLock::new(|| Regex::new(r"correlationId=([^,\s]+)").unwrap());
|
LazyLock::new(|| Regex::new(r"correlationId=([^,\s]+)").unwrap());
|
||||||
static SIGNATURE_RE: LazyLock<Regex> =
|
static SIGNATURE_RE: LazyLock<Regex> =
|
||||||
@@ -100,25 +140,26 @@ impl SignatureParser {
|
|||||||
let version = caps.get(2).map(|m| m.as_str().to_string()).unwrap();
|
let version = caps.get(2).map(|m| m.as_str().to_string()).unwrap();
|
||||||
let details_str = caps.get(3).map(|m| m.as_str()).unwrap();
|
let details_str = caps.get(3).map(|m| m.as_str()).unwrap();
|
||||||
|
|
||||||
// Parse details key-value pairs
|
// Parse details key-value pairs directly into struct (no HashMap)
|
||||||
// Handle the tricky "device:iOS, Apple" case by parsing carefully
|
let details = parse_details_direct(details_str);
|
||||||
let details = parse_details(details_str)?;
|
|
||||||
|
|
||||||
let entry = SignatureEntry {
|
let entry = SignatureEntry {
|
||||||
session_id,
|
session_id,
|
||||||
timestamp_ms,
|
timestamp_ms,
|
||||||
app,
|
app,
|
||||||
version,
|
version,
|
||||||
offline_login_usage: parse_number(&details, "offlineLoginUsage"),
|
offline_login_usage: details.offline_login_usage.and_then(parse_number_str),
|
||||||
is_password_autofill_enabled: parse_bool(&details, "isPasswordAutofillEnabled"),
|
is_password_autofill_enabled: details
|
||||||
camera_roll_usage: parse_number(&details, "cameraRollUsage"),
|
.is_password_autofill_enabled
|
||||||
os: get_string(&details, "OS"),
|
.and_then(parse_bool_str),
|
||||||
app_name: get_string(&details, "appName"),
|
camera_roll_usage: details.camera_roll_usage.and_then(parse_number_str),
|
||||||
touch_id: parse_bool(&details, "touchID"),
|
os: details.os.map(|s| s.to_string()),
|
||||||
is_offline_login_enabled: parse_bool(&details, "isOfflineLoginEnabled"),
|
app_name: details.app_name.map(|s| s.to_string()),
|
||||||
model: get_string(&details, "model"),
|
touch_id: details.touch_id.and_then(parse_bool_str),
|
||||||
device: get_string(&details, "device"),
|
is_offline_login_enabled: details.is_offline_login_enabled.and_then(parse_bool_str),
|
||||||
password_autofill_usage: parse_number(&details, "passwordAutofillUsage"),
|
model: details.model.map(|s| s.to_string()),
|
||||||
|
device: details.device.map(|s| s.to_string()),
|
||||||
|
password_autofill_usage: details.password_autofill_usage.and_then(parse_number_str),
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(ParsedMessage::Signature(entry))
|
Ok(ParsedMessage::Signature(entry))
|
||||||
@@ -151,7 +192,7 @@ impl MobileClientIosParser {
|
|||||||
let version = caps.get(2).map(|m| m.as_str().to_string()).unwrap();
|
let version = caps.get(2).map(|m| m.as_str().to_string()).unwrap();
|
||||||
let details_str = caps.get(4).map(|m| m.as_str()).unwrap();
|
let details_str = caps.get(4).map(|m| m.as_str()).unwrap();
|
||||||
|
|
||||||
let details = parse_mobile_details(details_str);
|
let details = parse_mobile_details_direct(details_str);
|
||||||
|
|
||||||
let entry = SignatureEntry {
|
let entry = SignatureEntry {
|
||||||
session_id,
|
session_id,
|
||||||
@@ -161,11 +202,11 @@ impl MobileClientIosParser {
|
|||||||
offline_login_usage: None,
|
offline_login_usage: None,
|
||||||
is_password_autofill_enabled: None,
|
is_password_autofill_enabled: None,
|
||||||
camera_roll_usage: None,
|
camera_roll_usage: None,
|
||||||
os: get_string(&details, "os"),
|
os: details.os.map(|s| s.to_string()),
|
||||||
app_name: get_string(&details, "app-name"),
|
app_name: details.app_name.map(|s| s.to_string()),
|
||||||
touch_id: None,
|
touch_id: None,
|
||||||
is_offline_login_enabled: None,
|
is_offline_login_enabled: None,
|
||||||
model: get_string(&details, "model"),
|
model: details.model.map(|s| s.to_string()),
|
||||||
device: Some("iOS".to_string()),
|
device: Some("iOS".to_string()),
|
||||||
password_autofill_usage: None,
|
password_autofill_usage: None,
|
||||||
};
|
};
|
||||||
@@ -200,7 +241,7 @@ impl MobileClientAndroidParser {
|
|||||||
let version = caps.get(2).map(|m| m.as_str().to_string()).unwrap();
|
let version = caps.get(2).map(|m| m.as_str().to_string()).unwrap();
|
||||||
let details_str = caps.get(4).map(|m| m.as_str()).unwrap();
|
let details_str = caps.get(4).map(|m| m.as_str()).unwrap();
|
||||||
|
|
||||||
let details = parse_mobile_details_android(details_str);
|
let details = parse_mobile_details_android_direct(details_str);
|
||||||
|
|
||||||
let entry = SignatureEntry {
|
let entry = SignatureEntry {
|
||||||
session_id,
|
session_id,
|
||||||
@@ -210,12 +251,12 @@ impl MobileClientAndroidParser {
|
|||||||
offline_login_usage: None,
|
offline_login_usage: None,
|
||||||
is_password_autofill_enabled: None,
|
is_password_autofill_enabled: None,
|
||||||
camera_roll_usage: None,
|
camera_roll_usage: None,
|
||||||
os: get_string(&details, "os"),
|
os: details.os.map(|s| s.to_string()),
|
||||||
app_name: Some("native Android".to_string()),
|
app_name: Some("native Android".to_string()),
|
||||||
touch_id: None,
|
touch_id: None,
|
||||||
is_offline_login_enabled: None,
|
is_offline_login_enabled: None,
|
||||||
model: get_string(&details, "model"),
|
model: details.model.map(|s| s.to_string()),
|
||||||
device: get_string(&details, "device"),
|
device: details.device.map(|s| s.to_string()),
|
||||||
password_autofill_usage: None,
|
password_autofill_usage: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -250,93 +291,95 @@ fn extract_correlation_id(line: &str) -> Result<String> {
|
|||||||
.ok_or_else(|| anyhow!("Missing correlationId"))
|
.ok_or_else(|| anyhow!("Missing correlationId"))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse mobile client details for iOS (simple comma-separated key:value)
|
/// Parse mobile client details for iOS directly into struct (no HashMap allocation)
|
||||||
fn parse_mobile_details(details: &str) -> std::collections::HashMap<String, String> {
|
fn parse_mobile_details_direct(details: &str) -> ParsedMobileDetails<'_> {
|
||||||
let mut map = std::collections::HashMap::new();
|
|
||||||
|
|
||||||
// Keys for iOS mobile client
|
// Keys for iOS mobile client
|
||||||
let known_keys = ["sdk-client", "sdk-version", "app-name", "device", "model", "os"];
|
const KNOWN_KEYS: [&str; 6] = [
|
||||||
|
"sdk-client",
|
||||||
|
"sdk-version",
|
||||||
|
"app-name",
|
||||||
|
"device",
|
||||||
|
"model",
|
||||||
|
"os",
|
||||||
|
];
|
||||||
|
|
||||||
let mut key_positions: Vec<(usize, &str)> = known_keys
|
let mut key_positions: Vec<(usize, &str)> = KNOWN_KEYS
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|&key| {
|
.filter_map(|&key| find_key_position(details, key))
|
||||||
let pattern = format!("{}:", key);
|
|
||||||
details.find(&pattern).map(|pos| (pos, key))
|
|
||||||
})
|
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
key_positions.sort_by_key(|&(pos, _)| pos);
|
key_positions.sort_by_key(|&(pos, _)| pos);
|
||||||
|
|
||||||
for i in 0..key_positions.len() {
|
let mut result = ParsedMobileDetails {
|
||||||
let (pos, key) = key_positions[i];
|
os: None,
|
||||||
let value_start = pos + key.len() + 1;
|
app_name: None,
|
||||||
|
model: None,
|
||||||
let value_end = if i + 1 < key_positions.len() {
|
device: None,
|
||||||
let next_pos = key_positions[i + 1].0;
|
|
||||||
if next_pos > 0 && details.as_bytes().get(next_pos - 1) == Some(&b',') {
|
|
||||||
next_pos - 1
|
|
||||||
} else {
|
|
||||||
next_pos
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
details.find(" user-agent").unwrap_or(details.len())
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let value = details[value_start..value_end].trim().to_string();
|
for i in 0..key_positions.len() {
|
||||||
map.insert(key.to_string(), value);
|
let (pos, key) = key_positions[i];
|
||||||
|
let value = extract_value(details, pos, key, i, &key_positions);
|
||||||
|
|
||||||
|
match key {
|
||||||
|
"os" => result.os = Some(value),
|
||||||
|
"app-name" => result.app_name = Some(value),
|
||||||
|
"model" => result.model = Some(value),
|
||||||
|
"device" => result.device = Some(value),
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
map
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse mobile client details for Android (handles device with commas)
|
/// Parse mobile client details for Android directly into struct (no HashMap allocation)
|
||||||
fn parse_mobile_details_android(details: &str) -> std::collections::HashMap<String, String> {
|
fn parse_mobile_details_android_direct(details: &str) -> ParsedMobileDetails<'_> {
|
||||||
let mut map = std::collections::HashMap::new();
|
|
||||||
|
|
||||||
// For Android, device can contain commas like "Android, samsung"
|
// For Android, device can contain commas like "Android, samsung"
|
||||||
// Keys in order: sdk-client, sdk-version, app-name, device, model, os
|
const KNOWN_KEYS: [&str; 6] = [
|
||||||
let known_keys = ["sdk-client", "sdk-version", "app-name", "device", "model", "os"];
|
"sdk-client",
|
||||||
|
"sdk-version",
|
||||||
|
"app-name",
|
||||||
|
"device",
|
||||||
|
"model",
|
||||||
|
"os",
|
||||||
|
];
|
||||||
|
|
||||||
let mut key_positions: Vec<(usize, &str)> = known_keys
|
let mut key_positions: Vec<(usize, &str)> = KNOWN_KEYS
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|&key| {
|
.filter_map(|&key| find_key_position(details, key))
|
||||||
let pattern = format!("{}:", key);
|
|
||||||
details.find(&pattern).map(|pos| (pos, key))
|
|
||||||
})
|
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
key_positions.sort_by_key(|&(pos, _)| pos);
|
key_positions.sort_by_key(|&(pos, _)| pos);
|
||||||
|
|
||||||
for i in 0..key_positions.len() {
|
let mut result = ParsedMobileDetails {
|
||||||
let (pos, key) = key_positions[i];
|
os: None,
|
||||||
let value_start = pos + key.len() + 1;
|
app_name: None,
|
||||||
|
model: None,
|
||||||
let value_end = if i + 1 < key_positions.len() {
|
device: None,
|
||||||
let next_pos = key_positions[i + 1].0;
|
|
||||||
// Find the comma before the next key
|
|
||||||
if next_pos > 0 && details.as_bytes().get(next_pos - 1) == Some(&b',') {
|
|
||||||
next_pos - 1
|
|
||||||
} else {
|
|
||||||
next_pos
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
details.find(" user-agent").unwrap_or(details.len())
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let value = details[value_start..value_end].trim().to_string();
|
for i in 0..key_positions.len() {
|
||||||
map.insert(key.to_string(), value);
|
let (pos, key) = key_positions[i];
|
||||||
|
let value = extract_value(details, pos, key, i, &key_positions);
|
||||||
|
|
||||||
|
match key {
|
||||||
|
"os" => result.os = Some(value),
|
||||||
|
"app-name" => result.app_name = Some(value),
|
||||||
|
"model" => result.model = Some(value),
|
||||||
|
"device" => result.device = Some(value),
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
map
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse the details string which has format like:
|
/// Parse the details string directly into ParsedDetails (no HashMap allocation)
|
||||||
/// offlineLoginUsage:0,isPasswordAutofillEnabled:no,...,device:iOS, Apple,passwordAutofillUsage:0
|
/// Format: offlineLoginUsage:0,isPasswordAutofillEnabled:no,...,device:iOS, Apple,passwordAutofillUsage:0
|
||||||
fn parse_details(details: &str) -> Result<std::collections::HashMap<String, String>> {
|
fn parse_details_direct(details: &str) -> ParsedDetails<'_> {
|
||||||
let mut map = std::collections::HashMap::new();
|
|
||||||
|
|
||||||
// Known keys in order they appear
|
// Known keys in order they appear
|
||||||
let known_keys = [
|
const KNOWN_KEYS: [&str; 10] = [
|
||||||
"offlineLoginUsage",
|
"offlineLoginUsage",
|
||||||
"isPasswordAutofillEnabled",
|
"isPasswordAutofillEnabled",
|
||||||
"cameraRollUsage",
|
"cameraRollUsage",
|
||||||
@@ -350,25 +393,71 @@ fn parse_details(details: &str) -> Result<std::collections::HashMap<String, Stri
|
|||||||
];
|
];
|
||||||
|
|
||||||
// Find positions of each key
|
// Find positions of each key
|
||||||
let mut key_positions: Vec<(usize, &str)> = known_keys
|
let mut key_positions: Vec<(usize, &str)> = KNOWN_KEYS
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|&key| {
|
.filter_map(|&key| find_key_position(details, key))
|
||||||
let pattern = format!("{}:", key);
|
|
||||||
details.find(&pattern).map(|pos| (pos, key))
|
|
||||||
})
|
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Sort by position
|
// Sort by position
|
||||||
key_positions.sort_by_key(|&(pos, _)| pos);
|
key_positions.sort_by_key(|&(pos, _)| pos);
|
||||||
|
|
||||||
|
let mut result = ParsedDetails::new();
|
||||||
|
|
||||||
// Extract values between keys
|
// Extract values between keys
|
||||||
for i in 0..key_positions.len() {
|
for i in 0..key_positions.len() {
|
||||||
let (pos, key) = key_positions[i];
|
let (pos, key) = key_positions[i];
|
||||||
|
let value = extract_value(details, pos, key, i, &key_positions);
|
||||||
|
|
||||||
|
match key {
|
||||||
|
"offlineLoginUsage" => result.offline_login_usage = Some(value),
|
||||||
|
"isPasswordAutofillEnabled" => result.is_password_autofill_enabled = Some(value),
|
||||||
|
"cameraRollUsage" => result.camera_roll_usage = Some(value),
|
||||||
|
"OS" => result.os = Some(value),
|
||||||
|
"appName" => result.app_name = Some(value),
|
||||||
|
"touchID" => result.touch_id = Some(value),
|
||||||
|
"isOfflineLoginEnabled" => result.is_offline_login_enabled = Some(value),
|
||||||
|
"model" => result.model = Some(value),
|
||||||
|
"device" => result.device = Some(value),
|
||||||
|
"passwordAutofillUsage" => result.password_autofill_usage = Some(value),
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Find the position of a key in the details string without allocating
|
||||||
|
#[inline]
|
||||||
|
fn find_key_position<'a>(details: &str, key: &'a str) -> Option<(usize, &'a str)> {
|
||||||
|
// Search for "key:" pattern
|
||||||
|
let mut search_start = 0;
|
||||||
|
while let Some(pos) = details[search_start..].find(key) {
|
||||||
|
let absolute_pos = search_start + pos;
|
||||||
|
// Check if followed by ':'
|
||||||
|
if details.as_bytes().get(absolute_pos + key.len()) == Some(&b':') {
|
||||||
|
// Check if at start or preceded by comma
|
||||||
|
if absolute_pos == 0 || details.as_bytes().get(absolute_pos - 1) == Some(&b',') {
|
||||||
|
return Some((absolute_pos, key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
search_start = absolute_pos + 1;
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract a value from the details string without allocating
|
||||||
|
#[inline]
|
||||||
|
fn extract_value<'a>(
|
||||||
|
details: &'a str,
|
||||||
|
pos: usize,
|
||||||
|
key: &str,
|
||||||
|
index: usize,
|
||||||
|
key_positions: &[(usize, &str)],
|
||||||
|
) -> &'a str {
|
||||||
let value_start = pos + key.len() + 1; // +1 for ':'
|
let value_start = pos + key.len() + 1; // +1 for ':'
|
||||||
|
|
||||||
let value_end = if i + 1 < key_positions.len() {
|
let value_end = if index + 1 < key_positions.len() {
|
||||||
// Value ends at the comma before the next key
|
let next_pos = key_positions[index + 1].0;
|
||||||
let next_pos = key_positions[i + 1].0;
|
|
||||||
// Find the comma before the next key
|
// Find the comma before the next key
|
||||||
if next_pos > 0 && details.as_bytes().get(next_pos - 1) == Some(&b',') {
|
if next_pos > 0 && details.as_bytes().get(next_pos - 1) == Some(&b',') {
|
||||||
next_pos - 1
|
next_pos - 1
|
||||||
@@ -377,34 +466,74 @@ fn parse_details(details: &str) -> Result<std::collections::HashMap<String, Stri
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Last key - value goes until " user-agent" or end
|
// Last key - value goes until " user-agent" or end
|
||||||
details
|
details.find(" user-agent").unwrap_or(details.len())
|
||||||
.find(" user-agent")
|
|
||||||
.unwrap_or(details.len())
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let value = details[value_start..value_end].trim().to_string();
|
details[value_start..value_end].trim()
|
||||||
map.insert(key.to_string(), value);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(map)
|
/// Parse a number from a string slice without allocation
|
||||||
|
#[inline]
|
||||||
|
fn parse_number_str(s: &str) -> Option<i64> {
|
||||||
|
s.parse().ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_number(map: &std::collections::HashMap<String, String>, key: &str) -> Option<i64> {
|
/// Parse a boolean from a string slice using ASCII-insensitive matching (no allocation)
|
||||||
map.get(key).and_then(|v| v.parse().ok())
|
#[inline]
|
||||||
|
fn parse_bool_str(value: &str) -> Option<bool> {
|
||||||
|
let bytes = value.as_bytes();
|
||||||
|
match bytes.len() {
|
||||||
|
1 => match bytes[0] {
|
||||||
|
b'1' => Some(true),
|
||||||
|
b'0' => Some(false),
|
||||||
|
_ => None,
|
||||||
|
},
|
||||||
|
2 => {
|
||||||
|
// "no"
|
||||||
|
if (bytes[0] == b'n' || bytes[0] == b'N') && (bytes[1] == b'o' || bytes[1] == b'O') {
|
||||||
|
Some(false)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
3 => {
|
||||||
|
// "yes"
|
||||||
|
if (bytes[0] == b'y' || bytes[0] == b'Y')
|
||||||
|
&& (bytes[1] == b'e' || bytes[1] == b'E')
|
||||||
|
&& (bytes[2] == b's' || bytes[2] == b'S')
|
||||||
|
{
|
||||||
|
Some(true)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
4 => {
|
||||||
|
// "true"
|
||||||
|
if (bytes[0] == b't' || bytes[0] == b'T')
|
||||||
|
&& (bytes[1] == b'r' || bytes[1] == b'R')
|
||||||
|
&& (bytes[2] == b'u' || bytes[2] == b'U')
|
||||||
|
&& (bytes[3] == b'e' || bytes[3] == b'E')
|
||||||
|
{
|
||||||
|
Some(true)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
5 => {
|
||||||
|
// "false"
|
||||||
|
if (bytes[0] == b'f' || bytes[0] == b'F')
|
||||||
|
&& (bytes[1] == b'a' || bytes[1] == b'A')
|
||||||
|
&& (bytes[2] == b'l' || bytes[2] == b'L')
|
||||||
|
&& (bytes[3] == b's' || bytes[3] == b'S')
|
||||||
|
&& (bytes[4] == b'e' || bytes[4] == b'E')
|
||||||
|
{
|
||||||
|
Some(false)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_bool(map: &std::collections::HashMap<String, String>, key: &str) -> Option<bool> {
|
|
||||||
map.get(key).and_then(|value| {
|
|
||||||
match value.to_lowercase().as_str() {
|
|
||||||
"yes" | "true" | "1" => Some(true),
|
|
||||||
"no" | "false" | "0" => Some(false),
|
|
||||||
_ => None,
|
_ => None,
|
||||||
}
|
}
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_string(map: &std::collections::HashMap<String, String>, key: &str) -> Option<String> {
|
|
||||||
map.get(key).map(|s| s.to_string())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Registry of all available message parsers
|
/// Registry of all available message parsers
|
||||||
@@ -477,7 +606,8 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_parse_non_signature_line() {
|
fn test_parse_non_signature_line() {
|
||||||
let line = r#"Jan 21 00:00:06 tom013 m1s-kv dt="2026-01-21 00:00:06", msg="some other message""#;
|
let line =
|
||||||
|
r#"Jan 21 00:00:06 tom013 m1s-kv dt="2026-01-21 00:00:06", msg="some other message""#;
|
||||||
let registry = ParserRegistry::new();
|
let registry = ParserRegistry::new();
|
||||||
assert!(registry.parse(line).is_none());
|
assert!(registry.parse(line).is_none());
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user