基于小波变换和机器学习的地震信号处理和识别

废话也不多说了,首先导入相关模块

import pandas as pdimport numpy as npimport pywtimport scipy as spimport matplotlib.pyplot as pltfrom sklearn.model_selection import train_test_splitfrom sklearn.pipeline import make_pipelinefrom sklearn.preprocessing import StandardScalerfrom sklearn.svm import SVC

导入数据

df = pd.read_pickle('shuju.pkl')df.head()

进行下数据转换

df['fMag'] = df['Mag.']df['Mag.'] = df['Mag.'].apply(lambda x :int(x) if (x-int(x)) < 0.5 else int(x)+1)

删除不需要的列

df.drop(columns=['Date(yyyy/mm/dd)', 'Time(UTC)', 'Time', 'unit'], inplace=True)df.dropna(inplace=True)df.head()

训练集划分

train, test = train_test_split(df, test_size=0.3, random_state=38)print('train data shape: ', train.shape)print('test data shape:  ', test.shape)

train data shape: (2228, 6)

test data shape: (955, 6)

训练集

为了便于训练,进行数据转换,得到train_wave_mat和test_wave_mat

绘制一些样本

fig, axs = plt.subplots(3, 3, figsize=(15, 10))plt.subplots_adjust(hspace=0.5, wspace=0.2)n = 0for i in range(3):    for j in range(3):        axs[i,j].plot(train_wave_mat[n], color='black')        axs[i,j].set_title('train wave sample ' + str(n+1))        axs[i,j].set_xlabel('time')        axs[i,j].set_ylabel('amplitude')        axs[i,j].set_ylim(0, 0.4)        axs[i,j].set_xticks([])        n += 1


提取标签

train_labels_mat = train['Mag.'].tolist()test_labels_mat = test['Mag.'].tolist()

定义特征提取函数(小波变换)

def features(data, labels, waveletname):    features_list = []    list_unique_labels = list(set(labels))    list_labels = [list_unique_labels.index(elem) for elem in labels]    for signal in data:        coeff_list = pywt.wavedec(signal, waveletname)        features = []        for coeff in coeff_list:            features += get_features(coeff)        features_list.append(features)    return features_list, list_labelsdef get_features(list_values):    entropy = calculate_entropy(list_values)    crossings = calculate_crossings(list_values)    statistics = calculate_statistics(list_values)    return [entropy] + crossings + statisticsdef calculate_entropy(list_values):    from collections import Counter    counter_values = Counter(list_values).most_common()    probabilities = [elem[1]/len(list_values) for elem in counter_values]    entropy=sp.stats.entropy(probabilities)    return entropydef calculate_statistics(list_values):    n5 = np.nanpercentile(list_values, 5)    n25 = np.nanpercentile(list_values, 25)    n75 = np.nanpercentile(list_values, 75)    n95 = np.nanpercentile(list_values, 95)    median = np.nanpercentile(list_values, 50)    mean = np.nanmean(list_values)    std = np.nanstd(list_values)    var = np.nanvar(list_values)    rms = np.nanmean(np.sqrt(list_values**2))    return [n5, n25, n75, n95, median, mean, std, var, rms]def calculate_crossings(list_values):    zero_crossing_indices = np.nonzero(np.diff(np.array(list_values) > 0))[0]    no_zero_crossings = len(zero_crossing_indices)    mean_crossing_indices = np.nonzero(np.diff(np.array(list_values) > np.nanmean(list_values)))[0]    no_mean_crossings = len(mean_crossing_indices)    return [no_zero_crossings, no_mean_crossings]

利用小波变换进行特征提取

X_train, y_train = features(train_wave_mat, train_labels_mat, 'db4')X_test, y_test = features(test_wave_mat, test_labels_mat, 'db4')

准备绘制小波变换幅值

trainset = np.array(X_train)testset = np.array(X_test)train_magitude = np.array(train['fMag'])test_magitude = np.array(test['fMag'])train_wavelet_amplitude = trainset.max(axis=1) - trainset.min(axis=1)test_wavelet_amplitude = testset.max(axis=1) - testset.min(axis=1)

绘制

size = 10alpha = 0.3plt.figure(figsize=(10,8))plt.scatter(train_magitude, train_wavelet_amplitude, s=size, c='r', alpha=alpha)plt.scatter(test_magitude, test_wavelet_amplitude, s=size, c='b', alpha=alpha)plt.ylabel('Wavelet Amplitude')plt.xlabel('Magnitude')plt.show()

