Anomaly Detection in Network Traffic with K-means

Anomaly Detection

์ด์ƒ ํƒ์ง€๋Š” ์ง€๊ธˆ๊นŒ์ง€ ์•Œ๋ ค์ง€์ง€ ์•Š์€ ์ƒˆ๋กœ์šด ๋„คํŠธ์›Œํฌ ๊ณต๊ฒฉ์ด๋‚˜ ์„œ๋ฒ„ ์žฅ์• , ๊ณต์žฅ ์„ค๋น„ ๋“ฑ ์ƒˆ๋กœ์šด ์ข…๋ฅ˜์˜ ์ด์ƒ์„ ํƒ์ง€ํ•˜๋Š” ๊ฒƒ์ด๋‹ค. ์ด์ƒ ํƒ์ง€๋Š” ๋น„์ง€๋„ ํ•™์Šต ์•Œ๊ณ ๋ฆฌ์ฆ˜ K-Means๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํƒ์ง€ํ•  ์ˆ˜ ์žˆ๋‹ค. ์ •์ƒ ๋ฐ์ดํ„ฐ๋ฅผ ํ•™์Šตํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ํด๋Ÿฌ์Šคํ„ฐ๋กœ ํด๋Ÿฌ์Šคํ„ฐ๋งํ•˜๊ณ  ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๊ฐ€ ํด๋Ÿฌ์Šคํ„ฐ์— ํฌํ•จ๋˜์ง€ ์•Š์œผ๋ฉด ์ด์ƒ์œผ๋กœ ํƒ์ง€ํ•  ์ˆ˜ ์žˆ๋‹ค.

K-Means

K-Means ๋Š” ๋ฐ์ดํ„ฐ๋ฅผ K๊ฐœ๋กœ ํด๋Ÿฌ์Šคํ„ฐ๋งํ•˜๋Š” ML ๋ชจ๋ธ์ด๋‹ค. K ๊ฐ’์€ ์‚ฌ์šฉ์ž๊ฐ€ ์ •์˜ํ•˜๋Š” ํ•˜์ดํผํŒŒ๋ผ๋ฏธํ„ฐ์ด๋‹ค. ํด๋Ÿฌ์Šคํ„ฐ ์ค‘์‹ฌ์„ centroid๋ผ๊ณ  ํ•˜๋Š”๋ฐ ํด๋Ÿฌ์Šคํ„ฐ์— ์†ํ•œ ๋ชจ๋“  ๋ฐ์ดํ„ฐ์˜ ์‚ฐ์ˆ ํ‰๊ท ์œผ๋กœ ๊ตฌํ•œ๋‹ค. ๊ทธ๋ž˜์„œ K-Means๋ผ๊ณ  ํ•œ๋‹ค. K-Means ๋ชจ๋ธ์€ ๊ฐ€์žฅ ์ข‹์€ K๊ฐ’์„ ์ฐพ๋Š” ๊ฒƒ์ด ํ•ต์‹ฌ์ด๋‹ค.

Network Anomaly Detection

์•„์ง๊นŒ์ง€ ๋„คํŠธ์›Œํฌ ๊ณต๊ฒฉ์œผ๋กœ ์•Œ๋ ค์ง€์ง€๋Š” ์•Š์•˜์ง€๋งŒ, ๊ณผ๊ฑฐ ๋„คํŠธ์›Œํฌ ์—ฐ๊ฒฐ๊ณผ ๋‹ค๋ฅธ ์–‘์ƒ์„ ์ฐพ์•„ ๋‚ด๋Š” ๊ฒƒ์ด ๋„คํŠธ์›Œํฌ ์ด์ƒ ํƒ์ง€์ด๋‹ค.

Anomaly Detection in Network Traffic with K-means

  • Install Spark.

  • Create a Spark session.

  • Load the data.

  • Explore the data.

    • Check for missing values, numeric and categorical columns, and the distinct label values.

  • Build the model.

    • Convert the categorical columns to numeric with StringIndexer and OneHotEncoder.

    • Assemble a feature vector with VectorAssembler.

    • Normalize the features to standard scores with StandardScaler.

    • Create a K-means model.

    • Chain the stages into a Pipeline.

  • Train the model with various values of K.

    • As K grows, the data in each cluster should sit closer to its centroid.

    • If K is large but the distances stay large, training may have stopped before reaching a local optimum; increase the iteration count (the code below also tightens the convergence tolerance via setTol) to improve the model.

    • The point where the evaluation score stops dropping sharply as K increases (the elbow) can be a good choice of K; see the sketch after this list.

Anomaly Detection

์ด์ƒ ํƒ์ง€๋Š” ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ์—์„œ ๊ฐ€์žฅ ๊ฐ€๊นŒ์šด ๊ตฐ์ง‘ ์ค‘์‹ฌ๊ณผ์˜ ๊ฑฐ๋ฆฌ๋ฅผ ์ธก์ •ํ•˜๋Š” ๋ฐฉ์‹์œผ๋กœ ์ง„ํ–‰ํ•œ๋‹ค. ๊ฑฐ๋ฆฌ๊ฐ€ ์ •์˜ํ•œ Threshold ๊ฐ’์„ ๋„˜์–ด์„œ๋ฉด ์ด์ƒํ•œ ๋ฐ์ดํ„ฐ๋กœ ๊ฐ„์ฃผํ•œ๋‹ค.

import warnings
warnings.filterwarnings(action='default')

# Install Apache Spark
!pip install pyspark --quiet

from pyspark.ml import PipelineModel, Pipeline
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.linalg import Vector, Vectors
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, asc, desc
import random

# Build the Spark session
#spark = SparkSession.builder.appName("kmeans").master("local[*]").getOrCreate()
spark = (SparkSession.builder
                  .appName('Anomaly Detection in Network Traffic with K-means')
                  .config("spark.executor.memory", "1G")
                  .config("spark.executor.cores","4")
                  .getOrCreate())
                  
# Setting Spark Log Level
spark.sparkContext.setLogLevel('ERROR')
spark.version

# Load Data

url = '/kaggle/input/sparkml/kddcup.data_10_percent'

data = spark.read.\
    option("inferSchema", True).\
    option("header", False).\
    csv(url).\
    toDF(
        "duration", "protocol_type", "service", "flag",
        "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
        "hot", "num_failed_logins", "logged_in", "num_compromised",
        "root_shell", "su_attempted", "num_root", "num_file_creations",
        "num_shells", "num_access_files", "num_outbound_cmds",
        "is_host_login", "is_guest_login", "count", "srv_count",
        "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
        "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
        "dst_host_count", "dst_host_srv_count",
        "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
        "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
        "dst_host_serror_rate", "dst_host_srv_serror_rate",
        "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
        "label")

data.cache()  # cache for faster re-use

## Data Exploration
# 23๊ฐœ ๋ ˆ์ด๋ธ”์ด ์žˆ๊ณ  smurf์™€ neptune ๋นˆ๋„๊ฐ€ ๋†’๋‹ค. ๋ฐ์ดํ„ฐ์— 23๊ฐœ ๋ ˆ์ด๋ธ”์ด ์žˆ๊ธฐ ๋•Œ๋ฌธ์— k๋Š” ์ตœ์†Œ 23์ด๋ผ๊ณ  ์ถ”์ธกํ•  ์ˆ˜ ์žˆ๋‹ค.

countByLabel = data.select("label").groupBy("label").count().orderBy(col("count").desc())
countByLabel.show(25)
countByLabel.count()

# ๋ฐ์ดํ„ฐ๋Š” ์ˆ˜์น˜ํ˜•์™€ ๋ฒ”์ฃผํ˜•๋กœ ์ด๋ฃจ์–ด์ ธ ์žˆ๋‹ค. ๋ฒ”์ฃผํ˜• ๋ฐ์ดํ„ฐ๋Š” ์ˆ˜์น˜ํ˜•์œผ๋กœ ์—”์ฝ”๋”ฉ์ด ํ•„์š”ํ•˜๋‹ค.
data.show(5)

class RunKMeans:
    def __init__(self, spark: SparkSession):
        self.spark = spark
    
    def oneHotPipeline(self, inputCol: str):
        # ๋ฌธ์ž์—ด์„ 0, 1, 2 ๋“ฑ์˜ ์ •์ˆ˜ ์ธ๋ฑ์Šค๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค.
        indexer = StringIndexer().setInputCol(inputCol).setOutputCol(inputCol+"_indexed")
        # ์ •์ˆ˜ ์ธ๋ฑ์Šค๋ฅผ One-Hot Encoding ํ•œ๋‹ค.
        encoder = OneHotEncoder().setInputCol(inputCol + "_indexed").setOutputCol(inputCol + "_vec")
        # One-Hot Encoding ํŒŒ์ดํ”„๋ผ์ธ์„ ์ƒ์„ฑํ•œ๋‹ค.
        pipeline = Pipeline().setStages([indexer, encoder])

        return pipeline, inputCol + "_vec"

    def fitPipeline(self, data: DataFrame, k: int):
        
        # Convert the categorical columns to numeric with one-hot encoding.
        protoTypeEncoder, protoTypeVecCol = self.oneHotPipeline("protocol_type")
        serviceEncoder, serviceVecCol = self.oneHotPipeline("service")
        flagEncoder, flagVecCol = self.oneHotPipeline("flag")
        
        # VectorAssembler combines all feature columns into a single feature vector.
        assembleCols = (set(data.columns) - set(["label", "protocol_type", "service", "flag"])).\
            union(set([protoTypeVecCol, serviceVecCol, flagVecCol]))

        assembler = VectorAssembler().\
            setInputCols(list(assembleCols)).\
            setOutputCol("featureVector")

        # Scale each feature by its standard deviation so all features contribute on a comparable scale.
        scaler = StandardScaler().\
            setInputCol("featureVector").\
            setOutputCol("scaledFeatureVector").\
            setWithStd(True)

        # K-Means ๋ชจ๋ธ์„ ์ƒ์„ฑํ•œ๋‹ค.
        kmeans = KMeans().\
            setSeed(random.randint(1, 10000000)).\
            setK(k).\
            setPredictionCol("cluster").\
            setFeaturesCol("scaledFeatureVector").\
            setMaxIter(40).\
            setTol(1.0e-5)
        
        # Chain all the stages into a single Pipeline.
        pipeline = Pipeline().setStages(
            [protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans])

        pipelineModel = pipeline.fit(data)
        
        return pipelineModel


    def clusteringScore(self, data: DataFrame, k: int):
        pipelineModel = self.fitPipeline(data, k)
        kmeansModel = pipelineModel.stages[-1]
        
        # trainingCost is the sum of squared distances from each point to its nearest centroid.
        return kmeansModel.summary.trainingCost
        
runKMeans = RunKMeans(spark)
# K ๊ฐ’์ด ์ปค์งˆ ์ˆ˜๋ก ๊ฐ ํด๋Ÿฌ์Šคํ„ฐ ๋ฐ์ดํ„ฐ๊ฐ€ Centroid์™€ ๊ฐ€๊นŒ์›Œ์•ผ ํ•œ๋‹ค.
# K ๊ฐ’์ด ํฐ๋ฐ ๊ฑฐ๋ฆฌ๊ฐ€ ๋ฉ€ ๊ฒฝ์šฐ Local Optimun์— ๋„๋‹ฌํ•˜๊ธฐ ์ „์— ํ•™์Šต์„ ์ข…๋ฃŒํ–ˆ์„ ์ˆ˜๋„ ์žˆ์œผ๋‹ˆ ๋ฐ˜๋ณต ํšŸ์ˆ˜๋ฅผ ๋Š˜๋ ค์„œ ๋ชจ๋ธ ์„ฑ๋Šฅ์„ ๊ฐœ์„ ํ•œ๋‹ค.
# K ๊ฐ’์ด ์ฆ๊ฐ€ํ•ด๋„ ํ‰๊ฐ€ ์ ์ˆ˜๊ฐ€ ํฌ๊ฒŒ ๋–จ์–ด์ง€์ง€ ์•Š๋Š” ์ง€์ (Elbow)์ด ์ ๋‹นํ•œ K ๊ฐ’ ์ผ ์ˆ˜ ์žˆ๋‹ค.

for k in range(60, 270, 30):
    print(k, runKMeans.clusteringScore(data, k))
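
Finally, a minimal sketch of the thresholding step described earlier, run on the Spark pipeline above. The choice of k = 180 and the use of the 100th-largest distance as the threshold are illustrative assumptions (the latter borrowed from the reference book's approach), not tuned values:

import numpy as np
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

k = 180  # assumed to sit near the elbow of the scores printed above
pipelineModel = runKMeans.fitPipeline(data, k)
kmeansModel = pipelineModel.stages[-1]
centroids = kmeansModel.clusterCenters()

# Euclidean distance from a point to its assigned centroid, in the scaled feature space.
@udf(DoubleType())
def distToCentroid(cluster, vec):
    return float(np.linalg.norm(centroids[cluster] - vec.toArray()))

withDist = pipelineModel.transform(data).withColumn(
    "dist", distToCentroid(col("cluster"), col("scaledFeatureVector")))

# Take the 100th-largest distance as the anomaly threshold (an assumption),
# then flag everything at least that far from its centroid.
threshold = withDist.orderBy(col("dist").desc()).take(100)[-1]["dist"]
anomalies = withDist.filter(col("dist") >= threshold)
anomalies.select("label", "cluster", "dist").show(10)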

์ฐธ๊ณ ์ž๋ฃŒ

Advanced Analytics with Spark, 2nd Edition (Korean translation, Hanbit Media)
