Kubeflow Pipelines
Kubeflow Pipelines, KFP
Kubeflow Pipelines(KFP)์ML Workflow๋ฅผ ๊ตฌ์ถํ๊ณ ๋ฐฐํฌํ๊ธฐ ์ํ ML Workflow Orchestration ๋๊ตฌ์ด๋ค.
KFP์ ๋ชฉํ๋ Pipelines๊ณผ Pipeline Components๋ฅผ ์ฌ์ฌ์ฉํ์ฌ ๋ค์ํ ์คํ์ ์ฝ๊ฒ ์ํํ๋ ๊ฒ์ด๋ค. Pipelines์ Pipeline Components์ ์ฐ๊ฒฐํด์ DAG (Directed Acyclic Graph) ํํ๋ก Workflow๋ฅผ ๊ตฌ์ฑํ ์ ์์ผ๋ฉฐ, Workflow Engine์ผ๋ก CNCF ์ ํฌํจ๋์ด ์๋ Argo Workflow ๋ฅผ ์ฌ์ฉํ๋ค.
Argo Workflow
์ปจํ ์ด๋ ๊ธฐ๋ฐ์ผ๋ก ML Pipelines, Data processing, ETL, CI/CD ๊ตฌ์ถ ๋ฑ์ ์ฌ์ฉํ๋ค.
Workflows๋ Kubernetes CRD๋ก ์์ฑํ๋ฉฐ, Workflows์ ๊ฐ Step์ ์ปจํ ์ด๋๋ก ์ ์ํ๊ณ ์ด๊ฒ๋ค์ ์ฐ๊ฒฐํ์ฌ Pipeline ๊ตฌ์ฑํ๋ค.
์ฃผ์ ๊ตฌ์ฑ์์๋ Argo Server, Workflow Controller, Workflow Archive, Artifact Store ๊ฐ ์๋ค.
๋์ ๋ฐฉ๋ฒ์ ๋ค์๊ณผ ๊ฐ๋ค.
1. Workflow CR YAML ์์ฑํ์ฌ Kubernetes์ ์์ฒญํ๋ฉด etcd์ workflow CR์ ์ ์ ํ๋ค.
2. Argo Workflow Controller ๊ฐ Reconcilation ๋ฃจํ์์ etcd์ ์ ๋ณด๋ฅผ ํ์ธํ๊ณ kube-scheduler์ ํ๋ ์์ฑ์ ์์ฒญํ๋ค.
3. kube-scheduler๋ ํ๋๋ฅผ ์ ์ ํ ๋ ธ๋์ ์ค์ผ์ค๋งํ๋ค.
4. Argo Workflow Controller ๊ฐ ๋ค์ Reconcilation ๋ฃจํ์์ ๋ค์ dependency์ job์ ์์ฒญํ๋ค.
Kubeflow Pipeline (KFP)
KFP๋ Argo์ ๋ฌ๋ฆฌ ์ฃผ๋ก ML Workflow ์ฉ๋๋ค.
๋ค์์ KFP ํ์ฉํ์ฌ ML Worflow ๊ตฌ์ถํ๊ณ ๋ฐฐํฌํ๋ ๊ณผ์ ์ด๋ค.

1. Kubeflow Dashboard > Experiments (KFP)์์ Experiment๋ฅผ ์์ฑํ๋ค. (Create Experiment )
2. Jupyter Notebook ํน์ Python IDE์์ KFP Python SDK๋ฅผ ์ฌ์ฉํ์ฌ ML Pipeline ์ฝ๋๋ฅผ ์์ฑํ๋ค. (Write Pipeline)
3. ์์ฑํ Pipeline ํ์ด์ฌ ์ฝ๋๋ฅผ Argo Workflow CR YAML๋ก ๋ณํํ๋ค. (Convert Pipeline to Workflow CR YAML)
4. Kubeflow Dashboard > Pipelines ์์ Argo Workflow CR YAML์ Upload ํ๋ค (Upload Pipeline).
5. Kubeflow Dashboard > Pipelines ์์ Uploadํ Pipeline์ Run ํ๋ค. (Create Run โ Start Run)
KFP Architecture

