Compare commits
7 Commits
main...feature/se
| Author | SHA1 | Date |
|---|---|---|
| | bdae57d801 | |
| | 8620359c79 | |
| | 09fa289535 | |
| | 43de2acade | |
| | 7f4aab32a1 | |
| | 6e7f2c1eb3 | |
| | bbf8102959 | |
README.md · 86 changed lines
@@ -1,10 +1,10 @@
 # log_ingest
 
-A Rust CLI tool for ingesting and searching application log files.
+A Rust CLI tool for loading log files into a SQLite database for analysis.
 
 ## Overview
 
-Parses application logs containing signature messages and loads them into SQLite for querying. Also provides fast text search across log files with session-aware context expansion. Designed to handle large log volumes (10GB+ per day) with parallel processing.
+Parses application logs containing signature messages and loads them into SQLite for querying. Designed to handle large log volumes (10GB+ per day) with batched inserts and efficient parsing.
 
 ## Features
 
@@ -15,9 +15,6 @@ Parses application logs containing signature messages and loads them into SQLite
 - Parallel file processing for multi-day ingestion
 - Indexed columns (`session_id`, `version`) for efficient queries
 - Extensible parser architecture for adding new message types
-- Full-text search with timestamp and message extraction
-- Session-aware expanded search that follows `changeSessionId` chains and correlation IDs
-- Exception search filtered by app signature
 
 ## Installation
 
@@ -27,20 +24,16 @@ cargo build --release
 
 ## Usage
 
-The CLI uses subcommands: `signature`, `search`, and `search-exceptions`.
-
-### `signature` — Load log entries into SQLite
-
-#### Process a single file
+### Process a single file
 
 ```bash
-log_ingest signature --file /path/to/logs.log --output output.db
+log_ingest --file /path/to/logs.log --output output.db
 ```
 
-#### Process a date range
+### Process a date range
 
 ```bash
-log_ingest signature \
+log_ingest \
     --from 2026/01/20 \
     --to 2026/01/21 \
     --base-dir /var/log/myapp \
@@ -50,22 +43,22 @@ log_ingest signature \
 
 The tool will look for files at `<base-dir>/YYYY/MM/DD/<filename>.gz` or `<base-dir>/YYYY/MM/DD/<filename>` for each day in the range.
 
-#### Parallel processing
+### Parallel processing
 
 When processing multiple files, parsing runs in parallel by default using all available CPU cores. A single writer thread handles database inserts to avoid SQLite contention.
 
 ```bash
 # Use all CPU cores (default)
-log_ingest signature --from 2026/01/01 --to 2026/01/31 ...
+log_ingest --from 2026/01/01 --to 2026/01/31 ...
 
 # Limit to 4 threads
-log_ingest signature --threads 4 --from 2026/01/01 --to 2026/01/31 ...
+log_ingest --threads 4 --from 2026/01/01 --to 2026/01/31 ...
 
 # Sequential processing (disable parallelism)
-log_ingest signature --threads 1 --from 2026/01/01 --to 2026/01/31 ...
+log_ingest --threads 1 --from 2026/01/01 --to 2026/01/31 ...
 ```
 
-#### Options
+### Options
 
 | Option | Description |
 |--------|-------------|
@@ -78,63 +71,6 @@ log_ingest signature --threads 1 --from 2026/01/01 --to 2026/01/31 ...
 | `--batch-size <N>` | Batch size for inserts (default: 10000) |
 | `--threads <N>` | Number of parallel threads (0 = all cores, 1 = sequential) |
 
-### `search` — Search log files for matching lines
-
-Searches a log file for lines containing a query string and prints the timestamp and message for each match. Supports both plain `.log` and gzip `.log.gz` files, with parallel chunk-based processing for plain files.
-
-```bash
-# Basic search
-log_ingest search --file /path/to/logs.log --query "NullPointerException"
-
-# Include correlationId in output
-log_ingest search --file /path/to/logs.log --query "timeout" -c
-
-# Expanded search: find all lines sharing sessionId/correlationId with matches,
-# following changeSessionId chains backward to the session start (signature line)
-log_ingest search --file /path/to/logs.log --query "Exception" -e
-```
-
-Expand mode (`-e`) performs a two-pass search:
-
-1. **Pass 1**: Scans the entire file to find matching lines and collects their session IDs and correlation IDs. Also builds a `changeSessionId` graph and tracks which sessions have signature lines.
-2. **Expansion**: Follows `changeSessionId` chains backward from seed sessions, stopping at sessions that have a signature (session start boundary).
-3. **Pass 2**: Re-reads the file and outputs all lines belonging to expanded session IDs or correlation IDs, stopping early when all expanded sessions are destroyed (`sessionDestroyed`).
-
-#### Options
-
-| Option | Description |
-|--------|-------------|
-| `--file <PATH>` | Log file to search |
-| `--query <TEXT>` | Text to search for in log lines |
-| `-c, --correlation-id` | Include correlationId in output |
-| `-e, --expand` | Expand results to full session context |
-| `--threads <N>` | Number of parallel threads (0 = all cores, 1 = sequential) |
-
-### `search-exceptions` — Search for exceptions filtered by app
-
-A specialized search that finds `Exception` lines and expands them to full session context, filtered to only sessions belonging to specific apps (identified by their `signature:` line).
-
-```bash
-# Find exceptions for a specific app
-log_ingest search-exceptions --file /path/to/logs.log --app XAMARIN_APP
-
-# Filter to multiple apps
-log_ingest search-exceptions --file /path/to/logs.log --app XAMARIN_APP --app ANOTHER_APP
-```
-
-This uses the same two-pass expand approach as `search -e`, with an additional app-filtering step that:
-
-- Matches expanded sessions to their app via the `signature:APP/...` line
-- Propagates app membership forward through `changeSessionId` chains
-- Filters correlation IDs to only those originating from matching-app sessions
-- Uses strict app isolation in pass 2 to prevent lines from non-matching apps leaking through shared correlation IDs
-
 ## Database Schema
 
 The schema uses normalized lookup tables to minimize disk usage for large datasets.
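The README text removed above describes the backward expansion step: follow `changeSessionId` chains from seed sessions back to the session start, stopping at sessions that have a signature line. A minimal sketch of that graph walk, using hypothetical simplified types (the function name and the `new_to_old` map shape are illustrations, not the crate's actual API):

```rust
use std::collections::{HashMap, HashSet};

/// Sketch of the backward expansion: starting from seed sessions, follow
/// changeSessionId edges (new id -> old id it replaced) until a session
/// that has a signature line (a session-start boundary) is reached.
fn expand_backward(
    seeds: &HashSet<String>,
    new_to_old: &HashMap<String, String>,
    sessions_with_signature: &HashSet<String>,
) -> HashSet<String> {
    let mut expanded: HashSet<String> = seeds.clone();
    let mut frontier: Vec<String> = seeds.iter().cloned().collect();
    while let Some(sid) = frontier.pop() {
        // A session with a signature starts the chain; do not walk past it.
        if sessions_with_signature.contains(&sid) {
            continue;
        }
        if let Some(old) = new_to_old.get(&sid) {
            if expanded.insert(old.clone()) {
                frontier.push(old.clone());
            }
        }
    }
    expanded
}
```

For a chain C -> B -> A where only A has a signature, seeding with C expands to all three sessions and stops at A.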
src/main.rs · 24 changed lines
@@ -120,26 +120,18 @@ fn main() -> Result<()> {
 
     match args.command {
         Command::Signature(sig_args) => run_signature(sig_args),
-        Command::Search(search_args) => {
-            let path = search_args
-                .file
-                .to_str()
-                .ok_or_else(|| anyhow!("File path contains invalid UTF-8"))?;
-            search::run_search(
-                path,
+        Command::Search(search_args) => search::run_search(
+            search_args.file.to_str().unwrap(),
             &search_args.query,
             search_args.correlation_id,
             search_args.expand,
             search_args.threads,
-            )
-        }
-        Command::SearchExceptions(args) => {
-            let path = args
-                .file
-                .to_str()
-                .ok_or_else(|| anyhow!("File path contains invalid UTF-8"))?;
-            search::run_search_exceptions(path, &args.app, args.threads)
-        }
+        ),
+        Command::SearchExceptions(args) => search::run_search_exceptions(
+            args.file.to_str().unwrap(),
+            &args.app,
+            args.threads,
+        ),
     }
 }
 
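One side of this hunk replaces `to_str().unwrap()` with `ok_or_else(...)` because `Path::to_str` returns `None` for paths that are not valid UTF-8, and mapping that `None` to an error keeps `main()` from panicking. A self-contained sketch of the pattern (using a plain `String` error here for illustration; the actual code uses `anyhow!`):

```rust
use std::path::Path;

/// Convert a path to &str, turning the non-UTF-8 case into an error
/// instead of a panic. Path::to_str returns None when the path bytes
/// are not valid UTF-8.
fn path_as_str(path: &Path) -> Result<&str, String> {
    path.to_str()
        .ok_or_else(|| "File path contains invalid UTF-8".to_string())
}
```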
src/search.rs · 270 changed lines
@@ -354,7 +354,7 @@ fn run_search_expanded(
     );
 
     // Pass 2: filter and print (re-reads file; for gzip this re-decompresses from stream)
-    let match_count = run_pass2(file_path, query, &expanded_sids, &expanded_cids, use_parallel, pool, false)?;
+    let match_count = run_pass2(file_path, query, &expanded_sids, &expanded_cids, use_parallel, pool)?;
 
     eprintln!("{} lines output", match_count);
     Ok(())
@@ -443,13 +443,7 @@ fn expand_seeds(
     (expanded_sids, seed_cids.clone())
 }
 
-fn format_pass2_match(
-    trimmed: &str,
-    query: &str,
-    expanded_sids: &HashSet<String>,
-    expanded_cids: &HashSet<String>,
-    strict_app_isolation: bool,
-) -> Option<String> {
+fn format_pass2_match(trimmed: &str, query: &str, expanded_sids: &HashSet<String>, expanded_cids: &HashSet<String>) -> Option<String> {
     let is_direct_match = trimmed.contains(query);
 
     let sid = SESSION_ID_RE
@@ -465,14 +459,7 @@ fn format_pass2_match(
     let sid_match = sid.is_some_and(|s| expanded_sids.contains(s));
     let cid_match = cid.is_some_and(|c| expanded_cids.contains(c));
 
-    // When strict_app_isolation is enabled (search-exceptions), a CID match
-    // alone is not enough — the line's session must also be in the filtered set
-    // (or absent). This prevents leaking lines from non-matching apps that
-    // happen to share a correlation ID.
-    let effective_cid_match =
-        cid_match && !(strict_app_isolation && sid.is_some() && !sid_match);
-
-    if !is_direct_match && !sid_match && !effective_cid_match {
+    if !is_direct_match && !sid_match && !cid_match {
         return None;
     }
 
@@ -496,30 +483,6 @@ fn format_pass2_match(
     ))
 }
 
-/// Events collected per-line during pass2 parallel chunk processing.
-/// Preserves file order so the assembly phase can stop at sessionDestroyed.
-enum Pass2Event {
-    /// Line matched and should be output.
-    Match(String),
-    /// An expanded session was destroyed (no output for this line).
-    Destroy(String),
-    /// Line matched AND an expanded session was destroyed on this line.
-    MatchAndDestroy(String, String),
-}
-
-/// Check whether a line destroys one of the expanded sessions.
-fn check_session_destroyed(trimmed: &str, expanded_sids: &HashSet<String>) -> Option<String> {
-    if trimmed.contains("sessionDestroyed")
-        && let Some(caps) = SESSION_DESTROYED_RE.captures(trimmed)
-    {
-        let sid = normalize_session_id(caps.get(1).unwrap().as_str());
-        if expanded_sids.contains(sid) {
-            return Some(sid.to_string());
-        }
-    }
-    None
-}
-
 fn run_pass2(
     file_path: &str,
     query: &str,
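The `check_session_destroyed` helper in this hunk detects a `sessionDestroyed` line and extracts the session id from it. A dependency-free sketch of the same extraction using plain string splitting instead of the compiled `SESSION_DESTROYED_RE` regex, and without the `normalize_session_id` step the real code applies:

```rust
/// String-only sketch of check_session_destroyed: detect a
/// sessionDestroyed line and pull out the `sid=` value. The real code
/// uses a compiled regex and normalizes the session id afterwards.
fn destroyed_sid(line: &str) -> Option<&str> {
    if !line.contains("sessionDestroyed") {
        return None;
    }
    // Take everything after the first "sid=" marker...
    let rest = line.split("sid=").nth(1)?;
    // ...and cut the token at the next whitespace character.
    rest.split_whitespace().next()
}
```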
@@ -527,13 +490,13 @@ fn run_pass2(
     expanded_cids: &HashSet<String>,
     use_parallel: bool,
     pool: &rayon::ThreadPool,
-    strict_app_isolation: bool,
 ) -> Result<u64> {
     if !use_parallel {
         eprintln!("Pass 2: filtering sequentially...");
         let mut count = 0u64;
         let mut reader = read_log_file(file_path)?;
         let mut line = String::new();
+        // Track sessions still alive; stop when all are destroyed
         let mut remaining_sids: HashSet<&str> =
             expanded_sids.iter().map(|s| s.as_str()).collect();
 
@@ -552,7 +515,7 @@ fn run_pass2(
                 if remaining_sids.remove(destroyed_sid) {
                     // Still output the sessionDestroyed line itself if it matches
                     if let Some(formatted) =
-                        format_pass2_match(trimmed, query, expanded_sids, expanded_cids, strict_app_isolation)
+                        format_pass2_match(trimmed, query, expanded_sids, expanded_cids)
                     {
                         println!("{}", formatted);
                         count += 1;
@@ -569,7 +532,7 @@ fn run_pass2(
         }
 
         if let Some(formatted) =
-            format_pass2_match(trimmed, query, expanded_sids, expanded_cids, strict_app_isolation)
+            format_pass2_match(trimmed, query, expanded_sids, expanded_cids)
         {
             println!("{}", formatted);
             count += 1;
@@ -584,58 +547,30 @@ fn run_pass2(
 
     eprintln!("Pass 2: filtering with {} threads...", chunks.len());
 
-    // Each chunk collects events preserving file order so the assembly
-    // phase can apply the same sessionDestroyed stop logic as sequential.
-    let results: Vec<Result<Vec<Pass2Event>>> = pool.install(|| {
+    let results: Vec<Result<(Vec<String>, u64)>> = pool.install(|| {
         chunks
             .par_iter()
             .map(|&(start, end)| {
-                let mut events = Vec::new();
+                let mut lines = Vec::new();
+                let mut count = 0u64;
                 for_each_line_in_chunk(file_path, start, end, |trimmed| {
-                    let formatted = format_pass2_match(
-                        trimmed, query, expanded_sids, expanded_cids, strict_app_isolation,
-                    );
-                    let destroyed = check_session_destroyed(trimmed, expanded_sids);
-                    match (formatted, destroyed) {
-                        (Some(f), Some(d)) => events.push(Pass2Event::MatchAndDestroy(f, d)),
-                        (Some(f), None) => events.push(Pass2Event::Match(f)),
-                        (None, Some(d)) => events.push(Pass2Event::Destroy(d)),
-                        (None, None) => {}
+                    if let Some(formatted) = format_pass2_match(trimmed, query, expanded_sids, expanded_cids) {
+                        lines.push(formatted);
+                        count += 1;
                     }
                 })?;
-                Ok(events)
+                Ok((lines, count))
             })
             .collect()
     });
 
-    // Assemble in chunk order, stopping when all expanded sessions are destroyed.
-    let mut remaining_sids: HashSet<&str> =
-        expanded_sids.iter().map(|s| s.as_str()).collect();
     let mut total = 0u64;
-    'outer: for result in results {
-        let events = result?;
-        for event in events {
-            match event {
-                Pass2Event::Match(line) => {
-                    println!("{}", line);
-                    total += 1;
-                }
-                Pass2Event::Destroy(sid) => {
-                    remaining_sids.remove(sid.as_str());
-                    if remaining_sids.is_empty() {
-                        break 'outer;
-                    }
-                }
-                Pass2Event::MatchAndDestroy(line, sid) => {
-                    println!("{}", line);
-                    total += 1;
-                    remaining_sids.remove(sid.as_str());
-                    if remaining_sids.is_empty() {
-                        break 'outer;
-                    }
-                }
-            }
-        }
-    }
+    for result in results {
+        let (lines, count) = result?;
+        for line in lines {
+            println!("{}", line);
+        }
+        total += count;
+    }
 
     Ok(total)
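The `Pass2Event` side of this hunk makes the parallel path agree with the sequential one: each chunk records matches and destroys in file order, and the single-threaded assembly phase replays them, stopping once every expanded session is gone. A minimal sketch of that assembly loop over per-chunk event lists, with a simplified two-variant event type (illustrative, not the source's exact `Pass2Event`):

```rust
use std::collections::HashSet;

enum Event {
    Match(String),
    Destroy(String),
}

/// Walk per-chunk event lists in file order, collecting matched lines and
/// stopping as soon as every expanded session has been destroyed, so later
/// lines sharing a correlation id are not emitted.
fn assemble(chunks: Vec<Vec<Event>>, expanded_sids: &HashSet<String>) -> Vec<String> {
    let mut remaining: HashSet<&str> = expanded_sids.iter().map(|s| s.as_str()).collect();
    let mut out = Vec::new();
    'outer: for events in chunks {
        for ev in events {
            match ev {
                Event::Match(line) => out.push(line),
                Event::Destroy(sid) => {
                    remaining.remove(sid.as_str());
                    if remaining.is_empty() {
                        break 'outer; // all sessions gone: ignore the rest
                    }
                }
            }
        }
    }
    out
}
```

Because the events preserve file order, this produces the same output as a sequential scan regardless of how the chunks were processed.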
@@ -739,7 +674,7 @@ pub fn run_search_exceptions(file_path: &str, app_filters: &[String], threads: u
         expanded_cids.len()
     );
 
-    // Pass 2: filter and print (strict isolation prevents CID leaking across apps)
+    // Pass 2: filter and print
     let match_count = run_pass2(
         file_path,
         query,
@@ -747,7 +682,6 @@ pub fn run_search_exceptions(file_path: &str, app_filters: &[String], threads: u
         &filtered_cids,
         use_parallel,
         &pool,
-        true,
     )?;
 
     eprintln!("{} lines output", match_count);
@@ -761,7 +695,7 @@ mod tests {
 
     #[test]
     fn test_syslog_timestamp_extraction() {
-        let line = r#"Jan 27 17:21:17 a.b.c.d m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, msg="hello""#;
+        let line = r#"Jan 27 17:21:17 tom003.testintg.dbank.loc m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, msg="hello""#;
         let caps = SYSLOG_TIMESTAMP_RE.captures(line).unwrap();
         assert_eq!(caps.get(1).unwrap().as_str(), "Jan 27 17:21:17");
     }
@@ -778,7 +712,7 @@ mod tests {
 
     #[test]
     fn test_full_line_extraction() {
-        let line = r#"Jan 27 17:21:17 a.b.c.d m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, lc=com.a.b.c.d.e.v5.endpoint.f, threadId=183, externalUserId=null, clientIp=1.1.1.1, xsrfToken=null, correlationId=abcd, sessionId=noSession, securityContext=CA_LOGGED_IN, userId=123, request_id=[(null)]snoSessio.abc, msg="getUnreadFilesCount(externalUserId=aaaaa,externalTeamSafeIds=bbbbb)", ex=""#;
+        let line = r#"Jan 27 17:21:17 tom003.testintg.dbank.loc m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, lc=com.m1.m1.server.api.enterprise.v5.endpoint.TeamSafeEndpointV5, threadId=183, externalUserId=null, clientIp=160.83.36.132, xsrfToken=null, correlationId=aXjl_RwRs-3BWsshhut44wAABKY, sessionId=noSession, securityContext=CA_LOGGED_IN, userId=238, request_id=[(null)]snoSessio.r44wAABKY, msg="getUnreadFilesCount(externalUserId=102c1271eddd4e62832db4b1e70b8cb4,externalTeamSafeIds=053fac9da79543d5b90612ed7d5d0ca2)", ex=""#;
 
         let ts = SYSLOG_TIMESTAMP_RE
             .captures(line)
@@ -792,7 +726,7 @@ mod tests {
             .unwrap();
         assert_eq!(
             msg,
-            "getUnreadFilesCount(externalUserId=aaaaa,externalTeamSafeIds=bbbbb)"
+            "getUnreadFilesCount(externalUserId=102c1271eddd4e62832db4b1e70b8cb4,externalTeamSafeIds=053fac9da79543d5b90612ed7d5d0ca2)"
         );
     }
 
@@ -958,7 +892,7 @@ mod tests {
         // Line 1: signature for session A
         writeln!(
             file,
-            r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=AAAA.node001, correlationId=c1, msg="signature:APP/1.0/ details:OS:1""#
+            r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=AAAA.node001, correlationId=c1, msg="signature:XAMARIN_APP/1.0/ details:OS:1""#
         )?;
         // Line 2: normal line for session A, matches query
         writeln!(
@@ -1000,7 +934,7 @@ mod tests {
         // Line 1: signature for session OLD
         writeln!(
             file,
-            r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=OLD.node001, correlationId=c0, msg="signature:APP/1.0/ details:OS:1""#
+            r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=OLD.node001, correlationId=c0, msg="signature:XAMARIN_APP/1.0/ details:OS:1""#
         )?;
         // Line 2: normal line for session OLD
         writeln!(
@@ -1093,78 +1027,7 @@ mod tests {
         Ok(())
     }
 
-    // --- Regression: thread-consistency (P1) ---
-    // Both sequential and parallel pass2 must stop at sessionDestroyed
-    // and produce identical results regardless of thread count.
-
-    #[test]
-    fn test_pass2_stops_at_session_destroyed() -> Result<()> {
-        let dir = tempfile::tempdir()?;
-        let log_path = dir.path().join("test.log");
-        let mut file = std::fs::File::create(&log_path)?;
-
-        // Session AAAA with signature
-        writeln!(
-            file,
-            r#"Jan 01 00:00:01 host app sessionId=AAAA.node001, correlationId=c1, msg="signature:APP_A/1.0/ details:OS:1""#
-        )?;
-        // Line that matches query, with CID c1
-        writeln!(
-            file,
-            r#"Jan 01 00:00:02 host app sessionId=AAAA.node001, correlationId=c1, msg="findme error""#
-        )?;
-        // Session destroyed for AAAA
-        writeln!(
-            file,
-            r#"Jan 01 00:00:03 host app msg="sessionDestroyed #s=1 sid=AAAA.node001 isnew=false""#
-        )?;
-        // Post-destroy line with same CID c1 — must NOT be included
-        writeln!(
-            file,
-            r#"Jan 01 00:00:04 host app sessionId=noSession, correlationId=c1, msg="async callback after destroy""#
-        )?;
-
-        // Run pass1 + expand
-        let pool = build_thread_pool(1)?;
-        let pass1 = run_pass1(log_path.to_str().unwrap(), "findme", false, &pool)?;
-        let (expanded_sids, expanded_cids) = expand_seeds(
-            &pass1.seed_session_ids,
-            &pass1.seed_correlation_ids,
-            &pass1.change_session_map,
-            &pass1.sessions_with_signature,
-        );
-
-        assert!(expanded_cids.contains("c1"));
-
-        // Sequential: lines 1 (sid match) + 2 (query+sid+cid) = 2, then
-        // line 3 destroys AAAA (the only expanded session) → stop.
-        // Line 4 must NOT be included.
-        let seq_count = run_pass2(
-            log_path.to_str().unwrap(),
-            "findme",
-            &expanded_sids,
-            &expanded_cids,
-            false,
-            &pool,
-            false,
-        )?;
-        assert_eq!(seq_count, 2, "sequential pass2 must stop at sessionDestroyed");
-
-        // Parallel: must produce the same count.
-        let par_pool = build_thread_pool(2)?;
-        let par_count = run_pass2(
-            log_path.to_str().unwrap(),
-            "findme",
-            &expanded_sids,
-            &expanded_cids,
-            true,
-            &par_pool,
-            false,
-        )?;
-        assert_eq!(par_count, seq_count, "parallel pass2 must match sequential");
-
-        Ok(())
-    }
-
+    // --- sessionDestroyed regex test ---
+
     #[test]
     fn test_session_destroyed_regex() {
@@ -1175,89 +1038,4 @@ mod tests {
             "2010F74498079D00A5647F3777545A64.node003"
         );
     }
-
-    // --- Regression: strict app isolation (P1) ---
-    // search-exceptions --app APP_A must not include lines from APP_B even when
-    // they share a correlation ID with an APP_A session.
-
-    #[test]
-    fn test_search_exceptions_strict_app_isolation() -> Result<()> {
-        let dir = tempfile::tempdir()?;
-        let log_path = dir.path().join("test.log");
-        let mut file = std::fs::File::create(&log_path)?;
-
-        // APP_A session
-        writeln!(
-            file,
-            r#"Jan 01 00:00:01 host app sessionId=AAAA.node001, correlationId=c1, msg="signature:APP_A/1.0/ details:OS:1""#
-        )?;
-        // APP_A Exception line with shared CID
-        writeln!(
-            file,
-            r#"Jan 01 00:00:02 host app sessionId=AAAA.node001, correlationId=shared_cid, msg="Exception in APP_A""#
-        )?;
-        // APP_B session with same shared CID
-        writeln!(
-            file,
-            r#"Jan 01 00:00:03 host app sessionId=BBBB.node001, correlationId=c2, msg="signature:APP_B/2.0/ details:OS:1""#
-        )?;
-        writeln!(
-            file,
-            r#"Jan 01 00:00:04 host app sessionId=BBBB.node001, correlationId=shared_cid, msg="handling request from APP_B""#
-        )?;
-
-        // Run the full pipeline as search-exceptions does
-        let pool = build_thread_pool(1)?;
-        let query = "Exception";
-        let pass1 = run_pass1(log_path.to_str().unwrap(), query, false, &pool)?;
-
-        let (expanded_sids, expanded_cids) = expand_seeds(
-            &pass1.seed_session_ids,
-            &pass1.seed_correlation_ids,
-            &pass1.change_session_map,
-            &pass1.sessions_with_signature,
-        );
-
-        let app_filters = vec!["APP_A".to_string()];
-        let filtered_sids = filter_expanded_by_app(
-            &expanded_sids,
-            &pass1.change_session_map,
-            &pass1.session_app_map,
-            &app_filters,
-        );
-
-        let filtered_cids: HashSet<String> = expanded_cids
-            .iter()
-            .filter(|cid| {
-                pass1
-                    .seed_cid_sessions
-                    .get(cid.as_str())
-                    .is_some_and(|sid| filtered_sids.contains(sid))
-            })
-            .cloned()
-            .collect();
-
-        // shared_cid should be in filtered_cids (it came from APP_A's session)
-        assert!(filtered_cids.contains("shared_cid"));
-        // BBBB should NOT be in filtered_sids
-        assert!(!filtered_sids.contains("BBBB"));
-
-        // Run pass2 with strict_app_isolation=true
-        let count = run_pass2(
-            log_path.to_str().unwrap(),
-            query,
-            &filtered_sids,
-            &filtered_cids,
-            false,
-            &pool,
-            true,
-        )?;
-
-        // Line 1 (sig for APP_A, sid match) = included
-        // Line 2 (Exception, sid+cid match) = included
-        // Line 3 (sig for APP_B, sid NOT in filtered) = excluded
-        // Line 4 (APP_B line, cid=shared_cid but sid=BBBB not in filtered) = excluded by strict isolation
-        assert_eq!(count, 2, "APP_B lines must not leak through shared CID");
-        Ok(())
-    }
 }