开发 Operator 调度 GPU 实例资源池

!! 大家好,我是乔克,一个爱折腾的运维工程,一个睡觉都被自己丑醒的云原生爱好者。
作者:乔克
公众号:运维开发故事
博客:https://jokerbai.com

✍ 道路千万条,安全第一条。操作不规范,运维两行泪。
最近在学习《AIOps》相关的知识课程,为了让学习有一定的收获,所以将其进行了总结分享,如果你恰好也需要,很荣幸能帮到你。
前面我们介绍了《开发K8s Chat 命令行工具》和《开发 K8s GPT 故障诊断工具》两篇和 K8s 相关的文章,本篇文章我们将把 K8s、AI、云 三者结合起来,开发一个 AI 工具。
本章节将引入一个新的概念——K8s Operator,它是 K8s 的一种扩展形式,可以帮助用户以 K8s 声明式 API 的方式管理应用及服务,Operator 定义了一组在 Kubernetes 集群中打包和部署复杂业务应用的方法,主要是为解决特定应用或服务关于如何运行、部署及出现问题时如何处理提供的一种特定的自定义方式。比如:
按需部署应用服务
实现应用状态的备份和还原,完成版本升级
数据库 schema 或额外的配置设置的改动
在 K8s 中我们使用的 Deployment、Daemonset、Statefulset 等这些都是 K8s 的资源,这些资源的创建、删除、更新等动作都会被称为事件,K8s 的 Controller Manager 负责事件的监听,并触发对应的动作来满足期望,这种方式就是声明式,即用户只需要关心应用程序的最终状态。当我们在使用中发现有些资源并不能满足日常的需求,对于这类需求可以使用 K8s 的自定义资源和 Operator 为应用程序提供基于 K8s 的扩展。
在这其中,CRD 就是对自定义资源的描述,如果要自定义资源,就需要先定义好 CRD,也就是介绍这个资源有什么属性,这些属性的类型、结构是怎样的。
比如 PG 的 Operator 如下:
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: postgresqls.acid.zalan.do
labels:
app.kubernetes.io/name: postgres-operator
annotations:
“helm.sh/hook”: crd-install
spec:
group: acid.zalan.do
names:
kind: postgresql
listKind: postgresqlList
plural: postgresqls
singular: postgresql
shortNames:
– pg additionalPrinterColumns:

  • name: Team
    type: string
    description: Team responsible for Postgres CLuster
    JSONPath: .spec.teamId
  • name: Version
    type: string
    description: PostgreSQL version
    JSONPath: .spec.postgresql.version
  • name: Pods
    type: integer
    description: Number of Pods per Postgres cluster
    JSONPath: .spec.numberOfInstances
  • name: Volume type: string description: Size of the bound volume JSONPath: .spec.volume.size CRD 主要包括 apiVersion、kind、metadata 和 spec 四个部分。其中最关键的是 apiVersion 和 kind,apiVersion 表示资源所属组织和版本,apiVersion 一般由 APIGourp 和 Version 组成,这里的 APIGourp 是http://apiextensions.k8s.io,Version 是 v1beta1,相关信息可以通过kubectl api-resoures查看。kind 表示资源类型,这里是CustomResourceDefinition,表示是一个自定义的资源描述。 本文我们将自己开发一个 Operator 来维护 GPU 资源池的稳定,解决 AI 模型训练的基础平台的稳定性。其架构如下: ee11ee9bb3ba2f232c0f78573956823f MD5 ee11ee9bb3ba2f232c0f78573956823f MD5 其中: GPU 资源池采用的是腾讯云的竞价 GPU 实例 Operator 运行在 K8s 中,通过 SpootPool 控制 GPU 资源池的数量 若云平台释放了某台 GPU 实例,当 Operator 监听到资源池数量和期望的不匹配,会自动补充到期望数量 Operator 的开发有多种脚手架,常用的有 operator-sdk、kubebuilder 等,这里我们将使用 kubebuilder 来完成 Operator 的开发。 前置条件 准备一个可用的 K8s 集群,可以使用 kind、kubeadm、二进制等各种形式安装,如果使用 kubeadm 安装集群,可以参考 Kubernetes集群管理。 安装好 kubebuilder,可以参考 kubebuild快速安装。 准备好云平台的 AK,这里是采用腾讯云,其他云类似。 快速开始 1、设计 CRD 在开发之前需要先设计好 CRD(就像业务开发前先设计好表结构一样),本文的 CRD 主要包含云平台虚拟机的开通,包括最小和最大实例数,以及腾讯云 SDK 所需要的各种参数,比如地域、可用区、VPC、子网、安全组、镜像等。 最后 CRD 设计如下: apiVersion: devops.jokerbai.com/v1 kind: Spotpool metadata: labels: app.kubernetes.io/name: spotpool app.kubernetes.io/managed-by: kustomize name: spotpool-sample spec: secretId: 密钥ID secretKey: 密钥Key region: 区域 availabilityZone: 可用区 instanceType: 实例类型 minimum: 最小实例数 maximum: 最大实例数 subnetId: 子网ID vpcId: VPC ID securityGroupIds:
    • 安全组
      imageId: 镜像ID
      instanceChargeType: 实例付费类型
      2、初始化项目
      定义好 CRD 字段之后,我们先使用 kubebuilder 初始化一个 Operator 项目,命令如下:
      (1)初始化项目
      mkdir spotpool && cd spotpool
      kubebuilder init \
      –domain jokerbai.com \
      –repo github.com/joker-bai/spotpool \
      –project-name spotpool \
      –plugins go/v4 \
      –owner “Joker Bai”
      (2)创建 API
      kubebuilder create api –group devops.jokerbai.com –version v1 –kind Spotpool
      (3)生成后的目录结构大致如下
      .
      ├── api
      │ └── v1
      │ ├── groupversion_info.go
      │ ├── spotpool_types.go
      │ └── zz_generated.deepcopy.go
      ├── bin
      │ ├── controller-gen -> /root/workspace/godev/src/github.com/joker-bai/spotpool/bin/controller-gen-v0.18.0
      │ └── controller-gen-v0.18.0
      ├── cmd
      │ └── main.go
      ├── config
      │ ├── crd
      │ │ ├── kustomization.yaml
      │ │ └── kustomizeconfig.yaml
      │ ├── default
      │ │ ├── cert_metrics_manager_patch.yaml
      │ │ ├── kustomization.yaml
      │ │ ├── manager_metrics_patch.yaml
      │ │ └── metrics_service.yaml
      │ ├── manager
      │ │ ├── kustomization.yaml
      │ │ └── manager.yaml
      │ ├── network-policy
      │ │ ├── allow-metrics-traffic.yaml
      │ │ └── kustomization.yaml
      │ ├── prometheus
      │ │ ├── kustomization.yaml
      │ │ ├── monitor_tls_patch.yaml
      │ │ └── monitor.yaml
      │ ├── rbac
      │ │ ├── kustomization.yaml
      │ │ ├── leader_election_role_binding.yaml
      │ │ ├── leader_election_role.yaml
      │ │ ├── metrics_auth_role_binding.yaml
      │ │ ├── metrics_auth_role.yaml
      │ │ ├── metrics_reader_role.yaml
      │ │ ├── role_binding.yaml
      │ │ ├── role.yaml
      │ │ ├── service_account.yaml
      │ │ ├── spotpool_admin_role.yaml
      │ │ ├── spotpool_editor_role.yaml
      │ │ └── spotpool_viewer_role.yaml
      │ └── samples
      │ ├── devops.jokerbai.com_v1_spotpool.yaml
      │ └── kustomization.yaml
      ├── Dockerfile
      ├── go.mod
      ├── go.sum
      ├── hack
      │ └── boilerplate.go.txt
      ├── internal
      │ └── controller
      │ ├── spotpool_controller.go
      │ ├── spotpool_controller_test.go
      │ └── suite_test.go
      ├── Makefile
      ├── PROJECT
      ├── README.md
      └── test
      ├── e2e
      │ ├── e2e_suite_test.go
      │ └── e2e_test.go
      └── utils
      └── utils.go
      3、CRD 开发
      (1)定义 API
      在api/v1alpha1/spotpool_types.go中定义 CRD 的结构体,如下:
      package v1

