#!/usr/bin/env nu
# Observability Collectors for Provisioning System
# Collects metrics, logs, events, and state from infrastructure

use ../dataframes/polars_integration.nu *
use ../dataframes/log_processor.nu *
use ../lib_provisioning/utils/settings.nu *

# Main collector orchestrator
export def start_collectors [
    --config_file: string = "observability.toml"
    --interval: string = "60s"
    --output_dir: string = "data/observability"
    --enable_dataframes = true
    --debug = false
]: nothing -> nothing {
    print "🔍 Starting Observability Collectors..."

    # Load configuration
    let config = load_collector_config $config_file

    # Ensure output directory exists
    mkdir ($output_dir | path expand)

    # Initialize collectors
    let collectors = initialize_collectors $config

    print $"📊 Initialized ($collectors | length) collectors"

    if $debug {
        $env.OBSERVABILITY_DEBUG = "true"
        print "Debug mode enabled"
    }

    # Start collection loop
    collection_loop $collectors $interval $output_dir $enable_dataframes $debug
}

def load_collector_config [config_file: string]: nothing -> record {
    if ($config_file | path exists) {
        open $config_file
    } else {
        # Default configuration
        {
            collectors: {
                system_metrics: {
                    enabled: true
                    interval: "60s"
                    metrics: ["cpu", "memory", "disk", "network"]
                }
                infrastructure_state: {
                    enabled: true
                    interval: "300s"
                    sources: ["servers", "services", "clusters"]
                }
                application_logs: {
                    enabled: true
                    interval: "60s"
                    log_sources: ["provisioning", "containers", "kubernetes"]
                }
                cost_metrics: {
                    enabled: true
                    interval: "3600s"
                    providers: ["aws", "gcp", "azure"]
                }
                security_events: {
                    enabled: true
                    interval: "60s"
                    sources: ["auth", "network", "filesystem"]
                }
                performance_metrics: {
                    enabled: true
                    interval: "30s"
                    targets: ["deployments", "scaling", "response_times"]
                }
            }
            storage: {
                format: "parquet"  # parquet, json, csv
                retention_days: 30
                compression: "gzip"
            }
            alerting: {
                enabled: true
                channels: ["console", "webhook"]
                thresholds: {
                    cpu_usage: 80
                    memory_usage: 85
                    disk_usage: 90
                    error_rate: 0.05
                }
            }
        }
    }
}

def initialize_collectors [config: record]: nothing -> list {
    $config.collectors
    | transpose name settings
    | each {|collector|
        if $collector.settings.enabled {
            {
                name: $collector.name
                config: $collector.settings
                last_run: null
                status: "initialized"
            }
        }
    }
    | compact
}

def collection_loop [
    collectors: list
    interval: string
    output_dir: string
    enable_dataframes: bool
    debug: bool
]: nothing -> nothing {
    let interval_seconds = parse_interval $interval
    mut active_collectors = $collectors

    print $"🔄 Starting collection loop \(interval: ($interval))..."

    while true {
        let collection_start = (date now)

        $active_collectors = ($active_collectors | each {|collector|
            if (should_collect $collector $collection_start) {
                if $debug {
                    print $"📥 Collecting from: ($collector.name)"
                }

                try {
                    let data = collect_from_collector $collector
                    if ($data | length) > 0 {
                        save_collected_data $data $collector.name $output_dir $enable_dataframes
                    }
                } catch {|err|
                    print $"❌ Error in collector ($collector.name): ($err.msg)"
                }

                # Record the run so per-collector intervals are honored
                $collector | upsert last_run $collection_start
            } else {
                $collector
            }
        })

        let collection_duration = ((date now) - $collection_start)
        if $debug {
            print $"✅ Collection cycle completed in ($collection_duration)"
        }

        sleep ($interval_seconds * 1sec)
    }
}
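# A minimal observability.toml sketch matching the defaults in
# load_collector_config above (illustrative only; `open` parses TOML into
# the same record shape, so key names must mirror the default record):
#
#   [collectors.system_metrics]
#   enabled = true
#   interval = "60s"
#   metrics = ["cpu", "memory", "disk", "network"]
#
#   [storage]
#   format = "parquet"
#   retention_days = 30
#   compression = "gzip"
#
#   [alerting]
#   enabled = true
#   channels = ["console", "webhook"]
#
#   [alerting.thresholds]
#   cpu_usage = 80
#   memory_usage = 85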
def parse_interval [interval: string]: nothing -> int {
    match $interval {
        $i if ($i | str ends-with "s") => ($i | str replace "s" "" | into int)
        $i if ($i | str ends-with "m") => (($i | str replace "m" "" | into int) * 60)
        $i if ($i | str ends-with "h") => (($i | str replace "h" "" | into int) * 3600)
        _ => 60  # default: 60 seconds
    }
}

def should_collect [collector: record, current_time: datetime]: nothing -> bool {
    if ($collector.last_run | is-empty) {
        true  # First run
    } else {
        let elapsed = ($current_time - $collector.last_run)
        let interval_duration = (parse_interval $collector.config.interval)
        $elapsed >= ($interval_duration * 1sec)
    }
}

def collect_from_collector [collector: record]: nothing -> list {
    # Placeholder implementation - collectors will be enhanced later
    print $"📊 Collecting from: ($collector.name)"
    []
}

# System metrics collector
def collect_system_metrics [config: record]: nothing -> list {
    mut metrics = []

    if "cpu" in $config.metrics {
        $metrics = ($metrics | append (get_cpu_metrics))
    }
    if "memory" in $config.metrics {
        $metrics = ($metrics | append (get_memory_metrics))
    }
    if "disk" in $config.metrics {
        $metrics = ($metrics | append (get_disk_metrics))
    }
    if "network" in $config.metrics {
        $metrics = ($metrics | append (get_network_metrics))
    }

    $metrics | each {|metric|
        $metric
        | upsert timestamp (date now)
        | upsert collector "system_metrics"
    }
}

def get_cpu_metrics []: nothing -> record {
    try {
        # Use different methods based on OS
        let cpu_usage = if $nu.os-info.name == "linux" {
            # Linux: use /proc/loadavg
            let cpu_info = (cat /proc/loadavg | str trim | split row " ")
            {
                usage_percent: (($cpu_info.0 | into float) * 100 / (sys cpu | length))
                load_1min: ($cpu_info.0 | into float)
                load_5min: ($cpu_info.1 | into float)
                load_15min: ($cpu_info.2 | into float)
            }
        } else if $nu.os-info.name == "macos" {
            # macOS: parse `top` output
            let top_output = (top -l 1 -n 0 | lines | where ($it | str contains "CPU usage") | first)
            let usage = ($top_output | parse --regex 'CPU usage: (?P<user>\d+\.\d+)% user, (?P<sys>\d+\.\d+)% sys, (?P<idle>\d+\.\d+)% idle')
            if ($usage | length) > 0 {
                let u = $usage.0
                {
                    usage_percent: (100 - ($u.idle | into float))
                    user_percent: ($u.user | into float)
                    system_percent: ($u.sys | into float)
                    idle_percent: ($u.idle | into float)
                }
            } else {
                { usage_percent: 0, error: "Could not parse CPU usage" }
            }
        } else {
            { usage_percent: 0, error: "Unsupported OS for CPU metrics" }
        }

        {
            metric_name: "cpu"
            value: $cpu_usage.usage_percent
            unit: "percent"
            details: $cpu_usage
        }
    } catch {
        {
            metric_name: "cpu"
            value: 0
            unit: "percent"
            error: "Failed to collect CPU metrics"
        }
    }
}

def get_memory_metrics []: nothing -> record {
    try {
        let mem_info = (sys mem)
        let usage_percent = ($mem_info.used / $mem_info.total * 100)
        {
            metric_name: "memory"
            value: $usage_percent
            unit: "percent"
            details: {
                total_bytes: $mem_info.total
                used_bytes: $mem_info.used
                available_bytes: $mem_info.available
                free_bytes: $mem_info.free
                usage_percent: $usage_percent
            }
        }
    } catch {
        {
            metric_name: "memory"
            value: 0
            unit: "percent"
            error: "Failed to collect memory metrics"
        }
    }
}
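# Sketch: checking a collected metric against the `alerting.thresholds`
# section of the config. `check_threshold` is a hypothetical helper (nothing
# above calls it); it assumes threshold keys follow the "<metric_name>_usage"
# naming used in the default config.
def check_threshold [metric: record, thresholds: record]: nothing -> bool {
    let key = $"($metric.metric_name)_usage"  # e.g. "cpu_usage"
    if $key in $thresholds {
        $metric.value >= ($thresholds | get $key)  # true when an alert should fire
    } else {
        false
    }
}

# Usage (illustrative):
#   let config = load_collector_config "observability.toml"
#   check_threshold (get_cpu_metrics) $config.alerting.thresholds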
def get_disk_metrics []: nothing -> list {
    try {
        sys disks | each {|disk|
            # `sys disks` reports total and free space; derive used space
            let used = ($disk.total - $disk.free)
            let usage_percent = ($used / $disk.total * 100)
            {
                metric_name: "disk"
                value: $usage_percent
                unit: "percent"
                device: $disk.device
                mount_point: $disk.mount
                details: {
                    total_bytes: $disk.total
                    used_bytes: $used
                    free_bytes: $disk.free
                    usage_percent: $usage_percent
                    filesystem: $disk.type
                }
            }
        }
    } catch {
        [{
            metric_name: "disk"
            value: 0
            unit: "percent"
            error: "Failed to collect disk metrics"
        }]
    }
}

def get_network_metrics []: nothing -> list {
    try {
        sys net | each {|interface|
            {
                metric_name: "network"
                interface: $interface.name
                details: {
                    bytes_sent: $interface.sent
                    bytes_received: $interface.recv
                }
            }
        }
    } catch {
        [{
            metric_name: "network"
            value: 0
            error: "Failed to collect network metrics"
        }]
    }
}

# Infrastructure state collector
def collect_infrastructure_state [config: record]: nothing -> list {
    mut state_data = []

    if "servers" in $config.sources {
        $state_data = ($state_data | append (collect_server_state))
    }
    if "services" in $config.sources {
        $state_data = ($state_data | append (collect_service_state))
    }
    if "clusters" in $config.sources {
        $state_data = ($state_data | append (collect_cluster_state))
    }

    $state_data | each {|state|
        $state
        | upsert timestamp (date now)
        | upsert collector "infrastructure_state"
    }
}

def collect_server_state []: nothing -> list {
    try {
        # Use provisioning query to get server state
        let servers = (nu -c "use core/nulib/main_provisioning/query.nu; main query servers --out json" | from json)
        $servers | each {|server|
            {
                resource_type: "server"
                resource_id: $server.name
                state: $server.state
                provider: $server.provider
                details: $server
            }
        }
    } catch {
        print "⚠️ Could not collect server state"
        []
    }
}

def collect_service_state []: nothing -> list {
    try {
        # Collect Docker container states
        if ((which docker | length) > 0) {
            docker ps -a --format "{{.ID}},{{.Names}},{{.Status}},{{.Image}}"
            | lines
            | each {|line|
                let parts = ($line | split row ",")
                if ($parts | length) >= 4 {
                    {
                        resource_type: "container"
                        resource_id: $parts.1
                        state: $parts.2
                        image: $parts.3
                        container_id: $parts.0
                    }
                }
            }
            | compact
        } else {
            []
        }
    } catch {
        []
    }
}

def collect_cluster_state []: nothing -> list {
    try {
        # Collect Kubernetes cluster state if available
        if ((which kubectl | length) > 0) {
            let pods = (kubectl get pods -o json | from json)
            $pods.items | each {|pod|
                {
                    resource_type: "pod"
                    resource_id: $pod.metadata.name
                    namespace: $pod.metadata.namespace
                    state: $pod.status.phase
                    node: $pod.spec.nodeName?
                    details: {
                        containers: ($pod.spec.containers | length)
                        restart_count: (
                            $pod.status.containerStatuses?
                            | default []
                            | reduce -f 0 {|c, acc| $acc + $c.restartCount }
                        )
                    }
                }
            }
        } else {
            []
        }
    } catch {
        []
    }
}
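# Sketch: narrowing collected state records down to resources that look
# unhealthy. `failed_resources` is a hypothetical convenience helper (not
# wired into the loop above); the "healthy" state names are assumptions
# based on typical server, container, and pod phases.
def failed_resources [state: list]: nothing -> list {
    $state | where {|r| $r.state not-in ["Running", "running", "active", "Succeeded"] }
}

# Usage (illustrative):
#   failed_resources (collect_cluster_state)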
# Application logs collector
def collect_application_logs [config: record]: nothing -> list {
    collect_logs --since "1m" --sources $config.log_sources --output_format "list"
}

# Cost metrics collector
def collect_cost_metrics [config: record]: nothing -> list {
    let cost_data = ($config.providers | each {|provider|
        collect_provider_costs $provider
    } | flatten)

    $cost_data | each {|cost|
        $cost
        | upsert timestamp (date now)
        | upsert collector "cost_metrics"
    }
}

def collect_provider_costs [provider: string]: nothing -> list {
    match $provider {
        "aws" => (collect_aws_costs)
        "gcp" => (collect_gcp_costs)
        "azure" => (collect_azure_costs)
        _ => []
    }
}

def collect_aws_costs []: nothing -> list {
    try {
        if ((which aws | length) > 0) {
            # Use AWS Cost Explorer API (requires setup)
            # For now, return mock data structure
            [{
                provider: "aws"
                service: "ec2"
                cost_usd: 125.50
                period: "daily"
                region: "us-east-1"
            }]
        } else {
            []
        }
    } catch {
        []
    }
}

def collect_gcp_costs []: nothing -> list {
    # GCP billing API integration would go here
    []
}

def collect_azure_costs []: nothing -> list {
    # Azure cost management API integration would go here
    []
}

# Security events collector
def collect_security_events [config: record]: nothing -> list {
    mut security_events = []

    if "auth" in $config.sources {
        $security_events = ($security_events | append (collect_auth_events))
    }
    if "network" in $config.sources {
        $security_events = ($security_events | append (collect_network_events))
    }
    if "filesystem" in $config.sources {
        $security_events = ($security_events | append (collect_filesystem_events))
    }

    $security_events | each {|event|
        $event
        | upsert timestamp (date now)
        | upsert collector "security_events"
    }
}

def collect_auth_events []: nothing -> list {
    try {
        # Collect authentication logs
        if ("/var/log/auth.log" | path exists) {
            tail -n 100 /var/log/auth.log
            | lines
            | each {|line|
                if ($line | str contains "Failed password") {
                    {
                        event_type: "auth_failure"
                        severity: "medium"
                        message: $line
                        source: "auth.log"
                    }
                } else if ($line | str contains "Accepted publickey") {
                    {
                        event_type: "auth_success"
                        severity: "info"
                        message: $line
                        source: "auth.log"
                    }
                }
            }
            | compact
        } else {
            []
        }
    } catch {
        []
    }
}

def collect_network_events []: nothing -> list {
    # Network security events would be collected here
    # This could include firewall logs, intrusion detection, etc.
    []
}

def collect_filesystem_events []: nothing -> list {
    # File system security events
    # This could include file integrity monitoring, access logs, etc.
    []
}
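# Sketch: deriving a failure ratio from collected auth events so it can be
# compared against the `error_rate` alerting threshold. `auth_failure_rate`
# is a hypothetical helper (not wired into the loop above) and assumes the
# record shape produced by collect_auth_events.
def auth_failure_rate [events: list]: nothing -> float {
    let total = ($events | length)
    if $total == 0 {
        return 0.0
    }
    let failures = ($events | where event_type == "auth_failure" | length)
    ($failures | into float) / $total
}

# Usage (illustrative):
#   let config = load_collector_config "observability.toml"
#   (auth_failure_rate (collect_auth_events)) > $config.alerting.thresholds.error_rate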
# Performance metrics collector
def collect_performance_metrics [config: record]: nothing -> list {
    mut perf_metrics = []

    if "deployments" in $config.targets {
        $perf_metrics = ($perf_metrics | append (collect_deployment_metrics))
    }
    if "scaling" in $config.targets {
        $perf_metrics = ($perf_metrics | append (collect_scaling_metrics))
    }
    if "response_times" in $config.targets {
        $perf_metrics = ($perf_metrics | append (collect_response_time_metrics))
    }

    $perf_metrics | each {|metric|
        $metric
        | upsert timestamp (date now)
        | upsert collector "performance_metrics"
    }
}

def collect_deployment_metrics []: nothing -> list {
    # Track deployment performance
    # This would integrate with CI/CD systems; mock data for now
    [{
        metric_name: "deployment_duration"
        value: 300  # seconds
        deployment_id: "deploy-123"
        status: "success"
    }]
}

def collect_scaling_metrics []: nothing -> list {
    # Track auto-scaling events and performance
    []
}

def collect_response_time_metrics []: nothing -> list {
    # Collect application response times
    # This could integrate with APM tools
    []
}

# Save collected data
def save_collected_data [
    data: list
    collector_name: string
    output_dir: string
    enable_dataframes: bool
]: nothing -> nothing {
    let timestamp = (date now | format date "%Y-%m-%d_%H-%M-%S")
    let filename = $"($collector_name)_($timestamp)"

    if $enable_dataframes and (check_polars_available) {
        # Save as Parquet for efficient storage and querying
        let df = create_infra_dataframe $data --source $collector_name
        let parquet_path = ($output_dir | path join $"($filename).parquet")
        export_dataframe $df $parquet_path --format "parquet"
    } else {
        # Save as JSON
        let json_path = ($output_dir | path join $"($filename).json")
        $data | to json | save --force $json_path
    }
}

# Query collected observability data
export def query_observability_data [
    --collector: string = "all"
    --time_range: string = "1h"
    --data_dir: string = "data/observability"
    --query: string = ""
]: nothing -> any {
    print $"🔍 Querying observability data \(collector: ($collector), range: ($time_range))..."

    let data_files = if $collector == "all" {
        glob ($data_dir | path join "*.parquet")
    } else {
        glob ($data_dir | path join $"($collector)_*.parquet")
    }

    if ($data_files | length) == 0 {
        print "No observability data found"
        return []
    }

    # Load and combine data
    let combined_data = ($data_files | each {|file|
        if (check_polars_available) {
            # Load parquet with Polars
            polars open $file
        } else {
            # Fallback to JSON if no Polars
            let json_file = ($file | str replace ".parquet" ".json")
            if ($json_file | path exists) {
                open $json_file
            } else {
                []
            }
        }
    } | flatten)

    if ($query | is-not-empty) {
        query_dataframe $combined_data $query
    } else {
        $combined_data
    }
}
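# Usage (illustrative; module path and flag values are examples):
#
#   use collectors.nu *   # adjust the path to where this file lives
#
#   # Start all enabled collectors with the bundled defaults:
#   start_collectors --interval "30s" --debug true
#
#   # Later, inspect what was collected:
#   query_observability_data --collector "system_metrics" --time_range "1h"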