Advertisement

volcano中的task-spec annotation

阅读量:

最近在支持CPU/GPU混布资源任务的时候,在我们自己开发的训练框架上,遇到volcano无法成功调度的情况,为此给volcano提了个issue

最终发现是因为我们的kubeflow/common版本太低,导致没有给PodTemplate添加annotation volcano.sh/task-spec=replecaType

背景

我们的crd类似于pytorch/tensorflow,需要创建一个master和多个worker,其中master只用到CPU资源,worker需要GPU资源。

为了保证集群中的GPU资源能够最大化使用,我们往集群里添加了CPU机器。将master节点调度到CPU机器上,worker调度到GPU机器上。

避免因master调度到GPU机器,导致GPU机器的卡分不完(极端情况下,如果有多个任务,master可能占用某个GPU机器的大量CPU资源,导致GPU卡分不完)。

这里有两种方式实现: nodeSelector或者nodeAffinity。

nodeSelector

nodeSelector可以定义要匹配拥有哪些标签的node。

这里可以为CPU的node增加对应的标签,比如resType=CPU,并在master的PodSpec增加nodeSelector。

worker可以不需要nodeSelector,因为CPU机器没有GPU卡,肯定不满足资源需要。

下面借用tensorflow的crd作为例子:

复制代码
    apiVersion: kubeflow.org/v1
    kind: TFJob
    metadata:
      name: tf-test
    spec:
      tfReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
            - name: tensorflow
              image: docker.io/kubeflowkatib/tf-mnist-with-summaries:latest
              command:
                - sh
                - -c
                - sleep 10
          nodeSelector:
            resType: CPU           -- 此处增加nodeSelector
    Worker:
      replicas: 1
      template:
        spec:
          containers:
            - name: tensorflow
              image: docker.io/kubeflowkatib/tf-mnist-with-summaries:latest
              command:
                - sh
                - -c
                - sleep 10
              resources:
                limits:
                  nvidia.com/gpu: "1"
                requests:
                  nvidia.com/gpu: "1"
    
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    AI写代码

nodeAffinity

nodeAffinity的表述能力更强,可以对label做In/NotIn等操作。

对于GPU机器,因为用的是nvidia的卡,为保证nvidia-device-plugin/dcgm-exporter这些组件正常工作,会给GPU机器打上label: nvidia-device-enable=enable

所以可以通过nodeAffinity的NotIn来实现,也无需单独为CPU机器增加label。

复制代码
    apiVersion: kubeflow.org/v1
    kind: TFJob
    metadata:
      name: tf-test
    spec:
      tfReplicaSpecs:
        Master:
          replicas: 1
          template:
            spec:
              affinity:
                nodeAffinity:                  -- 此处增加nodeAffnitiy排除某些label的机器
                  requiredDuringSchedulingIgnoredDuringExecution:
                    nodeSelectorTerms:
                    - matchExpressions:
                      - key: nvidia-device-enable
                        operator: NotIn
                        values:
                        - enable
              containers:
                - name: tensorflow
                  image: docker.io/kubeflowkatib/tf-mnist-with-summaries:latest
                  command:
                    - sh
                    - -c
                    - sleep 10
        Worker:
          replicas: 1
          template:
            spec:
              containers:
                - name: tensorflow
                  image: docker.io/kubeflowkatib/tf-mnist-with-summaries:latest
                  command:
                    - sh
                    - -c
                    - sleep 10
                  resources:
                    limits:
                      nvidia.com/gpu: "1"
                    requests:
                      nvidia.com/gpu: "1"
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    AI写代码

问题表现

在为master增加nodeSelector/nodeAffinity之后,发现volcano一直无法选择出合适的node。具体的日志可以参考文章开头的issue

分析volcano-scheduler的日志发现,master和worker会依次尝试分配node。顺序可能是master先分配,也可能worker先分配。分配过程中,可选的node集合会被不断缩小。

比如master先分配,那么worker可选的node就只能是满足master条件的那些node,而worker又有自己的一些条件,所以worker可选的node集合就变成master条件和worker条件的交集,在这里的场景下交集为空(master需要cpu机器,worker需要GPU机器),就无法调度起来。

