#!/usr/bin/env nu

# Polars DataFrame Integration for Provisioning System
# High-performance data processing for logs, metrics, and infrastructure state

use ../lib_provisioning/utils/settings.nu *

# Check if Polars plugin is available
#
# Returns true when `plugin list` contains an entry named `polars` or
# `nu_plugin_polars`.
export def check_polars_available []: nothing -> bool {
    let plugins = (plugin list)
    $plugins | any {|p| $p.name == "polars" or $p.name == "nu_plugin_polars" }
}

# Initialize Polars plugin if available
#
# Returns true when the plugin is registered and could be loaded, false
# otherwise. A short notice is printed in both failure cases.
export def init_polars []: nothing -> bool {
    if not (check_polars_available) {
        print "ℹ️ Polars plugin not available, using native Nushell operations"
        return false
    }

    # `complete` only captures external commands, so internal failures are
    # handled with try/catch instead.
    # NOTE(review): `plugin use` is a parser keyword and may already be resolved
    # when the script is parsed - confirm the catch branch is reachable.
    try {
        plugin use polars
        true
    } catch {
        print "⚠️ Warning: Polars plugin found but failed to load"
        false
    }
}

# Create DataFrame from infrastructure data
#
# Returns a Polars DataFrame when the plugin loads, otherwise the (optionally
# timestamped) rows as a plain Nushell list.
export def create_infra_dataframe [
    data: list                            # Rows to convert
    --source: string = "infrastructure"   # Accepted for callers; not used by the body yet
    --timestamp = true                    # Stamp every row with `date now`
]: [nothing -> any, list -> any] {
    let use_polars = (init_polars)

    let processed_data = if $timestamp {
        $data | each {|row| $row | upsert timestamp (date now) }
    } else {
        $data
    }

    if $use_polars {
        # Use Polars DataFrame
        $processed_data | polars into-df
    } else {
        # Return enhanced Nushell table with DataFrame-like operations
        $processed_data | enhance_nushell_table
    }
}

# Process logs into DataFrame format
#
# Every existing file is parsed with `parse_log_file`; missing files contribute
# no rows. Each entry is reshaped into
# {timestamp, level, message, source, service, metadata}.
export def process_logs_to_dataframe [
    log_files: list                        # Paths of the log files to read
    --format: string = "auto"              # auto, json, csv, syslog, custom
    --time_column: string = "timestamp"    # Column holding the entry time
    --level_column: string = "level"       # Column holding the severity
    --message_column: string = "message"   # Column holding the log text
]: [nothing -> any, list -> any] {
    let use_polars = (init_polars)

    # Collect and parse all log files
    let parsed_logs = (
        $log_files
        | each {|file|
            if ($file | path exists) {
                parse_log_file $file --format $format
            } else {
                []
            }
        }
        | flatten
    )

    if ($parsed_logs | is-empty) {
        if $use_polars {
            [] | polars into-df
        } else {
            []
        }
    } else {
        # Standardize log format
        let standardized = ($parsed_logs | each {|log|
            {
                timestamp: (standardize_timestamp ($log | get $time_column))
                level: ($log | get $level_column)
                message: ($log | get $message_column)
                source: ($log.source? | default "unknown")
                service: ($log.service? | default "provisioning")
                metadata: ($log | reject $time_column $level_column $message_column)
            }
        })

        if $use_polars {
            $standardized | polars into-df
        } else {
            $standardized | enhance_nushell_table
        }
    }
}

# Parse individual log file based on format
#
# Returns a list of records; a missing file yields an empty list. With
# `--format auto` the format is chosen from the file extension (.json, .csv,
# anything else is treated as syslog).
def parse_log_file [
    file_path: string           # File to read
    --format: string = "auto"   # auto, json, csv, syslog; any other value = plain text
]: nothing -> list {
    if not ($file_path | path exists) {
        return []
    }

    let content = (open $file_path --raw)

    match $format {
        "json" => {
            # Parse JSON logs (one document per line); blank lines are skipped
            # and lines that are not valid JSON are kept as raw "unknown" entries.
            $content
            | lines
            | where ($it | str trim) != ""
            | each {|line|
                try {
                    $line | from json
                } catch {
                    {
                        timestamp: (date now)
                        level: "unknown"
                        message: $line
                        raw: true
                    }
                }
            }
        }
        "csv" => {
            # Parse CSV logs; an unparsable file yields no rows
            try {
                $content | from csv
            } catch {
                []
            }
        }
        "syslog" => {
            # Parse syslog format
            $content | lines | each {|line| parse_syslog_line $line }
        }
        "auto" => {
            # Auto-detect format
            if ($file_path | str ends-with ".json") {
                parse_log_file $file_path --format "json"
            } else if ($file_path | str ends-with ".csv") {
                parse_log_file $file_path --format "csv"
            } else {
                parse_log_file $file_path --format "syslog"
            }
        }
        _ => {
            # Custom format - treat as plain text
            $content | lines | each {|line|
                {
                    timestamp: (date now)
                    level: "info"
                    message: $line
                    source: $file_path
                }
            }
        }
    }
}

