基于Pytest的库存并发接口测试实现方案

1. 测试环境搭建

1.1 安装必要依赖

pip install pytest requests pytest-xdist pytest-asyncio aiohttp

1.2 项目结构

inventory_concurrency_test/
├── conftest.py           # 公共fixture
├── test_inventory.py     # 主测试文件
├── utils/
│   ├── http_client.py    # HTTP客户端封装
│   └── data_generator.py # 测试数据生成
└── requirements.txt

2. 基础测试实现

2.1 单线程库存测试

# test_inventory.py
import pytest
import requests

BASE_URL = "http://api.mall.com/v1"

def test_single_deduct():
    """测试单线程库存扣减"""
    product_id = setup_test_product(stock=100)
    
    response = requests.post(
        f"{BASE_URL}/inventory/deduct",
        json={"product_id": product_id, "quantity": 1}
    )
    
    assert response.status_code == 200
    assert response.json()["remaining_stock"] == 99

2.2 并发测试实现

使用pytest-xdist并行执行

# conftest.py
import pytest
from typing import List, Dict
import requests

@pytest.fixture(scope="module")
def test_product(request) -> Dict:
    """创建测试商品fixture"""
    product = {
        "name": "压力测试商品",
        "price": 100,
        "stock": request.param.get("stock", 100)
    }
    resp = requests.post(f"{BASE_URL}/products", json=product)
    yield resp.json()
    # 测试后清理
    requests.delete(f"{BASE_URL}/products/{resp.json()['id']}")

# 命令行执行:pytest -n 4  # 使用4个worker并行

使用aiohttp实现异步并发

# test_inventory.py
import aiohttp
import asyncio
import pytest

async def deduct_inventory(session, product_id, quantity):
    async with session.post(
        f"{BASE_URL}/inventory/deduct",
        json={"product_id": product_id, "quantity": quantity}
    ) as response:
        return await response.json()

@pytest.mark.asyncio
async def test_async_concurrent_deduct(test_product):
    """异步并发测试"""
    product_id = test_product["id"]
    concurrency = 50
    
    async with aiohttp.ClientSession() as session:
        tasks = [
            deduct_inventory(session, product_id, 1)
            for _ in range(concurrency)
        ]
        results = await asyncio.gather(*tasks)
        
        # 验证所有请求都成功
        assert all(r.get("success") for r in results)
        
        # 验证最终库存
        final_stock = await get_inventory(session, product_id)
        expected = test_product["stock"] - concurrency
        assert final_stock == expected

async def get_inventory(session, product_id):
    async with session.get(f"{BASE_URL}/inventory/{product_id}") as resp:
        return (await resp.json())["stock"]

3. 高级并发场景测试

3.1 库存超卖测试

# test_inventory.py
import threading
import time

def test_over_sell_protection(test_product):
    """测试库存超卖防护"""
    product_id = test_product["id"]
    initial_stock = test_product["stock"]
    threads = []
    results = []
    
    def worker():
        try:
            resp = requests.post(
                f"{BASE_URL}/inventory/deduct",
                json={"product_id": product_id, "quantity": 1},
                timeout=5
            )
            results.append(resp.status_code == 200)
        except Exception as e:
            results.append(False)
    
    # 创建超过库存量的线程
    for _ in range(initial_stock + 10):
        t = threading.Thread(target=worker)
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    
    # 验证成功数量不超过库存
    assert sum(results) <= initial_stock
    
    # 验证库存未出现负数
    final_stock = requests.get(f"{BASE_URL}/inventory/{product_id}").json()["stock"]
    assert final_stock >= 0

3.2 分布式锁测试

# test_inventory.py
from multiprocessing import Process
import random

def test_distributed_lock(test_product):
    """模拟分布式环境下的锁竞争"""
    product_id = test_product["id"]
    processes = []
    results = []
    
    def process_task():
        service_port = random.randint(8000, 8010)
        # 模拟不同服务实例
        resp = requests.post(
            f"http://localhost:{service_port}/inventory/deduct",
            json={"product_id": product_id, "quantity": 1},
            headers={"X-Service-ID": f"service-{service_port}"}
        )
        results.append(resp.status_code)
    
    for _ in range(10):
        p = Process(target=process_task)
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    
    # 验证只有一个服务实例扣减成功
    assert results.count(200) == 1
    assert results.count(409) == 9  # 409 Conflict

4. 参数化并发测试

4.1 不同并发级别测试

# test_inventory.py
import math

@pytest.mark.parametrize("concurrency", [10, 50, 100, 200])
def test_concurrency_levels(test_product, concurrency):
    """不同并发级别测试"""
    product_id = test_product["id"]
    initial_stock = test_product["stock"]
    test_product["stock"] = concurrency * 2  # 确保足够库存
    
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [
            executor.submit(
                requests.post,
                f"{BASE_URL}/inventory/deduct",
                json={"product_id": product_id, "quantity": 1}
            )
            for _ in range(concurrency)
        ]
        
        results = [f.result().status_code for f in futures]
    
    # 验证全部成功
    assert all(code == 200 for code in results)
    
    # 验证库存正确扣减
    final_stock = requests.get(f"{BASE_URL}/inventory/{product_id}").json()["stock"]
    assert final_stock == initial_stock - concurrency