源码阅读

volcano调度任务的时候,会参考worker和master对node节点的条件。这里的条件有几部分,分别对应predicates plugin的一些参数。

复制代码
    /******************************************************** * pkg/scheduler/plugins/predicates/predicates.go 155行 * ********************************************************/
    predicate := predicateEnable{
    		nodeAffinityEnable:      true,     -- nodeAffinity/nodeSelector
    		nodePortEnable:          true,
    		taintTolerationEnable:   true,
    		podAffinityEnable:       true,
    		nodeVolumeLimitsEnable:  true,
    		volumeZoneEnable:        true,
    		podTopologySpreadEnable: true,
    		cacheEnable:             false,
    		proportionalEnable:      false,
    	}
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    AI写代码

predicate.NodeAffinityEnable

此选项默认打开,打开后会考虑PodSpec里定义的nodeSelector和affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution进行节点选择。

GetRequiredNodeAffinity函数定义了如何从PodSpec里取出对应的条件构造过滤器。Match过程也很简单,逐个条件判断即可。

复制代码
    /**************************************************************************************** * vendor/k8s.io/component-helpers/scheduling/corev1/nodeaffinity/nodeaffinity.go 296行 * ****************************************************************************************/
    // GetRequiredNodeAffinity returns the parsing result of pod's nodeSelector and nodeAffinity.
    func GetRequiredNodeAffinity(pod *v1.Pod) RequiredNodeAffinity {
    	var selector labels.Selector
    	if len(pod.Spec.NodeSelector) > 0 {
    		selector = labels.SelectorFromSet(pod.Spec.NodeSelector)
    	}
    	// Use LazyErrorNodeSelector for backwards compatibility of parsing errors.
    	var affinity *LazyErrorNodeSelector
    	if pod.Spec.Affinity != nil &&
    		pod.Spec.Affinity.NodeAffinity != nil &&
    		pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
    		affinity = NewLazyErrorNodeSelector(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
    	}
    	return RequiredNodeAffinity{labelSelector: selector, nodeSelector: affinity}
    }
    
    // Match checks whether the pod is schedulable onto nodes according to
    // the requirements in both nodeSelector and nodeAffinity.
    func (s RequiredNodeAffinity) Match(node *v1.Node) (bool, error) {
    	if s.labelSelector != nil {
    		if !s.labelSelector.Matches(labels.Set(node.Labels)) {
    			return false, nil
    		}
    	}
    	if s.nodeSelector != nil {
    		return s.nodeSelector.Match(node)
    	}
    	return true, nil
    }
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    AI写代码

nodeSelector和nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution都会转成labels.Requirement结构(这个结构和corev1.NodeSelectorRequirement差不多,只是operator更丰富些)。

复制代码
    /*********************************************************** * vendor/k8s.io/apimachinery/pkg/labels/selector.go 135行 * ***********************************************************/
    // Requirement contains values, a key, and an operator that relates the key and values.
    // The zero value of Requirement is invalid.
    // Requirement implements both set based match and exact match
    // Requirement should be initialized via NewRequirement constructor for creating a valid Requirement.
    // +k8s:deepcopy-gen=true
    type Requirement struct {
    	key      string
    	operator selection.Operator
    	// In huge majority of cases we have at most one value here.
    	// It is generally faster to operate on a single-element slice
    	// than on a single-element map, so we have a slice here.
    	strValues []string
    }
    
    /******************************************************** * vendor/k8s.io/apimachinery/pkg/selection/operator.go * ********************************************************/
    // Operator represents a key/field's relationship to value(s).
    // See labels.Requirement and fields.Requirement for more details.
    type Operator string
    
    const (
    	DoesNotExist Operator = "!"
    	Equals       Operator = "="
    	DoubleEquals Operator = "=="
    	In           Operator = "in"
    	NotEquals    Operator = "!="
    	NotIn        Operator = "notin"
    	Exists       Operator = "exists"
    	GreaterThan  Operator = "gt"
    	LessThan     Operator = "lt"
    )
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    AI写代码

