Log Pattern Recognition

Log Samples

2021-03-11 09:23:17,195 [OCKey:HlTOgWeM-LYFkTMn2wsXcEd] DEBUG com.dawninfotek.base.security.WebAppSafeGuard - BaseWebAppSafeGuard.class::key = beneAcctNo4 value = 1
2021-03-11 09:23:17,849 [OCKey:zQXeCzqsWdY9_GliHIoAIpI] DEBUG com.dawninfotek.base.action.BaseDispatchAction - com.dawninfotek.easybanking.web.pr.olbfinance.polbfinancemyfinancenew.POLBFinanceMyFinanceAction::Base BaseDispatchAction - link6::37003747
2021-03-11 09:23:18,635 [OCKey:cSrKSPklI9-juH5TB1VdT6N] DEBUG com.dawninfotek.base.dac.ConnectionFactory - java.lang.Class::Getting a connection from dataSource
2021-03-11 09:23:18,635 [OCKey:cSrKSPklI9-juH5TB1VdT6N] DEBUG com.dawninfotek.base.dac.ConnectionFactory - java.lang.Class::got connection com.ibm.ws.rsadapter.jdbc.WSJdbcConnection@3bfa1e18 from dataSource: auto Commit = false
2021-03-11 09:23:18,635 [OCKey:cSrKSPklI9-juH5TB1VdT6N] DEBUG java.sql.Connection - {conn-12085845} Connection
2021-03-11 09:23:18,635 [OCKey:cSrKSPklI9-juH5TB1VdT6N] DEBUG java.sql.Connection - {conn-12085845} Preparing Statement: select count(T.CUSTNO) from PERCRMPRODUCTDUEDATE t where t.CUSTNO=? and t.READFLAG = '0'
2021-03-11 09:23:18,635 [OCKey:cSrKSPklI9-juH5TB1VdT6N] DEBUG java.sql.PreparedStatement - {pstm-12085846} Executing Statement: select count(T.CUSTNO) from PERCRMPRODUCTDUEDATE t where t.CUSTNO=? and t.READFLAG = '0'
2021-03-11 09:23:18,636 [OCKey:cSrKSPklI9-juH5TB1VdT6N] DEBUG java.sql.PreparedStatement - {pstm-12085846} Parameters: [42300205]
2021-03-11 09:23:18,636 [OCKey:cSrKSPklI9-juH5TB1VdT6N] DEBUG java.sql.PreparedStatement - {pstm-12085846} Types: [java.lang.String]
2021-03-11 09:23:18,636 [OCKey:cSrKSPklI9-juH5TB1VdT6N] DEBUG com.dawninfotek.easybanking.base.process.EasyBankingProcessManager - com.dawninfotek.easybanking.base.process.EasyBankingProcessManager::Start to commit transaction for DAC:EasyBankingSample

Log Patterns

Patterns were extracted from the logs with the Drain3 algorithm, yielding 190 log patterns in total; only part of the results is shown below (a minimal mining sketch follows the sample templates).

{"group_id": 1, "cluster_id": 1, "cluster_size": 62086, "is_abnormal": false, "template_mined": "<MODULE> key <*> value", "template_tokenize": "modul key valu"}
{"group_id": 2, "cluster_id": 2, "cluster_size": 22856, "is_abnormal": false, "template_mined": "<CLASS> key <KEY> value <VALUE>", "template_tokenize": "class key key valu valu"}
{"group_id": 3, "cluster_id": 3, "cluster_size": 2942, "is_abnormal": false, "template_mined": "<MODULE> Base BaseDispatchAction <*> <*>", "template_tokenize": "modul base basedispatchact"}
{"group_id": 3, "cluster_id": 53, "cluster_size": 147, "is_abnormal": false, "template_mined": "<MODULE> Base BaseDispatchAction <*>", "template_tokenize": "modul base basedispatchact"}
{"group_id": 4, "cluster_id": 4, "cluster_size": 16820, "is_abnormal": false, "template_mined": "<MODULE> Getting a connection from dataSource", "template_tokenize": "modul connect datasourc"}
{"group_id": 5, "cluster_id": 5, "cluster_size": 16820, "is_abnormal": false, "template_mined": "<MODULE> got connection com.ibm.ws.rsadapter.jdbc.WSJdbcConnection <*> from dataSource auto Commit false", "template_tokenize": "modul connect ibm rsadapt jdbc wsjdbcconnect datasourc auto commit fals"}
{"group_id": 6, "cluster_id": 6, "cluster_size": 23956, "is_abnormal": false, "template_mined": "<CONN> Connection", "template_tokenize": "conn connect"}
{"group_id": 7, "cluster_id": 7, "cluster_size": 22323, "is_abnormal": false, "template_mined": "<CONN> Preparing Statement <SQL>", "template_tokenize": "conn prepar statement sql"}
{"group_id": 8, "cluster_id": 8, "cluster_size": 22323, "is_abnormal": false, "template_mined": "<PSTM> Executing Statement <SQL>", "template_tokenize": "pstm execut statement sql"}
{"group_id": 9, "cluster_id": 9, "cluster_size": 22322, "is_abnormal": false, "template_mined": "<PSTM> Parameters <PARAMS>", "template_tokenize": "pstm paramet param"}

Field Descriptions

group_id: pattern group ID. Similarity between patterns is computed with Levenshtein distance with a threshold of 0.8; patterns whose similarity is greater than or equal to the threshold are assigned the same group_id (see the sketch after this list).
cluster_id: pattern ID.
cluster_size: number of log lines matched by the pattern.
is_abnormal: a preliminary judgement of whether the pattern is abnormal, based on a set of generic abnormal keywords.
template_mined: the learned log pattern string.
template_tokenize: the string obtained from the log pattern after a series of processing steps; the pattern vector computation later is based on this string.
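
As an illustration of the grouping rule above, the following is a minimal sketch that assigns group IDs by Levenshtein similarity. It uses the python-Levenshtein package and a greedy comparison against previously seen group representatives, which is an assumption about the actual grouping procedure.

import Levenshtein

SIMILARITY_THRESHOLD = 0.8

def assign_group_ids(templates):
    """Greedily group templates whose Levenshtein ratio is >= the threshold."""
    representatives = []  # list of (group_id, representative template)
    group_ids = []
    for template in templates:
        for group_id, rep in representatives:
            if Levenshtein.ratio(template, rep) >= SIMILARITY_THRESHOLD:
                group_ids.append(group_id)
                break
        else:
            new_id = len(representatives) + 1
            representatives.append((new_id, template))
            group_ids.append(new_id)
    return group_ids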

Predefined Generic Abnormal Keywords

abnormal_chars = ['error', 'fail', 'failed', 'exception', 'invalid', 'missing', 'duplicate', 'unable']
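
A minimal sketch of how is_abnormal might be derived from these keywords; the exact matching rule is an assumption.

def is_abnormal_template(template_mined, keywords=abnormal_chars):
    """Flag a template as abnormal if any keyword appears in it (case-insensitive)."""
    lowered = template_mined.lower()
    return any(keyword in lowered for keyword in keywords)

# Example: is_abnormal_template('<MODULE> Failed to get connection')  ->  True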

Log Pattern String Processing

Extracting Chinese and English Words

def is_lower_alphabet(char):
    """Lowercase letter"""
    return '\u0061' <= char <= '\u007A'


def is_upper_alphabet(char):
    """Uppercase letter"""
    return '\u0041' <= char <= '\u005A'


def is_alphabet(char):
    """English letter"""
    return is_lower_alphabet(char) or is_upper_alphabet(char)


def is_chinese(char):
    """Chinese character"""
    return '\u4e00' <= char <= '\u9fff'


def segment(text):
    """Split text into runs of English letters or Chinese characters."""
    chars = []
    for char in text:
        if not is_alphabet(char) and not is_chinese(char):
            if chars:
                word = ''.join(chars)
                yield word
                chars = []
        else:
            chars.append(char)

    if chars:
        word = ''.join(chars)
        yield word
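
A quick usage example of segment; the input string is illustrative.

# Punctuation, digits and whitespace act as separators; letter and Chinese runs are kept.
list(segment('BaseDispatchAction 读取参数 value=1'))
# ['BaseDispatchAction', '读取参数', 'value']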

Chinese Word Segmentation

If an extracted word is Chinese, it is further segmented into Chinese words.

import hanlp
HanLP = hanlp.load(hanlp.pretrained.mtl.CLOSE_TOK_POS_NER_SRL_DEP_SDP_CON_ELECTRA_SMALL_ZH)

tokens = HanLP('读取限流控制参数值', tasks='tok/fine')['tok/fine'] # ['读取', '限流', '控制', '参数值']

Convert to Lowercase

word = str.lower(word)

Stemming

Stemming extracts the stem or root form of a word, which does not necessarily carry complete semantic meaning.

from nltk.stem.snowball import SnowballStemmer
stemmer = SnowballStemmer('english')

word = stemmer.stem('Connection')  # connect

Lemmatization

Lemmatization reduces any inflected form of a word to its base (dictionary) form, which carries complete semantic meaning.

from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

word = lemmatizer.lemmatize('rooms') # room

Stopword Removal

Remove Chinese and English stopwords from the tokenization results. These are words that carry little useful meaning on their own, such as "的", "是", "啊" in Chinese and "the", "an", "their" in English.

# English stopwords: https://github.com/stopwords-iso/stopwords-en
# Chinese stopwords: https://github.com/goto456/stopwords

from pathlib import Path

def load_stopwords():
    english_path = Path(__file__).parent / 'stopwords' / 'english'
    chinese_path = Path(__file__).parent / 'stopwords' / 'chinese'
    with english_path.open('rt', encoding='utf-8') as en, chinese_path.open('rt', encoding='utf-8') as zh:
        stopwords = {line.strip() for line in en} | {line.strip() for line in zh}

    return stopwords
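
Filtering is then just a membership test against this set; a minimal sketch, assuming the downloaded lists contain "the" and "from" (the token list is illustrative):

stopwords = load_stopwords()
tokens = ['the', 'connection', 'from', 'datasource']
tokens = [token for token in tokens if token not in stopwords]
# ['connection', 'datasource']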

Pattern Vector Computation

TF-IDF Computation

import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer

# Read the log templates
template = pd.read_csv('easybanking.csv')
template.fillna('', inplace=True)

# TF-IDF computation
corpus = template['template_tokenize'].to_list()
vectorizer = TfidfVectorizer()
X = vectorizer.fit_transform(corpus)

xarr = X.toarray()
words = vectorizer.get_feature_names()  # the resulting vocabulary may drop some very rare words; newer scikit-learn versions use get_feature_names_out()

Word Vector Computation

import fasttext
import fasttext.util

fasttext.FastText.eprint = lambda x: None

# Load the pretrained models
ften = fasttext.load_model('cc.en.300.bin')
ftzh = fasttext.load_model('cc.zh.300.bin')

# The default word vector dimension is 300; it can be reduced.
# Note: the pattern vector code below assumes 300 dimensions, so the reduction is optional.
ften.get_dimension()  # 300
fasttext.util.reduce_model(ften, 100)
ften.get_dimension()  # 100


class WordVectorDict(dict):

    def __missing__(self, key):
        # Handle words that are not already cached (fastText can build vectors even for unseen words)
        if any(map(is_chinese, key)):
            return ftzh.get_word_vector(key)
        else:
            return ften.get_word_vector(key)

word_vectors = WordVectorDict()

Pattern Vector

import numpy as np
from collections import defaultdict

# Pattern vector computation
template_vectors = {}
for row, arr in zip(template.to_dict(orient='records'), xarr):
    lookup = defaultdict(int, zip(words, arr))  # default to 0 for words missing from the TF-IDF vocabulary
    tokens = row['template_tokenize'].split()
    tv = np.zeros(300)
    for token in tokens:
        w = lookup[token]        # TF-IDF weight
        v = word_vectors[token]  # word vector
        wv = w * v
        tv += wv
    tv = tv if len(tokens) == 0 else tv / len(tokens)
    template_vectors[row['cluster_id']] = tv

Pattern Similarity

from sklearn.metrics.pairwise import cosine_similarity

# Cosine similarity between two pattern vectors
cs = cosine_similarity(template_vector_1.reshape(1, -1), template_vector_2.reshape(1, -1))

Evaluation Metrics

  1. TP: True Positive, predicted positive and actually positive.
  2. TN: True Negative, predicted negative and actually negative.
  3. FP: False Positive, predicted positive but actually negative.
  4. FN: False Negative, predicted negative but actually positive.
  5. Accuracy: ACC = (TP+TN) / (TP+TN+FP+FN).
  6. Error Rate: ER = (FP+FN) / (TP+TN+FP+FN).
  7. Sensitivity: sensitivity = TP/P (with P = TP+FN), the proportion of positives that are correctly classified; it measures the classifier's ability to recognize positives.
  8. Specificity: specificity = TN/N (with N = TN+FP), the proportion of negatives that are correctly classified; it measures the classifier's ability to recognize negatives.
  9. Precision: Precision = TP / (TP+FP).
  10. Recall: Recall = TP / (TP+FN).
  11. F-Measure: F1 = (2 * P * R) / (P + R), combining precision P and recall R (a computation sketch follows this list).
  12. Computation speed: the time needed to train the classifier and make predictions.
  13. Robustness: the ability to handle missing values and outliers.
  14. Scalability: the ability to handle large datasets.
  15. Interpretability: how understandable the classifier's decision criteria are; the rules produced by a decision tree are easy to understand, whereas the parameters of a neural network are not, so we treat it as a black box.
  16. ROC curve: the ROC (Receiver Operating Characteristic) curve plots the true positive rate (TP rate) against the false positive rate (FP rate).
  17. PR curve: the PR (Precision-Recall) curve.
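
Most of these quantities can be computed directly with scikit-learn; a minimal sketch with made-up labels:

from sklearn.metrics import confusion_matrix, precision_recall_fscore_support

y_true = [1, 0, 1, 1, 0, 0, 1, 0]  # illustrative ground-truth labels
y_pred = [1, 0, 0, 1, 0, 1, 1, 0]  # illustrative predictions

# For binary labels, ravel() returns the counts in the order tn, fp, fn, tp.
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
accuracy = (tp + tn) / (tp + tn + fp + fn)
precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='binary')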

Anomaly Detection Based on Pattern Probability Distribution

Before a vector representation of the log templates is available, all templates are one-hot encoded by template index, so every log line has a corresponding template encoding, which can also be viewed as a template probability distribution.

After turning the raw logs into template probability distributions, we can train a multi-class model to predict the template distribution of the next log line; if the actual log's template is not among the predicted top-K templates, the log is considered anomalous.
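
The training code below only evaluates the arg-max prediction; here is a minimal sketch of the top-K anomaly check described above (the function and variable names are assumptions):

import numpy as np

def is_anomalous(predicted_probs, true_index, k=10):
    """Flag a log line as anomalous if its true template is not in the predicted top-k."""
    topk_indices = np.argsort(predicted_probs)[-k:]  # indices of the k largest probabilities
    return true_index not in topk_indices

# predicted_probs: softmax output of the model for one window
# true_index: index of the template actually observed next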

Dataset Construction

import numpy as np
import pandas as pd
from sklearn.preprocessing import label_binarize
from sklearn.model_selection import train_test_split


def create_window_dataset(log_parsed_path, template_path, window_size, split_ratio=0.8):

    log_parsed = pd.read_csv(log_parsed_path)
    templates = pd.read_csv(template_path)['cluster_id'].to_list()

    def fn(dataset, look_back, look_interval=0, look_forward=1):
        x, y = [], []
        for i in range(len(dataset)-look_back-look_interval-look_forward):
            back = dataset[i:i+look_back]
            forward = dataset[i+look_back+look_interval:i+look_back+look_interval+look_forward]
            back_x = label_binarize(back, classes=templates)
            forward_y = label_binarize(forward, classes=templates)[0]
            x.append(back_x)
            y.append(forward_y)
        return np.array(x), np.array(y)

    dataset = log_parsed['cluster_id'].values  # 1-D array of template IDs
    x, y = fn(dataset, window_size)

    x_train, x_test, y_train, y_test = train_test_split(x, y, train_size=split_ratio, shuffle=False, stratify=None)
    return (x_train, y_train), (x_test, y_test)

Model Definition

from tensorflow.keras import layers, models, optimizers


def build_model(input_shape, num_classes):
    inputs = layers.Input(shape=input_shape)

    x = layers.Bidirectional(layers.LSTM(64, return_sequences=True))(inputs)
    x = layers.Bidirectional(layers.LSTM(64))(x)

    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = models.Model(inputs=inputs, outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer=optimizers.Adam(), metrics=['accuracy'])
    return model

Model Training

import numpy as np
from sklearn.metrics import precision_recall_fscore_support
from tensorflow.keras import callbacks, models

# Build the dataset
n_candidates = 10
window_size = 10
log_parsed_path = 'easybanking_parsed_1w.csv'
template_path = 'easybanking_template.csv'
(x_train, y_train), (x_test, y_test) = create_window_dataset(log_parsed_path, template_path, window_size)

# Define the model
num_classes = y_train.shape[1]
input_shape = (x_train.shape[1], x_train.shape[2])
model = build_model(input_shape, num_classes)
model.summary()

# Train the model
call_backs = [
    callbacks.ModelCheckpoint("best_model.h5", save_best_only=True, monitor="val_loss"),
    callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=20, min_lr=0.0001),
    callbacks.EarlyStopping(monitor="val_loss", patience=50, verbose=1),
]
history = model.fit(x_train, y_train, batch_size=32, epochs=20, callbacks=call_backs, validation_split=0.2, shuffle=True, verbose=1)

# Evaluate the model
best_model = models.load_model("best_model.h5")
y_pred = best_model.predict(x_test)
y_true = np.argmax(y_test, axis=1)
y_pred = np.argmax(y_pred, axis=1)
# We only know whether the pattern prediction is correct, not whether the log line is truly anomalous,
# so TP, TN, FP and FN cannot be computed here.
# For simplicity, compute global precision and recall, taking only top-K with K = 1. To genuinely improve
# precision, the model parameters, the log data and its preprocessing would all need tuning.
precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='micro')
# (0.7737737737737738, 0.7737737737737738, 0.7737737737737739, None)

Drawbacks

  1. This approach encodes log templates only by template index, which can discard valuable information, because a template index cannot reveal the semantic relationships between logs.
  2. Real systems are continuously upgraded and updated, so log patterns may drift accordingly; this approach struggles with constantly changing, noisy log data.

Anomaly Detection Based on Supervised Binary Classification

With vector representations of the log templates available, and given a dataset manually labeled with anomalies, it is easy to train a binary classifier that directly outputs whether a log is anomalous.

Since the application logs have no anomaly labels, this approach cannot be applied to them; for experimentation we can use the public HDFS logs, which do carry anomaly labels.

The input here is a pattern vector and the output is the probability of normal versus abnormal, so this is a coarse-grained anomaly detection method: if a pattern is judged abnormal, every log line belonging to that pattern is considered abnormal.

This method feels similar to spam email or spam SMS classification.

For anomaly detection on HDFS logs, the features are in fact constructed differently. Every HDFS log line carries a BlockID, and a log sequence can be extracted per BlockID; it is the BlockID that determines whether that log sequence is anomalous. In other words, for HDFS logs a single log line has no notion of normal or abnormal; only the log sequence extracted per BlockID does. For example, "open file xxx" and "close file xxx" are each normal on their own, but they must appear together: if a log sequence is missing the open or the close, the file handling is considered anomalous. So for HDFS logs, the feature input is the log sequence belonging to a single BlockID, and the output is normal or abnormal (see the sketch below).

However, this construction does not generalize; it cannot be applied directly to logs from a different source. It can only be said that this method works fairly well for anomaly detection on HDFS logs specifically.
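
A minimal sketch of extracting per-BlockID log sequences from parsed HDFS logs; the column names (content, cluster_id) and the block-ID regex are assumptions about the parsed data layout, not the exact pipeline used here.

import re
from collections import defaultdict

import pandas as pd

BLOCK_ID_PATTERN = re.compile(r'(blk_-?\d+)')  # HDFS block IDs look like blk_-1608999687919862906

def build_block_sequences(log_parsed_path):
    """Group template IDs into one sequence per BlockID."""
    log_parsed = pd.read_csv(log_parsed_path)
    sequences = defaultdict(list)
    for content, cluster_id in zip(log_parsed['content'], log_parsed['cluster_id']):
        match = BLOCK_ID_PATTERN.search(str(content))
        if match:
            sequences[match.group(1)].append(cluster_id)
    # {block_id: [cluster_id, ...]}, to be labeled per block with the block-level anomaly labels
    return sequences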

Dataset Construction

def create_label_dataset(log_parsed_path, template_path, split_ratio=0.8):
    import fasttext
    fasttext.FastText.eprint = lambda x: None
    from sklearn.feature_extraction.text import TfidfVectorizer

    ften = fasttext.load_model('cc.en.300.bin')
    ftzh = fasttext.load_model('cc.zh.300.bin')

    class WordVectorDict(dict):

        def __missing__(self, key):
            if any(map(is_chinese, key)):
                return ftzh.get_word_vector(key)
            else:
                return ften.get_word_vector(key)

    # Read the log templates
    template = pd.read_csv(template_path)
    template.fillna('', inplace=True)

    # TF-IDF computation
    corpus = template['template_tokenize'].to_list()
    vectorizer = TfidfVectorizer()
    X = vectorizer.fit_transform(corpus)
    xarr = X.toarray()

    # Word vectors
    words = vectorizer.get_feature_names()
    word_vectors = WordVectorDict()

    # Template vector computation
    template_vectors = {}
    for row, arr in zip(template.to_dict(orient='records'), xarr):
        lookup = defaultdict(int, zip(words, arr))
        tokens = row['template_tokenize'].split()
        tv = np.zeros(300)
        for token in tokens:
            w = lookup[token]        # TF-IDF weight
            v = word_vectors[token]  # word vector
            wv = w * v
            tv += wv
        tv = tv if len(tokens) == 0 else tv / len(tokens)
        template_vectors[row['cluster_id']] = tv

    # Read the log parsing results
    log_vectors = []
    log_parsed = pd.read_csv(log_parsed_path)
    for cluster_id in log_parsed['cluster_id'].to_list():
        tv = template_vectors[cluster_id]
        log_vectors.append(tv)

    log_labels = log_parsed['label'].to_list()

    x = np.array(log_vectors)
    x = np.reshape(x, (x.shape[0], 1, x.shape[1]))
    y = np.array(log_labels)
    x_train, x_test, y_train, y_test = train_test_split(x, y, train_size=split_ratio, shuffle=False, stratify=None)

    return (x_train, y_train), (x_test, y_test)

Model Definition

from tensorflow.keras import layers, models, optimizers


def build_model(input_shape, num_classes):
    inputs = layers.Input(shape=input_shape)

    x = layers.Bidirectional(layers.LSTM(64, return_sequences=True))(inputs)
    x = layers.Bidirectional(layers.LSTM(64))(x)

    outputs = layers.Dense(num_classes, activation='sigmoid')(x)

    model = models.Model(inputs=inputs, outputs=outputs)
    model.compile(loss='binary_crossentropy', optimizer=optimizers.Adam(), metrics=['accuracy'])
    return model

Model Training

import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import precision_recall_fscore_support
from tensorflow.keras import callbacks, models

# Build the dataset
log_parsed_path = 'HDFS_parsed_10w.csv'
template_path = 'HDFS_template.csv'
(x_train, y_train), (x_test, y_test) = create_label_dataset(log_parsed_path, template_path)

# Define the model
num_classes = 1
input_shape = (x_train.shape[1], x_train.shape[2])
model = build_model(input_shape, num_classes)
model.summary()

# Train the model
call_backs = [
    callbacks.ModelCheckpoint("best_model.h5", save_best_only=True, monitor="val_loss"),
    callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=20, min_lr=0.0001),
    callbacks.EarlyStopping(monitor="val_loss", patience=50, verbose=1),
]
history = model.fit(x_train, y_train, batch_size=32, epochs=20, callbacks=call_backs, validation_split=0.2, shuffle=True, verbose=1)

# Evaluate the model
best_model = models.load_model("best_model.h5")
y_pred = best_model.predict(x_test)
y_true = y_test
y_pred = (y_pred[:, 0] > 0.5).astype(int)  # threshold the sigmoid output
precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='binary')
# (0.9553976053045843, 1.0, 0.9771901149032715, None)
# Although the supervised binary classifier reaches over 95% precision here, the dataset is not well
# constructed; it should contain roughly 50% normal and 50% abnormal logs, shuffled, for the training
# result to be meaningful.

# Plot training history
metric = "accuracy"
plt.figure()
plt.plot(history.history[metric])
plt.plot(history.history["val_" + metric])
plt.title("model " + metric)
plt.ylabel(metric, fontsize="large")
plt.xlabel("epoch", fontsize="large")
plt.legend(["train", "val"], loc="best")
plt.show()
plt.close()

(Figure: HDFS model accuracy)

Anomaly Detection Based on Predicted Pattern Vectors

Since there is no anomaly-labeled data, we need a different approach to anomaly detection. Borrowing from time-series forecasting, we can treat the log stream as a kind of time series. In this approach the model is not a classifier but a regressor: it predicts the template vector of the next log line, and this prediction is compared with the template vector of the actual log line. If the computed similarity falls below a threshold, the log line is considered anomalous; cosine similarity is used as the similarity measure.
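
The training code below stops at fitting the regressor; here is a minimal sketch of the anomaly decision described above. The threshold value is an assumption and would need tuning on real data.

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

SIMILARITY_THRESHOLD = 0.7  # assumed value, to be tuned

def detect_anomalies(model, x_test, y_test, threshold=SIMILARITY_THRESHOLD):
    """Flag windows whose predicted next-template vector is too dissimilar from the actual one."""
    y_pred = model.predict(x_test)
    anomalies = []
    for predicted, actual in zip(y_pred, y_test):
        cs = cosine_similarity(predicted.reshape(1, -1), actual.reshape(1, -1))[0, 0]
        anomalies.append(cs < threshold)
    return np.array(anomalies)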

Dataset Construction

def create_vector_dataset(log_parsed_path, template_path, window_size, split_ratio=0.8):
    import fasttext
    fasttext.FastText.eprint = lambda x: None
    from sklearn.feature_extraction.text import TfidfVectorizer

    ften = fasttext.load_model('cc.en.300.bin')
    ftzh = fasttext.load_model('cc.zh.300.bin')

    class WordVectorDict(dict):

        def __missing__(self, key):
            if any(map(is_chinese, key)):
                return ftzh.get_word_vector(key)
            else:
                return ften.get_word_vector(key)

    # Read the log templates
    template = pd.read_csv(template_path)
    template.fillna('', inplace=True)

    # TF-IDF computation
    corpus = template['template_tokenize'].to_list()
    vectorizer = TfidfVectorizer()
    X = vectorizer.fit_transform(corpus)
    xarr = X.toarray()

    # Word vectors
    words = vectorizer.get_feature_names()
    word_vectors = WordVectorDict()

    # Template vector computation
    template_vectors = {}
    for row, arr in zip(template.to_dict(orient='records'), xarr):
        lookup = defaultdict(int, zip(words, arr))
        tokens = row['template_tokenize'].split()
        tv = np.zeros(300)
        for token in tokens:
            w = lookup[token]        # TF-IDF weight
            v = word_vectors[token]  # word vector
            wv = w * v
            tv += wv
        tv = tv if len(tokens) == 0 else tv / len(tokens)
        template_vectors[row['cluster_id']] = tv

    # Read the log parsing results
    log_vectors = []
    log_parsed = pd.read_csv(log_parsed_path)
    for cluster_id in log_parsed['cluster_id'].to_list():
        tv = template_vectors[cluster_id]
        log_vectors.append(tv)

    def fn(dataset, look_back, look_interval=0, look_forward=1):
        x, y = [], []
        for i in range(len(dataset)-look_back-look_interval-look_forward):
            back = dataset[i:i+look_back]
            forward = dataset[i+look_back+look_interval:i+look_back+look_interval+look_forward]
            x.append(back)
            y.append(forward)
        return np.array(x), np.array(y)

    x, y = fn(np.array(log_vectors), window_size)
    y = y[:, 0]

    x_train, x_test, y_train, y_test = train_test_split(x, y, train_size=split_ratio, shuffle=False, stratify=None)

    return (x_train, y_train), (x_test, y_test)

Model Definition

from tensorflow.keras import layers, models


def build_model(n_steps_in, n_features, n_steps_out):
    model = models.Sequential()
    model.add(layers.Bidirectional(layers.LSTM(units=64, activation='tanh'), input_shape=(n_steps_in, n_features)))
    model.add(layers.Dense(n_steps_out))
    model.compile(loss='mse', optimizer='adam', metrics=['accuracy'])
    return model

Model Training

import matplotlib.pyplot as plt
from tensorflow.keras import callbacks

# Build the dataset
window_size = 10
log_parsed_path = 'easybanking_parsed_1w.csv'
template_path = 'easybanking_template.csv'
(x_train, y_train), (x_test, y_test) = create_vector_dataset(log_parsed_path, template_path, window_size)

# Define the model
n_steps_in, n_features, n_steps_out = (x_train.shape[1], x_train.shape[2], y_train.shape[1])
model = build_model(n_steps_in, n_features, n_features)
model.summary()

# Train the model
call_backs = [
    callbacks.ModelCheckpoint("best_model.h5", save_best_only=True, monitor="val_loss"),
    callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=20, min_lr=0.0001),
    callbacks.EarlyStopping(monitor="val_loss", patience=50, verbose=1),
]
history = model.fit(x_train, y_train, batch_size=32, epochs=20, callbacks=call_backs, validation_split=0.2, shuffle=True, verbose=1)

# Plot training history
metric = "accuracy"
plt.figure()
plt.plot(history.history[metric])
plt.plot(history.history["val_" + metric])
plt.title("model " + metric)
plt.ylabel(metric, fontsize="large")
plt.xlabel("epoch", fontsize="large")
plt.legend(["train", "val"], loc="best")
plt.show()
plt.close()

(Figure: easybanking model accuracy)
