7 Commits

| Author | SHA1 | Message | Date |
|--------|------|---------|------|
| Alexandr Mansurov | bdae57d801 | Search for exceptions | 2026-02-20 23:32:59 +01:00 |
| Alexandr Mansurov | 8620359c79 | Bump version | 2026-02-20 22:29:23 +01:00 |
| Alexandr Mansurov | 09fa289535 | Expansion improved | 2026-02-20 22:29:10 +01:00 |
| Alexandr Mansurov | 43de2acade | Expansion | 2026-02-20 21:14:41 +01:00 |
| Alexandr Mansurov | 7f4aab32a1 | Remove .github | 2026-02-20 20:58:30 +01:00 |
| Alexandr Mansurov | 6e7f2c1eb3 | Correlation IDs in the output | 2026-02-20 20:51:14 +01:00 |
| Alexandr Mansurov | bbf8102959 | Add search | 2026-02-20 15:31:14 +01:00 |
3 changed files with 48 additions and 342 deletions


@@ -1,10 +1,10 @@
# log_ingest
A Rust CLI tool for ingesting and searching application log files.
A Rust CLI tool for loading log files into a SQLite database for analysis.
## Overview
Parses application logs containing signature messages and loads them into SQLite for querying. Also provides fast text search across log files with session-aware context expansion. Designed to handle large log volumes (10GB+ per day) with parallel processing.
Parses application logs containing signature messages and loads them into SQLite for querying. Designed to handle large log volumes (10GB+ per day) with batched inserts and efficient parsing.
## Features
@@ -15,9 +15,6 @@ Parses application logs containing signature messages and loads them into SQLite
- Parallel file processing for multi-day ingestion
- Indexed columns (`session_id`, `version`) for efficient queries
- Extensible parser architecture for adding new message types
- Full-text search with timestamp and message extraction
- Session-aware expanded search that follows `changeSessionId` chains and correlation IDs
- Exception search filtered by app signature
## Installation
@@ -27,20 +24,16 @@ cargo build --release
## Usage
The CLI uses subcommands: `signature`, `search`, and `search-exceptions`.
### `signature` — Load log entries into SQLite
#### Process a single file
### Process a single file
```bash
log_ingest signature --file /path/to/logs.log --output output.db
log_ingest --file /path/to/logs.log --output output.db
```
#### Process a date range
### Process a date range
```bash
log_ingest signature \
log_ingest \
--from 2026/01/20 \
--to 2026/01/21 \
--base-dir /var/log/myapp \
@@ -50,22 +43,22 @@ log_ingest signature \
The tool will look for files at `<base-dir>/YYYY/MM/DD/<filename>.gz` or `<base-dir>/YYYY/MM/DD/<filename>` for each day in the range.
#### Parallel processing
### Parallel processing
When processing multiple files, parsing runs in parallel by default using all available CPU cores. A single writer thread handles database inserts to avoid SQLite contention.
```bash
# Use all CPU cores (default)
log_ingest signature --from 2026/01/01 --to 2026/01/31 ...
log_ingest --from 2026/01/01 --to 2026/01/31 ...
# Limit to 4 threads
log_ingest signature --threads 4 --from 2026/01/01 --to 2026/01/31 ...
log_ingest --threads 4 --from 2026/01/01 --to 2026/01/31 ...
# Sequential processing (disable parallelism)
log_ingest signature --threads 1 --from 2026/01/01 --to 2026/01/31 ...
log_ingest --threads 1 --from 2026/01/01 --to 2026/01/31 ...
```
#### Options
### Options
| Option | Description |
|--------|-------------|
@@ -78,63 +71,6 @@ log_ingest signature --threads 1 --from 2026/01/01 --to 2026/01/31 ...
| `--batch-size <N>` | Batch size for inserts (default: 10000) |
| `--threads <N>` | Number of parallel threads (0 = all cores, 1 = sequential) |
### `search` — Search log files for matching lines
Searches a log file for lines containing a query string and prints the timestamp and message for each match. Supports both plain `.log` and gzip `.log.gz` files, with parallel chunk-based processing for plain files.
```bash
# Basic search
log_ingest search --file /path/to/logs.log --query "NullPointerException"
# Include correlationId in output
log_ingest search --file /path/to/logs.log --query "timeout" -c
# Expanded search: find all lines sharing sessionId/correlationId with matches,
# following changeSessionId chains backward to the session start (signature line)
log_ingest search --file /path/to/logs.log --query "Exception" -e
```
Expand mode (`-e`) performs a two-pass search:
1. **Pass 1**: Scans the entire file to find matching lines and collects their session IDs and correlation IDs. Also builds a `changeSessionId` graph and tracks which sessions have signature lines.
2. **Expansion**: Follows `changeSessionId` chains backward from seed sessions, stopping at sessions that have a signature (session start boundary).
3. **Pass 2**: Re-reads the file and outputs all lines belonging to expanded session IDs or correlation IDs, stopping early when all expanded sessions are destroyed (`sessionDestroyed`).
#### Options
| Option | Description |
|--------|-------------|
| `--file <PATH>` | Log file to search |
| `--query <TEXT>` | Text to search for in log lines |
| `-c, --correlation-id` | Include correlationId in output |
| `-e, --expand` | Expand results to full session context |
| `--threads <N>` | Number of parallel threads (0 = all cores, 1 = sequential) |
### `search-exceptions` — Search for exceptions filtered by app
A specialized search that finds `Exception` lines and expands them to full session context, filtered to only sessions belonging to specific apps (identified by their `signature:` line).
```bash
# Find exceptions for a specific app
log_ingest search-exceptions --file /path/to/logs.log --app XAMARIN_APP
# Filter to multiple apps
log_ingest search-exceptions --file /path/to/logs.log --app XAMARIN_APP --app ANOTHER_APP
```
This uses the same two-pass expand approach as `search -e`, with an additional app-filtering step that:
- Matches expanded sessions to their app via the `signature:APP/...` line
- Propagates app membership forward through `changeSessionId` chains
- Filters correlation IDs to only those originating from matching-app sessions
- Uses strict app isolation in pass 2 to prevent lines from non-matching apps leaking through shared correlation IDs
#### Options
| Option | Description |
|--------|-------------|
| `--file <PATH>` | Log file to search |
| `--app <NAME>` | Filter to sessions with this app signature (repeatable) |
| `--threads <N>` | Number of parallel threads (0 = all cores, 1 = sequential) |
## Database Schema
The schema uses normalized lookup tables to minimize disk usage for large datasets.


@@ -120,26 +120,18 @@ fn main() -> Result<()> {
match args.command {
Command::Signature(sig_args) => run_signature(sig_args),
Command::Search(search_args) => {
let path = search_args
.file
.to_str()
.ok_or_else(|| anyhow!("File path contains invalid UTF-8"))?;
search::run_search(
path,
&search_args.query,
search_args.correlation_id,
search_args.expand,
search_args.threads,
)
}
Command::SearchExceptions(args) => {
let path = args
.file
.to_str()
.ok_or_else(|| anyhow!("File path contains invalid UTF-8"))?;
search::run_search_exceptions(path, &args.app, args.threads)
}
Command::Search(search_args) => search::run_search(
search_args.file.to_str().unwrap(),
&search_args.query,
search_args.correlation_id,
search_args.expand,
search_args.threads,
),
Command::SearchExceptions(args) => search::run_search_exceptions(
args.file.to_str().unwrap(),
&args.app,
args.threads,
),
}
}


@@ -354,7 +354,7 @@ fn run_search_expanded(
);
// Pass 2: filter and print (re-reads file; for gzip this re-decompresses from stream)
let match_count = run_pass2(file_path, query, &expanded_sids, &expanded_cids, use_parallel, pool, false)?;
let match_count = run_pass2(file_path, query, &expanded_sids, &expanded_cids, use_parallel, pool)?;
eprintln!("{} lines output", match_count);
Ok(())
@@ -443,13 +443,7 @@ fn expand_seeds(
(expanded_sids, seed_cids.clone())
}
fn format_pass2_match(
trimmed: &str,
query: &str,
expanded_sids: &HashSet<String>,
expanded_cids: &HashSet<String>,
strict_app_isolation: bool,
) -> Option<String> {
fn format_pass2_match(trimmed: &str, query: &str, expanded_sids: &HashSet<String>, expanded_cids: &HashSet<String>) -> Option<String> {
let is_direct_match = trimmed.contains(query);
let sid = SESSION_ID_RE
@@ -465,14 +459,7 @@ fn format_pass2_match(
let sid_match = sid.is_some_and(|s| expanded_sids.contains(s));
let cid_match = cid.is_some_and(|c| expanded_cids.contains(c));
// When strict_app_isolation is enabled (search-exceptions), a CID match
// alone is not enough — the line's session must also be in the filtered set
// (or absent). This prevents leaking lines from non-matching apps that
// happen to share a correlation ID.
let effective_cid_match =
cid_match && !(strict_app_isolation && sid.is_some() && !sid_match);
if !is_direct_match && !sid_match && !effective_cid_match {
if !is_direct_match && !sid_match && !cid_match {
return None;
}
@@ -496,30 +483,6 @@ fn format_pass2_match(
))
}
/// Events collected per-line during pass2 parallel chunk processing.
/// Preserves file order so the assembly phase can stop at sessionDestroyed.
enum Pass2Event {
/// Line matched and should be output.
Match(String),
/// An expanded session was destroyed (no output for this line).
Destroy(String),
/// Line matched AND an expanded session was destroyed on this line.
MatchAndDestroy(String, String),
}
/// Check whether a line destroys one of the expanded sessions.
fn check_session_destroyed(trimmed: &str, expanded_sids: &HashSet<String>) -> Option<String> {
if trimmed.contains("sessionDestroyed")
&& let Some(caps) = SESSION_DESTROYED_RE.captures(trimmed)
{
let sid = normalize_session_id(caps.get(1).unwrap().as_str());
if expanded_sids.contains(sid) {
return Some(sid.to_string());
}
}
None
}
fn run_pass2(
file_path: &str,
query: &str,
@@ -527,13 +490,13 @@ fn run_pass2(
expanded_cids: &HashSet<String>,
use_parallel: bool,
pool: &rayon::ThreadPool,
strict_app_isolation: bool,
) -> Result<u64> {
if !use_parallel {
eprintln!("Pass 2: filtering sequentially...");
let mut count = 0u64;
let mut reader = read_log_file(file_path)?;
let mut line = String::new();
// Track sessions still alive; stop when all are destroyed
let mut remaining_sids: HashSet<&str> =
expanded_sids.iter().map(|s| s.as_str()).collect();
@@ -552,7 +515,7 @@ fn run_pass2(
if remaining_sids.remove(destroyed_sid) {
// Still output the sessionDestroyed line itself if it matches
if let Some(formatted) =
format_pass2_match(trimmed, query, expanded_sids, expanded_cids, strict_app_isolation)
format_pass2_match(trimmed, query, expanded_sids, expanded_cids)
{
println!("{}", formatted);
count += 1;
@@ -569,7 +532,7 @@ fn run_pass2(
}
if let Some(formatted) =
format_pass2_match(trimmed, query, expanded_sids, expanded_cids, strict_app_isolation)
format_pass2_match(trimmed, query, expanded_sids, expanded_cids)
{
println!("{}", formatted);
count += 1;
@@ -584,58 +547,30 @@ fn run_pass2(
eprintln!("Pass 2: filtering with {} threads...", chunks.len());
// Each chunk collects events preserving file order so the assembly
// phase can apply the same sessionDestroyed stop logic as sequential.
let results: Vec<Result<Vec<Pass2Event>>> = pool.install(|| {
let results: Vec<Result<(Vec<String>, u64)>> = pool.install(|| {
chunks
.par_iter()
.map(|&(start, end)| {
let mut events = Vec::new();
let mut lines = Vec::new();
let mut count = 0u64;
for_each_line_in_chunk(file_path, start, end, |trimmed| {
let formatted = format_pass2_match(
trimmed, query, expanded_sids, expanded_cids, strict_app_isolation,
);
let destroyed = check_session_destroyed(trimmed, expanded_sids);
match (formatted, destroyed) {
(Some(f), Some(d)) => events.push(Pass2Event::MatchAndDestroy(f, d)),
(Some(f), None) => events.push(Pass2Event::Match(f)),
(None, Some(d)) => events.push(Pass2Event::Destroy(d)),
(None, None) => {}
if let Some(formatted) = format_pass2_match(trimmed, query, expanded_sids, expanded_cids) {
lines.push(formatted);
count += 1;
}
})?;
Ok(events)
Ok((lines, count))
})
.collect()
});
// Assemble in chunk order, stopping when all expanded sessions are destroyed.
let mut remaining_sids: HashSet<&str> =
expanded_sids.iter().map(|s| s.as_str()).collect();
let mut total = 0u64;
'outer: for result in results {
let events = result?;
for event in events {
match event {
Pass2Event::Match(line) => {
println!("{}", line);
total += 1;
}
Pass2Event::Destroy(sid) => {
remaining_sids.remove(sid.as_str());
if remaining_sids.is_empty() {
break 'outer;
}
}
Pass2Event::MatchAndDestroy(line, sid) => {
println!("{}", line);
total += 1;
remaining_sids.remove(sid.as_str());
if remaining_sids.is_empty() {
break 'outer;
}
}
}
for result in results {
let (lines, count) = result?;
for line in lines {
println!("{}", line);
}
total += count;
}
Ok(total)
@@ -739,7 +674,7 @@ pub fn run_search_exceptions(file_path: &str, app_filters: &[String], threads: u
expanded_cids.len()
);
// Pass 2: filter and print (strict isolation prevents CID leaking across apps)
// Pass 2: filter and print
let match_count = run_pass2(
file_path,
query,
@@ -747,7 +682,6 @@ pub fn run_search_exceptions(file_path: &str, app_filters: &[String], threads: u
&filtered_cids,
use_parallel,
&pool,
true,
)?;
eprintln!("{} lines output", match_count);
@@ -761,7 +695,7 @@ mod tests {
#[test]
fn test_syslog_timestamp_extraction() {
let line = r#"Jan 27 17:21:17 a.b.c.d m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, msg="hello""#;
let line = r#"Jan 27 17:21:17 tom003.testintg.dbank.loc m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, msg="hello""#;
let caps = SYSLOG_TIMESTAMP_RE.captures(line).unwrap();
assert_eq!(caps.get(1).unwrap().as_str(), "Jan 27 17:21:17");
}
@@ -778,7 +712,7 @@ mod tests {
#[test]
fn test_full_line_extraction() {
let line = r#"Jan 27 17:21:17 a.b.c.d m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, lc=com.a.b.c.d.e.v5.endpoint.f, threadId=183, externalUserId=null, clientIp=1.1.1.1, xsrfToken=null, correlationId=abcd, sessionId=noSession, securityContext=CA_LOGGED_IN, userId=123, request_id=[(null)]snoSessio.abc, msg="getUnreadFilesCount(externalUserId=aaaaa,externalTeamSafeIds=bbbbb)", ex=""#;
let line = r#"Jan 27 17:21:17 tom003.testintg.dbank.loc m1s-kv dt="2026-01-27 17:21:17,524", ll=INFO, lc=com.m1.m1.server.api.enterprise.v5.endpoint.TeamSafeEndpointV5, threadId=183, externalUserId=null, clientIp=160.83.36.132, xsrfToken=null, correlationId=aXjl_RwRs-3BWsshhut44wAABKY, sessionId=noSession, securityContext=CA_LOGGED_IN, userId=238, request_id=[(null)]snoSessio.r44wAABKY, msg="getUnreadFilesCount(externalUserId=102c1271eddd4e62832db4b1e70b8cb4,externalTeamSafeIds=053fac9da79543d5b90612ed7d5d0ca2)", ex=""#;
let ts = SYSLOG_TIMESTAMP_RE
.captures(line)
@@ -792,7 +726,7 @@ mod tests {
.unwrap();
assert_eq!(
msg,
"getUnreadFilesCount(externalUserId=aaaaa,externalTeamSafeIds=bbbbb)"
"getUnreadFilesCount(externalUserId=102c1271eddd4e62832db4b1e70b8cb4,externalTeamSafeIds=053fac9da79543d5b90612ed7d5d0ca2)"
);
}
@@ -958,7 +892,7 @@ mod tests {
// Line 1: signature for session A
writeln!(
file,
r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=AAAA.node001, correlationId=c1, msg="signature:APP/1.0/ details:OS:1""#
r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=AAAA.node001, correlationId=c1, msg="signature:XAMARIN_APP/1.0/ details:OS:1""#
)?;
// Line 2: normal line for session A, matches query
writeln!(
@@ -1000,7 +934,7 @@ mod tests {
// Line 1: signature for session OLD
writeln!(
file,
r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=OLD.node001, correlationId=c0, msg="signature:APP/1.0/ details:OS:1""#
r#"Jan 01 00:00:01 host app dt="2026-01-01 00:00:01,000", sessionId=OLD.node001, correlationId=c0, msg="signature:XAMARIN_APP/1.0/ details:OS:1""#
)?;
// Line 2: normal line for session OLD
writeln!(
@@ -1093,78 +1027,7 @@ mod tests {
Ok(())
}
// --- Regression: thread-consistency (P1) ---
// Both sequential and parallel pass2 must stop at sessionDestroyed
// and produce identical results regardless of thread count.
#[test]
fn test_pass2_stops_at_session_destroyed() -> Result<()> {
let dir = tempfile::tempdir()?;
let log_path = dir.path().join("test.log");
let mut file = std::fs::File::create(&log_path)?;
// Session AAAA with signature
writeln!(
file,
r#"Jan 01 00:00:01 host app sessionId=AAAA.node001, correlationId=c1, msg="signature:APP_A/1.0/ details:OS:1""#
)?;
// Line that matches query, with CID c1
writeln!(
file,
r#"Jan 01 00:00:02 host app sessionId=AAAA.node001, correlationId=c1, msg="findme error""#
)?;
// Session destroyed for AAAA
writeln!(
file,
r#"Jan 01 00:00:03 host app msg="sessionDestroyed #s=1 sid=AAAA.node001 isnew=false""#
)?;
// Post-destroy line with same CID c1 — must NOT be included
writeln!(
file,
r#"Jan 01 00:00:04 host app sessionId=noSession, correlationId=c1, msg="async callback after destroy""#
)?;
// Run pass1 + expand
let pool = build_thread_pool(1)?;
let pass1 = run_pass1(log_path.to_str().unwrap(), "findme", false, &pool)?;
let (expanded_sids, expanded_cids) = expand_seeds(
&pass1.seed_session_ids,
&pass1.seed_correlation_ids,
&pass1.change_session_map,
&pass1.sessions_with_signature,
);
assert!(expanded_cids.contains("c1"));
// Sequential: lines 1 (sid match) + 2 (query+sid+cid) = 2, then
// line 3 destroys AAAA (the only expanded session) → stop.
// Line 4 must NOT be included.
let seq_count = run_pass2(
log_path.to_str().unwrap(),
"findme",
&expanded_sids,
&expanded_cids,
false,
&pool,
false,
)?;
assert_eq!(seq_count, 2, "sequential pass2 must stop at sessionDestroyed");
// Parallel: must produce the same count.
let par_pool = build_thread_pool(2)?;
let par_count = run_pass2(
log_path.to_str().unwrap(),
"findme",
&expanded_sids,
&expanded_cids,
true,
&par_pool,
false,
)?;
assert_eq!(par_count, seq_count, "parallel pass2 must match sequential");
Ok(())
}
// --- sessionDestroyed regex test ---
#[test]
fn test_session_destroyed_regex() {
@@ -1175,89 +1038,4 @@ mod tests {
"2010F74498079D00A5647F3777545A64.node003"
);
}
// --- Regression: strict app isolation (P1) ---
// search-exceptions --app APP_A must not include lines from APP_B even when
// they share a correlation ID with an APP_A session.
#[test]
fn test_search_exceptions_strict_app_isolation() -> Result<()> {
let dir = tempfile::tempdir()?;
let log_path = dir.path().join("test.log");
let mut file = std::fs::File::create(&log_path)?;
// APP_A session
writeln!(
file,
r#"Jan 01 00:00:01 host app sessionId=AAAA.node001, correlationId=c1, msg="signature:APP_A/1.0/ details:OS:1""#
)?;
// APP_A Exception line with shared CID
writeln!(
file,
r#"Jan 01 00:00:02 host app sessionId=AAAA.node001, correlationId=shared_cid, msg="Exception in APP_A""#
)?;
// APP_B session with same shared CID
writeln!(
file,
r#"Jan 01 00:00:03 host app sessionId=BBBB.node001, correlationId=c2, msg="signature:APP_B/2.0/ details:OS:1""#
)?;
writeln!(
file,
r#"Jan 01 00:00:04 host app sessionId=BBBB.node001, correlationId=shared_cid, msg="handling request from APP_B""#
)?;
// Run the full pipeline as search-exceptions does
let pool = build_thread_pool(1)?;
let query = "Exception";
let pass1 = run_pass1(log_path.to_str().unwrap(), query, false, &pool)?;
let (expanded_sids, expanded_cids) = expand_seeds(
&pass1.seed_session_ids,
&pass1.seed_correlation_ids,
&pass1.change_session_map,
&pass1.sessions_with_signature,
);
let app_filters = vec!["APP_A".to_string()];
let filtered_sids = filter_expanded_by_app(
&expanded_sids,
&pass1.change_session_map,
&pass1.session_app_map,
&app_filters,
);
let filtered_cids: HashSet<String> = expanded_cids
.iter()
.filter(|cid| {
pass1
.seed_cid_sessions
.get(cid.as_str())
.is_some_and(|sid| filtered_sids.contains(sid))
})
.cloned()
.collect();
// shared_cid should be in filtered_cids (it came from APP_A's session)
assert!(filtered_cids.contains("shared_cid"));
// BBBB should NOT be in filtered_sids
assert!(!filtered_sids.contains("BBBB"));
// Run pass2 with strict_app_isolation=true
let count = run_pass2(
log_path.to_str().unwrap(),
query,
&filtered_sids,
&filtered_cids,
false,
&pool,
true,
)?;
// Line 1 (sig for APP_A, sid match) = included
// Line 2 (Exception, sid+cid match) = included
// Line 3 (sig for APP_B, sid NOT in filtered) = excluded
// Line 4 (APP_B line, cid=shared_cid but sid=BBBB not in filtered) = excluded by strict isolation
assert_eq!(count, 2, "APP_B lines must not leak through shared CID");
Ok(())
}
}