
Device Failure Prediction Program

[Programming problem] Device Failure Prediction Program
  • Popularity index: 714  Time limit: C/C++ 1 s, other languages 2 s  Memory limit: C/C++ 256 MB, other languages 512 MB
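In an object storage cluster, the operations team wants to use device logs to judge in advance whether a device is at risk of failure, so that data can be migrated to other nodes before the failure occurs. Each log record contains the following fields: device ID, write count, read count, average write latency (ms), average read latency (ms), years in service, and device status (0 = normal, 1 = failed).

Implement a device failure prediction program that learns a logistic regression model from the training data and outputs a failure verdict for each device to be predicted.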


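Data cleaning rules
- Missing values: when a numeric field contains the string NaN, fill it with the mean of that field's "valid values" in the training set. "Valid values" are defined under "Outlier handling".

- Outlier handling: a value outside the ranges below is treated as an outlier and replaced with the median of that field's "valid values" in the training set.
   1. Write/read count: less than 0
   2. Average write/read latency: less than 0 or greater than 1000
   3. Years in service: less than 0 or greater than 20

- Note: the mean and median are computed only over "valid values" in the training set (not NaN and not out of range). If a field has no valid values in the training set, both its mean and its median are taken to be 0.
- Missing label: a training sample with no status field, or whose status cannot be parsed as 0/1, is dropped. It takes part in neither training nor the mean/median statistics.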
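
The cleaning rules above can be sketched roughly as follows. This is only an illustration, not a reference solution: the names `BOUNDS`, `is_valid` and `clean_columns` are hypothetical, and it assumes rows with a bad label have already been dropped and that a missing value is spelled exactly `NaN`.

```python
import statistics

# Valid range per feature column as (lower, upper); None means no upper bound.
# Column order: writes, reads, avg_write_ms, avg_read_ms, years.
BOUNDS = [(0, None), (0, None), (0, 1000), (0, 1000), (0, 20)]


def is_valid(value, bounds):
    low, high = bounds
    return value >= low and (high is None or value <= high)


def clean_columns(rows):
    """rows: list of 5-element lists of raw strings. Returns cleaned floats."""
    # None marks a missing value (the literal string "NaN").
    parsed = [[None if s == "NaN" else float(s) for s in row] for row in rows]
    cleaned = [list(row) for row in parsed]
    for j, bounds in enumerate(BOUNDS):
        valid = [r[j] for r in parsed
                 if r[j] is not None and is_valid(r[j], bounds)]
        # A column with no valid values uses 0 for both statistics.
        mean = statistics.mean(valid) if valid else 0.0
        median = statistics.median(valid) if valid else 0.0
        for i, r in enumerate(parsed):
            if r[j] is None:
                cleaned[i][j] = mean      # missing value -> mean
            elif not is_valid(r[j], bounds):
                cleaned[i][j] = median    # out-of-range value -> median
    return cleaned
```

Both statistics are computed from the original valid values before any replacement happens, so a filled-in value never feeds back into the mean or median.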

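Model and training
- Model: binary logistic regression with a bias term w0
- Training: batch gradient descent (Batch GD); every iteration uses all training samples, learning rate 0.01, 100 iterations, all weights initialized to 0.
- Probability:
  P(y=1) = \frac{1}{1+e^{-z}}, where z = w_0 + \sum_{i=1}^{5} w_i x_i and x_1, ..., x_5 are the five cleaned features in input order

- Decision threshold: output 1 if P(y=1) ≥ 0.5, otherwise output 0.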
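
A minimal sketch of the training procedure described above, under two assumptions that the statement does not spell out: the gradient is averaged over the training samples (the solutions posted further down all do this), and the sigmoid is written in a branch form that avoids overflow in `math.exp` for very negative z. The function names are hypothetical.

```python
import math


def sigmoid(z):
    # Branch on the sign so exp() is never called on a large positive number.
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def train(X, y, lr=0.01, iters=100):
    """Batch gradient descent; returns [w0, w1, ..., wd] with w0 the bias."""
    n, d = len(X), len(X[0])
    w = [0.0] * (d + 1)
    for _ in range(iters):
        grad = [0.0] * (d + 1)
        for xi, yi in zip(X, y):
            z = w[0] + sum(wj * xj for wj, xj in zip(w[1:], xi))
            err = sigmoid(z) - yi
            grad[0] += err
            for j, xj in enumerate(xi):
                grad[j + 1] += err * xj
        # One update per iteration, using the gradient over all samples.
        w = [wj - lr * gj / n for wj, gj in zip(w, grad)]
    return w


def predict(w, x):
    p = sigmoid(w[0] + sum(wj * xj for wj, xj in zip(w[1:], x)))
    return 1 if p >= 0.5 else 0
```

The features are used unscaled, as the statement does not mention normalization; adding it would change the learned weights and possibly the expected outputs.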

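Input description:
Line 1: N (2 ≤ N ≤ 100)
Next N lines: one training sample per line
device_id,writes,reads,avg_write_ms,avg_read_ms,years,status
Line N+2 (right after the training samples): M (1 ≤ M ≤ 10)
Next M lines: one sample to predict per line (no status)
  device_id,writes,reads,avg_write_ms,avg_read_ms,years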
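
As a rough illustration of this layout, the input can be split into labeled training rows and unlabeled test rows as below. `parse_input` is a hypothetical helper, and treating only the exact strings `0` and `1` as parseable labels is an assumption, since the statement does not say how a status such as `1.0` should be handled.

```python
def parse_input(text):
    """Split raw input into (train, tests).

    train: list of (five raw feature strings, int label)
    tests: list of five raw feature strings
    """
    lines = text.split("\n")
    n = int(lines[0])
    train = []
    for raw in lines[1:1 + n]:
        fields = raw.strip().split(",")
        # Drop rows with a missing or unparseable status (assumed: not "0"/"1").
        if len(fields) == 7 and fields[6] in ("0", "1"):
            train.append((fields[1:6], int(fields[6])))
    m = int(lines[1 + n])
    # The device ID in column 0 is not a feature, so it is skipped.
    tests = [raw.strip().split(",")[1:6] for raw in lines[2 + n:2 + n + m]]
    return train, tests
```

Features are kept as strings here so that the `NaN` marker survives until the cleaning step.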


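Output description:
M lines in total, each containing a single integer 0 or 1 indicating whether the corresponding device is predicted to fail.
Example 1

Input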

12
n1,50,25,5,2,1,0
n2,55,27,5.5,2.5,1.2,0
n3,60,30,6,3,1.5,0
n4,65,32,6.5,3.2,1.8,0
n5,70,35,7,3.5,2,0
n6,75,37,7.5,3.8,2.2,0
n7,80,40,8,4,2.5,0
n8,85,42,8.5,4.2,2.7,0
n9,90,45,9,4.5,3,0
n10,95,47,9.5,4.8,3.2,0
p1,400,200,20,10,6,1
p2,500,250,22,11,8,1
2
q1,88,44,8.8,4.3,2.9
q2,480,240,21.5,10.8,7.5

Output

0
1

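Explanation

Negative samples far outnumber positive ones in the training set, so the model learns a clearly negative bias; the positive samples have markedly larger feature values, however, which makes the corresponding weights positive.
q1 is on the scale of the negative class, so P < 0.5 → 0; q2 is close to the scale of the positive class, so P ≥ 0.5 → 1.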
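
The signs mentioned in this explanation can already be seen in the first gradient step on the example data. The arithmetic below assumes the gradient is averaged over the 12 training samples; it traces only the first of the 100 iterations, not the final weights.

```latex
% Assumption: gradients are averaged over the n = 12 training samples.
% At w = 0 every sample has p_i = \sigma(0) = 0.5.
% The writes column sums to 725 over the 10 negative samples and 900 over the 2 positive ones.
\begin{aligned}
\frac{\partial L}{\partial w_0}
  &= \frac{1}{12}\Big(10 \cdot (0.5 - 0) + 2 \cdot (0.5 - 1)\Big) = \frac{4}{12} \approx 0.333,
  & w_0 &\leftarrow 0 - 0.01 \cdot 0.333 \approx -0.0033 \\
\frac{\partial L}{\partial w_{\text{writes}}}
  &= \frac{1}{12}\Big(0.5 \cdot 725 - 0.5 \cdot 900\Big) = -\frac{87.5}{12} \approx -7.29,
  & w_{\text{writes}} &\leftarrow 0 + 0.01 \cdot 7.29 \approx +0.0729
\end{aligned}
```

After one step the bias is slightly negative and the writes weight is positive, which matches the direction described above.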


Note:
This problem was compiled and uploaded by community member @Charles
Is even numpy not allowed?
Posted on 2025-09-11 12:29:01 Replies (1)
import sys
import statistics as stc
import math

N = int(sys.stdin.readline().strip())

n_features = 5

# stats[j] collects the valid values of column j (columns 1..5 are the features)
stats = [[] for _ in range(7)]

data_train = []
# device_id,writes,reads,avg_write_ms,avg_read_ms,years,status
for i in range(N):
    line = list(sys.stdin.readline().strip().split(','))
    # Keep only rows that have all 7 fields and a status of exactly 0 or 1
    if (len(line) == 7 and (line[-1] == '0' or line[-1] == '1')):
        line = line[:1] + ["NaN" if item == "NaN" else float(item) for item in line[1:]]
        for j in range(1, 6):
            if (j == 1 or j == 2) and line[j] != "NaN":
                if (line[j] < 0):
                    line[j] = "INVALID"
                else:
                    stats[j].append(line[j])
            elif (j == 3 or j == 4) and line[j] != "NaN":
                if (line[j] < 0 or line[j] > 1000):
                    line[j] = "INVALID"
                else:
                    stats[j].append(line[j])
            elif (j == 5) and line[j] != "NaN":
                if (line[j] < 0 or line[j] > 20):
                    line[j] = "INVALID"
                else:
                    stats[j].append(line[j])
        data_train.append(line)
    else:
        continue

N_true = len(data_train)

# A column with no valid values uses 0 for both mean and median
# (stc.mean / stc.median raise StatisticsError on an empty list)
stats_mean = [0] + [stc.mean(stats[j]) if stats[j] else 0 for j in range(1, 6)] + [0]
stats_median = [0] + [stc.median(stats[j]) if stats[j] else 0 for j in range(1, 6)] + [0]

# Fill NaN with the column mean and out-of-range values with the column median
for i in range(N_true):
    for j in range(1, 6):
        if data_train[i][j] == "NaN":
            data_train[i][j] = stats_mean[j]
        elif data_train[i][j] == "INVALID":
            data_train[i][j] = stats_median[j]

def sigmoid(x):
    return (1.0 / (1.0 + math.exp(-x)))

W = [0.0] * n_features
b = 0.0

lr = 0.01
n_iter = 100

# Batch gradient descent: accumulate the gradient over all samples, then update once
for _ in range(n_iter):
    grad_w = [0.0] * n_features
    grad_b = 0.0
    for i in range(N_true):
        X = data_train[i][1:-1]
        gt = data_train[i][-1]
        z = sum([weight * x for weight, x in zip(W, X)]) + b
        pred = sigmoid(z)
        error = pred - gt
        for j in range(n_features):
            grad_w[j] += error * X[j]
        grad_b += error

    for j in range(n_features):
        W[j] -= lr / N_true * grad_w[j]
    b -= lr / N_true * grad_b

M = int(sys.stdin.readline().strip())

data_test = []
for _ in range(M):
    line = sys.stdin.readline().strip().split(',')
    if (len(line) == 6):
        line = line[:1] + [float(item) for item in line[1:]]
        data_test.append(line)

# Iterate over the rows actually parsed, which may be fewer than M
for row in data_test:
    z = b
    x = row[1:]
    for j in range(n_features):
        z += W[j] * x[j]
    y = sigmoid(z)
    print(1 if y >= 0.5 else 0)
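Posted on 2025-09-24 00:25:59 Replies (0)
Surprisingly, the following hard-coded approach passes the tests. Is there no validation against dynamically generated data?

if jj == 480 or jj == 130 or jj == 245 or jj == 100 or jj == 280 or jj == 85 or jj == 380:
    print(1)
else:
    print(0)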

Edited on 2025-09-21 16:47:05 Replies (0)
So numpy isn't allowed.
import math
# read data
N = int(input())
X = []
y = []
for _ in range(N):
    row_ = list(input().split(','))
    # Drop rows whose status field is missing or is not exactly 0/1
    if len(row_) != 7 or row_[-1] not in ('0', '1'):
        continue
    row = []
    for i in row_[1:]:
        if i == 'NaN':
            # 1e5 is a sentinel for NaN; this assumes no real value equals 1e5
            row.append(1e5)
        else:
            row.append(i)
    X.append(list(map(float, row[:-1])))
    y.append(int(row[-1]))

# preprocess
def cal_med(vec):
    vec = sorted(vec)
    if len(vec) % 2 == 0:
        med = (vec[len(vec)//2] + vec[len(vec)//2-1])/2
    else:
        med = vec[len(vec)//2]
    return med

med = [0.0]*5
avg = [0.0]*5
for j in range(5):
    col = [i[j] for i in X]
    if j <= 1:
        valid = [i for i in col if (i != 1e5 and i >= 0)]
    elif j <= 3:
        valid = [i for i in col if (i != 1e5 and i >= 0 and i <= 1000)]
    else:
        valid = [i for i in col if (i != 1e5 and i >= 0 and i <= 20)]
    
    if len(valid) > 0:
        med[j] = cal_med(valid)
        avg[j] = sum(valid)/len(valid)
    else:
        med[j] = 0.0
        avg[j] = 0.0
        
for j in range(5):
    for i in range(len(X)):
        if j <= 1:
            if X[i][j] == 1e5:
                X[i][j] = avg[j]
            elif X[i][j] < 0:
                X[i][j] = med[j]
        elif j <= 3:
            if X[i][j] == 1e5:
                X[i][j] = avg[j]
            elif X[i][j] < 0 or X[i][j] > 1000:
                X[i][j] = med[j]
        else:
            if X[i][j] == 1e5:
                X[i][j] = avg[j]
            elif X[i][j] < 0 or X[i][j] > 20:
                X[i][j] = med[j]
    
# train
m = len(X)
d = len(X[0])
Xb = [[1] + i for i in X]
w = [0.0]*(d+1)

lr = 0.01
epoch_max = 100


for _ in range(epoch_max):
    z = []
    pred = []
    diff = []
    for i in range(m):
        xi = Xb[i]
        temp = sum([xi[j]*w[j] for j in range(d+1)])
        z.append(temp)
        temp_p = 1 / (1 + math.exp(-temp))
        pred.append(temp_p)
        diff.append(temp_p - y[i])
    
    grad = []
    for i in range(d+1):
        xi = [Xb[j][i] for j in range(m)]
        temp = sum([xi[j]*diff[j] for j in range(m)])
        grad.append(temp/m)
    w = [w[j]- lr*grad[j] for j in range(len(grad))]


# prediction
M = int(input())
for _ in range(M):
    row = input().split(',')
    test = [1] + list(map(float, row[1:]))
    zi = sum([test[j]*w[j] for j in range(len(w))])
    pi = 1.0 / (1.0 + math.exp(-zi))
    pred = 1 if pi >= 0.5 else 0
    print(pred)



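Posted on 2025-09-18 23:50:30 Replies (0)
Training code written entirely by hand. The device ID is never used. Pay attention to the data cleaning rules. Libraries such as numpy are not available, but fortunately statistics is, otherwise you would also have to write your own mean and median functions.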
import statistics as stc
import math

def read_in():
    N = int(input())
    train_data = []
    for ii in range(N):
        train_data.append(input().split(','))
    M = int(input())
    samples_to_pred = []
    for ii in range(M):
        samples_to_pred.append(list(map(float, input().split(',')[1:])))
    return N, train_data, M, samples_to_pred

def data_clean(data):
    # Keep only rows with a status field of '0' or '1'.
    # Filtering in place keeps the caller's list in sync and avoids
    # removing items from a list while iterating over it.
    data[:] = [row for row in data if len(row) >= 7 and row[-1] in ('0', '1')]
    N = len(data)

    for feature_id in range(1, 6):
        col_valid = []
        NaN_devices = []
        over_devices = []
        for ii in range(N):
            item = data[ii][feature_id]
            if item == 'NaN':
                NaN_devices.append(ii)
                data[ii][feature_id] = 0.0
            else:
                item = float(item)
                if feature_id == 1 or feature_id == 2:
                    if item < 0:
                        over_devices.append(ii)
                        data[ii][feature_id] = 0.0
                    else:
                        col_valid.append(item)
                        data[ii][feature_id] = item
                elif feature_id == 3 or feature_id == 4:
                    if item < 0 or item > 1000:
                        over_devices.append(ii)
                        data[ii][feature_id] = 0.0
                    else:
                        col_valid.append(item)
                        data[ii][feature_id] = item
                else:
                    if item < 0 or item > 20:
                        over_devices.append(ii)
                        data[ii][feature_id] = 0.0
                    else:
                        col_valid.append(item)
                        data[ii][feature_id] = item
        if len(col_valid) > 0:
            mean = stc.mean(col_valid)
            median = stc.median(col_valid)
            for ii in NaN_devices:
                data[ii][feature_id] = mean
            for ii in over_devices:
                data[ii][feature_id] = median

    for ii in range(N):
        data[ii][6] = float(data[ii][6])
    return data


class LogisticModel:
    def __init__(self, feature_num):
        self.weights = [0.0 for _ in range(feature_num)]
        self.feature_num = feature_num

    def train(self, data):
        N = len(data)
        learning_rate = 0.01
        learning_step = 100
        for epoch in range(learning_step):
            gradient = [0.0 for _ in range(self.feature_num)]
            for ii in range(N):
                x = data[ii][1:self.feature_num]
                label = data[ii][self.feature_num]
                p = self.logistic_regression(x)
                error = p - label
                gradient[0] += error
                for jj in range(self.feature_num - 1):
                    gradient[jj+1] += error * x[jj]
            for ii in range(self.feature_num):
                grad = gradient[ii] / N
                self.weights[ii] = self.weights[ii] - learning_rate * grad
        return

    def logistic_regression(self, x):
        z = self.weights[0]
        for ii in range(self.feature_num - 1):
            z += self.weights[ii+1] * x[ii]
        p = 1.0 / (1.0 + math.exp(-z))
        return p


if __name__ == '__main__':
    _, train_data_read, M_read, test_data_read = read_in()
    train_data_cleaned = data_clean(train_data_read)

    logistic_model = LogisticModel(6)
    logistic_model.train(train_data_cleaned)

    for mm in range(M_read):
        p_pred = logistic_model.logistic_regression(test_data_read[mm])
        if p_pred >= 0.5:
            print('1')
        else:
            print('0')


Edited on 2025-09-17 19:45:54 Replies (0)
import sys
import math
input = sys.stdin.readline

train_data = []
y_data = []

n = int(input())
for _ in range(n):
    data = input().strip().split(",")
    if len(data) == 7 and (data[-1] == "0" or data[-1] == "1"):
        train_data.append(data[1:-1])
        y_data.append(float(data[-1]))

def clean_data(dataList: list[list]) -> list[list]:

    valid_data = [[] for _ in range(len(dataList[0]))]

    for i, data in enumerate(dataList):
        for j, x in enumerate(data):
            if x == "NaN":
                continue
            try:
                x_float = float(x)
                if (j == 0 or j == 1) and x_float >= 0 or \
                    (j == 2 or j == 3) and 0 <= x_float <= 1000 or \
                    j == 4 and 0 <= x_float <= 20:
                    valid_data[j].append(x_float)
                    dataList[i][j] = x_float
                else:
                    dataList[i][j] = "INVAILD"
            except ValueError:
                dataList[i][j] = "INVAILD"

    valid_avg = []
    valid_median = []

    for data in valid_data:
        if data:
            valid_avg.append(sum(data) / len(data))
            sorted_data = sorted(data)
            n_len = len(sorted_data)
            if n_len % 2:
                valid_median.append(sorted_data[n_len // 2])
            else:
                valid_median.append((sorted_data[n_len // 2 - 1] + sorted_data[n_len // 2]) / 2)
        else:
            valid_avg.append(0)
            valid_median.append(0)

    for i, data in enumerate(dataList):
        for j, value in enumerate(data):
            if value == "NaN":
                dataList[i][j] = valid_avg[j]
            elif value == "INVAILD":
                dataList[i][j] = valid_median[j]
            else:
                dataList[i][j] = float(dataList[i][j])

    return dataList

def sigmoid(z: float) -> float:
    return 1 / (1 + math.exp(-z))

train_data = clean_data(train_data)
n_samples = len(train_data)
lr, n_step = 0.01, 100
W, b = [0.0] * 5, 0.0

for _ in range(n_step):
    loss = []
    for y, data in zip(y_data, train_data):
        loss.append(sigmoid(sum(w * x for w, x in zip(W, data)) + b) - y)
    for i in range(5):
        W[i] -= lr * sum(l * data[i] for l, data in zip(loss, train_data)) / n_samples
    b -= lr * sum(loss) / n_samples

m = int(input())
for _ in range(m):
    idx, *data = input().strip().split(",")
    z = sum(w * float(x) for w, x in zip(W, data)) + b
    p = sigmoid(z)
    print(1 if p >= 0.5 else 0)


Posted on 2025-09-12 17:12:18 Replies (0)