[python] 웹 공격 로그 분석 인공지능 학습 (기반 코드 + 프로젝트 설계)

oolongeya

·

2021. 10. 31. 13:54

정상, 비정상 웹 로그 데이터 text 파일을 이용해서 (normal, anormal, test, train)

머신러닝 학습을 시킨 후 로그 분석으로 공격인지 아닌지를 분류하는 것이 목표.

 

 

데이터를 이용해서 (GET, POST 부분 확인 + url 기반)

아래와 같은 과정으로 진행한다.

파싱 -> 데이터 셋, 라벨링 (정상, 비정상 분류) -> 벡터 변환 (단어) -> 훈련 -> 테스트 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import pandas as pd
import re
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score,f1_score
 
def parsing(path):#파싱을 진행하는 함수
    with open(path,'r',encoding='utf-8'as f:#파일을 읽어드리고 ['로그','로그',...] 이런식으로 로그를 구조화
        train=[]
        para=""
        while True:
            l = f.readline() #한줄씩 읽어 옵니다
 
            if not l:
                break #파일을 전부 읽으면 읽기를 중단합니다.
 
            if l != "\n":
                para +=l
            else:
                if para!='':
                    if para[:4]=='POST'#Method가 POST인 경우 예외적으로 바디까지 가져옵니다.
                        para+=f.readline()
                    train.append(para)
                    para=""
    return train
 
def dataset(path,mod='train'): #데이터셋을 생성합니다. 파싱한 데이터와 라벨을 생성합니다 
    x = parsing(f'{path}norm_{mod}.txt'# mod에 따라 train을 가져올지 test 데이터를 가져올지 결정됩니다.
    y = [0]*len(x) # 정상 라벨 0 을 정상 데이터 개수 만큼 생성
    x += parsing(f'{path}anomal_{mod}.txt')
    y += [1]*(len(x)-len(y)) # 비정상 라벨 1을 비정상 데이터 개수 만큼 생성
    return x, y
 
def vectorize(train_x,test_x): #문장을 벡터로 만듭니다 해당 코드에서는 기본적인 tf idf를 사용하고 있습니다.
    tf = TfidfVectorizer()
    tf = tf.fit(train_x)
    train_vec = tf.transform(train_x)
    test_vec = tf.transform(test_x)
    return train_vec,test_vec
 
def train(train_vec,train_y): #랜덤 포레스트로 훈련 시킵니다. 모델을 바꾸고 싶다면 이 함수를 변경해야 합니다.
    rf = RandomForestClassifier()
    rf.fit(train_vec,train_y)
    return rf
 
def test(test_y,test_vec,rf): #입렵 받은 테스트와 모델로 테스트를 실시합니다
    pred = rf.predict(test_vec)
    print(accuracy_score(test_y,pred))
    print(f1_score(test_y,pred))
    return pred
 
############### 실행 코드 #######################
################################################
train_x, train_y = dataset('./','train')
test_x, test_y =  dataset('./','test')
 
train_vec, test_vec = vectorize(train_x, test_x)
rf = train(train_vec, train_y)
pred = test(test_y,test_vec, rf)
 
########################################################
tf = TfidfVectorizer()
tf = tf.fit(train_x)
 
# print(len(tf.vocabulary_)) # 고유한 단어가 대략 8만개가 나옵니다
 
# print(tf.transform(train_x)[0]) #로그 하나당 약 8만차원이 나옵니다
                            # 필요없는 문장 때문에 단어의 개수가 많이 진것일  수도 있으니 데이터를 분석하여 필요한 부분만 사용하는것이 좋습니다
cs

 


깃헙에서 프로젝트 진행

 

데이터 셋 파싱 코드 = py 파일로 작성

리포지토리에 csic 폴더를 만들고 데이터 셋을 저장

 

구현을 원하는 모델을 선택하고 해당 모드를 사용한 코드 = py 파일로 작성

 

summary ipynb 파일로 최종 저리

 

ex) 데이터 셋 파싱 코드 + SVM을 이용한 학습 예제 py 코드

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import io
import urllib.parse
import numpy as np
 
 
def parse(file_in, file_out):
    f = open("csic/" + file_in, 'r', encoding="utf8")
    lines = list(map(lambda line: line.strip(), f.readlines()))
    res = []
    for i in range(len(lines)):
        line = lines[i]
        words = line.split(' ')
        url_req = ''
        is_req = False
        if line.startswith("GET"):
            is_req = True
            url_req = words[0+ words[1]
        elif line.startswith("POST"or line.startswith("PUT"):
            is_req = True
            url_req = words[0+ words[1]
            idx = 1
            while not lines[i + idx].startswith("Content-Length"):
                idx += 1
            url_req += '?' + lines[i + idx + 2]
        if is_req:
            res.append(url_req)
    f.close()
 
    out = io.open(file_out, 'w', encoding="utf-8")
    for e in res:
        out.writelines(urllib.parse.unquote(e).replace('\n''').lower() + '\n')
    print("Parsing complete."len(res), "requests earned from", file_in)
 
 
def load_parsed(file):
    with open(file'r', encoding="utf8"as f:
        data = f.readlines()
    ret = []
    for i in data:
        i = i.strip()
        if i != '':
            ret.append(i)
    return ret
 
 
# 0: normal, 1: anomaly
def make_data_set(parsed: list, label: int):
    return {
        "data": parsed,
        "target": np.array([label] * len(parsed), dtype=np.uint8),
        "target_names": np.array(["normal""anomaly"])
    }
 
 
def combine_data_set(data_l: dict, data_r: dict):
    if "target_names" not in data_l or "target_names" not in data_r:
        print("Invalid data set!")
        return False
    if not np.array_equal(data_l["target_names"], data_r["target_names"]):
        print("Invalid combining!")
        return False
    return {
        "data": data_l["data"+ data_r["data"],
        "target": np.append(data_l["target"], data_r["target"]),
        "target_names": data_l["target_names"].copy()
    }
cs

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import csic_parser
from sklearn.metrics import accuracy_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
 
anomaly_train_raw = "anomal_train.txt"
anomaly_test_raw = "anomal_test.txt"
normal_train_raw = "norm_train.txt"
normal_test_raw = "norm_test.txt"
 
csic_parser.parse(normal_train_raw, "normal_train.txt")
csic_parser.parse(normal_test_raw, "normal_test.txt")
csic_parser.parse(anomaly_train_raw, "anomaly_train.txt")
csic_parser.parse(anomaly_test_raw, "anomaly_test.txt")
 
normal_train = csic_parser.load_parsed("normal_train.txt")
normal_train = csic_parser.make_data_set(normal_train, 0)
anomaly_train = csic_parser.load_parsed("anomaly_train.txt")
anomaly_train = csic_parser.make_data_set(anomaly_train, 1)
train_data = csic_parser.combine_data_set(normal_train, anomaly_train)
 
normal_test = csic_parser.load_parsed("normal_test.txt")
normal_test = csic_parser.make_data_set(normal_test, 0)
anomaly_test = csic_parser.load_parsed("anomaly_test.txt")
anomaly_test = csic_parser.make_data_set(anomaly_test, 1)
test_data = csic_parser.combine_data_set(normal_test, anomaly_test)
 
vectorizer = TfidfVectorizer(
    min_df=0,
    analyzer="char",
    sublinear_tf=True,
    ngram_range=(33)
)
 
X_train = train_data["data"]
y_train = train_data["target"]
X_test = test_data["data"]
y_test = test_data["target"]
 
vectorizer.fit(X_train)
X_train = vectorizer.transform(X_train)
X_test = vectorizer.transform(X_test)
 
svclassifier = SVC(kernel='linear', degree=8)  
svclassifier.fit(X_train, y_train) 
 
y_pred = svclassifier.predict(X_test)  
 
score = accuracy_score(y_pred, y_test)
print("SVM 모델의 정확도: ", score)
cs
반응형