diff --git a/src/main.rs b/src/main.rs index 5801d47..831b26f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -87,6 +87,10 @@ struct SearchArgs { /// follow changeSessionId chains backward to session start (signature line) #[arg(short = 'e', long = "expand")] expand: bool, + + /// Number of parallel threads (0 = use all available cores, 1 = sequential) + #[arg(long, default_value = "0")] + threads: usize, } fn parse_date(s: &str) -> Result { @@ -104,6 +108,7 @@ fn main() -> Result<()> { &search_args.query, search_args.correlation_id, search_args.expand, + search_args.threads, ), } } diff --git a/src/search.rs b/src/search.rs index 05dfceb..bf538ba 100644 --- a/src/search.rs +++ b/src/search.rs @@ -1,9 +1,9 @@ use anyhow::Result; +use rayon::prelude::*; use regex::Regex; use std::collections::{HashMap, HashSet}; -use std::io::{BufRead, BufReader, BufWriter, Write}; +use std::io::{BufRead, BufReader, Read as _, Seek, SeekFrom}; use std::sync::LazyLock; -use tempfile::TempDir; use crate::files::read_log_file; @@ -18,6 +18,11 @@ static CORRELATION_ID_RE: LazyLock = static SESSION_ID_RE: LazyLock = LazyLock::new(|| Regex::new(r"sessionId=([^,\s]+)").unwrap()); +/// Matches sessionDestroyed lines and captures the sid value. +/// Example: sessionDestroyed #s=8 sid=2010F74...node003 isnew=false +static SESSION_DESTROYED_RE: LazyLock = + LazyLock::new(|| Regex::new(r"sessionDestroyed\b.*?\bsid=([^,\s]+)").unwrap()); + /// Matches changeSessionId messages and captures the long-form new and old session IDs. /// Example: changeSessionId: newSessionId: sDF080BBD / DF080BBD...node011 replaces oldSessionId: sF9EE9D52 / F9EE9D52...node011 static CHANGE_SESSION_RE: LazyLock = LazyLock::new(|| { @@ -40,97 +45,270 @@ fn normalize_session_id(sid: &str) -> &str { sid } +fn build_thread_pool(threads: usize) -> Result { + let mut builder = rayon::ThreadPoolBuilder::new(); + if threads > 0 { + builder = builder.num_threads(threads); + } + builder + .build() + .map_err(|e| anyhow::anyhow!("Failed to build thread pool: {}", e)) +} + +/// Split a file into byte-offset chunks for parallel processing. +/// Each chunk is a (start, end) pair. Threads adjust to line boundaries at runtime. +fn compute_chunks(file_size: u64, num_chunks: usize) -> Vec<(u64, u64)> { + if file_size == 0 || num_chunks == 0 { + return vec![]; + } + let effective = num_chunks.min(file_size as usize); + let chunk_size = file_size / effective as u64; + (0..effective) + .map(|i| { + let start = i as u64 * chunk_size; + let end = if i == effective - 1 { + file_size + } else { + (i as u64 + 1) * chunk_size + }; + (start, end) + }) + .collect() +} + +/// Returns true if parallel chunk processing can be used (plain file + multiple threads). +fn can_parallelize(file_path: &str, num_threads: usize) -> bool { + num_threads > 1 && !file_path.ends_with(".gz") +} + +/// Stream all lines from a file (works with gzip and plain). Calls `process` for each line. +fn for_each_line_streaming(file_path: &str, mut process: F) -> Result<()> +where + F: FnMut(&str), +{ + let mut reader = read_log_file(file_path)?; + let mut line = String::new(); + loop { + line.clear(); + if reader.read_line(&mut line)? == 0 { + break; + } + process(line.trim_end()); + } + Ok(()) +} + +/// Process lines in a byte-offset chunk of a plain file. Calls `process` for each complete line. +/// +/// Chunk boundaries may fall mid-line. Convention: +/// - The chunk that started reading a line owns it (reads past `end` to finish it). +/// - The next chunk checks byte `start-1`: if it's `\n`, we're at a line start; otherwise +/// skip the partial first line (the previous chunk already handled it). +fn for_each_line_in_chunk(file_path: &str, start: u64, end: u64, mut process: F) -> Result<()> +where + F: FnMut(&str), +{ + let file = std::fs::File::open(file_path)?; + let mut reader = BufReader::with_capacity(256 * 1024, file); + let mut pos = start; + + if start > 0 { + // Check if we're at a line boundary or mid-line + reader.seek(SeekFrom::Start(start - 1))?; + let mut byte = [0u8; 1]; + reader.read_exact(&mut byte)?; + // Now positioned at `start` + if byte[0] != b'\n' { + // Mid-line: skip remainder (previous chunk owns this line) + let mut skip = String::new(); + let n = reader.read_line(&mut skip)?; + pos += n as u64; + } + } + + let mut line = String::new(); + while pos < end { + line.clear(); + let n = reader.read_line(&mut line)?; + if n == 0 { + break; + } + pos += n as u64; + process(line.trim_end()); + } + Ok(()) +} + pub fn run_search( file_path: &str, query: &str, show_correlation_id: bool, expand: bool, + threads: usize, ) -> Result<()> { + let pool = build_thread_pool(threads)?; if expand { - run_search_expanded(file_path, query) + run_search_expanded(file_path, query, &pool) } else { - run_search_simple(file_path, query, show_correlation_id) + run_search_simple(file_path, query, show_correlation_id, &pool) } } -fn run_search_simple(file_path: &str, query: &str, show_correlation_id: bool) -> Result<()> { - let mut reader = read_log_file(file_path)?; - let mut line = String::new(); - let mut match_count = 0u64; - - loop { - line.clear(); - let bytes_read = reader.read_line(&mut line)?; - if bytes_read == 0 { - break; - } - - let line_trimmed = line.trim_end(); - - if !line_trimmed.contains(query) { - continue; - } - - let timestamp = SYSLOG_TIMESTAMP_RE - .captures(line_trimmed) +fn format_simple_match(line: &str, show_correlation_id: bool) -> String { + let ts = SYSLOG_TIMESTAMP_RE + .captures(line) + .map(|c| c.get(1).unwrap().as_str()) + .unwrap_or("?"); + let msg = MSG_RE + .captures(line) + .map(|c| c.get(1).unwrap().as_str()) + .unwrap_or(""); + if show_correlation_id { + let cid = CORRELATION_ID_RE + .captures(line) .map(|c| c.get(1).unwrap().as_str()); - - let msg = MSG_RE - .captures(line_trimmed) - .map(|c| c.get(1).unwrap().as_str()); - - let corr_id = if show_correlation_id { - CORRELATION_ID_RE - .captures(line_trimmed) - .map(|c| c.get(1).unwrap().as_str()) - } else { - None - }; - - let ts_part = timestamp.unwrap_or("?"); - let msg_part = msg.unwrap_or(""); - - if let Some(cid) = corr_id { - println!("[{}] [{}] {}", ts_part, cid, msg_part); - } else { - println!("[{}] {}", ts_part, msg_part); + if let Some(cid) = cid { + return format!("[{}] [{}] {}", ts, cid, msg); } + } + format!("[{}] {}", ts, msg) +} - match_count += 1; +fn run_search_simple( + file_path: &str, + query: &str, + show_correlation_id: bool, + pool: &rayon::ThreadPool, +) -> Result<()> { + let num_threads = pool.current_num_threads(); + + if !can_parallelize(file_path, num_threads) { + // Sequential: stream directly (works for gzip and plain) + let mut match_count = 0u64; + for_each_line_streaming(file_path, |trimmed| { + if trimmed.contains(query) { + println!("{}", format_simple_match(trimmed, show_correlation_id)); + match_count += 1; + } + })?; + eprintln!("{} matching lines found", match_count); + return Ok(()); } - eprintln!("{} matching lines found", match_count); + // Parallel: split plain file into chunks + let file_size = std::fs::metadata(file_path)?.len(); + let chunks = compute_chunks(file_size, num_threads); + + eprintln!( + "Searching with {} threads across {} chunks", + num_threads, + chunks.len() + ); + + let results: Vec, u64)>> = pool.install(|| { + chunks + .par_iter() + .map(|&(start, end)| { + let mut lines = Vec::new(); + let mut count = 0u64; + for_each_line_in_chunk(file_path, start, end, |trimmed| { + if trimmed.contains(query) { + lines.push(format_simple_match(trimmed, show_correlation_id)); + count += 1; + } + })?; + Ok((lines, count)) + }) + .collect() + }); + + let mut total = 0u64; + for result in results { + let (lines, count) = result?; + for line in lines { + println!("{}", line); + } + total += count; + } + + eprintln!("{} matching lines found", total); Ok(()) } // --- Expand mode --- +#[derive(Default)] struct Pass1Result { seed_session_ids: HashSet, seed_correlation_ids: HashSet, - /// new_session_id (normalized) -> old_session_id (normalized) change_session_map: HashMap, - /// Session IDs (normalized) that have a signature line sessions_with_signature: HashSet, - /// Path to decompressed temp file (only for gzip files) - decompressed_path: Option, + line_count: u64, } -fn run_search_expanded(file_path: &str, query: &str) -> Result<()> { - let is_gzip = file_path.ends_with(".gz"); +impl Pass1Result { + fn merge(mut self, other: Pass1Result) -> Self { + self.seed_session_ids.extend(other.seed_session_ids); + self.seed_correlation_ids.extend(other.seed_correlation_ids); + self.change_session_map.extend(other.change_session_map); + self.sessions_with_signature + .extend(other.sessions_with_signature); + self.line_count += other.line_count; + self + } +} - let temp_dir = if is_gzip { - let dir = TempDir::with_prefix("log_ingest_")?; - eprintln!("Temp directory: {}", dir.path().display()); - Some(dir) - } else { - None - }; +fn process_line_pass1(trimmed: &str, query: &str, result: &mut Pass1Result) { + let sid = SESSION_ID_RE + .captures(trimmed) + .and_then(|c| c.get(1)) + .map(|m| normalize_session_id(m.as_str())); + + if trimmed.contains(r#"msg="signature:"#) + && let Some(s) = sid + { + result.sessions_with_signature.insert(s.to_string()); + } + + if trimmed.contains("changeSessionId:") + && let Some(caps) = CHANGE_SESSION_RE.captures(trimmed) + { + let new_sid = normalize_session_id(caps.get(1).unwrap().as_str()).to_string(); + let old_sid = normalize_session_id(caps.get(2).unwrap().as_str()).to_string(); + result.change_session_map.insert(new_sid, old_sid); + } + + if trimmed.contains(query) { + if let Some(s) = sid + && s != "noSession" + { + result.seed_session_ids.insert(s.to_string()); + } + if let Some(cid) = CORRELATION_ID_RE + .captures(trimmed) + .and_then(|c| c.get(1)) + { + result + .seed_correlation_ids + .insert(cid.as_str().to_string()); + } + } + + result.line_count += 1; +} + +fn run_search_expanded( + file_path: &str, + query: &str, + pool: &rayon::ThreadPool, +) -> Result<()> { + let num_threads = pool.current_num_threads(); + let use_parallel = can_parallelize(file_path, num_threads); // Pass 1: collect metadata - let pass1 = run_pass1(file_path, query, &temp_dir)?; + let pass1 = run_pass1(file_path, query, use_parallel, pool)?; - // Expansion + // Expansion (in-memory graph traversal — inherently sequential) let (expanded_sids, expanded_cids) = expand_seeds( &pass1.seed_session_ids, &pass1.seed_correlation_ids, @@ -149,120 +327,68 @@ fn run_search_expanded(file_path: &str, query: &str) -> Result<()> { expanded_cids.len() ); - // Pass 2: filter and print - let pass2_path = pass1 - .decompressed_path - .as_ref() - .map(|p| p.to_str().unwrap()) - .unwrap_or(file_path); - - let match_count = run_pass2(pass2_path, query, &expanded_sids, &expanded_cids)?; + // Pass 2: filter and print (re-reads file; for gzip this re-decompresses from stream) + let match_count = run_pass2(file_path, query, &expanded_sids, &expanded_cids, use_parallel, pool)?; eprintln!("{} lines output", match_count); - // TempDir drops here, cleaning up automatically Ok(()) } -fn run_pass1(file_path: &str, query: &str, temp_dir: &Option) -> Result { - let mut reader = read_log_file(file_path)?; - let mut line = String::new(); - - let mut seed_session_ids: HashSet = HashSet::new(); - let mut seed_correlation_ids: HashSet = HashSet::new(); - let mut change_session_map: HashMap = HashMap::new(); - let mut sessions_with_signature: HashSet = HashSet::new(); - - let decompressed_path = temp_dir - .as_ref() - .map(|dir| dir.path().join("decompressed.log")); - - let mut temp_writer: Option> = - if let Some(ref path) = decompressed_path { - let file = std::fs::File::create(path)?; - Some(BufWriter::with_capacity(256 * 1024, file)) - } else { - None - }; - - let mut line_count = 0u64; - - loop { - line.clear(); - let bytes_read = reader.read_line(&mut line)?; - if bytes_read == 0 { - break; - } - line_count += 1; - - // Write decompressed content to temp file for gzip - if let Some(ref mut writer) = temp_writer { - writer.write_all(line.as_bytes())?; - } - - let trimmed = line.trim_end(); - - // Extract sessionId (normalized) from every line - let sid = SESSION_ID_RE - .captures(trimmed) - .and_then(|c| c.get(1)) - .map(|m| normalize_session_id(m.as_str())); - - // Check for signature lines - if trimmed.contains(r#"msg="signature:"#) - && let Some(s) = sid - { - sessions_with_signature.insert(s.to_string()); - } - - // Check for changeSessionId lines - if trimmed.contains("changeSessionId:") - && let Some(caps) = CHANGE_SESSION_RE.captures(trimmed) - { - let new_sid = normalize_session_id(caps.get(1).unwrap().as_str()).to_string(); - let old_sid = normalize_session_id(caps.get(2).unwrap().as_str()).to_string(); - change_session_map.insert(new_sid, old_sid); - } - - // Check if this line matches the query - if trimmed.contains(query) { - if let Some(s) = sid - && s != "noSession" - { - seed_session_ids.insert(s.to_string()); - } - let cid = CORRELATION_ID_RE - .captures(trimmed) - .and_then(|c| c.get(1)) - .map(|m| m.as_str().to_string()); - if let Some(c) = cid { - seed_correlation_ids.insert(c); - } - } - - if line_count.is_multiple_of(1_000_000) { - eprintln!("Pass 1: {} lines scanned...", line_count); - } +fn run_pass1( + file_path: &str, + query: &str, + use_parallel: bool, + pool: &rayon::ThreadPool, +) -> Result { + if !use_parallel { + eprintln!("Pass 1: scanning sequentially..."); + let mut result = Pass1Result::default(); + for_each_line_streaming(file_path, |trimmed| { + process_line_pass1(trimmed, query, &mut result); + })?; + eprintln!( + "Pass 1 complete: {} lines, {} seed sessions, {} seed correlations, {} session changes", + result.line_count, + result.seed_session_ids.len(), + result.seed_correlation_ids.len(), + result.change_session_map.len() + ); + return Ok(result); } - if let Some(ref mut writer) = temp_writer { - writer.flush()?; + let file_size = std::fs::metadata(file_path)?.len(); + let num_threads = pool.current_num_threads(); + let chunks = compute_chunks(file_size, num_threads); + + eprintln!("Pass 1: scanning with {} threads...", chunks.len()); + + let results: Vec> = pool.install(|| { + chunks + .par_iter() + .map(|&(start, end)| { + let mut chunk_result = Pass1Result::default(); + for_each_line_in_chunk(file_path, start, end, |trimmed| { + process_line_pass1(trimmed, query, &mut chunk_result); + })?; + Ok(chunk_result) + }) + .collect() + }); + + let mut merged = Pass1Result::default(); + for r in results { + merged = merged.merge(r?); } eprintln!( "Pass 1 complete: {} lines, {} seed sessions, {} seed correlations, {} session changes", - line_count, - seed_session_ids.len(), - seed_correlation_ids.len(), - change_session_map.len() + merged.line_count, + merged.seed_session_ids.len(), + merged.seed_correlation_ids.len(), + merged.change_session_map.len() ); - Ok(Pass1Result { - seed_session_ids, - seed_correlation_ids, - change_session_map, - sessions_with_signature, - decompressed_path, - }) + Ok(merged) } /// Expand seed session IDs by following changeSessionId chains backward. @@ -291,74 +417,143 @@ fn expand_seeds( (expanded_sids, seed_cids.clone()) } +fn format_pass2_match(trimmed: &str, query: &str, expanded_sids: &HashSet, expanded_cids: &HashSet) -> Option { + let is_direct_match = trimmed.contains(query); + + let sid = SESSION_ID_RE + .captures(trimmed) + .and_then(|c| c.get(1)) + .map(|m| normalize_session_id(m.as_str())); + + let cid = CORRELATION_ID_RE + .captures(trimmed) + .and_then(|c| c.get(1)) + .map(|m| m.as_str()); + + let sid_match = sid.is_some_and(|s| expanded_sids.contains(s)); + let cid_match = cid.is_some_and(|c| expanded_cids.contains(c)); + + if !is_direct_match && !sid_match && !cid_match { + return None; + } + + let timestamp = SYSLOG_TIMESTAMP_RE + .captures(trimmed) + .map(|c| c.get(1).unwrap().as_str()) + .unwrap_or("?"); + + let msg = MSG_RE + .captures(trimmed) + .map(|c| c.get(1).unwrap().as_str()) + .unwrap_or(""); + + let sid_display = sid.unwrap_or("-"); + let cid_display = cid.unwrap_or("-"); + let prefix = if is_direct_match { "*" } else { " " }; + + Some(format!( + "{} [{}] [cid:{}] [sid:{}] {}", + prefix, timestamp, cid_display, sid_display, msg + )) +} + fn run_pass2( file_path: &str, query: &str, expanded_sids: &HashSet, expanded_cids: &HashSet, + use_parallel: bool, + pool: &rayon::ThreadPool, ) -> Result { - // Pass 2 always reads a plain file (original plain or decompressed temp) - let file = std::fs::File::open(file_path)?; - let mut reader = BufReader::with_capacity(256 * 1024, file); - let mut line = String::new(); - let mut output_count = 0u64; + if !use_parallel { + eprintln!("Pass 2: filtering sequentially..."); + let mut count = 0u64; + let mut reader = read_log_file(file_path)?; + let mut line = String::new(); + // Track sessions still alive; stop when all are destroyed + let mut remaining_sids: HashSet<&str> = + expanded_sids.iter().map(|s| s.as_str()).collect(); - loop { - line.clear(); - let bytes_read = reader.read_line(&mut line)?; - if bytes_read == 0 { - break; + loop { + line.clear(); + if reader.read_line(&mut line)? == 0 { + break; + } + let trimmed = line.trim_end(); + + // Check for sessionDestroyed + if trimmed.contains("sessionDestroyed") + && let Some(caps) = SESSION_DESTROYED_RE.captures(trimmed) + { + let destroyed_sid = normalize_session_id(caps.get(1).unwrap().as_str()); + if remaining_sids.remove(destroyed_sid) { + // Still output the sessionDestroyed line itself if it matches + if let Some(formatted) = + format_pass2_match(trimmed, query, expanded_sids, expanded_cids) + { + println!("{}", formatted); + count += 1; + } + if remaining_sids.is_empty() { + eprintln!( + "All {} expanded sessions destroyed, stopping early", + expanded_sids.len() + ); + break; + } + continue; + } + } + + if let Some(formatted) = + format_pass2_match(trimmed, query, expanded_sids, expanded_cids) + { + println!("{}", formatted); + count += 1; + } } - - let trimmed = line.trim_end(); - - let is_direct_match = trimmed.contains(query); - - let sid = SESSION_ID_RE - .captures(trimmed) - .and_then(|c| c.get(1)) - .map(|m| normalize_session_id(m.as_str())); - - let cid = CORRELATION_ID_RE - .captures(trimmed) - .and_then(|c| c.get(1)) - .map(|m| m.as_str()); - - let sid_match = sid.is_some_and(|s| expanded_sids.contains(s)); - let cid_match = cid.is_some_and(|c| expanded_cids.contains(c)); - - if !is_direct_match && !sid_match && !cid_match { - continue; - } - - let timestamp = SYSLOG_TIMESTAMP_RE - .captures(trimmed) - .map(|c| c.get(1).unwrap().as_str()) - .unwrap_or("?"); - - let msg = MSG_RE - .captures(trimmed) - .map(|c| c.get(1).unwrap().as_str()) - .unwrap_or(""); - - let sid_display = sid.unwrap_or("-"); - let cid_display = cid.unwrap_or("-"); - let prefix = if is_direct_match { "*" } else { " " }; - - println!( - "{} [{}] [cid:{}] [sid:{}] {}", - prefix, timestamp, cid_display, sid_display, msg - ); - - output_count += 1; + return Ok(count); } - Ok(output_count) + let file_size = std::fs::metadata(file_path)?.len(); + let num_threads = pool.current_num_threads(); + let chunks = compute_chunks(file_size, num_threads); + + eprintln!("Pass 2: filtering with {} threads...", chunks.len()); + + let results: Vec, u64)>> = pool.install(|| { + chunks + .par_iter() + .map(|&(start, end)| { + let mut lines = Vec::new(); + let mut count = 0u64; + for_each_line_in_chunk(file_path, start, end, |trimmed| { + if let Some(formatted) = format_pass2_match(trimmed, query, expanded_sids, expanded_cids) { + lines.push(formatted); + count += 1; + } + })?; + Ok((lines, count)) + }) + .collect() + }); + + let mut total = 0u64; + for result in results { + let (lines, count) = result?; + for line in lines { + println!("{}", line); + } + total += count; + } + + Ok(total) } #[cfg(test)] mod tests { use super::*; + use std::io::Write; #[test] fn test_syslog_timestamp_extraction() { @@ -588,7 +783,7 @@ mod tests { // Expected output: lines 1 and 2 (both have session AAAA or correlation c1). // Lines 3,4 (session BBBB) should NOT be included (AAAA is old, not new). // Line 5 (session XXXX) should NOT be included. - run_search(log_path.to_str().unwrap(), "findme", false, true)?; + run_search(log_path.to_str().unwrap(), "findme", false, true, 1)?; Ok(()) } @@ -622,7 +817,87 @@ mod tests { // "findme" matches line 4 (session NEW). changeSessionId maps NEW→OLD. // OLD has a signature → include OLD but stop recursing. // Expected: lines 1-4 all included (sessions OLD and NEW). - run_search(log_path.to_str().unwrap(), "findme", false, true)?; + run_search(log_path.to_str().unwrap(), "findme", false, true, 1)?; Ok(()) } + + // --- Chunk boundary tests --- + + #[test] + fn test_chunk_boundary_no_lost_lines() -> Result<()> { + // Write lines of known byte sizes, then split exactly on a line boundary + let dir = tempfile::tempdir()?; + let path = dir.path().join("test.log"); + let mut f = std::fs::File::create(&path)?; + // 3 lines: "line1\n", "line2\n", "line3\n" (6 bytes each) + write!(f, "line1\nline2\nline3\n")?; + drop(f); + + // Split at byte 6 (exactly between line1 and line2) + let mut collected = Vec::new(); + for_each_line_in_chunk(path.to_str().unwrap(), 0, 6, |l| { + collected.push(l.to_string()); + })?; + for_each_line_in_chunk(path.to_str().unwrap(), 6, 18, |l| { + collected.push(l.to_string()); + })?; + assert_eq!(collected, vec!["line1", "line2", "line3"]); + Ok(()) + } + + #[test] + fn test_chunk_boundary_mid_line() -> Result<()> { + let dir = tempfile::tempdir()?; + let path = dir.path().join("test.log"); + let mut f = std::fs::File::create(&path)?; + write!(f, "line1\nline2\nline3\n")?; + drop(f); + + // Split at byte 3 (middle of "line1") + let mut collected = Vec::new(); + for_each_line_in_chunk(path.to_str().unwrap(), 0, 3, |l| { + collected.push(l.to_string()); + })?; + for_each_line_in_chunk(path.to_str().unwrap(), 3, 18, |l| { + collected.push(l.to_string()); + })?; + assert_eq!(collected, vec!["line1", "line2", "line3"]); + Ok(()) + } + + #[test] + fn test_chunk_four_way_split() -> Result<()> { + let dir = tempfile::tempdir()?; + let path = dir.path().join("test.log"); + let mut f = std::fs::File::create(&path)?; + for i in 0..20 { + writeln!(f, "line {:02}", i)?; + } + drop(f); + + let file_size = std::fs::metadata(path.as_path())?.len(); + let chunks = compute_chunks(file_size, 4); + + let mut collected = Vec::new(); + for (start, end) in chunks { + for_each_line_in_chunk(path.to_str().unwrap(), start, end, |l| { + collected.push(l.to_string()); + })?; + } + let expected: Vec = (0..20).map(|i| format!("line {:02}", i)).collect(); + assert_eq!(collected, expected); + Ok(()) + } + + // --- sessionDestroyed regex test --- + + #[test] + fn test_session_destroyed_regex() { + let line = "sessionDestroyed #s=8 sid=2010F74498079D00A5647F3777545A64.node003 isnew=false age=693s last=644s attrs=loginState,lastRequest,userSessionData"; + let caps = SESSION_DESTROYED_RE.captures(line).unwrap(); + assert_eq!( + caps.get(1).unwrap().as_str(), + "2010F74498079D00A5647F3777545A64.node003" + ); + } }