Case Study
Local-LLM Log Anomaly CLI
Lightweight Python CLI that batches system logs and uses a local Ollama model to flag security risks, failures, and anomalous behavior — stdlib only.
- Python
- Ollama
- urllib
- argparse
- llama3.2
- python
- ollama
- llm
- security
- homelab
- zero-dependency
Overview
A complete, lightweight Python command-line tool that processes system logs and uses a local LLM to flag security risks, system failures, or anomalous behaviors.
To match the clean style of Project 1, this script relies entirely on Python’s standard library (urllib.request, argparse, json). It connects directly to a locally running instance of Ollama — lightweight models like llama3.2:1b or qwen2.5:1.5b run well on a local Linux machine.
What it implements
- Ollama health check — verifies the local daemon before processing
- Contextual batching — feeds log chunks sequentially so the model can spot patterns (e.g. brute-force login attempts)
- Structured JSON output — prompts the LLM for strict JSON, with markdown fence sanitization on parse failure
- Color-coded terminal UI — NORMAL, WARNING, and ANOMALY severity with inline reasons
Project setup
1. Ensure Ollama is running
Pull a small model:
ollama pull llama3.2:1b
2. Create project files
mkdir local-log-ai && cd local-log-ai
touch log_analyzer.py mock_system.log
3. Sample log data
Paste this into mock_system.log for immediate testing:
2026-05-18 10:14:22 INFO [auth_service] User 'davidc' successfully authenticated via session token.
2026-05-18 10:15:01 WARN [db_pool] Connection pool utilization at 82%. Spawning 2 auxiliary connections.
2026-05-18 10:15:45 ERROR [auth_service] Failed password attempt for user 'root' from IP 192.168.1.145.
2026-05-18 10:15:46 ERROR [auth_service] Failed password attempt for user 'root' from IP 192.168.1.145.
2026-05-18 10:15:47 ERROR [auth_service] Failed password attempt for user 'root' from IP 192.168.1.145.
2026-05-18 10:16:12 INFO [gcr_service] Tenant isolation layer successfully verified container constraints.
2026-05-18 10:17:02 CRIT [kernel] Out of memory (OOM): Kill process 41029 (java) score 842 or sacrifice child.
2026-05-18 10:17:05 WARN [systemd] Service 'crm-backend' terminated unexpectedly with exit code 137.
The code (log_analyzer.py)
#!/usr/bin/env python3
import argparse
import json
import sys
import urllib.request
import urllib.error
# --- ANSI Terminal Colors ---
CLR_RESET = "\033[0m"
CLR_GREEN = "\033[92m"
CLR_YELLOW = "\033[93m"
CLR_RED = "\033[91m"
CLR_CYAN = "\033[96m"
CLR_BOLD = "\033[1m"
OLLAMA_URL = "http://localhost:11434/api/chat"
DEFAULT_MODEL = "llama3.2:1b"
SYSTEM_PROMPT = """
You are a senior systems engineer and security operations analyst.
Analyze the following batch of log lines for anomalies, security issues, or critical infrastructure errors.
You must reply strictly with a single JSON object. Do not include any conversational markdown wrapper or backticks.
The JSON object must look exactly like this:
{
"analysis": [
{
"line_number": 1,
"severity": "NORMAL" | "WARNING" | "ANOMALY",
"reason": "Brief text explanation of why"
}
]
}
"""
def check_ollama_availability():
"""Verifies that the local Ollama instance is up and listening."""
try:
urllib.request.urlopen("http://localhost:11434/", timeout=2)
except Exception:
print(f"{CLR_RED}Error: Local Ollama server is not running on http://localhost:11434{CLR_RESET}")
print("Please start Ollama (`ollama serve`) and try again.")
sys.exit(1)
def query_local_llm(model: str, log_batch: str) -> dict:
"""Sends the log batch natively via HTTP POST to the local Ollama daemon."""
payload = {
"model": model,
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": f"Analyze this log batch:\n{log_batch}"}
],
"stream": False,
"options": {
"temperature": 0.1
}
}
req = urllib.request.Request(
OLLAMA_URL,
data=json.dumps(payload).encode('utf-8'),
headers={'Content-Type': 'application/json'}
)
try:
with urllib.request.urlopen(req) as response:
res_data = json.loads(response.read().decode('utf-8'))
raw_content = res_data['message']['content'].strip()
if raw_content.startswith("```"):
raw_content = raw_content.split("\n", 1)[1].rsplit("\n", 1)[0].strip()
if raw_content.startswith("json"):
raw_content = raw_content[4:].strip()
return json.loads(raw_content)
except urllib.error.URLError as e:
print(f"{CLR_RED}Failed to talk to Ollama: {e.reason}{CLR_RESET}")
return {"analysis": []}
except json.JSONDecodeError:
print(f"{CLR_YELLOW}[Raw LLM output failed to parse as valid JSON. Skipping batch.]{CLR_RESET}")
return {"analysis": []}
def process_logs(file_path: str, model: str, batch_size: int):
"""Reads logs, structures them into batch strings, and processes outcomes."""
print(f"{CLR_CYAN}🤖 Booting AI Analyzer using model: '{model}'...{CLR_RESET}\n")
try:
with open(file_path, 'r') as f:
lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
print(f"{CLR_RED}Error: File '{file_path}' not found.{CLR_RESET}")
sys.exit(1)
for i in range(0, len(lines), batch_size):
batch_lines = lines[i:i + batch_size]
batch_payload_string = ""
for idx, line in enumerate(batch_lines):
line_num = i + idx + 1
batch_payload_string += f"[{line_num}] {line}\n"
print(f"{CLR_BOLD}Analyzing lines {i+1} to {i+len(batch_lines)}...{CLR_RESET}")
ai_response = query_local_llm(model, batch_payload_string)
analysis_map = {item['line_number']: item for item in ai_response.get('analysis', [])}
for idx, line in enumerate(batch_lines):
line_num = i + idx + 1
analysis = analysis_map.get(line_num, {"severity": "NORMAL", "reason": "No anomalies found."})
severity = analysis.get("severity", "NORMAL").upper()
reason = analysis.get("reason", "")
if severity == "ANOMALY":
color = CLR_RED
prefix = "[🚨 ANOMALY]"
elif severity == "WARNING":
color = CLR_YELLOW
prefix = "[⚠️ WARNING]"
else:
color = CLR_GREEN
prefix = "[✅ NORMAL ]"
print(f" {color}{prefix} Line {line_num}: {line}{CLR_RESET}")
if severity != "NORMAL":
print(f" └─ {CLR_BOLD}Reason:{CLR_RESET} {reason}")
print("-" * 60)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="AI-Driven Local Log Anomaly Inspector")
parser.add_argument("logfile", help="Path to the system target log file")
parser.add_argument("--model", default=DEFAULT_MODEL, help=f"Ollama model name (default: {DEFAULT_MODEL})")
parser.add_argument("--batch", type=int, default=5, help="Number of log lines to stream per model evaluation context (default: 5)")
args = parser.parse_args()
check_ollama_availability()
process_logs(args.logfile, args.model, args.batch)
Execution
Make the script executable and point it at your mock log file:
chmod +x log_analyzer.py
./log_analyzer.py mock_system.log
Optional flags:
./log_analyzer.py mock_system.log --model qwen2.5:1.5b --batch 8
Why this shines on a portfolio
- Zero external overhead — JSON parsing, CLI arguments, and HTTP networking through Python primitives only
- Contextual batching — chunks preserve local context so the model can correlate events (rapid failed
rootlogins → brute-force pattern) - Homelab-ready — runs entirely on-machine with Ollama; no cloud API keys or pip install step
Related work
- Project 1: Markdown API Router — same zero-dependency philosophy in TypeScript
- Project 3: Legacy Flat-File Bridge — enterprise parsing in modern Java
- Project 4: Recursive Hierarchy Builder — PostgreSQL recursive CTE to nested JSON
- Project 5: Concurrent Link Sweeper — Go goroutines for network sweeps
- Project 6: Local Context Server — MCP-style Linux tool gateway for agents
- Project 7: Multi-Tenant Migration Engine — 2PC fleet migrations for Gnomad CRM
- Nobara homelab basics — local Linux stack this tool is built for