绘制小波分解系数

from scipy import signala = pywt.wavedec(X_train, 'db4', mode='sym', level=None)(cA, cD) = pywt.dwt(X_train, 'db4', mode='sym')fig, axs = plt.subplots(2, 2, figsize=(15, 10))axs[0][0].plot(cA[0], color='darkred')axs[0][0].set_xlabel('Time')axs[0][0].set_ylabel('Approximation Coefficient')axs[0][0].set_xticks([])axs[0][1].plot(cD[0], color='darkblue')axs[0][1].set_xlabel('Time')axs[0][1].set_ylabel('Detail Coefficient')axs[0][1].set_xticks([])axs[1][0].plot(X_train[0], color='darkgreen')axs[1][0].set_xlabel('Time')axs[1][0].set_ylabel('Wavelet Coefficient')axs[1][0].set_xticks([])axs[1][1].plot(X_train[:])axs[1][1].set_xlabel('Wavelet Samples')axs[1][1].set_ylabel('Wavelet Coefficients')plt.show()

fig, axs = plt.subplots(3, 3, figsize=(21, 15))for i in range(3):    axs[i][0].plot(cA[i], color='darkred')    axs[i][0].set_xlabel('Time')    axs[i][0].set_ylabel('Approximation Coefficient')    axs[i][0].set_xticks([])    axs[i][1].plot(cD[i], color='darkblue')    axs[i][1].set_xlabel('Time')    axs[i][1].set_ylabel('Detail Coefficient')    axs[i][1].set_xticks([])    axs[i][2].plot(X_train[i], color='darkgreen')    axs[i][2].set_xlabel('Time')    axs[i][2].set_ylabel('Wavelet Coefficient')    axs[i][2].set_xticks([])

看看3D图

import matplotlib as mplax = plt.figure(figsize=(10,10)).add_subplot(projection='3d')x = np.linspace(0,108,108)y = np.asarray(X_train)z = range(len(y))cmap = mpl.cm.get_cmap('ocean')for i in z:    ax.plot(x,y[i],zs=i,zdir='x', color=cmap(i/2500))ax.set_xlabel('Wavelet Samples')ax.set_ylabel('Time')ax.set_yticks([])ax.set_zlabel('Wavelet Coefficient')plt.show()

创建一个 pipeline

clf = make_pipeline(StandardScaler(), SVC(gamma='auto'))

模型拟合

clf.fit(X_train,y_train)

Pipeline(steps=[('standardscaler', StandardScaler()),

('svc', SVC(gamma='auto'))])

预测

y_pred = clf.predict(X_test)

绘制散点图

labels = clf.named_steps['svc'].classes_size = 10alpha = 0.5colors = range(len(labels))plt.figure(figsize=(10,8))for i in range(len(labels)):    plt.scatter(test_wavelet_amplitude[y_pred == i], test_magitude[y_pred == i], label=labels[i]+1, s=size, alpha=alpha)    plt.xlabel('Wavelet Amplitude')    plt.ylabel('Magnitude')plt.legend()plt.show()

计算准确率

from sklearn.metrics import accuracy_scoreaccuracy = accuracy_score(y_test, y_pred)print('accuracy: ', accuracy)

绘制预测/实际类别

plt.figure(figsize=(20,7))plt.plot(y_pred[:500], color='orange', label='Classified', linewidth=1, alpha=1)plt.plot(y_test[:500], color='green', label='Actual', alpha=0.5)plt.xlabel('Samples')plt.ylabel('Class')plt.xticks([0,500])plt.yticks([1,2,3,4,5,6,7])plt.legend()plt.show()

计算混淆矩阵

from sklearn.metrics import confusion_matrixcm = confusion_matrix(y_test, y_pred)print('confusion matrix: ', cm)

绘制混淆矩阵

import seaborn as snsplt.figure(figsize=(10, 10))sns.heatmap(cm, annot=True, square=True, cmap='viridis', fmt='d')plt.xlabel('Actual label')plt.ylabel('Predicted label')plt.show()

详细代码

https://mianbaoduo.com/o/bread/Yp2bl5xt

或者

https://m.tb.cn/h.fzeRMqF?tk=ik7R2tMxlDy CZ3457

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章