provisioning/core/nulib/dataframes/log_processor.nu
#!/usr/bin/env nu
# Log Processing Module for Provisioning System
# Advanced log collection, parsing, and analysis using DataFrames
use polars_integration.nu *
use ../lib_provisioning/utils/settings.nu *
# Log sources configuration
export def get_log_sources []: nothing -> record {
{
system: {
paths: ["/var/log/syslog", "/var/log/messages"]
format: "syslog"
enabled: true
}
provisioning: {
paths: [
($env.PROVISIONING_PATH? | default "/usr/local/provisioning" | path join "logs")
"~/.provisioning/logs"
]
format: "json"
enabled: true
}
containers: {
paths: [
"/var/log/containers"
"/var/lib/docker/containers"
]
format: "json"
enabled: ($env.DOCKER_HOST? | is-not-empty)
}
kubernetes: {
command: "kubectl logs"
format: "json"
enabled: ((which kubectl | length) > 0)
}
cloud_providers: {
aws: {
cloudwatch: true
s3_logs: []
enabled: ($env.AWS_PROFILE? | is-not-empty)
}
gcp: {
stackdriver: true
enabled: ($env.GOOGLE_CLOUD_PROJECT? | is-not-empty)
}
}
}
}
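# Illustrative usage (not part of the original module): list the sources that are
# enabled on this host before collecting
#   get_log_sources | transpose source config | where {|row| ($row.config.enabled? | default false)} | get source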
# Collect logs from all configured sources
export def collect_logs [
--since: string = "1h"
--sources: list<string> = []
--output_format: string = "dataframe"
--filter_level: string = "info"
--include_metadata = true
]: nothing -> any {
print $"📊 Collecting logs from the last ($since)..."
let log_sources = get_log_sources
let enabled_sources = if ($sources | is-empty) {
$log_sources | transpose source config | where {|row| ($row.config.enabled? | default false)} | get source
} else {
$sources
}
print $"🔍 Enabled sources: ($enabled_sources | str join ', ')"
let collected_logs = ($enabled_sources | each {|source|
print $"📥 Collecting from: ($source)"
collect_from_source $source ($log_sources | get $source) --since $since
} | flatten)
print $"📋 Collected ($collected_logs | length) log entries"
# Filter by log level
let filtered_logs = (filter_by_level $collected_logs $filter_level)
# Process into requested format
match $output_format {
"dataframe" => {
create_infra_dataframe $filtered_logs --source "logs"
}
"json" => {
$filtered_logs | to json
}
"csv" => {
$filtered_logs | to csv
}
"records" => {
# plain list of records (used by monitor_logs)
$filtered_logs
}
_ => {
$filtered_logs
}
}
}
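# Illustrative usage (not part of the original module; assumes it has been imported with `use`):
#   collect_logs --since "30m" --sources ["system" "provisioning"] --output_format "json"
#   collect_logs --since "2h" --filter_level "error"   # only error/fatal entries from all enabled sources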
def collect_from_source [
source: string
config: record
--since: string = "1h"
]: nothing -> list {
match $source {
"system" => {
collect_system_logs $config --since $since
}
"provisioning" => {
collect_provisioning_logs $config --since $since
}
"containers" => {
collect_container_logs $config --since $since
}
"kubernetes" => {
collect_kubernetes_logs $config --since $since
}
_ => {
print $"⚠️ Unknown log source: ($source)"
[]
}
}
}
def collect_system_logs [
config: record
--since: string = "1h"
]: nothing -> list {
$config.paths | each {|path|
if ($path | path exists) {
let content = (read_recent_logs $path --since $since)
$content | each {|line|
parse_system_log_line $line $path
}
} else {
[]
}
} | flatten
}
def collect_provisioning_logs [
config: record
--since: string = "1h"
]: nothing -> list {
$config.paths | each {|log_dir|
let log_dir = ($log_dir | path expand)
if ($log_dir | path exists) {
# Glob both .log and .json files so the JSON branch below is reachable
let log_files = ((glob ($log_dir | path join "*.log")) ++ (glob ($log_dir | path join "*.json")))
$log_files | each {|file|
if ($file | str ends-with ".json") {
collect_json_logs $file --since $since
} else {
collect_text_logs $file --since $since
}
} | flatten
} else {
[]
}
} | flatten
}
def collect_container_logs [
config: record
--since: string = "1h"
]: nothing -> list {
if ((which docker | length) > 0) {
collect_docker_logs --since $since
} else {
print "⚠️ Docker not available for container log collection"
[]
}
}
def collect_kubernetes_logs [
config: record
--since: string = "1h"
]: nothing -> list {
if ((which kubectl | length) > 0) {
collect_k8s_logs --since $since
} else {
print "⚠️ kubectl not available for Kubernetes log collection"
[]
}
}
def read_recent_logs [
file_path: string
--since: string = "1h"
]: nothing -> list {
let since_timestamp = ((date now) - (parse_duration $since))
if ($file_path | path exists) {
# Use tail with approximate line count based on time
let estimated_lines = match $since {
"1m" => 100
"5m" => 500
"1h" => 3600
"1d" => 86400
_ => 1000
}
(tail -n $estimated_lines $file_path | lines)
} else {
[]
}
}
def parse_system_log_line [
line: string
source_file: string
]: nothing -> record {
# Parse standard syslog format
let syslog_pattern = '(?P<timestamp>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+(?P<hostname>\S+)\s+(?P<process>\S+?)(\[(?P<pid>\d+)\])?:\s*(?P<message>.*)'
let parsed = ($line | parse --regex $syslog_pattern)
if ($parsed | length) > 0 {
let entry = $parsed.0
{
timestamp: (parse_syslog_timestamp $entry.timestamp)
level: (extract_log_level $entry.message)
message: $entry.message
hostname: $entry.hostname
process: $entry.process
pid: ($entry.pid? | default "")
source: $source_file
raw: $line
}
} else {
{
timestamp: (date now)
level: "unknown"
message: $line
source: $source_file
raw: $line
}
}
}
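# Illustrative example (hypothetical input): a line such as
#   "Jan 16 10:30:15 web-01 sshd[1234]: ERROR Failed password for root"
# yields hostname "web-01", process "sshd", pid "1234", level "error", and the text
# after the colon as the message; lines that do not match the pattern fall back to
# a minimal record with level "unknown".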
def collect_json_logs [
file_path: string
--since: string = "1h"
]: nothing -> list {
let lines = (read_recent_logs $file_path --since $since)
$lines | each {|line|
try {
# Overlay defaults so the expected fields always exist before splitting them
# out from the remaining metadata
let parsed = (
{ timestamp: (date now), level: "info", message: $line, service: "provisioning" }
| merge ($line | from json)
)
{
timestamp: (standardize_timestamp $parsed.timestamp)
level: $parsed.level
message: $parsed.message
service: $parsed.service
source: $file_path
metadata: ($parsed | reject timestamp level message service)
raw: $line
}
} catch {
{
timestamp: (date now)
level: "error"
message: $"Failed to parse JSON: ($line)"
source: $file_path
raw: $line
}
}
}
}
def collect_text_logs [
file_path: string
--since: string = "1h"
]: nothing -> list {
let lines = (read_recent_logs $file_path --since $since)
$lines | each {|line|
{
timestamp: (date now)
level: (extract_log_level $line)
message: $line
source: $file_path
raw: $line
}
}
}
def collect_docker_logs [
--since: string = "1h"
]: nothing -> list {
try {
let containers = (^docker ps --format "{{.Names}}" | lines)
$containers | each {|container|
let logs = (^docker logs --since $since $container | complete | get stdout | lines)
$logs | each {|line|
{
timestamp: (date now)
level: (extract_log_level $line)
message: $line
container: $container
source: "docker"
raw: $line
}
}
} | flatten
} catch {
print "⚠️ Failed to collect Docker logs"
[]
}
}
def collect_k8s_logs [
--since: string = "1h"
]: nothing -> list {
try {
let pods = (^kubectl get pods -o "jsonpath={.items[*].metadata.name}" | split row " " | where {|p| ($p | is-not-empty)})
$pods | each {|pod|
let logs = (^kubectl logs $"--since=($since)" $pod | complete | get stdout | lines)
$logs | each {|line|
{
timestamp: (date now)
level: (extract_log_level $line)
message: $line
pod: $pod
source: "kubernetes"
raw: $line
}
}
} | flatten
} catch {
print "⚠️ Failed to collect Kubernetes logs"
[]
}
}
def parse_syslog_timestamp [ts: string]: nothing -> datetime {
try {
# Syslog timestamps omit the year ("Jan 16 10:30:15"), so prepend the current one
let current_year = (date now | format date "%Y")
$"($current_year) ($ts)" | into datetime --format "%Y %b %d %H:%M:%S"
} catch {
date now
}
}
def extract_log_level [message: string]: nothing -> string {
# Check WARNING before WARN so the longer keyword wins
let level_patterns = {
"FATAL": "fatal"
"ERROR": "error"
"WARNING": "warning"
"WARN": "warn"
"INFO": "info"
"DEBUG": "debug"
"TRACE": "trace"
}
let upper_message = ($message | str upcase)
for level_key in ($level_patterns | columns) {
if ($upper_message | str contains $level_key) {
return ($level_patterns | get $level_key)
}
}
"info" # default level
}
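# Illustrative examples:
#   extract_log_level "2024-01-16 ERROR connection refused"   # => "error"
#   extract_log_level "request served in 12ms"                # => "info" (default when no keyword matches)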
def filter_by_level [
logs: list
level: string
]: nothing -> list {
let level_order = ["trace", "debug", "info", "warn", "warning", "error", "fatal"]
let min_matches = ($level_order | enumerate | where item == $level)
let min_index = (if ($min_matches | is-empty) { 2 } else { $min_matches.0.index })
$logs | where {|log|
let matches = ($level_order | enumerate | where item == $log.level)
let log_level_index = (if ($matches | is-empty) { 2 } else { $matches.0.index })
$log_level_index >= $min_index
}
}
def parse_duration [duration: string]: nothing -> duration {
match $duration {
$dur if ($dur | str ends-with "m") => {
($dur | str replace "m" "" | into int) * 1min
}
$dur if ($dur | str ends-with "h") => {
($dur | str replace "h" "" | into int) * 1hr
}
$dur if ($dur | str ends-with "d") => {
($dur | str replace "d" "" | into int) * 1day
}
_ => {
1hr # default to one hour
}
}
}
# Analyze logs using DataFrame operations
export def analyze_logs [
logs_df: any
--analysis_type: string = "summary" # summary, errors, patterns, performance
--time_window: string = "1h"
--group_by: list<string> = ["service", "level"]
]: any -> any {
match $analysis_type {
"summary" => {
analyze_log_summary $logs_df $group_by
}
"errors" => {
analyze_log_errors $logs_df
}
"patterns" => {
analyze_log_patterns $logs_df $time_window
}
"performance" => {
analyze_log_performance $logs_df $time_window
}
_ => {
error make { msg: $"Unknown analysis type: ($analysis_type)" }
}
}
}
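# Illustrative usage (assumes a DataFrame produced by collect_logs):
#   let df = (collect_logs --since "6h")
#   analyze_logs $df --analysis_type "errors"
#   analyze_logs $df --analysis_type "summary" --group_by ["service"]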
def analyze_log_summary [logs_df: any, group_cols: list<string>]: nothing -> any {
aggregate_dataframe $logs_df --group_by $group_cols --operations {
count: "count"
first_seen: "min"
last_seen: "max"
}
}
def analyze_log_errors [logs_df: any]: any -> any {
# Filter error logs and analyze patterns
query_dataframe $logs_df "SELECT * FROM logs_df WHERE level IN ('error', 'fatal', 'warn')"
}
def analyze_log_patterns [logs_df: any, time_window: string]: nothing -> any {
# Time series analysis of log patterns
time_series_analysis $logs_df --time_column "timestamp" --value_column "level" --window $time_window
}
def analyze_log_performance [logs_df: any, time_window: string]: nothing -> any {
# Analyze performance-related logs
query_dataframe $logs_df "SELECT * FROM logs_df WHERE message LIKE '%performance%' OR message LIKE '%slow%'"
}
# Generate log analysis report
export def generate_log_report [
logs_df: any
--output_path: string = "log_report.md"
--include_charts = false
]: any -> nothing {
let errors = (analyze_logs $logs_df --analysis_type "errors")
let report = $"
# Log Analysis Report
Generated: (date now | format date '%Y-%m-%d %H:%M:%S')
## Summary
Total log entries: (query_dataframe $logs_df 'SELECT COUNT(*) as count FROM logs_df')
### Log Levels Distribution
(analyze_log_summary $logs_df ['level'] | to md --pretty)
### Services Overview
(analyze_log_summary $logs_df ['service'] | to md --pretty)
## Error Analysis
($errors | to md --pretty)
## Recommendations
Based on the log analysis:
1. **Error Patterns**: Review services with high error rates
2. **Performance**: Investigate slow operations
3. **Monitoring**: Set up alerts for critical error patterns
---
Report generated by Provisioning System Log Analyzer
"
$report | save --force $output_path
print $"📊 Log analysis report saved to: ($output_path)"
}
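# Illustrative usage (the output path below is just an example):
#   let df = (collect_logs --since "1d")
#   generate_log_report $df --output_path "reports/daily_logs.md"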
# Real-time log monitoring
export def monitor_logs [
--follow = true
--alert_level: string = "error"
--callback: string = ""
]: nothing -> nothing {
print $"👀 Starting real-time log monitoring (alert level: ($alert_level))..."
if $follow {
# Start continuous monitoring
while true {
let recent_logs = (collect_logs --since "1m" --filter_level $alert_level --output_format "records")
if ($recent_logs | length) > 0 {
print $"🚨 Found ($recent_logs | length) ($alert_level) entries:"
$recent_logs | each {|log|
print $"[($log.timestamp)] ($log.level | str upcase): ($log.message)"
if ($callback | is-not-empty) {
# Execute callback command for alerts
let result = (do { ^nu -c $callback } | complete)
if $result.exit_code != 0 {
print $"⚠️ Failed to execute callback: ($callback)"
}
}
}
}
sleep 60sec # Check every minute
}
}
}
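# Illustrative usage: alert on error-level entries and run a hypothetical
# notification script (the callback is executed with `nu -c`, so the path below
# is only a placeholder):
#   monitor_logs --alert_level "error" --callback "nu scripts/notify.nu"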