import (
metav1 “k8s.io/apimachinery/pkg/apis/meta/v1”
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// SpotpoolSpec defines the desired state of Spotpool
type SpotpoolSpec struct {
// INSERT ADDITIONAL SPEC FIELDS – desired state of cluster
// Important: Run “make” to regenerate code after modifying this file
SecretId string json:"secretId,omitempty"
SecretKey string json:"secretKey,omitempty"
Region string json:"region,omitempty"
AvaliableZone string json:"availabilityZone,omitempty"
InstanceType string json:"instanceType,omitempty"
SubnetId string json:"subnetId,omitempty"
VpcId string json:"vpcId,omitempty"
SecurityGroupId []string json:"securityGroupIds,omitempty"
ImageId string json:"imageId,omitempty"
InstanceChargeType string json:"instanceChargeType,omitempty"
Minimum int32 json:"minimum,omitempty"
Maximum int32 json:"maximum,omitempty"
}

// SpotpoolStatus defines the observed state of Spotpool
type SpotpoolStatus struct {
// INSERT ADDITIONAL STATUS FIELD – define observed state of cluster
// Important: Run “make” to regenerate code after modifying this file
Size int32 json:"size,omitempty"
Conditions []metav1.Condition json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,rep,name=conditions"
Instances []Instances json:"instances,omitempty"
}

type Instances struct {
InstanceId string json:"instanceId,omitempty"
PublicIp string json:"publicIp,omitempty"
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// Spotpool is the Schema for the spotpools API
type Spotpool struct {
metav1.TypeMeta json:",inline"
metav1.ObjectMeta json:"metadata,omitempty"

Spec SpotpoolSpec json:"spec,omitempty"
Status SpotpoolStatus json:"status,omitempty"
}

//+kubebuilder:object:root=true

// SpotpoolList contains a list of Spotpool
type SpotpoolList struct {
metav1.TypeMeta json:",inline"
metav1.ListMeta json:"metadata,omitempty"
Items []Spotpool json:"items"
}

func init() {
SchemeBuilder.Register(&Spotpool{}, &SpotpoolList{})
}
在 SpotpoolSpec 中定义设计的 CRD 结构体,这些字段都是创建虚拟机的必要字段。另外,在 SpotpoolStatus 中定义返回状态里的信息,这里只需要 Instance 相关的信息。
(2)生成代码
API 相关的代码开发完后,执行以下命令生成代码:
make generate
make manifests
4、Controller 开发
(1)开发控制器逻辑
控制器的主逻辑是:
从云平台获取运行的实例数
判断实例数和期望的实例数是否相等
如果小于期望值,则创建实例
如果大于期望值,则删除实例
所以主逻辑的代码如下,修改internal/controller/spotpool_controller.go:
func (r *SpotpoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := logf.FromContext(ctx)

// 获取用户期望
spotpool := &devopsjokerbaicomv1.Spotpool{}
if err := r.Get(ctx, req.NamespacedName, spotpool); err != nil {
log.Error(err, “unable to fetch spotspool”)
}

// 从云平台获取获取运行的实例
runningVmList, err := r.getRunningInstanceIds(spotpool)
if err != nil {
log.Error(err, “get running vm instance failed”)
// 十秒后重试
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

runningCount := len(runningVmList)

switch {
case runningCount < int(spotpool.Spec.Minimum): // 创建实例扩容 delta := spotpool.Spec.Minimum – int32(runningCount) log.Info(“creating instances”, “delta”, delta) err = r.runInstances(spotpool, delta) if err != nil { log.Error(err, “unable to create instances”) return ctrl.Result{RequeueAfter: 40 * time.Second}, nil } case runningCount > int(spotpool.Spec.Maximum):
// 删除实例缩容
delta := int32(runningCount) – spotpool.Spec.Maximum
log.Info(“terminating instances”, “delta”, delta)
err = r.terminateInstances(spotpool, delta)
if err != nil {
log.Error(err, “unable to terminate instances”)
return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
}
}

return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
}
其中:
r.getRunningInstanceIds(spotpool) 用户获取云平台运行的实例数
r.runInstances(spotpool, delta) 用于调用云平台进行扩容
r.terminateInstances(spotpool, delta) 用于调用云平台进行缩容
接下来分别实现上面的三个方法。
(1)首先,实现 getRunningInstanceIds 方法
func (r *SpotpoolReconciler) getRunningInstanceIds(spotpool *devopsjokerbaicomv1.Spotpool) ([]string, error) {
client, err := r.createCVMClient(spotpool.Spec)
if err != nil {
return nil, err
}

request := cvm.NewDescribeInstancesRequest()
response, err := client.DescribeInstances(request)
if err != nil {
return nil, err
}
var instances []devopsjokerbaicomv1.Instances
var runningInstanceIDs []string
for _, instance := range response.Response.InstanceSet {
if *instance.InstanceState == “RUNNING” || *instance.InstanceState == “PENDING” || *instance.InstanceState == “STARTING” {
runningInstanceIDs = append(runningInstanceIDs, *instance.InstanceId)
}
// 检查实例的公网 IP,如果不存在公网 IP,则继续重试
if len(instance.PublicIpAddresses) == 0 {
return nil, fmt.Errorf(“instance %s does not have public ip”, *instance.InstanceId)
}
instances = append(instances, devopsjokerbaicomv1.Instances{
InstanceId: *instance.InstanceId,
PublicIp: *instance.PublicIpAddresses[0],
})
}
// 更新 status
spotpool.Status.Instances = instances
err = r.Status().Update(context.Background(), spotpool)
if err != nil {
return nil, err
}
return runningInstanceIDs, nil
}

// 获取腾讯云 SDK client
func (r SpotpoolReconciler) createCVMClient(spec devopsjokerbaicomv1.SpotpoolSpec) (cvm.Client, error) {
credential := common.NewCredential(spec.SecretId, spec.SecretKey)
cpf := profile.NewClientProfile()
cpf.HttpProfile.ReqMethod = “POST”
cpf.HttpProfile.ReqTimeout = 30
cpf.SignMethod = “HmacSHA1”

client, err := cvm.NewClient(credential, spec.Region, cpf)
if err != nil {
return nil, err
}
return client, nil
}
其中:
调用 r.createCVMClient(spotpool.Spec) 获取腾讯云SDK client
然后调用 client.DescribeInstances(request) 获取实例详细信息
最后通过判断 instance.InstanceStat 和 instance.PublicIpAddresses 的状态信息决定是否是需要的实例
最后返回实例列表信息
(2)实现 r.runInstances(spotpool, delta) 用于调用云平台进行扩容
func (r *SpotpoolReconciler) runInstances(spotpool *devopsjokerbaicomv1.Spotpool, count int32) error {
client, err := r.createCVMClient(spotpool.Spec)
if err != nil {
return err
}
request := cvm.NewRunInstancesRequest()
request.ImageId = common.StringPtr(spotpool.Spec.ImageId)
request.Placement = &cvm.Placement{
Zone: common.StringPtr(spotpool.Spec.AvaliableZone),
}
request.InstanceChargeType = common.StringPtr(spotpool.Spec.InstanceChargeType)
request.InstanceCount = common.Int64Ptr(int64(count))
request.InstanceName = common.StringPtr(“spotpool” + time.Now().Format(“20060102150405”))
request.InstanceType = common.StringPtr(spotpool.Spec.InstanceType)
request.InternetAccessible = &cvm.InternetAccessible{
InternetChargeType: common.StringPtr(“BANDWIDTH_POSTPAID_BY_HOUR”),
InternetMaxBandwidthOut: common.Int64Ptr(1),
PublicIpAssigned: common.BoolPtr(true),
}
request.LoginSettings = &cvm.LoginSettings{
Password: common.StringPtr(“Password123”),
}
request.SecurityGroupIds = common.StringPtrs(spotpool.Spec.SecurityGroupId)
request.SystemDisk = &cvm.SystemDisk{
DiskType: common.StringPtr(“CLOUD_BSSD”),
DiskSize: common.Int64Ptr(100),
}
request.VirtualPrivateCloud = &cvm.VirtualPrivateCloud{
SubnetId: common.StringPtr(spotpool.Spec.SubnetId),
VpcId: common.StringPtr(spotpool.Spec.VpcId),
}

// print request
fmt.Println(request.ToJsonString())

// 创建实例
response, err := client.RunInstances(request)
if _, ok := err.(*errors.TencentCloudSDKError); ok {
return err
}
// other errors
if err != nil {
return err
}

// 获取到返回的 instancesid
instanceIds := make([]string, 0, len(response.Response.InstanceIdSet))
for _, instanceId := range response.Response.InstanceIdSet {
instanceIds = append(instanceIds, *instanceId)
}

fmt.Println(“run instances success”, instanceIds)
// 更新 status
_, err = r.getRunningInstanceIds(spotpool)
if err != nil {
return err
}
return nil
}
这个方法主要是调用 client.RunInstances(request) 进行实例创建,然后调用 r.getRunningInstanceIds(spotpool) 更新 status 的状态信息。
(3)开发r.terminateInstances(spotpool, delta) 用于调用云平台进行缩容
func (r *SpotpoolReconciler) terminateInstances(spotpool *devopsjokerbaicomv1.Spotpool, count int32) error {
client, err := r.createCVMClient(spotpool.Spec)
if err != nil {
return err
}

runningInstances, err := r.getRunningInstanceIds(spotpool)
if err != nil {
return err
}

instancesIds := runningInstances[:count]
request := cvm.NewTerminateInstancesRequest()
request.InstanceIds = common.StringPtrs(instancesIds)

// 获取返回
response, err := client.TerminateInstances(request)
if _, ok := err.(*errors.TencentCloudSDKError); ok {
return err
}
// other errors
if err != nil {
return err
}

fmt.Println(“Terminate response: “, response)
fmt.Println(“terminate instances success”, instancesIds)

// 更新 status
_, err = r.getRunningInstanceIds(spotpool)
if err != nil {
return err
}
return nil
}
删除实例和创建实例的实现逻辑类似,先调用 client.TerminateInstances(request) 进行删除,然后调用 r.getRunningInstanceIds(spotpool) 更新状态。
上面三个步骤完成了主要逻辑开发,可以初步实现具体的效果,如果希望功能更健全,则需要对其进行开发优化。
部署和测试
1、本地测试

安装 CRD

make install

运行 controller

make run
2、创建 Spotpool 实例测试
(1)创建 Spotpool 资源清单,编辑 config/samples/devops.jokerbai.com_v1_spotpool.yaml
apiVersion: devops.jokerbai.com.jokerbai.com/v1
kind: Spotpool
metadata:
labels:
app.kubernetes.io/name: spotpool
app.kubernetes.io/managed-by: kustomize
name: spotpool-sample
spec:
secretId: xxx
secretKey: xxx
region: ap-singapore
availabilityZone: ap-singapore-2
instanceType: “GN7.2XLARGE32”
minimum: 2
maximum: 2
subnetId: DEFAULT
vpcId: DEFAULT
securityGroupIds:
– sg-xxx
imageId: img-xxx
instanceChargeType: SPOTPAID
(2)运行资源清单

创建实例

kubectl apply -f config/samples/devops.jokerbai.com_v1_spotpool.yaml

查看状态

kubectl get spotpool
(3)构建并部署到集群

构建镜像

make docker-build docker-push IMG=/spotpool:v1

部署到集群

make deploy IMG=/spotpool:v1
(4)清理

删除 operator

make undeploy

删除 CRD

make uninstall
最后
本文通过结合 Kubernetes、AI 和云平台,深入探讨了如何利用 K8s Operator 实现对 GPU 资源池的自动化管理。我们从 Operator 的核心概念出发,介绍了 CRD(自定义资源定义)和控制器的设计原理,并基于 kubebuilder 开发了一个名为 Spotpool 的 Operator,用于在腾讯云上维护竞价实例的稳定运行。
整个开发过程遵循“声明式 API”的思想,用户只需定义期望的状态(如最小/最大实例数),Operator 便会在后台持续监控并自动调整实际状态,确保资源池始终符合预期。这不仅极大地简化了运维操作,也提升了 AI 模型训练平台的稳定性和弹性。
Operator 是云原生时代自动化运维的重要利器。掌握其开发方法,意味着我们不仅能“用好” Kubernetes,更能“扩展” Kubernetes,为复杂业务场景提供定制化的解决方案。

最后,求关注。如果你还想看更多优质原创文章,欢迎关注我们的公众号「运维开发故事」。

如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

你还可以把我的公众号设为「星标」,这样当公众号文章更新时,你会在第一时间收到推送消息,避免错过我的文章更新。

我是 乔克,《运维开发故事》公众号团队中的一员,一线运维农民工,云原生实践者,这里不仅有硬核的技术干货,还有我们对技术的思考和感悟,欢迎关注我们的公众号,期待和你一起成长!

声明:来自运维开发故事,仅代表创作者观点。链接:https://eyangzhen.com/2614.html

运维开发故事的头像运维开发故事

相关推荐

关注我们
关注我们
购买服务
购买服务
返回顶部