ERPNext System Performance Testing

1. Performance Testing Overview

1.1 Test Objectives

Verify how the ERPNext system performs under high load and high concurrency, and confirm that it meets the stated performance targets.

1.2 Performance Targets

  • Response time: page responses < 3 seconds, API responses < 1 second
  • Throughput: sustain the request volume generated by 1,000+ concurrent users at peak
  • Concurrent users: 500+ simultaneous active users
  • Resource utilization: CPU < 80%, memory < 85%, disk I/O < 70%
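
The targets above can be checked mechanically after each run. A minimal sketch (the threshold names and the `evaluate_metrics` helper are illustrative, not part of ERPNext):

```python
# Pass/fail thresholds taken from the targets above (names are illustrative)
THRESHOLDS = {
    'page_response_ms': 3000,   # page responses < 3 s
    'api_response_ms': 1000,    # API responses < 1 s
    'cpu_percent': 80,
    'memory_percent': 85,
    'disk_io_percent': 70,
}

def evaluate_metrics(measured):
    """Return {metric: True/False}; True means the target is met."""
    return {name: measured[name] < limit
            for name, limit in THRESHOLDS.items() if name in measured}
```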

1.3 Test Tools

  • JMeter: load and stress testing
  • Gatling: high-performance load testing
  • Artillery: API performance testing
  • Prometheus: performance metrics collection
  • Grafana: performance visualization

2. Performance Test Types

2.1 Load Testing

Verify system performance under normal load.

  • Concurrent users: 50-200
  • Test duration: 30 minutes
  • Expected result: stable response times, no errors

2.2 Stress Testing

Verify system behavior at and beyond its limits.

  • Concurrent users: 200-500
  • Test duration: 60 minutes
  • Expected result: no crashes; performance degrades in a controlled way

2.3 Capacity Testing

Determine the system's maximum capacity.

  • Increase load step by step
  • Locate the performance knee point
  • Establish the maximum supported concurrency

2.4 Stability (Soak) Testing

Verify stability over a long run.

  • Run at moderate load for 24 hours
  • Watch for memory leaks
  • Check for performance degradation over time
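
For capacity testing, the knee point can be estimated from stepped-load results. A rough sketch, assuming (concurrency, throughput) samples sorted by load; the 5% gain threshold is an arbitrary choice:

```python
def find_capacity_knee(load_throughput, min_gain=0.05):
    """Return the load level after which throughput stops scaling.

    load_throughput: [(concurrent_users, requests_per_sec), ...] sorted
    by load. The knee is the first level where the next step's relative
    throughput gain falls below min_gain.
    """
    for (u1, t1), (u2, t2) in zip(load_throughput, load_throughput[1:]):
        if t1 > 0 and (t2 - t1) / t1 < min_gain:
            return u1
    # Throughput kept scaling across all samples tested
    return load_throughput[-1][0]
```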

3. Test Scenario Design

3.1 User Login Scenario

<!-- JMeter test plan (fragment) -->
<TestPlan>
  <ThreadGroup>
    <elementProp name="ThreadGroup.main_controller" elementType="LoopController">
      <stringProp name="LoopController.loops">100</stringProp>
    </elementProp>
    <stringProp name="ThreadGroup.num_threads">50</stringProp>
    <stringProp name="ThreadGroup.ramp_time">60</stringProp>
  </ThreadGroup>

  <HTTPSamplerProxy>
    <stringProp name="HTTPSampler.domain">localhost</stringProp>
    <stringProp name="HTTPSampler.port">8000</stringProp>
    <stringProp name="HTTPSampler.path">/api/method/login</stringProp>
    <stringProp name="HTTPSampler.method">POST</stringProp>
  </HTTPSamplerProxy>
</TestPlan>
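
The plan above exercises a single POST to `/api/method/login`. For a quick sanity check outside JMeter, the same request can be built with the Python standard library (host, port, and credentials are the placeholders from the plan):

```python
import json
import urllib.request

def build_login_request(usr, pwd, host="localhost", port=8000):
    """Build (but do not send) the login POST that the JMeter plan issues."""
    body = json.dumps({"usr": usr, "pwd": pwd}).encode()
    return urllib.request.Request(
        f"http://{host}:{port}/api/method/login",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_login_request("test@example.com", "password123")
# urllib.request.urlopen(req) would send it against a running instance
```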

3.2 Customer Management Scenario

// Gatling test script
import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._

class CustomerManagementSimulation extends Simulation {
  val httpProtocol = http
    .baseUrl("http://localhost:8000")
    .acceptHeader("application/json")
    .contentTypeHeader("application/json")

  val customerScenario = scenario("Customer Management Performance Test")
    .exec(http("Login")
      .post("/api/method/login")
      .body(StringBody("""{"usr":"test@example.com","pwd":"password123"}"""))
      .check(status.is(200))
      .check(jsonPath("$.message.sid").saveAs("token")))
    .exec(http("Create Customer")
      .post("/api/resource/Customer")
      .header("Authorization", "token ${token}")
      .body(StringBody("""{"customer_name":"Perf Test Customer","customer_type":"Individual"}"""))
      .check(status.is(200)))
    .exec(http("Get Customer List")
      .get("/api/resource/Customer")
      .header("Authorization", "token ${token}")
      .check(status.is(200)))

  setUp(
    customerScenario.inject(
      rampConcurrentUsers(0) to (100) during (60 seconds),
      constantConcurrentUsers(100) during (300 seconds)
    )
  ).protocols(httpProtocol)
}

3.3 Sales Order Scenario

# Artillery test configuration
config:
  target: 'http://localhost:8000'
  phases:
    - duration: 60
      arrivalRate: 10
    - duration: 300
      arrivalRate: 20
    - duration: 60
      arrivalRate: 5

scenarios:
  - name: "Sales Order Performance Test"
    weight: 100
    flow:
      - post:
          url: "/api/method/login"
          json:
            usr: "test@example.com"
            pwd: "password123"
          capture:
            - json: "$.message.sid"
              as: "token"
      - post:
          url: "/api/resource/Sales Order"
          headers:
            Authorization: "token {{ token }}"
          json:
            customer: "Test Customer"
            items:
              - item_code: "TEST_ITEM"
                qty: 1
                rate: 100
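
The three phases above imply a fixed total number of virtual-user arrivals (duration × arrivalRate per phase), which is useful when sizing test data:

```python
# Phases from the Artillery config above: (duration_s, arrival_rate_per_s)
phases = [(60, 10), (300, 20), (60, 5)]

total_arrivals = sum(duration * rate for duration, rate in phases)
print(total_arrivals)  # 600 + 6000 + 300 = 6900 virtual users
```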

4. Performance Monitoring

4.1 System Resource Monitoring

import psutil
import time
import json

class SystemMonitor:
    def __init__(self):
        self.metrics = []

    def collect_metrics(self):
        """收集系统性能指标"""
        metrics = {
            'timestamp': time.time(),
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_io': psutil.disk_io_counters()._asdict(),
            'network_io': psutil.net_io_counters()._asdict()
        }
        self.metrics.append(metrics)
        return metrics

    def get_average_metrics(self, duration_minutes=5):
        """获取平均性能指标"""
        recent_metrics = [m for m in self.metrics 
                         if m['timestamp'] > time.time() - duration_minutes * 60]

        if not recent_metrics:
            return None

        avg_cpu = sum(m['cpu_percent'] for m in recent_metrics) / len(recent_metrics)
        avg_memory = sum(m['memory_percent'] for m in recent_metrics) / len(recent_metrics)

        return {
            'avg_cpu_percent': avg_cpu,
            'avg_memory_percent': avg_memory,
            'sample_count': len(recent_metrics)
        }

4.2 Application Performance Monitoring

import requests
import time
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Prometheus metric definitions (a 'status' label is included so the
# error-rate alert in section 9.2 has something to match on)
REQUEST_COUNT = Counter('erpnext_requests_total', 'Total requests',
                        ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('erpnext_request_duration_seconds', 'Request duration')
ACTIVE_USERS = Gauge('erpnext_active_users', 'Active users')

class APMMonitor:
    def __init__(self):
        self.start_time = time.time()

    def monitor_request(self, method, endpoint, duration, status='success'):
        """Record one request: count it by method/endpoint/status and observe its duration."""
        REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
        REQUEST_DURATION.observe(duration)

    def update_active_users(self, count):
        """Update the active-user gauge."""
        ACTIVE_USERS.set(count)

    def start_monitoring(self, port=8001):
        """Expose metrics over HTTP for Prometheus to scrape."""
        start_http_server(port)
        print(f"Metrics server listening on port {port}")
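
Feeding `monitor_request` by hand is error-prone; a small decorator (a sketch, not part of the class) can time calls and report them through any object with that method:

```python
import time
from functools import wraps

def timed(monitor, method, endpoint):
    """Decorator: measure the wrapped call and report it via
    monitor.monitor_request(method, endpoint, duration)."""
    def decorate(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                duration = time.perf_counter() - start
                monitor.monitor_request(method, endpoint, duration)
        return wrapper
    return decorate
```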

5. Performance Test Execution

5.1 Running JMeter Tests

#!/bin/bash
# JMeter performance test runner

# Test parameters
TEST_PLAN="ERPNext_Performance_Test.jmx"
RESULT_DIR="results"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")

# Create the results directory
mkdir -p $RESULT_DIR

# Run the test (non-GUI mode)
jmeter -n -t $TEST_PLAN \
       -l $RESULT_DIR/test_results_$TIMESTAMP.jtl \
       -e -o $RESULT_DIR/html_report_$TIMESTAMP

# Report location
echo "Test finished. HTML report: $RESULT_DIR/html_report_$TIMESTAMP"

5.2 Running Gatling Tests

#!/bin/bash
# Gatling performance test runner

# Test parameters
SIMULATION_CLASS="CustomerManagementSimulation"
RESULT_DIR="results"

# Run the simulation
gatling.sh -s $SIMULATION_CLASS -rf $RESULT_DIR

# Results location
echo "Test finished. Results in: $RESULT_DIR"

5.3 Running Artillery Tests

#!/bin/bash
# Artillery performance test runner

# Test parameters
CONFIG_FILE="artillery_config.yml"
RESULT_FILE="artillery_results.json"

# Run the test
artillery run $CONFIG_FILE --output $RESULT_FILE

# Generate an HTML report from the results
artillery report $RESULT_FILE

6. Performance Analysis

6.1 Response Time Analysis

import pandas as pd
import matplotlib.pyplot as plt

class PerformanceAnalyzer:
    def __init__(self, jmeter_results_file):
        self.results = pd.read_csv(jmeter_results_file)

    def analyze_response_time(self):
        """分析响应时间"""
        # 计算统计指标
        stats = {
            'min': self.results['elapsed'].min(),
            'max': self.results['elapsed'].max(),
            'mean': self.results['elapsed'].mean(),
            'median': self.results['elapsed'].median(),
            'p95': self.results['elapsed'].quantile(0.95),
            'p99': self.results['elapsed'].quantile(0.99)
        }

        return stats

    def plot_response_time_trend(self):
        """绘制响应时间趋势图"""
        plt.figure(figsize=(12, 6))
        plt.plot(self.results['timeStamp'], self.results['elapsed'])
        plt.title('Response Time Trend')
        plt.xlabel('Time')
        plt.ylabel('Response time (ms)')
        plt.grid(True)
        plt.show()

    def analyze_error_rate(self):
        """Error-rate summary."""
        total_requests = len(self.results)
        # JMeter's JTL writes 'success' as the strings "true"/"false"
        error_mask = self.results['success'].astype(str).str.lower() != 'true'
        error_requests = int(error_mask.sum())
        error_rate = (error_requests / total_requests) * 100 if total_requests else 0.0

        return {
            'total_requests': total_requests,
            'error_requests': error_requests,
            'error_rate': error_rate
        }
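
For results too large (or too raw) for pandas, the p95/p99 figures above can be computed by hand with the nearest-rank method (an assumption; pandas' default `quantile` interpolates instead):

```python
import math

def percentile(values, pct):
    """Nearest-rank percentile: the smallest value with at least
    pct% of the samples at or below it."""
    ordered = sorted(values)
    k = math.ceil(pct / 100 * len(ordered)) - 1
    return ordered[max(0, min(k, len(ordered) - 1))]
```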

6.2 Throughput Analysis

class ThroughputAnalyzer:
    def __init__(self, results_data):
        # results_data: iterable of request records, each carrying a
        # 'timestamp' field in seconds
        self.results = list(results_data)

    def calculate_throughput(self, time_window=60):
        """Requests per second, bucketed by timestamp into fixed windows."""
        if not self.results:
            return []
        start = min(r['timestamp'] for r in self.results)
        buckets = {}
        for r in self.results:
            bucket = int((r['timestamp'] - start) // time_window)
            buckets[bucket] = buckets.get(bucket, 0) + 1
        return [buckets[b] / time_window for b in sorted(buckets)]

    def find_peak_throughput(self):
        """Highest per-window throughput observed."""
        throughput_data = self.calculate_throughput()
        return max(throughput_data) if throughput_data else 0

7. Performance Optimization Recommendations

7.1 Database Optimization

-- Index optimization (backticks are required for table names containing spaces)
CREATE INDEX idx_customer_name ON `tabCustomer`(customer_name);
CREATE INDEX idx_sales_order_date ON `tabSales Order`(transaction_date);
CREATE INDEX idx_item_code ON `tabItem`(item_code);

-- Query optimization: inspect the execution plan
EXPLAIN SELECT * FROM `tabCustomer` WHERE customer_name = 'Test Customer';

-- Table partitioning (MySQL requires explicit partition definitions,
-- and the partition key must appear in every unique key)
ALTER TABLE `tabGL Entry` PARTITION BY RANGE (YEAR(posting_date)) (
    PARTITION p2023 VALUES LESS THAN (2024),
    PARTITION p2024 VALUES LESS THAN (2025),
    PARTITION pmax VALUES LESS THAN MAXVALUE
);

7.2 Application Optimization

# Cache optimization
from functools import lru_cache

@lru_cache(maxsize=1000)
def get_customer_info(customer_name):
    """缓存客户信息查询"""
    # 查询客户信息
    pass

# Connection pool tuning
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    'mysql://user:password@localhost/erpnext',
    poolclass=QueuePool,
    pool_size=20,
    max_overflow=30,
    pool_pre_ping=True
)

7.3 System-Level Optimization

# Kernel parameter tuning
echo 'net.core.somaxconn = 65535' >> /etc/sysctl.conf
echo 'net.ipv4.tcp_max_syn_backlog = 65535' >> /etc/sysctl.conf
echo 'vm.swappiness = 10' >> /etc/sysctl.conf

# Application server tuning
# Nginx tuning (directives shown in their proper blocks)
worker_processes auto;

events {
    worker_connections 1024;
}

http {
    keepalive_timeout 65;
    gzip on;
    gzip_types text/plain application/json;
}

# Gunicorn tuning (gunicorn.conf.py)
workers = 4
worker_class = "gevent"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
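
The worker count of 4 above is a fixed choice; a common sizing heuristic from the Gunicorn documentation is workers = 2 × CPU cores + 1, which can be computed at startup:

```python
import os

def suggested_workers(cpu_count=None):
    """Gunicorn's commonly cited sizing heuristic: 2 * cores + 1."""
    cpus = cpu_count or os.cpu_count() or 1
    return 2 * cpus + 1
```

For example, `workers = suggested_workers()` inside gunicorn.conf.py.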

8. Performance Test Reporting

8.1 Results Summary

class PerformanceReport:
    def __init__(self, test_results):
        self.results = test_results

    def generate_summary(self):
        """生成测试摘要"""
        summary = {
            'test_duration': self.results['duration'],
            'total_users': self.results['total_users'],
            'total_requests': self.results['total_requests'],
            'avg_response_time': self.results['avg_response_time'],
            'max_response_time': self.results['max_response_time'],
            'throughput': self.results['throughput'],
            'error_rate': self.results['error_rate'],
            'cpu_usage': self.results['cpu_usage'],
            'memory_usage': self.results['memory_usage']
        }
        return summary

    def generate_recommendations(self):
        """生成优化建议"""
        recommendations = []

        if self.results['avg_response_time'] > 3000:
            recommendations.append("Average response time exceeds 3s; optimize database queries and caching")

        if self.results['error_rate'] > 1:
            recommendations.append("Error rate exceeds 1%; investigate system stability")

        if self.results['cpu_usage'] > 80:
            recommendations.append("CPU usage exceeds 80%; consider adding server capacity")

        return recommendations

9. Continuous Performance Monitoring

9.1 Monitoring Dashboard

# Grafana dashboard configuration (expressed as a Python dict)
dashboard_config = {
    "dashboard": {
        "title": "ERPNext性能监控",
        "panels": [
            {
                "title": "响应时间",
                "type": "graph",
                "targets": [
                    {
                        "expr": "histogram_quantile(0.95, erpnext_request_duration_seconds_bucket)"
                    }
                ]
            },
            {
                "title": "吞吐量",
                "type": "graph",
                "targets": [
                    {
                        "expr": "rate(erpnext_requests_total[5m])"
                    }
                ]
            },
            {
                "title": "活跃用户",
                "type": "singlestat",
                "targets": [
                    {
                        "expr": "erpnext_active_users"
                    }
                ]
            }
        ]
    }
}

9.2 Alerting Configuration

# Prometheus alerting rules
groups:
- name: erpnext_alerts
  rules:
  - alert: HighResponseTime
    expr: histogram_quantile(0.95, rate(erpnext_request_duration_seconds_bucket[5m])) > 3
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "响应时间过高"
      description: "95%的请求响应时间超过3秒"

  - alert: HighErrorRate
    expr: rate(erpnext_requests_total{status="error"}[5m]) / rate(erpnext_requests_total[5m]) > 0.01
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "错误率过高"
      description: "错误率超过1%"

10. Summary

This document lays out a complete performance-testing approach for ERPNext, covering load, stress, capacity, and stability testing. Tests are executed with JMeter, Gatling, and Artillery, while Prometheus and Grafana provide continuous monitoring, so the system's stability and reliability under high load can be verified and tracked over time.