Kubeflow Pipelines

Kubeflow Pipelines, KFP

Kubeflow Pipelines (KFP) is an ML workflow orchestration tool for building and deploying ML workflows.

The goal of KFP is to make it easy to run many experiments by reusing Pipelines and Pipeline Components. A Pipeline connects Pipeline Components into a Workflow shaped as a DAG (Directed Acyclic Graph), and KFP uses Argo Workflow, a CNCF project, as its Workflow Engine.
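
To make the DAG idea concrete, here is a minimal sketch that uses only the Python standard library, not KFP itself. The component names and the `execution_order` helper are hypothetical; the point is only that a pipeline is a dependency graph that must be free of cycles before it can be ordered for execution.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical components; each key maps to the set of components it depends on.
pipeline = {
    "ingest": set(),
    "preprocess": {"ingest"},
    "train": {"preprocess"},
    "evaluate": {"train", "preprocess"},
}


def execution_order(graph):
    """Return one valid execution order, or raise ValueError if the graph has a cycle."""
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as err:
        # The second element of args holds the detected cycle.
        raise ValueError(f"not a DAG: {err.args[1]}") from err


order = execution_order(pipeline)
print(order)
```

A graph such as `{"a": {"b"}, "b": {"a"}}` is rejected with `ValueError`, which is the property the "acyclic" in DAG refers to.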

Argo Workflow

Argo Workflow is a container-based workflow engine used for ML pipelines, data processing, ETL, CI/CD, and similar workloads.

Workflows are written as Kubernetes custom resources (defined by a CRD). Each Step of a Workflow is defined as a container, and the Steps are connected to form a Pipeline.
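
As a rough illustration of what such a Workflow resource looks like, the sketch below builds a two-step manifest as a Python dictionary and prints it as JSON. The workflow name, images, and commands are placeholders chosen for this example, and `container_template` is a hypothetical helper; only the general shape (`apiVersion`, `kind`, `entrypoint`, container templates, and a `dag` template with `dependencies`) follows the Argo Workflow resource format.

```python
import json


def container_template(name, image, command):
    """Build a template that runs a single container, i.e. one Step of the Workflow."""
    return {"name": name, "container": {"image": image, "command": command}}


workflow = {
    "apiVersion": "argoproj.io/v1alpha1",
    "kind": "Workflow",
    "metadata": {"generateName": "example-pipeline-"},  # placeholder name
    "spec": {
        "entrypoint": "main",
        "templates": [
            container_template("preprocess", "python:3.11",
                               ["python", "-c", "print('preprocess')"]),
            container_template("train", "python:3.11",
                               ["python", "-c", "print('train')"]),
            {
                # The DAG template connects the Steps through their dependencies.
                "name": "main",
                "dag": {
                    "tasks": [
                        {"name": "preprocess", "template": "preprocess"},
                        {"name": "train", "template": "train",
                         "dependencies": ["preprocess"]},
                    ]
                },
            },
        ],
    },
}

print(json.dumps(workflow, indent=2))
```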

Its main components are the Argo Server, Workflow Controller, Workflow Archive, and Artifact Store.

It works as follows.

1. You write a Workflow CR in YAML and submit it to Kubernetes, which stores the Workflow CR in etcd.

2. In its reconciliation loop, the Argo Workflow Controller picks up the stored Workflow CR and requests creation of the Pods for the steps that are ready to run.

3. kube-scheduler schedules those Pods onto suitable nodes.

4. In the following reconciliation loops, the Argo Workflow Controller requests the jobs for the next dependencies.
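
The loop described in the steps above can be sketched as a small simulation. This is not Argo's implementation: the `reconcile` function, the step names, and the status strings are assumptions made for illustration, and "starting a Pod" is reduced to a status change.

```python
def reconcile(dependencies, status):
    """One reconciliation pass: start every pending step whose dependencies succeeded."""
    started = []
    for step, deps in dependencies.items():
        if status[step] == "Pending" and all(status[d] == "Succeeded" for d in deps):
            status[step] = "Running"  # stands in for requesting a Pod for the step
            started.append(step)
    return started


# Hypothetical workflow: each step lists the steps it depends on.
dependencies = {"ingest": [], "preprocess": ["ingest"], "train": ["preprocess"]}
status = {step: "Pending" for step in dependencies}

waves = []
while any(state != "Succeeded" for state in status.values()):
    started = reconcile(dependencies, status)
    waves.append(started)
    for step in started:
        status[step] = "Succeeded"  # pretend the Pod ran to completion

print(waves)
```

Each pass starts only the steps whose dependencies have finished, so the dependent steps are picked up in later passes rather than all at once.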

Kubeflow Pipeline (KFP)

Unlike Argo, KFP is aimed mainly at ML workflows.

Building and deploying an ML Workflow with KFP proceeds as follows.

1. Create an Experiment under Kubeflow Dashboard > Experiments (KFP). (Create Experiment)

2. Write the ML Pipeline code with the KFP Python SDK in a Jupyter Notebook or a Python IDE. (Write Pipeline)

3. Convert the Pipeline Python code into an Argo Workflow CR YAML. (Convert Pipeline to Workflow CR YAML)

4. Upload the Argo Workflow CR YAML under Kubeflow Dashboard > Pipelines. (Upload Pipeline)

5. Run the uploaded Pipeline under Kubeflow Dashboard > Pipelines. (Create Run → Start Run)

KFP Architecture

KFP Python SDK

The SDK is used to create Pipeline Components and to assemble Pipelines.

Pipeline Service

The Pipeline Service is responsible for deploying Pipelines.

Kubernetes Resource

The Kubernetes API Server creates Kubernetes Resources from the Pipeline information it receives from the Pipeline Service.

Orchestration Controller

Argo Workflow launches the Kubernetes Pods needed to run the Pipeline.

Artifact Store

It consists of Metadata and Artifact pods.

The Metadata store (e.g. MySQL) holds KFP's metadata.

When data is large (e.g. large datasets, binaries), it is stored in the Artifact Store (e.g. MinIO), and Pipeline Components exchange the storage location rather than the data itself.
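
The idea of passing a storage reference instead of the data can be sketched without any object store at all. In the example below, `LocalArtifactStore` is a hypothetical stand-in for a store such as MinIO, backed by a temporary directory, and the `store://` URI scheme, key layout, and step functions are invented for illustration.

```python
import json
import tempfile
from pathlib import Path


class LocalArtifactStore:
    """Hypothetical stand-in for an object store such as MinIO, backed by a directory."""

    def __init__(self, root):
        self.root = Path(root)

    def put(self, key, data: bytes) -> str:
        path = self.root / key
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(data)
        return f"store://{key}"  # only this reference travels between steps

    def get(self, uri: str) -> bytes:
        return (self.root / uri.removeprefix("store://")).read_bytes()


def preprocess(store):
    """Producer step: writes a large-ish dataset and returns only its location."""
    rows = [{"x": i, "y": 2 * i} for i in range(1000)]
    return store.put("run-1/preprocess/data.json", json.dumps(rows).encode())


def train(store, data_uri):
    """Consumer step: fetches the dataset by reference and fits a toy slope."""
    rows = json.loads(store.get(data_uri))
    return sum(r["y"] for r in rows) / sum(r["x"] for r in rows)


with tempfile.TemporaryDirectory() as tmp:
    store = LocalArtifactStore(tmp)
    data_uri = preprocess(store)
    slope = train(store, data_uri)

print(data_uri, slope)
```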

Pipeline Persistence Agent

Watches the Kubernetes Resources deployed by the Pipeline Service.

Pipeline Web Server

Collects data such as the list of running Pipelines, the Pipeline execution history, and the list of data Artifacts.

KFP UI

Experiments (KFP)

Manages Experiments, which act as workspaces in KFP. Each Experiment contains the Runs attempted in it.

Pipelines

Manages Pipelines.

Runs

Manages the list of Runs executed from Pipelines, and provides the Graph of steps executed in the Pipeline, the Run output, the Config, and so on.

A Recurring Run is a Run that repeats.

Artifacts

Artifacts shows information about the input/output data used or produced during model training or validation.

KFP Python SDK

The SDK is used to write Pipeline Components and to assemble Pipelines.

A Component's code is divided into Runtime code, which performs the actual job (such as Ingest Data, Generate Statistics, Preprocess Data, Transform Data, or Train Model), and Client code, which integrates with external systems.

After writing the code, use the DSL Compiler to convert it into a Workflow CR YAML.

kfp.dsl

Used to write Pipeline Components.

kfp.dsl.ContainerOp

Used to define a Pipeline Component from a specific container image; the Component is defined by specifying image, command, arguments, and so on.

Write Components
import kfp.dsl as dsl


def training_op(learning_rate: float,
                num_layers: int,
                optimizer='ftrl',
                step_name='training'):

    return dsl.ContainerOp(
        name=step_name,                                                           # Component name
        image='katib/mxnet-mnist-example',                                        # Image to use
        command=['python', '/mxnet/example/image-classification/train_mnist.py'], # Command to run in the container
        arguments=[                                                               # Arguments passed to the command
            '--batch-size', '64',
            '--lr', learning_rate,
            '--num-layers', num_layers,
            '--optimizer', optimizer
        ],
        file_outputs={'output': '/etc/timezone'}                                  # Expose a file in the container as the step output
    )


def postprocessing_op(output,
                      step_name='postprocessing'):
    # Echo the output of the previous step.
    return dsl.ContainerOp(
        name=step_name,
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=['echo "%s"' % output]
    )

kfp.dsl.ResourceOp

Creates a Pipeline Component that creates or manages a Kubernetes Resource.

kfp.dsl.VolumeOp

Creates a Pipeline Component that creates a Kubernetes PVC.

kfp.dsl.PipelineParam

Used to pass parameters between Pipeline Components.

kfp.dsl.pipeline

A decorator for the function that connects Pipeline Components into a Pipeline and returns the resulting Pipeline. (The returned Pipeline can be compiled with kfp.compiler into a Workflow CR YAML file.)

Write Pipeline
@dsl.pipeline(
  name='Pipeline GPU Example',
  description='Demonstrate the Kubeflow pipelines SDK with GPUs'
)
def kubeflow_training(
  learning_rate: dsl.PipelineParam = dsl.PipelineParam(name='learningrate', value=0.1),
  num_layers: dsl.PipelineParam = dsl.PipelineParam(name='numlayers', value='2'),
  optimizer: dsl.PipelineParam = dsl.PipelineParam(name='optimizer', value='ftrl')):
     
    training = training_op(learning_rate, num_layers, optimizer).set_gpu_limit(1)
    postprocessing = postprocessing_op(training.output)

kfp.compiler

Converts Pipeline code written with kfp.dsl into a Workflow CR YAML file using the DSL Compiler.

Convert Pipeline to Workflow CR
if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(kubeflow_training, __file__ + '.yaml')

kfp.Client

The client for the Kubeflow Pipelines installation on Kubernetes. It provides APIs for uploading Pipelines, creating Experiments, creating Runs, and so on.

create_experiment

Creates an Experiment (KFP) and returns the Experiment object.

run_pipeline

Runs a Pipeline and returns the Run object.

create_run_from_pipeline_func

Compiles a Pipeline function and submits it to Kubeflow Pipelines.

upload_pipeline_version

Uploads a new version of a Pipeline to Kubeflow Pipelines.

The Client class defines the following constants, which it uses to locate the KFP API endpoint and its local configuration when a Client is created.

IN_CLUSTER_DNS_NAME= 'ml-pipeline.{}.svc.cluster.local:8888'
KUBE_PROXY_PATH= 'api/v1/namespaces/{}/services/ml-pipeline:http/proxy/'
LOCAL_KFP_CONTEXT= '/home/docs/.config/kfp/context.json'
NAMESPACE_PATH= '/var/run/secrets/kubernetes.io/serviceaccount/namespace'
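
To show how the two endpoint templates above expand, here is a small sketch using plain string formatting. The helper functions, the `kubeflow` namespace, and the API server address are assumptions for illustration; this is not the Client's actual resolution logic, only what the templates produce once a namespace is filled in.

```python
IN_CLUSTER_DNS_NAME = 'ml-pipeline.{}.svc.cluster.local:8888'
KUBE_PROXY_PATH = 'api/v1/namespaces/{}/services/ml-pipeline:http/proxy/'


def in_cluster_host(namespace):
    """Address of the ml-pipeline service as seen from inside the cluster."""
    return IN_CLUSTER_DNS_NAME.format(namespace)


def proxied_host(api_server, namespace):
    """Address of the same service reached through the Kubernetes API server proxy."""
    return api_server.rstrip('/') + '/' + KUBE_PROXY_PATH.format(namespace)


# 'kubeflow' and the API server URL are placeholder values.
print(in_cluster_host('kubeflow'))
print(proxied_host('https://kubernetes.example.com', 'kubeflow'))
```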

kfp.components

A Pipeline Component is the code that performs one Step of an ML Workflow.

A Pipeline Component Specification is defined as follows.

1. Define the Component's metadata. (name, description, and so on)

2. Define the Component's inputs and outputs.

3. In the Component's implementation section, define the container that runs the code, along with the command and arguments used to run it.

To exchange data between Pipeline Components, use the InputPath and OutputPath parameter annotations provided by KFP components.

The data type given to InputPath and OutputPath can be a Python type or a type such as TFModel.
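
The file-passing convention behind these annotations can be sketched with plain Python functions. The example below does not import KFP: the function names and file names are hypothetical, and the path arguments are ordinary strings where, in a real component, they would be the parameters annotated with InputPath or OutputPath and filled in by the system.

```python
import tempfile
from pathlib import Path


def split_words(text_path: str, words_path: str):
    """Producer: reads its input file and writes one word per line to its output file."""
    words = Path(text_path).read_text().split()
    Path(words_path).write_text("\n".join(words))


def count_words(words_path: str) -> int:
    """Consumer: receives the path of the producer's output file and reads it."""
    return len(Path(words_path).read_text().splitlines())


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "input.txt"
    source.write_text("kubeflow pipelines pass files between steps")
    handoff = Path(tmp) / "words.txt"  # plays the role of the system-provided path
    split_words(str(source), str(handoff))
    n = count_words(str(handoff))

print(n)
```

The functions never decide where data lives; they only read from and write to the paths they are given, which is what lets the pipeline system move the files between steps.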

Component Specification
name: xgboost4j - Train classifier
description: Trains a boosted tree ensemble classifier using xgboost4j
 
inputs:
- {name: Training data}
- {name: Rounds, type: Integer, default: '30', description: 'Number of training rounds'}
 
outputs:
- {name: Trained model, type: XGBoost model, description: 'Trained XGBoost model'}
 
implementation:
  container:
    image: gcr.io/ml-pipeline/xgboost-classifier-train@sha256:b3a64d57
    command: [
      /ml/train.py,
      --train-set, {inputPath: Training data},
      --rounds,    {inputValue: Rounds},
      --out-model, {outputPath: Trained model},
    ]
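
One way to read the command section of the specification above is as a template whose placeholders are replaced before the container starts. The sketch below illustrates that substitution in plain Python. The `resolve_command` function and the concrete file paths are assumptions made for this example, not the KFP implementation.

```python
def resolve_command(command, input_values, input_paths, output_paths):
    """Replace inputValue / inputPath / outputPath placeholders with concrete strings."""
    tables = {
        "inputValue": input_values,
        "inputPath": input_paths,
        "outputPath": output_paths,
    }
    resolved = []
    for item in command:
        if isinstance(item, dict):
            (kind, name), = item.items()  # each placeholder has exactly one key
            resolved.append(str(tables[kind][name]))
        else:
            resolved.append(item)
    return resolved


# The command from the specification, written as Python data.
command = [
    "/ml/train.py",
    "--train-set", {"inputPath": "Training data"},
    "--rounds", {"inputValue": "Rounds"},
    "--out-model", {"outputPath": "Trained model"},
]

argv = resolve_command(
    command,
    input_values={"Rounds": 30},
    input_paths={"Training data": "/tmp/inputs/training_data/data"},    # hypothetical path
    output_paths={"Trained model": "/tmp/outputs/trained_model/data"},  # hypothetical path
)
print(argv)
```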

kfp.components.func_to_container_op

Converts a Python function into a Pipeline Component.
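
To give a feel for what such a conversion involves, the sketch below derives a specification-like dictionary from a function signature using the standard `inspect` module. It is an illustration only: `describe_component`, the output name `Output`, and the example function are hypothetical, and the real function does considerably more (for instance, packaging the code to run inside a container).

```python
import inspect


def type_name(annotation):
    """Best-effort readable name for a type annotation."""
    return getattr(annotation, "__name__", str(annotation))


def describe_component(func):
    """Derive name, description, inputs, and outputs from a function's signature."""
    sig = inspect.signature(func)
    inputs = []
    for param in sig.parameters.values():
        entry = {"name": param.name}
        if param.annotation is not inspect.Parameter.empty:
            entry["type"] = type_name(param.annotation)
        if param.default is not inspect.Parameter.empty:
            entry["default"] = str(param.default)
        inputs.append(entry)
    spec = {
        "name": func.__name__,
        "description": inspect.getdoc(func) or "",
        "inputs": inputs,
    }
    if sig.return_annotation is not inspect.Signature.empty:
        spec["outputs"] = [{"name": "Output", "type": type_name(sig.return_annotation)}]
    return spec


def add(a: float, b: float = 1.0) -> float:
    """Add two numbers."""
    return a + b


spec = describe_component(add)
print(spec)
```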

kfp.components.load_component_from_file

Loads a Pipeline Component defined in a YAML file.

kfp.components.load_component_from_url

Loads a Pipeline Component defined in a YAML file from a URL.

ML Workflow Example

ML Workflow example using KFP Components

The example at the following URL is a pipeline that trains an ML model with TFJob using hyperparameters found by Katib, and then builds a prediction service from that model.

https://github.com/kubeflow/pipelines/blob/master/samples/contrib/kubeflow-e2e-mnist/kubeflow-e2e-mnist.ipynb

Glossary

MinIO (Minimal Object Storage)

MinIO is an open-source object storage server, a tool for building cloud storage compatible with AWS S3.

It provides MinIO Server, MinIO Client, and MinIO Library.

Reconciliation

Reconciliation is the process of checking that two sets of records agree. In Kubernetes, a controller's reconciliation loop compares the desired state declared in a resource with the actual state of the cluster and acts to bring them into agreement.

References

Argo Workflow Concepts, https://github.com/argoproj/argo-workflows/blob/master/examples/README.md

Argo Workflow Architecture, https://github.com/argoproj/argo-workflows https://github.com/argoproj/argo-workflows/blob/master/docs/architecture.md

Argo Workflow CRD, https://github.com/argoproj/argo-workflows/tree/master/manifests/base/crds/full

KFP Concepts, https://www.kubeflow.org/docs/components/pipelines/overview/concepts/

KFP Building, https://www.kubeflow.org/docs/components/pipelines/sdk/build-pipeline/

KFP Python SDK Examples, https://github.com/kubeflow/pipelines/tree/master/samples/tutorials

KFP Client, https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.client.html https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api/

KFP Backend, https://github.com/kubeflow/pipelines/tree/master/backend

KFP Component Spec, https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/#component-spec

KFP Architecture, https://github.com/kubeflow/kfp-tekton https://betterprogramming.pub/kubeflow-pipelines-with-gpus-1af6a74ec2a https://www.slideshare.net/AnimeshSingh/kubeflow-pipelines-with-tekton-236769976

KFP Samples, https://github.com/kubeflow/pipelines/tree/master/samples/contrib https://github.com/kubeflow/pipelines/blob/master/samples/contrib/kubeflow-e2e-mnist/kubeflow-e2e-mnist.ipynb
