Commit 7f052af

KEP 5075: implement scheduler

Signed-off-by: Sunyanan Choochotkaew <sunyanan.choochotkaew1@ibm.com>

1 parent 59bba92 · commit 7f052af

34 files changed: +2800 −183 lines

pkg/controller/devicetainteviction/device_taint_eviction.go

Lines changed: 8 additions & 5 deletions

```diff
@@ -36,6 +36,7 @@ import (
     "k8s.io/apimachinery/pkg/util/diff"
     utilruntime "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/sets"
+    utilfeature "k8s.io/apiserver/pkg/util/feature"
     coreinformers "k8s.io/client-go/informers/core/v1"
     resourceinformers "k8s.io/client-go/informers/resource/v1"
     resourcealphainformers "k8s.io/client-go/informers/resource/v1alpha3"
@@ -51,6 +52,7 @@ import (
     apipod "k8s.io/kubernetes/pkg/api/v1/pod"
     "k8s.io/kubernetes/pkg/controller/devicetainteviction/metrics"
     "k8s.io/kubernetes/pkg/controller/tainteviction"
+    "k8s.io/kubernetes/pkg/features"
     utilpod "k8s.io/kubernetes/pkg/util/pod"
 )
 
@@ -458,11 +460,12 @@ func (tc *Controller) Run(ctx context.Context) error {
     tc.haveSynced = append(tc.haveSynced, podHandler.HasSynced)
 
     opts := resourceslicetracker.Options{
-        EnableDeviceTaints: true,
-        SliceInformer:      tc.sliceInformer,
-        TaintInformer:      tc.taintInformer,
-        ClassInformer:      tc.classInformer,
-        KubeClient:         tc.client,
+        EnableDeviceTaints:       true,
+        EnableConsumableCapacity: utilfeature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity),
+        SliceInformer:            tc.sliceInformer,
+        TaintInformer:            tc.taintInformer,
+        ClassInformer:            tc.classInformer,
+        KubeClient:               tc.client,
     }
     sliceTracker, err := resourceslicetracker.StartTracker(ctx, opts)
     if err != nil {
```
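The same gate-to-options wiring appears again in pkg/scheduler/scheduler.go at the end of this commit. Below is a minimal, self-contained sketch of the pattern; trackerOptions and featureEnabled are simplified stand-ins for resourceslicetracker.Options and utilfeature.DefaultFeatureGate.Enabled, not the real Kubernetes APIs:

```go
package main

import "fmt"

// trackerOptions is a stand-in for resourceslicetracker.Options.
type trackerOptions struct {
	EnableDeviceTaints       bool
	EnableConsumableCapacity bool
}

// featureEnabled is a stand-in for the process-wide feature-gate lookup.
func featureEnabled(name string) bool {
	return name == "DRAConsumableCapacity"
}

func main() {
	opts := trackerOptions{
		// The eviction controller always needs taint tracking.
		EnableDeviceTaints: true,
		// Optional behavior is switched on the feature gate at construction time.
		EnableConsumableCapacity: featureEnabled("DRAConsumableCapacity"),
	}
	fmt.Printf("%+v\n", opts)
}
```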

pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go

Lines changed: 4 additions & 0 deletions

```diff
@@ -101,6 +101,10 @@ func (r *resourceClaimTrackerContract) ListAllAllocatedDevices() (sets.Set[struc
     return nil, nil
 }
 
+func (r *resourceClaimTrackerContract) GatherAllocatedState() (*structured.AllocatedState, error) {
+    return nil, nil
+}
+
 func (r *resourceClaimTrackerContract) SignalClaimPendingAllocation(_ types.UID, _ *resourceapi.ResourceClaim) error {
     return nil
 }
```

pkg/scheduler/framework/listers.go

Lines changed: 3 additions & 0 deletions

```diff
@@ -84,6 +84,9 @@ type ResourceClaimTracker interface {
     // ListAllAllocatedDevices lists all allocated Devices from allocated ResourceClaims. The result is guaranteed to immediately include
     // any changes made via AssumeClaimAfterAPICall(), and SignalClaimPendingAllocation().
     ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error)
+    // GatherAllocatedState gathers information about allocated devices from allocated ResourceClaims. The result is guaranteed to immediately include
+    // any changes made via AssumeClaimAfterAPICall(), and SignalClaimPendingAllocation().
+    GatherAllocatedState() (*structured.AllocatedState, error)
 
     // SignalClaimPendingAllocation signals to the tracker that the given ResourceClaim will be allocated via an API call in the
     // binding phase. This change is immediately reflected in the result of List() and the other accessors.
```
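The structured.AllocatedState returned here bundles three views of allocation state. Its construction in dra_manager.go later in this commit uses the fields shown below; this is a hedged, self-contained sketch of the shape only, with simplified stand-in element types rather than the real structured types:

```go
package main

import "fmt"

// AllocatedState mirrors the field names used later in this commit;
// the element types are simplified stand-ins, not the real ones.
type AllocatedState struct {
	AllocatedDevices         map[string]bool  // dedicated device IDs
	AllocatedSharedDeviceIDs map[string]bool  // device ID + share ID pairs
	AggregatedCapacity       map[string]int64 // summed consumed capacity per device
}

func main() {
	s := AllocatedState{
		AllocatedDevices:         map[string]bool{"driver/node/gpu-0": true},
		AllocatedSharedDeviceIDs: map[string]bool{"driver/node/gpu-1/share-0": true},
		AggregatedCapacity:       map[string]int64{"driver/node/gpu-1": 4 << 30},
	}
	fmt.Printf("%+v\n", s)
}
```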

pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go

Lines changed: 95 additions & 15 deletions

```diff
@@ -21,9 +21,11 @@ import (
 
     resourceapi "k8s.io/api/resource/v1"
     "k8s.io/apimachinery/pkg/util/sets"
+    utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/tools/cache"
     "k8s.io/dynamic-resource-allocation/structured"
     "k8s.io/klog/v2"
+    "k8s.io/kubernetes/pkg/features"
     schedutil "k8s.io/kubernetes/pkg/scheduler/util"
     "k8s.io/utils/ptr"
 )
@@ -36,7 +38,11 @@ import (
 // claims and are skipped without invoking the callback.
 //
 // foreachAllocatedDevice does nothing if the claim is not allocated.
-func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID structured.DeviceID)) {
+func foreachAllocatedDevice(claim *resourceapi.ResourceClaim,
+    dedicatedDeviceCallback func(deviceID structured.DeviceID),
+    enabledConsumableCapacity bool,
+    sharedDeviceCallback func(structured.SharedDeviceID),
+    consumedCapacityCallback func(structured.DeviceConsumedCapacity)) {
     if claim.Status.Allocation == nil {
         return
     }
@@ -54,7 +60,24 @@ func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID s
 
         // None of the users of this helper need to abort iterating,
         // therefore it's not supported as it only would add overhead.
-        cb(deviceID)
+
+        // Execute sharedDeviceCallback and consumedCapacityCallback correspondingly
+        // if DRAConsumableCapacity feature is enabled
+        if enabledConsumableCapacity {
+            shared := result.ShareID != nil
+            if shared {
+                sharedDeviceID := structured.MakeSharedDeviceID(deviceID, result.ShareID)
+                sharedDeviceCallback(sharedDeviceID)
+                if result.ConsumedCapacity != nil {
+                    deviceConsumedCapacity := structured.NewDeviceConsumedCapacity(deviceID, result.ConsumedCapacity)
+                    consumedCapacityCallback(deviceConsumedCapacity)
+                }
+                continue
+            }
+        }
+
+        // Otherwise, execute dedicatedDeviceCallback
+        dedicatedDeviceCallback(deviceID)
     }
 }
 
@@ -66,14 +89,20 @@ func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID s
 type allocatedDevices struct {
     logger klog.Logger
 
-    mutex sync.RWMutex
-    ids   sets.Set[structured.DeviceID]
+    mutex                     sync.RWMutex
+    ids                       sets.Set[structured.DeviceID]
+    shareIDs                  sets.Set[structured.SharedDeviceID]
+    capacities                structured.ConsumedCapacityCollection
+    enabledConsumableCapacity bool
 }
 
 func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
     return &allocatedDevices{
-        logger: logger,
-        ids:    sets.New[structured.DeviceID](),
+        logger:                    logger,
+        ids:                       sets.New[structured.DeviceID](),
+        shareIDs:                  sets.New[structured.SharedDeviceID](),
+        capacities:                structured.NewConsumedCapacityCollection(),
+        enabledConsumableCapacity: utilfeature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity),
     }
 }
 
@@ -84,6 +113,13 @@ func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] {
     return a.ids.Clone()
 }
 
+func (a *allocatedDevices) Capacities() structured.ConsumedCapacityCollection {
+    a.mutex.RLock()
+    defer a.mutex.RUnlock()
+
+    return a.capacities.Clone()
+}
+
 func (a *allocatedDevices) handlers() cache.ResourceEventHandler {
     return cache.ResourceEventHandlerFuncs{
         AddFunc: a.onAdd,
@@ -142,16 +178,39 @@ func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) {
     // Locking of the mutex gets minimized by pre-computing what needs to be done
     // without holding the lock.
     deviceIDs := make([]structured.DeviceID, 0, 20)
-    foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
-        a.logger.V(6).Info("Observed device allocation", "device", deviceID, "claim", klog.KObj(claim))
-        deviceIDs = append(deviceIDs, deviceID)
-    })
+    var shareIDs []structured.SharedDeviceID
+    var deviceCapacities []structured.DeviceConsumedCapacity
+    if a.enabledConsumableCapacity {
+        shareIDs = make([]structured.SharedDeviceID, 0, 20)
+        deviceCapacities = make([]structured.DeviceConsumedCapacity, 0, 20)
+    }
+    foreachAllocatedDevice(claim,
+        func(deviceID structured.DeviceID) {
+            a.logger.V(6).Info("Observed device allocation", "device", deviceID, "claim", klog.KObj(claim))
+            deviceIDs = append(deviceIDs, deviceID)
+        },
+        a.enabledConsumableCapacity,
+        func(sharedDeviceID structured.SharedDeviceID) {
+            a.logger.V(6).Info("Observed shared device allocation", "shared device", sharedDeviceID, "claim", klog.KObj(claim))
+            shareIDs = append(shareIDs, sharedDeviceID)
+        },
+        func(capacity structured.DeviceConsumedCapacity) {
+            a.logger.V(6).Info("Observed consumed capacity", "device", capacity.DeviceID, "consumed capacity", capacity.ConsumedCapacity, "claim", klog.KObj(claim))
+            deviceCapacities = append(deviceCapacities, capacity)
+        },
+    )
 
     a.mutex.Lock()
     defer a.mutex.Unlock()
     for _, deviceID := range deviceIDs {
         a.ids.Insert(deviceID)
     }
+    for _, shareID := range shareIDs {
+        a.shareIDs.Insert(shareID)
+    }
+    for _, capacity := range deviceCapacities {
+        a.capacities.Insert(capacity)
+    }
 }
 
 func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
@@ -162,14 +221,35 @@ func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
     // Locking of the mutex gets minimized by pre-computing what needs to be done
     // without holding the lock.
     deviceIDs := make([]structured.DeviceID, 0, 20)
-    foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
-        a.logger.V(6).Info("Observed device deallocation", "device", deviceID, "claim", klog.KObj(claim))
-        deviceIDs = append(deviceIDs, deviceID)
-    })
-
+    var shareIDs []structured.SharedDeviceID
+    var deviceCapacities []structured.DeviceConsumedCapacity
+    if a.enabledConsumableCapacity {
+        shareIDs = make([]structured.SharedDeviceID, 0, 20)
+        deviceCapacities = make([]structured.DeviceConsumedCapacity, 0, 20)
+    }
+    foreachAllocatedDevice(claim,
+        func(deviceID structured.DeviceID) {
+            a.logger.V(6).Info("Observed device deallocation", "device", deviceID, "claim", klog.KObj(claim))
+            deviceIDs = append(deviceIDs, deviceID)
+        },
+        a.enabledConsumableCapacity,
+        func(sharedDeviceID structured.SharedDeviceID) {
+            a.logger.V(6).Info("Observed shared device deallocation", "shared device", sharedDeviceID, "claim", klog.KObj(claim))
+            shareIDs = append(shareIDs, sharedDeviceID)
+        },
+        func(capacity structured.DeviceConsumedCapacity) {
+            a.logger.V(6).Info("Observed consumed capacity release", "device id", capacity.DeviceID, "consumed capacity", capacity.ConsumedCapacity, "claim", klog.KObj(claim))
+            deviceCapacities = append(deviceCapacities, capacity)
+        })
     a.mutex.Lock()
     defer a.mutex.Unlock()
     for _, deviceID := range deviceIDs {
         a.ids.Delete(deviceID)
     }
+    for _, shareID := range shareIDs {
+        a.shareIDs.Delete(shareID)
+    }
+    for _, capacity := range deviceCapacities {
+        a.capacities.Remove(capacity)
+    }
 }
```
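The rewritten foreachAllocatedDevice dispatches each allocation result to one of three callbacks: results carrying a share ID go to the shared-device callback (and, when capacity was consumed, also to the consumed-capacity callback), while everything else falls through to the dedicated-device callback. A minimal, runnable sketch of that dispatch pattern, using simplified stand-in types instead of the real structured ones:

```go
package main

import "fmt"

type deviceID string
type shareID string

// allocationResult is a stand-in for one device result in a claim's allocation.
type allocationResult struct {
	device           deviceID
	share            *shareID         // nil means the device is dedicated
	consumedCapacity map[string]int64 // nil means no capacity consumption recorded
}

func forEachResult(results []allocationResult,
	dedicated func(deviceID),
	consumableCapacityEnabled bool,
	shared func(deviceID, shareID),
	consumed func(deviceID, map[string]int64),
) {
	for _, r := range results {
		if consumableCapacityEnabled && r.share != nil {
			shared(r.device, *r.share)
			if r.consumedCapacity != nil {
				consumed(r.device, r.consumedCapacity)
			}
			continue // a shared result never reaches the dedicated callback
		}
		// Gate off, or no share ID: treat as a dedicated allocation.
		dedicated(r.device)
	}
}

func main() {
	s := shareID("share-0")
	results := []allocationResult{
		{device: "gpu-0"},
		{device: "gpu-1", share: &s, consumedCapacity: map[string]int64{"memory": 4 << 30}},
	}
	forEachResult(results,
		func(d deviceID) { fmt.Println("dedicated:", d) },
		true,
		func(d deviceID, s shareID) { fmt.Println("shared:", d, s) },
		func(d deviceID, c map[string]int64) { fmt.Println("consumed:", d, c) },
	)
}
```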

pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go

Lines changed: 38 additions & 1 deletion

```diff
@@ -25,11 +25,13 @@ import (
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/sets"
+    utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/informers"
     resourcelisters "k8s.io/client-go/listers/resource/v1"
     resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
     "k8s.io/dynamic-resource-allocation/structured"
     "k8s.io/klog/v2"
+    "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/scheduler/framework"
     "k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 )
@@ -212,13 +214,48 @@ func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID],
         foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
             c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
             allocated.Insert(deviceID)
-        })
+        }, false, func(structured.SharedDeviceID) {}, func(structured.DeviceConsumedCapacity) {})
         return true
     })
     // There's no reason to return an error in this implementation, but the error might be helpful for other implementations.
     return allocated, nil
 }
 
+func (c *claimTracker) GatherAllocatedState() (*structured.AllocatedState, error) {
+    // Start with a fresh set that matches the current known state of the
+    // world according to the informers.
+    allocated := c.allocatedDevices.Get()
+    allocatedSharedDeviceIDs := sets.New[structured.SharedDeviceID]()
+    aggregatedCapacity := c.allocatedDevices.Capacities()
+
+    enabledConsumableCapacity := utilfeature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity)
+
+    // Whatever is in flight also has to be checked.
+    c.inFlightAllocations.Range(func(key, value any) bool {
+        claim := value.(*resourceapi.ResourceClaim)
+        foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
+            c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
+            allocated.Insert(deviceID)
+        },
+            enabledConsumableCapacity,
+            func(sharedDeviceID structured.SharedDeviceID) {
+                c.logger.V(6).Info("Device is in flight for allocation", "shared device", sharedDeviceID, "claim", klog.KObj(claim))
+                allocatedSharedDeviceIDs.Insert(sharedDeviceID)
+            }, func(capacity structured.DeviceConsumedCapacity) {
+                c.logger.V(6).Info("Device is in flight for allocation", "consumed capacity", capacity, "claim", klog.KObj(claim))
+                aggregatedCapacity.Insert(capacity)
+            })
+        return true
+    })
+
+    // There's no reason to return an error in this implementation, but the error might be helpful for other implementations.
+    return &structured.AllocatedState{
+        AllocatedDevices:         allocated,
+        AllocatedSharedDeviceIDs: allocatedSharedDeviceIDs,
+        AggregatedCapacity:       aggregatedCapacity,
+    }, nil
+}
+
 func (c *claimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error {
     return c.cache.Assume(claim)
 }
```
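GatherAllocatedState follows the same "snapshot plus in-flight overlay" pattern as ListAllAllocatedDevices: clone what the informers already know, then fold in allocations that are still being written back to the API server. A self-contained sketch of the pattern, with a sync.Map mirroring the inFlightAllocations field and simplified stand-in state types:

```go
package main

import (
	"fmt"
	"sync"
)

// state is a stand-in for the informer-backed allocation snapshot.
type state struct {
	devices map[string]bool
}

func (s state) clone() state {
	out := state{devices: make(map[string]bool, len(s.devices))}
	for k := range s.devices {
		out.devices[k] = true
	}
	return out
}

// gather merges the known snapshot with allocations that are in flight.
func gather(snapshot state, inFlight *sync.Map) state {
	merged := snapshot.clone() // never mutate the shared snapshot
	inFlight.Range(func(_, value any) bool {
		merged.devices[value.(string)] = true
		return true // keep iterating; no caller needs to abort
	})
	return merged
}

func main() {
	known := state{devices: map[string]bool{"driver/node/gpu-0": true}}
	var inFlight sync.Map
	inFlight.Store("claim-uid", "driver/node/gpu-1")
	fmt.Println(gather(known, &inFlight).devices)
}
```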

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go

Lines changed: 24 additions & 5 deletions

```diff
@@ -170,6 +170,7 @@ type DynamicResources struct {
     enableExtendedResource   bool
     enableFilterTimeout      bool
     filterTimeout            time.Duration
+    enableConsumableCapacity bool
 
     fh        framework.Handle
     clientset kubernetes.Interface
@@ -201,6 +202,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
         enableSchedulingQueueHint:     fts.EnableSchedulingQueueHint,
         enablePartitionableDevices:    fts.EnablePartitionableDevices,
         enableExtendedResource:        fts.EnableDRAExtendedResource,
+        enableConsumableCapacity:      fts.EnableConsumableCapacity,
         filterTimeout:                 ptr.Deref(args.FilterTimeout, metav1.Duration{}).Duration,
         enableDeviceBindingConditions: fts.EnableDRADeviceBindingConditions,
         enableDeviceStatus:            fts.EnableDRAResourceClaimDeviceStatus,
@@ -210,7 +212,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
         // This is a LRU cache for compiled CEL expressions. The most
         // recent 10 of them get reused across different scheduling
         // cycles.
-        celCache:   cel.NewCache(10),
+        celCache:   cel.NewCache(10, cel.Features{EnableConsumableCapacity: fts.EnableConsumableCapacity}),
         draManager: fh.SharedDRAManager(),
     }
 
@@ -658,9 +660,25 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState,
     // Claims (and thus their devices) are treated as "allocated" if they are in the assume cache
     // or currently their allocation is in-flight. This does not change
     // during filtering, so we can determine that once.
-    allAllocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
-    if err != nil {
-        return nil, statusError(logger, err)
+    var allocatedState *structured.AllocatedState
+    if pl.enableConsumableCapacity {
+        allocatedState, err = pl.draManager.ResourceClaims().GatherAllocatedState()
+        if err != nil {
+            return nil, statusError(logger, err)
+        }
+        if allocatedState == nil {
+            return nil, statusError(logger, errors.New("nil allocated state"))
+        }
+    } else {
+        allocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
+        if err != nil {
+            return nil, statusError(logger, err)
+        }
+        allocatedState = &structured.AllocatedState{
+            AllocatedDevices:         allocatedDevices,
+            AllocatedSharedDeviceIDs: sets.New[structured.SharedDeviceID](),
+            AggregatedCapacity:       structured.NewConsumedCapacityCollection(),
+        }
     }
     slices, err := pl.draManager.ResourceSlices().ListWithDeviceTaintRules()
     if err != nil {
@@ -673,8 +691,9 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState,
         DeviceTaints:       pl.enableDeviceTaints,
         DeviceBinding:      pl.enableDeviceBindingConditions,
         DeviceStatus:       pl.enableDeviceStatus,
+        ConsumableCapacity: pl.enableConsumableCapacity,
     }
-    allocator, err := structured.NewAllocator(ctx, features, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)
+    allocator, err := structured.NewAllocator(ctx, features, *allocatedState, pl.draManager.DeviceClasses(), slices, pl.celCache)
     if err != nil {
         return nil, statusError(logger, err)
     }
```
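When the feature gate is off, PreFilter adapts the legacy flat device set into the new aggregate type with empty shared-device and capacity fields, so structured.NewAllocator consumes one input shape either way. A hedged sketch of that adapter pattern with simplified stand-in types:

```go
package main

import "fmt"

// allocatedState is a stand-in for structured.AllocatedState.
type allocatedState struct {
	allocatedDevices map[string]bool
	sharedDeviceIDs  map[string]bool
	capacities       map[string]int64
}

// legacyList is a stand-in for ListAllAllocatedDevices().
func legacyList() map[string]bool {
	return map[string]bool{"driver/node/gpu-0": true}
}

func gatherState(consumableCapacityEnabled bool) *allocatedState {
	if consumableCapacityEnabled {
		// The real code calls GatherAllocatedState() here.
		return &allocatedState{}
	}
	// Gate off: wrap the old flat list in the new shape with empty extensions,
	// so downstream code only ever handles one input type.
	return &allocatedState{
		allocatedDevices: legacyList(),
		sharedDeviceIDs:  map[string]bool{},
		capacities:       map[string]int64{},
	}
}

func main() {
	fmt.Printf("%+v\n", gatherState(false))
}
```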

pkg/scheduler/scheduler.go

Lines changed: 4 additions & 3 deletions

```diff
@@ -326,9 +326,10 @@ func New(ctx context.Context,
     resourceClaimInformer := informerFactory.Resource().V1().ResourceClaims().Informer()
     resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
     resourceSliceTrackerOpts := resourceslicetracker.Options{
-        EnableDeviceTaints: feature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
-        SliceInformer:      informerFactory.Resource().V1().ResourceSlices(),
-        KubeClient:         client,
+        EnableDeviceTaints:       feature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
+        EnableConsumableCapacity: feature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity),
+        SliceInformer:            informerFactory.Resource().V1().ResourceSlices(),
+        KubeClient:               client,
     }
     // If device taints are disabled, the additional informers are not needed and
     // the tracker turns into a simple wrapper around the slice informer.
```
