[python] 웹 공격 로그 분석 인공지능 학습 (프로젝트 코드)

oolongeya

·

2021. 11. 8. 23:19

csic.zip
3.25MB

필요 모듈 : Scikit-Learn (0.22 or higher is required)

필요 코드 : 파서, 모델별 성능 출력 코드


csic_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import io
import urllib.parse
import numpy as np
 
 
def parse(file_in, file_out):
    f = open("csic/" + file_in, 'r', encoding="utf8")
    lines = list(map(lambda line: line.strip(), f.readlines()))
    res = []
    for i in range(len(lines)):
        line = lines[i]
        words = line.split(' ')
        url_req = ''
        is_req = False
        if line.startswith("GET"):
            is_req = True
            url_req = words[0+ words[1]
        elif line.startswith("POST"or line.startswith("PUT"):
            is_req = True
            url_req = words[0+ words[1]
            idx = 1
            while not lines[i + idx].startswith("Content-Length"):
                idx += 1
            url_req += '?' + lines[i + idx + 2]
        if is_req:
            res.append(url_req)
    f.close()
 
    out = io.open(file_out, 'w', encoding="utf-8")
    for e in res:
        out.writelines(urllib.parse.unquote(e).replace('\n''').lower() + '\n')
    print("Parsing complete."len(res), "requests earned from", file_in)
 
 
def load_parsed(file):
    with open(file'r', encoding="utf8"as f:
        data = f.readlines()
    ret = []
    for i in data:
        i = i.strip()
        if i != '':
            ret.append(i)
    return ret
 
 
# 0: normal, 1: anomaly
def make_data_set(parsed: list, label: int):
    return {
        "data": parsed,
        "target": np.array([label] * len(parsed), dtype=np.uint8),
        "target_names": np.array(["normal""anomaly"])
    }
 
 
def combine_data_set(data_l: dict, data_r: dict):
    if "target_names" not in data_l or "target_names" not in data_r:
        print("Invalid data set!")
        return False
    if not np.array_equal(data_l["target_names"], data_r["target_names"]):
        print("Invalid combining!")
        return False
    return {
        "data": data_l["data"+ data_r["data"],
        "target": np.append(data_l["target"], data_r["target"]),
        "target_names": data_l["target_names"].copy()
    }
cs
CSIC 2010 원본 데이터에서 유의미한 문자열만 파싱하는 코드가 있습니다. 

parse() : 함수를 실행하여 문자열을 파싱합니다.
load_parsed() : 파싱된 데이터 셋을 리스트로 불러옵니다.
make_data_set() : scikit-learn에 내장된 데이터 셋 형태로 구조를 맞춰줍니다.
combine_data_set() : 두 데이터 셋을 결합합니다.

 

DecisionTree.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import csic_parser
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.tree import DecisionTreeClassifier
 
anomaly_train_raw = "anomal_train.txt"
anomaly_test_raw = "anomal_test.txt"
normal_train_raw = "norm_train.txt"
normal_test_raw = "norm_test.txt"
 
csic_parser.parse(normal_train_raw, "normal_train.txt")
csic_parser.parse(normal_test_raw, "normal_test.txt")
csic_parser.parse(anomaly_train_raw, "anomaly_train.txt")
csic_parser.parse(anomaly_test_raw, "anomaly_test.txt")
 
normal_train = csic_parser.load_parsed("normal_train.txt")
normal_train = csic_parser.make_data_set(normal_train, 0)
anomaly_train = csic_parser.load_parsed("anomaly_train.txt")
anomaly_train = csic_parser.make_data_set(anomaly_train, 1)
train_data = csic_parser.combine_data_set(normal_train, anomaly_train)
 
normal_test = csic_parser.load_parsed("normal_test.txt")
normal_test = csic_parser.make_data_set(normal_test, 0)
anomaly_test = csic_parser.load_parsed("anomaly_test.txt")
anomaly_test = csic_parser.make_data_set(anomaly_test, 1)
test_data = csic_parser.combine_data_set(normal_test, anomaly_test)
 
vectorizer = TfidfVectorizer(
    min_df=0,
    analyzer="char",
    sublinear_tf=True,
    ngram_range=(33)
)
 
X_train = train_data["data"]
y_train = train_data["target"]
X_test = test_data["data"]
y_test = test_data["target"]
 
vectorizer.fit(X_train)
X_train = vectorizer.transform(X_train)
X_test = vectorizer.transform(X_test)
 
dtree = DecisionTreeClassifier(
    criterion="entropy",
    max_depth=150,
    random_state=29
)
dtree.fit(X_train, y_train)
 
y_pred_dt = dtree.predict(X_test)
score = accuracy_score(y_pred_dt, y_test)
f1 = f1_score(y_pred_dt, y_test)
print("Decision Tree 모델의 정확도 : ", score)
print("Decision Tree 모델의 F1 score:", f1)
cs
결정 트리 모델로 파싱한 데이터 셋을 훈련한 결과를 출력합니다.

 

LR.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import csic_parser
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
 
anomaly_train_raw = "anomal_train.txt"
anomaly_test_raw = "anomal_test.txt"
normal_train_raw = "norm_train.txt"
normal_test_raw = "norm_test.txt"
 
csic_parser.parse(normal_train_raw, "normal_train.txt")
csic_parser.parse(normal_test_raw, "normal_test.txt")
csic_parser.parse(anomaly_train_raw, "anomaly_train.txt")
csic_parser.parse(anomaly_test_raw, "anomaly_test.txt")
 
normal_train = csic_parser.load_parsed("normal_train.txt")
normal_train = csic_parser.make_data_set(normal_train, 0)
anomaly_train = csic_parser.load_parsed("anomaly_train.txt")
anomaly_train = csic_parser.make_data_set(anomaly_train, 1)
train_data = csic_parser.combine_data_set(normal_train, anomaly_train)
 
normal_test = csic_parser.load_parsed("normal_test.txt")
normal_test = csic_parser.make_data_set(normal_test, 0)
anomaly_test = csic_parser.load_parsed("anomaly_test.txt")
anomaly_test = csic_parser.make_data_set(anomaly_test, 1)
test_data = csic_parser.combine_data_set(normal_test, anomaly_test)
 
vectorizer = TfidfVectorizer(
    min_df=0,
    analyzer="char",
    sublinear_tf=True,
    ngram_range=(33)
)
 
X_train = train_data["data"]
y_train = train_data["target"]
X_test = test_data["data"]
y_test = test_data["target"]
 
vectorizer.fit(X_train)
X_train = vectorizer.transform(X_train)
X_test = vectorizer.transform(X_test)
 
lr = LogisticRegression(
    solver='liblinear',
    multi_class='auto',
    C=90,
    random_state=1
)
lr.fit(X_train, y_train)
 
y_pred_lr = lr.predict(X_test)
score = accuracy_score(y_pred_lr, y_test)
f1 = f1_score(y_pred_lr, y_test)
print("Logistic Regression 모델의 정확도:", score)
print("Logistic Regression 모델의 F1 score:", f1)
cs
로지스틱 회귀 모델로 파싱한 데이터 셋을 훈련한 결과를 출력합니다.

 

RF.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import csic_parser
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
 
anomaly_train_raw = "anomal_train.txt"
anomaly_test_raw = "anomal_test.txt"
normal_train_raw = "norm_train.txt"
normal_test_raw = "norm_test.txt"
 
csic_parser.parse(normal_train_raw, "normal_train.txt")
csic_parser.parse(normal_test_raw, "normal_test.txt")
csic_parser.parse(anomaly_train_raw, "anomaly_train.txt")
csic_parser.parse(anomaly_test_raw, "anomaly_test.txt")
 
normal_train = csic_parser.load_parsed("normal_train.txt")
normal_train = csic_parser.make_data_set(normal_train, 0)
anomaly_train = csic_parser.load_parsed("anomaly_train.txt")
anomaly_train = csic_parser.make_data_set(anomaly_train, 1)
train_data = csic_parser.combine_data_set(normal_train, anomaly_train)
 
normal_test = csic_parser.load_parsed("normal_test.txt")
normal_test = csic_parser.make_data_set(normal_test, 0)
anomaly_test = csic_parser.load_parsed("anomaly_test.txt")
anomaly_test = csic_parser.make_data_set(anomaly_test, 1)
test_data = csic_parser.combine_data_set(normal_test, anomaly_test)
 
vectorizer = TfidfVectorizer(
    min_df=0,
    analyzer="char",
    sublinear_tf=True,
    ngram_range=(33)
)
 
X_train = train_data["data"]
y_train = train_data["target"]
X_test = test_data["data"]
y_test = test_data["target"]
 
vectorizer.fit(X_train)
X_train = vectorizer.transform(X_train)
X_test = vectorizer.transform(X_test)
 
rf = RandomForestClassifier(
    criterion="entropy",
    n_estimators=180,
    random_state=2,
    n_jobs=4
)
rf.fit(X_train, y_train)
 
y_pred_rf = rf.predict(X_test)
score = accuracy_score(y_pred_rf, y_test)
f1 = f1_score(y_pred_rf, y_test)
print("Random Forest 모델의 정확도:", score)
print("Random Forest 모델의 F1 score:", f1)
cs
Random Forest 모델로 파싱한 데이터 셋을 훈련한 결과를 출력합니다.

 

SVM.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import csic_parser
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.svm import LinearSVC
 
anomaly_train_raw = "anomal_train.txt"
anomaly_test_raw = "anomal_test.txt"
normal_train_raw = "norm_train.txt"
normal_test_raw = "norm_test.txt"
 
csic_parser.parse(normal_train_raw, "normal_train.txt")
csic_parser.parse(normal_test_raw, "normal_test.txt")
csic_parser.parse(anomaly_train_raw, "anomaly_train.txt")
csic_parser.parse(anomaly_test_raw, "anomaly_test.txt")
 
normal_train = csic_parser.load_parsed("normal_train.txt")
normal_train = csic_parser.make_data_set(normal_train, 0)
anomaly_train = csic_parser.load_parsed("anomaly_train.txt")
anomaly_train = csic_parser.make_data_set(anomaly_train, 1)
train_data = csic_parser.combine_data_set(normal_train, anomaly_train)
 
normal_test = csic_parser.load_parsed("normal_test.txt")
normal_test = csic_parser.make_data_set(normal_test, 0)
anomaly_test = csic_parser.load_parsed("anomaly_test.txt")
anomaly_test = csic_parser.make_data_set(anomaly_test, 1)
test_data = csic_parser.combine_data_set(normal_test, anomaly_test)
 
vectorizer = TfidfVectorizer(
    min_df=0,
    analyzer="char",
    sublinear_tf=True,
    ngram_range=(33)
)
 
X_train = train_data["data"]
y_train = train_data["target"]
X_test = test_data["data"]
y_test = test_data["target"]
 
vectorizer.fit(X_train)
X_train = vectorizer.transform(X_train)
X_test = vectorizer.transform(X_test)
 
svclassifier = LinearSVC(random_state=1, tol=1e-5, C=1)
svclassifier.fit(X_train, y_train)
 
y_pred_svm = svclassifier.predict(X_test)
score = accuracy_score(y_pred_svm, y_test)
f1 = f1_score(y_pred_svm, y_test)
print("SVM 모델의 정확도:", score)
print("SVM Forest 모델의 F1 score:", f1)
cs
Support Vector machine 모델로 파싱한 데이터 셋을 훈련한 결과를 출력합니다.

 

ada.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import csic_parser
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import AdaBoostClassifier
 
 
anomaly_train_raw = "anomal_train.txt"
anomaly_test_raw = "anomal_test.txt"
normal_train_raw = "norm_train.txt"
normal_test_raw = "norm_test.txt"
 
csic_parser.parse(normal_train_raw, "normal_train.txt")
csic_parser.parse(normal_test_raw, "normal_test.txt")
csic_parser.parse(anomaly_train_raw, "anomaly_train.txt")
csic_parser.parse(anomaly_test_raw, "anomaly_test.txt")
 
normal_train = csic_parser.load_parsed("normal_train.txt")
normal_train = csic_parser.make_data_set(normal_train, 0)
anomaly_train = csic_parser.load_parsed("anomaly_train.txt")
anomaly_train = csic_parser.make_data_set(anomaly_train, 1)
train_data = csic_parser.combine_data_set(normal_train, anomaly_train)
 
normal_test = csic_parser.load_parsed("normal_test.txt")
normal_test = csic_parser.make_data_set(normal_test, 0)
anomaly_test = csic_parser.load_parsed("anomaly_test.txt")
anomaly_test = csic_parser.make_data_set(anomaly_test, 1)
test_data = csic_parser.combine_data_set(normal_test, anomaly_test)
 
vectorizer = TfidfVectorizer(
    min_df=0,
    analyzer="char",
    sublinear_tf=True,
    ngram_range=(33)
)
 
X_train = train_data["data"]
y_train = train_data["target"]
X_test = test_data["data"]
y_test = test_data["target"]
 
vectorizer.fit(X_train)
X_train = vectorizer.transform(X_train)
X_test = vectorizer.transform(X_test)
 
base_model = DecisionTreeClassifier(
    max_depth = 1,
    max_features=0.3,
    class_weight='balanced',
    random_state=1
)
ada_model = AdaBoostClassifier(
    base_estimator = base_model,
    n_estimators = 1000,
    learning_rate=1.,
    random_state=1
)
 
ada_model.fit(X_train, y_train)
 
y_pred_ada = ada_model.predict(X_test)
score = accuracy_score(y_pred_ada, y_test)
f1 = f1_score(y_pred_ada, y_test)
print("ADA-BOOST의 정확도:", score)
print("ADA-BOOST의 F1 score:", f1)
cs
AdaBoost로 파싱한 데이터 셋을 훈련한 결과를 출력합니다.

 

mlp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import csic_parser
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neural_network import MLPClassifier
 
 
anomaly_train_raw = "anomal_train.txt"
anomaly_test_raw = "anomal_test.txt"
normal_train_raw = "norm_train.txt"
normal_test_raw = "norm_test.txt"
 
csic_parser.parse(normal_train_raw, "normal_train.txt")
csic_parser.parse(normal_test_raw, "normal_test.txt")
csic_parser.parse(anomaly_train_raw, "anomaly_train.txt")
csic_parser.parse(anomaly_test_raw, "anomaly_test.txt")
 
normal_train = csic_parser.load_parsed("normal_train.txt")
normal_train = csic_parser.make_data_set(normal_train, 0)
anomaly_train = csic_parser.load_parsed("anomaly_train.txt")
anomaly_train = csic_parser.make_data_set(anomaly_train, 1)
train_data = csic_parser.combine_data_set(normal_train, anomaly_train)
 
normal_test = csic_parser.load_parsed("normal_test.txt")
normal_test = csic_parser.make_data_set(normal_test, 0)
anomaly_test = csic_parser.load_parsed("anomaly_test.txt")
anomaly_test = csic_parser.make_data_set(anomaly_test, 1)
test_data = csic_parser.combine_data_set(normal_test, anomaly_test)
 
vectorizer = TfidfVectorizer(
    min_df=0,
    analyzer="char",
    sublinear_tf=True,
    ngram_range=(33)
)
 
X_train = train_data["data"]
y_train = train_data["target"]
X_test = test_data["data"]
y_test = test_data["target"]
 
vectorizer.fit(X_train)
X_train = vectorizer.transform(X_train)
X_test = vectorizer.transform(X_test)
 
mlp = MLPClassifier(
    hidden_layer_sizes=(205),
    activation="relu",
    solver="adam",
    batch_size=192,
    learning_rate_init=0.01,
    max_iter=1,
    warm_start=True,
    random_state=7
)
 
for i in range(1101):
    mlp.fit(X_train, y_train)
    print("Iteration", i, "finished.")
 
y_pred_mlp = mlp.predict(X_test)
score = accuracy_score(y_pred_mlp, y_test)
f1 = f1_score(y_pred_mlp, y_test)
print("MLP 모델의 정확도:", score)
print("MLP 모델의 F1 score:", f1)
cs
다충 신경망으로 파싱한 데이터 셋을 훈련한 결과를 출력합니다. 
해당 프로젝트에서는 2개의 은닉층을 이용했습니다.

 

Summary.ipynb
위 코드들을 요약한 주피터 노트북 파일입니다.
코랩 환경에서 데이터 셋을 구글 드라이브와 연동하여 실행이 가능합니다.

 

반응형