Skip to content

ERPNext系统白盒测试

1. 白盒测试概述

1.1 测试目标

验证ERPNext系统的内部代码逻辑、数据流和控制流,确保代码的正确性、完整性和可靠性。

1.2 测试范围

  • 代码覆盖率测试
  • 静态代码分析
  • 动态代码分析
  • 单元测试
  • 集成测试
  • 代码质量分析

1.3 测试工具

  • pytest-cov:代码覆盖率测试
  • coverage.py:覆盖率分析
  • pylint:静态代码分析
  • flake8:代码风格检查
  • bandit:安全漏洞扫描
  • SonarQube:代码质量分析

2. 代码覆盖率测试

2.1 覆盖率指标

  • 语句覆盖率:执行过的语句占总语句的比例
  • 分支覆盖率:执行过的分支占总分支的比例
  • 函数覆盖率:调用过的函数占总函数的比例
  • 行覆盖率:执行过的代码行占总代码行的比例

2.2 覆盖率测试实现

import pytest
import coverage
from unittest.mock import patch, MagicMock

class CoverageTest:
    def __init__(self):
        self.cov = coverage.Coverage()

    def start_coverage(self):
        """开始覆盖率统计"""
        self.cov.start()

    def stop_coverage(self):
        """停止覆盖率统计"""
        self.cov.stop()
        self.cov.save()

    def generate_report(self):
        """生成覆盖率报告"""
        self.cov.report()
        self.cov.html_report(directory='htmlcov')

    def test_customer_creation_coverage(self):
        """测试客户创建代码覆盖率"""
        self.start_coverage()

        # 导入被测试的模块
        from erpnext.accounts.doctype.customer.customer import Customer

        # 测试正常创建客户
        customer = Customer()
        customer.customer_name = "测试客户"
        customer.customer_type = "Individual"
        customer.insert()

        # 测试异常情况
        try:
            customer.customer_name = None
            customer.insert()
        except Exception:
            pass

        self.stop_coverage()
        self.generate_report()

2.3 pytest-cov配置

# pytest.ini
[tool:pytest]
addopts = --cov=erpnext --cov-report=html --cov-report=term-missing --cov-fail-under=80
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*

3. 静态代码分析

3.1 pylint静态分析

# pylint配置文件 .pylintrc
[MASTER]
init-hook='import sys; sys.path.append(".")'

[MESSAGES CONTROL]
disable=missing-docstring,too-few-public-methods,too-many-arguments

[FORMAT]
max-line-length=120

[DESIGN]
max-args=10
max-locals=20
max-returns=6
max-branches=15
max-statements=60

# 静态分析脚本
import subprocess
import os

class StaticAnalysis:
    def __init__(self, source_dir="erpnext"):
        self.source_dir = source_dir

    def run_pylint(self):
        """运行pylint静态分析"""
        cmd = f"pylint {self.source_dir} --output-format=json"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

        if result.returncode != 0:
            print("pylint发现代码问题:")
            print(result.stdout)
        else:
            print("pylint检查通过")

    def run_flake8(self):
        """运行flake8代码风格检查"""
        cmd = f"flake8 {self.source_dir} --max-line-length=120"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

        if result.returncode != 0:
            print("flake8发现代码风格问题:")
            print(result.stdout)
        else:
            print("flake8检查通过")

    def run_bandit(self):
        """运行bandit安全漏洞扫描"""
        cmd = f"bandit -r {self.source_dir} -f json"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

        if result.returncode != 0:
            print("bandit发现安全问题:")
            print(result.stdout)
        else:
            print("bandit安全检查通过")

3.2 代码质量检查

import ast
import re

