基于Pytest的库存并发接口测试实现方案
1. 测试环境搭建
1.1 安装必要依赖
pip install pytest requests pytest-xdist pytest-asyncio aiohttp
1.2 项目结构
inventory_concurrency_test/ ├── conftest.py # 公共fixture ├── test_inventory.py # 主测试文件 ├── utils/ │ ├── http_client.py # HTTP客户端封装 │ └── data_generator.py # 测试数据生成 └── requirements.txt
2. 基础测试实现
2.1 单线程库存测试
# test_inventory.py import pytest import requests BASE_URL = "http://api.mall.com/v1" def test_single_deduct(): """测试单线程库存扣减""" product_id = setup_test_product(stock=100) response = requests.post( f"{BASE_URL}/inventory/deduct", json={"product_id": product_id, "quantity": 1} ) assert response.status_code == 200 assert response.json()["remaining_stock"] == 99
2.2 并发测试实现
使用pytest-xdist并行执行
# conftest.py import pytest from typing import List, Dict import requests @pytest.fixture(scope="module") def test_product(request) -> Dict: """创建测试商品fixture""" product = { "name": "压力测试商品", "price": 100, "stock": request.param.get("stock", 100) } resp = requests.post(f"{BASE_URL}/products", json=product) yield resp.json() # 测试后清理 requests.delete(f"{BASE_URL}/products/{resp.json()['id']}") # 命令行执行:pytest -n 4 # 使用4个worker并行
使用aiohttp实现异步并发
# test_inventory.py import aiohttp import asyncio import pytest async def deduct_inventory(session, product_id, quantity): async with session.post( f"{BASE_URL}/inventory/deduct", json={"product_id": product_id, "quantity": quantity} ) as response: return await response.json() @pytest.mark.asyncio async def test_async_concurrent_deduct(test_product): """异步并发测试""" product_id = test_product["id"] concurrency = 50 async with aiohttp.ClientSession() as session: tasks = [ deduct_inventory(session, product_id, 1) for _ in range(concurrency) ] results = await asyncio.gather(*tasks) # 验证所有请求都成功 assert all(r.get("success") for r in results) # 验证最终库存 final_stock = await get_inventory(session, product_id) expected = test_product["stock"] - concurrency assert final_stock == expected async def get_inventory(session, product_id): async with session.get(f"{BASE_URL}/inventory/{product_id}") as resp: return (await resp.json())["stock"]
3. 高级并发场景测试
3.1 库存超卖测试
# test_inventory.py import threading import time def test_over_sell_protection(test_product): """测试库存超卖防护""" product_id = test_product["id"] initial_stock = test_product["stock"] threads = [] results = [] def worker(): try: resp = requests.post( f"{BASE_URL}/inventory/deduct", json={"product_id": product_id, "quantity": 1}, timeout=5 ) results.append(resp.status_code == 200) except Exception as e: results.append(False) # 创建超过库存量的线程 for _ in range(initial_stock + 10): t = threading.Thread(target=worker) threads.append(t) t.start() for t in threads: t.join() # 验证成功数量不超过库存 assert sum(results) <= initial_stock # 验证库存未出现负数 final_stock = requests.get(f"{BASE_URL}/inventory/{product_id}").json()["stock"] assert final_stock >= 0
3.2 分布式锁测试
# test_inventory.py from multiprocessing import Process import random def test_distributed_lock(test_product): """模拟分布式环境下的锁竞争""" product_id = test_product["id"] processes = [] results = [] def process_task(): service_port = random.randint(8000, 8010) # 模拟不同服务实例 resp = requests.post( f"http://localhost:{service_port}/inventory/deduct", json={"product_id": product_id, "quantity": 1}, headers={"X-Service-ID": f"service-{service_port}"} ) results.append(resp.status_code) for _ in range(10): p = Process(target=process_task) processes.append(p) p.start() for p in processes: p.join() # 验证只有一个服务实例扣减成功 assert results.count(200) == 1 assert results.count(409) == 9 # 409 Conflict
4. 参数化并发测试
4.1 不同并发级别测试
# test_inventory.py import math @pytest.mark.parametrize("concurrency", [10, 50, 100, 200]) def test_concurrency_levels(test_product, concurrency): """不同并发级别测试""" product_id = test_product["id"] initial_stock = test_product["stock"] test_product["stock"] = concurrency * 2 # 确保足够库存 with ThreadPoolExecutor(max_workers=concurrency) as executor: futures = [ executor.submit( requests.post, f"{BASE_URL}/inventory/deduct", json={"product_id": product_id, "quantity": 1} ) for _ in range(concurrency) ] results = [f.result().status_code for f in futures] # 验证全部成功 assert all(code == 200 for code in results) # 验证库存正确扣减 final_stock = requests.get(f"{BASE_URL}/inventory/{product_id}").json()["stock"] assert final_stock == initial_stock - concurrency
4.2 混合读写测试
# test_inventory.py from random import choice def test_read_write_mix(test_product): """混合读写操作测试""" product_id = test_product["id"] operations = [] def random_operation(): if choice([True, False]): # 50%概率读/写 resp = requests.get(f"{BASE_URL}/inventory/{product_id}") operations.append(("read", resp.status_code)) else: resp = requests.post( f"{BASE_URL}/inventory/deduct", json={"product_id": product_id, "quantity": 1} ) operations.append(("write", resp.status_code)) with ThreadPoolExecutor(max_workers=20) as executor: futures = [executor.submit(random_operation) for _ in range(100)] for f in futures: f.result() # 验证写操作的一致性 write_results = [op[1] for op in operations if op[0] == "write"] final_stock = requests.get(f"{BASE_URL}/inventory/{product_id}").json()["stock"] expected_stock = test_product["stock"] - write_results.count(200) assert final_stock == expected_stock
5. 测试结果分析与断言
5.1 性能指标断言
# test_inventory.py import time def test_performance_metrics(test_product): """测试性能指标""" product_id = test_product["id"] concurrency = 50 start_time = time.time() with ThreadPoolExecutor(max_workers=concurrency) as executor: futures = [ executor.submit( requests.post, f"{BASE_URL}/inventory/deduct", json={"product_id": product_id, "quantity": 1} ) for _ in range(concurrency) ] results = [f.result() for f in futures] duration = time.time() - start_time tps = concurrency / duration # 断言性能指标 assert duration < 2.0 # 总耗时不超过2秒 assert tps > 30 # TPS大于30 assert all(r.status_code == 200 for r in results)
5.2 数据库一致性验证
# test_inventory.py import pymysql def test_database_consistency(test_product): """测试数据库层面的数据一致性""" product_id = test_product["id"] initial_stock = test_product["stock"] concurrency = initial_stock # 使用全部库存进行测试 # 并发扣减 with ThreadPoolExecutor(max_workers=20) as executor: futures = [ executor.submit( requests.post, f"{BASE_URL}/inventory/deduct", json={"product_id": product_id, "quantity": 1} ) for _ in range(concurrency) ] [f.result() for f in futures] # 直接查询数据库验证 conn = pymysql.connect( host="localhost", user="test", password="test", database="mall" ) try: with conn.cursor() as cursor: # 检查商品库存 cursor.execute( "SELECT stock FROM products WHERE id = %s", (product_id,) ) db_stock = cursor.fetchone()[0] # 检查库存流水记录总数 cursor.execute( "SELECT COUNT(*) FROM inventory_logs WHERE product_id = %s", (product_id,) ) log_count = cursor.fetchone()[0] # 检查订单数量 cursor.execute( "SELECT COUNT(*) FROM orders WHERE product_id = %s", (product_id,) ) order_count = cursor.fetchone()[0] # 验证一致性 assert db_stock == 0 assert log_count == concurrency assert order_count == concurrency finally: conn.close()
6. 测试执行与报告
6.1 执行命令
# 普通执行 pytest test_inventory.py -v # 并发执行(使用4个worker) pytest test_inventory.py -n 4 # 生成HTML报告 pytest test_inventory.py --html=report.html # 带性能监控的执行 pytest test_inventory.py --monitor
6.2 集成Allure报告
# conftest.py import allure import pytest @pytest.hookimpl(tryfirst=True, hookwrapper=True) def pytest_runtest_makereport(item, call): """为每个测试用例添加Allure报告""" outcome = yield rep = outcome.get_result() if rep.when == "call" and rep.failed: with allure.step("失败截图"): # 可以添加失败时的截图或其他信息 allure.attach("失败信息", str(rep.longrepr))
7. 最佳实践建议
- 测试数据隔离:每个测试用例使用独立商品ID使用fixture自动清理测试数据
- 并发控制:逐步增加并发量进行测试监控系统资源使用情况
- 断言策略:验证业务规则(不超卖)验证数据一致性验证性能指标
- 环境管理:区分测试环境和生产环境使用Docker容器隔离测试环境
- 持续集成:将并发测试加入CI流水线设置性能基准阈值
通过这套基于Pytest的库存并发测试方案,可以全面验证商城系统在高并发场景下的库存管理功能,确保系统稳定性和数据一致性。
进阶高级测试工程师 文章被收录于专栏
《高级软件测试工程师》专栏旨在为测试领域的从业者提供深入的知识和实践指导,帮助大家从基础的测试技能迈向高级测试专家的行列。 在本专栏中,主要涵盖的内容: 1. 如何设计和实施高效的测试策略; 2. 掌握自动化测试、性能测试和安全测试的核心技术; 3. 深入理解测试驱动开发(TDD)和行为驱动开发(BDD)的实践方法; 4. 测试团队的管理和协作能力。 ——For.Heart