Parallel processing

2026-01-22 00:15:25 +01:00
parent 2d9f6eaa98
commit 946d0184a1
5 changed files with 256 additions and 17 deletions

Cargo.lock (generated, 62 lines changed)

@@ -190,6 +190,46 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
+
+[[package]]
+name = "either"
+version = "1.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
+
 [[package]]
 name = "fallible-iterator"
 version = "0.3.0"
@@ -318,7 +358,9 @@ dependencies = [
  "anyhow",
  "chrono",
  "clap",
+ "crossbeam-channel",
  "flate2",
+ "rayon",
  "regex",
  "rusqlite",
 ]
@@ -378,6 +420,26 @@ dependencies = [
  "proc-macro2",
 ]
 
+[[package]]
+name = "rayon"
+version = "1.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91"
+dependencies = [
+ "crossbeam-deque",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "regex"
 version = "1.12.2"

Cargo.toml

@@ -11,3 +11,5 @@ chrono = "0.4"
 regex = "1"
 flate2 = "1"
 anyhow = "1"
+rayon = "1"
+crossbeam-channel = "0.5"

README.md

@@ -12,6 +12,7 @@ Parses application logs containing signature messages and loads them into SQLite
 - Support for both plain `.log` and gzip compressed `.log.gz` files
 - File discovery by date range using `YYYY/mm/dd` directory structure
 - Batched inserts for performance with large files
+- Parallel file processing for multi-day ingestion
 - Indexed columns (`session_id`, `version`) for efficient queries
 - Extensible parser architecture for adding new message types
@@ -42,6 +43,21 @@ log_ingest \
 The tool will look for files at `<base-dir>/YYYY/MM/DD/<filename>.gz` or `<base-dir>/YYYY/MM/DD/<filename>` for each day in the range.
 
+### Parallel processing
+
+When processing multiple files, parsing runs in parallel by default using all available CPU cores. A single writer thread handles database inserts to avoid SQLite contention.
+
+```bash
+# Use all CPU cores (default)
+log_ingest --from 2026/01/01 --to 2026/01/31 ...
+
+# Limit to 4 threads
+log_ingest --threads 4 --from 2026/01/01 --to 2026/01/31 ...
+
+# Sequential processing (disable parallelism)
+log_ingest --threads 1 --from 2026/01/01 --to 2026/01/31 ...
+```
+
 ### Options
 
 | Option | Description |
@@ -53,6 +69,7 @@ The tool will look for files at `<base-dir>/YYYY/MM/DD/<filename>.gz` or `<base-dir>/YYYY/MM/DD/<filename>` for each day in the range.
 | `--filename <NAME>` | Log filename (e.g., `app.log`) |
 | `-o, --output <PATH>` | Output SQLite database path |
 | `--batch-size <N>` | Batch size for inserts (default: 10000) |
+| `--threads <N>` | Number of parallel threads (0 = all cores, 1 = sequential) |
 
 ## Database Schema
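
The design the README describes (parallel parsers fanned into one database writer) is a bounded-channel fan-in. A minimal sketch of that shape, using hypothetical stand-ins (`files`, the fake parsing step, `println!` in place of inserts) rather than the tool's real types:

```rust
// Sketch: rayon workers parse in parallel; a single thread owns all "DB" writes.
use crossbeam_channel::bounded;
use rayon::prelude::*;
use std::thread;

fn main() {
    let files = vec!["a.log", "b.log", "c.log"];
    let (tx, rx) = bounded::<Vec<String>>(8);

    // Single writer: the only place that would touch SQLite.
    let writer = thread::spawn(move || {
        for batch in rx {
            println!("insert {} rows", batch.len());
        }
    });

    // Fan-out: each file handled on the rayon pool, results funneled to tx.
    files.into_par_iter().for_each(|f| {
        let rows = vec![format!("parsed line from {f}")]; // stand-in for parsing
        tx.send(rows).unwrap();
    });

    drop(tx); // disconnect the channel so the writer loop ends
    writer.join().unwrap();
}
```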

src/db.rs

@@ -10,6 +10,10 @@ pub struct Database {
 impl Database {
     pub fn new(path: &str) -> Result<Self> {
         let conn = Connection::open(path)?;
+
+        // Enable WAL mode for better concurrent read/write performance
+        conn.pragma_update(None, "journal_mode", "WAL")?;
+
         let db = Self { conn };
         db.init_schema()?;
         Ok(db)
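
WAL mode is persisted in the database file, so every later connection inherits it. A quick way to confirm the pragma took effect, sketched with rusqlite against a hypothetical output path:

```rust
// Sketch (assumes a hypothetical "signatures.db" produced by the tool):
// a fresh connection should report "wal" once Database::new has run once.
use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open("signatures.db")?;
    let mode: String = conn.pragma_query_value(None, "journal_mode", |row| row.get(0))?;
    println!("journal_mode = {mode}"); // expected: "wal"
    Ok(())
}
```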

src/main.rs

@@ -1,16 +1,21 @@
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use chrono::NaiveDate; use chrono::NaiveDate;
use clap::Parser; use clap::Parser;
use crossbeam_channel::{bounded, Sender};
use rayon::prelude::*;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::BufRead; use std::io::BufRead;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
mod db; mod db;
mod files; mod files;
mod parser; mod parser;
use db::Database; use db::Database;
use files::{read_log_file, LogFileDiscovery}; use files::{read_log_file, LogFile, LogFileDiscovery};
use parser::{ParsedMessage, ParserRegistry, SignatureEntry}; use parser::{ParsedMessage, ParserRegistry, SignatureEntry};
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@@ -43,6 +48,10 @@ struct Args {
     /// Batch size for database inserts
     #[arg(long, default_value = "10000")]
     batch_size: usize,
+
+    /// Number of parallel threads for file processing (0 = use all available cores)
+    #[arg(long, default_value = "0")]
+    threads: usize,
 }
 
 fn parse_date(s: &str) -> Result<NaiveDate> {
@@ -53,11 +62,20 @@ fn parse_date(s: &str) -> Result<NaiveDate> {
 
 fn main() -> Result<()> {
     let args = Args::parse();
-    let mut db = Database::new(&args.output)?;
-    let registry = ParserRegistry::new();
+
+    // Configure rayon thread pool if threads specified
+    if args.threads > 0 {
+        rayon::ThreadPoolBuilder::new()
+            .num_threads(args.threads)
+            .build_global()
+            .ok(); // Ignore error if pool already initialized
+    }
+    let use_parallel = args.threads != 1;
 
     if let Some(file_path) = &args.file {
-        // Process single file
+        // Process single file (no parallelism needed)
+        let mut db = Database::new(&args.output)?;
+        let registry = ParserRegistry::new();
         eprintln!("Processing single file: {}", file_path.display());
         let reader = read_log_file(file_path.to_str().unwrap())?;
         process_reader(reader, &registry, &mut db, args.batch_size)?;
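
One note on the `.ok()` above: rayon's `build_global` succeeds at most once per process, and any later call returns an error instead of resizing the pool. A small sketch of that behavior:

```rust
// Sketch: a second build_global() fails because the global pool already exists.
fn main() {
    let first = rayon::ThreadPoolBuilder::new().num_threads(4).build_global();
    assert!(first.is_ok());

    let second = rayon::ThreadPoolBuilder::new().num_threads(8).build_global();
    assert!(second.is_err()); // the pool keeps its original 4 threads

    assert_eq!(rayon::current_num_threads(), 4);
}
```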
@@ -92,18 +110,10 @@ fn main() -> Result<()> {
         eprintln!("Found {} log files to process", log_files.len());
 
-        for log_file in log_files {
-            eprintln!(
-                "Processing: {} ({})",
-                log_file.path.display(),
-                if log_file.compressed {
-                    "compressed"
-                } else {
-                    "plain"
-                }
-            );
-            let reader = log_file.reader()?;
-            process_reader(reader, &registry, &mut db, args.batch_size)?;
+        if use_parallel && log_files.len() > 1 {
+            process_files_parallel(log_files, &args.output, args.batch_size)?;
+        } else {
+            process_files_sequential(log_files, &args.output, args.batch_size)?;
         }
     }
@@ -111,6 +121,150 @@ fn main() -> Result<()> {
 
     Ok(())
 }
 
+fn process_files_sequential(
+    log_files: Vec<LogFile>,
+    output: &str,
+    batch_size: usize,
+) -> Result<()> {
+    let mut db = Database::new(output)?;
+    let registry = ParserRegistry::new();
+
+    for log_file in log_files {
+        eprintln!(
+            "Processing: {} ({})",
+            log_file.path.display(),
+            if log_file.compressed {
+                "compressed"
+            } else {
+                "plain"
+            }
+        );
+        let reader = log_file.reader()?;
+        process_reader(reader, &registry, &mut db, batch_size)?;
+    }
+
+    Ok(())
+}
+
+fn process_files_parallel(log_files: Vec<LogFile>, output: &str, batch_size: usize) -> Result<()> {
+    let num_threads = rayon::current_num_threads();
+    eprintln!("Processing {} files with {} threads", log_files.len(), num_threads);
+
+    // Channel for sending parsed entries to the DB writer
+    // Buffer size: enough batches to keep workers busy without excessive memory
+    let (sender, receiver) = bounded::<Vec<SignatureEntry>>(num_threads * 2);
+
+    // Shared counters for progress reporting
+    let total_lines = Arc::new(AtomicU64::new(0));
+    let parsed_lines = Arc::new(AtomicU64::new(0));
+    let error_count = Arc::new(AtomicU64::new(0));
+
+    // Spawn DB writer thread
+    let output_path = output.to_string();
+    let db_handle = thread::spawn(move || -> Result<()> {
+        let mut db = Database::new(&output_path)?;
+        for batch in receiver {
+            let tx = db.begin_transaction()?;
+            Database::insert_signature_batch(&tx, &batch)?;
+            tx.commit()?;
+        }
+        Ok(())
+    });
+
+    // Process files in parallel
+    let result: Result<()> = log_files.into_par_iter().try_for_each(|log_file| {
+        let file_path = log_file.path.display().to_string();
+        let compressed = if log_file.compressed { "compressed" } else { "plain" };
+        eprintln!("Starting: {} ({})", file_path, compressed);
+        process_file_parallel(
+            log_file,
+            &sender,
+            batch_size,
+            &total_lines,
+            &parsed_lines,
+            &error_count,
+        )?;
+        eprintln!("Finished: {}", file_path);
+        Ok(())
+    });
+
+    // Close the channel so DB writer knows to stop
+    drop(sender);
+
+    // Wait for DB writer to finish
+    db_handle
+        .join()
+        .map_err(|_| anyhow!("DB writer thread panicked"))??;
+
+    // Print final stats
+    eprintln!(
+        "Total: {} lines read, {} parsed, {} errors",
+        total_lines.load(Ordering::Relaxed),
+        parsed_lines.load(Ordering::Relaxed),
+        error_count.load(Ordering::Relaxed)
+    );
+
+    result
+}
+
+fn process_file_parallel(
+    log_file: LogFile,
+    sender: &Sender<Vec<SignatureEntry>>,
+    batch_size: usize,
+    total_lines: &AtomicU64,
+    parsed_lines: &AtomicU64,
+    error_count: &AtomicU64,
+) -> Result<()> {
+    let registry = ParserRegistry::new();
+    let reader = log_file.reader()?;
+
+    let mut batch: Vec<SignatureEntry> = Vec::with_capacity(batch_size);
+    let mut file_lines = 0u64;
+    let mut file_parsed = 0u64;
+    let mut file_errors = 0u64;
+
+    for line_result in reader.lines() {
+        let line = line_result?;
+        file_lines += 1;
+
+        if let Some(parse_result) = registry.parse(&line) {
+            match parse_result {
+                Ok(ParsedMessage::Signature(entry)) => {
+                    batch.push(entry);
+                    file_parsed += 1;
+                    if batch.len() >= batch_size {
+                        sender.send(std::mem::replace(
+                            &mut batch,
+                            Vec::with_capacity(batch_size),
+                        ))?;
+                    }
+                }
+                Err(_) => {
+                    file_errors += 1;
+                }
+            }
+        }
+    }
+
+    // Send remaining entries
+    if !batch.is_empty() {
+        sender.send(batch)?;
+    }
+
+    // Update shared counters
+    total_lines.fetch_add(file_lines, Ordering::Relaxed);
+    parsed_lines.fetch_add(file_parsed, Ordering::Relaxed);
+    error_count.fetch_add(file_errors, Ordering::Relaxed);
+
+    Ok(())
+}
+
 fn process_reader(
     reader: Box<dyn BufRead>,
     registry: &ParserRegistry,
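
A detail worth noting in `process_file_parallel`: when a batch fills up, `std::mem::replace` swaps a fresh pre-sized `Vec` into `batch` and returns the full one for sending, so entries are moved to the writer rather than cloned. The same trick in isolation:

```rust
// Sketch: swap-and-send batching. mem::replace returns the old value,
// so the filled batch is moved out while `batch` keeps fresh working storage.
fn main() {
    let batch_size = 3;
    let mut batch: Vec<u32> = Vec::with_capacity(batch_size);
    let mut shipped: Vec<Vec<u32>> = Vec::new(); // stand-in for sender.send()

    for n in 0..8 {
        batch.push(n);
        if batch.len() >= batch_size {
            shipped.push(std::mem::replace(
                &mut batch,
                Vec::with_capacity(batch_size),
            ));
        }
    }
    if !batch.is_empty() {
        shipped.push(batch); // remainder, like the final send
    }
    assert_eq!(shipped.len(), 3); // two full batches plus a remainder of 2
}
```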
@@ -142,7 +296,7 @@ fn process_reader(
             }
         }
 
-        if total_lines % 100_000 == 0 {
+        if total_lines.is_multiple_of(100_000) {
             let total_errors: u64 = error_counts.values().sum();
             eprintln!(
                 "Progress: {} lines read, {} parsed, {} errors",