# Parse syslog format line
#
# Expects "<month> <day> <hh:mm:ss> <host> <service>: <message>". Lines that do
# not match are returned as an "unknown" entry stamped with the current time.
def parse_syslog_line [line: string]: nothing -> record {
    # Basic syslog parsing - can be enhanced.
    # The capture-group names must match the fields read below.
    let parts = (
        $line
        | parse --regex '(?P<timestamp>\w+\s+\d+\s+\d+:\d+:\d+)\s+(?P<host>\S+)\s+(?P<service>\S+):\s*(?P<message>.*)'
    )

    if ($parts | is-empty) {
        {
            timestamp: (date now)
            level: "unknown"
            message: $line
        }
    } else {
        let parsed = ($parts | first)
        {
            timestamp: $parsed.timestamp
            level: "info"   # Default level
            message: $parsed.message
            host: $parsed.host
            service: $parsed.service
        }
    }
}

# Standardize timestamp formats
#
# Strings are converted with `into datetime`; datetimes pass through; anything
# else (or an unparsable string) becomes the current time.
def standardize_timestamp [ts: any]: any -> datetime {
    match ($ts | describe) {
        "string" => {
            try {
                $ts | into datetime
            } catch {
                date now
            }
        }
        # NOTE(review): some Nushell releases appear to report `date` instead of
        # `datetime` from `describe` - both are accepted here.
        "datetime" | "date" => $ts
        _ => (date now)
    }
}

# Enhance Nushell table with DataFrame-like operations
#
# Currently a pass-through: the piped list is handed to `add_dataframe_methods`.
def enhance_nushell_table []: list -> list {
    let data = $in
    # Add DataFrame-like methods through custom commands
    $data | add_dataframe_methods
}

# Placeholder that returns its input unchanged.
def add_dataframe_methods []: list -> list {
    # This function adds metadata to enable DataFrame-like operations
    # In a real implementation, we'd add custom commands to the scope
    $in
}

# Query DataFrame with SQL-like syntax
#
# Uses `polars query` only when `--use_polars` is set and the plugin is
# available; otherwise falls back to the basic Nushell parser.
export def query_dataframe [
    df: any                # Data to query
    query: string          # SQL-like query text
    --use_polars = false   # Opt in to the Polars engine
]: any -> any {
    if $use_polars and (check_polars_available) {
        # Use Polars query capabilities
        $df | polars query $query
    } else {
        # Fallback to Nushell operations
        query_with_nushell $df $query
    }
}

# Simple SQL-like query parser for Nushell
#
# Understands `select <cols> from <anything> [where ...]`. Column projection is
# applied; the WHERE text is forwarded to `process_where_clause`. Anything that
# is not a SELECT with a FROM part returns the data unchanged.
def query_with_nushell [df: any, query: string]: nothing -> any {
    # This is a basic implementation - can be significantly enhanced
    let statement = ($query | str trim)
    if not ($statement | str downcase | str starts-with "select") {
        return $df
    }

    # Keywords are matched case-insensitively.
    let parts = (
        $statement
        | str replace --regex '(?i)^select\s+' ''
        | split row --regex '(?i)\s+from\s+'
    )
    if ($parts | length) < 2 {
        return $df
    }

    let columns = ($parts.0 | split row "," | each {|c| $c | str trim })
    # Everything after FROM (table name plus optional WHERE clause).
    let tail = ($parts | skip 1 | str join " from ")

    # Filter before projecting so a WHERE clause can use non-selected columns.
    let filtered = if ($tail | str downcase | str contains "where") {
        # Basic WHERE clause processing
        process_where_clause $df $tail
    } else {
        $df
    }

    if $columns == ["*"] {
        $filtered
    } else {
        $filtered | select ...$columns
    }
}