4.2 混合读写测试

# test_inventory.py
from random import choice

def test_read_write_mix(test_product):
    """混合读写操作测试"""
    product_id = test_product["id"]
    operations = []
    
    def random_operation():
        if choice([True, False]):  # 50%概率读/写
            resp = requests.get(f"{BASE_URL}/inventory/{product_id}")
            operations.append(("read", resp.status_code))
        else:
            resp = requests.post(
                f"{BASE_URL}/inventory/deduct",
                json={"product_id": product_id, "quantity": 1}
            )
            operations.append(("write", resp.status_code))
    
    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(random_operation) for _ in range(100)]
        for f in futures:
            f.result()
    
    # 验证写操作的一致性
    write_results = [op[1] for op in operations if op[0] == "write"]
    final_stock = requests.get(f"{BASE_URL}/inventory/{product_id}").json()["stock"]
    expected_stock = test_product["stock"] - write_results.count(200)
    assert final_stock == expected_stock

5. 测试结果分析与断言

5.1 性能指标断言

# test_inventory.py
import time

def test_performance_metrics(test_product):
    """测试性能指标"""
    product_id = test_product["id"]
    concurrency = 50
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [
            executor.submit(
                requests.post,
                f"{BASE_URL}/inventory/deduct",
                json={"product_id": product_id, "quantity": 1}
            )
            for _ in range(concurrency)
        ]
        results = [f.result() for f in futures]
    
    duration = time.time() - start_time
    tps = concurrency / duration
    
    # 断言性能指标
    assert duration < 2.0  # 总耗时不超过2秒
    assert tps > 30  # TPS大于30
    assert all(r.status_code == 200 for r in results)

5.2 数据库一致性验证

# test_inventory.py
import pymysql

def test_database_consistency(test_product):
    """测试数据库层面的数据一致性"""
    product_id = test_product["id"]
    initial_stock = test_product["stock"]
    concurrency = initial_stock  # 使用全部库存进行测试
    
    # 并发扣减
    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = [
            executor.submit(
                requests.post,
                f"{BASE_URL}/inventory/deduct",
                json={"product_id": product_id, "quantity": 1}
            )
            for _ in range(concurrency)
        ]
        [f.result() for f in futures]
    
    # 直接查询数据库验证
    conn = pymysql.connect(
        host="localhost",
        user="test",
        password="test",
        database="mall"
    )
    
    try:
        with conn.cursor() as cursor:
            # 检查商品库存
            cursor.execute(
                "SELECT stock FROM products WHERE id = %s",
                (product_id,)
            )
            db_stock = cursor.fetchone()[0]
            
            # 检查库存流水记录总数
            cursor.execute(
                "SELECT COUNT(*) FROM inventory_logs WHERE product_id = %s",
                (product_id,)
            )
            log_count = cursor.fetchone()[0]
            
            # 检查订单数量
            cursor.execute(
                "SELECT COUNT(*) FROM orders WHERE product_id = %s",
                (product_id,)
            )
            order_count = cursor.fetchone()[0]
            
            # 验证一致性
            assert db_stock == 0
            assert log_count == concurrency
            assert order_count == concurrency
    finally:
        conn.close()

6. 测试执行与报告

6.1 执行命令

# 普通执行
pytest test_inventory.py -v

# 并发执行(使用4个worker)
pytest test_inventory.py -n 4

# 生成HTML报告
pytest test_inventory.py --html=report.html

# 带性能监控的执行
pytest test_inventory.py --monitor

6.2 集成Allure报告

# conftest.py
import allure
import pytest

@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """为每个测试用例添加Allure报告"""
    outcome = yield
    rep = outcome.get_result()
    
    if rep.when == "call" and rep.failed:
        with allure.step("失败截图"):
            # 可以添加失败时的截图或其他信息
            allure.attach("失败信息", str(rep.longrepr))

7. 最佳实践建议

  1. 测试数据隔离:每个测试用例使用独立商品ID使用fixture自动清理测试数据
  2. 并发控制:逐步增加并发量进行测试监控系统资源使用情况
  3. 断言策略:验证业务规则(不超卖)验证数据一致性验证性能指标
  4. 环境管理:区分测试环境和生产环境使用Docker容器隔离测试环境
  5. 持续集成:将并发测试加入CI流水线设置性能基准阈值

通过这套基于Pytest的库存并发测试方案,可以全面验证商城系统在高并发场景下的库存管理功能,确保系统稳定性和数据一致性。

进阶高级测试工程师 文章被收录于专栏

《高级软件测试工程师》专栏旨在为测试领域的从业者提供深入的知识和实践指导,帮助大家从基础的测试技能迈向高级测试专家的行列。 在本专栏中,主要涵盖的内容: 1. 如何设计和实施高效的测试策略; 2. 掌握自动化测试、性能测试和安全测试的核心技术; 3. 深入理解测试驱动开发(TDD)和行为驱动开发(BDD)的实践方法; 4. 测试团队的管理和协作能力。 ——For.Heart

全部评论

相关推荐

04-10 11:56
如皋中学 Java
高斯林的信徒:双c9能简历挂的?
点赞 评论 收藏
分享
点赞 评论 收藏
分享
评论
点赞
1
分享

创作者周榜

更多
牛客网
牛客企业服务