ERPNext系统白盒测试¶
1. 白盒测试概述¶
1.1 测试目标¶
验证ERPNext系统的内部代码逻辑、数据流和控制流,确保代码的正确性、完整性和可靠性。
1.2 测试范围¶
- 代码覆盖率测试
- 静态代码分析
- 动态代码分析
- 单元测试
- 集成测试
- 代码质量分析
1.3 测试工具¶
- pytest-cov:代码覆盖率测试
- coverage.py:覆盖率分析
- pylint:静态代码分析
- flake8:代码风格检查
- bandit:安全漏洞扫描
- SonarQube:代码质量分析
2. 代码覆盖率测试¶
2.1 覆盖率指标¶
- 语句覆盖率:执行过的语句占总语句的比例
- 分支覆盖率:执行过的分支占总分支的比例
- 函数覆盖率:调用过的函数占总函数的比例
- 行覆盖率:执行过的代码行占总代码行的比例
2.2 覆盖率测试实现¶
import pytest
import coverage
from unittest.mock import patch, MagicMock
class CoverageTest:
def __init__(self):
self.cov = coverage.Coverage()
def start_coverage(self):
"""开始覆盖率统计"""
self.cov.start()
def stop_coverage(self):
"""停止覆盖率统计"""
self.cov.stop()
self.cov.save()
def generate_report(self):
"""生成覆盖率报告"""
self.cov.report()
self.cov.html_report(directory='htmlcov')
def test_customer_creation_coverage(self):
"""测试客户创建代码覆盖率"""
self.start_coverage()
# 导入被测试的模块
from erpnext.accounts.doctype.customer.customer import Customer
# 测试正常创建客户
customer = Customer()
customer.customer_name = "测试客户"
customer.customer_type = "Individual"
customer.insert()
# 测试异常情况
try:
customer.customer_name = None
customer.insert()
except Exception:
pass
self.stop_coverage()
self.generate_report()
2.3 pytest-cov配置¶
# pytest.ini
[tool:pytest]
addopts = --cov=erpnext --cov-report=html --cov-report=term-missing --cov-fail-under=80
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
3. 静态代码分析¶
3.1 pylint静态分析¶
# pylint配置文件 .pylintrc
[MASTER]
init-hook='import sys; sys.path.append(".")'
[MESSAGES CONTROL]
disable=missing-docstring,too-few-public-methods,too-many-arguments
[FORMAT]
max-line-length=120
[DESIGN]
max-args=10
max-locals=20
max-returns=6
max-branches=15
max-statements=60
# 静态分析脚本
import subprocess
import os
class StaticAnalysis:
def __init__(self, source_dir="erpnext"):
self.source_dir = source_dir
def run_pylint(self):
"""运行pylint静态分析"""
cmd = f"pylint {self.source_dir} --output-format=json"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
print("pylint发现代码问题:")
print(result.stdout)
else:
print("pylint检查通过")
def run_flake8(self):
"""运行flake8代码风格检查"""
cmd = f"flake8 {self.source_dir} --max-line-length=120"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
print("flake8发现代码风格问题:")
print(result.stdout)
else:
print("flake8检查通过")
def run_bandit(self):
"""运行bandit安全漏洞扫描"""
cmd = f"bandit -r {self.source_dir} -f json"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
print("bandit发现安全问题:")
print(result.stdout)
else:
print("bandit安全检查通过")
3.2 代码质量检查¶
import ast
import re
class CodeQualityAnalyzer:
def __init__(self):
self.issues = []
def analyze_file(self, file_path):
"""分析单个文件"""
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 检查代码复杂度
self.check_complexity(file_path, content)
# 检查命名规范
self.check_naming_conventions(file_path, content)
# 检查代码重复
self.check_code_duplication(file_path, content)
def check_complexity(self, file_path, content):
"""检查代码复杂度"""
try:
tree = ast.parse(content)
complexity = self.calculate_complexity(tree)
if complexity > 10:
self.issues.append({
'file': file_path,
'type': 'complexity',
'message': f'代码复杂度过高: {complexity}',
'severity': 'warning'
})
except SyntaxError as e:
self.issues.append({
'file': file_path,
'type': 'syntax',
'message': f'语法错误: {e}',
'severity': 'error'
})
def calculate_complexity(self, node):
"""计算代码复杂度"""
complexity = 1
for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
complexity += 1
elif isinstance(child, ast.BoolOp):
complexity += len(child.values) - 1
return complexity
def check_naming_conventions(self, file_path, content):
"""检查命名规范"""
lines = content.split('\n')
for i, line in enumerate(lines, 1):
# 检查函数命名
function_match = re.search(r'def\s+([a-zA-Z_][a-zA-Z0-9_]*)', line)
if function_match:
function_name = function_match.group(1)
if not re.match(r'^[a-z_][a-z0-9_]*$', function_name):
self.issues.append({
'file': file_path,
'line': i,
'type': 'naming',
'message': f'函数命名不符合规范: {function_name}',
'severity': 'warning'
})
# 检查类命名
class_match = re.search(r'class\s+([A-Za-z_][A-Za-z0-9_]*)', line)
if class_match:
class_name = class_match.group(1)
if not re.match(r'^[A-Z][A-Za-z0-9]*$', class_name):
self.issues.append({
'file': file_path,
'line': i,
'type': 'naming',
'message': f'类命名不符合规范: {class_name}',
'severity': 'warning'
})
4. 动态代码分析¶
4.1 运行时分析¶
import cProfile
import pstats
import io
from contextlib import contextmanager
class DynamicAnalysis:
def __init__(self):
self.profiler = cProfile.Profile()
@contextmanager
def profile_context(self):
"""性能分析上下文"""
self.profiler.enable()
try:
yield
finally:
self.profiler.disable()
def analyze_performance(self, func, *args, **kwargs):
"""分析函数性能"""
with self.profile_context():
result = func(*args, **kwargs)
# 生成性能报告
s = io.StringIO()
ps = pstats.Stats(self.profiler, stream=s).sort_stats('cumulative')
ps.print_stats()
print("性能分析结果:")
print(s.getvalue())
return result
def test_customer_creation_performance(self):
"""测试客户创建性能"""
from erpnext.accounts.doctype.customer.customer import Customer
def create_customer():
customer = Customer()
customer.customer_name = "性能测试客户"
customer.customer_type = "Individual"
customer.insert()
return customer
# 分析性能
result = self.analyze_performance(create_customer)
return result
4.2 内存分析¶
import tracemalloc
import gc
import sys
class MemoryAnalysis:
def __init__(self):
self.memory_snapshots = []
def start_tracing(self):
"""开始内存跟踪"""
tracemalloc.start()
def stop_tracing(self):
"""停止内存跟踪"""
tracemalloc.stop()
def take_snapshot(self, label=""):
"""拍摄内存快照"""
snapshot = tracemalloc.take_snapshot()
self.memory_snapshots.append({
'label': label,
'snapshot': snapshot,
'timestamp': time.time()
})
def analyze_memory_usage(self):
"""分析内存使用情况"""
if len(self.memory_snapshots) < 2:
return
# 比较快照
snapshot1 = self.memory_snapshots[0]['snapshot']
snapshot2 = self.memory_snapshots[-1]['snapshot']
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
print("内存使用分析:")
for stat in top_stats[:10]:
print(stat)
def test_memory_leak(self):
"""测试内存泄漏"""
self.start_tracing()
# 拍摄初始快照
self.take_snapshot("初始状态")
# 执行可能产生内存泄漏的操作
for i in range(1000):
customer = Customer()
customer.customer_name = f"测试客户{i}"
customer.customer_type = "Individual"
customer.insert()
# 拍摄结束快照
self.take_snapshot("操作完成")
# 分析内存使用
self.analyze_memory_usage()
self.stop_tracing()
5. 单元测试¶
5.1 测试用例设计¶
import unittest
from unittest.mock import patch, MagicMock
import pytest
class TestCustomer(unittest.TestCase):
def setUp(self):
"""测试前置条件"""
self.customer = Customer()
def tearDown(self):
"""测试后置条件"""
# 清理测试数据
pass
def test_customer_creation(self):
"""测试客户创建"""
self.customer.customer_name = "测试客户"
self.customer.customer_type = "Individual"
# 验证客户创建
self.customer.insert()
self.assertIsNotNone(self.customer.name)
self.assertEqual(self.customer.customer_name, "测试客户")
def test_customer_validation(self):
"""测试客户验证"""
# 测试必填字段验证
with self.assertRaises(ValidationError):
self.customer.customer_name = None
self.customer.insert()
def test_customer_update(self):
"""测试客户更新"""
# 创建客户
self.customer.customer_name = "测试客户"
self.customer.customer_type = "Individual"
self.customer.insert()
# 更新客户
self.customer.customer_type = "Company"
self.customer.save()
# 验证更新
updated_customer = Customer.get(self.customer.name)
self.assertEqual(updated_customer.customer_type, "Company")
@patch('erpnext.accounts.doctype.customer.customer.send_email')
def test_customer_notification(self, mock_send_email):
"""测试客户通知"""
self.customer.customer_name = "测试客户"
self.customer.email_id = "test@example.com"
self.customer.insert()
# 验证邮件发送
mock_send_email.assert_called_once()
def test_customer_deletion(self):
"""测试客户删除"""
# 创建客户
self.customer.customer_name = "测试客户"
self.customer.insert()
customer_name = self.customer.name
# 删除客户
self.customer.delete()
# 验证删除
with self.assertRaises(DoesNotExistError):
Customer.get(customer_name)
5.2 Mock和Stub¶
from unittest.mock import Mock, patch, MagicMock
class TestSalesOrder(unittest.TestCase):
def setUp(self):
self.sales_order = SalesOrder()
@patch('erpnext.accounts.doctype.sales_order.sales_order.get_customer')
def test_sales_order_with_mock_customer(self, mock_get_customer):
"""使用Mock测试销售订单"""
# 设置Mock返回值
mock_customer = MagicMock()
mock_customer.customer_name = "Mock客户"
mock_customer.credit_limit = 10000
mock_get_customer.return_value = mock_customer
# 测试销售订单创建
self.sales_order.customer = "Mock客户"
self.sales_order.insert()
# 验证Mock调用
mock_get_customer.assert_called_once_with("Mock客户")
@patch('erpnext.accounts.doctype.sales_order.sales_order.calculate_taxes')
def test_sales_order_tax_calculation(self, mock_calculate_taxes):
"""测试销售订单税费计算"""
# 设置Mock返回值
mock_calculate_taxes.return_value = {
'total_taxes': 100,
'grand_total': 1100
}
# 测试税费计算
self.sales_order.total = 1000
self.sales_order.calculate_taxes_and_totals()
# 验证税费计算
self.assertEqual(self.sales_order.total_taxes, 100)
self.assertEqual(self.sales_order.grand_total, 1100)
def test_sales_order_with_stub(self):
"""使用Stub测试销售订单"""
# 创建Stub对象
stub_customer = Mock()
stub_customer.customer_name = "Stub客户"
stub_customer.credit_limit = 5000
stub_customer.is_credit_limit_exceeded.return_value = False
# 使用Stub测试
self.sales_order.customer = "Stub客户"
self.sales_order.total = 3000
# 验证信用额度检查
self.assertFalse(stub_customer.is_credit_limit_exceeded(3000))
6. 集成测试¶
6.1 模块集成测试¶
class TestCustomerSalesIntegration(unittest.TestCase):
def setUp(self):
"""设置测试环境"""
self.customer = Customer()
self.sales_order = SalesOrder()
def test_customer_sales_order_integration(self):
"""测试客户和销售订单集成"""
# 创建客户
self.customer.customer_name = "集成测试客户"
self.customer.customer_type = "Individual"
self.customer.insert()
# 创建销售订单
self.sales_order.customer = self.customer.name
self.sales_order.transaction_date = today()
# 添加订单项目
item = self.sales_order.append("items")
item.item_code = "TEST_ITEM"
item.qty = 1
item.rate = 100
self.sales_order.insert()
# 验证集成
self.assertEqual(self.sales_order.customer, self.customer.name)
self.assertEqual(self.sales_order.total, 100)
def test_customer_payment_integration(self):
"""测试客户和付款集成"""
# 创建客户
self.customer.customer_name = "付款测试客户"
self.customer.insert()
# 创建销售订单
self.sales_order.customer = self.customer.name
self.sales_order.total = 1000
self.sales_order.insert()
self.sales_order.submit()
# 创建付款
payment = PaymentEntry()
payment.party_type = "Customer"
payment.party = self.customer.name
payment.paid_amount = 1000
payment.insert()
payment.submit()
# 验证付款集成
self.assertEqual(payment.party, self.customer.name)
self.assertEqual(payment.paid_amount, 1000)
6.2 数据库集成测试¶
class TestDatabaseIntegration(unittest.TestCase):
def setUp(self):
"""设置数据库测试环境"""
self.db = frappe.get_db()
def test_customer_database_integration(self):
"""测试客户数据库集成"""
# 创建客户
customer = Customer()
customer.customer_name = "数据库测试客户"
customer.customer_type = "Individual"
customer.insert()
# 验证数据库记录
customer_data = self.db.get_value("Customer", customer.name, "*")
self.assertIsNotNone(customer_data)
self.assertEqual(customer_data.customer_name, "数据库测试客户")
def test_transaction_rollback(self):
"""测试事务回滚"""
# 开始事务
self.db.begin()
try:
# 创建客户
customer = Customer()
customer.customer_name = "回滚测试客户"
customer.insert()
# 故意引发异常
raise Exception("测试异常")
except Exception:
# 回滚事务
self.db.rollback()
# 验证回滚
customer_exists = self.db.exists("Customer", {"customer_name": "回滚测试客户"})
self.assertFalse(customer_exists)
7. 测试工具和框架¶
7.1 测试框架配置¶
# conftest.py
import pytest
import frappe
from frappe.tests.utils import FrappeTestCase
@pytest.fixture(scope="session")
def setup_test_environment():
"""设置测试环境"""
frappe.init(site="test_site")
frappe.connect()
yield
frappe.destroy()
@pytest.fixture
def test_customer():
"""测试客户fixture"""
customer = Customer()
customer.customer_name = "测试客户"
customer.customer_type = "Individual"
customer.insert()
yield customer
customer.delete()
@pytest.fixture
def test_sales_order(test_customer):
"""测试销售订单fixture"""
sales_order = SalesOrder()
sales_order.customer = test_customer.name
sales_order.transaction_date = today()
item = sales_order.append("items")
item.item_code = "TEST_ITEM"
item.qty = 1
item.rate = 100
sales_order.insert()
yield sales_order
sales_order.delete()
7.2 测试数据管理¶
class TestDataManager:
def __init__(self):
self.test_data = {}
def create_test_customer(self, name="测试客户"):
"""创建测试客户"""
customer = Customer()
customer.customer_name = name
customer.customer_type = "Individual"
customer.insert()
self.test_data[f"customer_{name}"] = customer
return customer
def create_test_item(self, code="TEST_ITEM"):
"""创建测试商品"""
item = Item()
item.item_code = code
item.item_name = f"测试商品{code}"
item.item_group = "Products"
item.insert()
self.test_data[f"item_{code}"] = item
return item
def cleanup_test_data(self):
"""清理测试数据"""
for key, obj in self.test_data.items():
try:
obj.delete()
except Exception as e:
print(f"清理测试数据失败: {key} - {e}")
self.test_data.clear()
8. 测试报告和分析¶
8.1 测试报告生成¶
import json
import xml.etree.ElementTree as ET
from datetime import datetime
class TestReportGenerator:
def __init__(self):
self.test_results = []
def add_test_result(self, test_name, status, duration, error=None):
"""添加测试结果"""
result = {
"test_name": test_name,
"status": status,
"duration": duration,
"error": error,
"timestamp": datetime.now().isoformat()
}
self.test_results.append(result)
def generate_json_report(self, filename="test_report.json"):
"""生成JSON报告"""
report = {
"summary": self.generate_summary(),
"test_results": self.test_results
}
with open(filename, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
def generate_xml_report(self, filename="test_report.xml"):
"""生成XML报告"""
root = ET.Element("testsuite")
root.set("name", "ERPNext白盒测试")
root.set("tests", str(len(self.test_results)))
passed_count = len([r for r in self.test_results if r["status"] == "PASSED"])
root.set("failures", str(len(self.test_results) - passed_count))
for result in self.test_results:
testcase = ET.SubElement(root, "testcase")
testcase.set("name", result["test_name"])
testcase.set("time", str(result["duration"]))
if result["status"] != "PASSED":
failure = ET.SubElement(testcase, "failure")
failure.text = result.get("error", "")
tree = ET.ElementTree(root)
tree.write(filename, encoding="utf-8", xml_declaration=True)
def generate_summary(self):
"""生成测试摘要"""
total_tests = len(self.test_results)
passed_tests = len([r for r in self.test_results if r["status"] == "PASSED"])
failed_tests = total_tests - passed_tests
return {
"total_tests": total_tests,
"passed_tests": passed_tests,
"failed_tests": failed_tests,
"pass_rate": (passed_tests / total_tests * 100) if total_tests > 0 else 0,
"total_duration": sum(r["duration"] for r in self.test_results)
}
9. 持续集成¶
9.1 CI/CD配置¶
# .github/workflows/whitebox-testing.yml
name: ERPNext白盒测试
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
whitebox-testing:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.8
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest-cov pylint flake8 bandit
- name: Run static analysis
run: |
pylint erpnext --output-format=json > pylint-report.json
flake8 erpnext --output-file=flake8-report.txt
bandit -r erpnext -f json -o bandit-report.json
- name: Run unit tests with coverage
run: |
pytest tests/unit_tests/ --cov=erpnext --cov-report=xml --cov-report=html
- name: Upload coverage reports
uses: codecov/codecov-action@v1
with:
file: ./coverage.xml
- name: Upload test reports
uses: actions/upload-artifact@v2
with:
name: test-reports
path: |
coverage.xml
htmlcov/
pylint-report.json
flake8-report.txt
bandit-report.json
10. 总结¶
ERPNext系统白盒测试提供了全面的代码质量保证解决方案,包括代码覆盖率测试、静态代码分析、动态代码分析、单元测试、集成测试等多个方面。通过自动化测试工具和持续集成,确保代码质量和系统稳定性。