# Basic WHERE clause implementation
#
# Stub: returns the data unchanged.
def process_where_clause [data: any, conditions: string]: nothing -> any {
    # This would need significant enhancement for production use
    $data
}

# Aggregate data with common operations
#
# Dispatches to the Polars or Nushell implementation depending on whether the
# plugin could be initialised.
export def aggregate_dataframe [
    df: any                       # Data to aggregate
    --group_by: list = []         # Columns to group on
    --operations: record = {}     # {column: operation}
    --time_bucket: string = "1h"  # For time-based aggregations
]: any -> any {
    # init_polars already verifies that the plugin is registered.
    if (init_polars) {
        # Use Polars aggregation
        aggregate_with_polars $df $group_by $operations $time_bucket
    } else {
        # Use Nushell aggregation
        aggregate_with_nushell $df $group_by $operations $time_bucket
    }
}

# Polars aggregation implementation
#
# Groups by `group_cols` and computes sum/mean/count of the `value` column.
# `operations` and `time_bucket` are accepted but not used yet. Without group
# columns the data is returned unchanged.
def aggregate_with_polars [
    df: any
    group_cols: list
    operations: record
    time_bucket: string
]: nothing -> any {
    if ($group_cols | is-empty) {
        return $df
    }

    # Each aggregation gets its own alias so the three results built from the
    # same source column do not share one output name.
    $df | polars group-by $group_cols | polars agg [
        (polars col "value" | polars sum | polars as "value_sum")
        (polars col "value" | polars mean | polars as "value_mean")
        (polars col "value" | polars count | polars as "value_count")
    ]
}

# Nushell aggregation implementation
#
# Returns the `group-by` record. With several group columns the key is the
# column values joined by a single space. `operations` and `time_bucket` are
# accepted but not used yet.
def aggregate_with_nushell [
    df: any
    group_cols: list
    operations: record
    time_bucket: string
]: nothing -> any {
    if ($group_cols | is-empty) {
        $df
    } else if ($group_cols | length) == 1 {
        $df | group-by ($group_cols | first)
    } else {
        # A joined column name does not exist in the data, so build a
        # composite key from the individual column values instead.
        $df | group-by {|row|
            $group_cols | each {|c| $"($row | get $c)" } | str join " "
        }
    }
}

# Time series analysis operations
#
# Dispatches to the Polars or Nushell implementation depending on whether the
# plugin could be initialised.
export def time_series_analysis [
    df: any                                        # Data to analyse
    --time_column: string = "timestamp"            # Column holding the time
    --value_column: string = "value"               # Column holding the measurement
    --window: string = "1h"                        # Forwarded; not used by the helpers yet
    --operations: list = ["mean", "sum", "count"]  # Forwarded; not used by the helpers yet
]: any -> any {
    if (init_polars) {
        time_series_with_polars $df $time_column $value_column $window $operations
    } else {
        time_series_with_nushell $df $time_column $value_column $window $operations
    }
}

# Polars time series operations
#
# Groups by `time_col` and computes mean/sum/count of `value_col`, exposed as
# the columns `mean`, `sum` and `count`.
def time_series_with_polars [
    df: any
    time_col: string
    value_col: string
    window: string
    ops: list
]: nothing -> any {
    $df | polars group-by $time_col | polars agg [
        (polars col $value_col | polars mean | polars as "mean")
        (polars col $value_col | polars sum | polars as "sum")
        (polars col $value_col | polars count | polars as "count")
    ]
}

# Nushell time series - basic implementation
#
# Buckets rows by the hour of `time_col` and returns one record per bucket with
# the bucket label and the mean/sum/count of `value_col`.
def time_series_with_nushell [
    df: any
    time_col: string
    value_col: string
    window: string
    ops: list
]: nothing -> any {
    $df
    | group-by {|row|
        # Group by time windows - simplified (fixed hourly buckets)
        $row | get $time_col | format date "%Y-%m-%d %H:00:00"
    }
    # group-by yields a record keyed by bucket; `items` walks its key/value pairs.
    | items {|bucket, rows|
        let values = ($rows | get $value_col)
        {
            time_window: $bucket
            mean: ($values | math avg)
            sum: ($values | math sum)
            count: ($values | length)
        }
    }
}

