ERPNext系统性能测试¶
1. 性能测试概述¶
1.1 测试目标¶
验证ERPNext系统在高负载、高并发情况下的性能表现,确保系统能够满足预期的性能指标。
1.2 性能指标¶
- 响应时间:页面响应时间 < 3秒,API响应时间 < 1秒
- 吞吐量:支持1000+并发用户
- 并发用户:支持500+用户并发访问
- 资源利用率:CPU < 80%,内存 < 85%,磁盘I/O < 70%
1.3 测试工具¶
- JMeter:负载测试和压力测试
- Gatling:高性能负载测试
- Artillery:API性能测试
- Prometheus:性能监控
- Grafana:性能可视化
2. 性能测试类型¶
2.1 负载测试¶
验证系统在正常负载下的性能表现 - 并发用户数:50-200 - 测试时长:30分钟 - 预期结果:响应时间稳定,无错误
2.2 压力测试¶
验证系统在极限负载下的表现 - 并发用户数:200-500 - 测试时长:60分钟 - 预期结果:系统不崩溃,性能降级可控
2.3 容量测试¶
确定系统的最大容量 - 逐步增加负载 - 找到性能拐点 - 确定最大并发数
2.4 稳定性测试¶
验证系统长时间运行的稳定性 - 中等负载运行24小时 - 监控内存泄漏 - 检查性能衰减
3. 测试场景设计¶
3.1 用户登录场景¶
<!-- JMeter测试计划 -->
<TestPlan>
<ThreadGroup>
<elementProp name="ThreadGroup.main_controller" elementType="LoopController">
<stringProp name="LoopController.loops">100</stringProp>
</elementProp>
<stringProp name="ThreadGroup.num_threads">50</stringProp>
<stringProp name="ThreadGroup.ramp_time">60</stringProp>
</ThreadGroup>
<HTTPSamplerProxy>
<stringProp name="HTTPSampler.domain">localhost</stringProp>
<stringProp name="HTTPSampler.port">8000</stringProp>
<stringProp name="HTTPSampler.path">/api/method/login</stringProp>
<stringProp name="HTTPSampler.method">POST</stringProp>
</HTTPSamplerProxy>
</TestPlan>
3.2 客户管理场景¶
# Gatling测试脚本
import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._
class CustomerManagementSimulation extends Simulation {
val httpProtocol = http
.baseUrl("http://localhost:8000")
.acceptHeader("application/json")
.contentTypeHeader("application/json")
val customerScenario = scenario("客户管理性能测试")
.exec(http("登录")
.post("/api/method/login")
.body(StringBody("""{"usr":"test@example.com","pwd":"password123"}"""))
.check(status.is(200))
.check(jsonPath("$.message.sid").saveAs("token")))
.exec(http("创建客户")
.post("/api/resource/Customer")
.header("Authorization", "token ${token}")
.body(StringBody("""{"customer_name":"性能测试客户","customer_type":"Individual"}"""))
.check(status.is(200)))
.exec(http("获取客户列表")
.get("/api/resource/Customer")
.header("Authorization", "token ${token}")
.check(status.is(200)))
setUp(
customerScenario.inject(
rampUsers(100) during (60 seconds),
constantUsers(100) during (300 seconds)
)
).protocols(httpProtocol)
}
3.3 销售订单场景¶
# Artillery测试配置
config:
target: 'http://localhost:8000'
phases:
- duration: 60
arrivalRate: 10
- duration: 300
arrivalRate: 20
- duration: 60
arrivalRate: 5
scenarios:
- name: "销售订单性能测试"
weight: 100
flow:
- post:
url: "/api/method/login"
json:
usr: "test@example.com"
pwd: "password123"
capture:
- json: "$.message.sid"
as: "token"
- post:
url: "/api/resource/Sales Order"
headers:
Authorization: "token {token }"
json:
customer: "测试客户"
items:
- item_code: "TEST_ITEM"
qty: 1
rate: 100
4. 性能监控¶
4.1 系统资源监控¶
import psutil
import time
import json
class SystemMonitor:
def __init__(self):
self.metrics = []
def collect_metrics(self):
"""收集系统性能指标"""
metrics = {
'timestamp': time.time(),
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_io': psutil.disk_io_counters()._asdict(),
'network_io': psutil.net_io_counters()._asdict()
}
self.metrics.append(metrics)
return metrics
def get_average_metrics(self, duration_minutes=5):
"""获取平均性能指标"""
recent_metrics = [m for m in self.metrics
if m['timestamp'] > time.time() - duration_minutes * 60]
if not recent_metrics:
return None
avg_cpu = sum(m['cpu_percent'] for m in recent_metrics) / len(recent_metrics)
avg_memory = sum(m['memory_percent'] for m in recent_metrics) / len(recent_metrics)
return {
'avg_cpu_percent': avg_cpu,
'avg_memory_percent': avg_memory,
'sample_count': len(recent_metrics)
}
4.2 应用性能监控¶
import requests
import time
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# Prometheus指标定义
REQUEST_COUNT = Counter('erpnext_requests_total', 'Total requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('erpnext_request_duration_seconds', 'Request duration')
ACTIVE_USERS = Gauge('erpnext_active_users', 'Active users')
class APMMonitor:
def __init__(self):
self.start_time = time.time()
def monitor_request(self, method, endpoint, duration):
"""监控请求性能"""
REQUEST_COUNT.labels(method=method, endpoint=endpoint).inc()
REQUEST_DURATION.observe(duration)
def update_active_users(self, count):
"""更新活跃用户数"""
ACTIVE_USERS.set(count)
def start_monitoring(self, port=8001):
"""启动监控服务"""
start_http_server(port)
print(f"监控服务启动在端口 {port}")
5. 性能测试执行¶
5.1 JMeter测试执行¶
#!/bin/bash
# JMeter性能测试执行脚本
# 设置测试参数
TEST_PLAN="ERPNext_Performance_Test.jmx"
RESULT_DIR="results"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
# 创建结果目录
mkdir -p $RESULT_DIR
# 执行测试
jmeter -n -t $TEST_PLAN \
-l $RESULT_DIR/test_results_$TIMESTAMP.jtl \
-e -o $RESULT_DIR/html_report_$TIMESTAMP
# 生成报告
echo "测试完成,报告保存在: $RESULT_DIR/html_report_$TIMESTAMP"
5.2 Gatling测试执行¶
#!/bin/bash
# Gatling性能测试执行脚本
# 设置测试参数
SIMULATION_CLASS="CustomerManagementSimulation"
RESULT_DIR="results"
# 执行测试
gatling.sh -s $SIMULATION_CLASS -rf $RESULT_DIR
# 查看结果
echo "测试完成,查看结果: $RESULT_DIR"
5.3 Artillery测试执行¶
#!/bin/bash
# Artillery性能测试执行脚本
# 设置测试参数
CONFIG_FILE="artillery_config.yml"
RESULT_FILE="artillery_results.json"
# 执行测试
artillery run $CONFIG_FILE --output $RESULT_FILE
# 生成报告
artillery report $RESULT_FILE
6. 性能分析¶
6.1 响应时间分析¶
import pandas as pd
import matplotlib.pyplot as plt
class PerformanceAnalyzer:
def __init__(self, jmeter_results_file):
self.results = pd.read_csv(jmeter_results_file)
def analyze_response_time(self):
"""分析响应时间"""
# 计算统计指标
stats = {
'min': self.results['elapsed'].min(),
'max': self.results['elapsed'].max(),
'mean': self.results['elapsed'].mean(),
'median': self.results['elapsed'].median(),
'p95': self.results['elapsed'].quantile(0.95),
'p99': self.results['elapsed'].quantile(0.99)
}
return stats
def plot_response_time_trend(self):
"""绘制响应时间趋势图"""
plt.figure(figsize=(12, 6))
plt.plot(self.results['timeStamp'], self.results['elapsed'])
plt.title('响应时间趋势')
plt.xlabel('时间')
plt.ylabel('响应时间(ms)')
plt.grid(True)
plt.show()
def analyze_error_rate(self):
"""分析错误率"""
total_requests = len(self.results)
error_requests = len(self.results[self.results['success'] == False])
error_rate = (error_requests / total_requests) * 100
return {
'total_requests': total_requests,
'error_requests': error_requests,
'error_rate': error_rate
}
6.2 吞吐量分析¶
class ThroughputAnalyzer:
def __init__(self, results_data):
self.results = results_data
def calculate_throughput(self, time_window=60):
"""计算吞吐量"""
# 按时间窗口分组计算请求数
throughput_data = []
for i in range(0, len(self.results), time_window):
window_data = self.results[i:i+time_window]
if len(window_data) > 0:
throughput = len(window_data) / time_window
throughput_data.append(throughput)
return throughput_data
def find_peak_throughput(self):
"""找到峰值吞吐量"""
throughput_data = self.calculate_throughput()
return max(throughput_data) if throughput_data else 0
7. 性能优化建议¶
7.1 数据库优化¶
-- 数据库索引优化
CREATE INDEX idx_customer_name ON tabCustomer(customer_name);
CREATE INDEX idx_sales_order_date ON tabSales Order(transaction_date);
CREATE INDEX idx_item_code ON tabItem(item_code);
-- 查询优化
EXPLAIN SELECT * FROM tabCustomer WHERE customer_name = '测试客户';
-- 分区表优化
ALTER TABLE tabGL Entry PARTITION BY RANGE (YEAR(posting_date));
7.2 应用优化¶
# 缓存优化
from functools import lru_cache
@lru_cache(maxsize=1000)
def get_customer_info(customer_name):
"""缓存客户信息查询"""
# 查询客户信息
pass
# 连接池优化
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
'mysql://user:password@localhost/erpnext',
poolclass=QueuePool,
pool_size=20,
max_overflow=30,
pool_pre_ping=True
)
7.3 系统优化¶
# 系统参数优化
echo 'net.core.somaxconn = 65535' >> /etc/sysctl.conf
echo 'net.ipv4.tcp_max_syn_backlog = 65535' >> /etc/sysctl.conf
echo 'vm.swappiness = 10' >> /etc/sysctl.conf
# 应用服务器优化
# Nginx配置优化
worker_processes auto;
worker_connections 1024;
keepalive_timeout 65;
gzip on;
gzip_types text/plain application/json;
# Gunicorn配置优化
workers = 4
worker_class = "gevent"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
8. 性能测试报告¶
8.1 测试结果汇总¶
class PerformanceReport:
def __init__(self, test_results):
self.results = test_results
def generate_summary(self):
"""生成测试摘要"""
summary = {
'test_duration': self.results['duration'],
'total_users': self.results['total_users'],
'total_requests': self.results['total_requests'],
'avg_response_time': self.results['avg_response_time'],
'max_response_time': self.results['max_response_time'],
'throughput': self.results['throughput'],
'error_rate': self.results['error_rate'],
'cpu_usage': self.results['cpu_usage'],
'memory_usage': self.results['memory_usage']
}
return summary
def generate_recommendations(self):
"""生成优化建议"""
recommendations = []
if self.results['avg_response_time'] > 3000:
recommendations.append("响应时间超过3秒,建议优化数据库查询和缓存")
if self.results['error_rate'] > 1:
recommendations.append("错误率超过1%,建议检查系统稳定性")
if self.results['cpu_usage'] > 80:
recommendations.append("CPU使用率超过80%,建议增加服务器资源")
return recommendations
9. 持续性能监控¶
9.1 监控仪表板¶
# Grafana仪表板配置
dashboard_config = {
"dashboard": {
"title": "ERPNext性能监控",
"panels": [
{
"title": "响应时间",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.95, erpnext_request_duration_seconds_bucket)"
}
]
},
{
"title": "吞吐量",
"type": "graph",
"targets": [
{
"expr": "rate(erpnext_requests_total[5m])"
}
]
},
{
"title": "活跃用户",
"type": "singlestat",
"targets": [
{
"expr": "erpnext_active_users"
}
]
}
]
}
}
9.2 告警配置¶
# Prometheus告警规则
groups:
- name: erpnext_alerts
rules:
- alert: HighResponseTime
expr: histogram_quantile(0.95, erpnext_request_duration_seconds_bucket) > 3
for: 5m
labels:
severity: warning
annotations:
summary: "响应时间过高"
description: "95%的请求响应时间超过3秒"
- alert: HighErrorRate
expr: rate(erpnext_requests_total{status="error"}[5m]) / rate(erpnext_requests_total[5m]) > 0.01
for: 5m
labels:
severity: critical
annotations:
summary: "错误率过高"
description: "错误率超过1%"
10. 总结¶
ERPNext系统性能测试提供了完整的性能测试解决方案,包括负载测试、压力测试、容量测试、稳定性测试等多种测试类型。通过JMeter、Gatling、Artillery等工具进行测试执行,使用Prometheus和Grafana进行性能监控,确保系统在高负载下的稳定性和可靠性。