class CodeQualityAnalyzer:
    def __init__(self):
        self.issues = []

    def analyze_file(self, file_path):
        """分析单个文件"""
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # 检查代码复杂度
        self.check_complexity(file_path, content)

        # 检查命名规范
        self.check_naming_conventions(file_path, content)

        # 检查代码重复
        self.check_code_duplication(file_path, content)

    def check_complexity(self, file_path, content):
        """检查代码复杂度"""
        try:
            tree = ast.parse(content)
            complexity = self.calculate_complexity(tree)

            if complexity > 10:
                self.issues.append({
                    'file': file_path,
                    'type': 'complexity',
                    'message': f'代码复杂度过高: {complexity}',
                    'severity': 'warning'
                })
        except SyntaxError as e:
            self.issues.append({
                'file': file_path,
                'type': 'syntax',
                'message': f'语法错误: {e}',
                'severity': 'error'
            })

    def calculate_complexity(self, node):
        """计算代码复杂度"""
        complexity = 1

        for child in ast.walk(node):
            if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
                complexity += 1
            elif isinstance(child, ast.BoolOp):
                complexity += len(child.values) - 1

        return complexity

    def check_naming_conventions(self, file_path, content):
        """检查命名规范"""
        lines = content.split('\n')

        for i, line in enumerate(lines, 1):
            # 检查函数命名
            function_match = re.search(r'def\s+([a-zA-Z_][a-zA-Z0-9_]*)', line)
            if function_match:
                function_name = function_match.group(1)
                if not re.match(r'^[a-z_][a-z0-9_]*$', function_name):
                    self.issues.append({
                        'file': file_path,
                        'line': i,
                        'type': 'naming',
                        'message': f'函数命名不符合规范: {function_name}',
                        'severity': 'warning'
                    })

            # 检查类命名
            class_match = re.search(r'class\s+([A-Za-z_][A-Za-z0-9_]*)', line)
            if class_match:
                class_name = class_match.group(1)
                if not re.match(r'^[A-Z][A-Za-z0-9]*$', class_name):
                    self.issues.append({
                        'file': file_path,
                        'line': i,
                        'type': 'naming',
                        'message': f'类命名不符合规范: {class_name}',
                        'severity': 'warning'
                    })

4. 动态代码分析

4.1 运行时分析

import cProfile
import pstats
import io
from contextlib import contextmanager

class DynamicAnalysis:
    def __init__(self):
        self.profiler = cProfile.Profile()

    @contextmanager
    def profile_context(self):
        """性能分析上下文"""
        self.profiler.enable()
        try:
            yield
        finally:
            self.profiler.disable()

    def analyze_performance(self, func, *args, **kwargs):
        """分析函数性能"""
        with self.profile_context():
            result = func(*args, **kwargs)

        # 生成性能报告
        s = io.StringIO()
        ps = pstats.Stats(self.profiler, stream=s).sort_stats('cumulative')
        ps.print_stats()

        print("性能分析结果:")
        print(s.getvalue())

        return result

    def test_customer_creation_performance(self):
        """测试客户创建性能"""
        from erpnext.accounts.doctype.customer.customer import Customer

        def create_customer():
            customer = Customer()
            customer.customer_name = "性能测试客户"
            customer.customer_type = "Individual"
            customer.insert()
            return customer

        # 分析性能
        result = self.analyze_performance(create_customer)
        return result

4.2 内存分析

import tracemalloc
import gc
import sys

class MemoryAnalysis:
    def __init__(self):
        self.memory_snapshots = []

    def start_tracing(self):
        """开始内存跟踪"""
        tracemalloc.start()

    def stop_tracing(self):
        """停止内存跟踪"""
        tracemalloc.stop()

    def take_snapshot(self, label=""):
        """拍摄内存快照"""
        snapshot = tracemalloc.take_snapshot()
        self.memory_snapshots.append({
            'label': label,
            'snapshot': snapshot,
            'timestamp': time.time()
        })

    def analyze_memory_usage(self):
        """分析内存使用情况"""
        if len(self.memory_snapshots) < 2:
            return

        # 比较快照
        snapshot1 = self.memory_snapshots[0]['snapshot']
        snapshot2 = self.memory_snapshots[-1]['snapshot']

        top_stats = snapshot2.compare_to(snapshot1, 'lineno')

        print("内存使用分析:")
        for stat in top_stats[:10]:
            print(stat)

    def test_memory_leak(self):
        """测试内存泄漏"""
        self.start_tracing()

        # 拍摄初始快照
        self.take_snapshot("初始状态")

        # 执行可能产生内存泄漏的操作
        for i in range(1000):
            customer = Customer()
            customer.customer_name = f"测试客户{i}"
            customer.customer_type = "Individual"
            customer.insert()

        # 拍摄结束快照
        self.take_snapshot("操作完成")

        # 分析内存使用
        self.analyze_memory_usage()

        self.stop_tracing()

