์ด์ ํ์ง๋ ์ง๊ธ๊น์ง ์๋ ค์ง์ง ์์ ์๋ก์ด ๋คํธ์ํฌ ๊ณต๊ฒฉ์ด๋ ์๋ฒ ์ฅ์ , ๊ณต์ฅ ์ค๋น ๋ฑ ์๋ก์ด ์ข
๋ฅ์ ์ด์์ ํ์งํ๋ ๊ฒ์ด๋ค. ์ด์ ํ์ง๋ ๋น์ง๋ ํ์ต ์๊ณ ๋ฆฌ์ฆ K-Means๋ฅผ ์ฌ์ฉํ์ฌ ํ์งํ ์ ์๋ค. ์ ์ ๋ฐ์ดํฐ๋ฅผ ํ์ตํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ๊ฐ์ ํด๋ฌ์คํฐ๋ก ํด๋ฌ์คํฐ๋งํ๊ณ ์๋ก์ด ๋ฐ์ดํฐ๊ฐ ํด๋ฌ์คํฐ์ ํฌํจ๋์ง ์์ผ๋ฉด ์ด์์ผ๋ก ํ์งํ ์ ์๋ค.
K-Means ๋ ๋ฐ์ดํฐ๋ฅผ K๊ฐ๋ก ํด๋ฌ์คํฐ๋งํ๋ ML ๋ชจ๋ธ์ด๋ค. K ๊ฐ์ ์ฌ์ฉ์๊ฐ ์ ์ํ๋ ํ์ดํผํ๋ผ๋ฏธํฐ์ด๋ค. ํด๋ฌ์คํฐ ์ค์ฌ์ centroid๋ผ๊ณ ํ๋๋ฐ ํด๋ฌ์คํฐ์ ์ํ ๋ชจ๋ ๋ฐ์ดํฐ์ ์ฐ์ ํ๊ท ์ผ๋ก ๊ตฌํ๋ค. ๊ทธ๋์ K-Means๋ผ๊ณ ํ๋ค. K-Means ๋ชจ๋ธ์ ๊ฐ์ฅ ์ข์ K๊ฐ์ ์ฐพ๋ ๊ฒ์ด ํต์ฌ์ด๋ค.
์์ง๊น์ง ๋คํธ์ํฌ ๊ณต๊ฒฉ์ผ๋ก ์๋ ค์ง์ง๋ ์์์ง๋ง, ๊ณผ๊ฑฐ ๋คํธ์ํฌ ์ฐ๊ฒฐ๊ณผ ๋ค๋ฅธ ์์์ ์ฐพ์ ๋ด๋ ๊ฒ์ด ๋คํธ์ํฌ ์ด์ ํ์ง์ด๋ค.
import warnings
warnings.filterwarnings(action='default')
#install Apache Spark
!pip install pyspark --quiet
from pyspark.ml import PipelineModel, Pipeline
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.linalg import Vector, Vectors
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, asc, desc
import random
#Building Spark Session
#spark = SparkSession.builder.appName("kmeans").master("local[*]").getOrCreate()
spark = (SparkSession.builder
.appName('Anomaly Detection in Network Traffic with K-means')
.config("spark.executor.memory", "1G")
.config("spark.executor.cores","4")
.getOrCreate())
# Setting Spark Log Level
spark.sparkContext.setLogLevel('ERROR')
spark.version
# Load Data
url = '/kaggle/input/sparkml/kddcup.data_10_percent'
data = spark.read.\
option("inferSchema", True).\
option("header", False).\
csv(url).\
toDF(
"duration", "protocol_type", "service", "flag",
"src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
"hot", "num_failed_logins", "logged_in", "num_compromised",
"root_shell", "su_attempted", "num_root", "num_file_creations",
"num_shells", "num_access_files", "num_outbound_cmds",
"is_host_login", "is_guest_login", "count", "srv_count",
"serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
"same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
"dst_host_count", "dst_host_srv_count",
"dst_host_same_srv_rate", "dst_host_diff_srv_rate",
"dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
"dst_host_serror_rate", "dst_host_srv_serror_rate",
"dst_host_rerror_rate", "dst_host_srv_rerror_rate",
"label")
data.cache() #for faster re-use
## Data Exploration
# 23๊ฐ ๋ ์ด๋ธ์ด ์๊ณ smurf์ neptune ๋น๋๊ฐ ๋๋ค. ๋ฐ์ดํฐ์ 23๊ฐ ๋ ์ด๋ธ์ด ์๊ธฐ ๋๋ฌธ์ k๋ ์ต์ 23์ด๋ผ๊ณ ์ถ์ธกํ ์ ์๋ค.
countByLabel = data.select("label").groupBy("label").count().orderBy(col("count").desc())
countByLabel.show(25)
countByLabel.count()
# ๋ฐ์ดํฐ๋ ์์นํ์ ๋ฒ์ฃผํ๋ก ์ด๋ฃจ์ด์ ธ ์๋ค. ๋ฒ์ฃผํ ๋ฐ์ดํฐ๋ ์์นํ์ผ๋ก ์์ฝ๋ฉ์ด ํ์ํ๋ค.
data.show(5)
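# The categorical columns can be turned into numeric vectors with a StringIndexer
# followed by a OneHotEncoder. A toy illustration of what the encoding produces
# (not part of the original notebook):
demo = spark.createDataFrame([("tcp",), ("udp",), ("icmp",), ("tcp",)], ["protocol_type"])
demoIndexer = StringIndexer(inputCol="protocol_type", outputCol="protocol_type_indexed")
demoEncoder = OneHotEncoder(inputCol="protocol_type_indexed", outputCol="protocol_type_vec")
Pipeline(stages=[demoIndexer, demoEncoder]).fit(demo).transform(demo).show(truncate=False)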
class RunKMeans:
    def __init__(self, spark: SparkSession):
        self.spark = spark

    def oneHotPipeline(self, inputCol: str):
        # Convert the string column to integer indexes 0, 1, 2, ...
        indexer = StringIndexer().setInputCol(inputCol).setOutputCol(inputCol + "_indexed")
        # One-hot encode the integer indexes.
        encoder = OneHotEncoder().setInputCol(inputCol + "_indexed").setOutputCol(inputCol + "_vec")
        # Build the one-hot encoding pipeline.
        pipeline = Pipeline().setStages([indexer, encoder])
        return pipeline, inputCol + "_vec"

    def fitPipeline(self, data: DataFrame, k: int):
        # Convert the categorical columns to numeric vectors with one-hot encoding.
        protoTypeEncoder, protoTypeVecCol = self.oneHotPipeline("protocol_type")
        serviceEncoder, serviceVecCol = self.oneHotPipeline("service")
        flagEncoder, flagVecCol = self.oneHotPipeline("flag")
        # VectorAssembler combines the individual columns into a single feature vector.
        assembleCols = (set(data.columns) - set(["label", "protocol_type", "service", "flag"])).\
            union(set([protoTypeVecCol, serviceVecCol, flagVecCol]))
        assembler = VectorAssembler().\
            setInputCols(list(assembleCols)).\
            setOutputCol("featureVector")
        # Normalize the features to standard scores so every feature is on a comparable scale.
        scaler = StandardScaler().\
            setInputCol("featureVector").\
            setOutputCol("scaledFeatureVector").\
            setWithStd(True)
        # Create the K-Means model.
        kmeans = KMeans().\
            setSeed(random.randint(1, 10000000)).\
            setK(k).\
            setPredictionCol("cluster").\
            setFeaturesCol("scaledFeatureVector").\
            setMaxIter(40).\
            setTol(1.0e-5)
        # Chain everything together in a Pipeline.
        pipeline = Pipeline().setStages(
            [protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans])
        pipelineModel = pipeline.fit(data)
        return pipelineModel

    def clusteringScore(self, data: DataFrame, k: int):
        pipelineModel = self.fitPipeline(data, k)
        kmeansModel = pipelineModel.stages[-1]
        return kmeansModel.summary.trainingCost
runKMeans = RunKMeans(spark)
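# A quick sanity check (not in the original notebook): fit the pipeline with an
# illustrative k and see how the clusters line up with the known labels. A good
# clustering should not mix many different labels inside one cluster.
pipelineModel = runKMeans.fitPipeline(data, 100)  # k=100 is only an illustrative choice
pipelineModel.transform(data).\
    select("cluster", "label").\
    groupBy("cluster", "label").count().\
    orderBy(col("cluster"), col("count").desc()).\
    show(25)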
# K ๊ฐ์ด ์ปค์ง ์๋ก ๊ฐ ํด๋ฌ์คํฐ ๋ฐ์ดํฐ๊ฐ Centroid์ ๊ฐ๊น์์ผ ํ๋ค.
# K ๊ฐ์ด ํฐ๋ฐ ๊ฑฐ๋ฆฌ๊ฐ ๋ฉ ๊ฒฝ์ฐ Local Optimun์ ๋๋ฌํ๊ธฐ ์ ์ ํ์ต์ ์ข
๋ฃํ์ ์๋ ์์ผ๋ ๋ฐ๋ณต ํ์๋ฅผ ๋๋ ค์ ๋ชจ๋ธ ์ฑ๋ฅ์ ๊ฐ์ ํ๋ค.
# K ๊ฐ์ด ์ฆ๊ฐํด๋ ํ๊ฐ ์ ์๊ฐ ํฌ๊ฒ ๋จ์ด์ง์ง ์๋ ์ง์ (Elbow)์ด ์ ๋นํ K ๊ฐ ์ผ ์ ์๋ค.
for k in range(60, 270, 30):
    print(k, runKMeans.clusteringScore(data, k))
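# With a k chosen from the elbow, the anomaly rule from the introduction can be
# applied: score every connection by its distance to its assigned centroid and
# flag the most distant ones. A minimal sketch (not from the original notebook);
# k=180 and the 99.9th-percentile cutoff are illustrative choices.
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

pipelineModel = runKMeans.fitPipeline(data, 180)
kmeansModel = pipelineModel.stages[-1]
centers = kmeansModel.clusterCenters()

@udf(DoubleType())
def distToCentroid(cluster, vec):
    # Euclidean distance from a connection to the centroid of its assigned cluster
    return float(np.linalg.norm(centers[cluster] - vec.toArray()))

scored = pipelineModel.transform(data).\
    withColumn("dist", distToCentroid(col("cluster"), col("scaledFeatureVector")))
threshold = scored.approxQuantile("dist", [0.999], 0.01)[0]
anomalies = scored.filter(col("dist") > threshold)
anomalies.select("label", "dist").orderBy(col("dist").desc()).show(10)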
9๊ฐ์ง ์ฌ๋ก๋ก ์ตํ๋ ๊ณ ๊ธ ์คํํฌ ๋ถ์(2ํ) ๋์ (ํ๋น๋ฏธ๋์ด)