Since so many readers like machine-learning and deep-learning approaches to bearing fault diagnosis, we'll post more of that, and get to the modern-signal-processing material gradually later. Why do I touch on so many fields? Because modern signal processing overlaps with a huge number of them; broadly speaking, machine learning and deep learning also fall under modern signal processing, it's just that some of them are black-box models. In this post we use the wavelet packet transform to extract features from bearing vibration signals, then feed those features into a random forest for fault classification. First, import the required modules. The pywt module may need to be installed; note that the PyPI package is named PyWavelets: pip install PyWavelets
```python
import glob
import os
import re
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pywt
import scipy
from scipy import signal
from scipy.fftpack import fft
from scipy.io import loadmat
from sklearn import metrics

warnings.filterwarnings('ignore')
```

To simplify frequency-domain feature extraction, define an FFT helper function:
```python
def apply_fft(x, fs, num_samples):
    # One-sided amplitude spectrum: frequency axis and scaled FFT magnitudes
    f = np.linspace(0.0, fs/2.0, num_samples//2)
    freq_values = fft(x)
    freq_values = 2.0/num_samples * np.abs(freq_values[0:num_samples//2])
    return f, freq_values
```

Next, we build the dataset from the raw bearing vibration signals. First we define a function that loads the .mat vibration files and segments each signal into slices of the desired length `num_samples`:
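As a quick, hypothetical sanity check (not part of the original pipeline), we can feed `apply_fft` a synthetic 100 Hz sine and confirm the spectral peak lands where expected; the names `fs_demo`, `t`, and `demo` below are mine:

```python
# A 100 Hz sine sampled at 12 kHz over 1200 points should produce a single
# spectral peak near 100 Hz with amplitude close to 1.
fs_demo = 12000
t = np.arange(1200) / fs_demo
demo = np.sin(2 * np.pi * 100 * t)
f_demo, amp_demo = apply_fft(demo, fs_demo, len(demo))
print(f_demo[np.argmax(amp_demo)])   # ~100.0
print(amp_demo.max())                # ~1.0
```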
```python
def make_dataset(data_src, num_samples, class_):
    """Load all .mat files matching data_src, extract the drive-end (DE)
    signal, and cut it into non-overlapping segments of num_samples points."""
    pattern = re.compile("_DE_time")
    files = np.sort(glob.glob(data_src))
    segments = []
    for file in files:
        data = loadmat(file)
        # find the key holding the drive-end time series
        my_key = [key for key in data if pattern.search(key)][0]
        drive_end_data = data[my_key].reshape(-1)
        num_segments = int(len(drive_end_data) // num_samples)
        # note: the original split the signal into num_samples chunks and
        # reshaped, which scrambles the segments; reshaping directly into
        # (num_segments, num_samples) keeps each row a contiguous segment
        slices = drive_end_data[:num_segments*num_samples].reshape(num_segments, num_samples)
        segments.append(slices)
    segmented_data = np.concatenate(segments, axis=0)
    segmented_data = np.unique(segmented_data, axis=0)  # remove duplicates
    np.random.shuffle(segmented_data)                   # shuffle the data
    Class_ = np.ones(len(segmented_data)) * class_
    return segmented_data, Class_
```

The CWRU dataset can be downloaded from https://engineering.case.edu/bearingdatacenter. Once downloaded, split the files into 10 groups by operating condition and create 10 folders named as follows (a quick sanity check of this layout is sketched right after the list):
1. 12K_DE_Normal
2. 12k_DE_IRFault_0.007
3. 12k_DE_IRFault_0.014
4. 12k_DE_IRFault_0.021
5. 12k_DE_BallFault_0.007
6. 12k_DE_BallFault_0.014
7. 12k_DE_BallFault_0.021
8. 12k_DE_ORFault_0.007
9. 12k_DE_ORFault_0.014
10. 12k_DE_ORFault_0.021
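Before building the dataset, here is a small hypothetical check (my addition; it assumes `data_path` contains only these 10 class folders, so adjust the path and filtering to your setup) that each folder actually contains .mat files:

```python
import glob, os  # already imported above

data_path = r"D:\dataset"   # hypothetical local path, adjust as needed
for folder in sorted(os.listdir(data_path)):
    mats = glob.glob(os.path.join(data_path, folder, "*.mat"))
    print(f"{folder}: {len(mats)} .mat files")
```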
Most readers are already familiar with the CWRU dataset, so I won't describe it further.
```python
num_samples = 1200   # points per segment
fs = 12000           # sampling frequency (Hz)
```

The code below should be easy to follow:
data_path = (r"D:\dataset") # 数据集路径cls_1 = '12K_DE_Normal/*'; cls_2 = '12k_DE_IRFault_0.007/*'; cls_3 = '12k_DE_IRFault_0.014/*'; cls_4 = '12k_DE_IRFault_0.021/*'; cls_5 = '12k_DE_BallFault_0.007/*'cls_6 = '12k_DE_BallFault_0.014/*'; cls_7 = '12k_DE_BallFault_0.021/*'cls_8 = '12k_DE_ORFault_0.007/*'; cls_9 = '12k_DE_ORFault_0.014/*'; cls_10 ='12k_DE_ORFault_0.021/*'norm, y_norm, = make_dataset(os.path.join(data_path, cls_1), num_samples, 1)defc1, y_defc1 = make_dataset(os.path.join(data_path, cls_2), num_samples, 2)defc2, y_defc2 = make_dataset(os.path.join(data_path, cls_3), num_samples, 3)defc3, y_defc3 = make_dataset(os.path.join(data_path, cls_4), num_samples, 4)defc4, y_defc4 = make_dataset(os.path.join(data_path, cls_5), num_samples, 5)defc5, y_defc5 = make_dataset(os.path.join(data_path, cls_6), num_samples, 6)defc6, y_defc6 = make_dataset(os.path.join(data_path, cls_7), num_samples, 7)defc7, y_defc7 = make_dataset(os.path.join(data_path, cls_8), num_samples, 8)defc8, y_defc8 = make_dataset(os.path.join(data_path, cls_9), num_samples, 9)defc9, y_defc9 = make_dataset(os.path.join(data_path, cls_10), num_samples, 10)进行拼接
```python
X = np.concatenate((norm, defc1, defc2, defc3, defc4,
                    defc5, defc6, defc7, defc8, defc9), axis=0)
Y = np.concatenate((y_norm, y_defc1, y_defc2, y_defc3, y_defc4,
                    y_defc5, y_defc6, y_defc7, y_defc8, y_defc9), axis=0)
```

With the dataset ready, we move on to feature extraction: a 3-level db4 wavelet packet decomposition followed by frequency-domain features.
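Before extracting features it's worth a quick look (my addition) at the array shapes and per-class sample counts, which depend on how many files you downloaded:

```python
# Each row of X is one 1200-point segment; Y holds the matching class labels.
print("X:", X.shape, "Y:", Y.shape)   # e.g. (N, 1200) and (N,)
classes, counts = np.unique(Y, return_counts=True)
print(dict(zip(classes.astype(int), counts)))   # samples per class
```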
wavelet_function = "db4"num_levels = 3 # m = 1 # num_features = 2**num_levelsfeatures = np.repeat(np.nan, len(X)*m*num_features).reshape(len(X),m*num_features)for i in range(len(X)): wp = pywt.WaveletPacket(X[i], wavelet = wavelet_function, maxlevel = num_levels) # Wavelet packet transformation packet_names = [node.path for node in wp.get_level(num_levels, "natural")] for j in range(num_features): new_wp = pywt.WaveletPacket(data = None, wavelet = wavelet_function, maxlevel = num_levels) new_wp[packet_names[j]] = wp[packet_names[j]].data reconstructed_signal = new_wp.reconstruct(update = False) # Signal reconstruction from wavelet packet coefficients f, c = apply_fft(reconstructed_signal, fs, len(reconstructed_signal)) z = abs(c) maximal_idx = np.argpartition(z, -m)[-m:] high_amp = z[maximal_idx] high_freq = f[maximal_idx] feature = high_amp*high_freq l = 0 for f in feature: features[i,j*m+l] = f l = l+1可视化一下
```python
plt.rc('font', size=20)
X_Labels = ['F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8']
plt.bar(X_Labels, features[0], 0.7)
plt.show()
```

Next, import the machine-learning modules:
```python
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.metrics import accuracy_score, confusion_matrix

labels = pd.Categorical(Y)
```

Split the data into training and test sets:
```python
X_train, X_test, y_train, y_test = train_test_split(
    features, labels, test_size=0.2, stratify=labels, random_state=42)
```

Standardize the features:
```python
scaler = StandardScaler()
train_data_scaled = scaler.fit_transform(X_train)
test_data_scaled = scaler.transform(X_test)
```

Train the random forest:
```python
clf_RF = RandomForestClassifier(criterion='gini', max_features=1,
                                min_samples_leaf=1, min_samples_split=2,
                                max_depth=20, n_estimators=300)
clf_RF.fit(train_data_scaled, y_train)
```
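The hyperparameters above are set by hand. If you want to justify them, a small grid search is one option; this is a sketch I've added, not something from the original post, and the grid values are arbitrary:

```python
from sklearn.model_selection import GridSearchCV

# Search over a small, illustrative grid with 5-fold cross-validation
param_grid = {"n_estimators": [100, 300, 500],
              "max_depth": [10, 20, None]}
search = GridSearchCV(RandomForestClassifier(), param_grid, cv=5, n_jobs=-1)
search.fit(train_data_scaled, y_train)
print(search.best_params_, search.best_score_)
```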
With the model trained, we compute the confusion matrix and related metrics:

```python
test_predictions = clf_RF.predict(test_data_scaled)
test_confu_matrix = confusion_matrix(y_test, test_predictions)
fault_type = ['C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'C10']

plt.figure(1, figsize=(18, 8))
sns.heatmap(test_confu_matrix, annot=True, fmt="d",
            xticklabels=fault_type, yticklabels=fault_type, cmap="Blues")
plt.xlabel('Predicted Class')
plt.ylabel('True Class')

Accuracy = metrics.accuracy_score(y_test, test_predictions)
F1_score = metrics.f1_score(y_test, test_predictions, average='micro')
probs = clf_RF.predict_proba(test_data_scaled)
lr_auc = roc_auc_score(y_test, probs, multi_class='ovr')
print("No. of Samples =", num_samples, "/ k =", num_levels, "/ m =", m)
print('ROC AUC = %.3f' % lr_auc)
print("F1 Score =", F1_score)
print("Accuracy = %.3f" % (Accuracy*100), "%")
```

Output:

```
No. of Samples = 1200 / k = 3 / m = 1
ROC AUC = 1.000
F1 Score = 0.9970326409495549
Accuracy = 99.703 %
```
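Beyond the aggregate scores, per-class precision and recall show whether any particular fault size gets confused with its neighbours; this `classification_report` call is my addition:

```python
from sklearn.metrics import classification_report

# Per-class precision, recall, and F1 for the 10 bearing conditions
print(classification_report(y_test, test_predictions, target_names=fault_type))
```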
A final word: try not to overdo machine-learning/deep-learning fault diagnosis. It's addictive, and the more of it you do, the hollower it feels; better to also invest in the signal processing itself.