5. 单元测试

5.1 测试用例设计

import unittest
from unittest.mock import patch, MagicMock
import pytest

class TestCustomer(unittest.TestCase):
    def setUp(self):
        """测试前置条件"""
        self.customer = Customer()

    def tearDown(self):
        """测试后置条件"""
        # 清理测试数据
        pass

    def test_customer_creation(self):
        """测试客户创建"""
        self.customer.customer_name = "测试客户"
        self.customer.customer_type = "Individual"

        # 验证客户创建
        self.customer.insert()
        self.assertIsNotNone(self.customer.name)
        self.assertEqual(self.customer.customer_name, "测试客户")

    def test_customer_validation(self):
        """测试客户验证"""
        # 测试必填字段验证
        with self.assertRaises(ValidationError):
            self.customer.customer_name = None
            self.customer.insert()

    def test_customer_update(self):
        """测试客户更新"""
        # 创建客户
        self.customer.customer_name = "测试客户"
        self.customer.customer_type = "Individual"
        self.customer.insert()

        # 更新客户
        self.customer.customer_type = "Company"
        self.customer.save()

        # 验证更新
        updated_customer = Customer.get(self.customer.name)
        self.assertEqual(updated_customer.customer_type, "Company")

    @patch('erpnext.accounts.doctype.customer.customer.send_email')
    def test_customer_notification(self, mock_send_email):
        """测试客户通知"""
        self.customer.customer_name = "测试客户"
        self.customer.email_id = "test@example.com"
        self.customer.insert()

        # 验证邮件发送
        mock_send_email.assert_called_once()

    def test_customer_deletion(self):
        """测试客户删除"""
        # 创建客户
        self.customer.customer_name = "测试客户"
        self.customer.insert()
        customer_name = self.customer.name

        # 删除客户
        self.customer.delete()

        # 验证删除
        with self.assertRaises(DoesNotExistError):
            Customer.get(customer_name)

5.2 Mock和Stub

from unittest.mock import Mock, patch, MagicMock

class TestSalesOrder(unittest.TestCase):
    def setUp(self):
        self.sales_order = SalesOrder()

    @patch('erpnext.accounts.doctype.sales_order.sales_order.get_customer')
    def test_sales_order_with_mock_customer(self, mock_get_customer):
        """使用Mock测试销售订单"""
        # 设置Mock返回值
        mock_customer = MagicMock()
        mock_customer.customer_name = "Mock客户"
        mock_customer.credit_limit = 10000
        mock_get_customer.return_value = mock_customer

        # 测试销售订单创建
        self.sales_order.customer = "Mock客户"
        self.sales_order.insert()

        # 验证Mock调用
        mock_get_customer.assert_called_once_with("Mock客户")

    @patch('erpnext.accounts.doctype.sales_order.sales_order.calculate_taxes')
    def test_sales_order_tax_calculation(self, mock_calculate_taxes):
        """测试销售订单税费计算"""
        # 设置Mock返回值
        mock_calculate_taxes.return_value = {
            'total_taxes': 100,
            'grand_total': 1100
        }

        # 测试税费计算
        self.sales_order.total = 1000
        self.sales_order.calculate_taxes_and_totals()

        # 验证税费计算
        self.assertEqual(self.sales_order.total_taxes, 100)
        self.assertEqual(self.sales_order.grand_total, 1100)

    def test_sales_order_with_stub(self):
        """使用Stub测试销售订单"""
        # 创建Stub对象
        stub_customer = Mock()
        stub_customer.customer_name = "Stub客户"
        stub_customer.credit_limit = 5000
        stub_customer.is_credit_limit_exceeded.return_value = False

        # 使用Stub测试
        self.sales_order.customer = "Stub客户"
        self.sales_order.total = 3000

        # 验证信用额度检查
        self.assertFalse(stub_customer.is_credit_limit_exceeded(3000))

