Compare commits: main...feature/se (7 commits)

- bdae57d801
- 8620359c79
- 09fa289535
- 43de2acade
- 7f4aab32a1
- 6e7f2c1eb3
- bbf8102959
README.md (86 lines changed)

````diff
@@ -1,10 +1,10 @@
 # log_ingest
 
-A Rust CLI tool for ingesting and searching application log files.
+A Rust CLI tool for loading log files into a SQLite database for analysis.
 
 ## Overview
 
-Parses application logs containing signature messages and loads them into SQLite for querying. Also provides fast text search across log files with session-aware context expansion. Designed to handle large log volumes (10GB+ per day) with parallel processing.
+Parses application logs containing signature messages and loads them into SQLite for querying. Designed to handle large log volumes (10GB+ per day) with batched inserts and efficient parsing.
 
 ## Features
 
@@ -15,9 +15,6 @@ Parses application logs containing signature messages and loads them into SQLite
 - Parallel file processing for multi-day ingestion
 - Indexed columns (`session_id`, `version`) for efficient queries
 - Extensible parser architecture for adding new message types
-- Full-text search with timestamp and message extraction
-- Session-aware expanded search that follows `changeSessionId` chains and correlation IDs
-- Exception search filtered by app signature
 
 ## Installation
 
@@ -27,20 +24,16 @@ cargo build --release
 
 ## Usage
 
-The CLI uses subcommands: `signature`, `search`, and `search-exceptions`.
-
-### `signature` — Load log entries into SQLite
-
-#### Process a single file
+### Process a single file
 
 ```bash
-log_ingest signature --file /path/to/logs.log --output output.db
+log_ingest --file /path/to/logs.log --output output.db
 ```
 
-#### Process a date range
+### Process a date range
 
 ```bash
-log_ingest signature \
+log_ingest \
   --from 2026/01/20 \
   --to 2026/01/21 \
   --base-dir /var/log/myapp \
@@ -50,22 +43,22 @@ log_ingest signature \
 
 The tool will look for files at `<base-dir>/YYYY/MM/DD/<filename>.gz` or `<base-dir>/YYYY/MM/DD/<filename>` for each day in the range.
 
-#### Parallel processing
+### Parallel processing
 
 When processing multiple files, parsing runs in parallel by default using all available CPU cores. A single writer thread handles database inserts to avoid SQLite contention.
 
 ```bash
 # Use all CPU cores (default)
-log_ingest signature --from 2026/01/01 --to 2026/01/31 ...
+log_ingest --from 2026/01/01 --to 2026/01/31 ...
 
 # Limit to 4 threads
-log_ingest signature --threads 4 --from 2026/01/01 --to 2026/01/31 ...
+log_ingest --threads 4 --from 2026/01/01 --to 2026/01/31 ...
 
 # Sequential processing (disable parallelism)
-log_ingest signature --threads 1 --from 2026/01/01 --to 2026/01/31 ...
+log_ingest --threads 1 --from 2026/01/01 --to 2026/01/31 ...
 ```
````
````diff
-#### Options
+### Options
 
 | Option | Description |
 |--------|-------------|
@@ -78,63 +71,6 @@ log_ingest signature --threads 1 --from 2026/01/01 --to 2026/01/31 ...
 | `--batch-size <N>` | Batch size for inserts (default: 10000) |
 | `--threads <N>` | Number of parallel threads (0 = all cores, 1 = sequential) |
 
-### `search` — Search log files for matching lines
-
-Searches a log file for lines containing a query string and prints the timestamp and message for each match. Supports both plain `.log` and gzip `.log.gz` files, with parallel chunk-based processing for plain files.
-
-```bash
-# Basic search
-log_ingest search --file /path/to/logs.log --query "NullPointerException"
-
-# Include correlationId in output
-log_ingest search --file /path/to/logs.log --query "timeout" -c
-
-# Expanded search: find all lines sharing sessionId/correlationId with matches,
-# following changeSessionId chains backward to the session start (signature line)
-log_ingest search --file /path/to/logs.log --query "Exception" -e
-```
-
-Expand mode (`-e`) performs a two-pass search:
-1. **Pass 1**: Scans the entire file to find matching lines and collects their session IDs and correlation IDs. Also builds a `changeSessionId` graph and tracks which sessions have signature lines.
-2. **Expansion**: Follows `changeSessionId` chains backward from seed sessions, stopping at sessions that have a signature (session start boundary).
-3. **Pass 2**: Re-reads the file and outputs all lines belonging to expanded session IDs or correlation IDs, stopping early when all expanded sessions are destroyed (`sessionDestroyed`).
-
-#### Options
-
-| Option | Description |
-|--------|-------------|
-| `--file <PATH>` | Log file to search |
-| `--query <TEXT>` | Text to search for in log lines |
-| `-c, --correlation-id` | Include correlationId in output |
-| `-e, --expand` | Expand results to full session context |
-| `--threads <N>` | Number of parallel threads (0 = all cores, 1 = sequential) |
-
-### `search-exceptions` — Search for exceptions filtered by app
-
-A specialized search that finds `Exception` lines and expands them to full session context, filtered to only sessions belonging to specific apps (identified by their `signature:` line).
-
-```bash
-# Find exceptions for a specific app
-log_ingest search-exceptions --file /path/to/logs.log --app XAMARIN_APP
-
-# Filter to multiple apps
-log_ingest search-exceptions --file /path/to/logs.log --app XAMARIN_APP --app ANOTHER_APP
-```
-
-This uses the same two-pass expand approach as `search -e`, with an additional app-filtering step that:
-- Matches expanded sessions to their app via the `signature:APP/...` line
-- Propagates app membership forward through `changeSessionId` chains
-- Filters correlation IDs to only those originating from matching-app sessions
-- Uses strict app isolation in pass 2 to prevent lines from non-matching apps leaking through shared correlation IDs
-
-#### Options
-
-| Option | Description |
-|--------|-------------|
-| `--file <PATH>` | Log file to search |
-| `--app <NAME>` | Filter to sessions with this app signature (repeatable) |
-| `--threads <N>` | Number of parallel threads (0 = all cores, 1 = sequential) |
 
 ## Database Schema
 
 The schema uses normalized lookup tables to minimize disk usage for large datasets.
````
src/main.rs (32 lines changed)

```diff
@@ -120,26 +120,18 @@ fn main() -> Result<()> {
 
     match args.command {
         Command::Signature(sig_args) => run_signature(sig_args),
-        Command::Search(search_args) => {
-            let path = search_args
-                .file
-                .to_str()
-                .ok_or_else(|| anyhow!("File path contains invalid UTF-8"))?;
-            search::run_search(
-                path,
-                &search_args.query,
-                search_args.correlation_id,
-                search_args.expand,
-                search_args.threads,
-            )
-        }
-        Command::SearchExceptions(args) => {
-            let path = args
-                .file
-                .to_str()
-                .ok_or_else(|| anyhow!("File path contains invalid UTF-8"))?;
-            search::run_search_exceptions(path, &args.app, args.threads)
-        }
+        Command::Search(search_args) => search::run_search(
+            search_args.file.to_str().unwrap(),
+            &search_args.query,
+            search_args.correlation_id,
+            search_args.expand,
+            search_args.threads,
+        ),
+        Command::SearchExceptions(args) => search::run_search_exceptions(
+            args.file.to_str().unwrap(),
+            &args.app,
+            args.threads,
+        ),
     }
 }
```
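The two sides of this hunk differ in how they handle `Path::to_str()` returning `None` on a non-UTF-8 path: `ok_or_else` converts that case into an error the caller can report, while `unwrap()` panics. A small self-contained sketch of the `ok_or_else` pattern, using a plain `Result<_, String>` instead of the codebase's `anyhow` types and an invented helper name:

```rust
use std::path::Path;

// `Path` is not guaranteed to hold valid UTF-8, so `to_str()` is an Option.
// Mapping None to an error keeps the failure in the Result channel instead
// of panicking inside main's match arms.
fn path_as_str(path: &Path) -> Result<&str, String> {
    path.to_str()
        .ok_or_else(|| "File path contains invalid UTF-8".to_string())
}

fn main() {
    let ok = path_as_str(Path::new("/var/log/myapp/app.log"));
    assert_eq!(ok, Ok("/var/log/myapp/app.log"));

    #[cfg(unix)]
    {
        // Build a deliberately non-UTF-8 path (invalid byte 0xFF) to exercise
        // the error branch; purely illustrative, not from the tool.
        use std::ffi::OsStr;
        use std::os::unix::ffi::OsStrExt;
        let bad = Path::new(OsStr::from_bytes(b"/tmp/\xFF.log"));
        assert!(path_as_str(bad).is_err());
    }
    println!("ok");
}
```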
src/search.rs (272 lines changed)

```diff
@@ -354,7 +354,7 @@ fn run_search_expanded(
     );
 
     // Pass 2: filter and print (re-reads file; for gzip this re-decompresses from stream)
-    let match_count = run_pass2(file_path, query, &expanded_sids, &expanded_cids, use_parallel, pool, false)?;
+    let match_count = run_pass2(file_path, query, &expanded_sids, &expanded_cids, use_parallel, pool)?;
 
     eprintln!("{} lines output", match_count);
     Ok(())
@@ -443,13 +443,7 @@ fn expand_seeds(
     (expanded_sids, seed_cids.clone())
 }
 
-fn format_pass2_match(
-    trimmed: &str,
-    query: &str,
-    expanded_sids: &HashSet<String>,
-    expanded_cids: &HashSet<String>,
-    strict_app_isolation: bool,
-) -> Option<String> {
+fn format_pass2_match(trimmed: &str, query: &str, expanded_sids: &HashSet<String>, expanded_cids: &HashSet<String>) -> Option<String> {
     let is_direct_match = trimmed.contains(query);
 
     let sid = SESSION_ID_RE
@@ -465,14 +459,7 @@ fn format_pass2_match(
     let sid_match = sid.is_some_and(|s| expanded_sids.contains(s));
     let cid_match = cid.is_some_and(|c| expanded_cids.contains(c));
 
-    // When strict_app_isolation is enabled (search-exceptions), a CID match
-    // alone is not enough — the line's session must also be in the filtered set
-    // (or absent). This prevents leaking lines from non-matching apps that
-    // happen to share a correlation ID.
-    let effective_cid_match =
-        cid_match && !(strict_app_isolation && sid.is_some() && !sid_match);
-
-    if !is_direct_match && !sid_match && !effective_cid_match {
+    if !is_direct_match && !sid_match && !cid_match {
         return None;
     }
```
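The removed comment and `effective_cid_match` expression above encode a small truth table: under strict app isolation, a correlation-ID match only counts when the line's session is also in the filtered set, or the line carries no session at all. Restated as a standalone predicate for illustration (the real code inlines this in `format_pass2_match`):

```rust
// Truth-table form of the strict-isolation rule from the diff above,
// extracted into a free function purely for demonstration.
fn effective_cid_match(cid_match: bool, sid_present: bool, sid_match: bool, strict: bool) -> bool {
    cid_match && !(strict && sid_present && !sid_match)
}

fn main() {
    // Non-strict mode: a CID match always counts.
    assert!(effective_cid_match(true, true, false, false));
    // Strict mode: a CID match from a non-filtered session is rejected;
    // this is the "shared correlation ID leak" being prevented.
    assert!(!effective_cid_match(true, true, false, true));
    // Strict mode: a line with no session ID still passes on a CID match.
    assert!(effective_cid_match(true, false, false, true));
    // Strict mode: CID match plus filtered session passes.
    assert!(effective_cid_match(true, true, true, true));
    println!("ok");
}
```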
```diff
@@ -496,30 +483,6 @@ fn format_pass2_match(
     ))
 }
 
-/// Events collected per-line during pass2 parallel chunk processing.
-/// Preserves file order so the assembly phase can stop at sessionDestroyed.
-enum Pass2Event {
-    /// Line matched and should be output.
-    Match(String),
-    /// An expanded session was destroyed (no output for this line).
-    Destroy(String),
-    /// Line matched AND an expanded session was destroyed on this line.
-    MatchAndDestroy(String, String),
-}
-
-/// Check whether a line destroys one of the expanded sessions.
-fn check_session_destroyed(trimmed: &str, expanded_sids: &HashSet<String>) -> Option<String> {
-    if trimmed.contains("sessionDestroyed")
-        && let Some(caps) = SESSION_DESTROYED_RE.captures(trimmed)
-    {
-        let sid = normalize_session_id(caps.get(1).unwrap().as_str());
-        if expanded_sids.contains(sid) {
-            return Some(sid.to_string());
-        }
-    }
-    None
-}
-
 fn run_pass2(
     file_path: &str,
     query: &str,
@@ -527,13 +490,13 @@ fn run_pass2(
     expanded_cids: &HashSet<String>,
     use_parallel: bool,
     pool: &rayon::ThreadPool,
-    strict_app_isolation: bool,
 ) -> Result<u64> {
     if !use_parallel {
         eprintln!("Pass 2: filtering sequentially...");
         let mut count = 0u64;
         let mut reader = read_log_file(file_path)?;
         let mut line = String::new();
         // Track sessions still alive; stop when all are destroyed
         let mut remaining_sids: HashSet<&str> =
             expanded_sids.iter().map(|s| s.as_str()).collect();
 
@@ -552,7 +515,7 @@ fn run_pass2(
                 if remaining_sids.remove(destroyed_sid) {
                     // Still output the sessionDestroyed line itself if it matches
                     if let Some(formatted) =
-                        format_pass2_match(trimmed, query, expanded_sids, expanded_cids, strict_app_isolation)
+                        format_pass2_match(trimmed, query, expanded_sids, expanded_cids)
                     {
                         println!("{}", formatted);
                         count += 1;
@@ -569,7 +532,7 @@ fn run_pass2(
         }
 
         if let Some(formatted) =
-            format_pass2_match(trimmed, query, expanded_sids, expanded_cids, strict_app_isolation)
+            format_pass2_match(trimmed, query, expanded_sids, expanded_cids)
         {
             println!("{}", formatted);
             count += 1;
@@ -584,58 +547,30 @@ fn run_pass2(
 
     eprintln!("Pass 2: filtering with {} threads...", chunks.len());
 
-    // Each chunk collects events preserving file order so the assembly
-    // phase can apply the same sessionDestroyed stop logic as sequential.
-    let results: Vec<Result<Vec<Pass2Event>>> = pool.install(|| {
+    let results: Vec<Result<(Vec<String>, u64)>> = pool.install(|| {
         chunks
             .par_iter()
             .map(|&(start, end)| {
-                let mut events = Vec::new();
+                let mut lines = Vec::new();
+                let mut count = 0u64;
                 for_each_line_in_chunk(file_path, start, end, |trimmed| {
-                    let formatted = format_pass2_match(
-                        trimmed, query, expanded_sids, expanded_cids, strict_app_isolation,
-                    );
-                    let destroyed = check_session_destroyed(trimmed, expanded_sids);
-                    match (formatted, destroyed) {
-                        (Some(f), Some(d)) => events.push(Pass2Event::MatchAndDestroy(f, d)),
-                        (Some(f), None) => events.push(Pass2Event::Match(f)),
-                        (None, Some(d)) => events.push(Pass2Event::Destroy(d)),
-                        (None, None) => {}
-                    }
+                    if let Some(formatted) = format_pass2_match(trimmed, query, expanded_sids, expanded_cids) {
+                        lines.push(formatted);
+                        count += 1;
+                    }
                 })?;
-                Ok(events)
+                Ok((lines, count))
             })
             .collect()
     });
 
-    // Assemble in chunk order, stopping when all expanded sessions are destroyed.
-    let mut remaining_sids: HashSet<&str> =
-        expanded_sids.iter().map(|s| s.as_str()).collect();
     let mut total = 0u64;
-    'outer: for result in results {
-        let events = result?;
-        for event in events {
-            match event {
-                Pass2Event::Match(line) => {
-                    println!("{}", line);
-                    total += 1;
-                }
-                Pass2Event::Destroy(sid) => {
-                    remaining_sids.remove(sid.as_str());
-                    if remaining_sids.is_empty() {
-                        break 'outer;
-                    }
-                }
-                Pass2Event::MatchAndDestroy(line, sid) => {
-                    println!("{}", line);
-                    total += 1;
-                    remaining_sids.remove(sid.as_str());
-                    if remaining_sids.is_empty() {
-                        break 'outer;
-                    }
-                }
-            }
-        }
-    }
+    for result in results {
+        let (lines, count) = result?;
+        for line in lines {
+            println!("{}", line);
+        }
+        total += count;
+    }
 
     Ok(total)
```
```diff
@@ -739,7 +674,7 @@ pub fn run_search_exceptions(file_path: &str, app_filters: &[String], threads: u
         expanded_cids.len()
     );
 
-    // Pass 2: filter and print (strict isolation prevents CID leaking across apps)
+    // Pass 2: filter and print
     let match_count = run_pass2(
         file_path,
         query,
@@ -747,7 +682,6 @@ pub fn run_search_exceptions(file_path: &str, app_filters: &[String], threads: u
         &filtered_cids,
         use_parallel,
         &pool,
-        true,
     )?;
 
     eprintln!("{} lines output", match_count);
@@ -761,7 +695,7 @@ mod tests {
 
     #[test]
     fn test_syslog_timestamp_extraction() {
-        let line = r#"Jan 27 17:21:17 a.b.c.d m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, msg="hello""#;
+        let line = r#"Jan 27 17:21:17 tom003.testintg.dbank.loc m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, msg="hello""#;
         let caps = SYSLOG_TIMESTAMP_RE.captures(line).unwrap();
         assert_eq!(caps.get(1).unwrap().as_str(), "Jan 27 17:21:17");
     }
@@ -778,7 +712,7 @@ mod tests {
 
     #[test]
     fn test_full_line_extraction() {
-        let line = r#"Jan 27 17:21:17 a.b.c.d m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, lc=com.a.b.c.d.e.v5.endpoint.f, threadId=183, externalUserId=null, clientIp=1.1.1.1, xsrfToken=null, correlationId=abcd, sessionId=noSession, securityContext=CA_LOGGED_IN, userId=123, request_id=[(null)]snoSessio.abc, msg="getUnreadFilesCount(externalUserId=aaaaa,externalTeamSafeIds=bbbbb)", ex=""#;
+        let line = r#"Jan 27 17:21:17 tom003.testintg.dbank.loc m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, lc=com.m1.m1.server.api.enterprise.v5.endpoint.TeamSafeEndpointV5, threadId=183, externalUserId=null, clientIp=160.83.36.132, xsrfToken=null, correlationId=aXjl_RwRs-3BWsshhut44wAABKY, sessionId=noSession, securityContext=CA_LOGGED_IN, userId=238, request_id=[(null)]snoSessio.r44wAABKY, msg="getUnreadFilesCount(externalUserId=102c1271eddd4e62832db4b1e70b8cb4,externalTeamSafeIds=053fac9da79543d5b90612ed7d5d0ca2)", ex=""#;
 
         let ts = SYSLOG_TIMESTAMP_RE
             .captures(line)
@@ -792,7 +726,7 @@ mod tests {
             .unwrap();
         assert_eq!(
             msg,
-            "getUnreadFilesCount(externalUserId=aaaaa,externalTeamSafeIds=bbbbb)"
+            "getUnreadFilesCount(externalUserId=102c1271eddd4e62832db4b1e70b8cb4,externalTeamSafeIds=053fac9da79543d5b90612ed7d5d0ca2)"
         );
     }
 
@@ -958,7 +892,7 @@ mod tests {
         // Line 1: signature for session A
        writeln!(
             file,
-            r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=AAAA.node001, correlationId=c1, msg="signature:APP/1.0/ details:OS:1""#
+            r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=AAAA.node001, correlationId=c1, msg="signature:XAMARIN_APP/1.0/ details:OS:1""#
         )?;
         // Line 2: normal line for session A, matches query
         writeln!(
@@ -1000,7 +934,7 @@ mod tests {
         // Line 1: signature for session OLD
         writeln!(
             file,
-            r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=OLD.node001, correlationId=c0, msg="signature:APP/1.0/ details:OS:1""#
+            r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=OLD.node001, correlationId=c0, msg="signature:XAMARIN_APP/1.0/ details:OS:1""#
         )?;
         // Line 2: normal line for session OLD
         writeln!(
@@ -1093,78 +1027,7 @@ mod tests {
         Ok(())
     }
 
-    // --- Regression: thread-consistency (P1) ---
-    // Both sequential and parallel pass2 must stop at sessionDestroyed
-    // and produce identical results regardless of thread count.
-
-    #[test]
-    fn test_pass2_stops_at_session_destroyed() -> Result<()> {
-        let dir = tempfile::tempdir()?;
-        let log_path = dir.path().join("test.log");
-        let mut file = std::fs::File::create(&log_path)?;
-
-        // Session AAAA with signature
-        writeln!(
-            file,
-            r#"Jan 01 00:00:01 host app sessionId=AAAA.node001, correlationId=c1, msg="signature:APP_A/1.0/ details:OS:1""#
-        )?;
-        // Line that matches query, with CID c1
-        writeln!(
-            file,
-            r#"Jan 01 00:00:02 host app sessionId=AAAA.node001, correlationId=c1, msg="findme error""#
-        )?;
-        // Session destroyed for AAAA
-        writeln!(
-            file,
-            r#"Jan 01 00:00:03 host app msg="sessionDestroyed #s=1 sid=AAAA.node001 isnew=false""#
-        )?;
-        // Post-destroy line with same CID c1 — must NOT be included
-        writeln!(
-            file,
-            r#"Jan 01 00:00:04 host app sessionId=noSession, correlationId=c1, msg="async callback after destroy""#
-        )?;
-
-        // Run pass1 + expand
-        let pool = build_thread_pool(1)?;
-        let pass1 = run_pass1(log_path.to_str().unwrap(), "findme", false, &pool)?;
-        let (expanded_sids, expanded_cids) = expand_seeds(
-            &pass1.seed_session_ids,
-            &pass1.seed_correlation_ids,
-            &pass1.change_session_map,
-            &pass1.sessions_with_signature,
-        );
-
-        assert!(expanded_cids.contains("c1"));
-
-        // Sequential: lines 1 (sid match) + 2 (query+sid+cid) = 2, then
-        // line 3 destroys AAAA (the only expanded session) → stop.
-        // Line 4 must NOT be included.
-        let seq_count = run_pass2(
-            log_path.to_str().unwrap(),
-            "findme",
-            &expanded_sids,
-            &expanded_cids,
-            false,
-            &pool,
-            false,
-        )?;
-        assert_eq!(seq_count, 2, "sequential pass2 must stop at sessionDestroyed");
-
-        // Parallel: must produce the same count.
-        let par_pool = build_thread_pool(2)?;
-        let par_count = run_pass2(
-            log_path.to_str().unwrap(),
-            "findme",
-            &expanded_sids,
-            &expanded_cids,
-            true,
-            &par_pool,
-            false,
-        )?;
-        assert_eq!(par_count, seq_count, "parallel pass2 must match sequential");
-
-        Ok(())
-    }
     // --- sessionDestroyed regex test ---
 
     #[test]
     fn test_session_destroyed_regex() {
@@ -1175,89 +1038,4 @@ mod tests {
             "2010F74498079D00A5647F3777545A64.node003"
         );
     }
-
-    // --- Regression: strict app isolation (P1) ---
-    // search-exceptions --app APP_A must not include lines from APP_B even when
-    // they share a correlation ID with an APP_A session.
-
-    #[test]
-    fn test_search_exceptions_strict_app_isolation() -> Result<()> {
-        let dir = tempfile::tempdir()?;
-        let log_path = dir.path().join("test.log");
-        let mut file = std::fs::File::create(&log_path)?;
-
-        // APP_A session
-        writeln!(
-            file,
-            r#"Jan 01 00:00:01 host app sessionId=AAAA.node001, correlationId=c1, msg="signature:APP_A/1.0/ details:OS:1""#
-        )?;
-        // APP_A Exception line with shared CID
-        writeln!(
-            file,
-            r#"Jan 01 00:00:02 host app sessionId=AAAA.node001, correlationId=shared_cid, msg="Exception in APP_A""#
-        )?;
-        // APP_B session with same shared CID
-        writeln!(
-            file,
-            r#"Jan 01 00:00:03 host app sessionId=BBBB.node001, correlationId=c2, msg="signature:APP_B/2.0/ details:OS:1""#
-        )?;
-        writeln!(
-            file,
-            r#"Jan 01 00:00:04 host app sessionId=BBBB.node001, correlationId=shared_cid, msg="handling request from APP_B""#
-        )?;
-
-        // Run the full pipeline as search-exceptions does
-        let pool = build_thread_pool(1)?;
-        let query = "Exception";
-        let pass1 = run_pass1(log_path.to_str().unwrap(), query, false, &pool)?;
-
-        let (expanded_sids, expanded_cids) = expand_seeds(
-            &pass1.seed_session_ids,
-            &pass1.seed_correlation_ids,
-            &pass1.change_session_map,
-            &pass1.sessions_with_signature,
-        );
-
-        let app_filters = vec!["APP_A".to_string()];
-        let filtered_sids = filter_expanded_by_app(
-            &expanded_sids,
-            &pass1.change_session_map,
-            &pass1.session_app_map,
-            &app_filters,
-        );
-
-        let filtered_cids: HashSet<String> = expanded_cids
-            .iter()
-            .filter(|cid| {
-                pass1
-                    .seed_cid_sessions
-                    .get(cid.as_str())
-                    .is_some_and(|sid| filtered_sids.contains(sid))
-            })
-            .cloned()
-            .collect();
-
-        // shared_cid should be in filtered_cids (it came from APP_A's session)
-        assert!(filtered_cids.contains("shared_cid"));
-        // BBBB should NOT be in filtered_sids
-        assert!(!filtered_sids.contains("BBBB"));
-
-        // Run pass2 with strict_app_isolation=true
-        let count = run_pass2(
-            log_path.to_str().unwrap(),
-            query,
-            &filtered_sids,
-            &filtered_cids,
-            false,
-            &pool,
-            true,
-        )?;
-
-        // Line 1 (sig for APP_A, sid match) = included
-        // Line 2 (Exception, sid+cid match) = included
-        // Line 3 (sig for APP_B, sid NOT in filtered) = excluded
-        // Line 4 (APP_B line, cid=shared_cid but sid=BBBB not in filtered) = excluded by strict isolation
-        assert_eq!(count, 2, "APP_B lines must not leak through shared CID");
-        Ok(())
-    }
 }
```