Search Docs…
Search Docs…
Tutorials
Multi-agent testing using Rust
This tutorial guides you through creating a high-performance multi-agent testing pipeline using Rust. You'll build a comprehensive testing framework that leverages Rust's performance, type safety, and async capabilities to test CoAgent deployments at scale.
What You'll Build
By the end of this tutorial, you'll have:
- A typed Rust client for CoAgent's testing APIs 
- High-performance concurrent test execution 
- Statistical analysis and performance benchmarking 
- Load testing with thousands of concurrent requests 
- A/B testing framework with statistical significance 
- Comprehensive reporting and visualization 
- CI/CD integration with GitHub Actions 
Prerequisites
- CoAgent running locally ( - docker-compose up)
- Rust toolchain (1.70+) with - rustupand- cargo
- Basic knowledge of async Rust and testing 
- Completed Rust Client Integration Tutorial (recommended) 
Tutorial Overview
Phase 1: Project Setup & Core Types Phase 2: Test Case Management System Phase 3: Multi-Agent Execution Engine Phase 4: Performance Analysis & Metrics Phase 5: Load Testing Framework Phase 6: Statistical A/B Testing Phase 7: Reporting & Visualization Phase 8: CI/CD Integration
Phase 1: Project Setup & Core Types
1.1 Create New Rust Project
cargo new coagent-multi-agent-testing --bin cd
1.2 Configure Cargo.toml
[package] name = "coagent-multi-agent-testing" version = "0.1.0" edition = "2021" [dependencies] # HTTP and JSON reqwest = { version = "0.12", features = ["json", "rustls-tls"] } serde = { version = "1", features = ["derive"] } serde_json = "1" # Async runtime and utilities tokio = { version = "1", features = ["full"] } futures = "0.3" async-trait = "0.1" # Error handling anyhow = "1" thiserror = "1" # Concurrency and timing backoff = "0.4" uuid = { version = "1", features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde"] } # Statistics and analysis statrs = "0.17" plotters = "0.3" tabled = "0.15" # CLI and configuration clap = { version = "4", features = ["derive"] } config = "0.14" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } # Testing framework criterion = { version = "0.5", features = ["html_reports"] } [dev-dependencies] tokio-test = "0.4" wiremock = "0.6" [[bin]] name = "main" path = "src/main.rs" [[bench]] name = "load_test" harness = false
1.3 Core Types and Client
Create src/types.rs:
use serde::{Deserialize, Serialize}; use std::collections::HashMap; use uuid::Uuid; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AgentConfig { pub id: String, pub name: String, pub description: String, pub preamble: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ModelProvider { pub id: String, pub name: String, pub provider_type: String, pub available_models: Vec<String>, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SandboxConfig { pub id: String, pub name: String, pub system_prompt: String, pub parameters: SandboxParameters, pub category: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SandboxParameters { pub temperature: f32, pub max_tokens: u32, pub top_p: f32, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TestCase { pub id: String, pub name: String, pub input: TestInput, pub validations: Vec<TestValidation>, pub expected_duration_ms: Option<u64>, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TestInput { pub human_prompt: String, pub context: HashMap<String, serde_json::Value>, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum TestValidation { ContentMatch { pattern: String }, SemanticSimilarity { sentence: String, threshold: f32 }, ToolCall { tool_name: String }, ResponseTime { max_seconds: u64 }, ResponseSchema { schema: serde_json::Value }, LlmEval { criteria: String, model_ref: ModelReference }, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ModelReference { pub provider_id: String, pub model_name: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TestSuite { pub id: String, pub name: String, pub description: String, pub test_cases: Vec<TestCase>, pub config_ids: Vec<String>, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TestRun { pub id: String, pub suite_id: String, pub status: TestRunStatus, pub started_at: chrono::DateTime<chrono::Utc>, pub completed_at: Option<chrono::DateTime<chrono::Utc>>, pub results: Vec<TestResult>, pub metrics: TestMetrics, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum TestRunStatus { Running, Completed, Failed, Cancelled, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TestResult { pub test_case_id: String, pub config_id: String, pub status: TestStatus, pub response: Option<String>, pub execution_time_ms: u64, pub token_usage: TokenUsage, pub validation_results: Vec<ValidationResult>, pub error: Option<String>, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum TestStatus { Passed, Failed, Warning, Error, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TokenUsage { pub prompt_tokens: u32, pub completion_tokens: u32, pub total_tokens: u32, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ValidationResult { pub validation_type: String, pub passed: bool, pub details: HashMap<String, serde_json::Value>, pub score: Option<f32>, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TestMetrics { pub total_cases: u32, pub passed_cases: u32, pub failed_cases: u32, pub warning_cases: u32, pub error_cases: u32, pub avg_response_time_ms: f64, pub total_tokens_used: u64, pub success_rate: f64, } impl Default for SandboxParameters { fn default() -> Self { Self { temperature: 0.7, max_tokens: 2048, top_p: 1.0, } } }
1.4 CoAgent Testing Client
Create src/client.rs:
use crate::types::*; use anyhow::{Context, Result}; use reqwest::Client; use serde_json::json; use std::time::Duration; use tracing::{info, warn}; #[derive(Clone)] pub struct CoAgentTestClient { client: Client, base_url: String, } impl CoAgentTestClient { pub fn new(base_url: String) -> Result<Self> { let client = Client::builder() .timeout(Duration::from_secs(60)) .build() .context("Failed to create HTTP client")?; Ok(Self { client, base_url }) } // Agent Configuration Management pub async fn create_agent(&self, config: &AgentConfig) -> Result<String> { let response = self .client .post(&format!("{}/api/v1/agents", self.base_url)) .json(config) .send() .await .context("Failed to create agent")?; if response.status().is_success() { let result: serde_json::Value = response.json().await?; Ok(result["id"].as_str().unwrap_or(&config.id).to_string()) } else { anyhow::bail!("Agent creation failed: {}", response.status()); } } pub async fn list_agents(&self) -> Result<Vec<AgentConfig>> { let response = self .client .get(&format!("{}/api/v1/agents", self.base_url)) .send() .await .context("Failed to list agents")?; if response.status().is_success() { let result: serde_json::Value = response.json().await?; let agents = result["agents"] .as_array() .context("Invalid agents response format")?; Ok(serde_json::from_value(json!(agents))?) } else { anyhow::bail!("Failed to list agents: {}", response.status()); } } // Model Provider Management pub async fn create_provider(&self, provider: &ModelProvider) -> Result<String> { let response = self .client .post(&format!("{}/api/v1/providers", self.base_url)) .json(provider) .send() .await .context("Failed to create provider")?; if response.status().is_success() { let result: serde_json::Value = response.json().await?; Ok(result["id"].as_str().unwrap_or(&provider.id).to_string()) } else { anyhow::bail!("Provider creation failed: {}", response.status()); } } // Sandbox Configuration Management pub async fn create_sandbox_config(&self, config: &SandboxConfig) -> Result<String> { let response = self .client .post(&format!("{}/api/v1/sandbox_configs", self.base_url)) .json(config) .send() .await .context("Failed to create sandbox config")?; if response.status().is_success() { let result: serde_json::Value = response.json().await?; Ok(result["id"].as_str().unwrap_or(&config.id).to_string()) } else { anyhow::bail!("Sandbox config creation failed: {}", response.status()); } } // Test Suite Management pub async fn create_test_suite(&self, suite: &TestSuite) -> Result<String> { let formatted_suite = json!({ "name": suite.name, "description": suite.description, "cases": suite.test_cases.iter().map(|tc| json!({ "input": tc.input, "validations": tc.validations.iter().map(|v| match v { TestValidation::ContentMatch { pattern } => json!({ "kind": { "content_match": { "pattern": pattern } } }), TestValidation::SemanticSimilarity { sentence, threshold } => json!({ "kind": { "semantic_similarity": { "sentence": sentence, "threshold": threshold } } }), TestValidation::ToolCall { tool_name } => json!({ "kind": { "tool_call": { "tool_name": tool_name } } }), TestValidation::ResponseTime { max_seconds } => json!({ "kind": { "response_time": { "max_seconds": max_seconds } } }), TestValidation::ResponseSchema { schema } => json!({ "kind": { "response_schema": { "schema": schema } } }), TestValidation::LlmEval { criteria, model_ref } => json!({ "kind": { "llm_v0": { "llm0": { "llm_criteria": criteria, "model_reference": model_ref } }} }), }).collect::<Vec<_>>() })).collect::<Vec<_>>() }); let response = self .client .post(&format!("{}/api/v1/testsets", self.base_url)) .json(&formatted_suite) .send() .await .context("Failed to create test suite")?; if response.status().is_success() { let result: serde_json::Value = response.json().await?; Ok(result["id_testset"].as_str().unwrap_or(&suite.id).to_string()) } else { anyhow::bail!("Test suite creation failed: {}", response.status()); } } pub async fn run_test_suite(&self, suite_id: &str, config_ids: &[String]) -> Result<TestRun> { let run_request = json!({ "selected_configs": config_ids }); let response = self .client .post(&format!("{}/api/v1/testsets/{}/run", self.base_url, suite_id)) .json(&run_request) .send() .await .context("Failed to start test run")?; if !response.status().is_success() { anyhow::bail!("Test run start failed: {}", response.status()); } let result: serde_json::Value = response.json().await?; let run_id = result["run_id"].as_str().context("Missing run_id")?; info!("Started test run: {}", run_id); // Poll for completion self.wait_for_test_completion(run_id).await } async fn wait_for_test_completion(&self, run_id: &str) -> Result<TestRun> { let mut interval = tokio::time::interval(Duration::from_secs(2)); let start_time = std::time::Instant::now(); let timeout = Duration::from_secs(300); // 5 minute timeout loop { interval.tick().await; if start_time.elapsed() > timeout { anyhow::bail!("Test run timed out after 5 minutes"); } let response = self .client .get(&format!("{}/api/v1/testruns/{}", self.base_url, run_id)) .send() .await .context("Failed to get test run status")?; if !response.status().is_success() { warn!("Failed to get test run status: {}", response.status()); continue; } let result: serde_json::Value = response.json().await?; let status = result["status"].as_str().unwrap_or("unknown"); match status { "completed" | "failed" => { return self.parse_test_run_result(result); } "running" => { let progress = self.calculate_progress(&result); info!("Test run progress: {:.1}%", progress * 100.0); continue; } _ => { warn!("Unknown test run status: {}", status); continue; } } } } fn calculate_progress(&self, result: &serde_json::Value) -> f64 { let total = result["total_cases"].as_u64().unwrap_or(1) as f64; let completed = result["passed_cases"].as_u64().unwrap_or(0) as f64 + result["failed_cases"].as_u64().unwrap_or(0) as f64; if total > 0.0 { completed / total } else { 0.0 } } fn parse_test_run_result(&self, result: serde_json::Value) -> Result<TestRun> { let test_run = TestRun { id: result["run_id"].as_str().unwrap_or("unknown").to_string(), suite_id: result["testset_id"].as_str().unwrap_or("unknown").to_string(), status: match result["status"].as_str().unwrap_or("failed") { "completed" => TestRunStatus::Completed, "running" => TestRunStatus::Running, "cancelled" => TestRunStatus::Cancelled, _ => TestRunStatus::Failed, }, started_at: chrono::Utc::now(), // Should parse from response completed_at: Some(chrono::Utc::now()), // Should parse from response results: Vec::new(), // Would parse detailed results metrics: TestMetrics { total_cases: result["total_cases"].as_u64().unwrap_or(0) as u32, passed_cases: result["passed_cases"].as_u64().unwrap_or(0) as u32, failed_cases: result["failed_cases"].as_u64().unwrap_or(0) as u32, warning_cases: result["warning_cases"].as_u64().unwrap_or(0) as u32, error_cases: 0, avg_response_time_ms: result["avg_response_time_ms"].as_f64().unwrap_or(0.0), total_tokens_used: result["total_tokens"].as_u64().unwrap_or(0), success_rate: { let total = result["total_cases"].as_f64().unwrap_or(1.0); let passed = result["passed_cases"].as_f64().unwrap_or(0.0); if total > 0.0 { passed / total } else { 0.0 } }, }, }; Ok(test_run) } }
Phase 2: Test Case Management System
2.1 Test Case Builder
Create src/test_builder.rs:
use crate::types::*; use anyhow::Result; use uuid::Uuid; pub struct TestSuiteBuilder { suite: TestSuite, } impl TestSuiteBuilder { pub fn new(name: impl Into<String>, description: impl Into<String>) -> Self { Self { suite: TestSuite { id: Uuid::new_v4().to_string(), name: name.into(), description: description.into(), test_cases: Vec::new(), config_ids: Vec::new(), }, } } pub fn with_config_ids(mut self, config_ids: Vec<String>) -> Self { self.suite.config_ids = config_ids; self } pub fn add_test_case(mut self, test_case: TestCase) -> Self { self.suite.test_cases.push(test_case); self } pub fn add_customer_support_tests(mut self) -> Self { // Basic functionality tests let basic_tests = vec![ TestCaseBuilder::new("product_return_inquiry") .with_prompt("I want to return a product I bought last week") .with_content_match(r"(return|refund|exchange|policy)") .with_response_time(5) .with_semantic_similarity( "I can help you with your return. Let me guide you through our return process.", 0.7, ) .build(), TestCaseBuilder::new("order_status_check") .with_prompt("Can you check the status of my order #12345?") .with_content_match(r"(order|status|tracking|shipped|delivered)") .with_tool_call("order_lookup") .with_response_time(8) .build(), TestCaseBuilder::new("billing_question") .with_prompt("I was charged twice for the same order") .with_content_match(r"(billing|charge|refund|investigate|resolve)") .with_llm_eval( "Rate the empathy and helpfulness of the response on a scale of 1-10. A score of 7 or higher passes.", ModelReference { provider_id: "openai-eval".to_string(), model_name: "gpt-4".to_string(), }, ) .build(), ]; for test in basic_tests { self.suite.test_cases.push(test); } self } pub fn add_edge_case_tests(mut self) -> Self { let edge_cases = vec![ TestCaseBuilder::new("angry_customer") .with_prompt("This is ridiculous! Your product is broken and your service is terrible!") .with_llm_eval( "Rate how well the response de-escalates the situation and shows empathy (1-10). Score 7+ passes.", ModelReference { provider_id: "openai-eval".to_string(), model_name: "gpt-4".to_string(), }, ) .with_content_match(r"(understand|sorry|apologize|help|resolve)") .build(), TestCaseBuilder::new("ambiguous_request") .with_prompt("I have a problem with my thing") .with_content_match(r"(clarify|more information|specific|help understand)") .with_llm_eval( "Does the response appropriately ask for clarification? (Yes/No)", ModelReference { provider_id: "anthropic-eval".to_string(), model_name: "claude-3-sonnet".to_string(), }, ) .build(), TestCaseBuilder::new("multiple_issues") .with_prompt("I need to return a product, update my address, and cancel my subscription") .with_content_match(r"(return|address|subscription)") .with_llm_eval( "Does the response address all three issues mentioned? (Yes/No)", ModelReference { provider_id: "openai-eval".to_string(), model_name: "gpt-4".to_string(), }, ) .build(), ]; for test in edge_cases { self.suite.test_cases.push(test); } self } pub fn add_performance_tests(mut self) -> Self { let performance_tests = vec![ TestCaseBuilder::new("quick_response_test") .with_prompt("What are your business hours?") .with_response_time(2) .with_content_match(r"(hours|open|close|Monday|business)") .build(), TestCaseBuilder::new("complex_query_efficiency") .with_prompt("I need to return a product that was a gift, but I don't have the receipt, and it was purchased with store credit from a previous return. Can you help?") .with_response_time(8) .with_llm_eval( "Does the response efficiently address the complex return scenario? Rate 1-10, need 7+.", ModelReference { provider_id: "openai-eval".to_string(), model_name: "gpt-4".to_string(), }, ) .build(), ]; for test in performance_tests { self.suite.test_cases.push(test); } self } pub fn build(self) -> TestSuite { self.suite } } pub struct TestCaseBuilder { test_case: TestCase, } impl TestCaseBuilder { pub fn new(name: impl Into<String>) -> Self { Self { test_case: TestCase { id: Uuid::new_v4().to_string(), name: name.into(), input: TestInput { human_prompt: String::new(), context: std::collections::HashMap::new(), }, validations: Vec::new(), expected_duration_ms: None, }, } } pub fn with_prompt(mut self, prompt: impl Into<String>) -> Self { self.test_case.input.human_prompt = prompt.into(); self } pub fn with_context(mut self, key: impl Into<String>, value: serde_json::Value) -> Self { self.test_case.input.context.insert(key.into(), value); self } pub fn with_content_match(mut self, pattern: impl Into<String>) -> Self { self.test_case.validations.push(TestValidation::ContentMatch { pattern: pattern.into(), }); self } pub fn with_semantic_similarity(mut self, sentence: impl Into<String>, threshold: f32) -> Self { self.test_case.validations.push(TestValidation::SemanticSimilarity { sentence: sentence.into(), threshold, }); self } pub fn with_tool_call(mut self, tool_name: impl Into<String>) -> Self { self.test_case.validations.push(TestValidation::ToolCall { tool_name: tool_name.into(), }); self } pub fn with_response_time(mut self, max_seconds: u64) -> Self { self.test_case.validations.push(TestValidation::ResponseTime { max_seconds }); self.test_case.expected_duration_ms = Some(max_seconds * 1000); self } pub fn with_schema_validation(mut self, schema: serde_json::Value) -> Self { self.test_case.validations.push(TestValidation::ResponseSchema { schema }); self } pub fn with_llm_eval(mut self, criteria: impl Into<String>, model_ref: ModelReference) -> Self { self.test_case.validations.push(TestValidation::LlmEval { criteria: criteria.into(), model_ref, }); self } pub fn build(self) -> TestCase { self.test_case } }
2.2 Configuration Management
Create src/config_manager.rs:
use crate::types::*; use anyhow::Result; use uuid::Uuid; pub struct ConfigurationManager { pub agents: Vec<AgentConfig>, pub providers: Vec<ModelProvider>, pub sandbox_configs: Vec<SandboxConfig>, } impl ConfigurationManager { pub fn new() -> Self { Self { agents: Vec::new(), providers: Vec::new(), sandbox_configs: Vec::new(), } } pub fn create_standard_setup(&mut self) -> Result<()> { // Create model providers self.providers.push(ModelProvider { id: Uuid::new_v4().to_string(), name: "OpenAI Test Provider".to_string(), provider_type: "openai".to_string(), available_models: vec!["gpt-4".to_string(), "gpt-3.5-turbo".to_string()], }); self.providers.push(ModelProvider { id: Uuid::new_v4().to_string(), name: "Anthropic Test Provider".to_string(), provider_type: "anthropic".to_string(), available_models: vec!["claude-3-sonnet".to_string(), "claude-3-haiku".to_string()], }); // Create agent configurations self.agents.push(AgentConfig { id: Uuid::new_v4().to_string(), name: "conservative-support-agent".to_string(), description: "Conservative customer support agent focused on accuracy".to_string(), preamble: r#"You are a careful, professional customer support agent. Always verify information before responding. Prioritize accuracy over speed. When in doubt, escalate to human support. Guidelines: 1. Acknowledge the customer's concern 2. Ask clarifying questions if needed 3. Provide accurate, step-by-step solutions 4. Offer alternative options when appropriate 5. Always be polite and professional"#.to_string(), }); self.agents.push(AgentConfig { id: Uuid::new_v4().to_string(), name: "fast-support-agent".to_string(), description: "Quick-response customer support agent for high volume".to_string(), preamble: r#"You are an efficient customer support agent. Provide quick, helpful responses. Be concise but friendly. Handle common issues directly. Guidelines: 1. Give direct, actionable answers 2. Be concise but complete 3. Use bullet points for clarity 4. Handle routine requests immediately 5. Escalate only complex technical issues"#.to_string(), }); self.agents.push(AgentConfig { id: Uuid::new_v4().to_string(), name: "empathetic-support-agent".to_string(), description: "Empathetic customer support agent focused on customer satisfaction".to_string(), preamble: r#"You are a warm, empathetic customer support agent. Focus on understanding the customer's feelings and concerns. Provide emotional support along with practical solutions. Guidelines: 1. Acknowledge emotions and show empathy 2. Use warm, supportive language 3. Take time to understand the full situation 4. Offer reassurance and confidence 5. Go above and beyond to help"#.to_string(), }); // Create sandbox configurations self.sandbox_configs.push(SandboxConfig { id: Uuid::new_v4().to_string(), name: "Conservative Testing Environment".to_string(), system_prompt: "Focus on providing accurate, well-researched responses. Take time to verify information.".to_string(), parameters: SandboxParameters { temperature: 0.2, max_tokens: 1024, top_p: 0.8, }, category: "accuracy_focused".to_string(), }); self.sandbox_configs.push(SandboxConfig { id: Uuid::new_v4().to_string(), name: "Fast Response Environment".to_string(), system_prompt: "Provide quick, efficient responses. Be concise and direct.".to_string(), parameters: SandboxParameters { temperature: 0.3, max_tokens: 512, top_p: 0.9, }, category: "speed_focused".to_string(), }); self.sandbox_configs.push(SandboxConfig { id: Uuid::new_v4().to_string(), name: "Balanced Environment".to_string(), system_prompt: "Balance accuracy with efficiency. Provide helpful responses in reasonable time.".to_string(), parameters: SandboxParameters { temperature: 0.4, max_tokens: 768, top_p: 0.95, }, category: "balanced".to_string(), }); Ok(()) } pub fn get_config_ids(&self) -> Vec<String> { self.sandbox_configs.iter().map(|c| c.id.clone()).collect() } }
Phase 3: Multi-Agent Execution Engine
3.1 Concurrent Test Executor
Create src/executor.rs:
use crate::client::CoAgentTestClient; use crate::types::*; use anyhow::Result; use futures::future::join_all; use std::sync::Arc; use std::time::Instant; use tokio::sync::Semaphore; use tracing::{info, warn, error}; pub struct MultiAgentTestExecutor { client: Arc<CoAgentTestClient>, concurrency_limit: usize, } impl MultiAgentTestExecutor { pub fn new(client: CoAgentTestClient, concurrency_limit: usize) -> Self { Self { client: Arc::new(client), concurrency_limit, } } pub async fn execute_test_suite(&self, suite: &TestSuite) -> Result<MultiAgentTestReport> { info!("Starting multi-agent test execution for suite: {}", suite.name); let start_time = Instant::now(); // Create the test suite in CoAgent let suite_id = self.client.create_test_suite(suite).await?; info!("Created test suite with ID: {}", suite_id); // Execute tests for each configuration let semaphore = Arc::new(Semaphore::new(self.concurrency_limit)); let mut test_futures = Vec::new(); for config_id in &suite.config_ids { let client = Arc::clone(&self.client); let suite_id = suite_id.clone(); let config_id = config_id.clone(); let semaphore = Arc::clone(&semaphore); let future = async move { let _permit = semaphore.acquire().await.unwrap(); info!("Running tests for config: {}", config_id); let result = client.run_test_suite(&suite_id, &[config_id.clone()]).await; match result { Ok(test_run) => { info!("Completed tests for config: {} - Success rate: {:.1}%", config_id, test_run.metrics.success_rate * 100.0); Some((config_id, test_run)) } Err(e) => { error!("Failed tests for config: {} - Error: {}", config_id, e); None } } }; test_futures.push(future); } // Wait for all tests to complete let results: Vec<Option<(String, TestRun)>> = join_all(test_futures).await; let successful_results: Vec<(String, TestRun)> = results.into_iter().flatten().collect(); let execution_time = start_time.elapsed(); info!("Multi-agent test execution completed in {:?}", execution_time); Ok(MultiAgentTestReport::new( suite.clone(), successful_results, execution_time, )) } pub async fn execute_concurrent_load_test( &self, suite: &TestSuite, concurrent_users: usize, duration_seconds: u64, ) -> Result<LoadTestReport> { info!("Starting load test with {} concurrent users for {} seconds", concurrent_users, duration_seconds); let start_time = Instant::now(); let end_time = start_time + std::time::Duration::from_secs(duration_seconds); let semaphore = Arc::new(Semaphore::new(concurrent_users)); let mut request_futures = Vec::new(); let mut request_counter = 0u64; // Create continuous load for the specified duration while Instant::now() < end_time { for config_id in &suite.config_ids { if Instant::now() >= end_time { break; } let client = Arc::clone(&self.client); let suite_id = format!("load-test-{}", uuid::Uuid::new_v4()); let config_id = config_id.clone(); let semaphore = Arc::clone(&semaphore); request_counter += 1; let future = async move { let _permit = semaphore.acquire().await.unwrap(); let request_start = Instant::now(); // Simulate a single test request let result = client.run_test_suite(&suite_id, &[config_id.clone()]).await; let request_duration = request_start.elapsed(); LoadTestResult { config_id, success: result.is_ok(), duration: request_duration, error: result.err().map(|e| e.to_string()), } }; request_futures.push(future); // Limit the number of pending futures to prevent memory issues if request_futures.len() >= concurrent_users * 2 { let completed_results: Vec<LoadTestResult> = join_all(request_futures.drain(..concurrent_users)).await; // Process results as needed } } // Small delay to prevent overwhelming the system tokio::time::sleep(std::time::Duration::from_millis(100)).await; } // Wait for remaining requests to complete let final_results: Vec<LoadTestResult> = join_all(request_futures).await; let total_duration = start_time.elapsed(); Ok(LoadTestReport::new(final_results, total_duration, concurrent_users)) } } #[derive(Debug)] pub struct MultiAgentTestReport { pub suite_name: String, pub config_results: Vec<(String, TestRun)>, pub execution_time: std::time::Duration, pub summary: TestSummary, } impl MultiAgentTestReport { pub fn new( suite: TestSuite, results: Vec<(String, TestRun)>, execution_time: std::time::Duration, ) -> Self { let summary = TestSummary::calculate(&results); Self { suite_name: suite.name, config_results: results, execution_time, summary, } } pub fn print_summary(&self) { println!("\n🎯 MULTI-AGENT TEST RESULTS"); println!("═══════════════════════════════════════════════════"); println!("Suite: {}", self.suite_name); println!("Execution time: {:?}", self.execution_time); println!("Configurations tested: {}", self.config_results.len()); println!("\n📊 Overall Summary:"); println!(" Success rate: {:.1}%", self.summary.overall_success_rate * 100.0); println!(" Average response time: {:.2}ms", self.summary.avg_response_time); println!(" Total tokens used: {}", self.summary.total_tokens); println!("\n🏆 Configuration Rankings:"); let mut ranked_results = self.config_results.clone(); ranked_results.sort_by(|a, b| b.1.metrics.success_rate.partial_cmp(&a.1.metrics.success_rate).unwrap()); for (i, (config_id, test_run)) in ranked_results.iter().enumerate() { println!(" {}. {} - Success: {:.1}% | Avg Time: {:.0}ms | Tokens: {}", i + 1, config_id, test_run.metrics.success_rate * 100.0, test_run.metrics.avg_response_time_ms, test_run.metrics.total_tokens_used ); } println!("\n💡 Recommendations:"); for recommendation in &self.summary.recommendations { println!(" • {}", recommendation); } } } #[derive(Debug)] pub struct TestSummary { pub overall_success_rate: f64, pub avg_response_time: f64, pub total_tokens: u64, pub best_config: String, pub recommendations: Vec<String>, } impl TestSummary { pub fn calculate(results: &[(String, TestRun)]) -> Self { if results.is_empty() { return Self { overall_success_rate: 0.0, avg_response_time: 0.0, total_tokens: 0, best_config: "None".to_string(), recommendations: vec!["No test results to analyze".to_string()], }; } let total_success_rate = results.iter() .map(|(_, run)| run.metrics.success_rate) .sum::<f64>() / results.len() as f64; let avg_response_time = results.iter() .map(|(_, run)| run.metrics.avg_response_time_ms) .sum::<f64>() / results.len() as f64; let total_tokens = results.iter() .map(|(_, run)| run.metrics.total_tokens_used) .sum::<u64>(); let best_config = results.iter() .max_by(|a, b| a.1.metrics.success_rate.partial_cmp(&b.1.metrics.success_rate).unwrap()) .map(|(config, _)| config.clone()) .unwrap_or_else(|| "Unknown".to_string()); let mut recommendations = Vec::new(); if total_success_rate < 0.8 { recommendations.push("Overall success rate is below 80% - review agent configurations".to_string()); } else if total_success_rate > 0.95 { recommendations.push("Excellent success rate achieved - current configurations are performing well".to_string()); } if avg_response_time > 5000.0 { recommendations.push("Average response time is high - consider optimizing model parameters".to_string()); } recommendations.push(format!("Best performing configuration: {}", best_config)); Self { overall_success_rate: total_success_rate, avg_response_time, total_tokens, best_config, recommendations, } } } #[derive(Debug)] pub struct LoadTestResult { pub config_id: String, pub success: bool, pub duration: std::time::Duration, pub error: Option<String>, } #[derive(Debug)] pub struct LoadTestReport { pub results: Vec<LoadTestResult>, pub total_duration: std::time::Duration, pub concurrent_users: usize, pub requests_per_second: f64, pub success_rate: f64, pub avg_response_time: std::time::Duration, pub p95_response_time: std::time::Duration, } impl LoadTestReport { pub fn new( results: Vec<LoadTestResult>, total_duration: std::time::Duration, concurrent_users: usize, ) -> Self { let total_requests = results.len() as f64; let successful_requests = results.iter().filter(|r| r.success).count() as f64; let success_rate = if total_requests > 0.0 { successful_requests / total_requests } else { 0.0 }; let requests_per_second = total_requests / total_duration.as_secs_f64(); let mut response_times: Vec<std::time::Duration> = results.iter() .filter(|r| r.success) .map(|r| r.duration) .collect(); response_times.sort(); let avg_response_time = if !response_times.is_empty() { let total_ms: u128 = response_times.iter().map(|d| d.as_millis()).sum(); std::time::Duration::from_millis((total_ms / response_times.len() as u128) as u64) } else { std::time::Duration::from_secs(0) }; let p95_response_time = if response_times.len() > 0 { let p95_index = (response_times.len() as f64 * 0.95) as usize; response_times.get(p95_index.min(response_times.len() - 1)) .copied() .unwrap_or(std::time::Duration::from_secs(0)) } else { std::time::Duration::from_secs(0) }; Self { results, total_duration, concurrent_users, requests_per_second, success_rate, avg_response_time, p95_response_time, } } pub fn print_report(&self) { println!("\n⚡ LOAD TEST RESULTS"); println!("═══════════════════════════════════════════════════"); println!("Concurrent users: {}", self.concurrent_users); println!("Total duration: {:?}", self.total_duration); println!("Total requests: {}", self.results.len()); println!("Requests per second: {:.2}", self.requests_per_second); println!("Success rate: {:.1}%", self.success_rate * 100.0); println!("Average response time: {:?}", self.avg_response_time); println!("95th percentile response time: {:?}", self.p95_response_time); if !self.results.iter().all(|r| r.success) { println!("\n❌ Failed Requests:"); for (i, result) in self.results.iter().enumerate() { if !result.success { println!(" {}: {} - {:?}", i + 1, result.config_id, result.error); } } } } }
Phase 4: Performance Analysis & Metrics
4.1 Statistical Analysis Module
Create src/analytics.rs:
use crate::types::*; use anyhow::Result; use statrs::distribution::{TTest, Continuous}; use statrs::statistics::{Statistics, OrderStatistics}; use std::collections::HashMap; use tabled::{Table, Tabled}; #[derive(Debug, Clone, Tabled)] pub struct PerformanceMetrics { #[tabled(rename = "Configuration")] pub config_name: String, #[tabled(rename = "Success Rate %")] pub success_rate_percent: String, #[tabled(rename = "Avg Response (ms)")] pub avg_response_time: String, #[tabled(rename = "P95 Response (ms)")] pub p95_response_time: String, #[tabled(rename = "Total Tokens")] pub total_tokens: String, #[tabled(rename = "Tokens/Request")] pub tokens_per_request: String, } #[derive(Debug)] pub struct StatisticalAnalysis { pub config_comparisons: Vec<ConfigComparison>, pub overall_stats: OverallStatistics, pub performance_insights: Vec<String>, } #[derive(Debug)] pub struct ConfigComparison { pub config_a: String, pub config_b: String, pub success_rate_diff: f64, pub response_time_diff: f64, pub statistical_significance: bool, pub p_value: Option<f64>, pub recommendation: String, } #[derive(Debug)] pub struct OverallStatistics { pub total_tests: usize, pub mean_success_rate: f64, pub std_success_rate: f64, pub mean_response_time: f64, pub std_response_time: f64, pub response_time_percentiles: HashMap<u8, f64>, } pub struct PerformanceAnalyzer; impl PerformanceAnalyzer { pub fn analyze_test_results(results: &[(String, TestRun)]) -> StatisticalAnalysis { let config_comparisons = Self::compare_configurations(results); let overall_stats = Self::calculate_overall_statistics(results); let performance_insights = Self::generate_insights(results, &overall_stats); StatisticalAnalysis { config_comparisons, overall_stats, performance_insights, } } pub fn create_performance_table(results: &[(String, TestRun)]) -> Table { let metrics: Vec<PerformanceMetrics> = results .iter() .map(|(config_name, test_run)| { // Calculate additional metrics from detailed results let response_times: Vec<f64> = test_run .results .iter() .map(|r| r.execution_time_ms as f64) .collect(); let p95_response_time = if !response_times.is_empty() { response_times.percentile(95) } else { test_run.metrics.avg_response_time_ms }; let tokens_per_request = if test_run.metrics.total_cases > 0 { test_run.metrics.total_tokens_used as f64 / test_run.metrics.total_cases as f64 } else { 0.0 }; PerformanceMetrics { config_name: config_name.clone(), success_rate_percent: format!("{:.1}", test_run.metrics.success_rate * 100.0), avg_response_time: format!("{:.0}", test_run.metrics.avg_response_time_ms), p95_response_time: format!("{:.0}", p95_response_time), total_tokens: test_run.metrics.total_tokens_used.to_string(), tokens_per_request: format!("{:.0}", tokens_per_request), } }) .collect(); Table::new(metrics) } fn compare_configurations(results: &[(String, TestRun)]) -> Vec<ConfigComparison> { let mut comparisons = Vec::new(); for i in 0..results.len() { for j in (i + 1)..results.len() { let (config_a, test_run_a) = &results[i]; let (config_b, test_run_b) = &results[j]; let success_rate_diff = test_run_b.metrics.success_rate - test_run_a.metrics.success_rate; let response_time_diff = test_run_b.metrics.avg_response_time_ms - test_run_a.metrics.avg_response_time_ms; // Perform statistical significance test let (statistical_significance, p_value) = Self::calculate_significance(test_run_a, test_run_b); let recommendation = Self::generate_comparison_recommendation( config_a, config_b, success_rate_diff, response_time_diff, statistical_significance, ); comparisons.push(ConfigComparison { config_a: config_a.clone(), config_b: config_b.clone(), success_rate_diff, response_time_diff, statistical_significance, p_value, recommendation, }); } } comparisons } fn calculate_significance(test_run_a: &TestRun, test_run_b: &TestRun) -> (bool, Option<f64>) { // Simplified statistical significance test // In a real implementation, you'd use proper statistical tests let sample_size_a = test_run_a.metrics.total_cases as f64; let sample_size_b = test_run_b.metrics.total_cases as f64; if sample_size_a < 30.0 || sample_size_b < 30.0 { return (false, None); // Need larger sample sizes } let success_rate_a = test_run_a.metrics.success_rate; let success_rate_b = test_run_b.metrics.success_rate; // Calculate pooled proportion and standard error let p_pooled = (success_rate_a * sample_size_a + success_rate_b * sample_size_b) / (sample_size_a + sample_size_b); let se = (p_pooled * (1.0 - p_pooled) * (1.0 / sample_size_a + 1.0 / sample_size_b)).sqrt(); if se == 0.0 { return (false, None); } let z_score = (success_rate_b - success_rate_a) / se; // Two-tailed test with alpha = 0.05 (critical value ≈ 1.96) let is_significant = z_score.abs() > 1.96; // Approximate p-value calculation let p_value = 2.0 * (1.0 - statrs::distribution::Normal::new(0.0, 1.0) .map(|n| n.cdf(z_score.abs())) .unwrap_or(0.5)); (is_significant, Some(p_value)) } fn generate_comparison_recommendation( config_a: &str, config_b: &str, success_rate_diff: f64, response_time_diff: f64, is_significant: bool, ) -> String { if !is_significant { return "No statistically significant difference between configurations".to_string(); } let better_config = if success_rate_diff > 0.0 { config_b } else { config_a }; let worse_config = if success_rate_diff > 0.0 { config_a } else { config_b }; let improvement = success_rate_diff.abs() * 100.0; let time_impact = if response_time_diff.abs() > 100.0 { if response_time_diff > 0.0 { " but is slower" } else { " and is also faster" } } else { "" }; format!( "{} outperforms {} by {:.1}% success rate{}", better_config, worse_config, improvement, time_impact ) } fn calculate_overall_statistics(results: &[(String, TestRun)]) -> OverallStatistics { let success_rates: Vec<f64> = results.iter().map(|(_, r)| r.metrics.success_rate).collect(); let response_times: Vec<f64> = results.iter().map(|(_, r)| r.metrics.avg_response_time_ms).collect(); let mut response_time_percentiles = HashMap::new(); if !response_times.is_empty() { response_time_percentiles.insert(50, response_times.percentile(50)); response_time_percentiles.insert(75, response_times.percentile(75)); response_time_percentiles.insert(90, response_times.percentile(90)); response_time_percentiles.insert(95, response_times.percentile(95)); response_time_percentiles.insert(99, response_times.percentile(99)); } OverallStatistics { total_tests: results.len(), mean_success_rate: success_rates.mean(), std_success_rate: success_rates.std_dev(), mean_response_time: response_times.mean(), std_response_time: response_times.std_dev(), response_time_percentiles, } } fn generate_insights(results: &[(String, TestRun)], stats: &OverallStatistics) -> Vec<String> { let mut insights = Vec::new(); // Success rate insights if stats.mean_success_rate < 0.8 { insights.push("⚠️ Overall success rate is below 80% - investigate failing test cases".to_string()); } else if stats.mean_success_rate > 0.95 { insights.push("✅ Excellent overall success rate above 95%".to_string()); } // Performance insights if let Some(p95) = stats.response_time_percentiles.get(&95) { if *p95 > 5000.0 { insights.push("🐌 95th percentile response time exceeds 5 seconds - optimize performance".to_string()); } } // Consistency insights if stats.std_success_rate > 0.1 { insights.push("📊 High variance in success rates across configurations - some configs need improvement".to_string()); } // Best and worst performers if let (Some(best), Some(worst)) = ( results.iter().max_by(|a, b| a.1.metrics.success_rate.partial_cmp(&b.1.metrics.success_rate).unwrap()), results.iter().min_by(|a, b| a.1.metrics.success_rate.partial_cmp(&b.1.metrics.success_rate).unwrap()) ) { if best.1.metrics.success_rate - worst.1.metrics.success_rate > 0.1 { insights.push(format!("🏆 {} significantly outperforms {} by {:.1}%", best.0, worst.0, (best.1.metrics.success_rate - worst.1.metrics.success_rate) * 100.0)); } } insights } pub fn export_results_to_csv(results: &[(String, TestRun)], filename: &str) -> Result<()> { use std::fs::File; use std::io::Write; let mut file = File::create(filename)?; // Write CSV header writeln!(file, "Config,Success Rate,Avg Response Time (ms),Total Tokens,Passed Cases,Failed Cases,Total Cases")?; // Write data rows for (config_name, test_run) in results { writeln!( file, "{},{:.4},{:.2},{},{},{},{}", config_name, test_run.metrics.success_rate, test_run.metrics.avg_response_time_ms, test_run.metrics.total_tokens_used, test_run.metrics.passed_cases, test_run.metrics.failed_cases, test_run.metrics.total_cases )?; } Ok(()) } }
Phase 5: Load Testing Framework
5.1 Advanced Load Testing
Create src/load_testing.rs:
use crate::client::CoAgentTestClient; use crate::types::*; use anyhow::Result; use futures::stream::{FuturesUnordered, StreamExt}; use std::sync::{Arc, atomic::{AtomicU64, Ordering}}; use std::time::{Duration, Instant}; use tokio::sync::Semaphore; use tracing::{info, warn}; #[derive(Debug, Clone)] pub struct LoadTestConfig { pub concurrent_users: usize, pub ramp_up_duration: Duration, pub test_duration: Duration, pub ramp_down_duration: Duration, pub target_rps: Option<f64>, // Requests per second limit } impl Default for LoadTestConfig { fn default() -> Self { Self { concurrent_users: 50, ramp_up_duration: Duration::from_secs(30), test_duration: Duration::from_secs(300), // 5 minutes ramp_down_duration: Duration::from_secs(30), target_rps: None, } } } #[derive(Debug)] pub struct LoadTestScenario { pub name: String, pub prompts: Vec<String>, pub config_ids: Vec<String>, pub weight: f64, // Probability weight for this scenario } pub struct LoadTestEngine { client: Arc<CoAgentTestClient>, metrics_collector: Arc<LoadTestMetricsCollector>, } impl LoadTestEngine { pub fn new(client: CoAgentTestClient) -> Self { Self { client: Arc::new(client), metrics_collector: Arc::new(LoadTestMetricsCollector::new()), } } pub async fn run_load_test( &self, config: LoadTestConfig, scenarios: Vec<LoadTestScenario>, ) -> Result<LoadTestReport> { info!("Starting load test with {} concurrent users", config.concurrent_users); let start_time = Instant::now(); let test_end_time = start_time + config.ramp_up_duration + config.test_duration; // Create rate limiter if specified let rate_limiter = config.target_rps.map(|rps| { Arc::new(tokio::time::interval(Duration::from_secs_f64(1.0 / rps))) }); // Phase 1: Ramp up info!("Phase 1: Ramping up users over {:?}", config.ramp_up_duration); let ramp_up_end = start_time + config.ramp_up_duration; self.execute_ramp_up_phase(&config, &scenarios, ramp_up_end, rate_limiter.clone()).await?; // Phase 2: Sustained load info!("Phase 2: Sustained load for {:?}", config.test_duration); self.execute_sustained_load_phase(&config, &scenarios, test_end_time, rate_limiter).await?; // Phase 3: Ramp down info!("Phase 3: Ramping down"); // For now, we just stop - could implement gradual ramp down let total_duration = start_time.elapsed(); let report = self.metrics_collector.generate_report(total_duration).await; info!("Load test completed in {:?}", total_duration); Ok(report) } async fn execute_ramp_up_phase( &self, config: &LoadTestConfig, scenarios: &[LoadTestScenario], end_time: Instant, rate_limiter: Option<Arc<tokio::time::Interval>>, ) -> Result<()> { let mut active_users = 0; let ramp_interval = config.ramp_up_duration.as_millis() as f64 / config.concurrent_users as f64; let mut ramp_timer = tokio::time::interval(Duration::from_millis(ramp_interval as u64)); let semaphore = Arc::new(Semaphore::new(config.concurrent_users)); let mut user_tasks = FuturesUnordered::new(); while Instant::now() < end_time && active_users < config.concurrent_users { ramp_timer.tick().await; // Add a new user active_users += 1; let user_task = self.spawn_user_simulation( active_users, scenarios.to_vec(), end_time, Arc::clone(&semaphore), rate_limiter.clone(), ); user_tasks.push(user_task); info!("Ramped up to {} users", active_users); } // Wait for ramp-up phase to complete while Instant::now() < end_time { tokio::time::sleep(Duration::from_millis(100)).await; } Ok(()) } async fn execute_sustained_load_phase( &self, config: &LoadTestConfig, scenarios: &[LoadTestScenario], end_time: Instant, rate_limiter: Option<Arc<tokio::time::Interval>>, ) -> Result<()> { let semaphore = Arc::new(Semaphore::new(config.concurrent_users)); let mut user_tasks = FuturesUnordered::new(); // Spawn all concurrent users for user_id in 1..=config.concurrent_users { let user_task = self.spawn_user_simulation( user_id, scenarios.to_vec(), end_time, Arc::clone(&semaphore), rate_limiter.clone(), ); user_tasks.push(user_task); } // Wait for all users to complete while let Some(_) = user_tasks.next().await {} Ok(()) } async fn spawn_user_simulation( &self, user_id: usize, scenarios: Vec<LoadTestScenario>, end_time: Instant, semaphore: Arc<Semaphore>, rate_limiter: Option<Arc<tokio::time::Interval>>, ) -> Result<()> { let client = Arc::clone(&self.client); let metrics = Arc::clone(&self.metrics_collector); tokio::spawn(async move { let _permit = semaphore.acquire().await.unwrap(); while Instant::now() < end_time { // Rate limiting if let Some(mut limiter) = rate_limiter.as_ref().map(|l| l.as_ref().clone()) { limiter.tick().await; } // Select scenario based on weights let scenario = Self::select_weighted_scenario(&scenarios); let prompt = Self::select_random_prompt(&scenario.prompts); let config_id = Self::select_random_config(&scenario.config_ids); // Execute request let request_start = Instant::now(); let request_result = Self::execute_single_request( &client, &prompt, &config_id, user_id, ).await; let request_duration = request_start.elapsed(); // Record metrics metrics.record_request(LoadTestRequestMetric { user_id, scenario_name: scenario.name.clone(), config_id: config_id.clone(), duration: request_duration, success: request_result.is_ok(), error: request_result.err().map(|e| e.to_string()), timestamp: request_start, }).await; // Small delay between requests from same user tokio::time::sleep(Duration::from_millis(100)).await; } }); Ok(()) } fn select_weighted_scenario(scenarios: &[LoadTestScenario]) -> &LoadTestScenario { let total_weight: f64 = scenarios.iter().map(|s| s.weight).sum(); let mut random_value = fastrand::f64() * total_weight; for scenario in scenarios { random_value -= scenario.weight; if random_value <= 0.0 { return scenario; } } &scenarios[0] // Fallback } fn select_random_prompt(prompts: &[String]) -> String { prompts[fastrand::usize(..prompts.len())].clone() } fn select_random_config(config_ids: &[String]) -> String { config_ids[fastrand::usize(..config_ids.len())].clone() } async fn execute_single_request( client: &CoAgentTestClient, prompt: &str, config_id: &str, user_id: usize, ) -> Result<()> { // Create a minimal test suite for this request let test_suite = TestSuite { id: format!("load-test-{}-{}", user_id, uuid::Uuid::new_v4()), name: format!("Load Test User {}", user_id), description: "Generated for load testing".to_string(), test_cases: vec![TestCase { id: uuid::Uuid::new_v4().to_string(), name: "Load Test Case".to_string(), input: TestInput { human_prompt: prompt.to_string(), context: std::collections::HashMap::new(), }, validations: vec![TestValidation::ResponseTime { max_seconds: 30 }], expected_duration_ms: Some(5000), }], config_ids: vec![config_id.to_string()], }; let suite_id = client.create_test_suite(&test_suite).await?; let _result = client.run_test_suite(&suite_id, &[config_id.to_string()]).await?; Ok(()) } } #[derive(Debug, Clone)] pub struct LoadTestRequestMetric { pub user_id: usize, pub scenario_name: String, pub config_id: String, pub duration: Duration, pub success: bool, pub error: Option<String>, pub timestamp: Instant, } pub struct LoadTestMetricsCollector { metrics: Arc<tokio::sync::Mutex<Vec<LoadTestRequestMetric>>>, request_counter: AtomicU64, success_counter: AtomicU64, } impl LoadTestMetricsCollector { pub fn new() -> Self { Self { metrics: Arc::new(tokio::sync::Mutex::new(Vec::new())), request_counter: AtomicU64::new(0), success_counter: AtomicU64::new(0), } } pub async fn record_request(&self, metric: LoadTestRequestMetric) { self.request_counter.fetch_add(1, Ordering::Relaxed); if metric.success { self.success_counter.fetch_add(1, Ordering::Relaxed); } let mut metrics = self.metrics.lock().await; metrics.push(metric); } pub async fn generate_report(&self, total_duration: Duration) -> LoadTestReport { let metrics = self.metrics.lock().await; let total_requests = self.request_counter.load(Ordering::Relaxed); let successful_requests = self.success_counter.load(Ordering::Relaxed); let mut response_times: Vec<Duration> = metrics.iter() .filter(|m| m.success) .map(|m| m.duration) .collect(); response_times.sort(); let avg_response_time = if !response_times.is_empty() { let total_ms: u128 = response_times.iter().map(|d| d.as_millis()).sum(); Duration::from_millis((total_ms / response_times.len() as u128) as u64) } else { Duration::from_secs(0) }; let percentiles = Self::calculate_percentiles(&response_times); LoadTestReport { total_requests, successful_requests, failed_requests: total_requests - successful_requests, success_rate: if total_requests > 0 { successful_requests as f64 / total_requests as f64 } else { 0.0 }, requests_per_second: total_requests as f64 / total_duration.as_secs_f64(), avg_response_time, response_time_percentiles: percentiles, total_duration, error_summary: Self::summarize_errors(&metrics), } } fn calculate_percentiles(response_times: &[Duration]) -> std::collections::HashMap<u8, Duration> { let mut percentiles = std::collections::HashMap::new(); if response_times.is_empty() { return percentiles; } for p in &[50, 75, 90, 95, 99] { let index = (*p as f64 / 100.0 * (response_times.len() - 1) as f64) as usize; percentiles.insert(*p, response_times[index.min(response_times.len() - 1)]); } percentiles } fn summarize_errors(metrics: &[LoadTestRequestMetric]) -> std::collections::HashMap<String, u64> { let mut error_counts = std::collections::HashMap::new(); for metric in metrics { if !metric.success { if let Some(error) = &metric.error { *error_counts.entry(error.clone()).or_insert(0) += 1; } else { *error_counts.entry("Unknown error".to_string()).or_insert(0) += 1; } } } error_counts } } #[derive(Debug)] pub struct LoadTestReport { pub total_requests: u64, pub successful_requests: u64, pub failed_requests: u64, pub success_rate: f64, pub requests_per_second: f64, pub avg_response_time: Duration, pub response_time_percentiles: std::collections::HashMap<u8, Duration>, pub total_duration: Duration, pub error_summary: std::collections::HashMap<String, u64>, } impl LoadTestReport { pub fn print_report(&self) { println!("\n⚡ LOAD TEST REPORT"); println!("══════════════════════════════════════════════════════"); println!("Duration: {:?}", self.total_duration); println!("Total Requests: {}", self.total_requests); println!("Successful: {} ({:.1}%)", self.successful_requests, self.success_rate * 100.0); println!("Failed: {}", self.failed_requests); println!("Requests/sec: {:.2}", self.requests_per_second); println!("\n📊 Response Time Statistics:"); println!("Average: {:?}", self.avg_response_time); for (percentile, duration) in &self.response_time_percentiles { println!("P{}: {:?}", percentile, duration); } if !self.error_summary.is_empty() { println!("\n❌ Error Summary:"); for (error, count) in &self.error_summary { println!(" {}: {} occurrences", error, count); } } } }
Phase 6: Statistical A/B Testing
6.1 A/B Testing Framework
Create src/ab_testing.rs:
use crate::client::CoAgentTestClient; use crate::types::*; use anyhow::Result; use statrs::distribution::{Normal, Continuous}; use statrs::statistics::Statistics; use std::collections::HashMap; use std::sync::Arc; use tracing::info; #[derive(Debug, Clone)] pub struct ABTestConfig { pub variant_a_config_id: String, pub variant_b_config_id: String, pub sample_size_per_variant: usize, pub significance_level: f64, // Default 0.05 pub test_prompts: Vec<String>, pub success_criteria: ABTestSuccessCriteria, } #[derive(Debug, Clone)] pub enum ABTestSuccessCriteria { SuccessRate, ResponseTime, TokenEfficiency, CustomMetric(String), } #[derive(Debug)] pub struct ABTestResult { pub variant_a_stats: VariantStatistics, pub variant_b_stats: VariantStatistics, pub statistical_test: StatisticalTestResult, pub recommendation: ABTestRecommendation, pub confidence_interval: ConfidenceInterval, } #[derive(Debug)] pub struct VariantStatistics { pub config_id: String, pub sample_size: usize, pub success_rate: f64, pub avg_response_time: f64, pub avg_tokens_per_request: f64, pub std_dev_response_time: f64, pub response_time_percentiles: HashMap<u8, f64>, } #[derive(Debug)] pub struct StatisticalTestResult { pub test_type: String, pub statistic: f64, pub p_value: f64, pub is_significant: bool, pub effect_size: f64, pub power: Option<f64>, } #[derive(Debug)] pub struct ConfidenceInterval { pub metric: String, pub confidence_level: f64, pub lower_bound: f64, pub upper_bound: f64, pub difference_estimate: f64, } #[derive(Debug)] pub enum ABTestRecommendation { ChooseVariantA { reason: String, confidence: f64 }, ChooseVariantB { reason: String, confidence: f64 }, NoSignificantDifference { reason: String }, IncreaseSampleSize { current_power: f64, required_samples: usize }, } pub struct ABTestEngine { client: Arc<CoAgentTestClient>, } impl ABTestEngine { pub fn new(client: CoAgentTestClient) -> Self { Self { client: Arc::new(client), } } pub async fn run_ab_test(&self, config: ABTestConfig) -> Result<ABTestResult> { info!("Starting A/B test: {} vs {}", config.variant_a_config_id, config.variant_b_config_id); // Create test suite for A/B testing let test_suite = self.create_ab_test_suite(&config).await?; // Run tests for Variant A info!("Testing Variant A: {}", config.variant_a_config_id); let variant_a_results = self.run_variant_test( &test_suite, &config.variant_a_config_id, config.sample_size_per_variant, ).await?; // Run tests for Variant B info!("Testing Variant B: {}", config.variant_b_config_id); let variant_b_results = self.run_variant_test( &test_suite, &config.variant_b_config_id, config.sample_size_per_variant, ).await?; // Calculate statistics for each variant let variant_a_stats = self.calculate_variant_statistics( &config.variant_a_config_id, &variant_a_results, ); let variant_b_stats = self.calculate_variant_statistics( &config.variant_b_config_id, &variant_b_results, ); // Perform statistical test let statistical_test = self.perform_statistical_test( &variant_a_stats, &variant_b_stats, &config.success_criteria, config.significance_level, ); // Calculate confidence interval let confidence_interval = self.calculate_confidence_interval( &variant_a_stats, &variant_b_stats, &config.success_criteria, 1.0 - config.significance_level, ); // Generate recommendation let recommendation = self.generate_recommendation( &variant_a_stats, &variant_b_stats, &statistical_test, &config, ); Ok(ABTestResult { variant_a_stats, variant_b_stats, statistical_test, recommendation, confidence_interval, }) } async fn create_ab_test_suite(&self, config: &ABTestConfig) -> Result<TestSuite> { let test_cases: Vec<TestCase> = config .test_prompts .iter() .enumerate() .map(|(i, prompt)| TestCase { id: format!("ab-test-case-{}", i), name: format!("A/B Test Case {}", i + 1), input: TestInput { human_prompt: prompt.clone(), context: HashMap::new(), }, validations: vec![ TestValidation::ResponseTime { max_seconds: 30 }, TestValidation::ContentMatch { pattern: r".+".to_string() // Ensure non-empty response }, ], expected_duration_ms: Some(10000), }) .collect(); Ok(TestSuite { id: uuid::Uuid::new_v4().to_string(), name: "A/B Test Suite".to_string(), description: format!("A/B test comparing {} vs {}", config.variant_a_config_id, config.variant_b_config_id), test_cases, config_ids: vec![ config.variant_a_config_id.clone(), config.variant_b_config_id.clone(), ], }) } async fn run_variant_test( &self, test_suite: &TestSuite, config_id: &str, sample_size: usize, ) -> Result<Vec<TestRun>> { let mut results = Vec::new(); // Run the test multiple times to build up sample size let runs_needed = (sample_size as f64 / test_suite.test_cases.len() as f64).ceil() as usize; for run_idx in 0..runs_needed { info!("Running test batch {} of {} for config {}", run_idx + 1, runs_needed, config_id); let suite_id = self.client.create_test_suite(test_suite).await?; let test_run = self.client.run_test_suite(&suite_id, &[config_id.to_string()]).await?; results.push(test_run); // Small delay between runs to avoid overwhelming the system tokio::time::sleep(std::time::Duration::from_millis(500)).await; } Ok(results) } fn calculate_variant_statistics(&self, config_id: &str, test_runs: &[TestRun]) -> VariantStatistics { let mut all_response_times = Vec::new(); let mut all_token_counts = Vec::new(); let mut total_successes = 0; let mut total_tests = 0; for test_run in test_runs { for result in &test_run.results { total_tests += 1; if matches!(result.status, TestStatus::Passed) { total_successes += 1; } all_response_times.push(result.execution_time_ms as f64); all_token_counts.push(result.token_usage.total_tokens as f64); } } let success_rate = if total_tests > 0 { total_successes as f64 / total_tests as f64 } else { 0.0 }; let avg_response_time = all_response_times.mean(); let std_dev_response_time = all_response_times.std_dev(); let avg_tokens = all_token_counts.mean(); // Calculate percentiles let mut sorted_times = all_response_times.clone(); sorted_times.sort_by(|a, b| a.partial_cmp(b).unwrap()); let mut percentiles = HashMap::new(); if !sorted_times.is_empty() { for p in &[50, 75, 90, 95, 99] { let index = (*p as f64 / 100.0 * (sorted_times.len() - 1) as f64) as usize; percentiles.insert(*p, sorted_times[index.min(sorted_times.len() - 1)]); } } VariantStatistics { config_id: config_id.to_string(), sample_size: total_tests, success_rate, avg_response_time, avg_tokens_per_request: avg_tokens, std_dev_response_time, response_time_percentiles: percentiles, } } fn perform_statistical_test( &self, variant_a: &VariantStatistics, variant_b: &VariantStatistics, criteria: &ABTestSuccessCriteria, alpha: f64, ) -> StatisticalTestResult { match criteria { ABTestSuccessCriteria::SuccessRate => { self.two_proportion_z_test(variant_a, variant_b, alpha) } ABTestSuccessCriteria::ResponseTime => { self.welch_t_test_response_time(variant_a, variant_b, alpha) } ABTestSuccessCriteria::TokenEfficiency => { // Compare tokens per successful request self.welch_t_test_token_efficiency(variant_a, variant_b, alpha) } ABTestSuccessCriteria::CustomMetric(_) => { // Placeholder for custom metrics StatisticalTestResult { test_type: "Custom Metric Test".to_string(), statistic: 0.0, p_value: 1.0, is_significant: false, effect_size: 0.0, power: None, } } } } fn two_proportion_z_test( &self, variant_a: &VariantStatistics, variant_b: &VariantStatistics, alpha: f64, ) -> StatisticalTestResult { let p1 = variant_a.success_rate; let p2 = variant_b.success_rate; let n1 = variant_a.sample_size as f64; let n2 = variant_b.sample_size as f64; // Pooled proportion let p_pool = ((p1 * n1) + (p2 * n2)) / (n1 + n2); // Standard error let se = (p_pool * (1.0 - p_pool) * (1.0 / n1 + 1.0 / n2)).sqrt(); let z_statistic = if se != 0.0 { (p2 - p1) / se } else { 0.0 }; // Two-tailed test let p_value = if z_statistic != 0.0 { 2.0 * (1.0 - Normal::new(0.0, 1.0).unwrap().cdf(z_statistic.abs())) } else { 1.0 }; let is_significant = p_value < alpha; // Effect size (Cohen's h for proportions) let effect_size = 2.0 * (p2.sqrt().asin() - p1.sqrt().asin()); StatisticalTestResult { test_type: "Two-Proportion Z-Test".to_string(), statistic: z_statistic, p_value, is_significant, effect_size, power: None, // Could be calculated with additional complexity } } fn welch_t_test_response_time( &self, variant_a: &VariantStatistics, variant_b: &VariantStatistics, alpha: f64, ) -> StatisticalTestResult { let mean1 = variant_a.avg_response_time; let mean2 = variant_b.avg_response_time; let s1 = variant_a.std_dev_response_time; let s2 = variant_b.std_dev_response_time; let n1 = variant_a.sample_size as f64; let n2 = variant_b.sample_size as f64; if n1 < 2.0 || n2 < 2.0 || s1 == 0.0 || s2 == 0.0 { return StatisticalTestResult { test_type: "Welch's T-Test (Response Time)".to_string(), statistic: 0.0, p_value: 1.0, is_significant: false, effect_size: 0.0, power: None, }; } // Welch's t-test let se = ((s1 * s1 / n1) + (s2 * s2 / n2)).sqrt(); let t_statistic = if se != 0.0 { (mean2 - mean1) / se } else { 0.0 }; // Degrees of freedom (Welch-Satterthwaite equation) let df = { let s1_sq_n1 = s1 * s1 / n1; let s2_sq_n2 = s2 * s2 / n2; let numerator = (s1_sq_n1 + s2_sq_n2).powi(2); let denominator = (s1_sq_n1.powi(2) / (n1 - 1.0)) + (s2_sq_n2.powi(2) / (n2 - 1.0)); if denominator != 0.0 { numerator / denominator } else { 1.0 } }; // Approximate p-value using normal distribution for large samples let p_value = if t_statistic != 0.0 && df > 30.0 { 2.0 * (1.0 - Normal::new(0.0, 1.0).unwrap().cdf(t_statistic.abs())) } else { 1.0 }; let is_significant = p_value < alpha; // Effect size (Cohen's d) let pooled_sd = ((((n1 - 1.0) * s1 * s1) + ((n2 - 1.0) * s2 * s2)) / (n1 + n2 - 2.0)).sqrt(); let effect_size = if pooled_sd != 0.0 { (mean2 - mean1) / pooled_sd } else { 0.0 }; StatisticalTestResult { test_type: "Welch's T-Test (Response Time)".to_string(), statistic: t_statistic, p_value, is_significant, effect_size, power: None, } } fn welch_t_test_token_efficiency( &self, variant_a: &VariantStatistics, variant_b: &VariantStatistics, alpha: f64, ) -> StatisticalTestResult { // Simplified - would need actual token data for proper calculation // This is a placeholder implementation StatisticalTestResult { test_type: "Welch's T-Test (Token Efficiency)".to_string(), statistic: 0.0, p_value: 1.0, is_significant: false, effect_size: 0.0, power: None, } } fn calculate_confidence_interval( &self, variant_a: &VariantStatistics, variant_b: &VariantStatistics, criteria: &ABTestSuccessCriteria, confidence_level: f64, ) -> ConfidenceInterval { match criteria { ABTestSuccessCriteria::SuccessRate => { let p1 = variant_a.success_rate; let p2 = variant_b.success_rate; let n1 = variant_a.sample_size as f64; let n2 = variant_b.sample_size as f64; let diff = p2 - p1; let se = ((p1 * (1.0 - p1) / n1) + (p2 * (1.0 - p2) / n2)).sqrt(); // Z-critical value for given confidence level let z_critical = Normal::new(0.0, 1.0).unwrap().inverse_cdf(1.0 - (1.0 - confidence_level) / 2.0); let margin_of_error = z_critical * se; ConfidenceInterval { metric: "Success Rate Difference".to_string(), confidence_level, lower_bound: diff - margin_of_error, upper_bound: diff + margin_of_error, difference_estimate: diff, } } ABTestSuccessCriteria::ResponseTime => { let mean1 = variant_a.avg_response_time; let mean2 = variant_b.avg_response_time; let s1 = variant_a.std_dev_response_time; let s2 = variant_b.std_dev_response_time; let n1 = variant_a.sample_size as f64; let n2 = variant_b.sample_size as f64; let diff = mean2 - mean1; let se = ((s1 * s1 / n1) + (s2 * s2 / n2)).sqrt(); // Using normal approximation for large samples let z_critical = Normal::new(0.0, 1.0).unwrap().inverse_cdf(1.0 - (1.0 - confidence_level) / 2.0); let margin_of_error = z_critical * se; ConfidenceInterval { metric: "Response Time Difference (ms)".to_string(), confidence_level, lower_bound: diff - margin_of_error, upper_bound: diff + margin_of_error, difference_estimate: diff, } } _ => ConfidenceInterval { metric: "Unknown".to_string(), confidence_level, lower_bound: 0.0, upper_bound: 0.0, difference_estimate: 0.0, } } } fn generate_recommendation( &self, variant_a: &VariantStatistics, variant_b: &VariantStatistics, statistical_test: &StatisticalTestResult, config: &ABTestConfig, ) -> ABTestRecommendation { if !statistical_test.is_significant { return ABTestRecommendation::NoSignificantDifference { reason: format!( "No statistically significant difference found (p-value: {:.4}). Consider running more tests or testing different variants.", statistical_test.p_value ), }; } match config.success_criteria { ABTestSuccessCriteria::SuccessRate => { if variant_b.success_rate > variant_a.success_rate { ABTestRecommendation::ChooseVariantB { reason: format!( "Variant B has {:.1}% higher success rate ({:.1}% vs {:.1}%) with statistical significance (p-value: {:.4})", (variant_b.success_rate - variant_a.success_rate) * 100.0, variant_b.success_rate * 100.0, variant_a.success_rate * 100.0, statistical_test.p_value ), confidence: 1.0 - statistical_test.p_value, } } else { ABTestRecommendation::ChooseVariantA { reason: format!( "Variant A has {:.1}% higher success rate ({:.1}% vs {:.1}%) with statistical significance (p-value: {:.4})", (variant_a.success_rate - variant_b.success_rate) * 100.0, variant_a.success_rate * 100.0, variant_b.success_rate * 100.0, statistical_test.p_value ), confidence: 1.0 - statistical_test.p_value, } } } ABTestSuccessCriteria::ResponseTime => { if variant_b.avg_response_time < variant_a.avg_response_time { ABTestRecommendation::ChooseVariantB { reason: format!( "Variant B is {:.0}ms faster on average ({:.0}ms vs {:.0}ms) with statistical significance", variant_a.avg_response_time - variant_b.avg_response_time, variant_b.avg_response_time, variant_a.avg_response_time ), confidence: 1.0 - statistical_test.p_value, } } else { ABTestRecommendation::ChooseVariantA { reason: format!( "Variant A is {:.0}ms faster on average ({:.0}ms vs {:.0}ms) with statistical significance", variant_b.avg_response_time - variant_a.avg_response_time, variant_a.avg_response_time, variant_b.avg_response_time ), confidence: 1.0 - statistical_test.p_value, } } } _ => ABTestRecommendation::NoSignificantDifference { reason: "Analysis not implemented for this success criteria".to_string(), }, } } } impl ABTestResult { pub fn print_report(&self) { println!("\n🔬 A/B TEST RESULTS"); println!("═══════════════════════════════════════════════════════════════"); println!("\n📊 Variant A Statistics:"); println!(" Config ID: {}", self.variant_a_stats.config_id); println!(" Sample Size: {}", self.variant_a_stats.sample_size); println!(" Success Rate: {:.2}%", self.variant_a_stats.success_rate * 100.0); println!(" Avg Response Time: {:.1}ms", self.variant_a_stats.avg_response_time); println!(" Avg Tokens/Request: {:.1}", self.variant_a_stats.avg_tokens_per_request); println!("\n📊 Variant B Statistics:"); println!(" Config ID: {}", self.variant_b_stats.config_id); println!(" Sample Size: {}", self.variant_b_stats.sample_size); println!(" Success Rate: {:.2}%", self.variant_b_stats.success_rate * 100.0); println!(" Avg Response Time: {:.1}ms", self.variant_b_stats.avg_response_time); println!(" Avg Tokens/Request: {:.1}", self.variant_b_stats.avg_tokens_per_request); println!("\n🧮 Statistical Test Results:"); println!(" Test Type: {}", self.statistical_test.test_type); println!(" Test Statistic: {:.4}", self.statistical_test.statistic); println!(" P-Value: {:.6}", self.statistical_test.p_value); println!(" Statistically Significant: {}", self.statistical_test.is_significant); println!(" Effect Size: {:.4}", self.statistical_test.effect_size); println!("\n📏 Confidence Interval ({:.0}% confidence):", self.confidence_interval.confidence_level * 100.0); println!(" Metric: {}", self.confidence_interval.metric); println!(" Difference Estimate: {:.4}", self.confidence_interval.difference_estimate); println!(" Range: [{:.4}, {:.4}]", self.confidence_interval.lower_bound, self.confidence_interval.upper_bound); println!("\n💡 Recommendation:"); match &self.recommendation { ABTestRecommendation::ChooseVariantA { reason, confidence } => { println!(" ✅ Choose Variant A"); println!(" Confidence: {:.1}%", confidence * 100.0); println!(" Reason: {}", reason); } ABTestRecommendation::ChooseVariantB { reason, confidence } => { println!(" ✅ Choose Variant B"); println!(" Confidence: {:.1}%", confidence * 100.0); println!(" Reason: {}", reason); } ABTestRecommendation::NoSignificantDifference { reason } => { println!(" ⚖️ No Clear Winner"); println!(" Reason: {}", reason); } ABTestRecommendation::IncreaseSampleSize { current_power, required_samples } => { println!(" 📈 Increase Sample Size"); println!(" Current Power: {:.1}%", current_power * 100.0); println!(" Recommended Samples: {}", required_samples); } } } }
Phase 7: Main Application
7.1 CLI Application
Create src/main.rs:
mod types; mod client; mod test_builder; mod config_manager; mod executor; mod analytics; mod load_testing; mod ab_testing; use anyhow::Result; use clap::{Parser, Subcommand}; use tracing::{info, Level}; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; #[derive(Parser)] #[command(name = "coagent-multi-agent-testing")] #[command(about = "High-performance multi-agent testing framework for CoAgent")] struct Cli { #[command(subcommand)] command: Commands, #[arg(long, default_value = "http://localhost:3000")] coagent_url: String, #[arg(long, default_value = "info")] log_level: String, } #[derive(Subcommand)] enum Commands { /// Run comprehensive multi-agent tests Test { #[arg(long, default_value = "10")] concurrency: usize, #[arg(long)] export_csv: Option<String>, #[arg(long, default_value = "false")] include_edge_cases: bool, #[arg(long, default_value = "false")] include_performance_tests: bool, }, /// Run load testing with specified parameters Load { #[arg(long, default_value = "50")] concurrent_users: usize, #[arg(long, default_value = "300")] duration_seconds: u64, #[arg(long)] ramp_up_seconds: Option<u64>, #[arg(long)] target_rps: Option<f64>, }, /// Run A/B testing between two configurations ABTest { #[arg(long)] variant_a: String, #[arg(long)] variant_b: String, #[arg(long, default_value = "100")] sample_size: usize, #[arg(long, default_value = "0.05")] significance_level: f64, #[arg(long, default_value = "success_rate")] criteria: String, }, /// Set up test environment with standard configurations Setup, /// Analyze existing test results Analyze { #[arg(long)] results_file: Option<String>, }, } #[tokio::main] async fn main() -> Result<()> { let cli = Cli::parse(); // Initialize tracing init_tracing(&cli.log_level); info!("Starting CoAgent Multi-Agent Testing Framework"); info!("CoAgent URL: {}", cli.coagent_url); // Initialize client let client = client::CoAgentTestClient::new(cli.coagent_url)?; match cli.command { Commands::Test { concurrency, export_csv, include_edge_cases, include_performance_tests } => { run_multi_agent_tests( client, concurrency, export_csv, include_edge_cases, include_performance_tests ).await?; } Commands::Load { concurrent_users, duration_seconds, ramp_up_seconds, target_rps } => { run_load_tests( client, concurrent_users, duration_seconds, ramp_up_seconds, target_rps ).await?; } Commands::ABTest { variant_a, variant_b, sample_size, significance_level, criteria } => { run_ab_tests( client, variant_a, variant_b, sample_size, significance_level, criteria ).await?; } Commands::Setup => { run_environment_setup(client).await?; } Commands::Analyze { results_file } => { run_analysis(results_file).await?; } } Ok(()) } fn init_tracing(level: &str) { let filter = EnvFilter::try_new(level) .or_else(|_| EnvFilter::try_new("info")) .unwrap(); let fmt_layer = fmt::layer() .with_target(false) .with_timer(fmt::time::UtcTime::rfc_3339()) .with_level(true) .json(); tracing_subscriber::registry() .with(filter) .with(fmt_layer) .init(); } async fn run_multi_agent_tests( client: client::CoAgentTestClient, concurrency: usize, export_csv: Option<String>, include_edge_cases: bool, include_performance_tests: bool, ) -> Result<()> { info!("Running multi-agent tests with concurrency: {}", concurrency); // Set up test environment let mut config_manager = config_manager::ConfigurationManager::new(); config_manager.create_standard_setup()?; // Create configurations in CoAgent for provider in &config_manager.providers { client.create_provider(provider).await?; } for agent in &config_manager.agents { client.create_agent(agent).await?; } let mut created_config_ids = Vec::new(); for sandbox_config in &config_manager.sandbox_configs { let config_id = client.create_sandbox_config(sandbox_config).await?; created_config_ids.push(config_id); } // Build test suite let mut suite_builder = test_builder::TestSuiteBuilder::new( "Comprehensive Customer Support Testing", "Multi-agent testing suite covering functionality, edge cases, and performance" ).with_config_ids(created_config_ids); suite_builder = suite_builder.add_customer_support_tests(); if include_edge_cases { suite_builder = suite_builder.add_edge_case_tests(); } if include_performance_tests { suite_builder = suite_builder.add_performance_tests(); } let test_suite = suite_builder.build(); // Execute tests let executor = executor::MultiAgentTestExecutor::new(client, concurrency); let report = executor.execute_test_suite(&test_suite).await?; // Display results report.print_summary(); // Perform statistical analysis let analysis = analytics::PerformanceAnalyzer::analyze_test_results(&report.config_results); println!("\n📈 STATISTICAL ANALYSIS"); println!("═══════════════════════════════════════════════════"); let performance_table = analytics::PerformanceAnalyzer::create_performance_table(&report.config_results); println!("{}", performance_table); println!("\n🔍 Performance Insights:"); for insight in &analysis.performance_insights { println!(" • {}", insight); } println!("\n📊 Configuration Comparisons:"); for comparison in &analysis.config_comparisons { println!(" {} vs {}: {}", comparison.config_a, comparison.config_b, comparison.recommendation); } // Export to CSV if requested if let Some(filename) = export_csv { analytics::PerformanceAnalyzer::export_results_to_csv(&report.config_results, &filename)?; info!("Results exported to: {}", filename); } Ok(()) } async fn run_load_tests( client: client::CoAgentTestClient, concurrent_users: usize, duration_seconds: u64, ramp_up_seconds: Option<u64>, target_rps: Option<f64>, ) -> Result<()> { info!("Starting load test with {} concurrent users for {} seconds", concurrent_users, duration_seconds); let load_config = load_testing::LoadTestConfig { concurrent_users, ramp_up_duration: std::time::Duration::from_secs(ramp_up_seconds.unwrap_or(30)), test_duration: std::time::Duration::from_secs(duration_seconds), ramp_down_duration: std::time::Duration::from_secs(30), target_rps, }; // Create load test scenarios let scenarios = vec![ load_testing::LoadTestScenario { name: "Customer Support Queries".to_string(), prompts: vec![ "I want to return a product".to_string(), "What's the status of my order?".to_string(), "I need help with billing".to_string(), "How do I cancel my subscription?".to_string(), ], config_ids: vec!["config-1".to_string(), "config-2".to_string()], // Would use actual IDs weight: 0.8, }, load_testing::LoadTestScenario { name: "Technical Support".to_string(), prompts: vec![ "I'm having trouble logging in".to_string(), "The app keeps crashing".to_string(), "How do I reset my password?".to_string(), ], config_ids: vec!["config-3".to_string()], weight: 0.2, }, ]; // Create test suite for load testing let test_suite = test_builder::TestSuiteBuilder::new( "Load Test Suite", "Minimal test suite for load testing" ).add_customer_support_tests() .build(); let load_engine = load_testing::LoadTestEngine::new(client); let report = load_engine.run_load_test(load_config, scenarios).await?; report.print_report(); Ok(()) } async fn run_ab_tests( client: client::CoAgentTestClient, variant_a: String, variant_b: String, sample_size: usize, significance_level: f64, criteria: String, ) -> Result<()> { info!("Starting A/B test: {} vs {}", variant_a, variant_b); let success_criteria = match criteria.as_str() { "success_rate" => ab_testing::ABTestSuccessCriteria::SuccessRate, "response_time" => ab_testing::ABTestSuccessCriteria::ResponseTime, "token_efficiency" => ab_testing::ABTestSuccessCriteria::TokenEfficiency, custom => ab_testing::ABTestSuccessCriteria::CustomMetric(custom.to_string()), }; let ab_config = ab_testing::ABTestConfig { variant_a_config_id: variant_a, variant_b_config_id: variant_b, sample_size_per_variant: sample_size, significance_level, test_prompts: vec![ "I need help with returning a product".to_string(), "What are your business hours?".to_string(), "I was charged incorrectly".to_string(), "How do I update my account information?".to_string(), "I'm having trouble with my order".to_string(), ], success_criteria, }; let ab_engine = ab_testing::ABTestEngine::new(client); let result = ab_engine.run_ab_test(ab_config).await?; result.print_report(); Ok(()) } async fn run_environment_setup(client: client::CoAgentTestClient) -> Result<()> { info!("Setting up test environment"); let mut config_manager = config_manager::ConfigurationManager::new(); config_manager.create_standard_setup()?; info!("Creating {} model providers...", config_manager.providers.len()); for provider in &config_manager.providers { let provider_id = client.create_provider(provider).await?; info!("Created provider: {} (ID: {})", provider.name, provider_id); } info!("Creating {} agent configurations...", config_manager.agents.len()); for agent in &config_manager.agents { let agent_id = client.create_agent(agent).await?; info!("Created agent: {} (ID: {})", agent.name, agent_id); } info!("Creating {} sandbox configurations...", config_manager.sandbox_configs.len()); for sandbox_config in &config_manager.sandbox_configs { let config_id = client.create_sandbox_config(sandbox_config).await?; info!("Created sandbox config: {} (ID: {})", sandbox_config.name, config_id); } println!("\n✅ Environment setup completed successfully!"); println!("You can now run tests using the available configurations."); Ok(()) } async fn run_analysis(results_file: Option<String>) -> Result<()> { info!("Running analysis on test results"); if let Some(_file) = results_file { // Would load and analyze results from file println!("📊 Analysis of saved results would be implemented here"); } else { println!("📊 Real-time analysis mode - connect to live CoAgent instance"); // Could fetch recent test runs and analyze them } Ok(()) }
Phase 8: CI/CD Integration
8.1 GitHub Actions Workflow
Create .github/workflows/rust-multi-agent-testing.yml:
name: Rust Multi-Agent Testing on: push: branches: [ main, develop ] pull_request: branches: [ main ] schedule: # Run nightly at 2 AM UTC - cron: '0 2 * * *' env: CARGO_TERM_COLOR: always RUST_LOG: info jobs: test-and-benchmark: runs-on: ubuntu-latest services: coagent: image: coagent:latest ports: - 3000:3000 options: >- --health-cmd="curl -f http://localhost:3000/health || exit 1" --health-interval=10s --health-timeout=5s --health-retries=30 steps: - uses: actions/checkout@v4 - name: Install Rust toolchain uses: actions-rs/toolchain@v1 with: toolchain: stable profile: minimal override: true components: rustfmt, clippy - name: Cache cargo registry uses: actions/cache@v3 with: path: | ~/.cargo/registry ~/.cargo/git target key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} - name: Check formatting run: cargo fmt -- --check - name: Run clippy run: cargo clippy --all-targets --all-features -- -D warnings - name: Build run: cargo build --verbose --all-targets - name: Run unit tests run: cargo test --lib --verbose - name: Wait for CoAgent run: | echo "Waiting for CoAgent to be ready..." for i in {1..60}; do if curl -f http://localhost:3000/health; then echo "CoAgent is ready!" break fi echo "Attempt $i/60 failed, waiting..." sleep 5 done - name: Set up test environment env: OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }} run: | cargo run -- setup --coagent-url http://localhost:3000 - name: Run multi-agent integration tests env: COAGENT_URL: http://localhost:3000 run: | cargo run -- test \ --concurrency 5 \ --include-edge-cases \ --include-performance-tests \ --export-csv test-results.csv \ --log-level debug - name: Run load test (short duration for CI) env: COAGENT_URL: http://localhost:3000 run: | cargo run -- load \ --concurrent-users 10 \ --duration-seconds 30 \ --ramp-up-seconds 10 - name: Run benchmark suite run: | cargo bench --bench load_test -- --output-format json | tee benchmark-results.json - name: Upload test artifacts uses: actions/upload-artifact@v3 if: always() with: name: test-results path: | test-results.csv benchmark-results.json target/criterion/ - name: Comment PR with results if: github.event_name == 'pull_request' uses: actions/github-script@v6 with: script: | const fs = require('fs'); try { if (fs.existsSync('test-results.csv')) { const csv = fs.readFileSync('test-results.csv', 'utf8'); const lines = csv.split('\n'); let summary = '## 🦀 Rust Multi-Agent Test Results\n\n'; summary += '| Configuration | Success Rate | Avg Response Time | Total Tokens |\n'; summary += '|---------------|--------------|-------------------|-------------|\n'; // Parse CSV and create table (simplified) for (let i = 1; i < Math.min(lines.length, 6); i++) { if (lines[i].trim()) { const cols = lines[i].split(','); if (cols.length >= 4) { summary += `| ${cols[0]} | ${cols[1]} | ${cols[2]}ms | ${cols[3]} |\n`; } } } summary += '\n📊 Full results available in workflow artifacts.'; github.rest.issues.createComment({ issue_number: context.issue.number, owner: context.repo.owner, repo: context.repo.repo, body: summary }); } } catch (error) { console.log('Could not post results:', error); } performance-regression-check: runs-on: ubuntu-latest needs: test-and-benchmark if: github.event_name == 'pull_request' steps: - uses: actions/checkout@v4 - name: Download benchmark results uses: actions/download-artifact@v3 with: name: test-results - name: Check for performance regressions run: | # This would compare benchmark results against main branch echo "Performance regression check would be implemented here" # Could use tools like critcmp to compare Criterion benchmark results
8.2 Dockerfile for Testing Environment
Create Dockerfile.testing:
FROM rust:1.75 as builder WORKDIR /app COPY Cargo.toml Cargo.lock ./ COPY src/ ./src/ COPY benches/ ./benches/ # Build in release mode for performance testing RUN cargo build --release FROM ubuntu:22.04 RUN apt-get update && apt-get install -y \ curl \ ca-certificates \ && rm -rf /var/lib/apt/lists/* WORKDIR /app COPY --from=builder /app/target/release/coagent-multi-agent-testing /usr/local/bin/ ENTRYPOINT ["coagent-multi-agent-testing"]
Usage Examples
Basic Multi-Agent Testing
# Set up the environment first cargo run -- setup # Run comprehensive tests cargo run -- test --include-edge-cases --include-performance-tests # Export results to CSV cargo run -- test --export-csv results.csv --concurrency 20
Load Testing
# Run load test with 100 concurrent users for 5 minutes cargo run -- load --concurrent-users 100 --duration-seconds 300 # Load test with rate limiting cargo run -- load --concurrent-users 50 --duration-seconds 180 --target-rps 10
A/B Testing
# A/B test comparing success rates cargo run -- ab-test \ --variant-a conservative-config \ --variant-b fast-config \ --sample-size 200 \ --criteria success_rate # A/B test comparing response times cargo run -- ab-test \ --variant-a config-a \ --variant-b config-b \ --sample-size 150 \ --criteria response_time \ --significance-level 0
Summary
🎉 Congratulations! You've built a comprehensive Rust-based multi-agent testing pipeline that provides:
Key Features Built
- ✅ High-Performance Concurrent Testing - Leverages Rust's async capabilities 
- ✅ Statistical Analysis Framework - Comprehensive performance metrics and comparisons 
- ✅ Advanced Load Testing - Configurable ramp-up, sustained load, and rate limiting 
- ✅ Rigorous A/B Testing - Statistical significance testing with confidence intervals 
- ✅ Production-Ready CI/CD - GitHub Actions integration with artifact collection 
- ✅ Comprehensive Reporting - Tables, CSV export, and detailed analysis 
Performance Advantages
- Memory Efficiency: Rust's zero-cost abstractions and ownership model 
- Concurrent Execution: Tokio-based async runtime for high throughput 
- Type Safety: Compile-time guarantees prevent runtime errors 
- Resource Management: Automatic memory management without garbage collection 
Enterprise Ready
- Scalability: Handles thousands of concurrent requests efficiently 
- Reliability: Robust error handling and retry mechanisms 
- Observability: Structured logging and comprehensive metrics 
- CI/CD Integration: Automated testing with performance regression detection 
This Rust implementation provides significant performance advantages over the Python version while maintaining the same comprehensive feature set, making it ideal for high-scale production testing scenarios.
Next Steps
- Configuration Reference - Detailed configuration options 
- Test Validation Types - Complete validation reference 
- REST API Reference - API documentation 
- Criterion Benchmarking - Add detailed performance benchmarks 
- Custom Metrics Integration - Extend with Prometheus/Grafana monitoring