6. 集成测试

6.1 模块集成测试

class TestCustomerSalesIntegration(unittest.TestCase):
    def setUp(self):
        """设置测试环境"""
        self.customer = Customer()
        self.sales_order = SalesOrder()

    def test_customer_sales_order_integration(self):
        """测试客户和销售订单集成"""
        # 创建客户
        self.customer.customer_name = "集成测试客户"
        self.customer.customer_type = "Individual"
        self.customer.insert()

        # 创建销售订单
        self.sales_order.customer = self.customer.name
        self.sales_order.transaction_date = today()

        # 添加订单项目
        item = self.sales_order.append("items")
        item.item_code = "TEST_ITEM"
        item.qty = 1
        item.rate = 100

        self.sales_order.insert()

        # 验证集成
        self.assertEqual(self.sales_order.customer, self.customer.name)
        self.assertEqual(self.sales_order.total, 100)

    def test_customer_payment_integration(self):
        """测试客户和付款集成"""
        # 创建客户
        self.customer.customer_name = "付款测试客户"
        self.customer.insert()

        # 创建销售订单
        self.sales_order.customer = self.customer.name
        self.sales_order.total = 1000
        self.sales_order.insert()
        self.sales_order.submit()

        # 创建付款
        payment = PaymentEntry()
        payment.party_type = "Customer"
        payment.party = self.customer.name
        payment.paid_amount = 1000
        payment.insert()
        payment.submit()

        # 验证付款集成
        self.assertEqual(payment.party, self.customer.name)
        self.assertEqual(payment.paid_amount, 1000)

6.2 数据库集成测试

class TestDatabaseIntegration(unittest.TestCase):
    def setUp(self):
        """设置数据库测试环境"""
        self.db = frappe.get_db()

    def test_customer_database_integration(self):
        """测试客户数据库集成"""
        # 创建客户
        customer = Customer()
        customer.customer_name = "数据库测试客户"
        customer.customer_type = "Individual"
        customer.insert()

        # 验证数据库记录
        customer_data = self.db.get_value("Customer", customer.name, "*")
        self.assertIsNotNone(customer_data)
        self.assertEqual(customer_data.customer_name, "数据库测试客户")

    def test_transaction_rollback(self):
        """测试事务回滚"""
        # 开始事务
        self.db.begin()

        try:
            # 创建客户
            customer = Customer()
            customer.customer_name = "回滚测试客户"
            customer.insert()

            # 故意引发异常
            raise Exception("测试异常")

        except Exception:
            # 回滚事务
            self.db.rollback()

        # 验证回滚
        customer_exists = self.db.exists("Customer", {"customer_name": "回滚测试客户"})
        self.assertFalse(customer_exists)

7. 测试工具和框架

7.1 测试框架配置

# conftest.py
import pytest
import frappe
from frappe.tests.utils import FrappeTestCase

@pytest.fixture(scope="session")
def setup_test_environment():
    """设置测试环境"""
    frappe.init(site="test_site")
    frappe.connect()

    yield

    frappe.destroy()

@pytest.fixture
def test_customer():
    """测试客户fixture"""
    customer = Customer()
    customer.customer_name = "测试客户"
    customer.customer_type = "Individual"
    customer.insert()

    yield customer

    customer.delete()

@pytest.fixture
def test_sales_order(test_customer):
    """测试销售订单fixture"""
    sales_order = SalesOrder()
    sales_order.customer = test_customer.name
    sales_order.transaction_date = today()

    item = sales_order.append("items")
    item.item_code = "TEST_ITEM"
    item.qty = 1
    item.rate = 100

    sales_order.insert()

    yield sales_order

    sales_order.delete()

