Skip to content

Commit 461ba83

Browse files
authored
Merge pull request kubernetes#133021 from sanposhiho/pre-flight-skip
feat: trigger PreBindPreFlight in the binding cycle
2 parents 1ce98e3 + ac9fad6 commit 461ba83

File tree

13 files changed

+792
-202
lines changed

13 files changed

+792
-202
lines changed

pkg/scheduler/backend/api_dispatcher/call_queue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ func (cq *callQueue) merge(oldAPICall, apiCall *queuedAPICall) error {
132132
return nil
133133
}
134134

135-
// Merge API calls if they are of the same type for the same object.
136-
err := apiCall.Merge(oldAPICall)
135+
// Pass the underlying APICall (not the queuedAPICall wrapper) so that Merge can perform a type assertion on it.
136+
err := apiCall.Merge(oldAPICall.APICall)
137137
if err != nil {
138138
err := fmt.Errorf("failed to merge API calls: %w", err)
139139
apiCall.sendOnFinish(err)

pkg/scheduler/framework/cycle_state.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type CycleState struct {
3434
skipFilterPlugins sets.Set[string]
3535
// skipScorePlugins are plugins that will be skipped in the Score extension point.
3636
skipScorePlugins sets.Set[string]
37+
// skipPreBindPlugins are plugins that will be skipped in the PreBind extension point.
38+
skipPreBindPlugins sets.Set[string]
3739
}
3840

3941
// NewCycleState initializes a new CycleState and returns its pointer.
@@ -73,6 +75,14 @@ func (c *CycleState) GetSkipScorePlugins() sets.Set[string] {
7375
return c.skipScorePlugins
7476
}
7577

78+
func (c *CycleState) SetSkipPreBindPlugins(plugins sets.Set[string]) {
79+
c.skipPreBindPlugins = plugins
80+
}
81+
82+
func (c *CycleState) GetSkipPreBindPlugins() sets.Set[string] {
83+
return c.skipPreBindPlugins
84+
}
85+
7686
// Clone creates a copy of CycleState and returns its pointer. Clone returns
7787
// nil if the context being cloned is nil.
7888
func (c *CycleState) Clone() fwk.CycleState {
@@ -89,6 +99,7 @@ func (c *CycleState) Clone() fwk.CycleState {
8999
copy.recordPluginMetrics = c.recordPluginMetrics
90100
copy.skipFilterPlugins = c.skipFilterPlugins
91101
copy.skipScorePlugins = c.skipScorePlugins
102+
copy.skipPreBindPlugins = c.skipPreBindPlugins
92103

93104
return copy
94105
}

pkg/scheduler/framework/interface.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,12 +440,12 @@ type ReservePlugin interface {
440440
// These plugins are called before a pod is bound.
441441
type PreBindPlugin interface {
442442
Plugin
443-
// PreBindPreFlight is called before PreBind, and the plugin is supposed to return Success, Skip, or Error status.
443+
// PreBindPreFlight is called before PreBind, and the plugin is supposed to return Success, Skip, or Error status
444+
// to tell the scheduler whether the plugin will do something in PreBind or not.
444445
// If it returns Success, it means this PreBind plugin will handle this pod.
445446
// If it returns Skip, it means this PreBind plugin has nothing to do with the pod, and PreBind will be skipped.
446447
// This function should be lightweight, and shouldn't do any actual operation, e.g., creating a volume etc.
447448
PreBindPreFlight(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status
448-
449449
// PreBind is called before binding a pod. All prebind plugins must return
450450
// success or the pod will be rejected and won't be sent for binding.
451451
PreBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status
@@ -526,6 +526,10 @@ type Framework interface {
526526
// internal error. In either case the pod is not going to be bound.
527527
RunPreBindPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status
528528

529+
// RunPreBindPreFlights runs the set of configured PreBindPreFlight functions from PreBind plugins.
530+
// It returns Success if at least one plugin returns Success, Skip if all plugins return Skip, and Error as soon as any plugin returns Error or an unsupported status.
531+
RunPreBindPreFlights(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status
532+
529533
// RunPostBindPlugins runs the set of configured PostBind plugins.
530534
RunPostBindPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string)
531535

@@ -547,6 +551,9 @@ type Framework interface {
547551
// Pod will remain waiting pod for the minimum duration returned by the Permit plugins.
548552
RunPermitPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status
549553

554+
// WillWaitOnPermit returns whether this pod will wait on permit by checking if the pod is a waiting pod.
555+
WillWaitOnPermit(ctx context.Context, pod *v1.Pod) bool
556+
550557
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
551558
WaitOnPermit(ctx context.Context, pod *v1.Pod) *fwk.Status
552559

pkg/scheduler/framework/runtime/framework.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1278,6 +1278,10 @@ func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state fwk.CycleSt
12781278
logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
12791279
}
12801280
for _, pl := range f.preBindPlugins {
1281+
if state.GetSkipPreBindPlugins().Has(pl.Name()) {
1282+
continue
1283+
}
1284+
12811285
ctx := ctx
12821286
if verboseLogs {
12831287
logger := klog.LoggerWithName(logger, pl.Name())
@@ -1308,6 +1312,60 @@ func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBi
13081312
return status
13091313
}
13101314

1315+
// RunPreBindPreFlights runs the set of configured PreBindPreFlight functions from PreBind plugins.
1316+
// The returning value is:
1317+
// - Success: one or more plugins return success, meaning that some PreBind plugins will do work for this pod.
1318+
// - Skip: all plugins return skip.
1319+
// - Error: any plugin returns an error or an unsupported status (anything other than Success, Skip, or Error).
1320+
func (f *frameworkImpl) RunPreBindPreFlights(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (status *fwk.Status) {
1321+
startTime := time.Now()
1322+
defer func() {
1323+
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreBindPreFlight, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
1324+
}()
1325+
logger := klog.FromContext(ctx)
1326+
verboseLogs := logger.V(4).Enabled()
1327+
if verboseLogs {
1328+
logger = klog.LoggerWithName(logger, "PreBindPreFlight")
1329+
logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
1330+
}
1331+
skipPlugins := sets.New[string]()
1332+
returningStatus := fwk.NewStatus(fwk.Skip)
1333+
for _, pl := range f.preBindPlugins {
1334+
ctx := ctx
1335+
if verboseLogs {
1336+
logger := klog.LoggerWithName(logger, pl.Name())
1337+
ctx = klog.NewContext(ctx, logger)
1338+
}
1339+
status = f.runPreBindPreFlight(ctx, pl, state, pod, nodeName)
1340+
switch {
1341+
case status.Code() == fwk.Error:
1342+
err := status.AsError()
1343+
logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName)
1344+
return fwk.AsStatus(fmt.Errorf("running PreBindPreFlight %q: %w", pl.Name(), err))
1345+
case status.IsSuccess():
1346+
// We return success when one or more plugins return success.
1347+
returningStatus = nil
1348+
case status.IsSkip():
1349+
skipPlugins.Insert(pl.Name())
1350+
default:
1351+
// Any other status (e.g., Unschedulable) is unsupported here and is reported as an error.
1352+
return fwk.AsStatus(fmt.Errorf("PreBindPreFlight %s returned %q, which is unsupported. It is supposed to return Success, Skip, or Error status", pl.Name(), status.Code()))
1353+
}
1354+
}
1355+
state.SetSkipPreBindPlugins(skipPlugins)
1356+
return returningStatus
1357+
}
1358+
1359+
func (f *frameworkImpl) runPreBindPreFlight(ctx context.Context, pl framework.PreBindPlugin, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
1360+
if !state.ShouldRecordPluginMetrics() {
1361+
return pl.PreBindPreFlight(ctx, state, pod, nodeName)
1362+
}
1363+
startTime := time.Now()
1364+
status := pl.PreBindPreFlight(ctx, state, pod, nodeName)
1365+
f.metricsRecorder.ObservePluginDurationAsync(metrics.PreBindPreFlight, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
1366+
return status
1367+
}
1368+
13111369
// RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status.
13121370
func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (status *fwk.Status) {
13131371
startTime := time.Now()
@@ -1536,6 +1594,10 @@ func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl framework.Permit
15361594
return status, timeout
15371595
}
15381596

1597+
func (f *frameworkImpl) WillWaitOnPermit(ctx context.Context, pod *v1.Pod) bool {
1598+
return f.waitingPods.get(pod.UID) != nil
1599+
}
1600+
15391601
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
15401602
func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *fwk.Status {
15411603
waitingPod := f.waitingPods.get(pod.UID)

pkg/scheduler/framework/runtime/framework_test.go

Lines changed: 109 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -235,12 +235,12 @@ func (pl *TestPlugin) Reserve(ctx context.Context, state fwk.CycleState, p *v1.P
235235
func (pl *TestPlugin) Unreserve(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) {
236236
}
237237

238-
func (pl *TestPlugin) PreBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
239-
return fwk.NewStatus(fwk.Code(pl.inj.PreBindStatus), injectReason)
238+
func (pl *TestPlugin) PreBindPreFlight(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
239+
return fwk.NewStatus(fwk.Code(pl.inj.PreBindPreFlightStatus), injectReason)
240240
}
241241

242-
func (pl *TestPlugin) PreBindPreFlight(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
243-
return nil
242+
func (pl *TestPlugin) PreBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
243+
return fwk.NewStatus(fwk.Code(pl.inj.PreBindStatus), injectReason)
244244
}
245245

246246
func (pl *TestPlugin) PostBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) {
@@ -2613,6 +2613,110 @@ func TestPreBindPlugins(t *testing.T) {
26132613
}
26142614
}
26152615

2616+
func TestPreBindPreFlightPlugins(t *testing.T) {
2617+
tests := []struct {
2618+
name string
2619+
plugins []*TestPlugin
2620+
wantStatus *fwk.Status
2621+
}{
2622+
{
2623+
name: "Skip when there's no PreBind Plugin",
2624+
plugins: []*TestPlugin{},
2625+
wantStatus: fwk.NewStatus(fwk.Skip),
2626+
},
2627+
{
2628+
name: "Success when PreBindPreFlight returns Success",
2629+
plugins: []*TestPlugin{
2630+
{
2631+
name: "TestPlugin1",
2632+
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Skip)},
2633+
},
2634+
{
2635+
name: "TestPlugin2",
2636+
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Success)},
2637+
},
2638+
},
2639+
wantStatus: nil,
2640+
},
2641+
{
2642+
name: "Skip when all PreBindPreFlight returns Skip",
2643+
plugins: []*TestPlugin{
2644+
{
2645+
name: "TestPlugin1",
2646+
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Skip)},
2647+
},
2648+
{
2649+
name: "TestPlugin2",
2650+
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Skip)},
2651+
},
2652+
},
2653+
wantStatus: fwk.NewStatus(fwk.Skip),
2654+
},
2655+
{
2656+
name: "Error when PreBindPreFlight returns Error",
2657+
plugins: []*TestPlugin{
2658+
{
2659+
name: "TestPlugin1",
2660+
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Skip)},
2661+
},
2662+
{
2663+
name: "TestPlugin2",
2664+
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Error)},
2665+
},
2666+
},
2667+
wantStatus: fwk.AsStatus(fmt.Errorf(`running PreBindPreFlight "TestPlugin2": %w`, errInjectedStatus)),
2668+
},
2669+
{
2670+
name: "Error when PreBindPreFlight returns Unschedulable",
2671+
plugins: []*TestPlugin{
2672+
{
2673+
name: "TestPlugin",
2674+
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Unschedulable)},
2675+
},
2676+
},
2677+
wantStatus: fwk.NewStatus(fwk.Error, "PreBindPreFlight TestPlugin returned \"Unschedulable\", which is unsupported. It is supposed to return Success, Skip, or Error status"),
2678+
},
2679+
}
2680+
2681+
for _, tt := range tests {
2682+
t.Run(tt.name, func(t *testing.T) {
2683+
_, ctx := ktesting.NewTestContext(t)
2684+
registry := Registry{}
2685+
configPlugins := &config.Plugins{}
2686+
2687+
for _, pl := range tt.plugins {
2688+
tmpPl := pl
2689+
if err := registry.Register(pl.name, func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
2690+
return tmpPl, nil
2691+
}); err != nil {
2692+
t.Fatalf("Unable to register pre bind plugins: %s", pl.name)
2693+
}
2694+
2695+
configPlugins.PreBind.Enabled = append(
2696+
configPlugins.PreBind.Enabled,
2697+
config.Plugin{Name: pl.name},
2698+
)
2699+
}
2700+
profile := config.KubeSchedulerProfile{Plugins: configPlugins}
2701+
ctx, cancel := context.WithCancel(ctx)
2702+
defer cancel()
2703+
f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
2704+
if err != nil {
2705+
t.Fatalf("fail to create framework: %s", err)
2706+
}
2707+
defer func() {
2708+
_ = f.Close()
2709+
}()
2710+
2711+
status := f.RunPreBindPreFlights(ctx, state, pod, "")
2712+
2713+
if diff := cmp.Diff(tt.wantStatus, status, statusCmpOpts...); diff != "" {
2714+
t.Errorf("Wrong status code (-want,+got):\n%s", diff)
2715+
}
2716+
})
2717+
}
2718+
}
2719+
26162720
func TestReservePlugins(t *testing.T) {
26172721
tests := []struct {
26182722
name string
@@ -3458,6 +3562,7 @@ type injectedResult struct {
34583562
PostFilterStatus int `json:"postFilterStatus,omitempty"`
34593563
PreScoreStatus int `json:"preScoreStatus,omitempty"`
34603564
ReserveStatus int `json:"reserveStatus,omitempty"`
3565+
PreBindPreFlightStatus int `json:"preBindPreFlightStatus,omitempty"`
34613566
PreBindStatus int `json:"preBindStatus,omitempty"`
34623567
BindStatus int `json:"bindStatus,omitempty"`
34633568
PermitStatus int `json:"permitStatus,omitempty"`

pkg/scheduler/metrics/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ const (
7373
Score = "Score"
7474
ScoreExtensionNormalize = "ScoreExtensionNormalize"
7575
PreBind = "PreBind"
76+
PreBindPreFlight = "PreBindPreFlight"
7677
Bind = "Bind"
7778
PostBind = "PostBind"
7879
Reserve = "Reserve"

pkg/scheduler/schedule_one.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,26 @@ func (sched *Scheduler) bindingCycle(
279279

280280
assumedPod := assumedPodInfo.Pod
281281

282+
if sched.nominatedNodeNameForExpectationEnabled {
283+
preFlightStatus := schedFramework.RunPreBindPreFlights(ctx, state, assumedPod, scheduleResult.SuggestedHost)
284+
if preFlightStatus.Code() == fwk.Error ||
285+
// Unschedulable status is not supported in PreBindPreFlight and hence we regard it as an error.
286+
preFlightStatus.IsRejected() {
287+
return preFlightStatus
288+
}
289+
if preFlightStatus.IsSuccess() || schedFramework.WillWaitOnPermit(ctx, assumedPod) {
290+
// Add NominatedNodeName to tell the external components (e.g., the cluster autoscaler) that the pod is about to be bound to the node.
291+
// We only do this when WaitOnPermit or PreBind will actually do some work; otherwise the pod will be bound soon anyway.
292+
if err := updatePod(ctx, sched.client, schedFramework.APICacher(), assumedPod, nil, &framework.NominatingInfo{
293+
NominatedNodeName: scheduleResult.SuggestedHost,
294+
NominatingMode: framework.ModeOverride,
295+
}); err != nil {
296+
logger.Error(err, "Failed to update the nominated node name in the binding cycle", "pod", klog.KObj(assumedPod), "nominatedNodeName", scheduleResult.SuggestedHost)
297+
// We continue the processing because it's not critical enough to stop binding cycles here.
298+
}
299+
}
300+
}
301+
282302
// Run "permit" plugins.
283303
if status := schedFramework.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
284304
if status.IsRejected() {
@@ -1118,12 +1138,21 @@ func updatePod(ctx context.Context, client clientset.Interface, apiCacher framew
11181138
return err
11191139
}
11201140
logger := klog.FromContext(ctx)
1121-
logger.V(3).Info("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
1141+
logValues := []any{"pod", klog.KObj(pod)}
1142+
if condition != nil {
1143+
logValues = append(logValues, "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
1144+
}
1145+
if nominatingInfo != nil {
1146+
logValues = append(logValues, "nominatedNodeName", nominatingInfo.NominatedNodeName, "nominatingMode", nominatingInfo.Mode())
1147+
}
1148+
logger.V(3).Info("Updating pod condition and nominated node name", logValues...)
1149+
11221150
podStatusCopy := pod.Status.DeepCopy()
11231151
// NominatedNodeName is updated only if we are trying to set it, and the value is
11241152
// different from the existing one.
11251153
nnnNeedsUpdate := nominatingInfo.Mode() == framework.ModeOverride && pod.Status.NominatedNodeName != nominatingInfo.NominatedNodeName
1126-
if !podutil.UpdatePodCondition(podStatusCopy, condition) && !nnnNeedsUpdate {
1154+
podConditionNeedsUpdate := condition != nil && podutil.UpdatePodCondition(podStatusCopy, condition)
1155+
if !podConditionNeedsUpdate && !nnnNeedsUpdate {
11271156
return nil
11281157
}
11291158
if nnnNeedsUpdate {

0 commit comments

Comments
 (0)