513 lines
14 KiB
Plaintext
513 lines
14 KiB
Plaintext
#!/usr/bin/env nu
|
||
|
||
# Polars DataFrame Integration for Provisioning System
|
||
# High-performance data processing for logs, metrics, and infrastructure state
|
||
|
||
use ../lib_provisioning/utils/settings.nu *
|
||
|
||
# Check if Polars plugin is available
|
||
export def check_polars_available []: nothing -> bool {
|
||
let plugins = (plugin list)
|
||
($plugins | any {|p| $p.name == "polars" or $p.name == "nu_plugin_polars"})
|
||
}
|
||
|
||
# Initialize Polars plugin if available
|
||
export def init_polars []: nothing -> bool {
|
||
if (check_polars_available) {
|
||
# Try to load polars plugin
|
||
do {
|
||
plugin use polars
|
||
true
|
||
} | complete | if ($in.exit_code == 0) {
|
||
true
|
||
} else {
|
||
print "⚠️ Warning: Polars plugin found but failed to load"
|
||
false
|
||
}
|
||
} else {
|
||
print "ℹ️ Polars plugin not available, using native Nushell operations"
|
||
false
|
||
}
|
||
}
|
||
|
||
# Create DataFrame from infrastructure data
|
||
export def create_infra_dataframe [
|
||
data: list
|
||
--source: string = "infrastructure"
|
||
--timestamp = true
|
||
]: list -> any {
|
||
|
||
let use_polars = init_polars
|
||
|
||
mut processed_data = $data
|
||
|
||
if $timestamp {
|
||
$processed_data = ($processed_data | each {|row|
|
||
$row | upsert timestamp (date now)
|
||
})
|
||
}
|
||
|
||
if $use_polars {
|
||
# Use Polars DataFrame
|
||
$processed_data | polars into-df
|
||
} else {
|
||
# Return enhanced Nushell table with DataFrame-like operations
|
||
$processed_data | enhance_nushell_table
|
||
}
|
||
}
|
||
|
||
# Process logs into DataFrame format
|
||
export def process_logs_to_dataframe [
|
||
log_files: list<string>
|
||
--format: string = "auto" # auto, json, csv, syslog, custom
|
||
--time_column: string = "timestamp"
|
||
--level_column: string = "level"
|
||
--message_column: string = "message"
|
||
]: list<string> -> any {
|
||
|
||
let use_polars = init_polars
|
||
|
||
# Collect and parse all log files
|
||
let parsed_logs = ($log_files | each {|file|
|
||
if ($file | path exists) {
|
||
parse_log_file $file --format $format
|
||
} else {
|
||
[]
|
||
}
|
||
} | flatten)
|
||
|
||
if ($parsed_logs | length) == 0 {
|
||
if $use_polars {
|
||
[] | polars into-df
|
||
} else {
|
||
[]
|
||
}
|
||
} else {
|
||
# Standardize log format
|
||
let standardized = ($parsed_logs | each {|log|
|
||
{
|
||
timestamp: (standardize_timestamp ($log | get $time_column))
|
||
level: ($log | get $level_column)
|
||
message: ($log | get $message_column)
|
||
source: ($log.source? | default "unknown")
|
||
service: ($log.service? | default "provisioning")
|
||
metadata: ($log | reject $time_column $level_column $message_column)
|
||
}
|
||
})
|
||
|
||
if $use_polars {
|
||
$standardized | polars into-df
|
||
} else {
|
||
$standardized | enhance_nushell_table
|
||
}
|
||
}
|
||
}
|
||
|
||
# Parse individual log file based on format
|
||
def parse_log_file [
|
||
file_path: string
|
||
--format: string = "auto"
|
||
]: string -> list {
|
||
|
||
if not ($file_path | path exists) {
|
||
return []
|
||
}
|
||
|
||
let content = (open $file_path --raw)
|
||
|
||
match $format {
|
||
"json" => {
|
||
# Parse JSON logs
|
||
$content | lines | each {|line|
|
||
do {
|
||
$line | from json
|
||
} | complete | if ($in.exit_code == 0) {
|
||
$in.stdout
|
||
} else {
|
||
{
|
||
timestamp: (date now)
|
||
level: "unknown"
|
||
message: $line
|
||
raw: true
|
||
}
|
||
}
|
||
}
|
||
}
|
||
"csv" => {
|
||
# Parse CSV logs
|
||
do {
|
||
$content | from csv
|
||
} | complete | if ($in.exit_code == 0) {
|
||
$in.stdout
|
||
} else {
|
||
[]
|
||
}
|
||
}
|
||
"syslog" => {
|
||
# Parse syslog format
|
||
$content | lines | each {|line|
|
||
parse_syslog_line $line
|
||
}
|
||
}
|
||
"auto" => {
|
||
# Auto-detect format
|
||
if ($file_path | str ends-with ".json") {
|
||
parse_log_file $file_path --format "json"
|
||
} else if ($file_path | str ends-with ".csv") {
|
||
parse_log_file $file_path --format "csv"
|
||
} else {
|
||
parse_log_file $file_path --format "syslog"
|
||
}
|
||
}
|
||
_ => {
|
||
# Custom format - treat as plain text
|
||
$content | lines | each {|line|
|
||
{
|
||
timestamp: (date now)
|
||
level: "info"
|
||
message: $line
|
||
source: $file_path
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
# Parse syslog format line
|
||
def parse_syslog_line [line: string]: string -> record {
|
||
# Basic syslog parsing - can be enhanced
|
||
let parts = ($line | parse --regex '(?P<timestamp>\w+\s+\d+\s+\d+:\d+:\d+)\s+(?P<host>\S+)\s+(?P<service>\S+):\s*(?P<message>.*)')
|
||
|
||
if ($parts | length) > 0 {
|
||
let parsed = $parts.0
|
||
{
|
||
timestamp: $parsed.timestamp
|
||
level: "info" # Default level
|
||
message: $parsed.message
|
||
host: $parsed.host
|
||
service: $parsed.service
|
||
}
|
||
} else {
|
||
{
|
||
timestamp: (date now)
|
||
level: "unknown"
|
||
message: $line
|
||
}
|
||
}
|
||
}
|
||
|
||
# Standardize timestamp formats
|
||
def standardize_timestamp [ts: any]: any -> datetime {
|
||
match ($ts | describe) {
|
||
"string" => {
|
||
do {
|
||
$ts | into datetime
|
||
} | complete | if ($in.exit_code == 0) {
|
||
$in.stdout
|
||
} else {
|
||
date now
|
||
}
|
||
}
|
||
"datetime" => $ts,
|
||
_ => (date now)
|
||
}
|
||
}
|
||
|
||
# Enhance Nushell table with DataFrame-like operations
|
||
def enhance_nushell_table []: list -> list {
|
||
let data = $in
|
||
|
||
# Add DataFrame-like methods through custom commands
|
||
$data | add_dataframe_methods
|
||
}
|
||
|
||
def add_dataframe_methods []: list -> list {
|
||
# This function adds metadata to enable DataFrame-like operations
|
||
# In a real implementation, we'd add custom commands to the scope
|
||
$in
|
||
}
|
||
|
||
# Query DataFrame with SQL-like syntax
|
||
export def query_dataframe [
|
||
df: any
|
||
query: string
|
||
--use_polars = false
|
||
]: any -> any {
|
||
|
||
if $use_polars and (check_polars_available) {
|
||
# Use Polars query capabilities
|
||
$df | polars query $query
|
||
} else {
|
||
# Fallback to Nushell operations
|
||
query_with_nushell $df $query
|
||
}
|
||
}
|
||
|
||
def query_with_nushell [df: any, query: string]: nothing -> any {
|
||
# Simple SQL-like query parser for Nushell
|
||
# This is a basic implementation - can be significantly enhanced
|
||
|
||
if ($query | str downcase | str starts-with "select") {
|
||
let parts = ($query | str replace --regex "(?i)select\\\\s+" "" | split row " from ")
|
||
if ($parts | length) >= 2 {
|
||
let columns = ($parts.0 | split row ",")
|
||
let conditions = if ($parts | length) > 2 { $parts.2 } else { "" }
|
||
|
||
mut result = $df
|
||
|
||
if $columns != ["*"] {
|
||
$result = ($result | select ($columns | each {|c| $c | str trim}))
|
||
}
|
||
|
||
if ($conditions | str contains "where") {
|
||
# Basic WHERE clause processing
|
||
$result = (process_where_clause $result $conditions)
|
||
}
|
||
|
||
$result
|
||
} else {
|
||
$df
|
||
}
|
||
} else {
|
||
$df
|
||
}
|
||
}
|
||
|
||
def process_where_clause [data: any, conditions: string]: nothing -> any {
|
||
# Basic WHERE clause implementation
|
||
# This would need significant enhancement for production use
|
||
$data
|
||
}
|
||
|
||
# Aggregate data with common operations
|
||
export def aggregate_dataframe [
|
||
df: any
|
||
--group_by: list<string> = []
|
||
--operations: record = {} # {column: operation}
|
||
--time_bucket: string = "1h" # For time-based aggregations
|
||
]: any -> any {
|
||
|
||
let use_polars = init_polars
|
||
|
||
if $use_polars and (check_polars_available) {
|
||
# Use Polars aggregation
|
||
aggregate_with_polars $df $group_by $operations $time_bucket
|
||
} else {
|
||
# Use Nushell aggregation
|
||
aggregate_with_nushell $df $group_by $operations $time_bucket
|
||
}
|
||
}
|
||
|
||
def aggregate_with_polars [
|
||
df: any
|
||
group_cols: list<string>
|
||
operations: record
|
||
time_bucket: string
|
||
]: nothing -> any {
|
||
# Polars aggregation implementation
|
||
if ($group_cols | length) > 0 {
|
||
$df | polars group-by $group_cols | polars agg [
|
||
(polars col "value" | polars sum)
|
||
(polars col "value" | polars mean)
|
||
(polars col "value" | polars count)
|
||
]
|
||
} else {
|
||
$df
|
||
}
|
||
}
|
||
|
||
def aggregate_with_nushell [
|
||
df: any
|
||
group_cols: list<string>
|
||
operations: record
|
||
time_bucket: string
|
||
]: nothing -> any {
|
||
# Nushell aggregation implementation
|
||
if ($group_cols | length) > 0 {
|
||
$df | group-by ($group_cols | str join " ")
|
||
} else {
|
||
$df
|
||
}
|
||
}
|
||
|
||
# Time series analysis operations
|
||
export def time_series_analysis [
|
||
df: any
|
||
--time_column: string = "timestamp"
|
||
--value_column: string = "value"
|
||
--window: string = "1h"
|
||
--operations: list<string> = ["mean", "sum", "count"]
|
||
]: any -> any {
|
||
|
||
let use_polars = init_polars
|
||
|
||
if $use_polars and (check_polars_available) {
|
||
time_series_with_polars $df $time_column $value_column $window $operations
|
||
} else {
|
||
time_series_with_nushell $df $time_column $value_column $window $operations
|
||
}
|
||
}
|
||
|
||
def time_series_with_polars [
|
||
df: any
|
||
time_col: string
|
||
value_col: string
|
||
window: string
|
||
ops: list<string>
|
||
]: nothing -> any {
|
||
# Polars time series operations
|
||
$df | polars group-by $time_col | polars agg [
|
||
(polars col $value_col | polars mean)
|
||
(polars col $value_col | polars sum)
|
||
(polars col $value_col | polars count)
|
||
]
|
||
}
|
||
|
||
def time_series_with_nushell [
|
||
df: any
|
||
time_col: string
|
||
value_col: string
|
||
window: string
|
||
ops: list<string>
|
||
]: nothing -> any {
|
||
# Nushell time series - basic implementation
|
||
$df | group-by {|row|
|
||
# Group by time windows - simplified
|
||
($row | get $time_col) | date format "%Y-%m-%d %H:00:00"
|
||
} | each {|group_data|
|
||
let values = ($group_data | get $value_col)
|
||
{
|
||
time_window: "grouped"
|
||
mean: ($values | math avg)
|
||
sum: ($values | math sum)
|
||
count: ($values | length)
|
||
}
|
||
}
|
||
}
|
||
|
||
# Export DataFrame to various formats
|
||
export def export_dataframe [
|
||
df: any
|
||
output_path: string
|
||
--format: string = "csv" # csv, parquet, json, excel
|
||
]: any -> nothing {
|
||
|
||
let use_polars = init_polars
|
||
|
||
match $format {
|
||
"csv" => {
|
||
if $use_polars and (check_polars_available) {
|
||
$df | polars save $output_path
|
||
} else {
|
||
$df | to csv | save --force $output_path
|
||
}
|
||
}
|
||
"parquet" => {
|
||
if $use_polars and (check_polars_available) {
|
||
$df | polars save $output_path
|
||
} else {
|
||
error make { msg: "Parquet format requires Polars plugin" }
|
||
}
|
||
}
|
||
"json" => {
|
||
$df | to json | save --force $output_path
|
||
}
|
||
_ => {
|
||
error make { msg: $"Unsupported format: ($format)" }
|
||
}
|
||
}
|
||
|
||
print $"✅ DataFrame exported to: ($output_path) (format: ($format))"
|
||
}
|
||
|
||
# Performance comparison: Polars vs Nushell
|
||
export def benchmark_operations [
|
||
data_size: int = 10000
|
||
operations: list<string> = ["filter", "group", "aggregate"]
|
||
]: int -> record {
|
||
|
||
print $"🔬 Benchmarking operations with ($data_size) records..."
|
||
|
||
# Generate test data
|
||
let test_data = (0..$data_size | each {|i|
|
||
{
|
||
id: $i
|
||
value: (random int 1..100)
|
||
category: (random int 1..5 | into string)
|
||
timestamp: (date now)
|
||
}
|
||
})
|
||
|
||
let results = {}
|
||
|
||
# Benchmark with Nushell
|
||
let nushell_start = (date now)
|
||
let nushell_result = (benchmark_nushell_operations $test_data $operations)
|
||
let nushell_duration = ((date now) - $nushell_start)
|
||
|
||
$results | insert nushell {
|
||
duration_ms: ($nushell_duration | into int)
|
||
operations_per_sec: ($data_size / ($nushell_duration | into int) * 1000)
|
||
}
|
||
|
||
# Benchmark with Polars (if available)
|
||
if (check_polars_available) {
|
||
let polars_start = (date now)
|
||
let polars_result = (benchmark_polars_operations $test_data $operations)
|
||
let polars_duration = ((date now) - $polars_start)
|
||
|
||
$results | insert polars {
|
||
duration_ms: ($polars_duration | into int)
|
||
operations_per_sec: ($data_size / ($polars_duration | into int) * 1000)
|
||
}
|
||
|
||
$results | insert performance_gain (
|
||
($results.nushell.duration_ms / $results.polars.duration_ms)
|
||
)
|
||
}
|
||
|
||
$results
|
||
}
|
||
|
||
def benchmark_nushell_operations [data: list, ops: list<string>]: nothing -> any {
|
||
mut result = $data
|
||
|
||
if "filter" in $ops {
|
||
$result = ($result | where value > 50)
|
||
}
|
||
|
||
if "group" in $ops {
|
||
$result = ($result | group-by category)
|
||
}
|
||
|
||
if "aggregate" in $ops {
|
||
$result = ($result | each {|group| {
|
||
category: $group.0
|
||
count: ($group.1 | length)
|
||
avg_value: ($group.1 | get value | math avg)
|
||
}})
|
||
}
|
||
|
||
$result
|
||
}
|
||
|
||
def benchmark_polars_operations [data: list, ops: list<string>]: nothing -> any {
|
||
mut df = ($data | polars into-df)
|
||
|
||
if "filter" in $ops {
|
||
$df = ($df | polars filter (polars col value))
|
||
}
|
||
|
||
if "group" in $ops {
|
||
$df = ($df | polars group-by "category")
|
||
}
|
||
|
||
if "aggregate" in $ops {
|
||
$df = ($df | polars agg [
|
||
(polars col "id" | polars count)
|
||
(polars col "value" | polars mean)
|
||
])
|
||
}
|
||
|
||
$df
|
||
} |