KFP Python SDK
SDK๋ฅผ ์ฌ์ฉํ์ฌ Pipeline Components ์์ฑํ๊ฑฐ๋ Pipelines ๊ตฌ์ฑํ๋ค.
Pipeline Service
Pipeline Service๋ Pipeline ๋ฐฐํฌ๋ฅผ ๋ด๋นํ๋ค.
Kubernetes Resource
Kubernetes API Server๋ Pipeline Service์์ ์ ๋ฌ ๋ฐ์ Pipeline ์ ๋ณด๋ก Kubernetes Resource๋ฅผ ์์ฑํ๋ค.
Orchestration Controller
Argo Workflow๋ Pipeline์ ์คํํ๋๋ฐ ํ์ํ Kubernetes Pod๋ก ์คํํ๋ค.
Artifact Store
Metadata์ Artifact ํ๋๊ฐ ์๋ค.
Metadata (e.g. MySQL)๋ KFP์ Metadata๋ฅผ ์ ์ฅํ๋ค.
๋ฐ์ดํฐ(e.g. Large Data, Binary)๊ฐ ํด ๊ฒฝ์ฐ, Artifact Store (e.g. Minio)์ ์ ์ฅํด๋๊ณ ๊ทธ ์ ์ฅ์ ๋ณด๋ฅผ Pipeline Component๊ฐ์ ์ฃผ๊ณ ๋ฐ์ ๋ฐ์ดํฐ๋ฅผ ํ์ฉํ๋ค.
Pipeline Persistence Agent
Pipeline Service์์ ๋ฐฐํฌํ Kubernetes Resource๋ฅผ ๊ฐ์ํ๋ค.
Pipeline Web Server
์คํ ์ค์ธ Pipeline ๋ชฉ๋ก, Pipeline ์คํ ๊ธฐ๋ก, ๋ฐ์ดํฐ Artifact ๋ชฉ๋ก ๋ฑ์ ๋ฐ์ดํฐ๋ฅผ ์์งํ๋ค.
KFP UI
Experiments (KFP)
KFP์ Workspace๋ก Experiment๋ฅผ ๊ด๋ฆฌํ๋ค. Experiment ๋ง๋ค ์๋ํ Runs์ ํฌํจํ๋ค.
Pipelines
Pipeline์ ๊ด๋ฆฌํ๋ค.
Runs
Pipeline์ ์คํํ Run ๋ชฉ๋ก์ ๊ด๋ฆฌํ๋ฉฐ, Pipeline์์ Step ๋ณ๋ก ์ํํ Graph, Run output, Config ๋ฑ์ ์ ๊ณตํ๋ค.
Recurring Run์ ๋ฐ๋ณต๋๋ Run์ด๋ค.
Artifacts
Artifact๋ Model ํ์ต ํน์ ๊ฒ์ฆ ๊ณผ์ ์์ ์ฌ์ฉํ๊ฑฐ๋ ์์ฑํ Input/Output ๋ฐ์ดํฐ ์ ๋ณด๋ฅผ ํ์ธํ๋ค.
KFP Python SDK
SDK๋ฅผ ์ฌ์ฉํ์ฌ Pipeline Components๋ฅผ ์์ฑํ๊ฑฐ๋ Pipelines ๊ตฌ์ฑํ๋ค.
Component๋ Ingest Data, Generate Statistics, Preprocess Data, Transform Data, Train Model ๊ณผ ๊ฐ์ ์ค์ Job์ ์ํํ๋ Runtime ์ฝ๋์ ์ธ๋ถ ์์คํ ๊ณผ ์ฐ๊ณํ๊ธฐ ์ํ Client ์ฝ๋๋ก ๊ตฌ๋ถํ๋ค.
์ฝ๋๋ฅผ ์์ฑํ ํ, DSL Compiler ์ฌ์ฉํด Workflow CR YAML๋ก ๋ณํํ๋ค.
kfp.dsl
Pipeline Components ์์ฑํ๋๋ฐ ์ฌ์ฉํ๋ค.
kfp.dsl.ContainerOp
Pipeline Components๋ฅผ ํน์ ์ปจํ ์ด๋ ์ด๋ฏธ์ง๋ก ์ ์ํ ๋ ์ฌ์ฉํ๋ฉฐ, image, command, arguments ๋ฑ์ ์ง์ ํ์ฌ Components๋ฅผ ์ ์ํ ์ ์๋ค.
import kfp.dsl as dsl
def training_op(learning_rate: float,
num_layers: int,
optimizer='ftrl',
step_name='training'):
return dsl.ContainerOp(
name=step_name, # Component ์ด๋ฆ
image='katib/mxnet-mnist-example', # ์ฌ์ฉํ ์ด๋ฏธ์ง
command=['python', '/mxnet/example/image-classification/train_mnist.py'], # ์ปจํ
์ด๋ ์ด๋ฏธ์ง์์ ์คํํ ๋ช
๋ น์ด
arguments=[ # ์ปจํ
์ด๋ ์ด๋ฏธ์ง์์ ์คํํ ๋ช
๋ ์ด์ ์คํ ์ธ์๊ฐ
'--batch-size', '64',
'--lr', learning_rate,
'--num-layers', num_layers,
'--optimizer', optimizer
],
file_outputs={'output': '/etc/timezone'} # ์ปจํ
์ด๋ ๊ฒฐ๊ณผ๋ฅผ ์ธ๋ถ๋ก ๋
ธ์ถ
)
def postprocessing_op(output,
step_name='postprocessing'):
return dsl.ContainerOp(
name=step_name,
image='library/bash:4.4.23',
command=['sh', '-c'],
arguments=['echo "%s"' % output]
)kfp.dsl.ResourceOp
Kubernetes Resource๋ฅผ ์์ฑํ๊ฑฐ๋ ๊ด๋ฆฌํ๋ Pipeline Components๋ฅผ ๋ง๋ ๋ค.
kfp.dsl.VolumeOp
Kubernetes PVC๋ฅผ ์์ฑํ๋ Pipeline Components๋ฅผ ๋ง๋ ๋ค.
kfp.dsl.PipelineParam
Pipeline Components ๊ฐ์ ํ๋ผ๋ฏธํฐ๋ฅผ ์ ๋ฌํ ๋ ์ฌ์ฉํ๋ค.
kfp.dsl.pipeline
Pipeline Components ์ฐ๊ฒฐํ์ฌ Pipeline์ ๊ตฌ์ฑํ๊ณ , ๊ตฌ์ฑํ Pipeline์ ๋ฐํํ๋ค. (๋ฐํ๋ Pipeline์ kfp.compiler๋ก ์ปดํ์ผํ์ฌ Workflow CR YAML ํ์ผ๋ก ๋ณํํ ์ ์๋ค.)
@dsl.pipeline(
name='Pipeline GPU Example',
description='Demonstrate the Kubeflow pipelines SDK with GPUs'
)
def kubeflow_training(
learning_rate: dsl.PipelineParam = dsl.PipelineParam(name='learningrate', value=0.1),
num_layers: dsl.PipelineParam = dsl.PipelineParam(name='numlayers', value='2'),
optimizer: dsl.PipelineParam = dsl.PipelineParam(name='optimizer', value='ftrl')):
training = training_op(learning_rate, num_layers, optimizer).set_gpu_limit(1)
postprocessing = postprocessing_op(training.output)kfp.compiler
kfp.dsl๋ก ์์ฑํ Pipelines ์ฝ๋๋ฅผ DSL Compiler๋ฅผ ์ฌ์ฉํ์ฌ Workflow CR YAML ํ์ผ๋ก ๋ณํํ๋ค.
if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(kubeflow_training, __file__ + '.yaml')kfp.Client
Kubernetes์ ์ค์นํ Kubeflow Pipeline ํด๋ผ์ด์ธํธ์ด๋ค. Pipeline ์ ๋ก๋, Experiment ์์ฑ, Run ์์ฑ ๋ฑ์ API๋ฅผ ์ ๊ณตํ๋ค.
create_experiment
Experiment(KFP) ๋ฅผ ์์ฑํ๊ณ Experiment object๋ฅผ ๋ฆฌํดํ๋ค.
run_pipeline
Pipeline ์คํํ๊ณ Run object ๋ฆฌํดํ๋ค.
create_run_from_pipeline_func
Pipeline์ ์ปดํ์ผํ๊ณ Kubeflow Pipeline์ Submit ํ๋ค.
upload_pipeline_version
Pipeline์ Kubeflow Pipeline์ uploadํ๋ค.
Client๋ฅผ ์์ฑํ๋ ๋ฐฉ๋ฒ์ ๋ค์๊ณผ ๊ฐ๋ค.
IN_CLUSTER_DNS_NAME= 'ml-pipeline.{}.svc.cluster.local:8888'
KUBE_PROXY_PATH= 'api/v1/namespaces/{}/services/ml-pipeline:http/proxy/'
LOCAL_KFP_CONTEXT= '/home/docs/.config/kfp/context.json'
NAMESPACE_PATH= '/var/run/secrets/kubernetes.io/serviceaccount/namespace'kfp.components
Pipeline Components ๋ ML Workflow์์ ์ด๋ค Step์ ์ํํ๋ ์ฝ๋์ด๋ค.
Pipeline Component Specification์ ๋ค์๊ณผ ๊ฐ์ด ์ ์ํ ์ ์๋ค.
1. Component์ metadata๋ฅผ ์ ์ํ๋ค. (name, description ๋ฑ)
2. Component์ inputs, outputs ํญ๋ชฉ์ ์ ์ํ๋ค.
3. Component์ implementation ํญ๋ชฉ์ ์ฝ๋๋ฅผ ์คํํ ์ปจํ ์ด๋์ ๊ทธ ์ฝ๋๋ฅผ ์คํํ command, arguments ๋ฅผ ์ ์ํ๋ค.
Pipeline Components๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ์ ๋, KFP Component์์ ์ ๊ณตํ๋ InputPath์ OutputPath ํ๋ผ๋ฏธํฐ ์ด๋ ธํ ์ด์ ์ ์ฌ์ฉํ๋ค.
InputPath์ OutputPath์ ๋ฐ์ดํฐ ํ์ ์ ํ์ด์ฌ ํ์ ์ด๊ฑฐ๋ TFModel ๋ ๊ฐ๋ฅํ๋ค.
name: xgboost4j - Train classifier
description: Trains a boosted tree ensemble classifier using xgboost4j
inputs:
- {name: Training data}
- {name: Rounds, type: Integer, default: '30', description: 'Number of training rounds'}
outputs:
- {name: Trained model, type: XGBoost model, description: 'Trained XGBoost model'}
implementation:
container:
image: gcr.io/ml-pipeline/xgboost-classifier-train@sha256:b3a64d57
command: [
/ml/train.py,
--train-set, {inputPath: Training data},
--rounds, {inputValue: Rounds},
--out-model, {outputPath: Trained model},
]kfp.components.func_to_container_op
Python ํจ์๋ฅผ Pipeline Component๋ก ๋ณํํ๋ค.
kfp.components.load_component_from_file
YAML ํ์ผ๋ก ์ ์ํ Pipeline component๋ฅผ ๋ก๋ํ๋ค.
kfp.components.load_component_from_url
YAML ํ์ผ๋ก ์ ์ํ Pipeline component๋ฅผ URL๋ก ๋ถํฐ ๋ก๋ํ๋ค.
ML Workflow ์์ฑ ์์

