#!/usr/bin/env nu # Log Processing Module for Provisioning System # Advanced log collection, parsing, and analysis using DataFrames use polars_integration.nu * use ../lib_provisioning/utils/settings.nu * # Log sources configuration export def get_log_sources []: nothing -> record { { system: { paths: ["/var/log/syslog", "/var/log/messages"] format: "syslog" enabled: true } provisioning: { paths: [ ($env.PROVISIONING_PATH? | default "/usr/local/provisioning" | path join "logs") "~/.provisioning/logs" ] format: "json" enabled: true } containers: { paths: [ "/var/log/containers" "/var/lib/docker/containers" ] format: "json" enabled: ($env.DOCKER_HOST? | is-not-empty) } kubernetes: { command: "kubectl logs" format: "json" enabled: ((which kubectl | length) > 0) } cloud_providers: { aws: { cloudwatch: true s3_logs: [] enabled: ($env.AWS_PROFILE? | is-not-empty) } gcp: { stackdriver: true enabled: ($env.GOOGLE_CLOUD_PROJECT? | is-not-empty) } } } } # Collect logs from all configured sources export def collect_logs [ --since: string = "1h" --sources: list = [] --output_format: string = "dataframe" --filter_level: string = "info" --include_metadata = true ]: nothing -> any { print $"📊 Collecting logs from the last ($since)..." let log_sources = get_log_sources let enabled_sources = if ($sources | is-empty) { $log_sources | transpose source config | where {|row| $row.config.enabled} | get source } else { $sources } print $"🔍 Enabled sources: ($enabled_sources | str join ', ')" let collected_logs = ($enabled_sources | each {|source| print $"📥 Collecting from: ($source)" collect_from_source $source $log_sources.$source --since $since } | flatten) print $"📋 Collected ($collected_logs | length) log entries" # Filter by log level let filtered_logs = (filter_by_level $collected_logs $filter_level) # Process into requested format match $output_format { "dataframe" => { create_infra_dataframe $filtered_logs --source "logs" } "json" => { $filtered_logs | to json } "csv" => { $filtered_logs | to csv } _ => { $filtered_logs } } } def collect_from_source [ source: string config: record --since: string = "1h" ]: nothing -> list { match $source { "system" => { collect_system_logs $config --since $since } "provisioning" => { collect_provisioning_logs $config --since $since } "containers" => { collect_container_logs $config --since $since } "kubernetes" => { collect_kubernetes_logs $config --since $since } _ => { print $"⚠️ Unknown log source: ($source)" [] } } } def collect_system_logs [ config: record --since: string = "1h" ]: record -> list { $config.paths | each {|path| if ($path | path exists) { let content = (read_recent_logs $path --since $since) $content | each {|line| parse_system_log_line $line $path } } else { [] } } | flatten } def collect_provisioning_logs [ config: record --since: string = "1h" ]: record -> list { $config.paths | each {|log_dir| if ($log_dir | path exists) { let log_files = (ls ($log_dir | path join "*.log") | get name) $log_files | each {|file| if ($file | str ends-with ".json") { collect_json_logs $file --since $since } else { collect_text_logs $file --since $since } } | flatten } else { [] } } | flatten } def collect_container_logs [ config: record --since: string = "1h" ]: record -> list { if ((which docker | length) > 0) { collect_docker_logs --since $since } else { print "⚠️ Docker not available for container log collection" [] } } def collect_kubernetes_logs [ config: record --since: string = "1h" ]: record -> list { if ((which kubectl | length) > 0) { collect_k8s_logs --since $since } else { print "⚠️ kubectl not available for Kubernetes log collection" [] } } def read_recent_logs [ file_path: string --since: string = "1h" ]: string -> list { let since_timestamp = ((date now) - (parse_duration $since)) if ($file_path | path exists) { # Use tail with approximate line count based on time let estimated_lines = match $since { "1m" => 100 "5m" => 500 "1h" => 3600 "1d" => 86400 _ => 1000 } (tail -n $estimated_lines $file_path | lines) } else { [] } } def parse_system_log_line [ line: string source_file: string ]: nothing -> record { # Parse standard syslog format let syslog_pattern = '(?P\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+(?P\S+)\s+(?P\S+?)(\[(?P\d+)\])?:\s*(?P.*)' let parsed = ($line | parse --regex $syslog_pattern) if ($parsed | length) > 0 { let entry = $parsed.0 { timestamp: (parse_syslog_timestamp $entry.timestamp) level: (extract_log_level $entry.message) message: $entry.message hostname: $entry.hostname process: $entry.process pid: ($entry.pid? | default "") source: $source_file raw: $line } } else { { timestamp: (date now) level: "unknown" message: $line source: $source_file raw: $line } } } def collect_json_logs [ file_path: string --since: string = "1h" ]: string -> list { let lines = (read_recent_logs $file_path --since $since) $lines | each {|line| do { let parsed = ($line | from json) { timestamp: (standardize_timestamp ($parsed.timestamp? | default (date now))) level: ($parsed.level? | default "info") message: ($parsed.message? | default $line) service: ($parsed.service? | default "provisioning") source: $file_path metadata: ($parsed | reject timestamp level message service?) raw: $line } } | complete | if ($in.exit_code == 0) { $in.stdout } else { { timestamp: (date now) level: "error" message: $"Failed to parse JSON: ($line)" source: $file_path raw: $line } } } } def collect_text_logs [ file_path: string --since: string = "1h" ]: string -> list { let lines = (read_recent_logs $file_path --since $since) $lines | each {|line| { timestamp: (date now) level: (extract_log_level $line) message: $line source: $file_path raw: $line } } } def collect_docker_logs [ --since: string = "1h" ]: nothing -> list { do { let containers = (docker ps --format "{{.Names}}" | lines) $containers | each {|container| let logs = (^docker logs --since $since $container | complete | get stdout | lines) $logs | each {|line| { timestamp: (date now) level: (extract_log_level $line) message: $line container: $container source: "docker" raw: $line } } } | flatten } | complete | if ($in.exit_code == 0) { $in.stdout } else { print "⚠️ Failed to collect Docker logs" [] } } def collect_k8s_logs [ --since: string = "1h" ]: nothing -> list { do { let pods = (kubectl get pods -o jsonpath='{.items[*].metadata.name}' | split row " ") $pods | each {|pod| let logs = (kubectl logs --since=$since $pod 2>/dev/null | lines) $logs | each {|line| { timestamp: (date now) level: (extract_log_level $line) message: $line pod: $pod source: "kubernetes" raw: $line } } } | flatten } | complete | if ($in.exit_code == 0) { $in.stdout } else { print "⚠️ Failed to collect Kubernetes logs" [] } } def parse_syslog_timestamp [ts: string]: string -> datetime { do { # Parse syslog timestamp format: "Jan 16 10:30:15" let current_year = (date now | date format "%Y") $"($current_year) ($ts)" | into datetime --format "%Y %b %d %H:%M:%S" } | complete | if ($in.exit_code == 0) { $in.stdout } else { date now } } def extract_log_level [message: string]: string -> string { let level_patterns = { "FATAL": "fatal" "ERROR": "error" "WARN": "warn" "WARNING": "warning" "INFO": "info" "DEBUG": "debug" "TRACE": "trace" } let upper_message = ($message | str upcase) for level_key in ($level_patterns | columns) { if ($upper_message | str contains $level_key) { return ($level_patterns | get $level_key) } } "info" # default level } def filter_by_level [ logs: list level: string ]: nothing -> list { let level_order = ["trace", "debug", "info", "warn", "warning", "error", "fatal"] let min_index = ($level_order | enumerate | where {|row| $row.item == $level} | get index.0) $logs | where {|log| let log_level_index = ($level_order | enumerate | where {|row| $row.item == $log.level} | get index.0? | default 2) $log_level_index >= $min_index } } def parse_duration [duration: string]: string -> duration { match $duration { $dur if ($dur | str ends-with "m") => { let minutes = ($dur | str replace "m" "" | into int) $minutes * 60 * 1000 * 1000 * 1000 # nanoseconds } $dur if ($dur | str ends-with "h") => { let hours = ($dur | str replace "h" "" | into int) $hours * 60 * 60 * 1000 * 1000 * 1000 # nanoseconds } $dur if ($dur | str ends-with "d") => { let days = ($dur | str replace "d" "" | into int) $days * 24 * 60 * 60 * 1000 * 1000 * 1000 # nanoseconds } _ => { 3600 * 1000 * 1000 * 1000 # 1 hour default } } | into duration } # Analyze logs using DataFrame operations export def analyze_logs [ logs_df: any --analysis_type: string = "summary" # summary, errors, patterns, performance --time_window: string = "1h" --group_by: list = ["service", "level"] ]: any -> any { match $analysis_type { "summary" => { analyze_log_summary $logs_df $group_by } "errors" => { analyze_log_errors $logs_df } "patterns" => { analyze_log_patterns $logs_df $time_window } "performance" => { analyze_log_performance $logs_df $time_window } _ => { error make { msg: $"Unknown analysis type: ($analysis_type)" } } } } def analyze_log_summary [logs_df: any, group_cols: list]: nothing -> any { aggregate_dataframe $logs_df --group_by $group_cols --operations { count: "count" first_seen: "min" last_seen: "max" } } def analyze_log_errors [logs_df: any]: any -> any { # Filter error logs and analyze patterns query_dataframe $logs_df "SELECT * FROM logs_df WHERE level IN ('error', 'fatal', 'warn')" } def analyze_log_patterns [logs_df: any, time_window: string]: nothing -> any { # Time series analysis of log patterns time_series_analysis $logs_df --time_column "timestamp" --value_column "level" --window $time_window } def analyze_log_performance [logs_df: any, time_window: string]: nothing -> any { # Analyze performance-related logs query_dataframe $logs_df "SELECT * FROM logs_df WHERE message LIKE '%performance%' OR message LIKE '%slow%'" } # Generate log analysis report export def generate_log_report [ logs_df: any --output_path: string = "log_report.md" --include_charts = false ]: any -> nothing { let summary = analyze_logs $logs_df --analysis_type "summary" let errors = analyze_logs $logs_df --analysis_type "errors" let report = $" # Log Analysis Report Generated: (date now | date format '%Y-%m-%d %H:%M:%S') ## Summary Total log entries: (query_dataframe $logs_df 'SELECT COUNT(*) as count FROM logs_df') ### Log Levels Distribution (analyze_log_summary $logs_df ['level'] | to md --pretty) ### Services Overview (analyze_log_summary $logs_df ['service'] | to md --pretty) ## Error Analysis (analyze_log_errors $logs_df | to md --pretty) ## Recommendations Based on the log analysis: 1. **Error Patterns**: Review services with high error rates 2. **Performance**: Investigate slow operations 3. **Monitoring**: Set up alerts for critical error patterns --- Report generated by Provisioning System Log Analyzer " $report | save --force $output_path print $"📊 Log analysis report saved to: ($output_path)" } # Real-time log monitoring export def monitor_logs [ --follow = true --alert_level: string = "error" --callback: string = "" ]: nothing -> nothing { print $"👀 Starting real-time log monitoring (alert level: ($alert_level))..." if $follow { # Start continuous monitoring while true { let recent_logs = collect_logs --since "1m" --filter_level $alert_level if ($recent_logs | length) > 0 { print $"🚨 Found ($recent_logs | length) ($alert_level) entries:" $recent_logs | each {|log| print $"[($log.timestamp)] ($log.level | str upcase): ($log.message)" if ($callback | is-not-empty) { # Execute callback command for alerts do { nu -c $callback } | complete | if ($in.exit_code != 0) { print $"⚠️ Failed to execute callback: ($callback)" } } } } sleep 60sec # Check every minute } } }