首页 > 试题广场 >

设备故障预测程序

[编程题]设备故障预测程序
  • 热度指数:891 时间限制:C/C++ 1秒,其他语言2秒 空间限制:C/C++ 256M,其他语言512M
  • 算法知识视频讲解
在一套对象存储集群中,运维同学希望根据设备运行日志,提前判断设备是否有故障风险,从而把数据在故障前迁移到其他节点。每条日志包含以下字段:设备ID、写入次数、读取次数、平均写入延迟(ms)、平均读取延迟(ms)、使用年限(年)、设备状态(0 正常/1 故障)。

请你实现一个设备故障预测程序,基于训练数据学习一个逻辑回归模型,并对给定的待预测设备输出是否故障的判定结果。


数据清洗规则
- 缺失值填充:数值字段出现字符串 NaN 时,用该字段在训练集中“有效数值”的均值进行填充。有效数值的含义见“异常值处理”。

- 异常值处理:若出现以下越界值,则视为异常,用该字段在训练集“有效数值”的中位数替换。
   1.写入/读取次数:小于 0
   2.平均写入/读取延迟:小于 0 或 大于 1000
   3.使用年限:小于 0 或 大于 20

- 说明:计算均值/中位数时,只统计训练集中“有效数值”(即不含 NaN,且不越界)。若某字段在训练集没有任何有效数值,则该字段的均值与中位数都按 0 处理。
- 标签缺失:训练样本若无状态字段或无法解析为 0/1,丢弃该行,不参与训练,也不参与统计均值/中位数。

模型与训练
- 模型:二分类逻辑回归,带偏置项 w0
- 训练方法:批量梯度下降(Batch GD),每次迭代用全部训练样本,学习率 0.01,迭代 100 次,初始权重全 0。
- 概率:
  P(y=1) =\frac{1}{1+e^{-z}}   其中 z = w0

- 判定阈值:若 P(y=1) ≥ 0.5 则输出 1,否则输出 0。

输入描述:
第一行:N(2 ≤ N ≤ 100)
接下来 N 行:每行一个训练样本
device_id,writes,reads,avg_write_ms,avg_read_ms,years,status
第 N+1 行:M(1 ≤ M ≤ 10)
接下来 M 行:每行一个待预测样本(无状态)
  device_id,writes,reads,avg_write_ms,avg_read_ms,years


输出描述:
共 M 行,每行输出一个整数 0 或 1,对应各待预测设备是否判定为故障。
示例1

输入

12
n1,50,25,5,2,1,0
n2,55,27,5.5,2.5,1.2,0
n3,60,30,6,3,1.5,0
n4,65,32,6.5,3.2,1.8,0
n5,70,35,7,3.5,2,0
n6,75,37,7.5,3.8,2.2,0
n7,80,40,8,4,2.5,0
n8,85,42,8.5,4.2,2.7,0
n9,90,45,9,4.5,3,0
n10,95,47,9.5,4.8,3.2,0
p1,400,200,20,10,6,1
p2,500,250,22,11,8,1
2
q1,88,44,8.8,4.3,2.9
q2,480,240,21.5,10.8,7.5

输出

0
1

说明

训练集中负类远多于正类,模型学到明显负偏置;但正类样本特征显著更大,使对应权重为正。
q1落在负类量级附近,P<0.5 → 0;q2与正类量级接近,P≥0.5 → 1。


备注:
本题由牛友@Charles 整理上传
不让用numpy啊
import math
# read data
N = int(input())
X = []
y = []
for _ in range(N):
    row_ = list(input().split(','))
    if row_[-1] == 'NaN':
        continue
    row = []
    for i in row_[1:]:
        if i == 'NaN':
            row.append(1e5)
        else:
            row.append(i)
    X.append(list(map(float, row[:-1])))
    y.append(int(row[-1]))

# preprocess
def cal_med(vec):
    vec = sorted(vec)
    if len(vec) % 2 == 0:
        med = (vec[len(vec)//2] + vec[len(vec)//2-1])/2
    else:
        med = vec[len(vec)//2]
    return med

med = [0.0]*5
avg = [0.0]*5
for j in range(5):
    col = [i[j] for i in X]
    if j <= 1:
        valid = [i for i in col if (i != 1e5 and i >= 0)]
    elif j <= 3:
        valid = [i for i in col if (i != 1e5 and i >= 0 and i <= 1000)]
    else:
        valid = [i for i in col if (i != 1e5 and i >= 0 and i <= 20)]
    
    if len(valid) > 0:
        med[j] = cal_med(valid)
        avg[j] = sum(valid)/len(valid)
    else:
        med[j] = 0.0
        avg[j] = 0.0
        
for j in range(5):
    for i in range(len(X)):
        if j <= 1:
            if X[i][j] == 1e5:
                X[i][j] = avg[j]
            elif X[i][j] < 0:
                X[i][j] = med[j]
        elif j <= 3:
            if X[i][j] == 1e5:
                X[i][j] = avg[j]
            elif X[i][j] < 0&nbs***bsp;X[i][j] > 1000:
                X[i][j] = med[j]
        else:
            if X[i][j] == 1e5:
                X[i][j] = avg[j]
            elif X[i][j] < 0&nbs***bsp;X[i][j] > 20:
                X[i][j] = med[j]
    
# train
m = len(X)
d = len(X[0])
Xb = [[1] + i for i in X]
w = [0.0]*(d+1)

lr = 0.01
epoch_max = 100


for _ in range(epoch_max):
    z = []
    pred = []
    diff = []
    for i in range(m):
        xi = Xb[i]
        temp = sum([xi[j]*w[j] for j in range(d+1)])
        z.append(temp)
        temp_p = 1 / (1 + math.exp(-temp))
        pred.append(temp_p)
        diff.append(temp_p - y[i])
    
    grad = []
    for i in range(d+1):
        xi = [Xb[j][i] for j in range(m)]
        temp = sum([xi[j]*diff[j] for j in range(m)])
        grad.append(temp/m)
    w = [w[j]- lr*grad[j] for j in range(len(grad))]


# prediction
M = int(input())
for _ in range(M):
    row = input().split(',')
    test = [1] + list(map(float, row[1:]))
    zi = sum([test[j]*w[j] for j in range(len(w))])
    pi = 1.0 / (1.0 + math.exp(-zi))
    pred = 1 if pi >= 0.5 else 0
    print(pred)



发表于 2025-09-18 23:50:30 回复(0)