volcano中的task-spec annotation
最近在支持CPU/GPU混布资源任务的时候,在我们自己开发的训练框架上,遇到volcano无法成功调度的情况,为此给volcano提了个issue。
最终发现是因为我们的kubeflow/common版本太低,导致没有给PodTemplate添加annotation volcano.sh/task-spec=replecaType。
背景
我们的crd类似于pytorch/tensorflow,需要创建一个master和多个worker,其中master只用到CPU资源,worker需要GPU资源。
为了保证集群中的GPU资源能够最大化使用,我们往集群里添加了CPU机器。将master节点调度到CPU机器上,worker调度到GPU机器上。
避免因master调度到GPU机器,导致GPU机器的卡分不完(极端情况下,如果有多个任务,master可能占用某个GPU机器的大量CPU资源,导致GPU卡分不完)。
这里有两种方式实现: nodeSelector或者nodeAffinity。
nodeSelector
nodeSelector可以定义要匹配拥有哪些标签的node。
这里可以为CPU的node增加对应的标签,比如resType=CPU,并在master的PodSpec增加nodeSelector。
worker可以不需要nodeSelector,因为CPU机器没有GPU卡,肯定不满足资源需要。
下面借用tensorflow的crd作为例子:
apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
name: tf-test
spec:
tfReplicaSpecs:
Master:
replicas: 1
template:
spec:
containers:
- name: tensorflow
image: docker.io/kubeflowkatib/tf-mnist-with-summaries:latest
command:
- sh
- -c
- sleep 10
nodeSelector:
resType: CPU -- 此处增加nodeSelector
Worker:
replicas: 1
template:
spec:
containers:
- name: tensorflow
image: docker.io/kubeflowkatib/tf-mnist-with-summaries:latest
command:
- sh
- -c
- sleep 10
resources:
limits:
nvidia.com/gpu: "1"
requests:
nvidia.com/gpu: "1"
AI写代码
nodeAffinity
nodeAffinity的表述能力更强,可以对label做In/NotIn等操作。
对于GPU机器,因为用的是nvidia的卡,为保证nvidia-device-plugin/dcgm-exporter这些组件正常工作,会给GPU机器打上label: nvidia-device-enable=enable。
所以可以通过nodeAffinity的NotIn来实现,也无需单独为CPU机器增加label。
apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
name: tf-test
spec:
tfReplicaSpecs:
Master:
replicas: 1
template:
spec:
affinity:
nodeAffinity: -- 此处增加nodeAffnitiy排除某些label的机器
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: nvidia-device-enable
operator: NotIn
values:
- enable
containers:
- name: tensorflow
image: docker.io/kubeflowkatib/tf-mnist-with-summaries:latest
command:
- sh
- -c
- sleep 10
Worker:
replicas: 1
template:
spec:
containers:
- name: tensorflow
image: docker.io/kubeflowkatib/tf-mnist-with-summaries:latest
command:
- sh
- -c
- sleep 10
resources:
limits:
nvidia.com/gpu: "1"
requests:
nvidia.com/gpu: "1"
AI写代码
问题表现
在为master增加nodeSelector/nodeAffinity之后,发现volcano一直无法选择出合适的node。具体的日志可以参考文章开头的issue。
分析volcano-scheduler的日志发现,master和worker会依次尝试分配node。顺序可能是master先分配,也可能worker先分配。分配过程中,可选的node集合会被不断缩小。
比如master先分配,那么worker可选的node就只能是满足master条件的那些node,而worker又有自己的一些条件,所以worker可选的node集合就变成master条件和worker条件的交集,在这里的场景下交集为空(master需要cpu机器,worker需要GPU机器),就无法调度起来。
源码阅读
volcano调度任务的时候,会参考worker和master对node节点的条件。这里的条件有几部分,分别对应predicates plugin的一些参数。
/******************************************************** * pkg/scheduler/plugins/predicates/predicates.go 155行 * ********************************************************/
predicate := predicateEnable{
nodeAffinityEnable: true, -- nodeAffinity/nodeSelector
nodePortEnable: true,
taintTolerationEnable: true,
podAffinityEnable: true,
nodeVolumeLimitsEnable: true,
volumeZoneEnable: true,
podTopologySpreadEnable: true,
cacheEnable: false,
proportionalEnable: false,
}
AI写代码
predicate.NodeAffinityEnable
此选项默认打开,打开后会考虑PodSpec里定义的nodeSelector和affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution进行节点选择。
GetRequiredNodeAffinity函数定义了如何从PodSpec里取出对应的条件构造过滤器。Match过程也很简单,逐个条件判断即可。
/**************************************************************************************** * vendor/k8s.io/component-helpers/scheduling/corev1/nodeaffinity/nodeaffinity.go 296行 * ****************************************************************************************/
// GetRequiredNodeAffinity returns the parsing result of pod's nodeSelector and nodeAffinity.
func GetRequiredNodeAffinity(pod *v1.Pod) RequiredNodeAffinity {
var selector labels.Selector
if len(pod.Spec.NodeSelector) > 0 {
selector = labels.SelectorFromSet(pod.Spec.NodeSelector)
}
// Use LazyErrorNodeSelector for backwards compatibility of parsing errors.
var affinity *LazyErrorNodeSelector
if pod.Spec.Affinity != nil &&
pod.Spec.Affinity.NodeAffinity != nil &&
pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
affinity = NewLazyErrorNodeSelector(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
}
return RequiredNodeAffinity{labelSelector: selector, nodeSelector: affinity}
}
// Match checks whether the pod is schedulable onto nodes according to
// the requirements in both nodeSelector and nodeAffinity.
func (s RequiredNodeAffinity) Match(node *v1.Node) (bool, error) {
if s.labelSelector != nil {
if !s.labelSelector.Matches(labels.Set(node.Labels)) {
return false, nil
}
}
if s.nodeSelector != nil {
return s.nodeSelector.Match(node)
}
return true, nil
}
AI写代码
nodeSelector和nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution都会转成labels.Requirement结构(这个结构和corev1.NodeSelectorRequirement差不多,只是operator更丰富些)。
/*********************************************************** * vendor/k8s.io/apimachinery/pkg/labels/selector.go 135行 * ***********************************************************/
// Requirement contains values, a key, and an operator that relates the key and values.
// The zero value of Requirement is invalid.
// Requirement implements both set based match and exact match
// Requirement should be initialized via NewRequirement constructor for creating a valid Requirement.
// +k8s:deepcopy-gen=true
type Requirement struct {
key string
operator selection.Operator
// In huge majority of cases we have at most one value here.
// It is generally faster to operate on a single-element slice
// than on a single-element map, so we have a slice here.
strValues []string
}
/******************************************************** * vendor/k8s.io/apimachinery/pkg/selection/operator.go * ********************************************************/
// Operator represents a key/field's relationship to value(s).
// See labels.Requirement and fields.Requirement for more details.
type Operator string
const (
DoesNotExist Operator = "!"
Equals Operator = "="
DoubleEquals Operator = "=="
In Operator = "in"
NotEquals Operator = "!="
NotIn Operator = "notin"
Exists Operator = "exists"
GreaterThan Operator = "gt"
LessThan Operator = "lt"
)
AI写代码
研究完这里的逻辑,发现并没有什么疑点,还是得从master分配后,worker为何要从满足master条件的node中进一步筛选,而不是从全局筛选,这一角度来排查。
通过对比我们自己的operator和tf-operator,对开启Gang调度的任务,tf-operator里多设置了一个annotation:podTemplate.Annotations[volcanoTaskSpecKey] = rt。
这里的volcanoTaskSpecKey=“volcano.sh/task-spec”,在volcano里搜索相关逻辑,可以找到如下函数:
/*************************************** * pkg/scheduler/api/job_info.go 145行 * ***************************************/
func getTaskID(pod *v1.Pod) TaskID {
if ts, found := pod.Annotations[batch.TaskSpecKey]; found && len(ts) != 0 {
return TaskID(ts)
}
return ""
}
/*************************************** * pkg/scheduler/api/job_info.go 262行 * ***************************************/
func (ti *TaskInfo) GetTaskSpecKey() TaskID {
if ti.Pod == nil {
return ""
}
return getTaskID(ti.Pod)
}
/************************************************ * pkg/scheduler/util/predicate_helper.go 103行 * ************************************************/
func taskGroupID(task *api.TaskInfo) string {
return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey())
}
AI写代码
最后这个taskGroupID函数会被predicateHelper.PredicateNodes调用,其作用是对一个volcano job里的task进行分组,同一组task共享node校验失败的信息。
/*********************************************** * pkg/scheduler/util/predicate_helper.go 23行 * ***********************************************/
// PredicateNodes returns the specified number of nodes that fit a task
func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn, enableErrorCache bool) ([]*api.NodeInfo, *api.FitErrors) {
// ... 省略若干行代码 ...
taskGroupid := taskGroupID(task)
nodeErrorCache, taskFailedBefore := ph.taskPredicateErrorCache[taskGroupid] // 同一个taskGroupid共享nodeErrorCache
if nodeErrorCache == nil {
nodeErrorCache = map[string]error{}
}
//create a context with cancellation
ctx, cancel := context.WithCancel(context.Background())
checkNode := func(index int) {
// ... 省略若干行代码 ...
// Check if the task had "predicate" failure before.
// And then check if the task failed to predict on this node before.
if enableErrorCache && taskFailedBefore {
errorLock.RLock()
errC, ok := nodeErrorCache[node.Name]
errorLock.RUnlock()
if ok { // node不满足前面task的条件,直接判定当前task不能分配到这个node
errorLock.Lock()
fe.SetNodeError(node.Name, errC)
errorLock.Unlock()
return
}
}
// TODO (k82cn): Enable eCache for performance improvement.
if err := fn(task, node); err != nil {
klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
errorLock.Lock()
nodeErrorCache[node.Name] = err
ph.taskPredicateErrorCache[taskGroupid] = nodeErrorCache // node不满足当前task的条件,记录到缓存里
fe.SetNodeError(node.Name, err)
errorLock.Unlock()
return
}
// ... 省略若干行代码 ...
}
// ... 省略若干行代码 ...
}
AI写代码
到此就可以明白为什么tf-operator要设置volcano.sh/task-spec这个annotation了,通过对master和worker设置不同的annotation值,让其归属到不同的taskGroupid,从而避免共用nodeErrorCache,master和worker都可以各自从全局查找合适的node。
解决方案
在自己的operator里加上类似于tf-operator的处理逻辑即可,即增加annotation: podTemplate.Annotations[volcanoTaskSpecKey] = rt。
如果依赖的是kubeflow/common里面的ReconcilePod逻辑,需要升级kubeflow/common版本(至少0.4.7),社区的这个pull requests顺带修复了这里的问题。