研究完这里的逻辑,发现并没有什么疑点,还是得从master分配后,worker为何要从满足master条件的node中进一步筛选,而不是从全局筛选,这一角度来排查。

通过对比我们自己的operator和tf-operator,对开启Gang调度的任务,tf-operator里多设置了一个annotation:podTemplate.Annotations[volcanoTaskSpecKey] = rt

这里的volcanoTaskSpecKey=“volcano.sh/task-spec”,在volcano里搜索相关逻辑,可以找到如下函数:

复制代码
    /*************************************** * pkg/scheduler/api/job_info.go 145行 * ***************************************/
    func getTaskID(pod *v1.Pod) TaskID {
    	if ts, found := pod.Annotations[batch.TaskSpecKey]; found && len(ts) != 0 {
    		return TaskID(ts)
    	}
    
    	return ""
    }
    
    /*************************************** * pkg/scheduler/api/job_info.go 262行 * ***************************************/
    func (ti *TaskInfo) GetTaskSpecKey() TaskID {
    	if ti.Pod == nil {
    		return ""
    	}
    	return getTaskID(ti.Pod)
    }
    
    /************************************************ * pkg/scheduler/util/predicate_helper.go 103行 * ************************************************/
    func taskGroupID(task *api.TaskInfo) string {
    	return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey())
    }
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    AI写代码

最后这个taskGroupID函数会被predicateHelper.PredicateNodes调用,其作用是对一个volcano job里的task进行分组,同一组task共享node校验失败的信息。

复制代码
    /*********************************************** * pkg/scheduler/util/predicate_helper.go 23行 * ***********************************************/
    // PredicateNodes returns the specified number of nodes that fit a task
    func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn, enableErrorCache bool) ([]*api.NodeInfo, *api.FitErrors) {
    
    	// ... 省略若干行代码 ...
    
    	taskGroupid := taskGroupID(task)
    	nodeErrorCache, taskFailedBefore := ph.taskPredicateErrorCache[taskGroupid]   // 同一个taskGroupid共享nodeErrorCache
    	if nodeErrorCache == nil {
    		nodeErrorCache = map[string]error{}
    	}
    
    	//create a context with cancellation
    	ctx, cancel := context.WithCancel(context.Background())
    
    	checkNode := func(index int) {
    
    		// ... 省略若干行代码 ...
    
    		// Check if the task had "predicate" failure before.
    		// And then check if the task failed to predict on this node before.
    		if enableErrorCache && taskFailedBefore {
    			errorLock.RLock()
    			errC, ok := nodeErrorCache[node.Name]
    			errorLock.RUnlock()
    
    			if ok {                               // node不满足前面task的条件,直接判定当前task不能分配到这个node
    				errorLock.Lock()
    				fe.SetNodeError(node.Name, errC)
    				errorLock.Unlock()
    				return
    			}
    		}
    
    		// TODO (k82cn): Enable eCache for performance improvement.
    		if err := fn(task, node); err != nil {
    			klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
    				task.Namespace, task.Name, node.Name, err)
    			errorLock.Lock()
    			nodeErrorCache[node.Name] = err
    			ph.taskPredicateErrorCache[taskGroupid] = nodeErrorCache  // node不满足当前task的条件,记录到缓存里
    			fe.SetNodeError(node.Name, err)
    			errorLock.Unlock()
    			return
    		}
    
        // ... 省略若干行代码 ...
    	}
    
    	// ... 省略若干行代码 ...
    }
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    AI写代码

到此就可以明白为什么tf-operator要设置volcano.sh/task-spec这个annotation了,通过对master和worker设置不同的annotation值,让其归属到不同的taskGroupid,从而避免共用nodeErrorCache,master和worker都可以各自从全局查找合适的node。

解决方案

在自己的operator里加上类似于tf-operator的处理逻辑即可,即增加annotation: podTemplate.Annotations[volcanoTaskSpecKey] = rt

如果依赖的是kubeflow/common里面的ReconcilePod逻辑,需要升级kubeflow/common版本(至少0.4.7),社区的这个pull requests顺带修复了这里的问题。

全部评论 (0)

还没有任何评论哟~