7.2 测试数据管理

class TestDataManager:
    def __init__(self):
        self.test_data = {}

    def create_test_customer(self, name="测试客户"):
        """创建测试客户"""
        customer = Customer()
        customer.customer_name = name
        customer.customer_type = "Individual"
        customer.insert()

        self.test_data[f"customer_{name}"] = customer
        return customer

    def create_test_item(self, code="TEST_ITEM"):
        """创建测试商品"""
        item = Item()
        item.item_code = code
        item.item_name = f"测试商品{code}"
        item.item_group = "Products"
        item.insert()

        self.test_data[f"item_{code}"] = item
        return item

    def cleanup_test_data(self):
        """清理测试数据"""
        for key, obj in self.test_data.items():
            try:
                obj.delete()
            except Exception as e:
                print(f"清理测试数据失败: {key} - {e}")

        self.test_data.clear()

8. 测试报告和分析

8.1 测试报告生成

import json
import xml.etree.ElementTree as ET
from datetime import datetime

class TestReportGenerator:
    def __init__(self):
        self.test_results = []

    def add_test_result(self, test_name, status, duration, error=None):
        """添加测试结果"""
        result = {
            "test_name": test_name,
            "status": status,
            "duration": duration,
            "error": error,
            "timestamp": datetime.now().isoformat()
        }
        self.test_results.append(result)

    def generate_json_report(self, filename="test_report.json"):
        """生成JSON报告"""
        report = {
            "summary": self.generate_summary(),
            "test_results": self.test_results
        }

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)

    def generate_xml_report(self, filename="test_report.xml"):
        """生成XML报告"""
        root = ET.Element("testsuite")
        root.set("name", "ERPNext白盒测试")
        root.set("tests", str(len(self.test_results)))

        passed_count = len([r for r in self.test_results if r["status"] == "PASSED"])
        root.set("failures", str(len(self.test_results) - passed_count))

        for result in self.test_results:
            testcase = ET.SubElement(root, "testcase")
            testcase.set("name", result["test_name"])
            testcase.set("time", str(result["duration"]))

            if result["status"] != "PASSED":
                failure = ET.SubElement(testcase, "failure")
                failure.text = result.get("error", "")

        tree = ET.ElementTree(root)
        tree.write(filename, encoding="utf-8", xml_declaration=True)

    def generate_summary(self):
        """生成测试摘要"""
        total_tests = len(self.test_results)
        passed_tests = len([r for r in self.test_results if r["status"] == "PASSED"])
        failed_tests = total_tests - passed_tests

        return {
            "total_tests": total_tests,
            "passed_tests": passed_tests,
            "failed_tests": failed_tests,
            "pass_rate": (passed_tests / total_tests * 100) if total_tests > 0 else 0,
            "total_duration": sum(r["duration"] for r in self.test_results)
        }

9. 持续集成

9.1 CI/CD配置

# .github/workflows/whitebox-testing.yml
name: ERPNext白盒测试

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  whitebox-testing:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v2

    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: 3.8

    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install pytest-cov pylint flake8 bandit

    - name: Run static analysis
      run: |
        pylint erpnext --output-format=json > pylint-report.json
        flake8 erpnext --output-file=flake8-report.txt
        bandit -r erpnext -f json -o bandit-report.json

    - name: Run unit tests with coverage
      run: |
        pytest tests/unit_tests/ --cov=erpnext --cov-report=xml --cov-report=html

    - name: Upload coverage reports
      uses: codecov/codecov-action@v1
      with:
        file: ./coverage.xml

    - name: Upload test reports
      uses: actions/upload-artifact@v2
      with:
        name: test-reports
        path: |
          coverage.xml
          htmlcov/
          pylint-report.json
          flake8-report.txt
          bandit-report.json

10. 总结

ERPNext系统白盒测试提供了全面的代码质量保证解决方案,包括代码覆盖率测试、静态代码分析、动态代码分析、单元测试、集成测试等多个方面。通过自动化测试工具和持续集成,确保代码质量和系统稳定性。