provisioning/core/nulib/observability/agents.nu

#!/usr/bin/env nu
# AI Agents for Observability and Infrastructure Intelligence
# Smart agents that analyze, predict, and optimize infrastructure
use collectors.nu *
use ../dataframes/polars_integration.nu *
use ../lib_provisioning/ai/lib.nu *
# Agent types and their capabilities
export def get_agent_types []: nothing -> record {
{
pattern_detector: {
description: "Detects anomalies and patterns in infrastructure data"
capabilities: ["anomaly_detection", "trend_analysis", "pattern_recognition"]
data_sources: ["metrics", "logs", "events"]
frequency: "real_time"
}
cost_optimizer: {
description: "Analyzes costs and provides optimization recommendations"
capabilities: ["cost_analysis", "rightsizing", "scheduling_optimization"]
data_sources: ["cost_metrics", "resource_usage", "deployment_patterns"]
frequency: "daily"
}
performance_analyzer: {
description: "Monitors and optimizes infrastructure performance"
capabilities: ["bottleneck_detection", "capacity_planning", "performance_tuning"]
data_sources: ["performance_metrics", "resource_metrics", "application_logs"]
frequency: "continuous"
}
security_monitor: {
description: "Monitors security events and vulnerabilities"
capabilities: ["threat_detection", "vulnerability_assessment", "compliance_monitoring"]
data_sources: ["security_events", "access_logs", "configuration_state"]
frequency: "real_time"
}
predictor: {
description: "Predicts infrastructure failures and capacity needs"
capabilities: ["failure_prediction", "capacity_forecasting", "maintenance_scheduling"]
data_sources: ["historical_metrics", "error_logs", "deployment_history"]
frequency: "hourly"
}
auto_healer: {
description: "Automatically responds to and fixes infrastructure issues"
capabilities: ["auto_remediation", "failover", "scaling_actions"]
data_sources: ["alerts", "health_checks", "performance_metrics"]
frequency: "real_time"
}
}
}
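# Illustrative usage of the registry above: look up a single agent's
# capabilities, e.g.
#   get_agent_types | get cost_optimizer | get capabilities
#   # => ["cost_analysis", "rightsizing", "scheduling_optimization"]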
# Start AI agents
export def start_agents [
--config_file: string = "agents.toml"
--data_dir: string = "data/observability"
--agents: list<string> = []
--debug = false
]: nothing -> nothing {
print "๐Ÿค– Starting AI Observability Agents..."
# Load configuration
let config = load_agent_config $config_file
# Select agents to start
let selected_agents = if ($agents | is-empty) {
$config.agents | transpose name settings | where {|agent| $agent.settings.enabled} | get name
} else {
$agents
}
print $"๐Ÿš€ Starting agents: ($selected_agents | str join ', ')"
# Initialize agents
let active_agents = ($selected_agents | each {|agent_name|
initialize_agent $agent_name $config $data_dir $debug
})
# Start agent processing loops
start_agent_loops $active_agents $debug
}
def load_agent_config [config_file: string]: nothing -> record {
if ($config_file | path exists) {
open $config_file
} else {
# Default agent configuration
{
agents: {
pattern_detector: {
enabled: true
interval: "60s"
sensitivity: 0.8
lookback_hours: 24
alert_threshold: 0.9
}
cost_optimizer: {
enabled: true
interval: "3600s" # 1 hour
optimization_target: 0.3 # 30% cost reduction target
min_savings_threshold: 10 # $10 minimum savings
}
performance_analyzer: {
enabled: true
interval: "300s" # 5 minutes
performance_thresholds: {
cpu: 80
memory: 85
disk: 90
response_time: 500 # ms
}
}
security_monitor: {
enabled: true
interval: "30s"
threat_levels: ["medium", "high", "critical"]
auto_response: false
}
predictor: {
enabled: true
interval: "1800s" # 30 minutes
prediction_horizon: "7d"
confidence_threshold: 0.75
}
auto_healer: {
enabled: false # Disabled by default for safety
auto_response: false # Must also be set to true before any healing action is proposed
interval: "60s"
auto_actions: ["restart_service", "scale_up", "failover"]
max_actions_per_hour: 5
}
}
ai: {
model: "local" # local, openai, anthropic
temperature: 0.3
max_tokens: 1000
}
notifications: {
enabled: true
channels: ["console", "webhook"]
webhook_url: ""
}
}
}
}
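# Sketch of a minimal agents.toml (assumed layout: the TOML mirrors the default
# record above, and only the agents listed in the file are considered):
#
#   [agents.pattern_detector]
#   enabled = true
#   interval = "120s"
#   sensitivity = 0.7
#   lookback_hours = 24
#   alert_threshold = 0.9
#
#   [ai]
#   model = "local"
#   temperature = 0.3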
def initialize_agent [
agent_name: string
config: record
data_dir: string
debug: bool
]: nothing -> record {
print $"๐Ÿ”ง Initializing agent: ($agent_name)"
let agent_config = $config.agents | get $agent_name
let agent_types = get_agent_types
{
name: $agent_name
type: ($agent_types | get $agent_name)
config: $agent_config
data_dir: $data_dir
debug: $debug
state: {
last_run: null
total_runs: 0
last_findings: []
performance_stats: {
avg_runtime: 0
total_runtime: 0
success_rate: 1.0
}
}
}
}
def start_agent_loops [agents: list, debug: bool]: nothing -> nothing {
print $"🔄 Starting ($agents | length) agent processing loops..."
# Run each agent loop on its own thread; par-each blocks for as long as the
# loops keep running, which also keeps the main process alive.
$agents | par-each --threads ($agents | length) {|agent|
run_agent_loop $agent $debug
} | ignore
}
def run_agent_loop [agent: record, debug: bool]: nothing -> nothing {
let interval_seconds = parse_interval $agent.config.interval
if $debug {
print $"🤖 Agent ($agent.name) loop started (interval: ($agent.config.interval))"
}
while true {
try {
let start_time = (date now)
# Execute agent logic
let results = execute_agent $agent
# Update agent state
let runtime = ((date now) - $start_time)
update_agent_performance $agent $runtime $results
if $debug and ($results | length) > 0 {
print $"🔍 Agent ($agent.name) found ($results | length) insights"
}
# Process results
process_agent_results $agent $results
} catch {|err|
print $"❌ Error in agent ($agent.name): ($err.msg)"
}
sleep ($interval_seconds * 1sec)
}
}
def execute_agent [agent: record]: nothing -> list {
match $agent.name {
"pattern_detector" => (execute_pattern_detector $agent)
"cost_optimizer" => (execute_cost_optimizer $agent)
"performance_analyzer" => (execute_performance_analyzer $agent)
"security_monitor" => (execute_security_monitor $agent)
"predictor" => (execute_predictor $agent)
"auto_healer" => (execute_auto_healer $agent)
_ => {
print $"โš ๏ธ Unknown agent type: ($agent.name)"
[]
}
}
}
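# Extending the dispatcher (sketch): a new agent type would need an entry in
# get_agent_types, a default block in load_agent_config, and a match arm
# above. For a hypothetical compliance agent that might look like:
#   "compliance_auditor" => (execute_compliance_auditor $agent)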
# Pattern Detection Agent
def execute_pattern_detector [agent: record]: nothing -> list {
# Load recent observability data
let recent_data = query_observability_data --time_range "1h" --data_dir $agent.data_dir
if ($recent_data | length) == 0 {
return []
}
mut findings = []
# Detect anomalies in metrics
let metric_anomalies = detect_metric_anomalies $recent_data $agent.config.sensitivity
if ($metric_anomalies | length) > 0 {
$findings = ($findings | append {
type: "anomaly"
category: "metrics"
severity: "medium"
findings: $metric_anomalies
agent: $agent.name
timestamp: (date now)
})
}
# Detect log patterns
let log_patterns = detect_log_patterns $recent_data
if ($log_patterns | length) > 0 {
$findings = ($findings | append {
type: "pattern"
category: "logs"
severity: "info"
findings: $log_patterns
agent: $agent.name
timestamp: (date now)
})
}
$findings
}
def detect_metric_anomalies [data: any, sensitivity: float]: nothing -> list {
# Simple anomaly detection based on statistical analysis
# In production, this would use more sophisticated ML algorithms
let metrics = ($data | where collector == "system_metrics")
if ($metrics | length) < 10 {
return [] # Need sufficient data points
}
mut anomalies = []
# Check CPU usage anomalies (skip when no CPU samples are present)
let cpu_metrics = ($metrics | where metric_name == "cpu" | get value)
if ($cpu_metrics | length) > 0 {
let cpu_mean = ($cpu_metrics | math avg)
let cpu_std = ($cpu_metrics | math stddev)
let cpu_threshold = $cpu_mean + (2 * $cpu_std * $sensitivity)
let cpu_anomalies = ($metrics | where metric_name == "cpu" and value > $cpu_threshold)
if ($cpu_anomalies | length) > 0 {
$anomalies = ($anomalies | append {
metric: "cpu"
type: "high_usage"
threshold: $cpu_threshold
current_value: ($cpu_anomalies | get value | math max)
severity: (if ($cpu_anomalies | get value | math max) > 90 { "high" } else { "medium" })
})
}
}
# Check memory usage anomalies
let memory_metrics = ($metrics | where metric_name == "memory" | get value)
if ($memory_metrics | length) > 0 {
let mem_mean = ($memory_metrics | math avg)
let mem_std = ($memory_metrics | math stddev)
let mem_threshold = $mem_mean + (2 * $mem_std * $sensitivity)
let mem_anomalies = ($metrics | where metric_name == "memory" and value > $mem_threshold)
if ($mem_anomalies | length) > 0 {
$anomalies = ($anomalies | append {
metric: "memory"
type: "high_usage"
threshold: $mem_threshold
current_value: ($mem_anomalies | get value | math max)
severity: (if ($mem_anomalies | get value | math max) > 95 { "high" } else { "medium" })
})
}
}
$anomalies
}
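# Worked example of the threshold above: with a CPU mean of 40%, a standard
# deviation of 10 and sensitivity 0.8, the cutoff is 40 + (2 * 10 * 0.8) = 56,
# so any sample above 56% CPU in the window is flagged as an anomaly.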
def detect_log_patterns [data: any]: nothing -> list {
let log_data = ($data | where collector == "application_logs")
if ($log_data | length) == 0 {
return []
}
mut patterns = []
# Detect error rate spikes
let error_logs = ($log_data | where level in ["error", "fatal"])
let total_logs = ($log_data | length)
let error_rate = if $total_logs > 0 { ($error_logs | length) / $total_logs } else { 0 }
if $error_rate > 0.05 { # 5% error rate threshold
$patterns = ($patterns | append {
pattern: "high_error_rate"
value: $error_rate
threshold: 0.05
severity: (if $error_rate > 0.10 { "high" } else { "medium" })
})
}
# Detect repeated error messages
let error_messages = ($error_logs | group-by message | transpose message occurrences | where {|row| ($row.occurrences | length) > 3})
if ($error_messages | length) > 0 {
$patterns = ($patterns | append {
pattern: "repeated_errors"
messages: ($error_messages | get message)
severity: "medium"
})
}
$patterns
}
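# Example finding (illustrative numbers): 12 errors out of 150 log lines gives
# an error_rate of 0.08, above the 0.05 threshold but below 0.10, so it is
# reported as { pattern: "high_error_rate", value: 0.08, threshold: 0.05, severity: "medium" }.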
# Cost Optimization Agent
def execute_cost_optimizer [agent: record]: nothing -> list {
let cost_data = query_observability_data --collector "cost_metrics" --time_range "24h" --data_dir $agent.data_dir
if ($cost_data | length) == 0 {
return []
}
# Analyze resource utilization vs cost
let utilization_analysis = analyze_resource_utilization $cost_data
let utilization_optimizations = ($utilization_analysis | each {|analysis|
if $analysis.potential_savings > $agent.config.min_savings_threshold {
{
type: "rightsizing"
resource: $analysis.resource
current_cost: $analysis.current_cost
potential_savings: $analysis.potential_savings
recommendation: $analysis.recommendation
confidence: $analysis.confidence
}
}
} | compact)
# Identify unused resources
let unused_resources = identify_unused_resources $cost_data
let unused_optimizations = ($unused_resources | each {|resource|
{
type: "unused_resource"
resource: $resource.name
cost: $resource.cost
recommendation: "Consider terminating or downsizing"
confidence: 0.9
}
})
let optimizations = ($utilization_optimizations | append $unused_optimizations)
$optimizations | each {|opt|
$opt | upsert agent $agent.name | upsert timestamp (date now)
}
}
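# Example pass (using the mock analysis below): a resource costing $120 at
# 25% utilization with $60 of potential savings clears the default $10
# min_savings_threshold, so a "rightsizing" recommendation is emitted.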
def analyze_resource_utilization [cost_data: any]: nothing -> list {
# Mock analysis - in production would use real utilization data
[
{
resource: "ec2-i-12345"
current_cost: 120.0
utilization: 0.25
potential_savings: 60.0
recommendation: "Downsize from m5.xlarge to m5.large"
confidence: 0.85
}
]
}
def identify_unused_resources [cost_data: any]: nothing -> list {
# Mock analysis for unused resources
[
{
name: "unused-volume-123"
cost: 15.0
type: "ebs_volume"
last_access: "30d"
}
]
}
# Performance Analysis Agent
def execute_performance_analyzer [agent: record]: nothing -> list {
let perf_data = query_observability_data --collector "performance_metrics" --time_range "1h" --data_dir $agent.data_dir
if ($perf_data | length) == 0 {
return []
}
mut performance_issues = []
# Check against performance thresholds
let thresholds = $agent.config.performance_thresholds
# CPU performance analysis
let cpu_issues = ($perf_data | where metric_name == "cpu" and value > $thresholds.cpu)
if ($cpu_issues | length) > 0 {
$performance_issues = ($performance_issues | append {
type: "cpu_bottleneck"
severity: "high"
affected_resources: ($cpu_issues | get resource_id | uniq)
max_value: ($cpu_issues | get value | math max)
threshold: $thresholds.cpu
})
}
# Memory performance analysis
let memory_issues = ($perf_data | where metric_name == "memory" and value > $thresholds.memory)
if ($memory_issues | length) > 0 {
$performance_issues = ($performance_issues | append {
type: "memory_bottleneck"
severity: "high"
affected_resources: ($memory_issues | get resource_id | uniq)
max_value: ($memory_issues | get value | math max)
threshold: $thresholds.memory
})
}
$performance_issues | each {|issue|
$issue | upsert agent $agent.name | upsert timestamp (date now)
}
}
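# Example: with the default thresholds, any sample above 80% CPU or 85% memory
# in the last hour yields a bottleneck finding listing the affected
# resource_ids and the worst observed value.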
# Security Monitor Agent
def execute_security_monitor [agent: record]: nothing -> list {
let security_data = query_observability_data --collector "security_events" --time_range "5m" --data_dir $agent.data_dir
if ($security_data | length) == 0 {
return []
}
mut security_alerts = []
# Analyze authentication failures
let auth_failures = ($security_data | where event_type == "auth_failure")
if ($auth_failures | length) > 5 { # More than 5 failures in 5 minutes
$security_alerts = ($security_alerts | append {
type: "brute_force_attempt"
severity: "high"
event_count: ($auth_failures | length)
timeframe: "5m"
recommendation: "Consider blocking source IPs"
})
}
# Check for privilege escalation attempts
let escalation_events = ($security_data | where event_type == "privilege_escalation")
if ($escalation_events | length) > 0 {
$security_alerts = ($security_alerts | append {
type: "privilege_escalation"
severity: "critical"
event_count: ($escalation_events | length)
recommendation: "Immediate investigation required"
})
}
$security_alerts | each {|alert|
$alert | upsert agent $agent.name | upsert timestamp (date now)
}
}
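# Example: 8 auth_failure events inside the 5 minute window exceed the
# threshold of 5 and produce a "brute_force_attempt" alert with severity
# "high"; any privilege_escalation event escalates straight to "critical".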
# Predictor Agent
def execute_predictor [agent: record]: nothing -> list {
let historical_data = query_observability_data --time_range $agent.config.prediction_horizon --data_dir $agent.data_dir
if ($historical_data | length) < 100 {
return [] # Need sufficient historical data
}
mut predictions = []
# Predict capacity needs
let capacity_prediction = predict_capacity_needs $historical_data $agent.config
if $capacity_prediction.confidence > $agent.config.confidence_threshold {
$predictions = ($predictions | append {
type: "capacity_forecast"
forecast_horizon: $agent.config.prediction_horizon
prediction: $capacity_prediction.prediction
confidence: $capacity_prediction.confidence
recommendation: $capacity_prediction.recommendation
})
}
# Predict potential failures
let failure_prediction = predict_failures $historical_data $agent.config
if $failure_prediction.risk_score > 0.8 {
$predictions = ($predictions | append {
type: "failure_prediction"
risk_score: $failure_prediction.risk_score
predicted_failure_time: $failure_prediction.estimated_time
affected_components: $failure_prediction.components
recommendation: $failure_prediction.recommendation
})
}
$predictions | each {|pred|
$pred | upsert agent $agent.name | upsert timestamp (date now)
}
}
def predict_capacity_needs [data: any, config: record]: nothing -> record {
# Simple trend-based prediction
# In production, would use time series forecasting models
let cpu_trend = analyze_metric_trend $data "cpu"
let memory_trend = analyze_metric_trend $data "memory"
{
prediction: {
cpu_growth_rate: $cpu_trend.growth_rate
memory_growth_rate: $memory_trend.growth_rate
estimated_capacity_date: ((date now) + 30day)
}
confidence: 0.75
recommendation: (if $cpu_trend.growth_rate > 0.1 { "Consider adding CPU capacity" } else { "Current capacity sufficient" })
}
}
def analyze_metric_trend [data: any, metric: string]: nothing -> record {
let metric_data = ($data | where metric_name == $metric | sort-by timestamp)
if ($metric_data | length) < 10 {
return { growth_rate: 0, trend: "insufficient_data" }
}
# Simple linear trend analysis
let first_half = ($metric_data | first (($metric_data | length) // 2) | get value | math avg)
let second_half = ($metric_data | last (($metric_data | length) // 2) | get value | math avg)
let growth_rate = ($second_half - $first_half) / $first_half
{
growth_rate: $growth_rate
trend: (if $growth_rate > 0.05 { "increasing" } else if $growth_rate < -0.05 { "decreasing" } else { "stable" })
}
}
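# Worked example: if the first half of the window averages 50% CPU and the
# second half averages 60%, growth_rate = (60 - 50) / 50 = 0.2 and the trend
# is classified as "increasing".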
def predict_failures [data: any, config: record]: nothing -> record {
# Analyze patterns that typically precede failures
let error_rate = calculate_error_rate $data
let resource_stress = calculate_resource_stress $data
let risk_score = ($error_rate * 0.6) + ($resource_stress * 0.4)
{
risk_score: $risk_score
estimated_time: (if $risk_score > 0.9 { ((date now) + 2hr) } else { ((date now) + 1day) })
components: ["cpu", "memory", "application"]
recommendation: (if $risk_score > 0.8 { "Immediate attention required" } else { "Monitor closely" })
}
}
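# Worked example of the risk score: an error rate of 0.10 and a resource
# stress of 0.50 give (0.10 * 0.6) + (0.50 * 0.4) = 0.26, well below the 0.8
# threshold the predictor agent uses before reporting a failure prediction.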
def calculate_error_rate [data: any]: nothing -> float {
let total_logs = ($data | where collector == "application_logs" | length)
if $total_logs == 0 { return 0.0 }
let error_logs = ($data | where collector == "application_logs" and level in ["error", "fatal"] | length)
$error_logs / $total_logs
}
def calculate_resource_stress [data: any]: nothing -> float {
let cpu_stress = ($data | where metric_name == "cpu" | get value | math avg) / 100
let memory_stress = ($data | where metric_name == "memory" | get value | math avg) / 100
($cpu_stress + $memory_stress) / 2
}
# Auto Healer Agent (requires careful configuration)
def execute_auto_healer [agent: record]: nothing -> list {
if not ($agent.config.auto_response? | default false) {
return [] # Safety check: auto-remediation must be explicitly enabled in config
}
let alerts = query_observability_data --collector "alerts" --time_range "5m" --data_dir $agent.data_dir
if ($alerts | length) == 0 {
return []
}
# Only process critical alerts for auto-healing
let critical_alerts = ($alerts | where severity == "critical")
let actions = ($critical_alerts | each {|alert|
let action = determine_healing_action $alert $agent.config
if ($action | is-not-empty) {
{
alert_id: $alert.id
action_type: $action.type
action_details: $action.details
risk_level: $action.risk
auto_executed: false # Manual approval required by default
}
}
} | compact)
$actions
}
def determine_healing_action [alert: record, config: record]: nothing -> record {
match $alert.type {
"service_down" => {
{
type: "restart_service"
details: { service: $alert.service, method: "systemctl_restart" }
risk: "low"
}
}
"high_cpu" => {
{
type: "scale_up"
details: { resource: $alert.resource, scale_factor: 1.5 }
risk: "medium"
}
}
_ => null
}
}
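# Illustrative mapping: an alert like { type: "service_down", service: "nginx" }
# (the service name is just an example) resolves to a low-risk action:
#   { type: "restart_service", details: { service: "nginx", method: "systemctl_restart" }, risk: "low" }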
# Utility functions
def parse_interval [interval: string]: nothing -> int {
match $interval {
$i if ($i | str ends-with "s") => ($i | str replace "s" "" | into int)
$i if ($i | str ends-with "m") => (($i | str replace "m" "" | into int) * 60)
$i if ($i | str ends-with "h") => (($i | str replace "h" "" | into int) * 3600)
_ => 60
}
}
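# Examples: parse_interval "30s" => 30, parse_interval "5m" => 300,
# parse_interval "1h" => 3600; anything unrecognized falls back to 60 seconds.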
def update_agent_performance [agent: record, runtime: duration, results: list]: nothing -> nothing {
# Update agent performance statistics
# This would modify agent state in a real implementation
}
def process_agent_results [agent: record, results: list]: nothing -> nothing {
if ($results | length) > 0 {
print $"๐Ÿ” Agent ($agent.name) generated ($results | length) insights:"
$results | each {|result|
print $" - ($result.type): ($result | get description? | default 'No description')"
} | ignore
# Send notifications if configured
send_agent_notifications $agent $results
}
}
def send_agent_notifications [agent: record, results: list]: nothing -> nothing {
# Send notifications for agent findings
$results | each {|result|
if $result.severity? in ["high", "critical"] {
print $"๐Ÿšจ ALERT: ($result.type) - ($result | get message? | default 'Critical finding')"
}
} | ignore
}
# Agent management commands
export def list_running_agents []: nothing -> list {
# List currently running agents
# This would query actual running processes in production
[]
}
export def stop_agent [agent_name: string]: nothing -> nothing {
print $"🛑 Stopping agent: ($agent_name)"
# Implementation would stop the specific agent process
}
export def get_agent_status [agent_name?: string]: nothing -> any {
if ($agent_name | is-empty) {
print "๐Ÿ“Š All agents status:"
# Return status of all agents
[]
} else {
print $"๐Ÿ“Š Status of agent: ($agent_name)"
# Return status of specific agent
{}
}
}
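# Illustrative invocation from a Nushell session (the module path below is an
# assumption based on the file location):
#   use provisioning/core/nulib/observability/agents.nu *
#   start_agents --agents ["pattern_detector", "cost_optimizer"] --debug true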