Solution | #Device Failure Prediction Program# | Python

Device Failure Prediction Program

http://www.nowcoder.com/questionTerminal/ef4bd23c0f7c4f2ca04dfdf3d4e66337

"""
implement logistic regression from scratch
no data normalization as its not required in the question and will not pass some of the test cases if added
"""

import sys
import math
import statistics

# num of rows for training data
N = int(sys.stdin.readline())

# print('process training data')
# IDs_train = []
writes_train = []
reads_train = []
avg_write_ms_train = []
avg_read_ms_train = []
years_train = []
status_train = []
train_cnt = 0
for i in range(N):
    # strip() removes the trailing '\n' from each row
    device_id, w, r, w_ms, r_ms, y, s = sys.stdin.readline().strip().split(',')
    # rows with a missing label cannot be used for training
    if s == 'NaN': continue
    
    train_cnt += 1
    # IDs_train.append(device_id)
    # keep the literal string 'NaN' as a sentinel; clean_data() imputes it later
    writes_train.append(int(w) if w != 'NaN' else w)
    reads_train.append(int(r) if r != 'NaN' else r)
    avg_write_ms_train.append(float(w_ms) if w_ms != 'NaN' else w_ms)
    avg_read_ms_train.append(float(r_ms) if r_ms != 'NaN' else r_ms)
    years_train.append(float(y) if y != 'NaN' else y)
    status_train.append(int(s))  # s is guaranteed not to be 'NaN' here

# print(f'\t{train_cnt}/{N} valid rows in training data')



def clean_data(tag, data, lb, ub, valid_med=None, valid_avg=None):
    """Impute a single feature column:
    missing values ('NaN') are replaced by the mean of the valid values,
    out-of-range values (outside [lb, ub]) by their median.
    For test data, pass in the median/mean computed on the training data.
    """
    # print(f'process {tag} data')
    if valid_med is None or valid_avg is None:
        valid = [d for d in data if d != 'NaN' and lb <= d <= ub]
        # print(f'\t{len(valid)}/{len(data)} valid rows of data')
        if valid:
            valid_med = statistics.median(valid)
            valid_avg = statistics.mean(valid)
        else:
            valid_med = valid_avg = 0

    data_p = []
    for d in data:
        if d == 'NaN':
            data_p.append(valid_avg)
        elif d < lb or d > ub:
            data_p.append(valid_med)
        else:
            data_p.append(d)

    return data_p, valid_med, valid_avg
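# A small worked example of the imputation rules (hypothetical values):
#   clean_data('years', [1.0, 'NaN', 25.0, 3.0], 0, 20)
# The valid values are [1.0, 3.0], so mean = median = 2.0; the 'NaN' is
# replaced by the mean and the out-of-range 25.0 by the median, returning
#   ([1.0, 2.0, 2.0, 3.0], 2.0, 2.0)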


writes_train, writes_med, writes_avg = clean_data('writes', writes_train, 0, float('infinity'))
reads_train, reads_med, reads_avg = clean_data('reads', reads_train, 0, float('infinity'))
avg_write_ms_train, avg_write_ms_med, avg_write_ms_avg = clean_data('avg_write_ms', avg_write_ms_train, 0, 1000)
avg_read_ms_train, avg_read_ms_med, avg_read_ms_avg = clean_data('avg_read_ms', avg_read_ms_train, 0, 1000)
years_train, years_med, years_avg = clean_data('years', years_train, 0, 20)



# num of rows for testing data
M = int(sys.stdin.readline())
# print('process testing data')
# IDs_test = []
writes_test = []
reads_test = []
avg_write_ms_test = []
avg_read_ms_test = []
years_test = []
test_cnt = 0
for i in range(M):
    device_id,w,r,w_ms,r_ms,y = sys.stdin.readline().strip().split(',')
    test_cnt += 1
    # IDs_test.append(device_id)

    writes_test.append(int(w) if w != 'NaN' else w)
    reads_test.append(int(r) if r != 'NaN' else r)
    avg_write_ms_test.append(float(w_ms) if w_ms != 'NaN' else w_ms)
    avg_read_ms_test.append(float(r_ms) if r_ms != 'NaN' else r_ms)
    years_test.append(float(y) if y != 'NaN' else y)
        
# print(f'\t{test_cnt}/{M} valid rows in testing data')
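# Impute the test columns with the median/mean learned from the training data,
# so that test-set statistics never influence the imputed values.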

writes_test, _, _ = clean_data('writes', writes_test, 0, float('infinity'), writes_med, writes_avg)
reads_test, _, _ = clean_data('reads', reads_test, 0, float('infinity'), reads_med, reads_avg)
avg_write_ms_test, _, _ = clean_data('avg_write_ms', avg_write_ms_test, 0, 1000, avg_write_ms_med, avg_write_ms_avg)
avg_read_ms_test, _, _ = clean_data('avg_read_ms', avg_read_ms_test, 0, 1000, avg_read_ms_med, avg_read_ms_avg)
years_test, _, _ = clean_data('years', years_test, 0, 20, years_med, years_avg)



class LogisticRegression(object):
    def __init__(self, learning_rate=0.01, n_iteration=100):
        self.learning_rate = learning_rate
        self.n_iteration = n_iteration
        self.weights = None
        self.bias = None
        self.loss_history = []

    
    def sigmoid(self, z):
        # numerically stable: exp() is only ever called with a non-positive
        # argument, so it cannot overflow for large |v|
        return [1. / (1. + math.exp(-v)) if v >= 0
                else math.exp(v) / (1. + math.exp(v)) for v in z]

    
    def initialize_params(self, n_feature):
        self.weights = [0 for _ in range(n_feature)]
        self.bias = 0

    
    def compute_loss(self, y_true, y_pred):
        # binary cross-entropy: -(1/N) * sum(y*log(p) + (1-y)*log(1-p));
        # the small epsilon guards against log(0)
        eps = 1e-6
        N = len(y_true)
        loss = 0.0
        for y_t, y_p in zip(y_true, y_pred):
            loss += -(y_t * math.log(y_p + eps) + (1 - y_t) * math.log(1 - y_p + eps))
        return loss / N

    
    def compute_gradient(self, X, y_true, y_pred):
        N = len(X)
        n_feature = len(X[0])
        
        dL_dz = [y_p - y_t for y_p, y_t in zip(y_pred, y_true)]

        dL_db = statistics.mean(dL_dz)

        dL_dw = [0. for _ in range(n_feature)]
        for i in range(N):
            for k in range(n_feature):
                dL_dw[k] += dL_dz[i] * X[i][k]

        dL_dw = [v / N for v in dL_dw]
        return dL_dw, dL_db
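
    # Derivation note for compute_gradient above: with p = sigmoid(z) and
    # binary cross-entropy loss L, the chain rule collapses to
    # dL/dz_i = p_i - y_i, hence
    #   dL/dw_k = (1/N) * sum_i (p_i - y_i) * x_ik
    #   dL/db   = (1/N) * sum_i (p_i - y_i)
    # which is exactly what the loops above compute.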


    def predict_proba(self, X):
        out = []
        N = len(X)
        n_feature = len(X[0])
        for i in range(N):
            z = self.bias
            for k in range(n_feature):
                z += X[i][k] * self.weights[k]
            out.append(z)
        return self.sigmoid(out)
    
    
    def fit(self, X, y):
        N = len(X)
        n_feature = len(X[0])
        self.initialize_params(n_feature)
        
        
        for it in range(1, self.n_iteration+1):
            
            y_proba = self.predict_proba(X)
            loss = self.compute_loss(y, y_proba)
            self.loss_history.append(loss)
            dw, db = self.compute_gradient(X, y, y_proba)

            # update params
            self.weights = [w - self.learning_rate * g for w, g in zip(self.weights, dw)]
            self.bias = self.bias - self.learning_rate * db
			"""
            if it % 5 == 0:
                print(f'{it}/{self.n_iteration} iteration, loss: {loss:.3f}')
			"""

    def predict(self, X, threshold=0.5):
        y_proba = self.predict_proba(X)
        y_pred = [1 if y > threshold else 0 for y in y_proba]
        return y_pred
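
# Quick sanity check of the class on toy data (hypothetical, not part of the
# judged solution): a single feature with labels split around 1.5.
#   clf = LogisticRegression(learning_rate=0.1, n_iteration=200)
#   clf.fit([[0.0], [1.0], [2.0], [3.0]], [0, 0, 1, 1])
#   clf.predict([[0.5], [2.5]])  # should tend toward [0, 1]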


        
model = LogisticRegression()

# prepare training data X,y
X = [[w, r, aw, ar, y] for w, r, aw, ar, y in zip(
    writes_train, 
    reads_train,
    avg_write_ms_train,
    avg_read_ms_train,
    years_train)
]
y = status_train

# training
model.fit(X, y)

# test
X_test = [[w, r, aw, ar, y] for w, r, aw, ar, y in zip(
    writes_test, 
    reads_test,
    avg_write_ms_test,
    avg_read_ms_test,
    years_test)
]

pred = model.predict(X_test)
print(f'predictions:\n{pred}')
            
            
            

    
        
        


全部评论

相关推荐

爱读书的放鸽子能手很...:刷个两端实习,冲春招,流水线什么时候不能去
我的秋招日记
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务