From 43de2acade35fc22c1540039fc8586d4542d54a8 Mon Sep 17 00:00:00 2001 From: Alexandr Mansurov Date: Fri, 20 Feb 2026 21:14:41 +0100 Subject: [PATCH] Expansion --- Cargo.lock | 326 +++++++++++++++++++++++++++++++- Cargo.toml | 1 + src/main.rs | 6 + src/search.rs | 514 +++++++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 842 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1b17b8a..5cc9214 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -230,6 +230,22 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -242,6 +258,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "find-msvc-tools" version = "0.1.8" @@ -264,6 +286,19 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "getrandom" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", + "wasip3", +] + [[package]] name = "hashbrown" version = "0.15.5" @@ -273,13 +308,19 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" + [[package]] name = "hashlink" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" dependencies = [ - "hashbrown", + "hashbrown 0.15.5", ] [[package]] @@ -312,12 +353,36 @@ dependencies = [ "cc", ] +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + +[[package]] +name = "indexmap" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" +dependencies = [ + "equivalent", + "hashbrown 0.16.1", + "serde", + "serde_core", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itoa" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" + [[package]] name = "js-sys" version = "0.3.85" @@ -328,6 +393,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "libc" version = "0.2.180" @@ -345,6 +416,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linux-raw-sys" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" + [[package]] name = "log" version = "0.4.29" @@ -363,6 +440,7 @@ dependencies = [ "rayon", "regex", "rusqlite", + "tempfile", ] [[package]] @@ -402,6 +480,16 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.93" @@ -420,6 +508,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + [[package]] name = "rayon" version = "1.11.0" @@ -483,12 +577,73 @@ dependencies = [ "smallvec", ] +[[package]] +name = "rustix" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys", +] + [[package]] name = "rustversion" version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + [[package]] name = "shlex" version = "1.3.0" @@ -515,21 +670,40 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "syn" -version = "2.0.98" +version = "2.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36147f1a48ae0ec2b5b3bc5b537d267457555a10dc06f3dbc8cb11ba3006d3b1" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] +[[package]] +name = "tempfile" +version = "3.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" +dependencies = [ + "fastrand", + "getrandom", + "once_cell", + "rustix", + "windows-sys", +] + [[package]] name = "unicode-ident" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00e2473a93778eb0bad35909dff6a10d28e63f792f16ed15e404fca9d5eeedbe" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "utf8parse" version = "0.2.2" @@ -542,6 +716,24 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "wasip2" +version = "1.0.2+wasi-0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" +dependencies = [ + "wit-bindgen", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "wasm-bindgen" version = "0.2.108" @@ -587,6 +779,40 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -718,3 +944,97 @@ name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml index 13351e7..b964fb1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,3 +13,4 @@ flate2 = "1" anyhow = "1" rayon = "1" crossbeam-channel = "0.5" +tempfile = "3" diff --git a/src/main.rs b/src/main.rs index 85e56be..5801d47 100644 --- a/src/main.rs +++ b/src/main.rs @@ -82,6 +82,11 @@ struct SearchArgs { /// Include correlationId in output #[arg(short = 'c', long = "correlation-id")] correlation_id: bool, + + /// Expand results: find all lines sharing sessionId/correlationId, + /// follow changeSessionId chains backward to session start (signature line) + #[arg(short = 'e', long = "expand")] + expand: bool, } fn parse_date(s: &str) -> Result { @@ -98,6 +103,7 @@ fn main() -> Result<()> { search_args.file.to_str().unwrap(), &search_args.query, search_args.correlation_id, + search_args.expand, ), } } diff --git a/src/search.rs b/src/search.rs index daaff53..05dfceb 100644 --- a/src/search.rs +++ b/src/search.rs @@ -1,7 +1,9 @@ use anyhow::Result; use regex::Regex; -use std::io::BufRead; +use std::collections::{HashMap, HashSet}; +use std::io::{BufRead, BufReader, BufWriter, Write}; use std::sync::LazyLock; +use tempfile::TempDir; use crate::files::read_log_file; @@ -13,7 +15,45 @@ static MSG_RE: LazyLock = LazyLock::new(|| Regex::new(r#"msg="([^"]+)""#) static CORRELATION_ID_RE: LazyLock = LazyLock::new(|| Regex::new(r"correlationId=([^,\s]+)").unwrap()); -pub fn run_search(file_path: &str, query: &str, show_correlation_id: bool) -> Result<()> { +static SESSION_ID_RE: LazyLock = + LazyLock::new(|| Regex::new(r"sessionId=([^,\s]+)").unwrap()); + +/// Matches changeSessionId messages and captures the long-form new and old session IDs. +/// Example: changeSessionId: newSessionId: sDF080BBD / DF080BBD...node011 replaces oldSessionId: sF9EE9D52 / F9EE9D52...node011 +static CHANGE_SESSION_RE: LazyLock = LazyLock::new(|| { + Regex::new( + r#"changeSessionId:.*?newSessionId:\s*\S+\s*/\s*([^,\s"]+).*?replaces\s+oldSessionId:\s*\S+\s*/\s*([^,\s"]+)"#, + ) + .unwrap() +}); + +/// Strips the `.nodeXXX` suffix from a session ID for comparison purposes. +/// "DF080BBD8D5E954C642F6C3B5639D6EE.node011" -> "DF080BBD8D5E954C642F6C3B5639D6EE" +/// "noSession" -> "noSession" +fn normalize_session_id(sid: &str) -> &str { + if let Some(dot_pos) = sid.rfind('.') { + let suffix = &sid[dot_pos + 1..]; + if suffix.starts_with("node") { + return &sid[..dot_pos]; + } + } + sid +} + +pub fn run_search( + file_path: &str, + query: &str, + show_correlation_id: bool, + expand: bool, +) -> Result<()> { + if expand { + run_search_expanded(file_path, query) + } else { + run_search_simple(file_path, query, show_correlation_id) + } +} + +fn run_search_simple(file_path: &str, query: &str, show_correlation_id: bool) -> Result<()> { let mut reader = read_log_file(file_path)?; let mut line = String::new(); let mut match_count = 0u64; @@ -63,6 +103,259 @@ pub fn run_search(file_path: &str, query: &str, show_correlation_id: bool) -> Re Ok(()) } +// --- Expand mode --- + +struct Pass1Result { + seed_session_ids: HashSet, + seed_correlation_ids: HashSet, + /// new_session_id (normalized) -> old_session_id (normalized) + change_session_map: HashMap, + /// Session IDs (normalized) that have a signature line + sessions_with_signature: HashSet, + /// Path to decompressed temp file (only for gzip files) + decompressed_path: Option, +} + +fn run_search_expanded(file_path: &str, query: &str) -> Result<()> { + let is_gzip = file_path.ends_with(".gz"); + + let temp_dir = if is_gzip { + let dir = TempDir::with_prefix("log_ingest_")?; + eprintln!("Temp directory: {}", dir.path().display()); + Some(dir) + } else { + None + }; + + // Pass 1: collect metadata + let pass1 = run_pass1(file_path, query, &temp_dir)?; + + // Expansion + let (expanded_sids, expanded_cids) = expand_seeds( + &pass1.seed_session_ids, + &pass1.seed_correlation_ids, + &pass1.change_session_map, + &pass1.sessions_with_signature, + ); + + if expanded_sids.is_empty() && expanded_cids.is_empty() { + eprintln!("0 matching lines found (no sessions or correlations to expand)"); + return Ok(()); + } + + eprintln!( + "Expanding: {} session IDs, {} correlation IDs", + expanded_sids.len(), + expanded_cids.len() + ); + + // Pass 2: filter and print + let pass2_path = pass1 + .decompressed_path + .as_ref() + .map(|p| p.to_str().unwrap()) + .unwrap_or(file_path); + + let match_count = run_pass2(pass2_path, query, &expanded_sids, &expanded_cids)?; + + eprintln!("{} lines output", match_count); + // TempDir drops here, cleaning up automatically + Ok(()) +} + +fn run_pass1(file_path: &str, query: &str, temp_dir: &Option) -> Result { + let mut reader = read_log_file(file_path)?; + let mut line = String::new(); + + let mut seed_session_ids: HashSet = HashSet::new(); + let mut seed_correlation_ids: HashSet = HashSet::new(); + let mut change_session_map: HashMap = HashMap::new(); + let mut sessions_with_signature: HashSet = HashSet::new(); + + let decompressed_path = temp_dir + .as_ref() + .map(|dir| dir.path().join("decompressed.log")); + + let mut temp_writer: Option> = + if let Some(ref path) = decompressed_path { + let file = std::fs::File::create(path)?; + Some(BufWriter::with_capacity(256 * 1024, file)) + } else { + None + }; + + let mut line_count = 0u64; + + loop { + line.clear(); + let bytes_read = reader.read_line(&mut line)?; + if bytes_read == 0 { + break; + } + line_count += 1; + + // Write decompressed content to temp file for gzip + if let Some(ref mut writer) = temp_writer { + writer.write_all(line.as_bytes())?; + } + + let trimmed = line.trim_end(); + + // Extract sessionId (normalized) from every line + let sid = SESSION_ID_RE + .captures(trimmed) + .and_then(|c| c.get(1)) + .map(|m| normalize_session_id(m.as_str())); + + // Check for signature lines + if trimmed.contains(r#"msg="signature:"#) + && let Some(s) = sid + { + sessions_with_signature.insert(s.to_string()); + } + + // Check for changeSessionId lines + if trimmed.contains("changeSessionId:") + && let Some(caps) = CHANGE_SESSION_RE.captures(trimmed) + { + let new_sid = normalize_session_id(caps.get(1).unwrap().as_str()).to_string(); + let old_sid = normalize_session_id(caps.get(2).unwrap().as_str()).to_string(); + change_session_map.insert(new_sid, old_sid); + } + + // Check if this line matches the query + if trimmed.contains(query) { + if let Some(s) = sid + && s != "noSession" + { + seed_session_ids.insert(s.to_string()); + } + let cid = CORRELATION_ID_RE + .captures(trimmed) + .and_then(|c| c.get(1)) + .map(|m| m.as_str().to_string()); + if let Some(c) = cid { + seed_correlation_ids.insert(c); + } + } + + if line_count.is_multiple_of(1_000_000) { + eprintln!("Pass 1: {} lines scanned...", line_count); + } + } + + if let Some(ref mut writer) = temp_writer { + writer.flush()?; + } + + eprintln!( + "Pass 1 complete: {} lines, {} seed sessions, {} seed correlations, {} session changes", + line_count, + seed_session_ids.len(), + seed_correlation_ids.len(), + change_session_map.len() + ); + + Ok(Pass1Result { + seed_session_ids, + seed_correlation_ids, + change_session_map, + sessions_with_signature, + decompressed_path, + }) +} + +/// Expand seed session IDs by following changeSessionId chains backward. +/// Stops recursion when an old session has a signature line (session start). +fn expand_seeds( + seed_sids: &HashSet, + seed_cids: &HashSet, + change_map: &HashMap, + sig_sessions: &HashSet, +) -> (HashSet, HashSet) { + let mut expanded_sids = seed_sids.clone(); + let mut work_queue: Vec = seed_sids.iter().cloned().collect(); + + while let Some(current) = work_queue.pop() { + if let Some(old_sid) = change_map.get(¤t) + && expanded_sids.insert(old_sid.clone()) + { + // Newly added — include its lines. + // If it has a signature, stop recursing from it. + if !sig_sessions.contains(old_sid) { + work_queue.push(old_sid.clone()); + } + } + } + + (expanded_sids, seed_cids.clone()) +} + +fn run_pass2( + file_path: &str, + query: &str, + expanded_sids: &HashSet, + expanded_cids: &HashSet, +) -> Result { + // Pass 2 always reads a plain file (original plain or decompressed temp) + let file = std::fs::File::open(file_path)?; + let mut reader = BufReader::with_capacity(256 * 1024, file); + let mut line = String::new(); + let mut output_count = 0u64; + + loop { + line.clear(); + let bytes_read = reader.read_line(&mut line)?; + if bytes_read == 0 { + break; + } + + let trimmed = line.trim_end(); + + let is_direct_match = trimmed.contains(query); + + let sid = SESSION_ID_RE + .captures(trimmed) + .and_then(|c| c.get(1)) + .map(|m| normalize_session_id(m.as_str())); + + let cid = CORRELATION_ID_RE + .captures(trimmed) + .and_then(|c| c.get(1)) + .map(|m| m.as_str()); + + let sid_match = sid.is_some_and(|s| expanded_sids.contains(s)); + let cid_match = cid.is_some_and(|c| expanded_cids.contains(c)); + + if !is_direct_match && !sid_match && !cid_match { + continue; + } + + let timestamp = SYSLOG_TIMESTAMP_RE + .captures(trimmed) + .map(|c| c.get(1).unwrap().as_str()) + .unwrap_or("?"); + + let msg = MSG_RE + .captures(trimmed) + .map(|c| c.get(1).unwrap().as_str()) + .unwrap_or(""); + + let sid_display = sid.unwrap_or("-"); + let cid_display = cid.unwrap_or("-"); + let prefix = if is_direct_match { "*" } else { " " }; + + println!( + "{} [{}] [cid:{}] [sid:{}] {}", + prefix, timestamp, cid_display, sid_display, msg + ); + + output_count += 1; + } + + Ok(output_count) +} + #[cfg(test)] mod tests { use super::*; @@ -115,4 +408,221 @@ mod tests { let line = "Jan 27 17:21:17 some line without msg field"; assert!(MSG_RE.captures(line).is_none()); } + + // --- normalize_session_id tests --- + + #[test] + fn test_normalize_session_id_with_node_suffix() { + assert_eq!( + normalize_session_id("DF080BBD8D5E954C642F6C3B5639D6EE.node011"), + "DF080BBD8D5E954C642F6C3B5639D6EE" + ); + } + + #[test] + fn test_normalize_session_id_without_suffix() { + assert_eq!( + normalize_session_id("DF080BBD8D5E954C642F6C3B5639D6EE"), + "DF080BBD8D5E954C642F6C3B5639D6EE" + ); + } + + #[test] + fn test_normalize_session_id_no_session() { + assert_eq!(normalize_session_id("noSession"), "noSession"); + } + + #[test] + fn test_normalize_session_id_non_node_dot() { + assert_eq!(normalize_session_id("some.session.id"), "some.session.id"); + } + + // --- CHANGE_SESSION_RE tests --- + + #[test] + fn test_change_session_id_regex() { + let line = r#"msg="changeSessionId: newSessionId: sDF080BBD / DF080BBD8D5E954C642F6C3B5639D6EE.node011 replaces oldSessionId: sF9EE9D52 / F9EE9D52FDB4502EB5CE6FFA24194AFD.node011""#; + let caps = CHANGE_SESSION_RE.captures(line).unwrap(); + assert_eq!( + caps.get(1).unwrap().as_str(), + "DF080BBD8D5E954C642F6C3B5639D6EE.node011" + ); + assert_eq!( + caps.get(2).unwrap().as_str(), + "F9EE9D52FDB4502EB5CE6FFA24194AFD.node011" + ); + } + + // --- SESSION_ID_RE tests --- + + #[test] + fn test_session_id_extraction() { + let line = "sessionId=ABC123DEF456.node005, something else"; + let caps = SESSION_ID_RE.captures(line).unwrap(); + assert_eq!(caps.get(1).unwrap().as_str(), "ABC123DEF456.node005"); + } + + // --- expand_seeds tests --- + + #[test] + fn test_expand_seeds_no_chain() { + let seed_sids: HashSet = ["A".to_string()].into(); + let seed_cids: HashSet = ["c1".to_string()].into(); + let change_map = HashMap::new(); + let sig_sessions = HashSet::new(); + + let (sids, cids) = expand_seeds(&seed_sids, &seed_cids, &change_map, &sig_sessions); + assert_eq!(sids, seed_sids); + assert_eq!(cids, seed_cids); + } + + #[test] + fn test_expand_seeds_single_chain() { + // B replaced A (A is old, B is new). Seed is B. + let seed_sids: HashSet = ["B".to_string()].into(); + let seed_cids: HashSet = HashSet::new(); + let change_map: HashMap = [("B".to_string(), "A".to_string())].into(); + let sig_sessions: HashSet = ["A".to_string()].into(); + + let (sids, _) = expand_seeds(&seed_sids, &seed_cids, &change_map, &sig_sessions); + assert!(sids.contains("A")); + assert!(sids.contains("B")); + assert_eq!(sids.len(), 2); + } + + #[test] + fn test_expand_seeds_multi_hop_chain() { + // C replaced B, B replaced A. Seed is C. + let seed_sids: HashSet = ["C".to_string()].into(); + let seed_cids: HashSet = HashSet::new(); + let change_map: HashMap = [ + ("C".to_string(), "B".to_string()), + ("B".to_string(), "A".to_string()), + ] + .into(); + let sig_sessions: HashSet = ["A".to_string()].into(); + + let (sids, _) = expand_seeds(&seed_sids, &seed_cids, &change_map, &sig_sessions); + assert!(sids.contains("A")); + assert!(sids.contains("B")); + assert!(sids.contains("C")); + assert_eq!(sids.len(), 3); + } + + #[test] + fn test_expand_seeds_stops_at_signature() { + // D replaced C, C replaced B, B replaced A. B has signature. Seed is D. + let seed_sids: HashSet = ["D".to_string()].into(); + let seed_cids: HashSet = HashSet::new(); + let change_map: HashMap = [ + ("D".to_string(), "C".to_string()), + ("C".to_string(), "B".to_string()), + ("B".to_string(), "A".to_string()), + ] + .into(); + let sig_sessions: HashSet = ["B".to_string()].into(); + + let (sids, _) = expand_seeds(&seed_sids, &seed_cids, &change_map, &sig_sessions); + assert!(sids.contains("D")); + assert!(sids.contains("C")); + assert!(sids.contains("B")); + assert!(!sids.contains("A")); + assert_eq!(sids.len(), 3); + } + + #[test] + fn test_expand_seeds_cycle_protection() { + // A -> B -> A (cycle) + let seed_sids: HashSet = ["A".to_string()].into(); + let seed_cids: HashSet = HashSet::new(); + let change_map: HashMap = [ + ("A".to_string(), "B".to_string()), + ("B".to_string(), "A".to_string()), + ] + .into(); + let sig_sessions: HashSet = HashSet::new(); + + let (sids, _) = expand_seeds(&seed_sids, &seed_cids, &change_map, &sig_sessions); + assert!(sids.contains("A")); + assert!(sids.contains("B")); + assert_eq!(sids.len(), 2); + } + + // --- Integration test --- + + #[test] + fn test_expand_integration() -> Result<()> { + let dir = tempfile::tempdir()?; + let log_path = dir.path().join("test.log"); + let mut file = std::fs::File::create(&log_path)?; + + // Line 1: signature for session A + writeln!( + file, + r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=AAAA.node001, correlationId=c1, msg="signature:XAMARIN_APP/1.0/ details:OS:1""# + )?; + // Line 2: normal line for session A, matches query + writeln!( + file, + r#"Jan 01 00:00:02 host app dt="2026-01-01 00:00:02,000", sessionId=AAAA.node001, correlationId=c1, msg="findme something""# + )?; + // Line 3: changeSessionId: B replaces A + writeln!( + file, + r#"Jan 01 00:00:03 host app dt="2026-01-01 00:00:03,000", sessionId=BBBB.node001, correlationId=c2, msg="changeSessionId: newSessionId: sBBBB / BBBB.node001 replaces oldSessionId: sAAAA / AAAA.node001""# + )?; + // Line 4: normal line for session B + writeln!( + file, + r#"Jan 01 00:00:04 host app dt="2026-01-01 00:00:04,000", sessionId=BBBB.node001, correlationId=c2, msg="some other action""# + )?; + // Line 5: unrelated session X + writeln!( + file, + r#"Jan 01 00:00:05 host app dt="2026-01-01 00:00:05,000", sessionId=XXXX.node002, correlationId=c9, msg="unrelated""# + )?; + + // "findme" matches line 2 (session AAAA, correlation c1). + // Session AAAA is in seeds. No changeSessionId has AAAA as new → no backward expansion. + // Correlation c1 is in seeds. + // Expected output: lines 1 and 2 (both have session AAAA or correlation c1). + // Lines 3,4 (session BBBB) should NOT be included (AAAA is old, not new). + // Line 5 (session XXXX) should NOT be included. + run_search(log_path.to_str().unwrap(), "findme", false, true)?; + Ok(()) + } + + #[test] + fn test_expand_follows_change_session_backward() -> Result<()> { + let dir = tempfile::tempdir()?; + let log_path = dir.path().join("test.log"); + let mut file = std::fs::File::create(&log_path)?; + + // Line 1: signature for session OLD + writeln!( + file, + r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=OLD.node001, correlationId=c0, msg="signature:XAMARIN_APP/1.0/ details:OS:1""# + )?; + // Line 2: normal line for session OLD + writeln!( + file, + r#"Jan 01 00:00:02 host app dt="2026-01-01 00:00:02,000", sessionId=OLD.node001, correlationId=c1, msg="doing stuff""# + )?; + // Line 3: changeSessionId: NEW replaces OLD + writeln!( + file, + r#"Jan 01 00:00:03 host app dt="2026-01-01 00:00:03,000", sessionId=NEW.node001, correlationId=c2, msg="changeSessionId: newSessionId: sNEW / NEW.node001 replaces oldSessionId: sOLD / OLD.node001""# + )?; + // Line 4: normal line for session NEW, matches query + writeln!( + file, + r#"Jan 01 00:00:04 host app dt="2026-01-01 00:00:04,000", sessionId=NEW.node001, correlationId=c3, msg="findme in new session""# + )?; + + // "findme" matches line 4 (session NEW). changeSessionId maps NEW→OLD. + // OLD has a signature → include OLD but stop recursing. + // Expected: lines 1-4 all included (sessions OLD and NEW). + run_search(log_path.to_str().unwrap(), "findme", false, true)?; + Ok(()) + } }