基于Pytest的库存并发接口测试实现方案

1. 测试环境搭建

1.1 安装必要依赖

pip install pytest requests pytest-xdist pytest-asyncio aiohttp

1.2 项目结构

inventory_concurrency_test/
├── conftest.py           # 公共fixture
├── test_inventory.py     # 主测试文件
├── utils/
│   ├── http_client.py    # HTTP客户端封装
│   └── data_generator.py # 测试数据生成
└── requirements.txt

2. 基础测试实现

2.1 单线程库存测试

# test_inventory.py
import pytest
import requests

BASE_URL = "http://api.mall.com/v1"

def test_single_deduct():
    """测试单线程库存扣减"""
    product_id = setup_test_product(stock=100)
    
    response = requests.post(
        f"{BASE_URL}/inventory/deduct",
        json={"product_id": product_id, "quantity": 1}
    )
    
    assert response.status_code == 200
    assert response.json()["remaining_stock"] == 99

2.2 并发测试实现

使用pytest-xdist并行执行

# conftest.py
import pytest
from typing import List, Dict
import requests

@pytest.fixture(scope="module")
def test_product(request) -> Dict:
    """创建测试商品fixture"""
    product = {
        "name": "压力测试商品",
        "price": 100,
        "stock": request.param.get("stock", 100)
    }
    resp = requests.post(f"{BASE_URL}/products", json=product)
    yield resp.json()
    # 测试后清理
    requests.delete(f"{BASE_URL}/products/{resp.json()['id']}")

# 命令行执行:pytest -n 4  # 使用4个worker并行

使用aiohttp实现异步并发

# test_inventory.py
import aiohttp
import asyncio
import pytest

async def deduct_inventory(session, product_id, quantity):
    async with session.post(
        f"{BASE_URL}/inventory/deduct",
        json={"product_id": product_id, "quantity": quantity}
    ) as response:
        return await response.json()

@pytest.mark.asyncio
async def test_async_concurrent_deduct(test_product):
    """异步并发测试"""
    product_id = test_product["id"]
    concurrency = 50
    
    async with aiohttp.ClientSession() as session:
        tasks = [
            deduct_inventory(session, product_id, 1)
            for _ in range(concurrency)
        ]
        results = await asyncio.gather(*tasks)
        
        # 验证所有请求都成功
        assert all(r.get("success") for r in results)
        
        # 验证最终库存
        final_stock = await get_inventory(session, product_id)
        expected = test_product["stock"] - concurrency
        assert final_stock == expected

async def get_inventory(session, product_id):
    async with session.get(f"{BASE_URL}/inventory/{product_id}") as resp:
        return (await resp.json())["stock"]

3. 高级并发场景测试

3.1 库存超卖测试

# test_inventory.py
import threading
import time

def test_over_sell_protection(test_product):
    """测试库存超卖防护"""
    product_id = test_product["id"]
    initial_stock = test_product["stock"]
    threads = []
    results = []
    
    def worker():
        try:
            resp = requests.post(
                f"{BASE_URL}/inventory/deduct",
                json={"product_id": product_id, "quantity": 1},
                timeout=5
            )
            results.append(resp.status_code == 200)
        except Exception as e:
            results.append(False)
    
    # 创建超过库存量的线程
    for _ in range(initial_stock + 10):
        t = threading.Thread(target=worker)
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    
    # 验证成功数量不超过库存
    assert sum(results) <= initial_stock
    
    # 验证库存未出现负数
    final_stock = requests.get(f"{BASE_URL}/inventory/{product_id}").json()["stock"]
    assert final_stock >= 0

3.2 分布式锁测试

# test_inventory.py
from multiprocessing import Process
import random

def test_distributed_lock(test_product):
    """模拟分布式环境下的锁竞争"""
    product_id = test_product["id"]
    processes = []
    results = []
    
    def process_task():
        service_port = random.randint(8000, 8010)
        # 模拟不同服务实例
        resp = requests.post(
            f"http://localhost:{service_port}/inventory/deduct",
            json={"product_id": product_id, "quantity": 1},
            headers={"X-Service-ID": f"service-{service_port}"}
        )
        results.append(resp.status_code)
    
    for _ in range(10):
        p = Process(target=process_task)
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    
    # 验证只有一个服务实例扣减成功
    assert results.count(200) == 1
    assert results.count(409) == 9  # 409 Conflict

4. 参数化并发测试

4.1 不同并发级别测试

# test_inventory.py
import math

@pytest.mark.parametrize("concurrency", [10, 50, 100, 200])
def test_concurrency_levels(test_product, concurrency):
    """不同并发级别测试"""
    product_id = test_product["id"]
    initial_stock = test_product["stock"]
    test_product["stock"] = concurrency * 2  # 确保足够库存
    
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [
            executor.submit(
                requests.post,
                f"{BASE_URL}/inventory/deduct",
                json={"product_id": product_id, "quantity": 1}
            )
            for _ in range(concurrency)
        ]
        
        results = [f.result().status_code for f in futures]
    
    # 验证全部成功
    assert all(code == 200 for code in results)
    
    # 验证库存正确扣减
    final_stock = requests.get(f"{BASE_URL}/inventory/{product_id}").json()["stock"]
    assert final_stock == initial_stock - concurrency