KFP Component๋ฅผ ์ด์ฉํ ML Workflow ์์
๋ค์ URL์ ์์ ๋ Katib์์ ์ฐพ์ ํ์ดํผํ๋ผ๋ฏธํฐ๋ฅผ ์ฌ์ฉํ์ฌ TFJob์ผ๋ก ML ๋ชจ๋ธ์ ์์ฑํ๊ณ , ๊ทธ ML ๋ชจ๋ธ๋ก ์์ธก์์คํ ์ ๊ตฌ์ถํ ํ์ดํ๋ผ์ธ ์์ ์ด๋ค.
์ฉ์ด
MinIO (Minimal Object Storage)
MinIO๋ ์คํ์์ค๋ก ์ ๊ณต๋๋ ์ค๋ธ์ ํธ ์คํ ๋ฆฌ์ง ์๋ฒ์ด๋ฉฐ, AWS S3์ ํธํ๋๋ ํด๋ผ์ฐ๋ ์คํ ๋ฆฌ์ง๋ฅผ ๊ตฌ์ถํ ์ ์๋ ๋๊ตฌ์ด๋ค.
Minio Server, Minio Client, Minio Library๋ฅผ ์ ๊ณตํ๋ค.
Reconciliation
์กฐ์ ์ ๋ ๊ฐ์ ๋ ์ฝ๋ ์ธํธ๊ฐ ์ผ์นํ๋์ง ํ์ธํ๋ ํ๋ก์ธ์ค์ด๋ค.
์ฐธ๊ณ ์๋ฃ
Argo Workflow Concepts, https://github.com/argoproj/argo-workflows/blob/master/examples/README.md
Argo Workflow Architecture, https://github.com/argoproj/argo-workflows https://github.com/argoproj/argo-workflows/blob/master/docs/architecture.md
Argo Workflow CRD, https://github.com/argoproj/argo-workflows/tree/master/manifests/base/crds/full
KFP Concepts, https://www.kubeflow.org/docs/components/pipelines/overview/concepts/
KFP Building, https://www.kubeflow.org/docs/components/pipelines/sdk/build-pipeline/
KFP Python SDK Examples, https://github.com/kubeflow/pipelines/tree/master/samples/tutorials
KFP Client, https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.client.html https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api/
KFP Backend, https://github.com/kubeflow/pipelines/tree/master/backend
KFP Component Spec, https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/#component-spec
KFP Architecture, https://github.com/kubeflow/kfp-tekton https://betterprogramming.pub/kubeflow-pipelines-with-gpus-1af6a74ec2a https://www.slideshare.net/AnimeshSingh/kubeflow-pipelines-with-tekton-236769976
KFP Samples. https://github.com/kubeflow/pipelines/tree/master/samples/contrib https://github.com/kubeflow/pipelines/blob/master/samples/contrib/kubeflow-e2e-mnist/kubeflow-e2e-mnist.ipynb
Last updated
Was this helpful?