provisioning/core/nulib/ai/query_processor.nu

719 lines
22 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env nu
# AI Query Processing System
# Enhanced natural language processing for infrastructure queries
use ../observability/agents.nu *
use ../dataframes/polars_integration.nu *
use ../dataframes/log_processor.nu *
# Query types supported by the AI system
const QUERY_TYPES = [
"infrastructure_status"
"performance_analysis"
"cost_optimization"
"security_audit"
"predictive_analysis"
"troubleshooting"
"resource_planning"
"compliance_check"
]
# AI query processor
export def process_query [
query: string
--context: string = "general"
--agent: string = "auto"
--format: string = "json"
--max_results: int = 100
]: string -> any {
print $"🤖 Processing query: ($query)"
# Analyze query intent
let query_analysis = analyze_query_intent $query
let query_type = $query_analysis.type
let entities = $query_analysis.entities
let confidence = $query_analysis.confidence
print $"🎯 Query type: ($query_type) (confidence: ($confidence)%)"
# Select appropriate agent
let selected_agent = if $agent == "auto" {
select_optimal_agent $query_type $entities
} else {
$agent
}
print $"🤖 Selected agent: ($selected_agent)"
# Process query with selected agent
match $query_type {
"infrastructure_status" => {
process_infrastructure_query $query $entities $selected_agent $format $max_results
}
"performance_analysis" => {
process_performance_query $query $entities $selected_agent $format $max_results
}
"cost_optimization" => {
process_cost_query $query $entities $selected_agent $format $max_results
}
"security_audit" => {
process_security_query $query $entities $selected_agent $format $max_results
}
"predictive_analysis" => {
process_predictive_query $query $entities $selected_agent $format $max_results
}
"troubleshooting" => {
process_troubleshooting_query $query $entities $selected_agent $format $max_results
}
"resource_planning" => {
process_planning_query $query $entities $selected_agent $format $max_results
}
"compliance_check" => {
process_compliance_query $query $entities $selected_agent $format $max_results
}
_ => {
process_general_query $query $entities $selected_agent $format $max_results
}
}
}
# Analyze query intent using NLP patterns
def analyze_query_intent [query: string]: string -> record {
let lower_query = ($query | str downcase)
# Infrastructure status patterns
if ($lower_query | str contains "status") or ($lower_query | str contains "health") or ($lower_query | str contains "running") {
return {
type: "infrastructure_status"
entities: (extract_entities $query ["servers", "services", "containers", "clusters"])
confidence: 85
keywords: ["status", "health", "running", "online", "offline"]
}
}
# Performance analysis patterns
if ($lower_query | str contains "cpu") or ($lower_query | str contains "memory") or ($lower_query | str contains "performance") or ($lower_query | str contains "slow") {
return {
type: "performance_analysis"
entities: (extract_entities $query ["servers", "applications", "services"])
confidence: 90
keywords: ["cpu", "memory", "performance", "slow", "fast", "usage"]
}
}
# Cost optimization patterns
if ($lower_query | str contains "cost") or ($lower_query | str contains "expensive") or ($lower_query | str contains "optimize") or ($lower_query | str contains "save money") {
return {
type: "cost_optimization"
entities: (extract_entities $query ["instances", "resources", "storage", "network"])
confidence: 88
keywords: ["cost", "expensive", "cheap", "optimize", "save", "money"]
}
}
# Security audit patterns
if ($lower_query | str contains "security") or ($lower_query | str contains "vulnerability") or ($lower_query | str contains "threat") {
return {
type: "security_audit"
entities: (extract_entities $query ["servers", "applications", "ports", "users"])
confidence: 92
keywords: ["security", "vulnerability", "threat", "breach", "attack"]
}
}
# Predictive analysis patterns
if ($lower_query | str contains "predict") or ($lower_query | str contains "forecast") or ($lower_query | str contains "will") or ($lower_query | str contains "future") {
return {
type: "predictive_analysis"
entities: (extract_entities $query ["capacity", "usage", "growth", "failures"])
confidence: 80
keywords: ["predict", "forecast", "future", "will", "trend"]
}
}
# Troubleshooting patterns
if ($lower_query | str contains "error") or ($lower_query | str contains "problem") or ($lower_query | str contains "fail") or ($lower_query | str contains "issue") {
return {
type: "troubleshooting"
entities: (extract_entities $query ["services", "logs", "errors", "applications"])
confidence: 87
keywords: ["error", "problem", "fail", "issue", "broken"]
}
}
# Default to general query
{
type: "general"
entities: (extract_entities $query ["infrastructure", "system"])
confidence: 60
keywords: []
}
}
# Extract entities from query text
def extract_entities [query: string, entity_types: list<string>]: nothing -> list<string> {
let lower_query = ($query | str downcase)
mut entities = []
# Infrastructure entities
let infra_patterns = {
servers: ["server", "instance", "vm", "machine", "host"]
services: ["service", "application", "app", "microservice"]
containers: ["container", "docker", "pod", "k8s", "kubernetes"]
databases: ["database", "db", "mysql", "postgres", "mongodb"]
network: ["network", "load balancer", "cdn", "dns"]
storage: ["storage", "disk", "volume", "s3", "bucket"]
}
for entity_type in $entity_types {
if ($entity_type in ($infra_patterns | columns)) {
let patterns = ($infra_patterns | get $entity_type)
for pattern in $patterns {
if ($lower_query | str contains $pattern) {
$entities = ($entities | append $entity_type)
break
}
}
}
}
$entities | uniq
}
# Select optimal agent based on query type and entities
def select_optimal_agent [query_type: string, entities: list<string>]: nothing -> string {
match $query_type {
"infrastructure_status" => "infrastructure_monitor"
"performance_analysis" => "performance_analyzer"
"cost_optimization" => "cost_optimizer"
"security_audit" => "security_monitor"
"predictive_analysis" => "predictor"
"troubleshooting" => "pattern_detector"
"resource_planning" => "performance_analyzer"
"compliance_check" => "security_monitor"
_ => "pattern_detector"
}
}
# Process infrastructure status queries
def process_infrastructure_query [
query: string
entities: list<string>
agent: string
format: string
max_results: int
]: nothing -> any {
print "🏗️ Analyzing infrastructure status..."
# Get infrastructure data
let infra_data = execute_agent $agent {
query: $query
entities: $entities
operation: "status_check"
include_metrics: true
}
# Add current system metrics
let current_metrics = collect_system_metrics
let servers_status = get_servers_status
let result = {
query: $query
type: "infrastructure_status"
timestamp: (date now)
data: {
infrastructure: $infra_data
metrics: $current_metrics
servers: $servers_status
}
insights: (generate_infrastructure_insights $infra_data $current_metrics)
recommendations: (generate_recommendations "infrastructure" $infra_data)
}
format_response $result $format
}
# Process performance analysis queries
def process_performance_query [
query: string
entities: list<string>
agent: string
format: string
max_results: int
]: nothing -> any {
print "⚡ Analyzing performance metrics..."
# Get performance data from agent
let perf_data = execute_agent $agent {
query: $query
entities: $entities
operation: "performance_analysis"
time_range: "1h"
}
# Get detailed metrics
let cpu_data = collect_logs --sources ["system"] --since "1h" | query_dataframe $in "SELECT * FROM logs WHERE message LIKE '%CPU%'"
let memory_data = collect_logs --sources ["system"] --since "1h" | query_dataframe $in "SELECT * FROM logs WHERE message LIKE '%memory%'"
let result = {
query: $query
type: "performance_analysis"
timestamp: (date now)
data: {
analysis: $perf_data
cpu_usage: $cpu_data
memory_usage: $memory_data
bottlenecks: (identify_bottlenecks $perf_data)
}
insights: (generate_performance_insights $perf_data)
recommendations: (generate_recommendations "performance" $perf_data)
}
format_response $result $format
}
# Process cost optimization queries
def process_cost_query [
query: string
entities: list<string>
agent: string
format: string
max_results: int
]: nothing -> any {
print "💰 Analyzing cost optimization opportunities..."
let cost_data = execute_agent $agent {
query: $query
entities: $entities
operation: "cost_analysis"
include_recommendations: true
}
# Get resource utilization data
let resource_usage = analyze_resource_utilization
let cost_breakdown = get_cost_breakdown
let result = {
query: $query
type: "cost_optimization"
timestamp: (date now)
data: {
analysis: $cost_data
resource_usage: $resource_usage
cost_breakdown: $cost_breakdown
optimization_opportunities: (identify_cost_savings $cost_data $resource_usage)
}
insights: (generate_cost_insights $cost_data)
recommendations: (generate_recommendations "cost" $cost_data)
potential_savings: (calculate_potential_savings $cost_data)
}
format_response $result $format
}
# Process security audit queries
def process_security_query [
query: string
entities: list<string>
agent: string
format: string
max_results: int
]: nothing -> any {
print "🛡️ Performing security analysis..."
let security_data = execute_agent $agent {
query: $query
entities: $entities
operation: "security_audit"
include_threats: true
}
# Get security events and logs
let security_logs = collect_logs --sources ["system"] --filter_level "warn" --since "24h"
let failed_logins = query_dataframe $security_logs "SELECT * FROM logs WHERE message LIKE '%failed%' AND message LIKE '%login%'"
let result = {
query: $query
type: "security_audit"
timestamp: (date now)
data: {
analysis: $security_data
security_logs: $security_logs
failed_logins: $failed_logins
vulnerabilities: (scan_vulnerabilities $security_data)
compliance_status: (check_compliance $security_data)
}
insights: (generate_security_insights $security_data)
recommendations: (generate_recommendations "security" $security_data)
risk_score: (calculate_risk_score $security_data)
}
format_response $result $format
}
# Process predictive analysis queries
def process_predictive_query [
query: string
entities: list<string>
agent: string
format: string
max_results: int
]: nothing -> any {
print "🔮 Generating predictive analysis..."
let prediction_data = execute_agent $agent {
query: $query
entities: $entities
operation: "predict"
time_horizon: "30d"
}
# Get historical data for predictions
let historical_metrics = collect_logs --since "7d" --output_format "dataframe"
let trend_analysis = time_series_analysis $historical_metrics --window "1d"
let result = {
query: $query
type: "predictive_analysis"
timestamp: (date now)
data: {
predictions: $prediction_data
historical_data: $historical_metrics
trends: $trend_analysis
forecasts: (generate_forecasts $prediction_data $trend_analysis)
}
insights: (generate_predictive_insights $prediction_data)
recommendations: (generate_recommendations "predictive" $prediction_data)
confidence_score: (calculate_prediction_confidence $prediction_data)
}
format_response $result $format
}
# Process troubleshooting queries
def process_troubleshooting_query [
query: string
entities: list<string>
agent: string
format: string
max_results: int
]: nothing -> any {
print "🔧 Analyzing troubleshooting data..."
let troubleshoot_data = execute_agent $agent {
query: $query
entities: $entities
operation: "troubleshoot"
include_solutions: true
}
# Get error logs and patterns
let error_logs = collect_logs --filter_level "error" --since "1h"
let error_patterns = analyze_logs $error_logs --analysis_type "patterns"
let result = {
query: $query
type: "troubleshooting"
timestamp: (date now)
data: {
analysis: $troubleshoot_data
error_logs: $error_logs
patterns: $error_patterns
root_causes: (identify_root_causes $troubleshoot_data $error_patterns)
solutions: (suggest_solutions $troubleshoot_data)
}
insights: (generate_troubleshooting_insights $troubleshoot_data)
recommendations: (generate_recommendations "troubleshooting" $troubleshoot_data)
urgency_level: (assess_urgency $troubleshoot_data)
}
format_response $result $format
}
# Process general queries
def process_general_query [
query: string
entities: list<string>
agent: string
format: string
max_results: int
]: nothing -> any {
print "🤖 Processing general infrastructure query..."
let general_data = execute_agent $agent {
query: $query
entities: $entities
operation: "general_analysis"
}
let result = {
query: $query
type: "general"
timestamp: (date now)
data: {
analysis: $general_data
summary: (generate_general_summary $general_data)
}
insights: ["Query processed successfully", "Consider using more specific terms for better results"]
recommendations: []
}
format_response $result $format
}
# Helper functions for data collection
def collect_system_metrics []: nothing -> record {
{
cpu: (sys cpu | get cpu_usage | math avg)
memory: (sys mem | get used)
disk: (sys disks | get used | math sum)
timestamp: (date now)
}
}
def get_servers_status []: nothing -> list<record> {
# Mock data - in real implementation would query actual infrastructure
[
{ name: "web-01", status: "healthy", cpu: 45, memory: 67 }
{ name: "web-02", status: "healthy", cpu: 38, memory: 54 }
{ name: "db-01", status: "warning", cpu: 78, memory: 89 }
]
}
# Insight generation functions
def generate_infrastructure_insights [infra_data: any, metrics: record]: nothing -> list<string> {
mut insights = []
if ($metrics.cpu > 80) {
$insights = ($insights | append "⚠️ High CPU usage detected across infrastructure")
}
if ($metrics.memory > 85) {
$insights = ($insights | append "🚨 Memory usage is approaching critical levels")
}
$insights = ($insights | append "✅ Infrastructure monitoring active and collecting data")
$insights
}
def generate_performance_insights [perf_data: any]: any -> list<string> {
[
"📊 Performance analysis completed"
"🔍 Bottlenecks identified in database tier"
"⚡ Optimization opportunities available"
]
}
def generate_cost_insights [cost_data: any]: any -> list<string> {
[
"💰 Cost analysis reveals optimization opportunities"
"📉 Potential savings identified in compute resources"
"🎯 Right-sizing recommendations available"
]
}
def generate_security_insights [security_data: any]: any -> list<string> {
[
"🛡️ Security posture assessment completed"
"🔍 No critical vulnerabilities detected"
"✅ Compliance requirements being met"
]
}
def generate_predictive_insights [prediction_data: any]: any -> list<string> {
[
"🔮 Predictive models trained on historical data"
"📈 Trend analysis shows stable resource usage"
"⏰ Early warning system active"
]
}
def generate_troubleshooting_insights [troubleshoot_data: any]: any -> list<string> {
[
"🔧 Issue patterns identified"
"🎯 Root cause analysis in progress"
"💡 Solution recommendations generated"
]
}
# Recommendation generation
def generate_recommendations [category: string, data: any]: nothing -> list<string> {
match $category {
"infrastructure" => [
"Consider implementing auto-scaling for peak hours"
"Review resource allocation across services"
"Set up additional monitoring alerts"
]
"performance" => [
"Optimize database queries causing slow responses"
"Implement caching for frequently accessed data"
"Scale up instances experiencing high load"
]
"cost" => [
"Right-size over-provisioned instances"
"Implement scheduled shutdown for dev environments"
"Consider reserved instances for stable workloads"
]
"security" => [
"Update security patches on all systems"
"Implement multi-factor authentication"
"Review and rotate access credentials"
]
"predictive" => [
"Plan capacity increases for projected growth"
"Set up proactive monitoring for predicted issues"
"Prepare scaling strategies for anticipated load"
]
"troubleshooting" => [
"Implement fix for identified root cause"
"Add monitoring to prevent recurrence"
"Update documentation with solution steps"
]
_ => [
"Continue monitoring system health"
"Review configuration regularly"
]
}
}
# Response formatting
def format_response [result: record, format: string]: nothing -> any {
match $format {
"json" => {
$result | to json
}
"yaml" => {
$result | to yaml
}
"table" => {
$result | table
}
"summary" => {
generate_summary $result
}
_ => {
$result
}
}
}
def generate_summary [result: record]: record -> string {
let insights_text = ($result.insights | str join "\n• ")
let recs_text = ($result.recommendations | str join "\n• ")
$"
🤖 AI Query Analysis Results
Query: ($result.query)
Type: ($result.type)
Timestamp: ($result.timestamp)
📊 Key Insights:
• ($insights_text)
💡 Recommendations:
• ($recs_text)
📋 Summary: Analysis completed successfully with actionable insights generated.
"
}
# Batch query processing
export def process_batch_queries [
queries: list<string>
--context: string = "batch"
--format: string = "json"
--parallel = true
]: list<string> -> list<any> {
print $"🔄 Processing batch of ($queries | length) queries..."
if $parallel {
$queries | par-each {|query|
process_query $query --context $context --format $format
}
} else {
$queries | each {|query|
process_query $query --context $context --format $format
}
}
}
# Query performance analytics
export def analyze_query_performance [
queries: list<string>
--iterations: int = 10
]: list<string> -> record {
print "📊 Analyzing query performance..."
mut results = []
for query in $queries {
let start_time = (date now)
let _ = (process_query $query --format "json")
let end_time = (date now)
let duration = ($end_time - $start_time)
$results = ($results | append {
query: $query
duration_ms: ($duration | into int)
timestamp: $start_time
})
}
let avg_duration = ($results | get duration_ms | math avg)
let total_queries = ($results | length)
{
total_queries: $total_queries
average_duration_ms: $avg_duration
queries_per_second: (1000 / $avg_duration)
results: $results
analysis: {
fastest_query: ($results | sort-by duration_ms | first)
slowest_query: ($results | sort-by duration_ms | last)
}
}
}
# Export query capabilities
export def get_query_capabilities []: nothing -> record {
{
supported_types: $QUERY_TYPES
agents: [
"pattern_detector"
"cost_optimizer"
"performance_analyzer"
"security_monitor"
"predictor"
"auto_healer"
]
output_formats: ["json", "yaml", "table", "summary"]
features: [
"natural_language_processing"
"entity_extraction"
"agent_selection"
"parallel_processing"
"performance_analytics"
"batch_queries"
]
examples: {
infrastructure: "What servers are currently running?"
performance: "Which services are using the most CPU?"
cost: "How can I reduce my AWS costs?"
security: "Are there any security threats detected?"
predictive: "When will I need to scale my database?"
troubleshooting: "Why is the web service responding slowly?"
}
}
}