4.2 混合读写测试

# test_inventory.py
from random import choice

def test_read_write_mix(test_product):
    """混合读写操作测试"""
    product_id = test_product["id"]
    operations = []
    
    def random_operation():
        if choice([True, False]):  # 50%概率读/写
            resp = requests.get(f"{BASE_URL}/inventory/{product_id}")
            operations.append(("read", resp.status_code))
        else:
            resp = requests.post(
                f"{BASE_URL}/inventory/deduct",
                json={"product_id": product_id, "quantity": 1}
            )
            operations.append(("write", resp.status_code))
    
    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(random_operation) for _ in range(100)]
        for f in futures:
            f.result()
    
    # 验证写操作的一致性
    write_results = [op[1] for op in operations if op[0] == "write"]
    final_stock = requests.get(f"{BASE_URL}/inventory/{product_id}").json()["stock"]
    expected_stock = test_product["stock"] - write_results.count(200)
    assert final_stock == expected_stock

5. 测试结果分析与断言

5.1 性能指标断言

# test_inventory.py
import time

def test_performance_metrics(test_product):
    """测试性能指标"""
    product_id = test_product["id"]
    concurrency = 50
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [
            executor.submit(
                requests.post,
                f"{BASE_URL}/inventory/deduct",
                json={"product_id": product_id, "quantity": 1}
            )
            for _ in range(concurrency)
        ]
        results = [f.result() for f in futures]
    
    duration = time.time() - start_time
    tps = concurrency / duration
    
    # 断言性能指标
    assert duration < 2.0  # 总耗时不超过2秒
    assert tps > 30  # TPS大于30
    assert all(r.status_code == 200 for r in results)

5.2 数据库一致性验证

# test_inventory.py
import pymysql

def test_database_consistency(test_product):
    """测试数据库层面的数据一致性"""
    product_id = test_product["id"]
    initial_stock = test_product["stock"]
    concurrency = initial_stock  # 使用全部库存进行测试
    
    # 并发扣减
    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = [
            executor.submit(
                requests.post,
                f"{BASE_URL}/inventory/deduct",
                json={"product_id": product_id, "quantity": 1}
            )
            for _ in range(concurrency)
        ]
        [f.result() for f in futures]
    
    # 直接查询数据库验证
    conn = pymysql.connect(
        host="localhost",
        user="test",
        password="test",
        database="mall"
    )
    
    try:
        with conn.cursor() as cursor:
            # 检查商品库存
            cursor.execute(
                "SELECT stock FROM products WHERE id = %s",
                (product_id,)
            )
            db_stock = cursor.fetchone()[0]
            
            # 检查库存流水记录总数
            cursor.execute(
                "SELECT COUNT(*) FROM inventory_logs WHERE product_id = %s",
                (product_id,)
            )
            log_count = cursor.fetchone()[0]
            
            # 检查订单数量
            cursor.execute(
                "SELECT COUNT(*) FROM orders WHERE product_id = %s",
                (product_id,)
            )
            order_count = cursor.fetchone()[0]
            
            # 验证一致性
            assert db_stock == 0
            assert log_count == concurrency
            assert order_count == concurrency
    finally:
        conn.close()

6. 测试执行与报告

6.1 执行命令

# 普通执行
pytest test_inventory.py -v

# 并发执行(使用4个worker)
pytest test_inventory.py -n 4

# 生成HTML报告
pytest test_inventory.py --html=report.html

# 带性能监控的执行
pytest test_inventory.py --monitor

6.2 集成Allure报告

# conftest.py
import allure
import pytest

@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """为每个测试用例添加Allure报告"""
    outcome = yield
    rep = outcome.get_result()
    
    if rep.when == "call" and rep.failed:
        with allure.step("失败截图"):
            # 可以添加失败时的截图或其他信息
            allure.attach("失败信息", str(rep.longrepr))

7. 最佳实践建议

  1. 测试数据隔离:每个测试用例使用独立商品ID使用fixture自动清理测试数据
  2. 并发控制:逐步增加并发量进行测试监控系统资源使用情况
  3. 断言策略:验证业务规则(不超卖)验证数据一致性验证性能指标
  4. 环境管理:区分测试环境和生产环境使用Docker容器隔离测试环境
  5. 持续集成:将并发测试加入CI流水线设置性能基准阈值

通过这套基于Pytest的库存并发测试方案,可以全面验证商城系统在高并发场景下的库存管理功能,确保系统稳定性和数据一致性。

进阶高级测试工程师 文章被收录于专栏

