[python] Training an AI to Analyze Web Attack Logs (Project Code)
oolongeya
2021. 11. 8. 23:19
Required module: Scikit-Learn (0.22 or higher)
Required code: the parser and a performance script for each model
csic_parser.py
import io
import urllib.parse
import numpy as np


def parse(file_in, file_out):
    f = open("csic/" + file_in, 'r', encoding="utf8")
    lines = list(map(lambda line: line.strip(), f.readlines()))
    res = []
    for i in range(len(lines)):
        line = lines[i]
        words = line.split(' ')
        url_req = ''
        is_req = False
        if line.startswith("GET"):
            is_req = True
            # Concatenate method and URL (no separator, as in the original pipeline).
            url_req = words[0] + words[1]
        elif line.startswith("POST") or line.startswith("PUT"):
            is_req = True
            url_req = words[0] + words[1]
            # The request body sits two lines below the Content-Length header;
            # append it as if it were a query string.
            idx = 1
            while not lines[i + idx].startswith("Content-Length"):
                idx += 1
            url_req += '?' + lines[i + idx + 2]
        if is_req:
            res.append(url_req)
    f.close()
    out = io.open(file_out, 'w', encoding="utf-8")
    for e in res:
        # URL-decode, strip newlines, and lower-case each request.
        out.writelines(urllib.parse.unquote(e).replace('\n', '').lower() + '\n')
    out.close()
    print("Parsing complete.", len(res), "requests extracted from", file_in)


def load_parsed(file):
    with open(file, 'r', encoding="utf8") as f:
        data = f.readlines()
    ret = []
    for i in data:
        i = i.strip()
        if i != '':
            ret.append(i)
    return ret


# 0: normal, 1: anomaly
def make_data_set(parsed: list, label: int):
    return {
        "data": parsed,
        "target": np.array([label] * len(parsed), dtype=np.uint8),
        "target_names": np.array(["normal", "anomaly"])
    }


def combine_data_set(data_l: dict, data_r: dict):
    if "target_names" not in data_l or "target_names" not in data_r:
        print("Invalid data set!")
        return False
    if not np.array_equal(data_l["target_names"], data_r["target_names"]):
        print("Invalid combining!")
        return False
    return {
        "data": data_l["data"] + data_r["data"],
        "target": np.append(data_l["target"], data_r["target"]),
        "target_names": data_l["target_names"].copy()
    }
This module parses only the meaningful strings out of the raw CSIC 2010 data. parse(): extracts and decodes the request strings from a raw log file. load_parsed(): loads a parsed data set back in as a list. make_data_set(): wraps the list in the same structure as scikit-learn's built-in data sets. combine_data_set(): merges two such data sets.
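For reference, a minimal usage sketch of the module (assuming norm_train.txt and anomal_train.txt exist under csic/, as in the scripts below):

import csic_parser

# Parse the raw logs once, then load and label them (0: normal, 1: anomaly).
csic_parser.parse("norm_train.txt", "normal_train.txt")
csic_parser.parse("anomal_train.txt", "anomaly_train.txt")

normal = csic_parser.make_data_set(csic_parser.load_parsed("normal_train.txt"), 0)
anomaly = csic_parser.make_data_set(csic_parser.load_parsed("anomaly_train.txt"), 1)

train = csic_parser.combine_data_set(normal, anomaly)
print(len(train["data"]), train["target"][:5], train["target_names"])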
DecisionTree.py
import csic_parser
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.tree import DecisionTreeClassifier

# Raw CSIC 2010 files and their parsed counterparts
anomaly_train_raw = "anomal_train.txt"
anomaly_test_raw = "anomal_test.txt"
normal_train_raw = "norm_train.txt"
normal_test_raw = "norm_test.txt"

csic_parser.parse(normal_train_raw, "normal_train.txt")
csic_parser.parse(normal_test_raw, "normal_test.txt")
csic_parser.parse(anomaly_train_raw, "anomaly_train.txt")
csic_parser.parse(anomaly_test_raw, "anomaly_test.txt")

normal_train = csic_parser.load_parsed("normal_train.txt")
normal_train = csic_parser.make_data_set(normal_train, 0)
anomaly_train = csic_parser.load_parsed("anomaly_train.txt")
anomaly_train = csic_parser.make_data_set(anomaly_train, 1)
train_data = csic_parser.combine_data_set(normal_train, anomaly_train)

normal_test = csic_parser.load_parsed("normal_test.txt")
normal_test = csic_parser.make_data_set(normal_test, 0)
anomaly_test = csic_parser.load_parsed("anomaly_test.txt")
anomaly_test = csic_parser.make_data_set(anomaly_test, 1)
test_data = csic_parser.combine_data_set(normal_test, anomaly_test)

# Character-level 3-gram TF-IDF features
vectorizer = TfidfVectorizer(
    min_df=0,
    analyzer="char",
    sublinear_tf=True,
    ngram_range=(3, 3)
)

X_train = train_data["data"]
y_train = train_data["target"]
X_test = test_data["data"]
y_test = test_data["target"]

vectorizer.fit(X_train)
X_train = vectorizer.transform(X_train)
X_test = vectorizer.transform(X_test)

dtree = DecisionTreeClassifier(
    criterion="entropy",
    max_depth=150,
    random_state=29
)
dtree.fit(X_train, y_train)

y_pred_dt = dtree.predict(X_test)
score = accuracy_score(y_test, y_pred_dt)
f1 = f1_score(y_test, y_pred_dt)
print("Decision Tree accuracy:", score)
print("Decision Tree F1 score:", f1)
Trains a decision tree on the parsed data set and prints its accuracy and F1 score.
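If per-class detail is wanted on top of accuracy and F1, a short addition (a sketch reusing y_test and y_pred_dt from the script above):

from sklearn.metrics import confusion_matrix, classification_report

# Rows are true classes, columns are predicted classes.
print(confusion_matrix(y_test, y_pred_dt))
print(classification_report(y_test, y_pred_dt, target_names=["normal", "anomaly"]))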
LR.py
import csic_parser
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression

# Raw CSIC 2010 files and their parsed counterparts
anomaly_train_raw = "anomal_train.txt"
anomaly_test_raw = "anomal_test.txt"
normal_train_raw = "norm_train.txt"
normal_test_raw = "norm_test.txt"

csic_parser.parse(normal_train_raw, "normal_train.txt")
csic_parser.parse(normal_test_raw, "normal_test.txt")
csic_parser.parse(anomaly_train_raw, "anomaly_train.txt")
csic_parser.parse(anomaly_test_raw, "anomaly_test.txt")

normal_train = csic_parser.load_parsed("normal_train.txt")
normal_train = csic_parser.make_data_set(normal_train, 0)
anomaly_train = csic_parser.load_parsed("anomaly_train.txt")
anomaly_train = csic_parser.make_data_set(anomaly_train, 1)
train_data = csic_parser.combine_data_set(normal_train, anomaly_train)

normal_test = csic_parser.load_parsed("normal_test.txt")
normal_test = csic_parser.make_data_set(normal_test, 0)
anomaly_test = csic_parser.load_parsed("anomaly_test.txt")
anomaly_test = csic_parser.make_data_set(anomaly_test, 1)
test_data = csic_parser.combine_data_set(normal_test, anomaly_test)

# Character-level 3-gram TF-IDF features
vectorizer = TfidfVectorizer(
    min_df=0,
    analyzer="char",
    sublinear_tf=True,
    ngram_range=(3, 3)
)

X_train = train_data["data"]
y_train = train_data["target"]
X_test = test_data["data"]
y_test = test_data["target"]

vectorizer.fit(X_train)
X_train = vectorizer.transform(X_train)
X_test = vectorizer.transform(X_test)

lr = LogisticRegression(
    solver='liblinear',
    multi_class='auto',
    C=90,  # large C means weak regularization
    random_state=1
)
lr.fit(X_train, y_train)

y_pred_lr = lr.predict(X_test)
score = accuracy_score(y_test, y_pred_lr)
f1 = f1_score(y_test, y_pred_lr)
print("Logistic Regression accuracy:", score)
print("Logistic Regression F1 score:", f1)
Trains a logistic regression model on the parsed data set and prints its accuracy and F1 score.
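Since the vectorizer and model are both fitted, a single new request can be scored directly. A sketch: the request string below is a made-up example in the form the parser produces (method and URL concatenated, URL-decoded, lower-cased):

# Hypothetical parsed request containing a SQL-injection-like payload.
req = "get/tienda1/publico/anadir.jsp?id=2' or '1'='1"
X_new = vectorizer.transform([req])
print(lr.predict(X_new))        # 0: normal, 1: anomaly
print(lr.predict_proba(X_new))  # class probabilities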
RF.py
import csic_parser
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier

# Raw CSIC 2010 files and their parsed counterparts
anomaly_train_raw = "anomal_train.txt"
anomaly_test_raw = "anomal_test.txt"
normal_train_raw = "norm_train.txt"
normal_test_raw = "norm_test.txt"

csic_parser.parse(normal_train_raw, "normal_train.txt")
csic_parser.parse(normal_test_raw, "normal_test.txt")
csic_parser.parse(anomaly_train_raw, "anomaly_train.txt")
csic_parser.parse(anomaly_test_raw, "anomaly_test.txt")

normal_train = csic_parser.load_parsed("normal_train.txt")
normal_train = csic_parser.make_data_set(normal_train, 0)
anomaly_train = csic_parser.load_parsed("anomaly_train.txt")
anomaly_train = csic_parser.make_data_set(anomaly_train, 1)
train_data = csic_parser.combine_data_set(normal_train, anomaly_train)

normal_test = csic_parser.load_parsed("normal_test.txt")
normal_test = csic_parser.make_data_set(normal_test, 0)
anomaly_test = csic_parser.load_parsed("anomaly_test.txt")
anomaly_test = csic_parser.make_data_set(anomaly_test, 1)
test_data = csic_parser.combine_data_set(normal_test, anomaly_test)

# Character-level 3-gram TF-IDF features
vectorizer = TfidfVectorizer(
    min_df=0,
    analyzer="char",
    sublinear_tf=True,
    ngram_range=(3, 3)
)

X_train = train_data["data"]
y_train = train_data["target"]
X_test = test_data["data"]
y_test = test_data["target"]

vectorizer.fit(X_train)
X_train = vectorizer.transform(X_train)
X_test = vectorizer.transform(X_test)

rf = RandomForestClassifier(
    criterion="entropy",
    n_estimators=180,
    random_state=2,
    n_jobs=4  # train trees on 4 cores in parallel
)
rf.fit(X_train, y_train)

y_pred_rf = rf.predict(X_test)
score = accuracy_score(y_test, y_pred_rf)
f1 = f1_score(y_test, y_pred_rf)
print("Random Forest accuracy:", score)
print("Random Forest F1 score:", f1)
Trains a random forest on the parsed data set and prints its accuracy and F1 score.
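A nice property of the forest is that its learned importances map back to character trigrams, which hints at what the model keys on. A sketch reusing rf and vectorizer (get_feature_names() is the 0.22-era API; newer scikit-learn releases use get_feature_names_out()):

import numpy as np

feature_names = np.array(vectorizer.get_feature_names())
top = np.argsort(rf.feature_importances_)[::-1][:20]  # 20 most important trigrams
for name, importance in zip(feature_names[top], rf.feature_importances_[top]):
    print(f"{name!r}: {importance:.4f}")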
SVM.py
import csic_parser
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import LinearSVC

# Raw CSIC 2010 files and their parsed counterparts
anomaly_train_raw = "anomal_train.txt"
anomaly_test_raw = "anomal_test.txt"
normal_train_raw = "norm_train.txt"
normal_test_raw = "norm_test.txt"

csic_parser.parse(normal_train_raw, "normal_train.txt")
csic_parser.parse(normal_test_raw, "normal_test.txt")
csic_parser.parse(anomaly_train_raw, "anomaly_train.txt")
csic_parser.parse(anomaly_test_raw, "anomaly_test.txt")

normal_train = csic_parser.load_parsed("normal_train.txt")
normal_train = csic_parser.make_data_set(normal_train, 0)
anomaly_train = csic_parser.load_parsed("anomaly_train.txt")
anomaly_train = csic_parser.make_data_set(anomaly_train, 1)
train_data = csic_parser.combine_data_set(normal_train, anomaly_train)

normal_test = csic_parser.load_parsed("normal_test.txt")
normal_test = csic_parser.make_data_set(normal_test, 0)
anomaly_test = csic_parser.load_parsed("anomaly_test.txt")
anomaly_test = csic_parser.make_data_set(anomaly_test, 1)
test_data = csic_parser.combine_data_set(normal_test, anomaly_test)

# Character-level 3-gram TF-IDF features
vectorizer = TfidfVectorizer(
    min_df=0,
    analyzer="char",
    sublinear_tf=True,
    ngram_range=(3, 3)
)

X_train = train_data["data"]
y_train = train_data["target"]
X_test = test_data["data"]
y_test = test_data["target"]

vectorizer.fit(X_train)
X_train = vectorizer.transform(X_train)
X_test = vectorizer.transform(X_test)

# Linear SVM; well suited to high-dimensional sparse TF-IDF input
svclassifier = LinearSVC(random_state=1, tol=1e-5, C=1)
svclassifier.fit(X_train, y_train)

y_pred_svm = svclassifier.predict(X_test)
score = accuracy_score(y_test, y_pred_svm)
f1 = f1_score(y_test, y_pred_svm)
print("SVM accuracy:", score)
print("SVM F1 score:", f1)
Trains a Support Vector Machine (a linear SVM via LinearSVC) on the parsed data set and prints its accuracy and F1 score.
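LinearSVC has no predict_proba, but decision_function returns a signed distance to the separating hyperplane, which works as an anomaly score. A sketch reusing the fitted objects above:

# Positive scores lean toward class 1 (anomaly), negative toward class 0 (normal).
scores = svclassifier.decision_function(X_test)
print(scores[:5])
print((scores > 0).astype(int)[:5])  # matches svclassifier.predict(X_test)[:5]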
ada.py
import csic_parser
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import AdaBoostClassifier

# Raw CSIC 2010 files and their parsed counterparts
anomaly_train_raw = "anomal_train.txt"
anomaly_test_raw = "anomal_test.txt"
normal_train_raw = "norm_train.txt"
normal_test_raw = "norm_test.txt"

csic_parser.parse(normal_train_raw, "normal_train.txt")
csic_parser.parse(normal_test_raw, "normal_test.txt")
csic_parser.parse(anomaly_train_raw, "anomaly_train.txt")
csic_parser.parse(anomaly_test_raw, "anomaly_test.txt")

normal_train = csic_parser.load_parsed("normal_train.txt")
normal_train = csic_parser.make_data_set(normal_train, 0)
anomaly_train = csic_parser.load_parsed("anomaly_train.txt")
anomaly_train = csic_parser.make_data_set(anomaly_train, 1)
train_data = csic_parser.combine_data_set(normal_train, anomaly_train)

normal_test = csic_parser.load_parsed("normal_test.txt")
normal_test = csic_parser.make_data_set(normal_test, 0)
anomaly_test = csic_parser.load_parsed("anomaly_test.txt")
anomaly_test = csic_parser.make_data_set(anomaly_test, 1)
test_data = csic_parser.combine_data_set(normal_test, anomaly_test)

# Character-level 3-gram TF-IDF features
vectorizer = TfidfVectorizer(
    min_df=0,
    analyzer="char",
    sublinear_tf=True,
    ngram_range=(3, 3)
)

X_train = train_data["data"]
y_train = train_data["target"]
X_test = test_data["data"]
y_test = test_data["target"]

vectorizer.fit(X_train)
X_train = vectorizer.transform(X_train)
X_test = vectorizer.transform(X_test)

# Weak learner: a depth-1 decision stump
base_model = DecisionTreeClassifier(
    max_depth=1,
    max_features=0.3,
    class_weight='balanced',
    random_state=1
)
ada_model = AdaBoostClassifier(
    base_estimator=base_model,
    n_estimators=1000,
    learning_rate=1.0,
    random_state=1
)
ada_model.fit(X_train, y_train)

y_pred_ada = ada_model.predict(X_test)
score = accuracy_score(y_test, y_pred_ada)
f1 = f1_score(y_test, y_pred_ada)
print("AdaBoost accuracy:", score)
print("AdaBoost F1 score:", f1)
Trains an AdaBoost ensemble of depth-1 decision stumps on the parsed data set and prints its accuracy and F1 score.
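With 1000 stumps it is worth checking whether the ensemble converges earlier; AdaBoostClassifier.staged_predict yields the ensemble's predictions after each boosting round. A sketch reusing ada_model, X_test, and y_test:

from sklearn.metrics import accuracy_score

# Test accuracy after every 100th boosting round.
for i, y_stage in enumerate(ada_model.staged_predict(X_test), start=1):
    if i % 100 == 0:
        print(f"round {i}: accuracy = {accuracy_score(y_test, y_stage):.4f}")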
mlp.py
import csic_parser
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neural_network import MLPClassifier

# Raw CSIC 2010 files and their parsed counterparts
anomaly_train_raw = "anomal_train.txt"
anomaly_test_raw = "anomal_test.txt"
normal_train_raw = "norm_train.txt"
normal_test_raw = "norm_test.txt"

csic_parser.parse(normal_train_raw, "normal_train.txt")
csic_parser.parse(normal_test_raw, "normal_test.txt")
csic_parser.parse(anomaly_train_raw, "anomaly_train.txt")
csic_parser.parse(anomaly_test_raw, "anomaly_test.txt")

normal_train = csic_parser.load_parsed("normal_train.txt")
normal_train = csic_parser.make_data_set(normal_train, 0)
anomaly_train = csic_parser.load_parsed("anomaly_train.txt")
anomaly_train = csic_parser.make_data_set(anomaly_train, 1)
train_data = csic_parser.combine_data_set(normal_train, anomaly_train)

normal_test = csic_parser.load_parsed("normal_test.txt")
normal_test = csic_parser.make_data_set(normal_test, 0)
anomaly_test = csic_parser.load_parsed("anomaly_test.txt")
anomaly_test = csic_parser.make_data_set(anomaly_test, 1)
test_data = csic_parser.combine_data_set(normal_test, anomaly_test)

# Character-level 3-gram TF-IDF features
vectorizer = TfidfVectorizer(
    min_df=0,
    analyzer="char",
    sublinear_tf=True,
    ngram_range=(3, 3)
)

X_train = train_data["data"]
y_train = train_data["target"]
X_test = test_data["data"]
y_test = test_data["target"]

vectorizer.fit(X_train)
X_train = vectorizer.transform(X_train)
X_test = vectorizer.transform(X_test)

mlp = MLPClassifier(
    hidden_layer_sizes=(20, 5),
    activation="relu",
    solver="adam",
    batch_size=192,
    learning_rate_init=0.01,
    max_iter=1,
    warm_start=True,
    random_state=7
)

# warm_start=True keeps the weights between calls, so each fit() runs one epoch.
for i in range(1, 101):
    mlp.fit(X_train, y_train)
    print("Iteration", i, "finished.")

y_pred_mlp = mlp.predict(X_test)
score = accuracy_score(y_test, y_pred_mlp)
f1 = f1_score(y_test, y_pred_mlp)
print("MLP accuracy:", score)
print("MLP F1 score:", f1)
Trains a multilayer perceptron on the parsed data set and prints the results. This project uses two hidden layers (20 and 5 units); with max_iter=1 and warm_start=True, each fit() call runs a single epoch, so the loop trains for 100 epochs while reporting progress.
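Because warm_start keeps the weights between fit() calls, the training loop can also track test accuracy per epoch instead of just printing the iteration number; a variant of the loop above:

from sklearn.metrics import accuracy_score

for i in range(1, 101):
    mlp.fit(X_train, y_train)  # one epoch per call (max_iter=1, warm_start=True)
    acc = accuracy_score(y_test, mlp.predict(X_test))
    print(f"epoch {i}: test accuracy = {acc:.4f}")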
Summary.ipynb
A Jupyter notebook that consolidates the code above. It can be run in a Colab environment with the data set mounted from Google Drive.
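In Colab the data set is made visible by mounting Google Drive first; a minimal sketch, where the directory path is a placeholder for wherever the csic/ folder actually lives:

from google.colab import drive
drive.mount('/content/drive')

import os
# Hypothetical location of the project (and its csic/ folder) inside Drive.
os.chdir('/content/drive/MyDrive/csic_project')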