Solution | #设备故障预测程序# | Python
设备故障预测程序 (Device Failure Prediction Program)
http://www.nowcoder.com/questionTerminal/ef4bd23c0f7c4f2ca04dfdf3d4e66337
"""
implement logistic regression from scratch
no data normalization as its not required in the question and will not pass some of the test cases if added
"""
import sys
import math
import statistics
# num of rows for training data
N = int(sys.stdin.readline())
# print('process training data')
# IDs_train = []
writes_train = []
reads_train = []
avg_write_ms_train = []
avg_read_ms_train = []
years_train = []
status_train = []
train_cnt = 0
for i in range(N):
    # strip() removes the trailing '\n' of each row
    device_id, w, r, w_ms, r_ms, y, s = sys.stdin.readline().strip().split(',')
    # rows without a label cannot be used for training
    if s == 'NaN':
        continue
    train_cnt += 1
    # IDs_train.append(device_id)
    writes_train.append(int(w) if w != 'NaN' else w)
    reads_train.append(int(r) if r != 'NaN' else r)
    avg_write_ms_train.append(float(w_ms) if w_ms != 'NaN' else w_ms)
    avg_read_ms_train.append(float(r_ms) if r_ms != 'NaN' else r_ms)
    years_train.append(float(y) if y != 'NaN' else y)
    status_train.append(int(s))
# print(f'\t{train_cnt}/{N} valid rows in training data')
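# Cleaning strategy: values within [lb, ub] pass through; 'NaN' is imputed with
# the mean of the in-range values; out-of-range values are replaced with their
# median. The medians/means computed on the training split are passed back in
# when cleaning the test split.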
def clean_data(tag, data, lb, ub, valid_med=None, valid_avg=None):
    # print(f'process {tag} data')
    if valid_med is None or valid_avg is None:
        valid = []
        for d in data:
            if d == 'NaN' or d < lb or d > ub:
                continue
            valid.append(d)
        # print(f'\t{len(valid)}/{len(data)} valid rows of data')
        if len(valid) > 0:
            valid_med = statistics.median(valid)
            valid_avg = statistics.mean(valid)
        else:
            valid_med = valid_avg = 0
    data_p = []
    for d in data:
        if d == 'NaN':
            data_p.append(valid_avg)
        elif d < lb or d > ub:
            data_p.append(valid_med)
        else:
            data_p.append(d)
    return data_p, valid_med, valid_avg
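# A tiny worked example of the behaviour above:
#   clean_data('years', [1.0, 'NaN', 99.0], 0, 20) -> ([1.0, 1.0, 1.0], 1.0, 1.0)
# (99.0 is out of range; 1.0 is the only valid value, so it is both the mean
# that fills the 'NaN' and the median that replaces 99.0)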
writes_train, writes_med, writes_avg = clean_data('writes', writes_train, 0, float('infinity'))
reads_train, reads_med, reads_avg = clean_data('reads', reads_train, 0, float('infinity'))
avg_write_ms_train, avg_write_ms_med, avg_write_ms_avg = clean_data('avg_write_ms', avg_write_ms_train, 0, 1000)
avg_read_ms_train, avg_read_ms_med, avg_read_ms_avg = clean_data('avg_read_ms', avg_read_ms_train, 0, 1000)
years_train, years_med, years_avg = clean_data('years', years_train, 0, 20)
# num of rows for testing data
M = int(sys.stdin.readline())
# print('process testing data')
# IDs_test = []
writes_test = []
reads_test = []
avg_write_ms_test = []
avg_read_ms_test = []
years_test = []
test_cnt = 0
for i in range(M):
    device_id, w, r, w_ms, r_ms, y = sys.stdin.readline().strip().split(',')
    test_cnt += 1
    # IDs_test.append(device_id)
    writes_test.append(int(w) if w != 'NaN' else w)
    reads_test.append(int(r) if r != 'NaN' else r)
    avg_write_ms_test.append(float(w_ms) if w_ms != 'NaN' else w_ms)
    avg_read_ms_test.append(float(r_ms) if r_ms != 'NaN' else r_ms)
    years_test.append(float(y) if y != 'NaN' else y)
# print(f'\t{test_cnt}/{M} valid rows in testing data')
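# Impute the test split with the medians/means learned on the training split,
# so no statistics are ever computed from test data.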
writes_test, _, _ = clean_data('writes', writes_test, 0, float('infinity'), writes_med, writes_avg)
reads_test, _, _ = clean_data('reads', reads_test, 0, float('infinity'), reads_med, reads_avg)
avg_write_ms_test, _, _ = clean_data('avg_write_ms', avg_write_ms_test, 0, 1000, avg_write_ms_med, avg_write_ms_avg)
avg_read_ms_test, _, _ = clean_data('avg_read_ms', avg_read_ms_test, 0, 1000, avg_read_ms_med, avg_read_ms_avg)
years_test, _, _ = clean_data('years', years_test, 0, 20, years_med, years_avg)

class LogisticRegression(object):
    def __init__(self, learning_rate=0.01, n_iteration=100):
        self.learning_rate = learning_rate
        self.n_iteration = n_iteration
        self.weights = None
        self.bias = None
        self.loss_history = []

    def sigmoid(self, z):
        # branch on the sign of v so math.exp never receives a large positive
        # argument (raw, unnormalized features can make |z| big enough to
        # overflow the naive 1/(1+exp(-v)) form)
        out = []
        for v in z:
            if v >= 0:
                out.append(1. / (1. + math.exp(-v)))
            else:
                e = math.exp(v)
                out.append(e / (1. + e))
        return out

    def initialize_params(self, n_feature):
        self.weights = [0. for _ in range(n_feature)]
        self.bias = 0.
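
    # Binary cross-entropy over N samples:
    #   L = -(1/N) * sum_i( y_i*log(p_i) + (1 - y_i)*log(1 - p_i) )
    # eps keeps both log() calls finite when a probability saturates at 0 or 1.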
    def compute_loss(self, y_true, y_pred):
        N = len(y_true)
        eps = 1e-6
        loss = 0.
        for i in range(N):
            loss += -1.0 * (y_true[i] * math.log(y_pred[i] + eps)
                            + (1 - y_true[i]) * math.log(1 - y_pred[i] + eps))
        loss = loss / N
        return loss
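
    # With a sigmoid output and cross-entropy loss the chain rule collapses to
    # dL/dz_i = p_i - y_i, hence dL/db = mean(p - y) and
    # dL/dw_k = mean((p - y) * x_k).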
    def compute_gradient(self, X, y_true, y_pred):
        N = len(X)
        n_feature = len(X[0])
        dL_dz = [y_p - y_t for y_p, y_t in zip(y_pred, y_true)]
        dL_db = statistics.mean(dL_dz)
        dL_dw = [0. for _ in range(n_feature)]
        for i in range(N):
            for k in range(n_feature):
                dL_dw[k] += dL_dz[i] * X[i][k]
        dL_dw = [v / N for v in dL_dw]
        return dL_dw, dL_db

    def predict_proba(self, X):
        out = []
        N = len(X)
        n_feature = len(X[0])
        for i in range(N):
            z = self.bias
            for k in range(n_feature):
                z += X[i][k] * self.weights[k]
            out.append(z)
        return self.sigmoid(out)

    def fit(self, X, y):
        n_feature = len(X[0])
        self.initialize_params(n_feature)
        for it in range(1, self.n_iteration + 1):
            y_proba = self.predict_proba(X)
            loss = self.compute_loss(y, y_proba)
            self.loss_history.append(loss)
            dw, db = self.compute_gradient(X, y, y_proba)
            # gradient-descent update
            self.weights = [w - self.learning_rate * g for w, g in zip(self.weights, dw)]
            self.bias = self.bias - self.learning_rate * db
            """
            if it % 5 == 0:
                print(f'{it}/{self.n_iteration} iteration, loss: {loss:.3f}')
            """

    def predict(self, X, threshold=0.5):
        y_proba = self.predict_proba(X)
        y_pred = [1 if p > threshold else 0 for p in y_proba]
        return y_pred
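
# A minimal local sanity check (illustrative only, kept commented out so the
# submission still reads just stdin; the toy_* names are made up): the class
# above should fit a trivially separable 1-D set.
# toy_X = [[0.], [1.], [2.], [3.]]
# toy_y = [0, 0, 1, 1]
# toy_model = LogisticRegression(learning_rate=0.5, n_iteration=200)
# toy_model.fit(toy_X, toy_y)
# assert toy_model.predict(toy_X) == toy_y
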
model = LogisticRegression()
# prepare training data X,y
X = [[w, r, aw, ar, y] for w, r, aw, ar, y in zip(
        writes_train,
        reads_train,
        avg_write_ms_train,
        avg_read_ms_train,
        years_train)]
y = status_train
# training
model.fit(X, y)
# test
X_test = [[w, r, aw, ar, y] for w, r, aw, ar, y in zip(
        writes_test,
        reads_test,
        avg_write_ms_test,
        avg_read_ms_test,
        years_test)]
pred = model.predict(X_test)
print(f'predictions:\n{pred}')