# Export DataFrame to various formats
#
# Raises an error for an unsupported format, and for parquet when Polars is not
# available.
export def export_dataframe [
    df: any                    # Data to write
    output_path: string        # Destination file
    --format: string = "csv"   # csv, parquet, json (excel is not implemented)
]: any -> nothing {
    let use_polars = (init_polars)

    match $format {
        "csv" => {
            if $use_polars {
                $df | polars save $output_path
            } else {
                $df | to csv | save --force $output_path
            }
        }
        "parquet" => {
            if $use_polars {
                $df | polars save $output_path
            } else {
                error make { msg: "Parquet format requires Polars plugin" }
            }
        }
        "json" => {
            $df | to json | save --force $output_path
        }
        _ => {
            error make { msg: $"Unsupported format: ($format)" }
        }
    }

    # Literal parentheses must be escaped inside an interpolated string.
    print $"✅ DataFrame exported to: ($output_path) \(format: ($format)\)"
}

# Milliseconds elapsed since `start`.
def benchmark_elapsed_ms [start: datetime]: nothing -> float {
    # `into int` on a duration yields nanoseconds.
    (((date now) - $start) | into int) / 1000000
}

# Build the {duration_ms, operations_per_sec} record for one benchmark run.
def benchmark_stats [data_size: int, elapsed_ms: float]: nothing -> record {
    {
        duration_ms: $elapsed_ms
        # Guard against a zero reading on very small inputs.
        operations_per_sec: (if $elapsed_ms > 0 { $data_size / $elapsed_ms * 1000 } else { 0 })
    }
}

# Performance comparison: Polars vs Nushell
#
# Returns {nushell: {...}}, plus {polars: {...}, performance_gain} when the
# Polars plugin is available.
export def benchmark_operations [
    data_size: int = 10000                               # Number of generated rows
    operations: list = ["filter", "group", "aggregate"]  # Steps to run
]: [nothing -> record, int -> record] {
    print $"🔬 Benchmarking operations with ($data_size) records..."

    # Generate test data (exactly `data_size` rows)
    let test_data = if $data_size <= 0 {
        []
    } else {
        0..<$data_size | each {|i|
            {
                id: $i
                value: (random int 1..100)
                category: (random int 1..5 | into string)
                timestamp: (date now)
            }
        }
    }

    # Benchmark with Nushell
    let nushell_start = (date now)
    benchmark_nushell_operations $test_data $operations | ignore
    let nushell_ms = (benchmark_elapsed_ms $nushell_start)
    let nushell_only = { nushell: (benchmark_stats $data_size $nushell_ms) }

    # Benchmark with Polars (if available)
    if not (check_polars_available) {
        return $nushell_only
    }

    let polars_start = (date now)
    benchmark_polars_operations $test_data $operations | ignore
    let polars_ms = (benchmark_elapsed_ms $polars_start)
    let both = ($nushell_only | insert polars (benchmark_stats $data_size $polars_ms))

    if $polars_ms > 0 {
        $both | insert performance_gain ($nushell_ms / $polars_ms)
    } else {
        $both
    }
}

# Run the requested benchmark steps with native Nushell commands.
#
# "filter" keeps rows with value > 50, "group" groups by category and
# "aggregate" reduces each group to {category, count, avg_value}. Aggregating
# without grouping treats all rows as one bucket named "all".
def benchmark_nushell_operations [data: list, ops: list]: nothing -> any {
    let filtered = if "filter" in $ops {
        $data | where value > 50
    } else {
        $data
    }

    if "aggregate" in $ops {
        let buckets = if "group" in $ops {
            $filtered | group-by category
        } else {
            { all: $filtered }
        }
        # group-by yields a record, so iterate its key/value pairs.
        $buckets | items {|category, rows|
            {
                category: $category
                count: ($rows | length)
                avg_value: ($rows | get value | math avg)
            }
        }
    } else if "group" in $ops {
        $filtered | group-by category
    } else {
        $filtered
    }
}

# Run the requested benchmark steps with the Polars plugin.
#
# The steps mirror the Nushell variant: filter value > 50, group by category,
# then count `id` and average `value`.
def benchmark_polars_operations [data: list, ops: list]: nothing -> any {
    mut df = ($data | polars into-df)

    if "filter" in $ops {
        # The predicate must be a boolean expression.
        $df = ($df | polars filter ((polars col value) > 50))
    }

    if "group" in $ops {
        $df = ($df | polars group-by "category")
    }

    if "aggregate" in $ops {
        $df = ($df | polars agg [
            (polars col "id" | polars count)
            (polars col "value" | polars mean)
        ])
    }

    $df
}