Kubeflow Pipelines

Kubeflow Pipelines, KFP

Kubeflow Pipelines (KFP) is an ML workflow orchestration tool for building and deploying ML workflows.

The goal of KFP is to make it easy to run a variety of experiments by reusing pipelines and pipeline components. A pipeline connects pipeline components into a workflow in the form of a DAG (Directed Acyclic Graph), and Argo Workflows, a CNCF project, is used as the workflow engine.
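
The DAG idea can be sketched in plain Python: a workflow engine may run a step only after every step it depends on has finished. The component names below are illustrative, not part of KFP:

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical pipeline: each component maps to the components it depends on.
pipeline = {
    "ingest": [],
    "preprocess": ["ingest"],
    "train": ["preprocess"],
    "validate": ["train"],
}

def execution_order(dag):
    """Return one valid order in which a workflow engine could run the steps."""
    return list(TopologicalSorter(dag).static_order())

print(execution_order(pipeline))  # → ['ingest', 'preprocess', 'train', 'validate']
```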

Argo Workflows

It is container-native and is used to build ML pipelines, data processing, ETL, CI/CD, and more.

Workflows are written as Kubernetes CRs; each step of a workflow is defined as a container, and the steps are connected to form a pipeline.

Its main components are the Argo Server, the Workflow Controller, the Workflow Archive, and the Artifact Store.

It operates as follows.

1. A Workflow CR YAML is written and submitted to Kubernetes, and the workflow CR is stored in etcd.

2. In its reconciliation loop, the Argo Workflow Controller reads the stored CR through the Kubernetes API server and requests creation of the pods for the first step.

3. kube-scheduler schedules the pods onto appropriate nodes.

4. In the next reconciliation loop, the Argo Workflow Controller requests the jobs for the next steps whose dependencies have been satisfied.
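
The steps above correspond to a minimal Workflow CR like the following sketch (the step names and image are illustrative, not an official example):

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-example-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: step-a
        template: echo
      - name: step-b
        dependencies: [step-a]   # the controller creates this pod only after step-a succeeds
        template: echo
  - name: echo
    container:
      image: alpine:3.18
      command: [echo, done]
```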

Kubeflow Pipelines (KFP)

Unlike Argo, KFP is meant primarily for ML workflows.

The following is the process of building and deploying an ML workflow using KFP.

1. In Kubeflow Dashboard > Experiments (KFP), create an Experiment. (Create Experiment)

2. In a Jupyter Notebook or a Python IDE, write the ML pipeline code using the KFP Python SDK. (Write Pipeline)

3. Convert the pipeline Python code into an Argo Workflow CR YAML. (Convert Pipeline to Workflow CR YAML)

4. In Kubeflow Dashboard > Pipelines, upload the Argo Workflow CR YAML. (Upload Pipeline)

5. In Kubeflow Dashboard > Pipelines, run the uploaded pipeline. (Create Run → Start Run)

KFP Architecture

KFP Python SDK

The SDK is used to create pipeline components and compose pipelines.

Pipeline Service

The Pipeline Service handles pipeline deployment.

Kubernetes Resource

The Kubernetes API Server creates Kubernetes resources from the pipeline information it receives from the Pipeline Service.

Orchestration Controller

Argo Workflows runs the Kubernetes pods needed to execute the pipeline.

Artifact Store

There are Metadata and Artifact pods.

The Metadata store (e.g. MySQL) stores KFP metadata.

When data is large (e.g. large datasets or binaries), it is stored in the Artifact store (e.g. MinIO), and pipeline components pass around references to the stored data rather than the data itself.

Pipeline Persistence Agent

Pipeline Serviceμ—μ„œ λ°°ν¬ν•œ Kubernetes Resourceλ₯Ό κ°μ‹œν•œλ‹€.

Pipeline Web Server

Collects data such as the list of running pipelines, the pipeline execution history, and the list of data artifacts.

KFP UI

Experiments (KFP)

Experiments are the workspaces of KFP; each experiment contains the runs tried within it.

Pipelines

Manages pipelines.

Runs

Manages the list of runs of a pipeline, and provides the per-step graph, run output, config, and more for each run.

A Recurring Run is a run repeated on a schedule.

Artifacts

Artifacts show information about the input/output data used or produced during model training and validation.

KFP Python SDK

The SDK is used to write pipeline components and compose pipelines.

A component's code is divided into runtime code, which performs the actual job (such as Ingest Data, Generate Statistics, Preprocess Data, Transform Data, or Train Model), and client code, which talks to external systems.

After the code is written, the DSL compiler converts it into a Workflow CR YAML.

kfp.dsl

Pipeline Components μž‘μ„±ν•˜λŠ”λ° μ‚¬μš©ν•œλ‹€.

kfp.dsl.ContainerOp

Used to define a pipeline component from a specific container image; a component can be defined by specifying its image, command, arguments, and so on.

Write Components
import kfp.dsl as dsl


def training_op(learning_rate: float,
                num_layers: int,
                optimizer='ftrl',
                step_name='training'):

    return dsl.ContainerOp(
        name=step_name,                                                           # component name
        image='katib/mxnet-mnist-example',                                        # container image to use
        command=['python', '/mxnet/example/image-classification/train_mnist.py'], # command to run in the container
        arguments=[                                                               # arguments passed to the command
            '--batch-size', '64',
            '--lr', learning_rate,
            '--num-layers', num_layers,
            '--optimizer', optimizer
        ],
        file_outputs={'output': '/etc/timezone'}                                  # expose a file from the container as an output
    )


def postprocessing_op(output,
                      step_name='postprocessing'):
    return dsl.ContainerOp(
        name=step_name,
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=['echo "%s"' % output]
    )

kfp.dsl.ResourceOp

Creates a pipeline component that creates or manages a Kubernetes resource.

kfp.dsl.VolumeOp

Creates a pipeline component that creates a Kubernetes PVC.

kfp.dsl.PipelineParam

Pipeline Components κ°„μ˜ νŒŒλΌλ―Έν„°λ₯Ό 전달할 λ•Œ μ‚¬μš©ν•œλ‹€.

kfp.dsl.pipeline

Connects pipeline components into a pipeline and returns the composed pipeline. (The returned pipeline can be compiled with kfp.compiler into a Workflow CR YAML file.)

Write Pipeline
@dsl.pipeline(
  name='Pipeline GPU Example',
  description='Demonstrate the Kubeflow pipelines SDK with GPUs'
)
def kubeflow_training(
  learning_rate: dsl.PipelineParam = dsl.PipelineParam(name='learningrate', value=0.1),
  num_layers: dsl.PipelineParam = dsl.PipelineParam(name='numlayers', value='2'),
  optimizer: dsl.PipelineParam = dsl.PipelineParam(name='optimizer', value='ftrl')):
     
    training = training_op(learning_rate, num_layers, optimizer).set_gpu_limit(1)
    postprocessing = postprocessing_op(training.output)

kfp.compiler

Converts pipeline code written with kfp.dsl into a Workflow CR YAML file using the DSL compiler.

Convert Pipeline to Workflow CR
if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(kubeflow_training, __file__ + '.yaml')

kfp.Client

A client for Kubeflow Pipelines installed on Kubernetes. It provides APIs for uploading pipelines, creating experiments, creating runs, and so on.

create_experiment

Creates an Experiment (KFP) and returns the Experiment object.

run_pipeline

Runs a pipeline and returns the Run object.

create_run_from_pipeline_func

Pipeline을 μ»΄νŒŒμΌν•˜κ³  Kubeflow Pipeline에 Submit ν•œλ‹€.

upload_pipeline_version

Uploads a new version of a pipeline to Kubeflow Pipelines.

Clientλ₯Ό μƒμ„±ν•˜λŠ” 방법은 λ‹€μŒκ³Ό κ°™λ‹€.

IN_CLUSTER_DNS_NAME= 'ml-pipeline.{}.svc.cluster.local:8888'
KUBE_PROXY_PATH= 'api/v1/namespaces/{}/services/ml-pipeline:http/proxy/'
LOCAL_KFP_CONTEXT= '/home/docs/.config/kfp/context.json'
NAMESPACE_PATH= '/var/run/secrets/kubernetes.io/serviceaccount/namespace'

kfp.components

A pipeline component is code that performs a step in an ML workflow.

A Pipeline Component Specification can be defined as follows.

1. Define the component's metadata (name, description, and so on).

2. Define the component's inputs and outputs.

3. In the component's implementation section, define the container that runs the code, and the command and arguments used to run it.

When passing data between pipeline components, use the InputPath and OutputPath parameter annotations provided by KFP components.

The data type for InputPath and OutputPath can be a Python type or a type such as TFModel.

Component Specification
name: xgboost4j - Train classifier
description: Trains a boosted tree ensemble classifier using xgboost4j
 
inputs:
- {name: Training data}
- {name: Rounds, type: Integer, default: '30', description: 'Number of training rounds'}
 
outputs:
- {name: Trained model, type: XGBoost model, description: 'Trained XGBoost model'}
 
implementation:
  container:
    image: gcr.io/ml-pipeline/xgboost-classifier-train@sha256:b3a64d57
    command: [
      /ml/train.py,
      --train-set, {inputPath: Training data},
      --rounds,    {inputValue: Rounds},
      --out-model, {outputPath: Trained model},
    ]

kfp.components.func_to_container_op

Converts a Python function into a pipeline component.

kfp.components.load_component_from_file

YAML 파일둜 μ •μ˜ν•œ Pipeline componentλ₯Ό λ‘œλ“œν•œλ‹€.

kfp.components.load_component_from_url

Loads a pipeline component defined in a YAML file from a URL.

ML Workflow Example

An ML workflow example using KFP components.

The example at the following URL is a pipeline that trains an ML model with a TFJob, using hyperparameters found by Katib, and then builds a prediction system with that model.

https://github.com/kubeflow/pipelines/blob/master/samples/contrib/kubeflow-e2e-mnist/kubeflow-e2e-mnist.ipynb

Glossary

MinIO (Minimal Object Storage)

MinIO is an open-source object storage server; it can be used to build cloud storage compatible with AWS S3.

It provides the MinIO Server, the MinIO Client, and MinIO libraries.

Reconciliation

Reconciliation is the process of ensuring that two sets of records agree; in Kubernetes, a controller reconciles the cluster's actual state with the desired state declared in its resources.
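
In controller terms, a reconciliation loop repeatedly compares desired state with observed state and issues actions that close the gap (a plain-Python sketch, not Argo's implementation):

```python
def reconcile(desired: set, observed: set):
    """Return the create/delete actions that make observed match desired."""
    to_create = desired - observed
    to_delete = observed - desired
    return sorted(to_create), sorted(to_delete)

# e.g. the controller sees pod 'step-a' exists but 'step-b' is still missing:
create, delete = reconcile({'step-a', 'step-b'}, {'step-a', 'old-step'})
print(create, delete)  # → ['step-b'] ['old-step']
```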

References

Argo Workflow Concepts, https://github.com/argoproj/argo-workflows/blob/master/examples/README.md

Argo Workflow Architecture, https://github.com/argoproj/argo-workflows, https://github.com/argoproj/argo-workflows/blob/master/docs/architecture.md

Argo Workflow CRD, https://github.com/argoproj/argo-workflows/tree/master/manifests/base/crds/full

KFP Concepts, https://www.kubeflow.org/docs/components/pipelines/overview/concepts/

KFP Building, https://www.kubeflow.org/docs/components/pipelines/sdk/build-pipeline/

KFP Python SDK Examples, https://github.com/kubeflow/pipelines/tree/master/samples/tutorials

KFP Client, https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.client.html, https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api/

KFP Backend, https://github.com/kubeflow/pipelines/tree/master/backend

KFP Component Spec, https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/#component-spec

KFP Architecture, https://github.com/kubeflow/kfp-tekton, https://betterprogramming.pub/kubeflow-pipelines-with-gpus-1af6a74ec2a, https://www.slideshare.net/AnimeshSingh/kubeflow-pipelines-with-tekton-236769976

KFP Samples, https://github.com/kubeflow/pipelines/tree/master/samples/contrib, https://github.com/kubeflow/pipelines/blob/master/samples/contrib/kubeflow-e2e-mnist/kubeflow-e2e-mnist.ipynb