《高级软件测试工程师》专栏旨在为测试领域的从业者提供深入的知识和实践指导,帮助大家从基础的测试技能迈向高级测试专家的行列。 在本专栏中,主要涵盖的内容: 1. 如何设计和实施高效的测试策略; 2. 掌握自动化测试、性能测试和安全测试的核心技术; 3. 深入理解测试驱动开发(TDD)和行为驱动开发(BDD)的实践方法; 4. 测试团队的管理和协作能力。 ——For.Heart

全部评论

相关推荐

各位前辈好,先说声抱歉,可能又是一篇“求骂醒”的帖子,但我真的需要一个方向。我的情况比大多数人都糟糕:双非软件工程,大四,马上毕业了,0实习经历,0工作经验。秋招根本没参加,原因很傻——我一头扎进了一个自己觉得“挺有意思”的项目里,天真的以为把项目做好工作自然会找上门。现在春招也快结束了,我才如梦初醒,发现简历投出去基本石沉大海。我没有什么能拿出手的背景,唯一能说的就是这个从后端到前端全栈独立开发的电影推荐平台。我知道在各位前辈眼里这大概率就是个小玩具,但我确实是下了功夫去琢磨的,它不是什么网上扒的代码,下面这些是我自己琢磨并落地的东西:项目概况:Spring&nbsp;Boot&nbsp;+&nbsp;MyBatis-Plus&nbsp;+&nbsp;Redis&nbsp;+&nbsp;JWT&nbsp;+&nbsp;MySQL&nbsp;+&nbsp;Vue3(前端是AI辅助生成的)我自己觉得花了心思的几个点:1.&nbsp;推荐算法落地:没有照搬别人的推荐逻辑。我是基于用户多维行为数据(评分、收藏、浏览时长)去计算标签权重,然后用“评分×log(热度+1)”的公式做加权排序;冷启动场景用热门数据兜底。推荐结果用Redis的ZSet缓存,用户行为一变化就主动删缓存触发重算。2.&nbsp;缓存体系设计:不是那种“面试八股文背完就扔”的表面理解。我实际遇到了缓存穿透和击穿的问题,然后自己用空值缓存+逻辑过期去解决。热门电影定时预热、批量查询用multiGet减少IO次数,还封装了MyCacheUtils通用模板,让整个项目其他模块也能复用这套缓存逻辑。3.&nbsp;并发与一致性:用Redis的SET&nbsp;NX&nbsp;EX实现了收藏/点赞的分布式锁,key精确到“用户+操作对象”级别,不是粗粒度的一锁全锁。异常回滚时Redis和MySQL数据一致性问题也思考并落地了。验证码的原子性校验用了Lua脚本来保证。4.&nbsp;性能是真实数据:我用JMeter做了2000并发的压测,引入Redis缓存体系后,推荐接口平均响应从6466ms降到155ms,吞吐量翻了一倍,缓存命中率干到98%以上。这些数据不是编的,是我自己反复调优跑出来的。说实话,做完这些的时候,看着压测报告我是挺兴奋的,觉得“这也算出活儿了吧”。但现实是,0实习好像成了我简历上的原罪,很多公司直接筛选条件就把我过滤了。所以我想跪求各位前辈指点我几个问题,每一条我都认真看、认真执行:1.&nbsp;关于简历:0实习的应届生,还有资格谈“项目亮点”吗?我这项目,是不是在专业面试官眼里就是一个“低配版培训项目”?如果这个项目还有救,该怎么在简历上呈现,才能让HR或者面试官至少愿意给我一个电话面试?如果没有,一个0实习的应届生到底该在简历上写什么?2.&nbsp;关于面试:如何用项目细节证明“我虽然没实习但真的能干活”?我挺怕面试官看到我没有实习经历就直接失去兴趣。真到了面试那一步,我该怎么引导对话,用上面这些技术细节去对抗“没实习=没工程经验”的刻板印象?比如缓存那块,怎么从“我解决了击穿”讲出一个有技术判断力和工程思维的完整故事?3.&nbsp;关于求职策略:错过了黄金窗口期,现在该冲什么样的公司?大厂我肯定不奢望了。现在这个时间点,我应该去投那些小公司和外包吗?要不要把薪资预期降到最低先入行再说?对于0实习的应届生,什么样的公司是真的有机会让我进去学技术、积累经验的?4.&nbsp;关于未来:如果现在直接找不到工作,我该怎么办?这段时间我想好了,如果实在是找不到研发岗,我要不要去干测试或者运维先入行?还是找家小公司被压榨一年攒个经验?还是干脆先找个其他工作边干边学等下一轮秋招?我什么建议都能接受。我知道自己起步晚了,代价得自己扛。现在唯一能做的就是面对现实,然后找到一条最有可能逆袭的路。希望前辈们能给我指个方向,即使简单几句“没救了”或者“还能救,去做XXX”我都非常感激。
jiestart:这简历肯定没面试的,你得包装个实习再加一个agent项目才有希望
点赞 评论 收藏
分享
评论
点赞
1
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务