// log_ingest/src/search.rs

use anyhow::Result;
use rayon::prelude::*;
use regex::Regex;
use std::collections::{HashMap, HashSet};
use std::io::{BufRead, BufReader, Read as _, Seek, SeekFrom};
use std::sync::LazyLock;
use crate::files::read_log_file;
static SYSLOG_TIMESTAMP_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})").unwrap());
static MSG_RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r#"msg="([^"]+)""#).unwrap());
static CORRELATION_ID_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"correlationId=([^,\s]+)").unwrap());
static SESSION_ID_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"sessionId=([^,\s]+)").unwrap());
/// Matches sessionDestroyed lines and captures the sid value.
/// Example: sessionDestroyed #s=8 sid=2010F74...node003 isnew=false
static SESSION_DESTROYED_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"sessionDestroyed\b.*?\bsid=([^,\s]+)").unwrap());
/// Extracts the app name from any signature line format.
/// Matches both `msg="signature:APP/..."` and `msg="MOBILE_CLIENT_LOG: signature:APP/..."`.
static SIGNATURE_APP_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"signature:([^/\s]+)/").unwrap());
/// Matches changeSessionId messages and captures the long-form new and old session IDs.
/// Example: changeSessionId: newSessionId: sDF080BBD / DF080BBD...node011 replaces oldSessionId: sF9EE9D52 / F9EE9D52...node011
static CHANGE_SESSION_RE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(
r#"changeSessionId:.*?newSessionId:\s*\S+\s*/\s*([^,\s"]+).*?replaces\s+oldSessionId:\s*\S+\s*/\s*([^,\s"]+)"#,
)
.unwrap()
});
/// Strips the `.nodeXXX` suffix from a session ID for comparison purposes.
/// "DF080BBD8D5E954C642F6C3B5639D6EE.node011" -> "DF080BBD8D5E954C642F6C3B5639D6EE"
/// "noSession" -> "noSession"
fn normalize_session_id(sid: &str) -> &str {
if let Some(dot_pos) = sid.rfind('.') {
let suffix = &sid[dot_pos + 1..];
if suffix.starts_with("node") {
return &sid[..dot_pos];
}
}
sid
}
fn build_thread_pool(threads: usize) -> Result<rayon::ThreadPool> {
let mut builder = rayon::ThreadPoolBuilder::new();
if threads > 0 {
builder = builder.num_threads(threads);
}
builder
.build()
.map_err(|e| anyhow::anyhow!("Failed to build thread pool: {}", e))
}
/// Split a file into byte-offset chunks for parallel processing.
/// Each chunk is a (start, end) byte range; the last chunk absorbs any division remainder.
/// Workers snap chunk edges to line boundaries as they read (see `for_each_line_in_chunk`).
fn compute_chunks(file_size: u64, num_chunks: usize) -> Vec<(u64, u64)> {
if file_size == 0 || num_chunks == 0 {
return vec![];
}
let effective = num_chunks.min(file_size as usize);
let chunk_size = file_size / effective as u64;
(0..effective)
.map(|i| {
let start = i as u64 * chunk_size;
let end = if i == effective - 1 {
file_size
} else {
(i as u64 + 1) * chunk_size
};
(start, end)
})
.collect()
}
/// Returns true if parallel chunk processing can be used (plain file + multiple threads).
fn can_parallelize(file_path: &str, num_threads: usize) -> bool {
num_threads > 1 && !file_path.ends_with(".gz")
}
/// Stream all lines from a file (works with gzip and plain). Calls `process` for each line.
fn for_each_line_streaming<F>(file_path: &str, mut process: F) -> Result<()>
where
F: FnMut(&str),
{
let mut reader = read_log_file(file_path)?;
let mut line = String::new();
loop {
line.clear();
if reader.read_line(&mut line)? == 0 {
break;
}
process(line.trim_end());
}
Ok(())
}
/// Process lines in a byte-offset chunk of a plain file. Calls `process` for each complete line.
///
/// Chunk boundaries may fall mid-line. Convention:
/// - The chunk that started reading a line owns it (reads past `end` to finish it).
/// - The next chunk checks byte `start-1`: if it's `\n`, we're at a line start; otherwise
/// skip the partial first line (the previous chunk already handled it).
fn for_each_line_in_chunk<F>(file_path: &str, start: u64, end: u64, mut process: F) -> Result<()>
where
F: FnMut(&str),
{
let file = std::fs::File::open(file_path)?;
let mut reader = BufReader::with_capacity(256 * 1024, file);
let mut pos = start;
if start > 0 {
// Check if we're at a line boundary or mid-line
reader.seek(SeekFrom::Start(start - 1))?;
let mut byte = [0u8; 1];
reader.read_exact(&mut byte)?;
// Now positioned at `start`
if byte[0] != b'\n' {
// Mid-line: skip remainder (previous chunk owns this line)
let mut skip = String::new();
let n = reader.read_line(&mut skip)?;
pos += n as u64;
}
}
let mut line = String::new();
while pos < end {
line.clear();
let n = reader.read_line(&mut line)?;
if n == 0 {
break;
}
pos += n as u64;
process(line.trim_end());
}
Ok(())
}
pub fn run_search(
file_path: &str,
query: &str,
show_correlation_id: bool,
expand: bool,
threads: usize,
) -> Result<()> {
let pool = build_thread_pool(threads)?;
if expand {
run_search_expanded(file_path, query, &pool)
} else {
run_search_simple(file_path, query, show_correlation_id, &pool)
}
}
fn format_simple_match(line: &str, show_correlation_id: bool) -> String {
let ts = SYSLOG_TIMESTAMP_RE
.captures(line)
.map(|c| c.get(1).unwrap().as_str())
.unwrap_or("?");
let msg = MSG_RE
.captures(line)
.map(|c| c.get(1).unwrap().as_str())
.unwrap_or("<no msg field>");
if show_correlation_id {
let cid = CORRELATION_ID_RE
.captures(line)
.map(|c| c.get(1).unwrap().as_str());
if let Some(cid) = cid {
return format!("[{}] [{}] {}", ts, cid, msg);
}
}
format!("[{}] {}", ts, msg)
}
fn run_search_simple(
file_path: &str,
query: &str,
show_correlation_id: bool,
pool: &rayon::ThreadPool,
) -> Result<()> {
let num_threads = pool.current_num_threads();
if !can_parallelize(file_path, num_threads) {
// Sequential: stream directly (works for gzip and plain)
let mut match_count = 0u64;
for_each_line_streaming(file_path, |trimmed| {
if trimmed.contains(query) {
println!("{}", format_simple_match(trimmed, show_correlation_id));
match_count += 1;
}
})?;
eprintln!("{} matching lines found", match_count);
return Ok(());
}
// Parallel: split plain file into chunks
let file_size = std::fs::metadata(file_path)?.len();
let chunks = compute_chunks(file_size, num_threads);
eprintln!(
"Searching with {} threads across {} chunks",
num_threads,
chunks.len()
);
let results: Vec<Result<(Vec<String>, u64)>> = pool.install(|| {
chunks
.par_iter()
.map(|&(start, end)| {
let mut lines = Vec::new();
let mut count = 0u64;
for_each_line_in_chunk(file_path, start, end, |trimmed| {
if trimmed.contains(query) {
lines.push(format_simple_match(trimmed, show_correlation_id));
count += 1;
}
})?;
Ok((lines, count))
})
.collect()
});
let mut total = 0u64;
for result in results {
let (lines, count) = result?;
for line in lines {
println!("{}", line);
}
total += count;
}
eprintln!("{} matching lines found", total);
Ok(())
}
// --- Expand mode ---
#[derive(Default)]
struct Pass1Result {
seed_session_ids: HashSet<String>,
seed_correlation_ids: HashSet<String>,
change_session_map: HashMap<String, String>,
sessions_with_signature: HashSet<String>,
/// Maps normalized session ID to app name from its signature line
session_app_map: HashMap<String, String>,
/// Maps seed correlation ID to the session ID from the same line (for app filtering)
seed_cid_sessions: HashMap<String, String>,
line_count: u64,
}
impl Pass1Result {
fn merge(mut self, other: Pass1Result) -> Self {
self.seed_session_ids.extend(other.seed_session_ids);
self.seed_correlation_ids.extend(other.seed_correlation_ids);
self.change_session_map.extend(other.change_session_map);
self.sessions_with_signature
.extend(other.sessions_with_signature);
self.session_app_map.extend(other.session_app_map);
self.seed_cid_sessions.extend(other.seed_cid_sessions);
self.line_count += other.line_count;
self
}
}
fn process_line_pass1(trimmed: &str, query: &str, result: &mut Pass1Result) {
let sid = SESSION_ID_RE
.captures(trimmed)
.and_then(|c| c.get(1))
.map(|m| normalize_session_id(m.as_str()));
// Detect signature lines using broad regex (matches both msg="signature:APP/..."
// and msg="MOBILE_CLIENT_LOG: signature:APP/...")
if let Some(sig_caps) = SIGNATURE_APP_RE.captures(trimmed) {
let app = sig_caps.get(1).unwrap().as_str();
if let Some(s) = sid {
result.sessions_with_signature.insert(s.to_string());
result
.session_app_map
.insert(s.to_string(), app.to_string());
}
}
if trimmed.contains("changeSessionId:")
&& let Some(caps) = CHANGE_SESSION_RE.captures(trimmed)
{
let new_sid = normalize_session_id(caps.get(1).unwrap().as_str()).to_string();
let old_sid = normalize_session_id(caps.get(2).unwrap().as_str()).to_string();
result.change_session_map.insert(new_sid, old_sid);
}
if trimmed.contains(query) {
if let Some(s) = sid
&& s != "noSession"
{
result.seed_session_ids.insert(s.to_string());
// Track which correlation IDs belong to which sessions (for app filtering)
if let Some(cid) = CORRELATION_ID_RE
.captures(trimmed)
.and_then(|c| c.get(1))
{
result
.seed_cid_sessions
.insert(cid.as_str().to_string(), s.to_string());
}
}
if let Some(cid) = CORRELATION_ID_RE
.captures(trimmed)
.and_then(|c| c.get(1))
{
result
.seed_correlation_ids
.insert(cid.as_str().to_string());
}
}
result.line_count += 1;
}
fn run_search_expanded(
file_path: &str,
query: &str,
pool: &rayon::ThreadPool,
) -> Result<()> {
let num_threads = pool.current_num_threads();
let use_parallel = can_parallelize(file_path, num_threads);
// Pass 1: collect metadata
let pass1 = run_pass1(file_path, query, use_parallel, pool)?;
// Expansion (in-memory graph traversal — inherently sequential)
let (expanded_sids, expanded_cids) = expand_seeds(
&pass1.seed_session_ids,
&pass1.seed_correlation_ids,
&pass1.change_session_map,
&pass1.sessions_with_signature,
);
if expanded_sids.is_empty() && expanded_cids.is_empty() {
eprintln!("0 matching lines found (no sessions or correlations to expand)");
return Ok(());
}
eprintln!(
"Expanding: {} session IDs, {} correlation IDs",
expanded_sids.len(),
expanded_cids.len()
);
// Pass 2: filter and print (re-reads the file; for gzip this decompresses the whole stream again)
let match_count = run_pass2(file_path, query, &expanded_sids, &expanded_cids, use_parallel, pool, false)?;
eprintln!("{} lines output", match_count);
Ok(())
}
fn run_pass1(
file_path: &str,
query: &str,
use_parallel: bool,
pool: &rayon::ThreadPool,
) -> Result<Pass1Result> {
if !use_parallel {
eprintln!("Pass 1: scanning sequentially...");
let mut result = Pass1Result::default();
for_each_line_streaming(file_path, |trimmed| {
process_line_pass1(trimmed, query, &mut result);
})?;
eprintln!(
"Pass 1 complete: {} lines, {} seed sessions, {} seed correlations, {} session changes",
result.line_count,
result.seed_session_ids.len(),
result.seed_correlation_ids.len(),
result.change_session_map.len()
);
return Ok(result);
}
let file_size = std::fs::metadata(file_path)?.len();
let num_threads = pool.current_num_threads();
let chunks = compute_chunks(file_size, num_threads);
eprintln!("Pass 1: scanning with {} threads...", chunks.len());
let results: Vec<Result<Pass1Result>> = pool.install(|| {
chunks
.par_iter()
.map(|&(start, end)| {
let mut chunk_result = Pass1Result::default();
for_each_line_in_chunk(file_path, start, end, |trimmed| {
process_line_pass1(trimmed, query, &mut chunk_result);
})?;
Ok(chunk_result)
})
.collect()
});
let mut merged = Pass1Result::default();
for r in results {
merged = merged.merge(r?);
}
eprintln!(
"Pass 1 complete: {} lines, {} seed sessions, {} seed correlations, {} session changes",
merged.line_count,
merged.seed_session_ids.len(),
merged.seed_correlation_ids.len(),
merged.change_session_map.len()
);
Ok(merged)
}
/// Expand seed session IDs by following changeSessionId chains backward.
/// Stops recursion when an old session has a signature line (session start).
fn expand_seeds(
seed_sids: &HashSet<String>,
seed_cids: &HashSet<String>,
change_map: &HashMap<String, String>,
sig_sessions: &HashSet<String>,
) -> (HashSet<String>, HashSet<String>) {
let mut expanded_sids = seed_sids.clone();
let mut work_queue: Vec<String> = seed_sids.iter().cloned().collect();
while let Some(current) = work_queue.pop() {
if let Some(old_sid) = change_map.get(&current)
&& expanded_sids.insert(old_sid.clone())
{
// Newly added — include its lines.
// If it has a signature, stop recursing from it.
if !sig_sessions.contains(old_sid) {
work_queue.push(old_sid.clone());
}
}
}
(expanded_sids, seed_cids.clone())
}
fn format_pass2_match(
trimmed: &str,
query: &str,
expanded_sids: &HashSet<String>,
expanded_cids: &HashSet<String>,
strict_app_isolation: bool,
) -> Option<String> {
let is_direct_match = trimmed.contains(query);
let sid = SESSION_ID_RE
.captures(trimmed)
.and_then(|c| c.get(1))
.map(|m| normalize_session_id(m.as_str()));
let cid = CORRELATION_ID_RE
.captures(trimmed)
.and_then(|c| c.get(1))
.map(|m| m.as_str());
let sid_match = sid.is_some_and(|s| expanded_sids.contains(s));
let cid_match = cid.is_some_and(|c| expanded_cids.contains(c));
// When strict_app_isolation is enabled (search-exceptions), a CID match
// alone is not enough — the line's session must also be in the filtered set
// (or absent). This prevents leaking lines from non-matching apps that
// happen to share a correlation ID.
let effective_cid_match =
cid_match && !(strict_app_isolation && sid.is_some() && !sid_match);
if !is_direct_match && !sid_match && !effective_cid_match {
return None;
}
let timestamp = SYSLOG_TIMESTAMP_RE
.captures(trimmed)
.map(|c| c.get(1).unwrap().as_str())
.unwrap_or("?");
let msg = MSG_RE
.captures(trimmed)
.map(|c| c.get(1).unwrap().as_str())
.unwrap_or("<no msg field>");
let sid_display = sid.unwrap_or("-");
let cid_display = cid.unwrap_or("-");
let prefix = if is_direct_match { "*" } else { " " };
Some(format!(
"{} [{}] [cid:{}] [sid:{}] {}",
prefix, timestamp, cid_display, sid_display, msg
))
}
/// Events collected per-line during pass2 parallel chunk processing.
/// Preserves file order so the assembly phase can stop at sessionDestroyed.
enum Pass2Event {
/// Line matched and should be output.
Match(String),
/// An expanded session was destroyed (no output for this line).
Destroy(String),
/// Line matched AND an expanded session was destroyed on this line.
MatchAndDestroy(String, String),
}
/// Check whether a line destroys one of the expanded sessions.
fn check_session_destroyed(trimmed: &str, expanded_sids: &HashSet<String>) -> Option<String> {
if trimmed.contains("sessionDestroyed")
&& let Some(caps) = SESSION_DESTROYED_RE.captures(trimmed)
{
let sid = normalize_session_id(caps.get(1).unwrap().as_str());
if expanded_sids.contains(sid) {
return Some(sid.to_string());
}
}
None
}
fn run_pass2(
file_path: &str,
query: &str,
expanded_sids: &HashSet<String>,
expanded_cids: &HashSet<String>,
use_parallel: bool,
pool: &rayon::ThreadPool,
strict_app_isolation: bool,
) -> Result<u64> {
if !use_parallel {
eprintln!("Pass 2: filtering sequentially...");
let mut count = 0u64;
let mut reader = read_log_file(file_path)?;
let mut line = String::new();
let mut remaining_sids: HashSet<&str> =
expanded_sids.iter().map(|s| s.as_str()).collect();
loop {
line.clear();
if reader.read_line(&mut line)? == 0 {
break;
}
let trimmed = line.trim_end();
// Check for sessionDestroyed
if trimmed.contains("sessionDestroyed")
&& let Some(caps) = SESSION_DESTROYED_RE.captures(trimmed)
{
let destroyed_sid = normalize_session_id(caps.get(1).unwrap().as_str());
if remaining_sids.remove(destroyed_sid) {
// Still output the sessionDestroyed line itself if it matches
if let Some(formatted) =
format_pass2_match(trimmed, query, expanded_sids, expanded_cids, strict_app_isolation)
{
println!("{}", formatted);
count += 1;
}
if remaining_sids.is_empty() {
eprintln!(
"All {} expanded sessions destroyed, stopping early",
expanded_sids.len()
);
break;
}
continue;
}
}
if let Some(formatted) =
format_pass2_match(trimmed, query, expanded_sids, expanded_cids, strict_app_isolation)
{
println!("{}", formatted);
count += 1;
}
}
return Ok(count);
}
let file_size = std::fs::metadata(file_path)?.len();
let num_threads = pool.current_num_threads();
let chunks = compute_chunks(file_size, num_threads);
eprintln!("Pass 2: filtering with {} threads...", chunks.len());
// Each chunk collects events preserving file order so the assembly
// phase can apply the same sessionDestroyed stop logic as sequential.
let results: Vec<Result<Vec<Pass2Event>>> = pool.install(|| {
chunks
.par_iter()
.map(|&(start, end)| {
let mut events = Vec::new();
for_each_line_in_chunk(file_path, start, end, |trimmed| {
let formatted = format_pass2_match(
trimmed, query, expanded_sids, expanded_cids, strict_app_isolation,
);
let destroyed = check_session_destroyed(trimmed, expanded_sids);
match (formatted, destroyed) {
(Some(f), Some(d)) => events.push(Pass2Event::MatchAndDestroy(f, d)),
(Some(f), None) => events.push(Pass2Event::Match(f)),
(None, Some(d)) => events.push(Pass2Event::Destroy(d)),
(None, None) => {}
}
})?;
Ok(events)
})
.collect()
});
// Assemble in chunk order, stopping when all expanded sessions are destroyed.
let mut remaining_sids: HashSet<&str> =
expanded_sids.iter().map(|s| s.as_str()).collect();
let mut total = 0u64;
'outer: for result in results {
let events = result?;
for event in events {
match event {
Pass2Event::Match(line) => {
println!("{}", line);
total += 1;
}
Pass2Event::Destroy(sid) => {
remaining_sids.remove(sid.as_str());
if remaining_sids.is_empty() {
break 'outer;
}
}
Pass2Event::MatchAndDestroy(line, sid) => {
println!("{}", line);
total += 1;
remaining_sids.remove(sid.as_str());
if remaining_sids.is_empty() {
break 'outer;
}
}
}
}
}
Ok(total)
}
// --- search_exceptions ---
/// Filter expanded session IDs to only those in chains containing a matching-app signature.
/// Builds a reverse change_session_map (old → [new]) and propagates forward from matching roots.
fn filter_expanded_by_app(
expanded_sids: &HashSet<String>,
change_map: &HashMap<String, String>,
session_app_map: &HashMap<String, String>,
app_filters: &[String],
) -> HashSet<String> {
// Build reverse map: old → [new1, new2, ...]
let mut reverse_map: HashMap<&str, Vec<&str>> = HashMap::new();
for (new_sid, old_sid) in change_map {
reverse_map
.entry(old_sid.as_str())
.or_default()
.push(new_sid.as_str());
}
// Find all sessions with any matching app
let matching_roots: Vec<&str> = session_app_map
.iter()
.filter(|(_, app)| app_filters.iter().any(|f| f == app.as_str()))
.map(|(sid, _)| sid.as_str())
.collect();
// Propagate forward from matching roots through the reverse map
let mut matching_sessions: HashSet<String> = HashSet::new();
let mut work_queue: Vec<&str> = matching_roots;
while let Some(current) = work_queue.pop() {
if !matching_sessions.insert(current.to_string()) {
continue;
}
if let Some(nexts) = reverse_map.get(current) {
work_queue.extend(nexts.iter().copied());
}
}
// Intersect with expanded_sids
expanded_sids
.intersection(&matching_sessions)
.cloned()
.collect()
}
pub fn run_search_exceptions(file_path: &str, app_filters: &[String], threads: usize) -> Result<()> {
let pool = build_thread_pool(threads)?;
let query = "Exception";
let num_threads = pool.current_num_threads();
let use_parallel = can_parallelize(file_path, num_threads);
// Pass 1: collect metadata
let pass1 = run_pass1(file_path, query, use_parallel, &pool)?;
// Expand seeds
let (expanded_sids, expanded_cids) = expand_seeds(
&pass1.seed_session_ids,
&pass1.seed_correlation_ids,
&pass1.change_session_map,
&pass1.sessions_with_signature,
);
// Filter by app
let filtered_sids = filter_expanded_by_app(
&expanded_sids,
&pass1.change_session_map,
&pass1.session_app_map,
app_filters,
);
// Filter correlation IDs: keep only those whose seed line's session is in the filtered set
let filtered_cids: HashSet<String> = expanded_cids
.iter()
.filter(|cid| {
pass1
.seed_cid_sessions
.get(cid.as_str())
.is_some_and(|sid| filtered_sids.contains(sid))
})
.cloned()
.collect();
if filtered_sids.is_empty() && filtered_cids.is_empty() {
eprintln!(
"0 matching lines found (no sessions matching apps {:?})",
app_filters
);
return Ok(());
}
eprintln!(
"Expanding: {} session IDs (filtered from {}), {} correlation IDs (filtered from {})",
filtered_sids.len(),
expanded_sids.len(),
filtered_cids.len(),
expanded_cids.len()
);
// Pass 2: filter and print (strict isolation prevents CID leaking across apps)
let match_count = run_pass2(
file_path,
query,
&filtered_sids,
&filtered_cids,
use_parallel,
&pool,
true,
)?;
eprintln!("{} lines output", match_count);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
#[test]
fn test_syslog_timestamp_extraction() {
let line = r#"Jan 27 17:21:17 a.b.c.d m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, msg="hello""#;
let caps = SYSLOG_TIMESTAMP_RE.captures(line).unwrap();
assert_eq!(caps.get(1).unwrap().as_str(), "Jan 27 17:21:17");
}
#[test]
fn test_msg_extraction() {
let line = r#"some prefix msg="getUnreadFilesCount(externalUserId=abc123)", ex=""#;
let caps = MSG_RE.captures(line).unwrap();
assert_eq!(
caps.get(1).unwrap().as_str(),
"getUnreadFilesCount(externalUserId=abc123)"
);
}
#[test]
fn test_full_line_extraction() {
let line = r#"Jan 27 17:21:17 a.b.c.d m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, lc=com.a.b.c.d.e.v5.endpoint.f, threadId=183, externalUserId=null, clientIp=1.1.1.1, xsrfToken=null, correlationId=abcd, sessionId=noSession, securityContext=CA_LOGGED_IN, userId=123, request_id=[(null)]snoSessio.abc, msg="getUnreadFilesCount(externalUserId=aaaaa,externalTeamSafeIds=bbbbb)", ex=""#;
let ts = SYSLOG_TIMESTAMP_RE
.captures(line)
.map(|c| c.get(1).unwrap().as_str())
.unwrap();
assert_eq!(ts, "Jan 27 17:21:17");
let msg = MSG_RE
.captures(line)
.map(|c| c.get(1).unwrap().as_str())
.unwrap();
assert_eq!(
msg,
"getUnreadFilesCount(externalUserId=aaaaa,externalTeamSafeIds=bbbbb)"
);
}
#[test]
fn test_no_timestamp() {
let line = r#"some garbage line without proper timestamp msg="hello""#;
assert!(SYSLOG_TIMESTAMP_RE.captures(line).is_none());
}
#[test]
fn test_no_msg() {
let line = "Jan 27 17:21:17 some line without msg field";
assert!(MSG_RE.captures(line).is_none());
}
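    // --- format_simple_match tests ---
    // Minimal sketch of the correlation-ID toggle, using a synthetic log line
    // shaped like the samples above (the field values are made up).
    #[test]
    fn test_format_simple_match_correlation_toggle() {
        let line = r#"Jan 27 17:21:17 host app correlationId=abcd, sessionId=noSession, msg="hello""#;
        // Default format: timestamp and msg only.
        assert_eq!(format_simple_match(line, false), "[Jan 27 17:21:17] hello");
        // With the flag set, the correlation ID is included when present.
        assert_eq!(format_simple_match(line, true), "[Jan 27 17:21:17] [abcd] hello");
        // Without a correlationId field, the flag falls back to the plain format.
        let no_cid = r#"Jan 27 17:21:17 host app msg="hello""#;
        assert_eq!(format_simple_match(no_cid, true), "[Jan 27 17:21:17] hello");
    }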
// --- normalize_session_id tests ---
#[test]
fn test_normalize_session_id_with_node_suffix() {
assert_eq!(
normalize_session_id("DF080BBD8D5E954C642F6C3B5639D6EE.node011"),
"DF080BBD8D5E954C642F6C3B5639D6EE"
);
}
#[test]
fn test_normalize_session_id_without_suffix() {
assert_eq!(
normalize_session_id("DF080BBD8D5E954C642F6C3B5639D6EE"),
"DF080BBD8D5E954C642F6C3B5639D6EE"
);
}
#[test]
fn test_normalize_session_id_no_session() {
assert_eq!(normalize_session_id("noSession"), "noSession");
}
#[test]
fn test_normalize_session_id_non_node_dot() {
assert_eq!(normalize_session_id("some.session.id"), "some.session.id");
}
// --- CHANGE_SESSION_RE tests ---
#[test]
fn test_change_session_id_regex() {
let line = r#"msg="changeSessionId: newSessionId: sDF080BBD / DF080BBD8D5E954C642F6C3B5639D6EE.node011 replaces oldSessionId: sF9EE9D52 / F9EE9D52FDB4502EB5CE6FFA24194AFD.node011""#;
let caps = CHANGE_SESSION_RE.captures(line).unwrap();
assert_eq!(
caps.get(1).unwrap().as_str(),
"DF080BBD8D5E954C642F6C3B5639D6EE.node011"
);
assert_eq!(
caps.get(2).unwrap().as_str(),
"F9EE9D52FDB4502EB5CE6FFA24194AFD.node011"
);
}
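    // --- SIGNATURE_APP_RE tests ---
    // Sanity checks that the broad signature regex captures the app name in
    // both documented formats; the sample lines are synthetic.
    #[test]
    fn test_signature_app_extraction_both_formats() {
        let plain = r#"msg="signature:MYAPP/1.0/ details:OS:1""#;
        let caps = SIGNATURE_APP_RE.captures(plain).unwrap();
        assert_eq!(caps.get(1).unwrap().as_str(), "MYAPP");
        let mobile = r#"msg="MOBILE_CLIENT_LOG: signature:MYAPP/1.0/ details:OS:1""#;
        let caps = SIGNATURE_APP_RE.captures(mobile).unwrap();
        assert_eq!(caps.get(1).unwrap().as_str(), "MYAPP");
    }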
// --- SESSION_ID_RE tests ---
#[test]
fn test_session_id_extraction() {
let line = "sessionId=ABC123DEF456.node005, something else";
let caps = SESSION_ID_RE.captures(line).unwrap();
assert_eq!(caps.get(1).unwrap().as_str(), "ABC123DEF456.node005");
}
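    // --- process_line_pass1 tests ---
    // Sketch of the noSession rule with a made-up line: a matching line whose
    // sessionId is "noSession" seeds its correlation ID but no session ID.
    #[test]
    fn test_pass1_no_session_seeds_cid_only() {
        let line = r#"Jan 01 00:00:01 host app correlationId=c7, sessionId=noSession, msg="findme here""#;
        let mut result = Pass1Result::default();
        process_line_pass1(line, "findme", &mut result);
        assert!(result.seed_session_ids.is_empty());
        assert!(result.seed_correlation_ids.contains("c7"));
        assert_eq!(result.line_count, 1);
    }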
// --- expand_seeds tests ---
#[test]
fn test_expand_seeds_no_chain() {
let seed_sids: HashSet<String> = ["A".to_string()].into();
let seed_cids: HashSet<String> = ["c1".to_string()].into();
let change_map = HashMap::new();
let sig_sessions = HashSet::new();
let (sids, cids) = expand_seeds(&seed_sids, &seed_cids, &change_map, &sig_sessions);
assert_eq!(sids, seed_sids);
assert_eq!(cids, seed_cids);
}
#[test]
fn test_expand_seeds_single_chain() {
// B replaced A (A is old, B is new). Seed is B.
let seed_sids: HashSet<String> = ["B".to_string()].into();
let seed_cids: HashSet<String> = HashSet::new();
let change_map: HashMap<String, String> = [("B".to_string(), "A".to_string())].into();
let sig_sessions: HashSet<String> = ["A".to_string()].into();
let (sids, _) = expand_seeds(&seed_sids, &seed_cids, &change_map, &sig_sessions);
assert!(sids.contains("A"));
assert!(sids.contains("B"));
assert_eq!(sids.len(), 2);
}
#[test]
fn test_expand_seeds_multi_hop_chain() {
// C replaced B, B replaced A. Seed is C.
let seed_sids: HashSet<String> = ["C".to_string()].into();
let seed_cids: HashSet<String> = HashSet::new();
let change_map: HashMap<String, String> = [
("C".to_string(), "B".to_string()),
("B".to_string(), "A".to_string()),
]
.into();
let sig_sessions: HashSet<String> = ["A".to_string()].into();
let (sids, _) = expand_seeds(&seed_sids, &seed_cids, &change_map, &sig_sessions);
assert!(sids.contains("A"));
assert!(sids.contains("B"));
assert!(sids.contains("C"));
assert_eq!(sids.len(), 3);
}
#[test]
fn test_expand_seeds_stops_at_signature() {
// D replaced C, C replaced B, B replaced A. B has signature. Seed is D.
let seed_sids: HashSet<String> = ["D".to_string()].into();
let seed_cids: HashSet<String> = HashSet::new();
let change_map: HashMap<String, String> = [
("D".to_string(), "C".to_string()),
("C".to_string(), "B".to_string()),
("B".to_string(), "A".to_string()),
]
.into();
let sig_sessions: HashSet<String> = ["B".to_string()].into();
let (sids, _) = expand_seeds(&seed_sids, &seed_cids, &change_map, &sig_sessions);
assert!(sids.contains("D"));
assert!(sids.contains("C"));
assert!(sids.contains("B"));
assert!(!sids.contains("A"));
assert_eq!(sids.len(), 3);
}
#[test]
fn test_expand_seeds_cycle_protection() {
// A -> B -> A (cycle)
let seed_sids: HashSet<String> = ["A".to_string()].into();
let seed_cids: HashSet<String> = HashSet::new();
let change_map: HashMap<String, String> = [
("A".to_string(), "B".to_string()),
("B".to_string(), "A".to_string()),
]
.into();
let sig_sessions: HashSet<String> = HashSet::new();
let (sids, _) = expand_seeds(&seed_sids, &seed_cids, &change_map, &sig_sessions);
assert!(sids.contains("A"));
assert!(sids.contains("B"));
assert_eq!(sids.len(), 2);
}
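    // --- filter_expanded_by_app tests ---
    // Minimal sketch of forward propagation through the reverse change map:
    // B replaced A and only A carries an APP_A signature, so filtering for
    // APP_A keeps the whole chain while filtering for APP_B keeps nothing.
    // Session and app names here are synthetic.
    #[test]
    fn test_filter_expanded_by_app_propagates_forward() {
        let expanded: HashSet<String> = ["A".to_string(), "B".to_string()].into();
        let change_map: HashMap<String, String> = [("B".to_string(), "A".to_string())].into();
        let session_app_map: HashMap<String, String> =
            [("A".to_string(), "APP_A".to_string())].into();
        let keep = filter_expanded_by_app(
            &expanded,
            &change_map,
            &session_app_map,
            &["APP_A".to_string()],
        );
        assert!(keep.contains("A") && keep.contains("B"));
        assert_eq!(keep.len(), 2);
        let none = filter_expanded_by_app(
            &expanded,
            &change_map,
            &session_app_map,
            &["APP_B".to_string()],
        );
        assert!(none.is_empty());
    }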
// --- Integration test ---
#[test]
fn test_expand_integration() -> Result<()> {
let dir = tempfile::tempdir()?;
let log_path = dir.path().join("test.log");
let mut file = std::fs::File::create(&log_path)?;
// Line 1: signature for session A
writeln!(
file,
r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=AAAA.node001, correlationId=c1, msg="signature:APP/1.0/ details:OS:1""#
)?;
// Line 2: normal line for session A, matches query
writeln!(
file,
r#"Jan 01 00:00:02 host app dt="2026-01-01 00:00:02,000", sessionId=AAAA.node001, correlationId=c1, msg="findme something""#
)?;
// Line 3: changeSessionId: B replaces A
writeln!(
file,
r#"Jan 01 00:00:03 host app dt="2026-01-01 00:00:03,000", sessionId=BBBB.node001, correlationId=c2, msg="changeSessionId: newSessionId: sBBBB / BBBB.node001 replaces oldSessionId: sAAAA / AAAA.node001""#
)?;
// Line 4: normal line for session B
writeln!(
file,
r#"Jan 01 00:00:04 host app dt="2026-01-01 00:00:04,000", sessionId=BBBB.node001, correlationId=c2, msg="some other action""#
)?;
// Line 5: unrelated session X
writeln!(
file,
r#"Jan 01 00:00:05 host app dt="2026-01-01 00:00:05,000", sessionId=XXXX.node002, correlationId=c9, msg="unrelated""#
)?;
// "findme" matches line 2 (session AAAA, correlation c1).
// Session AAAA is in seeds. No changeSessionId has AAAA as new → no backward expansion.
// Correlation c1 is in seeds.
// Expected output: lines 1 and 2 (both have session AAAA or correlation c1).
// Lines 3,4 (session BBBB) should NOT be included (AAAA is old, not new).
// Line 5 (session XXXX) should NOT be included.
run_search(log_path.to_str().unwrap(), "findme", false, true, 1)?;
Ok(())
}
#[test]
fn test_expand_follows_change_session_backward() -> Result<()> {
let dir = tempfile::tempdir()?;
let log_path = dir.path().join("test.log");
let mut file = std::fs::File::create(&log_path)?;
// Line 1: signature for session OLD
writeln!(
file,
r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=OLD.node001, correlationId=c0, msg="signature:APP/1.0/ details:OS:1""#
)?;
// Line 2: normal line for session OLD
writeln!(
file,
r#"Jan 01 00:00:02 host app dt="2026-01-01 00:00:02,000", sessionId=OLD.node001, correlationId=c1, msg="doing stuff""#
)?;
// Line 3: changeSessionId: NEW replaces OLD
writeln!(
file,
r#"Jan 01 00:00:03 host app dt="2026-01-01 00:00:03,000", sessionId=NEW.node001, correlationId=c2, msg="changeSessionId: newSessionId: sNEW / NEW.node001 replaces oldSessionId: sOLD / OLD.node001""#
)?;
// Line 4: normal line for session NEW, matches query
writeln!(
file,
r#"Jan 01 00:00:04 host app dt="2026-01-01 00:00:04,000", sessionId=NEW.node001, correlationId=c3, msg="findme in new session""#
)?;
// "findme" matches line 4 (session NEW). changeSessionId maps NEW→OLD.
// OLD has a signature → include OLD but stop recursing.
// Expected: lines 1-4 all included (sessions OLD and NEW).
run_search(log_path.to_str().unwrap(), "findme", false, true, 1)?;
Ok(())
}
// --- Chunk boundary tests ---
#[test]
fn test_chunk_boundary_no_lost_lines() -> Result<()> {
// Write lines of known byte sizes, then split exactly on a line boundary
let dir = tempfile::tempdir()?;
let path = dir.path().join("test.log");
let mut f = std::fs::File::create(&path)?;
// 3 lines: "line1\n", "line2\n", "line3\n" (6 bytes each)
write!(f, "line1\nline2\nline3\n")?;
drop(f);
// Split at byte 6 (exactly between line1 and line2)
let mut collected = Vec::new();
for_each_line_in_chunk(path.to_str().unwrap(), 0, 6, |l| {
collected.push(l.to_string());
})?;
for_each_line_in_chunk(path.to_str().unwrap(), 6, 18, |l| {
collected.push(l.to_string());
})?;
assert_eq!(collected, vec!["line1", "line2", "line3"]);
Ok(())
}
#[test]
fn test_chunk_boundary_mid_line() -> Result<()> {
let dir = tempfile::tempdir()?;
let path = dir.path().join("test.log");
let mut f = std::fs::File::create(&path)?;
write!(f, "line1\nline2\nline3\n")?;
drop(f);
// Split at byte 3 (middle of "line1")
let mut collected = Vec::new();
for_each_line_in_chunk(path.to_str().unwrap(), 0, 3, |l| {
collected.push(l.to_string());
})?;
for_each_line_in_chunk(path.to_str().unwrap(), 3, 18, |l| {
collected.push(l.to_string());
})?;
assert_eq!(collected, vec!["line1", "line2", "line3"]);
Ok(())
}
#[test]
fn test_chunk_four_way_split() -> Result<()> {
let dir = tempfile::tempdir()?;
let path = dir.path().join("test.log");
let mut f = std::fs::File::create(&path)?;
for i in 0..20 {
writeln!(f, "line {:02}", i)?;
}
drop(f);
let file_size = std::fs::metadata(path.as_path())?.len();
let chunks = compute_chunks(file_size, 4);
let mut collected = Vec::new();
for (start, end) in chunks {
for_each_line_in_chunk(path.to_str().unwrap(), start, end, |l| {
collected.push(l.to_string());
})?;
}
let expected: Vec<String> = (0..20).map(|i| format!("line {:02}", i)).collect();
assert_eq!(collected, expected);
Ok(())
}
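    // --- compute_chunks tests ---
    // Edge-case checks derived from compute_chunks' arithmetic: empty inputs
    // yield no chunks, the last chunk absorbs the division remainder, and a
    // tiny file clamps the chunk count to the file size.
    #[test]
    fn test_compute_chunks_edge_cases() {
        assert!(compute_chunks(0, 4).is_empty());
        assert!(compute_chunks(100, 0).is_empty());
        // 10 bytes across 4 chunks: size 2 each, last chunk takes bytes 6..10.
        assert_eq!(compute_chunks(10, 4), vec![(0, 2), (2, 4), (4, 6), (6, 10)]);
        // A 3-byte file with 8 requested chunks clamps to 3 one-byte chunks.
        assert_eq!(compute_chunks(3, 8), vec![(0, 1), (1, 2), (2, 3)]);
    }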
// --- Regression: thread-consistency (P1) ---
// Both sequential and parallel pass2 must stop at sessionDestroyed
// and produce identical results regardless of thread count.
#[test]
fn test_pass2_stops_at_session_destroyed() -> Result<()> {
let dir = tempfile::tempdir()?;
let log_path = dir.path().join("test.log");
let mut file = std::fs::File::create(&log_path)?;
// Session AAAA with signature
writeln!(
file,
r#"Jan 01 00:00:01 host app sessionId=AAAA.node001, correlationId=c1, msg="signature:APP_A/1.0/ details:OS:1""#
)?;
// Line that matches query, with CID c1
writeln!(
file,
r#"Jan 01 00:00:02 host app sessionId=AAAA.node001, correlationId=c1, msg="findme error""#
)?;
// Session destroyed for AAAA
writeln!(
file,
r#"Jan 01 00:00:03 host app msg="sessionDestroyed #s=1 sid=AAAA.node001 isnew=false""#
)?;
// Post-destroy line with same CID c1 — must NOT be included
writeln!(
file,
r#"Jan 01 00:00:04 host app sessionId=noSession, correlationId=c1, msg="async callback after destroy""#
)?;
// Run pass1 + expand
let pool = build_thread_pool(1)?;
let pass1 = run_pass1(log_path.to_str().unwrap(), "findme", false, &pool)?;
let (expanded_sids, expanded_cids) = expand_seeds(
&pass1.seed_session_ids,
&pass1.seed_correlation_ids,
&pass1.change_session_map,
&pass1.sessions_with_signature,
);
assert!(expanded_cids.contains("c1"));
// Sequential: lines 1 (sid match) + 2 (query+sid+cid) = 2, then
// line 3 destroys AAAA (the only expanded session) → stop.
// Line 4 must NOT be included.
let seq_count = run_pass2(
log_path.to_str().unwrap(),
"findme",
&expanded_sids,
&expanded_cids,
false,
&pool,
false,
)?;
assert_eq!(seq_count, 2, "sequential pass2 must stop at sessionDestroyed");
// Parallel: must produce the same count.
let par_pool = build_thread_pool(2)?;
let par_count = run_pass2(
log_path.to_str().unwrap(),
"findme",
&expanded_sids,
&expanded_cids,
true,
&par_pool,
false,
)?;
assert_eq!(par_count, seq_count, "parallel pass2 must match sequential");
Ok(())
}
#[test]
fn test_session_destroyed_regex() {
let line = "sessionDestroyed #s=8 sid=2010F74498079D00A5647F3777545A64.node003 isnew=false age=693s last=644s attrs=loginState,lastRequest,userSessionData";
let caps = SESSION_DESTROYED_RE.captures(line).unwrap();
assert_eq!(
caps.get(1).unwrap().as_str(),
"2010F74498079D00A5647F3777545A64.node003"
);
}
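    // Companion check for check_session_destroyed (synthetic sid): the destroy
    // only counts when the normalized sid is one of the expanded sessions.
    #[test]
    fn test_check_session_destroyed_scoped_to_expanded() {
        let line = "sessionDestroyed #s=8 sid=AAAA.node003 isnew=false";
        let expanded: HashSet<String> = ["AAAA".to_string()].into();
        assert_eq!(check_session_destroyed(line, &expanded), Some("AAAA".to_string()));
        let other: HashSet<String> = ["BBBB".to_string()].into();
        assert_eq!(check_session_destroyed(line, &other), None);
    }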
// --- Regression: strict app isolation (P1) ---
// search-exceptions --app APP_A must not include lines from APP_B even when
// they share a correlation ID with an APP_A session.
#[test]
fn test_search_exceptions_strict_app_isolation() -> Result<()> {
let dir = tempfile::tempdir()?;
let log_path = dir.path().join("test.log");
let mut file = std::fs::File::create(&log_path)?;
// APP_A session
writeln!(
file,
r#"Jan 01 00:00:01 host app sessionId=AAAA.node001, correlationId=c1, msg="signature:APP_A/1.0/ details:OS:1""#
)?;
// APP_A Exception line with shared CID
writeln!(
file,
r#"Jan 01 00:00:02 host app sessionId=AAAA.node001, correlationId=shared_cid, msg="Exception in APP_A""#
)?;
// APP_B session with same shared CID
writeln!(
file,
r#"Jan 01 00:00:03 host app sessionId=BBBB.node001, correlationId=c2, msg="signature:APP_B/2.0/ details:OS:1""#
)?;
writeln!(
file,
r#"Jan 01 00:00:04 host app sessionId=BBBB.node001, correlationId=shared_cid, msg="handling request from APP_B""#
)?;
// Run the full pipeline as search-exceptions does
let pool = build_thread_pool(1)?;
let query = "Exception";
let pass1 = run_pass1(log_path.to_str().unwrap(), query, false, &pool)?;
let (expanded_sids, expanded_cids) = expand_seeds(
&pass1.seed_session_ids,
&pass1.seed_correlation_ids,
&pass1.change_session_map,
&pass1.sessions_with_signature,
);
let app_filters = vec!["APP_A".to_string()];
let filtered_sids = filter_expanded_by_app(
&expanded_sids,
&pass1.change_session_map,
&pass1.session_app_map,
&app_filters,
);
let filtered_cids: HashSet<String> = expanded_cids
.iter()
.filter(|cid| {
pass1
.seed_cid_sessions
.get(cid.as_str())
.is_some_and(|sid| filtered_sids.contains(sid))
})
.cloned()
.collect();
// shared_cid should be in filtered_cids (it came from APP_A's session)
assert!(filtered_cids.contains("shared_cid"));
// BBBB should NOT be in filtered_sids
assert!(!filtered_sids.contains("BBBB"));
// Run pass2 with strict_app_isolation=true
let count = run_pass2(
log_path.to_str().unwrap(),
query,
&filtered_sids,
&filtered_cids,
false,
&pool,
true,
)?;
// Line 1 (sig for APP_A, sid match) = included
// Line 2 (Exception, sid+cid match) = included
// Line 3 (sig for APP_B, sid NOT in filtered) = excluded
// Line 4 (APP_B line, cid=shared_cid but sid=BBBB not in filtered) = excluded by strict isolation
assert_eq!(count, 2, "APP_B lines must not leak through shared CID");
Ok(())
}
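    // --- format_pass2_match tests ---
    // Direct sketch of the strict-isolation rule on a synthetic line: a CID
    // match alone passes in relaxed mode but is rejected when the line's
    // session is outside the filtered set and strict_app_isolation is on.
    #[test]
    fn test_format_pass2_match_strict_isolation() {
        let line = r#"Jan 01 00:00:04 host app sessionId=BBBB.node001, correlationId=shared_cid, msg="from other app""#;
        let sids: HashSet<String> = ["AAAA".to_string()].into();
        let cids: HashSet<String> = ["shared_cid".to_string()].into();
        // Relaxed mode: the CID match is enough to include the line.
        assert!(format_pass2_match(line, "findme", &sids, &cids, false).is_some());
        // Strict mode: sid BBBB is present but not in the filtered set.
        assert!(format_pass2_match(line, "findme", &sids, &cids, true).is_none());
    }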
}