Kubeflow Pipelines
Kubeflow Pipelines, KFP
Kubeflow Pipelines (KFP) is an ML workflow orchestration tool for building and deploying ML workflows.
The goal of KFP is to make it easy to run many different experiments by reusing Pipelines and Pipeline Components. A Pipeline connects Pipeline Components into a workflow in the form of a DAG (directed acyclic graph), and KFP uses Argo Workflow, a CNCF project, as its workflow engine.
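As a rough illustration of the DAG idea (this is not KFP code), the sketch below models a pipeline as a mapping from each step to the steps it depends on and derives a valid execution order with Python's standard `graphlib` module (Python 3.9+). The step names are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each key is a step, each value is the set of
# steps that must finish before that step can start.
PIPELINE_DAG = {
    "ingest": set(),
    "preprocess": {"ingest"},
    "statistics": {"preprocess"},
    "train": {"preprocess"},
    "evaluate": {"train"},
}


def execution_order(dag):
    """Return one valid order in which the steps of the DAG can run."""
    return list(TopologicalSorter(dag).static_order())


order = execution_order(PIPELINE_DAG)
print(order)
```

Because the graph is acyclic, such an order always exists; `graphlib` raises `CycleError` if a step ends up depending on itself.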
Argo Workflow
Argo Workflow is container-based and is used to build ML pipelines, data processing, ETL, CI/CD, and similar workloads.
Workflows are written as Kubernetes custom resources (defined by a CRD); each step of a Workflow is defined as a container, and the steps are connected to form a pipeline.
The main components are the Argo Server, Workflow Controller, Workflow Archive, and Artifact Store.
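To make the "one container per step" idea concrete, here is a minimal sketch of a Workflow custom resource built as a Python dictionary and printed as JSON (which is also valid YAML). The field layout follows the Argo `Workflow` resource (`argoproj.io/v1alpha1`); the workflow name, template names, image, and commands are placeholders chosen for illustration.

```python
import json


def container_template(name, image, command):
    """One workflow step: a template that runs a single container."""
    return {"name": name, "container": {"image": image, "command": command}}


def make_workflow():
    """Build a two-step Workflow CR in which 'train' runs after 'prepare'."""
    return {
        "apiVersion": "argoproj.io/v1alpha1",
        "kind": "Workflow",
        "metadata": {"generateName": "example-pipeline-"},
        "spec": {
            "entrypoint": "main",
            "templates": [
                {
                    "name": "main",
                    # The DAG template wires the steps together.
                    "dag": {
                        "tasks": [
                            {"name": "prepare", "template": "prepare"},
                            {
                                "name": "train",
                                "template": "train",
                                "dependencies": ["prepare"],
                            },
                        ]
                    },
                },
                container_template("prepare", "alpine:3.18", ["echo", "prepare"]),
                container_template("train", "alpine:3.18", ["echo", "train"]),
            ],
        },
    }


workflow = make_workflow()
print(json.dumps(workflow, indent=2))
```

The printed manifest is what would be submitted to the cluster; the dictionary form is only a convenient way to show its shape.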
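It works as follows.
1. Write a Workflow CR in YAML and submit it to Kubernetes, which stores the Workflow CR in etcd.
2. In its reconciliation loop, the Argo Workflow Controller checks the Workflow information stored in etcd (through the Kubernetes API server) and creates the Pods for the steps that are ready to run; kube-scheduler then picks up those Pods.
3. kube-scheduler schedules the Pods onto suitable nodes.
4. In the following reconciliation loop, the Argo Workflow Controller requests the jobs for the next dependencies.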
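The steps above can be sketched as a toy reconciliation loop. This is a simplified simulation in plain Python, not the Argo controller: every running step is assumed to succeed by the next pass, and "creating a Pod" is reduced to a status change. The step names and status strings are hypothetical.

```python
def ready_steps(dag, status):
    """Steps not yet started whose dependencies have all succeeded."""
    return [
        step
        for step, deps in dag.items()
        if status[step] == "Pending"
        and all(status[dep] == "Succeeded" for dep in deps)
    ]


def reconcile_once(dag, status):
    """One pass of the loop: finish running steps, then start ready ones.

    Returns the list of steps started in this pass.
    """
    # Assumption: every step that was running has finished successfully.
    for step, state in status.items():
        if state == "Running":
            status[step] = "Succeeded"
    started = ready_steps(dag, status)
    for step in started:
        status[step] = "Running"  # stands in for "create a Pod for this step"
    return started


dag = {"prepare": [], "train": ["prepare"], "evaluate": ["train"], "report": ["train"]}
status = {step: "Pending" for step in dag}

history = []
while any(state != "Succeeded" for state in status.values()):
    history.append(reconcile_once(dag, status))

print(history)
```

Each inner list in `history` is the set of steps started in one pass, so steps that share the same satisfied dependency (`evaluate` and `report`) start together.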
Kubeflow Pipeline (KFP)
Unlike Argo, KFP is intended mainly for ML workflows.
The following is the process of building and deploying an ML workflow with KFP.

1. Create an Experiment in Kubeflow Dashboard > Experiments (KFP). (Create Experiment)
2. Write the ML Pipeline code with the KFP Python SDK in a Jupyter Notebook or a Python IDE. (Write Pipeline)
3. Convert the Pipeline Python code into an Argo Workflow CR YAML. (Convert Pipeline to Workflow CR YAML)
4. Upload the Argo Workflow CR YAML in Kubeflow Dashboard > Pipelines. (Upload Pipeline)
5. Run the uploaded Pipeline in Kubeflow Dashboard > Pipelines. (Create Run → Start Run)
KFP Architecture

KFP Python SDK
Use the SDK to write Pipeline Components or to compose Pipelines.
Pipeline Service
The Pipeline Service is responsible for deploying Pipelines.
Kubernetes Resource
The Kubernetes API Server creates Kubernetes resources from the Pipeline information it receives from the Pipeline Service.
Orchestration Controller
Argo Workflow runs the Kubernetes Pods needed to execute the Pipeline.
Artifact Store
There are a Metadata Pod and an Artifact Pod.
Metadata (e.g. MySQL) stores the KFP metadata.
When the data is large (e.g. large datasets, binaries), it is stored in the Artifact Store (e.g. MinIO), and Pipeline Components exchange the storage location instead of the data itself.
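The pass-by-reference idea can be sketched without any KFP or MinIO dependency. In the example below a temporary directory stands in for the artifact store, and the two functions stand in for two Pipeline Components; all names are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def produce_dataset(store_dir: str) -> str:
    """Write a large payload to the store and return only its location."""
    path = Path(store_dir) / "dataset.json"
    path.write_text(json.dumps(list(range(100_000))))
    return str(path)


def count_rows(reference: str) -> int:
    """Load the payload from the given location and return its row count."""
    return len(json.loads(Path(reference).read_text()))


with tempfile.TemporaryDirectory() as store_dir:
    # Only the short reference string travels between the two steps.
    reference = produce_dataset(store_dir)
    row_count = count_rows(reference)

print(row_count)
```

The point of the design is that the orchestrator only has to move a small string between steps, while the bulky data stays in storage built for it.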
Pipeline Persistence Agent
Watches the Kubernetes resources deployed by the Pipeline Service.
Pipeline Web Server
Collects data such as the list of running Pipelines, the Pipeline run history, and the list of data Artifacts.
KFP UI
Experiments (KFP)
Experiments are KFP workspaces for managing experiments. Each Experiment contains the Runs attempted in it.
Pipelines
Manages Pipelines.
Runs
Manages the list of Runs of a Pipeline and shows the per-step Graph, Run output, Config, and so on.
A Recurring Run is a Run that repeats.
Artifacts
Artifacts shows information about the input/output data used or produced during model training or validation.
KFP Python SDK
Use the SDK to write Pipeline Components or to compose Pipelines.
Component code is divided into runtime code, which performs the actual job (such as Ingest Data, Generate Statistics, Preprocess Data, Transform Data, or Train Model), and client code, which integrates with external systems.
After writing the code, use the DSL Compiler to convert it into a Workflow CR YAML.
kfp.dsl
Used to write Pipeline Components.
kfp.dsl.ContainerOp
Used to define a Pipeline Component from a specific container image; the Component is defined by specifying image, command, arguments, and so on.
import kfp.dsl as dsl


def training_op(learning_rate: float,
                num_layers: int,
                optimizer='ftrl',
                step_name='training'):
    return dsl.ContainerOp(
        name=step_name,  # Component name
        image='katib/mxnet-mnist-example',  # Container image to use
        # Command to run inside the container
        command=['python', '/mxnet/example/image-classification/train_mnist.py'],
        # Arguments passed to the command
        arguments=[
            '--batch-size', '64',
            '--lr', learning_rate,
            '--num-layers', num_layers,
            '--optimizer', optimizer
        ],
        # Expose a file from the container as the step's 'output' result
        file_outputs={'output': '/etc/timezone'}
    )


def postprocessing_op(output,
                      step_name='postprocessing'):
    return dsl.ContainerOp(
        name=step_name,
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=['echo "%s"' % output]
    )
kfp.dsl.ResourceOp
Creates Pipeline Components that create or manage Kubernetes resources.
kfp.dsl.VolumeOp
Creates Pipeline Components that create a Kubernetes PVC.
kfp.dsl.PipelineParam
Used to pass parameters between Pipeline Components.
kfp.dsl.pipeline
Connects Pipeline Components into a Pipeline and returns it. (The returned Pipeline can be compiled with kfp.compiler into a Workflow CR YAML file.)
@dsl.pipeline(
    name='Pipeline GPU Example',
    description='Demonstrate the Kubeflow pipelines SDK with GPUs'
)
def kubeflow_training(
        learning_rate: dsl.PipelineParam = dsl.PipelineParam(name='learningrate', value=0.1),
        num_layers: dsl.PipelineParam = dsl.PipelineParam(name='numlayers', value='2'),
        optimizer: dsl.PipelineParam = dsl.PipelineParam(name='optimizer', value='ftrl')):
    training = training_op(learning_rate, num_layers, optimizer).set_gpu_limit(1)
    postprocessing = postprocessing_op(training.output)
kfp.compiler
Converts Pipeline code written with kfp.dsl into a Workflow CR YAML file using the DSL Compiler.
if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(kubeflow_training, __file__ + '.yaml')
kfp.Client
The client for the Kubeflow Pipelines installation on Kubernetes. It provides APIs for uploading Pipelines, creating Experiments, creating Runs, and so on.
create_experiment
Creates an Experiment (KFP) and returns the Experiment object.
run_pipeline
Runs a Pipeline and returns the Run object.
create_run_from_pipeline_func
Compiles a Pipeline and submits it to Kubeflow Pipelines.
upload_pipeline_version
Uploads a new version of a Pipeline to Kubeflow Pipelines.
The following constants are used when creating a Client.
IN_CLUSTER_DNS_NAME= 'ml-pipeline.{}.svc.cluster.local:8888'
KUBE_PROXY_PATH= 'api/v1/namespaces/{}/services/ml-pipeline:http/proxy/'
LOCAL_KFP_CONTEXT= '/home/docs/.config/kfp/context.json'
NAMESPACE_PATH= '/var/run/secrets/kubernetes.io/serviceaccount/namespace'
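As a hedged sketch of how these values might be used: the `{}` in `IN_CLUSTER_DNS_NAME` is a slot for the namespace, and the resulting address can be passed to `kfp.Client` through its `host` argument. The helper names below are hypothetical, and `kubeflow` is assumed as the namespace only because it is a common installation default.

```python
IN_CLUSTER_DNS_NAME = 'ml-pipeline.{}.svc.cluster.local:8888'


def in_cluster_host(namespace: str = 'kubeflow') -> str:
    """Fill the namespace into the in-cluster DNS name template."""
    return IN_CLUSTER_DNS_NAME.format(namespace)


def make_client(host: str):
    """Create a KFP client for the given API endpoint.

    kfp is imported lazily so the helper above works without the SDK
    installed; calling this function requires kfp and a reachable cluster.
    """
    import kfp
    return kfp.Client(host=host)


host = in_cluster_host('kubeflow')
print(host)
```

From a notebook running inside the cluster, `make_client(host)` would then return a client on which methods such as `create_experiment` can be called.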
kfp.components
A Pipeline Component is the code that performs one step of an ML workflow.
A Pipeline Component specification is defined as follows.
1. Define the Component's metadata (name, description, and so on).
2. Define the Component's inputs and outputs.
3. In the Component's implementation section, define the container that runs the code and the command and arguments used to run it.
To pass data between Pipeline Components, use the InputPath and OutputPath parameter annotations provided by kfp.components.
The data type given to InputPath and OutputPath can be a Python type or a type such as TFModel.
name: xgboost4j - Train classifier
description: Trains a boosted tree ensemble classifier using xgboost4j
inputs:
- {name: Training data}
- {name: Rounds, type: Integer, default: '30', description: 'Number of training rounds'}
outputs:
- {name: Trained model, type: XGBoost model, description: 'Trained XGBoost model'}
implementation:
  container:
    image: gcr.io/ml-pipeline/xgboost-classifier-train@sha256:b3a64d57
    command: [
      /ml/train.py,
      --train-set, {inputPath: Training data},
      --rounds, {inputValue: Rounds},
      --out-model, {outputPath: Trained model},
    ]
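To show what the placeholders in `command` stand for, the toy resolver below substitutes them with concrete strings: `inputValue` becomes the argument's value, while `inputPath` and `outputPath` become file paths. This is an illustrative sketch, not KFP's implementation; the function name and the file paths are invented for the example.

```python
def resolve_command(command, input_values, input_paths, output_paths):
    """Replace placeholder dicts in a component command with concrete strings."""
    resolved = []
    for item in command:
        if isinstance(item, dict):
            # Each placeholder is a single-entry mapping: {kind: argument name}.
            (kind, name), = item.items()
            if kind == "inputValue":
                resolved.append(str(input_values[name]))
            elif kind == "inputPath":
                resolved.append(input_paths[name])
            elif kind == "outputPath":
                resolved.append(output_paths[name])
            else:
                raise ValueError(f"unknown placeholder: {kind}")
        else:
            resolved.append(item)
    return resolved


command = [
    "/ml/train.py",
    "--train-set", {"inputPath": "Training data"},
    "--rounds", {"inputValue": "Rounds"},
    "--out-model", {"outputPath": "Trained model"},
]

argv = resolve_command(
    command,
    input_values={"Rounds": 30},
    input_paths={"Training data": "/tmp/inputs/training_data/data"},
    output_paths={"Trained model": "/tmp/outputs/trained_model/data"},
)
print(argv)
```

The program inside the container therefore only sees ordinary command-line arguments and file paths, which is what keeps a component independent of the orchestrator.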
kfp.components.func_to_container_op
Converts a Python function into a Pipeline Component.
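A minimal sketch of that conversion, assuming the KFP v1 SDK (where `func_to_container_op` is available). The function `add`, the helper `build_add_op`, and the base image `python:3.9` are illustrative choices, not taken from the original text.

```python
def add(a: float, b: float) -> float:
    """Return the sum of two numbers; this is the code the component runs."""
    return a + b


def build_add_op():
    """Wrap `add` as a component factory (requires the KFP v1 SDK)."""
    # Imported lazily so the plain function can be tested without kfp.
    from kfp.components import func_to_container_op
    return func_to_container_op(add, base_image='python:3.9')


# The function stays an ordinary Python function and can be checked locally.
print(add(1.5, 2.5))
```

Inside a function decorated with `@dsl.pipeline`, the factory returned by `build_add_op()` would be called like a normal function to add the step to the Pipeline.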
kfp.components.load_component_from_file
Loads a Pipeline Component defined in a YAML file.
kfp.components.load_component_from_url
Loads a Pipeline Component defined in a YAML file from a URL.
ML Workflow Example

ML Workflow example using KFP Components
The linked example is a pipeline that trains an ML model with TFJob using hyperparameters found by Katib, and then builds a prediction service with that model.
Glossary
MinIO (Minimal Object Storage)
MinIO is an open-source object storage server and a tool for building cloud storage that is compatible with AWS S3.
It provides MinIO Server, MinIO Client, and MinIO Library.
Reconciliation
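Reconciliation is the process of checking that two sets of records agree. In Kubernetes, a controller's reconciliation loop compares the desired state with the actual state and acts to bring the two into line.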
References
Argo Workflow Concepts, https://github.com/argoproj/argo-workflows/blob/master/examples/README.md
Argo Workflow Architecture, https://github.com/argoproj/argo-workflows https://github.com/argoproj/argo-workflows/blob/master/docs/architecture.md
Argo Workflow CRD, https://github.com/argoproj/argo-workflows/tree/master/manifests/base/crds/full
KFP Concepts, https://www.kubeflow.org/docs/components/pipelines/overview/concepts/
KFP Building, https://www.kubeflow.org/docs/components/pipelines/sdk/build-pipeline/
KFP Python SDK Examples, https://github.com/kubeflow/pipelines/tree/master/samples/tutorials
KFP Client, https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.client.html https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api/
KFP Backend, https://github.com/kubeflow/pipelines/tree/master/backend
KFP Component Spec, https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/#component-spec
KFP Architecture, https://github.com/kubeflow/kfp-tekton https://betterprogramming.pub/kubeflow-pipelines-with-gpus-1af6a74ec2a https://www.slideshare.net/AnimeshSingh/kubeflow-pipelines-with-tekton-236769976
KFP Samples, https://github.com/kubeflow/pipelines/tree/master/samples/contrib https://github.com/kubeflow/pipelines/blob/master/samples/contrib/kubeflow-e2e-mnist/kubeflow-e2e-mnist.ipynb