diff --git a/Cargo.lock b/Cargo.lock
index 33db092..1b17b8a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -190,6 +190,46 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
+
+[[package]]
+name = "either"
+version = "1.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
+
 [[package]]
 name = "fallible-iterator"
 version = "0.3.0"
@@ -318,7 +358,9 @@ dependencies = [
  "anyhow",
  "chrono",
  "clap",
+ "crossbeam-channel",
  "flate2",
+ "rayon",
  "regex",
  "rusqlite",
 ]
@@ -378,6 +420,26 @@ dependencies = [
  "proc-macro2",
 ]
 
+[[package]]
+name = "rayon"
+version = "1.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91"
+dependencies = [
+ "crossbeam-deque",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "regex"
 version = "1.12.2"
diff --git a/Cargo.toml b/Cargo.toml
index 760f8f8..13351e7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,3 +11,5 @@ chrono = "0.4"
 regex = "1"
 flate2 = "1"
 anyhow = "1"
+rayon = "1"
+crossbeam-channel = "0.5"
diff --git a/README.md b/README.md
index c0ae8ad..728d7fc 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,7 @@ Parses application logs containing signature messages and loads them into SQLite
 - Support for both plain `.log` and gzip compressed `.log.gz` files
 - File discovery by date range using `YYYY/mm/dd` directory structure
 - Batched inserts for performance with large files
+- Parallel file processing for multi-day ingestion
 - Indexed columns (`session_id`, `version`) for efficient queries
 - Extensible parser architecture for adding new message types
 
@@ -42,6 +43,21 @@ log_ingest \
 
 The tool will look for files at `<base_path>/YYYY/MM/DD/<file>.gz` or `<base_path>/YYYY/MM/DD/<file>` for each day in the range.
 
+### Parallel processing
+
+When processing multiple files, parsing runs in parallel by default using all available CPU cores. A single writer thread handles database inserts to avoid SQLite contention.
+
+```bash
+# Use all CPU cores (default)
+log_ingest --from 2026/01/01 --to 2026/01/31 ...
+
+# Limit to 4 threads
+log_ingest --threads 4 --from 2026/01/01 --to 2026/01/31 ...
+
+# Sequential processing (disable parallelism)
+log_ingest --threads 1 --from 2026/01/01 --to 2026/01/31 ...
+```
+
 ### Options
 
 | Option | Description |
@@ -53,6 +69,7 @@ The tool will look for files at `<base_path>/YYYY/MM/DD/<file>.gz` or `
 | `<file>` | Log filename (e.g., `app.log`) |
 | `-o, --output <path>` | Output SQLite database path |
 | `--batch-size <n>` | Batch size for inserts (default: 10000) |
+| `--threads <n>` | Number of parallel threads (0 = all cores, 1 = sequential) |
 
 ## Database Schema
 
diff --git a/src/db.rs b/src/db.rs
index 804558f..c37ffab 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -10,6 +10,10 @@ pub struct Database {
 impl Database {
     pub fn new(path: &str) -> Result<Self> {
         let conn = Connection::open(path)?;
+
+        // Enable WAL mode for better concurrent read/write performance
+        conn.pragma_update(None, "journal_mode", "WAL")?;
+
         let db = Self { conn };
         db.init_schema()?;
         Ok(db)
diff --git a/src/main.rs b/src/main.rs
index 0082851..5cfe42f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,16 +1,21 @@
 use anyhow::{anyhow, Result};
 use chrono::NaiveDate;
 use clap::Parser;
+use crossbeam_channel::{bounded, Sender};
+use rayon::prelude::*;
 use std::collections::HashMap;
 use std::io::BufRead;
 use std::path::PathBuf;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+use std::thread;
 
 mod db;
 mod files;
 mod parser;
 
 use db::Database;
-use files::{read_log_file, LogFileDiscovery};
+use files::{read_log_file, LogFile, LogFileDiscovery};
 use parser::{ParsedMessage, ParserRegistry, SignatureEntry};
 
 #[derive(Parser, Debug)]
@@ -43,6 +48,10 @@ struct Args {
     /// Batch size for database inserts
     #[arg(long, default_value = "10000")]
     batch_size: usize,
+
+    /// Number of parallel threads for file processing (0 = use all available cores)
+    #[arg(long, default_value = "0")]
+    threads: usize,
 }
 
 fn parse_date(s: &str) -> Result<NaiveDate> {
@@ -53,11 +62,20 @@
 fn main() -> Result<()> {
     let args = Args::parse();
 
-    let mut db = Database::new(&args.output)?;
-    let registry = ParserRegistry::new();
+    // Configure rayon thread pool if threads specified
+    if args.threads > 0 {
+        rayon::ThreadPoolBuilder::new()
+            .num_threads(args.threads)
+            .build_global()
+            .ok(); // Ignore error if pool already initialized
+    }
+
+    let use_parallel = args.threads != 1;
 
     if let Some(file_path) = &args.file {
-        // Process single file
+        // Process single file (no parallelism needed)
+        let mut db = Database::new(&args.output)?;
+        let registry = ParserRegistry::new();
         eprintln!("Processing single file: {}", file_path.display());
         let reader = read_log_file(file_path.to_str().unwrap())?;
         process_reader(reader, &registry, &mut db, args.batch_size)?;
@@ -92,18 +110,10 @@
         eprintln!("Found {} log files to process", log_files.len());
 
-        for log_file in log_files {
-            eprintln!(
-                "Processing: {} ({})",
-                log_file.path.display(),
-                if log_file.compressed {
-                    "compressed"
-                } else {
-                    "plain"
-                }
-            );
-            let reader = log_file.reader()?;
-            process_reader(reader, &registry, &mut db, args.batch_size)?;
+        if use_parallel && log_files.len() > 1 {
+            process_files_parallel(log_files, &args.output, args.batch_size)?;
+        } else {
+            process_files_sequential(log_files, &args.output, args.batch_size)?;
+        }
     }
 
@@ -111,6 +121,150 @@
     Ok(())
 }
 
+fn process_files_sequential(
+    log_files: Vec<LogFile>,
+    output: &str,
+    batch_size: usize,
+) -> Result<()> {
+    let mut db = Database::new(output)?;
+    let registry = ParserRegistry::new();
+
+    for log_file in log_files {
+        eprintln!(
+            "Processing: {} ({})",
+            log_file.path.display(),
+            if log_file.compressed {
+                "compressed"
+            } else {
+                "plain"
+            }
+        );
+        let reader = log_file.reader()?;
+        process_reader(reader, &registry, &mut db, batch_size)?;
+    }
+    Ok(())
+}
+
+fn process_files_parallel(log_files: Vec<LogFile>, output: &str, batch_size: usize) -> Result<()> {
+    let num_threads = rayon::current_num_threads();
+    eprintln!("Processing {} files with {} threads", log_files.len(), num_threads);
+
+    // Channel for sending parsed entries to the DB writer
+    // Buffer size: enough batches to keep workers busy without excessive memory
+    let (sender, receiver) = bounded::<Vec<SignatureEntry>>(num_threads * 2);
+
+    // Shared counters for progress reporting
+    let total_lines = Arc::new(AtomicU64::new(0));
+    let parsed_lines = Arc::new(AtomicU64::new(0));
+    let error_count = Arc::new(AtomicU64::new(0));
+
+    // Spawn DB writer thread
+    let output_path = output.to_string();
+    let db_handle = thread::spawn(move || -> Result<()> {
+        let mut db = Database::new(&output_path)?;
+
+        for batch in receiver {
+            let tx = db.begin_transaction()?;
+            Database::insert_signature_batch(&tx, &batch)?;
+            tx.commit()?;
+        }
+
+        Ok(())
+    });
+
+    // Process files in parallel
+    let result: Result<()> = log_files
+        .into_par_iter()
+        .try_for_each(|log_file| {
+            let file_path = log_file.path.display().to_string();
+            let compressed = if log_file.compressed { "compressed" } else { "plain" };
+            eprintln!("Starting: {} ({})", file_path, compressed);
+
+            process_file_parallel(
+                log_file,
+                &sender,
+                batch_size,
+                &total_lines,
+                &parsed_lines,
+                &error_count,
+            )?;
+
+            eprintln!("Finished: {}", file_path);
+            Ok(())
+        });
+
+    // Close the channel so DB writer knows to stop
+    drop(sender);
+
+    // Wait for DB writer to finish
+    db_handle
+        .join()
+        .map_err(|_| anyhow!("DB writer thread panicked"))??;
+
+    // Print final stats
+    eprintln!(
+        "Total: {} lines read, {} parsed, {} errors",
+        total_lines.load(Ordering::Relaxed),
+        parsed_lines.load(Ordering::Relaxed),
+        error_count.load(Ordering::Relaxed)
+    );
+
+    result
+}
+
+fn process_file_parallel(
+    log_file: LogFile,
+    sender: &Sender<Vec<SignatureEntry>>,
+    batch_size: usize,
+    total_lines: &AtomicU64,
+    parsed_lines: &AtomicU64,
+    error_count: &AtomicU64,
+) -> Result<()> {
+    let registry = ParserRegistry::new();
+    let reader = log_file.reader()?;
+
+    let mut batch: Vec<SignatureEntry> = Vec::with_capacity(batch_size);
+    let mut file_lines = 0u64;
+    let mut file_parsed = 0u64;
+    let mut file_errors = 0u64;
+
+    for line_result in reader.lines() {
+        let line = line_result?;
+        file_lines += 1;
+
+        if let Some(parse_result) = registry.parse(&line) {
+            match parse_result {
+                Ok(ParsedMessage::Signature(entry)) => {
+                    batch.push(entry);
+                    file_parsed += 1;
+
+                    if batch.len() >= batch_size {
+                        sender.send(std::mem::replace(
+                            &mut batch,
+                            Vec::with_capacity(batch_size),
+                        ))?;
+                    }
+                }
+                Err(_) => {
+                    file_errors += 1;
+                }
+            }
+        }
+    }
+
+    // Send remaining entries
+    if !batch.is_empty() {
+        sender.send(batch)?;
+    }
+
+    // Update shared counters
+    total_lines.fetch_add(file_lines, Ordering::Relaxed);
+    parsed_lines.fetch_add(file_parsed, Ordering::Relaxed);
+    error_count.fetch_add(file_errors, Ordering::Relaxed);
+
+    Ok(())
+}
+
 fn process_reader(
     reader: Box<dyn BufRead>,
     registry: &ParserRegistry,
@@ -142,7 +296,7 @@
         }
     }
 
-        if total_lines % 100_000 == 0 {
+        if total_lines.is_multiple_of(100_000) {
             let total_errors: u64 = error_counts.values().sum();
             eprintln!(
                 "Progress: {} lines read, {} parsed, {} errors",