基于Pytest的库存并发接口测试实现方案
1. 测试环境搭建
1.1 安装必要依赖
pip install pytest requests pytest-xdist pytest-asyncio aiohttp
1.2 项目结构
inventory_concurrency_test/ ├── conftest.py # 公共fixture ├── test_inventory.py # 主测试文件 ├── utils/ │ ├── http_client.py # HTTP客户端封装 │ └── data_generator.py # 测试数据生成 └── requirements.txt
2. 基础测试实现
2.1 单线程库存测试
# test_inventory.py
import pytest
import requests
BASE_URL = "http://api.mall.com/v1"
def test_single_deduct():
"""测试单线程库存扣减"""
product_id = setup_test_product(stock=100)
response = requests.post(
f"{BASE_URL}/inventory/deduct",
json={"product_id": product_id, "quantity": 1}
)
assert response.status_code == 200
assert response.json()["remaining_stock"] == 99
2.2 并发测试实现
使用pytest-xdist并行执行
# conftest.py
import pytest
from typing import List, Dict
import requests
@pytest.fixture(scope="module")
def test_product(request) -> Dict:
"""创建测试商品fixture"""
product = {
"name": "压力测试商品",
"price": 100,
"stock": request.param.get("stock", 100)
}
resp = requests.post(f"{BASE_URL}/products", json=product)
yield resp.json()
# 测试后清理
requests.delete(f"{BASE_URL}/products/{resp.json()['id']}")
# 命令行执行:pytest -n 4 # 使用4个worker并行
使用aiohttp实现异步并发
# test_inventory.py
import aiohttp
import asyncio
import pytest
async def deduct_inventory(session, product_id, quantity):
async with session.post(
f"{BASE_URL}/inventory/deduct",
json={"product_id": product_id, "quantity": quantity}
) as response:
return await response.json()
@pytest.mark.asyncio
async def test_async_concurrent_deduct(test_product):
"""异步并发测试"""
product_id = test_product["id"]
concurrency = 50
async with aiohttp.ClientSession() as session:
tasks = [
deduct_inventory(session, product_id, 1)
for _ in range(concurrency)
]
results = await asyncio.gather(*tasks)
# 验证所有请求都成功
assert all(r.get("success") for r in results)
# 验证最终库存
final_stock = await get_inventory(session, product_id)
expected = test_product["stock"] - concurrency
assert final_stock == expected
async def get_inventory(session, product_id):
async with session.get(f"{BASE_URL}/inventory/{product_id}") as resp:
return (await resp.json())["stock"]
3. 高级并发场景测试
3.1 库存超卖测试
# test_inventory.py
import threading
import time
def test_over_sell_protection(test_product):
"""测试库存超卖防护"""
product_id = test_product["id"]
initial_stock = test_product["stock"]
threads = []
results = []
def worker():
try:
resp = requests.post(
f"{BASE_URL}/inventory/deduct",
json={"product_id": product_id, "quantity": 1},
timeout=5
)
results.append(resp.status_code == 200)
except Exception as e:
results.append(False)
# 创建超过库存量的线程
for _ in range(initial_stock + 10):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
for t in threads:
t.join()
# 验证成功数量不超过库存
assert sum(results) <= initial_stock
# 验证库存未出现负数
final_stock = requests.get(f"{BASE_URL}/inventory/{product_id}").json()["stock"]
assert final_stock >= 0
3.2 分布式锁测试
# test_inventory.py
from multiprocessing import Process
import random
def test_distributed_lock(test_product):
"""模拟分布式环境下的锁竞争"""
product_id = test_product["id"]
processes = []
results = []
def process_task():
service_port = random.randint(8000, 8010)
# 模拟不同服务实例
resp = requests.post(
f"http://localhost:{service_port}/inventory/deduct",
json={"product_id": product_id, "quantity": 1},
headers={"X-Service-ID": f"service-{service_port}"}
)
results.append(resp.status_code)
for _ in range(10):
p = Process(target=process_task)
processes.append(p)
p.start()
for p in processes:
p.join()
# 验证只有一个服务实例扣减成功
assert results.count(200) == 1
assert results.count(409) == 9 # 409 Conflict
4. 参数化并发测试
4.1 不同并发级别测试
# test_inventory.py
import math
@pytest.mark.parametrize("concurrency", [10, 50, 100, 200])
def test_concurrency_levels(test_product, concurrency):
"""不同并发级别测试"""
product_id = test_product["id"]
initial_stock = test_product["stock"]
test_product["stock"] = concurrency * 2 # 确保足够库存
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = [
executor.submit(
requests.post,
f"{BASE_URL}/inventory/deduct",
json={"product_id": product_id, "quantity": 1}
)
for _ in range(concurrency)
]
results = [f.result().status_code for f in futures]
# 验证全部成功
assert all(code == 200 for code in results)
# 验证库存正确扣减
final_stock = requests.get(f"{BASE_URL}/inventory/{product_id}").json()["stock"]
assert final_stock == initial_stock - concurrency
4.2 混合读写测试
# test_inventory.py
from random import choice
def test_read_write_mix(test_product):
"""混合读写操作测试"""
product_id = test_product["id"]
operations = []
def random_operation():
if choice([True, False]): # 50%概率读/写
resp = requests.get(f"{BASE_URL}/inventory/{product_id}")
operations.append(("read", resp.status_code))
else:
resp = requests.post(
f"{BASE_URL}/inventory/deduct",
json={"product_id": product_id, "quantity": 1}
)
operations.append(("write", resp.status_code))
with ThreadPoolExecutor(max_workers=20) as executor:
futures = [executor.submit(random_operation) for _ in range(100)]
for f in futures:
f.result()
# 验证写操作的一致性
write_results = [op[1] for op in operations if op[0] == "write"]
final_stock = requests.get(f"{BASE_URL}/inventory/{product_id}").json()["stock"]
expected_stock = test_product["stock"] - write_results.count(200)
assert final_stock == expected_stock
5. 测试结果分析与断言
5.1 性能指标断言
# test_inventory.py
import time
def test_performance_metrics(test_product):
"""测试性能指标"""
product_id = test_product["id"]
concurrency = 50
start_time = time.time()
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = [
executor.submit(
requests.post,
f"{BASE_URL}/inventory/deduct",
json={"product_id": product_id, "quantity": 1}
)
for _ in range(concurrency)
]
results = [f.result() for f in futures]
duration = time.time() - start_time
tps = concurrency / duration
# 断言性能指标
assert duration < 2.0 # 总耗时不超过2秒
assert tps > 30 # TPS大于30
assert all(r.status_code == 200 for r in results)
5.2 数据库一致性验证
# test_inventory.py
import pymysql
def test_database_consistency(test_product):
"""测试数据库层面的数据一致性"""
product_id = test_product["id"]
initial_stock = test_product["stock"]
concurrency = initial_stock # 使用全部库存进行测试
# 并发扣减
with ThreadPoolExecutor(max_workers=20) as executor:
futures = [
executor.submit(
requests.post,
f"{BASE_URL}/inventory/deduct",
json={"product_id": product_id, "quantity": 1}
)
for _ in range(concurrency)
]
[f.result() for f in futures]
# 直接查询数据库验证
conn = pymysql.connect(
host="localhost",
user="test",
password="test",
database="mall"
)
try:
with conn.cursor() as cursor:
# 检查商品库存
cursor.execute(
"SELECT stock FROM products WHERE id = %s",
(product_id,)
)
db_stock = cursor.fetchone()[0]
# 检查库存流水记录总数
cursor.execute(
"SELECT COUNT(*) FROM inventory_logs WHERE product_id = %s",
(product_id,)
)
log_count = cursor.fetchone()[0]
# 检查订单数量
cursor.execute(
"SELECT COUNT(*) FROM orders WHERE product_id = %s",
(product_id,)
)
order_count = cursor.fetchone()[0]
# 验证一致性
assert db_stock == 0
assert log_count == concurrency
assert order_count == concurrency
finally:
conn.close()
6. 测试执行与报告
6.1 执行命令
# 普通执行 pytest test_inventory.py -v # 并发执行(使用4个worker) pytest test_inventory.py -n 4 # 生成HTML报告 pytest test_inventory.py --html=report.html # 带性能监控的执行 pytest test_inventory.py --monitor
6.2 集成Allure报告
# conftest.py
import allure
import pytest
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
"""为每个测试用例添加Allure报告"""
outcome = yield
rep = outcome.get_result()
if rep.when == "call" and rep.failed:
with allure.step("失败截图"):
# 可以添加失败时的截图或其他信息
allure.attach("失败信息", str(rep.longrepr))
7. 最佳实践建议
- 测试数据隔离:每个测试用例使用独立商品ID使用fixture自动清理测试数据
- 并发控制:逐步增加并发量进行测试监控系统资源使用情况
- 断言策略:验证业务规则(不超卖)验证数据一致性验证性能指标
- 环境管理:区分测试环境和生产环境使用Docker容器隔离测试环境
- 持续集成:将并发测试加入CI流水线设置性能基准阈值
通过这套基于Pytest的库存并发测试方案,可以全面验证商城系统在高并发场景下的库存管理功能,确保系统稳定性和数据一致性。
进阶高级测试工程师 文章被收录于专栏
《高级软件测试工程师》专栏旨在为测试领域的从业者提供深入的知识和实践指导,帮助大家从基础的测试技能迈向高级测试专家的行列。 在本专栏中,主要涵盖的内容: 1. 如何设计和实施高效的测试策略; 2. 掌握自动化测试、性能测试和安全测试的核心技术; 3. 深入理解测试驱动开发(TDD)和行为驱动开发(BDD)的实践方法; 4. 测试团队的管理和协作能力。 ——For.Heart
查看21道真题和解析