Many articles focus on two-dimensional convolutional neural networks (2D CNNs), which are used in particular for image recognition problems. 1D CNNs are covered to some extent, for example for natural language processing (NLP), but few articles explain how to actually construct one. This article tries to close that gap.
When do you apply a 1D CNN?
A CNN works well at identifying simple patterns in the data, which are then used to form more complex patterns in higher layers. A 1D CNN is very effective when you expect to derive interesting features from shorter (fixed-length) segments of the overall data set, and when the exact location of a feature within the segment is of low relevance.
This applies well to the analysis of time sequences of sensor data (such as gyroscope or accelerometer data). It also applies to the analysis of any kind of signal data over a fixed-length period (such as audio signals). Another application is NLP (although here LSTM networks are more promising, since the proximity of words may not always be a good indicator of a trainable pattern).
What is the difference between a 1D CNN and a 2D CNN?
CNNs share the same characteristics and follow the same approach, no matter whether they are 1D, 2D, or 3D. The key difference is the dimensionality of the input data and how the feature detector (or filter) slides across the data:
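To make the difference concrete, here is a minimal sketch (my own illustration, not part of the model built below) that compares the output shapes of a single Conv1D and Conv2D layer in Keras. The image dimensions in the 2D case are made-up example values; shapes assume the Keras defaults of 'valid' padding and stride 1.

# Sketch: a Conv1D filter slides along one (time) axis,
# a Conv2D filter slides along two spatial axes.
from keras.models import Sequential
from keras.layers import Conv1D, Conv2D

# 1D: 80 time steps x 3 sensor channels, kernel of size 10
m1 = Sequential([Conv1D(100, 10, input_shape=(80, 3))])
print(m1.output_shape)   # (None, 71, 100) -- 71 sliding positions along time

# 2D: a hypothetical 28 x 28 single-channel image, 10 x 10 kernel
m2 = Sequential([Conv2D(100, (10, 10), input_shape=(28, 28, 1))])
print(m2.output_shape)   # (None, 19, 19, 100) -- slides in both directions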
Problem statement
In this article we will focus on time-sliced accelerometer sensor data, available at https://www.cis.fordham.edu/wisdm/dataset.php. The data stems from a smartphone carried at the user's waist. Based on the accelerometer data for the x, y, and z axes, the 1D CNN predicts the type of activity the user is performing (such as "Walking", "Jogging", or "Standing"). You can find more background information in two of my other articles. For the various activities, each time interval of the data looks similar to this.
How do you construct a 1D CNN in Python?
There are many standard CNN models available. I picked one of the models described on the Keras website and modified it slightly to fit the problem depicted above. The following picture provides a high-level overview of the constructed model. Each layer will be explained further below.
Let us first look at the Python code in order to construct this model:
model_m = Sequential()
model_m.add(Reshape((TIME_PERIODS, num_sensors), input_shape=(input_shape,)))
model_m.add(Conv1D(100, 10, activation='relu', input_shape=(TIME_PERIODS, num_sensors)))
model_m.add(Conv1D(100, 10, activation='relu'))
model_m.add(MaxPooling1D(3))
model_m.add(Conv1D(160, 10, activation='relu'))
model_m.add(Conv1D(160, 10, activation='relu'))
model_m.add(GlobalAveragePooling1D())
model_m.add(Dropout(0.5))
model_m.add(Dense(num_classes, activation='softmax'))
print(model_m.summary())
Running this code results in the following deep neural network:
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
reshape_45 (Reshape)         (None, 80, 3)             0
_________________________________________________________________
conv1d_145 (Conv1D)          (None, 71, 100)           3100
_________________________________________________________________
conv1d_146 (Conv1D)          (None, 62, 100)           100100
_________________________________________________________________
max_pooling1d_39 (MaxPooling (None, 20, 100)           0
_________________________________________________________________
conv1d_147 (Conv1D)          (None, 11, 160)           160160
_________________________________________________________________
conv1d_148 (Conv1D)          (None, 2, 160)            256160
_________________________________________________________________
global_average_pooling1d_29  (None, 160)               0
_________________________________________________________________
dropout_29 (Dropout)         (None, 160)               0
_________________________________________________________________
dense_29 (Dense)             (None, 6)                 966
=================================================================
Total params: 520,486
Trainable params: 520,486
Non-trainable params: 0
_________________________________________________________________
None
Let's dive into each layer and see what is happening:
Input data:
The data has been pre-processed so that each data record contains 80 time slices (the data was recorded at a 20 Hz sampling rate, so each time interval covers four seconds of accelerometer readings). Within each interval, the three accelerometer values for the x, y, and z axes are stored. This results in an 80 x 3 matrix. Since I typically use neural networks within iOS, the data must be passed into the network as a flat vector of length 240. The first layer of the network must then reshape it back to its original shape of 80 x 3.
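As a quick illustration of this reshaping step (a sketch with made-up sample values, not the actual preprocessing pipeline), the flat vector of length 240 can be restored like this:

import numpy as np

# Sketch: one record arrives as a flat vector of 240 values
flat_record = np.arange(240, dtype=np.float32)  # hypothetical sample data

# The Reshape layer restores the 80 time steps x 3 axes structure
record = flat_record.reshape(80, 3)
print(record.shape)  # (80, 3)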
First 1D CNN layer:
The first layer defines a filter (also called a feature detector) of kernel size 10 (also referred to as the filter height). Defining only one filter would allow the neural network to learn one single feature in this first layer. That would not be sufficient, so we define 100 filters. This allows us to train 100 different features in the first layer of the network. The output of the first layer is a 71 x 100 neuron matrix. Each column of the output matrix holds the output of one single filter; given the defined kernel size of 10 and the input length of 80, each filter produces 71 output values.
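The 71 follows from the standard output-length formula for a 'valid' convolution (the Keras default): output_length = input_length - kernel_size + 1. A small sketch, also previewing the later layers in the model summary above:

# Sketch: output length of a Conv1D layer with 'valid' padding and stride 1
def conv1d_output_length(input_length, kernel_size, stride=1):
    return (input_length - kernel_size) // stride + 1

print(conv1d_output_length(80, 10))  # 71 -> first Conv1D layer
print(conv1d_output_length(71, 10))  # 62 -> second Conv1D layer
print(conv1d_output_length(20, 10))  # 11 -> third Conv1D layer (after pooling)
print(conv1d_output_length(11, 10))  # 2  -> fourth Conv1D layer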
Second 1D CNN layer:
The result from the first CNN layer will be fed into the second CNN layer. We again define 100 different filters to be trained on this level. Following the same logic as for the first layer, the output matrix will be of size 62 x 100.
Max pooling layer:
A pooling layer is often used after a CNN layer in order to reduce the complexity of the output and prevent overfitting of the data. In our example we chose a pool size of 3. This means that the output matrix of this layer is only a third of the size of the input matrix.
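A sketch of the shape arithmetic: with a pool size of 3, Keras's MaxPooling1D uses a stride equal to the pool size by default, so it keeps the maximum of each non-overlapping window of three time steps.

# Sketch: MaxPooling1D(3) shrinks the time axis from 62 to 20
from keras.models import Sequential
from keras.layers import MaxPooling1D

m = Sequential([MaxPooling1D(3, input_shape=(62, 100))])
print(m.output_shape)  # (None, 20, 100) -- (62 - 3) // 3 + 1 = 20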
Third and fourth 1D CNN layers:
Next follows another sequence of 1D CNN layers, in order to learn higher-level features. The output matrix after these two layers is a 2 x 160 matrix.
Average pooling layer:
One more pooling layer, to further avoid overfitting. This time, instead of taking the maximum value, the average of the two remaining values per feature in the network is taken. The output matrix has a size of 1 x 160 neurons. Per feature detector, only one value remains in the network at this layer.
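A small numpy sketch of what GlobalAveragePooling1D does here (illustration only, with random values standing in for the real activations):

import numpy as np

# Sketch: global average pooling over the time axis
x = np.random.rand(2, 160)  # stand-in for the 2 x 160 output of the last Conv1D
pooled = x.mean(axis=0)     # average the two remaining time steps per feature
print(pooled.shape)         # (160,)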
Dropout layer:
The dropout layer randomly assigns a weight of zero to neurons in the network. Since we chose a rate of 0.5, 50% of the neurons will receive a zero weight. With this operation, the network becomes less sensitive to smaller variations in the data. Therefore, it should further increase our accuracy on unseen data. The output of this layer is still a 1 x 160 matrix of neurons.
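A sketch of the mechanics during training (illustration only; Keras applies so-called inverted dropout, scaling the surviving values so the expected sum stays constant):

import numpy as np

# Sketch of inverted dropout as applied at training time
def dropout_train(x, rate=0.5, rng=np.random):
    mask = rng.rand(*x.shape) >= rate   # keep each unit with probability 1 - rate
    return x * mask / (1.0 - rate)      # scale survivors to preserve the mean

x = np.ones(160)                        # stand-in for the 1 x 160 activations
print(dropout_train(x)[:10])            # roughly half the entries are zero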
Fully connected layer with softmax activation:
The final layer reduces the vector of height 160 to a vector of six, since we have six classes to predict ("Jogging", "Sitting", "Walking", "Standing", "Upstairs", "Downstairs"). This reduction is done by a matrix multiplication. Softmax is used as the activation function. It forces all six outputs of the neural network to sum up to one. The output values therefore represent the probability of each of the six classes.
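The softmax itself is just a normalized exponential; a short numpy sketch with made-up scores:

import numpy as np

# Sketch: softmax turns six raw scores into probabilities that sum to 1
def softmax(z):
    e = np.exp(z - z.max())  # subtract the max for numerical stability
    return e / e.sum()

scores = np.array([2.0, 1.0, 0.1, -1.2, 0.5, 0.0])  # hypothetical logits
probs = softmax(scores)
print(probs, probs.sum())  # six probabilities, summing to 1.0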
Training and testing the neural network
Below is the Python code to train the model with a batch size of 400 and a training/validation split of 80 to 20.
callbacks_list = [
    keras.callbacks.ModelCheckpoint(
        filepath='best_model.{epoch:02d}-{val_loss:.2f}.h5',
        monitor='val_loss', save_best_only=True),
    keras.callbacks.EarlyStopping(monitor='acc', patience=1)
]

model_m.compile(loss='categorical_crossentropy',
                optimizer='adam', metrics=['accuracy'])

BATCH_SIZE = 400
EPOCHS = 50

history = model_m.fit(x_train,
                      y_train,
                      batch_size=BATCH_SIZE,
                      epochs=EPOCHS,
                      callbacks=callbacks_list,
                      validation_split=0.2,
                      verbose=1)
The model reaches an accuracy of 97% on the training data.
...
Epoch 9/50
16694/16694 [==============================] - 16s 973us/step - loss: 0.0975 - acc: 0.9683 - val_loss: 0.7468 - val_acc: 0.8031
Epoch 10/50
16694/16694 [==============================] - 17s 989us/step - loss: 0.0917 - acc: 0.9715 - val_loss: 0.7215 - val_acc: 0.8064
Epoch 11/50
16694/16694 [==============================] - 17s 1ms/step - loss: 0.0877 - acc: 0.9716 - val_loss: 0.7233 - val_acc: 0.8040
Epoch 12/50
16694/16694 [==============================] - 17s 1ms/step - loss: 0.0659 - acc: 0.9802 - val_loss: 0.7064 - val_acc: 0.8347
Epoch 13/50
16694/16694 [==============================] - 17s 1ms/step - loss: 0.0626 - acc: 0.9799 - val_loss: 0.7219 - val_acc: 0.8107
Running it against the test data reveals an accuracy of 92%.
Accuracy on test data: 0.92
Loss on test data: 0.39
This is a good number, considering that we used one of the standard 1D CNN models. Our model also scores high on precision, recall, and the F1 score.
             precision    recall  f1-score   support

          0       0.76      0.78      0.77       650
          1       0.98      0.96      0.97      1990
          2       0.91      0.94      0.92       452
          3       0.99      0.84      0.91       370
          4       0.82      0.77      0.79       725
          5       0.93      0.98      0.95      2397

avg / total       0.92      0.92      0.92      6584
Here is a brief recap of what those scores mean:
Accuracy:
The ratio between correctly predicted outcomes and the sum of all predictions.
((TP + TN) / (TP + TN + FP + FN))
Precision:
When the model predicted a positive, how often was it right? All true positives divided by all positive predictions.
(TP / (TP + FP))
Recall:
Out of all possible positives, how many did the model identify? All true positives divided by all actual positives.
(TP / (TP + FN))
F1-score:
The weighted average of precision and recall.
(2 x recall x precision / (recall + precision))
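To tie these definitions together, here is a small sketch with made-up confusion-matrix counts (not the counts from the model above):

# Sketch: computing the metrics above from hypothetical counts
TP, TN, FP, FN = 90, 80, 10, 20

accuracy  = (TP + TN) / (TP + TN + FP + FN)                 # 0.85
precision = TP / (TP + FP)                                  # 0.90
recall    = TP / (TP + FN)                                  # ~0.82
f1        = 2 * precision * recall / (precision + recall)   # ~0.86

print(accuracy, precision, recall, f1)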
The confusion matrix associated with the test data is shown below.
In this article you have seen an example of how to use a 1D CNN to train a network to predict the type of activity a user is performing, based on a given set of accelerometer data from a smartphone. The full Python code is available on github.
# Compatibility layer between Python 2 and Python 3
from __future__ import print_function
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy import stats
from sklearn import metrics
from sklearn.metrics import classification_report
from sklearn import preprocessing
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, Reshape, GlobalAveragePooling1D
from keras.layers import Conv2D, MaxPooling2D, Conv1D, MaxPooling1D
from keras.utils import np_utils

# %%

def feature_normalize(dataset):
    mu = np.mean(dataset, axis=0)
    sigma = np.std(dataset, axis=0)
    return (dataset - mu) / sigma

def show_confusion_matrix(validations, predictions):
    matrix = metrics.confusion_matrix(validations, predictions)
    plt.figure(figsize=(6, 4))
    sns.heatmap(matrix,
                cmap="coolwarm",
                linecolor='white',
                linewidths=1,
                xticklabels=LABELS,
                yticklabels=LABELS,
                annot=True,
                fmt="d")
    plt.title("Confusion Matrix")
    plt.ylabel("True Label")
    plt.xlabel("Predicted Label")
    plt.show()

def show_basic_dataframe_info(dataframe, preview_rows=20):
    """
    This function shows basic information for the given dataframe
    Args:
        dataframe: A Pandas DataFrame expected to contain data
        preview_rows: An integer value of how many rows to preview
    Returns:
        Nothing
    """
    # Shape and how many rows and columns
    print("Number of columns in the dataframe: %i" % (dataframe.shape[1]))
    print("Number of rows in the dataframe: %i\n" % (dataframe.shape[0]))
    print("First 20 rows of the dataframe:\n")
    # Show first 20 rows
    print(dataframe.head(preview_rows))
    print("\nDescription of dataframe:\n")
    # Describe dataset like mean, min, max, etc.
    # print(dataframe.describe())

def read_data(file_path):
    """
    This function reads the accelerometer data from a file
    Args:
        file_path: URL pointing to the CSV file
    Returns:
        A pandas dataframe
    """
    column_names = ['user-id',
                    'activity',
                    'timestamp',
                    'x-axis',
                    'y-axis',
                    'z-axis']
    df = pd.read_csv(file_path,
                     header=None,
                     names=column_names)
    # Last column has a ";" character which must be removed ...
    df['z-axis'].replace(regex=True,
                         inplace=True,
                         to_replace=r';',
                         value=r'')
    # ... and then this column must be transformed to float explicitly
    df['z-axis'] = df['z-axis'].apply(convert_to_float)
    # This is very important otherwise the model will not fit and loss
    # will show up as NAN
    df.dropna(axis=0, how='any', inplace=True)
    return df

def convert_to_float(x):
    try:
        return np.float(x)
    except:
        return np.nan

# Not used right now
def feature_normalize(dataset):
    mu = np.mean(dataset, axis=0)
    sigma = np.std(dataset, axis=0)
    return (dataset - mu) / sigma

def plot_axis(ax, x, y, title):
    ax.plot(x, y)
    ax.set_title(title)
    ax.xaxis.set_visible(False)
    ax.set_ylim([min(y) - np.std(y), max(y) + np.std(y)])
    ax.set_xlim([min(x), max(x)])
    ax.grid(True)

def plot_activity(activity, data):
    fig, (ax0, ax1, ax2) = plt.subplots(nrows=3,
                                        figsize=(15, 10),
                                        sharex=True)
    plot_axis(ax0, data['timestamp'], data['x-axis'], 'x-axis')
    plot_axis(ax1, data['timestamp'], data['y-axis'], 'y-axis')
    plot_axis(ax2, data['timestamp'], data['z-axis'], 'z-axis')
    plt.subplots_adjust(hspace=0.2)
    fig.suptitle(activity)
    plt.subplots_adjust(top=0.90)
    plt.show()

def create_segments_and_labels(df, time_steps, step, label_name):
    """
    This function receives a dataframe and returns the reshaped segments
    of x,y,z acceleration as well as the corresponding labels
    Args:
        df: Dataframe in the expected format
        time_steps: Integer value of the length of a segment that is created
    Returns:
        reshaped_segments
        labels:
    """
    # x, y, z acceleration as features
    N_FEATURES = 3
    # Number of steps to advance in each iteration (for me, it should always
    # be equal to the time_steps in order to have no overlap between segments)
    # step = time_steps
    segments = []
    labels = []
    for i in range(0, len(df) - time_steps, step):
        xs = df['x-axis'].values[i: i + time_steps]
        ys = df['y-axis'].values[i: i + time_steps]
        zs = df['z-axis'].values[i: i + time_steps]
        # Retrieve the most often used label in this segment
        label = stats.mode(df[label_name][i: i + time_steps])[0][0]
        segments.append([xs, ys, zs])
        labels.append(label)

    # Bring the segments into a better shape
    reshaped_segments = np.asarray(segments, dtype=np.float32).reshape(-1, time_steps, N_FEATURES)
    labels = np.asarray(labels)

    return reshaped_segments, labels

# %%
# ------- THE PROGRAM TO LOAD DATA AND TRAIN THE MODEL -------

# Set some standard parameters upfront
pd.options.display.float_format = '{:.1f}'.format
sns.set()  # Default seaborn look and feel
plt.style.use('ggplot')
print('keras version ', keras.__version__)

LABELS = ["Downstairs",
          "Jogging",
          "Sitting",
          "Standing",
          "Upstairs",
          "Walking"]
# The number of steps within one time segment
TIME_PERIODS = 80
# The steps to take from one segment to the next; if this value is equal to
# TIME_PERIODS, then there is no overlap between the segments
STEP_DISTANCE = 40

# %%
print("\n--- Load, inspect and transform data ---\n")

# Load data set containing all the data from csv
df = read_data('Data/WISDM_ar_v1.1_raw.txt')

# Describe the data
show_basic_dataframe_info(df, 20)

df['activity'].value_counts().plot(kind='bar',
                                   title='Training Examples by Activity Type')
plt.show()

df['user-id'].value_counts().plot(kind='bar',
                                  title='Training Examples by User')
plt.show()

for activity in np.unique(df["activity"]):
    subset = df[df["activity"] == activity][:180]
    plot_activity(activity, subset)

# Define column name of the label vector
LABEL = "ActivityEncoded"
# Transform the labels from String to Integer via LabelEncoder
le = preprocessing.LabelEncoder()
# Add a new column to the existing DataFrame with the encoded values
df[LABEL] = le.fit_transform(df["activity"].values.ravel())

# %%
print("\n--- Reshape the data into segments ---\n")

# Differentiate between test set and training set
df_test = df[df['user-id'] > 28]
df_train = df[df['user-id'] <= 28]

# Normalize features for training data set
df_train['x-axis'] = feature_normalize(df['x-axis'])
df_train['y-axis'] = feature_normalize(df['y-axis'])
df_train['z-axis'] = feature_normalize(df['z-axis'])
# Round in order to comply to NSNumber from iOS
df_train = df_train.round({'x-axis': 6, 'y-axis': 6, 'z-axis': 6})

# Reshape the training data into segments
# so that they can be processed by the network
x_train, y_train = create_segments_and_labels(df_train,
                                              TIME_PERIODS,
                                              STEP_DISTANCE,
                                              LABEL)

# %%
print("\n--- Reshape data to be accepted by Keras ---\n")

# Inspect x data
print('x_train shape: ', x_train.shape)
# Displays (20869, 40, 3)
print(x_train.shape[0], 'training samples')
# Displays 20869 train samples

# Inspect y data
print('y_train shape: ', y_train.shape)
# Displays (20869,)

# Set input & output dimensions
num_time_periods, num_sensors = x_train.shape[1], x_train.shape[2]
num_classes = le.classes_.size
print(list(le.classes_))

# Set input_shape / reshape for Keras
# Remark: acceleration data is concatenated in one array in order to feed
# it properly into coreml later, the preferred matrix of shape [40,3]
# cannot be read in with the current version of coreml (see also reshape
# layer as the first layer in the keras model)
input_shape = (num_time_periods * num_sensors)
x_train = x_train.reshape(x_train.shape[0], input_shape)
print('x_train shape:', x_train.shape)
# x_train shape: (20869, 120)
print('input_shape:', input_shape)
# input_shape: (120)

# Convert type for Keras otherwise Keras cannot process the data
x_train = x_train.astype("float32")
y_train = y_train.astype("float32")

# %%
# One-hot encoding of y_train labels (only execute once!)
y_train = np_utils.to_categorical(y_train, num_classes)
print('New y_train shape: ', y_train.shape)
# (4173, 6)

# %%
print("\n--- Create neural network model ---\n")

# 1D CNN neural network
model_m = Sequential()
model_m.add(Reshape((TIME_PERIODS, num_sensors), input_shape=(input_shape,)))
model_m.add(Conv1D(100, 10, activation='relu', input_shape=(TIME_PERIODS, num_sensors)))
model_m.add(Conv1D(100, 10, activation='relu'))
model_m.add(MaxPooling1D(3))
model_m.add(Conv1D(160, 10, activation='relu'))
model_m.add(Conv1D(160, 10, activation='relu'))
model_m.add(GlobalAveragePooling1D())
model_m.add(Dropout(0.5))
model_m.add(Dense(num_classes, activation='softmax'))
print(model_m.summary())
# Accuracy on training data: 99%
# Accuracy on test data: 91%

# %%
print("\n--- Fit the model ---\n")

# The EarlyStopping callback monitors training accuracy:
# if it fails to improve for two consecutive epochs,
# training stops early
callbacks_list = [
    keras.callbacks.ModelCheckpoint(
        filepath='best_model.{epoch:02d}-{val_loss:.2f}.h5',
        monitor='val_loss', save_best_only=True),
    keras.callbacks.EarlyStopping(monitor='acc', patience=1)
]

model_m.compile(loss='categorical_crossentropy',
                optimizer='adam', metrics=['accuracy'])

# Hyper-parameters
BATCH_SIZE = 400
EPOCHS = 50

# Enable validation to use ModelCheckpoint and EarlyStopping callbacks.
history = model_m.fit(x_train,
                      y_train,
                      batch_size=BATCH_SIZE,
                      epochs=EPOCHS,
                      callbacks=callbacks_list,
                      validation_split=0.2,
                      verbose=1)

# %%
print("\n--- Learning curve of model training ---\n")

# summarize history for accuracy and loss
plt.figure(figsize=(6, 4))
plt.plot(history.history['acc'], "g--", label="Accuracy of training data")
plt.plot(history.history['val_acc'], "g", label="Accuracy of validation data")
plt.plot(history.history['loss'], "r--", label="Loss of training data")
plt.plot(history.history['val_loss'], "r", label="Loss of validation data")
plt.title('Model Accuracy and Loss')
plt.ylabel('Accuracy and Loss')
plt.xlabel('Training Epoch')
plt.ylim(0)
plt.legend()
plt.show()

# %%
print("\n--- Check against test data ---\n")

# Normalize features for test data set
df_test['x-axis'] = feature_normalize(df_test['x-axis'])
df_test['y-axis'] = feature_normalize(df_test['y-axis'])
df_test['z-axis'] = feature_normalize(df_test['z-axis'])

df_test = df_test.round({'x-axis': 6, 'y-axis': 6, 'z-axis': 6})

x_test, y_test = create_segments_and_labels(df_test,
                                            TIME_PERIODS,
                                            STEP_DISTANCE,
                                            LABEL)

# Set input_shape / reshape for Keras
x_test = x_test.reshape(x_test.shape[0], input_shape)

x_test = x_test.astype("float32")
y_test = y_test.astype("float32")

y_test = np_utils.to_categorical(y_test, num_classes)

score = model_m.evaluate(x_test, y_test, verbose=1)

print("\nAccuracy on test data: %0.2f" % score[1])
print("\nLoss on test data: %0.2f" % score[0])

# %%
print("\n--- Confusion matrix for test data ---\n")

y_pred_test = model_m.predict(x_test)
# Take the class with the highest probability from the test predictions
max_y_pred_test = np.argmax(y_pred_test, axis=1)
max_y_test = np.argmax(y_test, axis=1)

show_confusion_matrix(max_y_test, max_y_pred_test)

# %%
print("\n--- Classification report for test data ---\n")
print(classification_report(max_y_test, max_y_pred_test))