Expansion improved: parallel chunked search and sessionDestroyed early stop

Alexandr Mansurov
2026-02-20 21:53:01 +01:00
parent 43de2acade
commit 09fa289535
2 changed files with 497 additions and 217 deletions


@@ -87,6 +87,10 @@ struct SearchArgs {
/// follow changeSessionId chains backward to session start (signature line)
#[arg(short = 'e', long = "expand")]
expand: bool,
/// Number of parallel threads (0 = use all available cores, 1 = sequential)
#[arg(long, default_value = "0")]
threads: usize,
}
fn parse_date(s: &str) -> Result<NaiveDate> {
@@ -104,6 +108,7 @@ fn main() -> Result<()> {
&search_args.query,
search_args.correlation_id,
search_args.expand,
search_args.threads,
),
}
}


@@ -1,9 +1,9 @@
use anyhow::Result;
use rayon::prelude::*;
use regex::Regex;
use std::collections::{HashMap, HashSet};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::io::{BufRead, BufReader, Read as _, Seek, SeekFrom};
use std::sync::LazyLock;
use tempfile::TempDir;
use crate::files::read_log_file;
@@ -18,6 +18,11 @@ static CORRELATION_ID_RE: LazyLock<Regex> =
static SESSION_ID_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"sessionId=([^,\s]+)").unwrap());
/// Matches sessionDestroyed lines and captures the sid value.
/// Example: sessionDestroyed #s=8 sid=2010F74...node003 isnew=false
static SESSION_DESTROYED_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"sessionDestroyed\b.*?\bsid=([^,\s]+)").unwrap());
/// Matches changeSessionId messages and captures the long-form new and old session IDs.
/// Example: changeSessionId: newSessionId: sDF080BBD / DF080BBD...node011 replaces oldSessionId: sF9EE9D52 / F9EE9D52...node011
static CHANGE_SESSION_RE: LazyLock<Regex> = LazyLock::new(|| {
@@ -40,97 +45,270 @@ fn normalize_session_id(sid: &str) -> &str {
sid
}
fn build_thread_pool(threads: usize) -> Result<rayon::ThreadPool> {
let mut builder = rayon::ThreadPoolBuilder::new();
if threads > 0 {
builder = builder.num_threads(threads);
}
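// With threads == 0 the builder is left untouched, so rayon falls back to its
// default thread count (available cores, or the RAYON_NUM_THREADS override).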
builder
.build()
.map_err(|e| anyhow::anyhow!("Failed to build thread pool: {}", e))
}
/// Split a file into byte-offset chunks for parallel processing.
/// Each chunk is a (start, end) pair. Threads adjust to line boundaries at runtime.
fn compute_chunks(file_size: u64, num_chunks: usize) -> Vec<(u64, u64)> {
if file_size == 0 || num_chunks == 0 {
return vec![];
}
let effective = num_chunks.min(file_size as usize);
let chunk_size = file_size / effective as u64;
(0..effective)
.map(|i| {
let start = i as u64 * chunk_size;
let end = if i == effective - 1 {
file_size
} else {
(i as u64 + 1) * chunk_size
};
(start, end)
})
.collect()
}
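For intuition, a small unit-test-style sketch of the ranges this produces (an illustration, not part of the commit): the division is integer, so the last chunk absorbs the remainder, and asking for more chunks than there are bytes is capped.
#[test]
fn compute_chunks_sketch() {
    // 100 bytes over 4 threads: four even 25-byte ranges.
    assert_eq!(
        compute_chunks(100, 4),
        vec![(0, 25), (25, 50), (50, 75), (75, 100)]
    );
    // 103 bytes over 4 threads: chunk_size = 103 / 4 = 25, last range takes the 3 leftover bytes.
    assert_eq!(
        compute_chunks(103, 4),
        vec![(0, 25), (25, 50), (50, 75), (75, 103)]
    );
    // More requested chunks than bytes: capped at one chunk per byte.
    assert_eq!(compute_chunks(2, 8), vec![(0, 1), (1, 2)]);
}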
/// Returns true if parallel chunk processing can be used (plain file + multiple threads).
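/// Gzip input is excluded because a compressed stream cannot be seeked to arbitrary byte offsets.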
fn can_parallelize(file_path: &str, num_threads: usize) -> bool {
num_threads > 1 && !file_path.ends_with(".gz")
}
/// Stream all lines from a file (works with gzip and plain). Calls `process` for each line.
fn for_each_line_streaming<F>(file_path: &str, mut process: F) -> Result<()>
where
F: FnMut(&str),
{
let mut reader = read_log_file(file_path)?;
let mut line = String::new();
loop {
line.clear();
if reader.read_line(&mut line)? == 0 {
break;
}
process(line.trim_end());
}
Ok(())
}
/// Process lines in a byte-offset chunk of a plain file. Calls `process` for each complete line.
///
/// Chunk boundaries may fall mid-line. Convention:
/// - The chunk that started reading a line owns it (reads past `end` to finish it).
/// - The next chunk checks byte `start-1`: if it's `\n`, we're at a line start; otherwise
/// skip the partial first line (the previous chunk already handled it).
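/// Example: for "line1\nline2\n" split at byte 3 (mid-"line1"), the (0, 3) chunk started
/// "line1" before the boundary, so it reads past byte 3 and owns it; the (3, 12) chunk sees
/// that byte 2 is not '\n', skips ahead to byte 6, and owns "line2".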
fn for_each_line_in_chunk<F>(file_path: &str, start: u64, end: u64, mut process: F) -> Result<()>
where
F: FnMut(&str),
{
let file = std::fs::File::open(file_path)?;
let mut reader = BufReader::with_capacity(256 * 1024, file);
let mut pos = start;
if start > 0 {
// Check if we're at a line boundary or mid-line
reader.seek(SeekFrom::Start(start - 1))?;
let mut byte = [0u8; 1];
reader.read_exact(&mut byte)?;
// Now positioned at `start`
if byte[0] != b'\n' {
// Mid-line: skip remainder (previous chunk owns this line)
let mut skip = String::new();
let n = reader.read_line(&mut skip)?;
pos += n as u64;
}
}
let mut line = String::new();
while pos < end {
line.clear();
let n = reader.read_line(&mut line)?;
if n == 0 {
break;
}
pos += n as u64;
process(line.trim_end());
}
Ok(())
}
pub fn run_search(
file_path: &str,
query: &str,
show_correlation_id: bool,
expand: bool,
threads: usize,
) -> Result<()> {
let pool = build_thread_pool(threads)?;
if expand {
run_search_expanded(file_path, query)
run_search_expanded(file_path, query, &pool)
} else {
run_search_simple(file_path, query, show_correlation_id)
run_search_simple(file_path, query, show_correlation_id, &pool)
}
}
fn run_search_simple(file_path: &str, query: &str, show_correlation_id: bool) -> Result<()> {
let mut reader = read_log_file(file_path)?;
let mut line = String::new();
let mut match_count = 0u64;
loop {
line.clear();
let bytes_read = reader.read_line(&mut line)?;
if bytes_read == 0 {
break;
}
let line_trimmed = line.trim_end();
if !line_trimmed.contains(query) {
continue;
}
let timestamp = SYSLOG_TIMESTAMP_RE
.captures(line_trimmed)
.map(|c| c.get(1).unwrap().as_str());
let msg = MSG_RE
.captures(line_trimmed)
.map(|c| c.get(1).unwrap().as_str());
let corr_id = if show_correlation_id {
CORRELATION_ID_RE
.captures(line_trimmed)
fn format_simple_match(line: &str, show_correlation_id: bool) -> String {
let ts = SYSLOG_TIMESTAMP_RE
.captures(line)
.map(|c| c.get(1).unwrap().as_str())
} else {
None
};
let ts_part = timestamp.unwrap_or("?");
let msg_part = msg.unwrap_or("<no msg field>");
if let Some(cid) = corr_id {
println!("[{}] [{}] {}", ts_part, cid, msg_part);
} else {
println!("[{}] {}", ts_part, msg_part);
.unwrap_or("?");
let msg = MSG_RE
.captures(line)
.map(|c| c.get(1).unwrap().as_str())
.unwrap_or("<no msg field>");
if show_correlation_id {
let cid = CORRELATION_ID_RE
.captures(line)
.map(|c| c.get(1).unwrap().as_str());
if let Some(cid) = cid {
return format!("[{}] [{}] {}", ts, cid, msg);
}
}
format!("[{}] {}", ts, msg)
}
fn run_search_simple(
file_path: &str,
query: &str,
show_correlation_id: bool,
pool: &rayon::ThreadPool,
) -> Result<()> {
let num_threads = pool.current_num_threads();
if !can_parallelize(file_path, num_threads) {
// Sequential: stream directly (works for gzip and plain)
let mut match_count = 0u64;
for_each_line_streaming(file_path, |trimmed| {
if trimmed.contains(query) {
println!("{}", format_simple_match(trimmed, show_correlation_id));
match_count += 1;
}
})?;
eprintln!("{} matching lines found", match_count);
return Ok(());
}
// Parallel: split plain file into chunks
let file_size = std::fs::metadata(file_path)?.len();
let chunks = compute_chunks(file_size, num_threads);
eprintln!(
"Searching with {} threads across {} chunks",
num_threads,
chunks.len()
);
let results: Vec<Result<(Vec<String>, u64)>> = pool.install(|| {
chunks
.par_iter()
.map(|&(start, end)| {
let mut lines = Vec::new();
let mut count = 0u64;
for_each_line_in_chunk(file_path, start, end, |trimmed| {
if trimmed.contains(query) {
lines.push(format_simple_match(trimmed, show_correlation_id));
count += 1;
}
})?;
Ok((lines, count))
})
.collect()
});
let mut total = 0u64;
for result in results {
let (lines, count) = result?;
for line in lines {
println!("{}", line);
}
total += count;
}
eprintln!("{} matching lines found", total);
Ok(())
}
// --- Expand mode ---
#[derive(Default)]
struct Pass1Result {
seed_session_ids: HashSet<String>,
seed_correlation_ids: HashSet<String>,
/// new_session_id (normalized) -> old_session_id (normalized)
change_session_map: HashMap<String, String>,
/// Session IDs (normalized) that have a signature line
sessions_with_signature: HashSet<String>,
/// Path to decompressed temp file (only for gzip files)
decompressed_path: Option<std::path::PathBuf>,
line_count: u64,
}
fn run_search_expanded(file_path: &str, query: &str) -> Result<()> {
let is_gzip = file_path.ends_with(".gz");
impl Pass1Result {
fn merge(mut self, other: Pass1Result) -> Self {
self.seed_session_ids.extend(other.seed_session_ids);
self.seed_correlation_ids.extend(other.seed_correlation_ids);
self.change_session_map.extend(other.change_session_map);
self.sessions_with_signature
.extend(other.sessions_with_signature);
self.line_count += other.line_count;
self
}
}
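A small unit-test-style sketch (an illustration, not part of the commit) of how two chunk results combine. Note that HashMap::extend lets the right-hand map win on duplicate keys, which should be harmless here because each newSessionId maps to a single oldSessionId.
#[test]
fn pass1_merge_sketch() {
    let mut a = Pass1Result::default();
    a.seed_session_ids.insert("AAAA".into());
    a.change_session_map.insert("NEW1".into(), "OLD1".into());
    a.line_count = 10;
    let mut b = Pass1Result::default();
    b.seed_session_ids.insert("BBBB".into());
    b.change_session_map.insert("NEW2".into(), "OLD2".into());
    b.line_count = 5;
    let merged = a.merge(b);
    assert_eq!(merged.seed_session_ids.len(), 2);
    assert_eq!(merged.change_session_map.len(), 2);
    assert_eq!(merged.line_count, 15);
}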
let temp_dir = if is_gzip {
let dir = TempDir::with_prefix("log_ingest_")?;
eprintln!("Temp directory: {}", dir.path().display());
Some(dir)
} else {
None
};
fn process_line_pass1(trimmed: &str, query: &str, result: &mut Pass1Result) {
let sid = SESSION_ID_RE
.captures(trimmed)
.and_then(|c| c.get(1))
.map(|m| normalize_session_id(m.as_str()));
if trimmed.contains(r#"msg="signature:"#)
&& let Some(s) = sid
{
result.sessions_with_signature.insert(s.to_string());
}
if trimmed.contains("changeSessionId:")
&& let Some(caps) = CHANGE_SESSION_RE.captures(trimmed)
{
let new_sid = normalize_session_id(caps.get(1).unwrap().as_str()).to_string();
let old_sid = normalize_session_id(caps.get(2).unwrap().as_str()).to_string();
result.change_session_map.insert(new_sid, old_sid);
}
if trimmed.contains(query) {
if let Some(s) = sid
&& s != "noSession"
{
result.seed_session_ids.insert(s.to_string());
}
if let Some(cid) = CORRELATION_ID_RE
.captures(trimmed)
.and_then(|c| c.get(1))
{
result
.seed_correlation_ids
.insert(cid.as_str().to_string());
}
}
result.line_count += 1;
}
fn run_search_expanded(
file_path: &str,
query: &str,
pool: &rayon::ThreadPool,
) -> Result<()> {
let num_threads = pool.current_num_threads();
let use_parallel = can_parallelize(file_path, num_threads);
// Pass 1: collect metadata
let pass1 = run_pass1(file_path, query, &temp_dir)?;
let pass1 = run_pass1(file_path, query, use_parallel, pool)?;
// Expansion
// Expansion (in-memory graph traversal — inherently sequential)
let (expanded_sids, expanded_cids) = expand_seeds(
&pass1.seed_session_ids,
&pass1.seed_correlation_ids,
@@ -149,120 +327,68 @@ fn run_search_expanded(file_path: &str, query: &str) -> Result<()> {
expanded_cids.len()
);
// Pass 2: filter and print
let pass2_path = pass1
.decompressed_path
.as_ref()
.map(|p| p.to_str().unwrap())
.unwrap_or(file_path);
let match_count = run_pass2(pass2_path, query, &expanded_sids, &expanded_cids)?;
// Pass 2: filter and print (re-reads file; for gzip this re-decompresses from stream)
let match_count = run_pass2(file_path, query, &expanded_sids, &expanded_cids, use_parallel, pool)?;
eprintln!("{} lines output", match_count);
// TempDir drops here, cleaning up automatically
Ok(())
}
fn run_pass1(file_path: &str, query: &str, temp_dir: &Option<TempDir>) -> Result<Pass1Result> {
let mut reader = read_log_file(file_path)?;
let mut line = String::new();
let mut seed_session_ids: HashSet<String> = HashSet::new();
let mut seed_correlation_ids: HashSet<String> = HashSet::new();
let mut change_session_map: HashMap<String, String> = HashMap::new();
let mut sessions_with_signature: HashSet<String> = HashSet::new();
let decompressed_path = temp_dir
.as_ref()
.map(|dir| dir.path().join("decompressed.log"));
let mut temp_writer: Option<BufWriter<std::fs::File>> =
if let Some(ref path) = decompressed_path {
let file = std::fs::File::create(path)?;
Some(BufWriter::with_capacity(256 * 1024, file))
} else {
None
};
let mut line_count = 0u64;
loop {
line.clear();
let bytes_read = reader.read_line(&mut line)?;
if bytes_read == 0 {
break;
}
line_count += 1;
// Write decompressed content to temp file for gzip
if let Some(ref mut writer) = temp_writer {
writer.write_all(line.as_bytes())?;
fn run_pass1(
file_path: &str,
query: &str,
use_parallel: bool,
pool: &rayon::ThreadPool,
) -> Result<Pass1Result> {
if !use_parallel {
eprintln!("Pass 1: scanning sequentially...");
let mut result = Pass1Result::default();
for_each_line_streaming(file_path, |trimmed| {
process_line_pass1(trimmed, query, &mut result);
})?;
eprintln!(
"Pass 1 complete: {} lines, {} seed sessions, {} seed correlations, {} session changes",
result.line_count,
result.seed_session_ids.len(),
result.seed_correlation_ids.len(),
result.change_session_map.len()
);
return Ok(result);
}
let trimmed = line.trim_end();
let file_size = std::fs::metadata(file_path)?.len();
let num_threads = pool.current_num_threads();
let chunks = compute_chunks(file_size, num_threads);
// Extract sessionId (normalized) from every line
let sid = SESSION_ID_RE
.captures(trimmed)
.and_then(|c| c.get(1))
.map(|m| normalize_session_id(m.as_str()));
eprintln!("Pass 1: scanning with {} threads...", chunks.len());
// Check for signature lines
if trimmed.contains(r#"msg="signature:"#)
&& let Some(s) = sid
{
sessions_with_signature.insert(s.to_string());
}
let results: Vec<Result<Pass1Result>> = pool.install(|| {
chunks
.par_iter()
.map(|&(start, end)| {
let mut chunk_result = Pass1Result::default();
for_each_line_in_chunk(file_path, start, end, |trimmed| {
process_line_pass1(trimmed, query, &mut chunk_result);
})?;
Ok(chunk_result)
})
.collect()
});
// Check for changeSessionId lines
if trimmed.contains("changeSessionId:")
&& let Some(caps) = CHANGE_SESSION_RE.captures(trimmed)
{
let new_sid = normalize_session_id(caps.get(1).unwrap().as_str()).to_string();
let old_sid = normalize_session_id(caps.get(2).unwrap().as_str()).to_string();
change_session_map.insert(new_sid, old_sid);
}
// Check if this line matches the query
if trimmed.contains(query) {
if let Some(s) = sid
&& s != "noSession"
{
seed_session_ids.insert(s.to_string());
}
let cid = CORRELATION_ID_RE
.captures(trimmed)
.and_then(|c| c.get(1))
.map(|m| m.as_str().to_string());
if let Some(c) = cid {
seed_correlation_ids.insert(c);
}
}
if line_count.is_multiple_of(1_000_000) {
eprintln!("Pass 1: {} lines scanned...", line_count);
}
}
if let Some(ref mut writer) = temp_writer {
writer.flush()?;
let mut merged = Pass1Result::default();
for r in results {
merged = merged.merge(r?);
}
eprintln!(
"Pass 1 complete: {} lines, {} seed sessions, {} seed correlations, {} session changes",
line_count,
seed_session_ids.len(),
seed_correlation_ids.len(),
change_session_map.len()
merged.line_count,
merged.seed_session_ids.len(),
merged.seed_correlation_ids.len(),
merged.change_session_map.len()
);
Ok(Pass1Result {
seed_session_ids,
seed_correlation_ids,
change_session_map,
sessions_with_signature,
decompressed_path,
})
Ok(merged)
}
/// Expand seed session IDs by following changeSessionId chains backward.
@@ -291,27 +417,7 @@ fn expand_seeds(
(expanded_sids, seed_cids.clone())
}
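The body of expand_seeds is not shown in this hunk; as a rough reconstruction of the backward walk it performs (inferred from the doc comment and the expand-mode tests, not the actual implementation), it could look roughly like this:
fn expand_seeds_sketch(
    seed_sids: &HashSet<String>,
    seed_cids: &HashSet<String>,
    change_session_map: &HashMap<String, String>,
    sessions_with_signature: &HashSet<String>,
) -> (HashSet<String>, HashSet<String>) {
    let mut expanded: HashSet<String> = seed_sids.clone();
    for sid in seed_sids {
        let mut current = sid.as_str();
        // Follow newSessionId -> oldSessionId links backward from the seed.
        while let Some(older) = change_session_map.get(current) {
            // insert() returns false if already seen: guards against cycles and re-walks.
            if !expanded.insert(older.clone()) {
                break;
            }
            // A signature line marks the session start: include it, then stop recursing.
            if sessions_with_signature.contains(older) {
                break;
            }
            current = older.as_str();
        }
    }
    (expanded, seed_cids.clone())
}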
fn run_pass2(
file_path: &str,
query: &str,
expanded_sids: &HashSet<String>,
expanded_cids: &HashSet<String>,
) -> Result<u64> {
// Pass 2 always reads a plain file (original plain or decompressed temp)
let file = std::fs::File::open(file_path)?;
let mut reader = BufReader::with_capacity(256 * 1024, file);
let mut line = String::new();
let mut output_count = 0u64;
loop {
line.clear();
let bytes_read = reader.read_line(&mut line)?;
if bytes_read == 0 {
break;
}
let trimmed = line.trim_end();
fn format_pass2_match(trimmed: &str, query: &str, expanded_sids: &HashSet<String>, expanded_cids: &HashSet<String>) -> Option<String> {
let is_direct_match = trimmed.contains(query);
let sid = SESSION_ID_RE
@@ -328,7 +434,7 @@ fn run_pass2(
let cid_match = cid.is_some_and(|c| expanded_cids.contains(c));
if !is_direct_match && !sid_match && !cid_match {
continue;
return None;
}
let timestamp = SYSLOG_TIMESTAMP_RE
@@ -345,20 +451,109 @@ fn run_pass2(
let cid_display = cid.unwrap_or("-");
let prefix = if is_direct_match { "*" } else { " " };
println!(
Some(format!(
"{} [{}] [cid:{}] [sid:{}] {}",
prefix, timestamp, cid_display, sid_display, msg
);
))
}
output_count += 1;
fn run_pass2(
file_path: &str,
query: &str,
expanded_sids: &HashSet<String>,
expanded_cids: &HashSet<String>,
use_parallel: bool,
pool: &rayon::ThreadPool,
) -> Result<u64> {
if !use_parallel {
eprintln!("Pass 2: filtering sequentially...");
let mut count = 0u64;
let mut reader = read_log_file(file_path)?;
let mut line = String::new();
// Track sessions still alive; stop when all are destroyed
let mut remaining_sids: HashSet<&str> =
expanded_sids.iter().map(|s| s.as_str()).collect();
loop {
line.clear();
if reader.read_line(&mut line)? == 0 {
break;
}
let trimmed = line.trim_end();
// Check for sessionDestroyed
if trimmed.contains("sessionDestroyed")
&& let Some(caps) = SESSION_DESTROYED_RE.captures(trimmed)
{
let destroyed_sid = normalize_session_id(caps.get(1).unwrap().as_str());
if remaining_sids.remove(destroyed_sid) {
// Still output the sessionDestroyed line itself if it matches
if let Some(formatted) =
format_pass2_match(trimmed, query, expanded_sids, expanded_cids)
{
println!("{}", formatted);
count += 1;
}
if remaining_sids.is_empty() {
eprintln!(
"All {} expanded sessions destroyed, stopping early",
expanded_sids.len()
);
break;
}
continue;
}
}
Ok(output_count)
if let Some(formatted) =
format_pass2_match(trimmed, query, expanded_sids, expanded_cids)
{
println!("{}", formatted);
count += 1;
}
}
return Ok(count);
}
let file_size = std::fs::metadata(file_path)?.len();
let num_threads = pool.current_num_threads();
let chunks = compute_chunks(file_size, num_threads);
eprintln!("Pass 2: filtering with {} threads...", chunks.len());
let results: Vec<Result<(Vec<String>, u64)>> = pool.install(|| {
chunks
.par_iter()
.map(|&(start, end)| {
let mut lines = Vec::new();
let mut count = 0u64;
for_each_line_in_chunk(file_path, start, end, |trimmed| {
if let Some(formatted) = format_pass2_match(trimmed, query, expanded_sids, expanded_cids) {
lines.push(formatted);
count += 1;
}
})?;
Ok((lines, count))
})
.collect()
});
let mut total = 0u64;
for result in results {
let (lines, count) = result?;
for line in lines {
println!("{}", line);
}
total += count;
}
Ok(total)
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
#[test]
fn test_syslog_timestamp_extraction() {
@@ -588,7 +783,7 @@ mod tests {
// Expected output: lines 1 and 2 (both have session AAAA or correlation c1).
// Lines 3,4 (session BBBB) should NOT be included (AAAA is old, not new).
// Line 5 (session XXXX) should NOT be included.
run_search(log_path.to_str().unwrap(), "findme", false, true)?;
run_search(log_path.to_str().unwrap(), "findme", false, true, 1)?;
Ok(())
}
@@ -622,7 +817,87 @@ mod tests {
// "findme" matches line 4 (session NEW). changeSessionId maps NEW→OLD.
// OLD has a signature → include OLD but stop recursing.
// Expected: lines 1-4 all included (sessions OLD and NEW).
run_search(log_path.to_str().unwrap(), "findme", false, true)?;
run_search(log_path.to_str().unwrap(), "findme", false, true, 1)?;
Ok(())
}
// --- Chunk boundary tests ---
#[test]
fn test_chunk_boundary_no_lost_lines() -> Result<()> {
// Write lines of known byte sizes, then split exactly on a line boundary
let dir = tempfile::tempdir()?;
let path = dir.path().join("test.log");
let mut f = std::fs::File::create(&path)?;
// 3 lines: "line1\n", "line2\n", "line3\n" (6 bytes each)
write!(f, "line1\nline2\nline3\n")?;
drop(f);
// Split at byte 6 (exactly between line1 and line2)
let mut collected = Vec::new();
for_each_line_in_chunk(path.to_str().unwrap(), 0, 6, |l| {
collected.push(l.to_string());
})?;
for_each_line_in_chunk(path.to_str().unwrap(), 6, 18, |l| {
collected.push(l.to_string());
})?;
assert_eq!(collected, vec!["line1", "line2", "line3"]);
Ok(())
}
#[test]
fn test_chunk_boundary_mid_line() -> Result<()> {
let dir = tempfile::tempdir()?;
let path = dir.path().join("test.log");
let mut f = std::fs::File::create(&path)?;
write!(f, "line1\nline2\nline3\n")?;
drop(f);
// Split at byte 3 (middle of "line1")
let mut collected = Vec::new();
for_each_line_in_chunk(path.to_str().unwrap(), 0, 3, |l| {
collected.push(l.to_string());
})?;
for_each_line_in_chunk(path.to_str().unwrap(), 3, 18, |l| {
collected.push(l.to_string());
})?;
assert_eq!(collected, vec!["line1", "line2", "line3"]);
Ok(())
}
#[test]
fn test_chunk_four_way_split() -> Result<()> {
let dir = tempfile::tempdir()?;
let path = dir.path().join("test.log");
let mut f = std::fs::File::create(&path)?;
for i in 0..20 {
writeln!(f, "line {:02}", i)?;
}
drop(f);
let file_size = std::fs::metadata(path.as_path())?.len();
let chunks = compute_chunks(file_size, 4);
let mut collected = Vec::new();
for (start, end) in chunks {
for_each_line_in_chunk(path.to_str().unwrap(), start, end, |l| {
collected.push(l.to_string());
})?;
}
let expected: Vec<String> = (0..20).map(|i| format!("line {:02}", i)).collect();
assert_eq!(collected, expected);
Ok(())
}
// --- sessionDestroyed regex test ---
#[test]
fn test_session_destroyed_regex() {
let line = "sessionDestroyed #s=8 sid=2010F74498079D00A5647F3777545A64.node003 isnew=false age=693s last=644s attrs=loginState,lastRequest,userSessionData";
let caps = SESSION_DESTROYED_RE.captures(line).unwrap();
assert_eq!(
caps.get(1).unwrap().as_str(),
"2010F74498079D00A5647F3777545A64.node003"
);
}
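The tests above exercise the chunk helpers directly; a smoke test for the parallel simple-search path (a sketch, not part of this commit) could look like:
#[test]
fn test_parallel_simple_search_smoke() -> Result<()> {
    // A plain (non-gzip) file plus threads > 1 forces the chunked parallel path.
    let dir = tempfile::tempdir()?;
    let path = dir.path().join("smoke.log");
    let mut f = std::fs::File::create(&path)?;
    for i in 0..100 {
        // Lines only need to contain the query; missing timestamp/msg fields
        // fall back to "?" and "<no msg field>" when formatted.
        writeln!(f, "entry {:03} needle", i)?;
    }
    drop(f);
    run_search(path.to_str().unwrap(), "needle", false, false, 4)
}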
}