Skip to content

Commit b50d664

Browse files
authored
feat: disable backend synchronization (#5734)
1 parent f55dc24 commit b50d664

File tree

4 files changed

+42
-17
lines changed

4 files changed

+42
-17
lines changed

engine/cdn/storage/nfs/nfs_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/rockbears/log"
1111
"github.com/stretchr/testify/require"
1212
"io/ioutil"
13-
"math"
1413
"os"
1514
"testing"
1615
"time"
@@ -64,7 +63,7 @@ func TestNFSReadWrite(t *testing.T) {
6463
require.NotNil(t, d)
6564
bd, is := d.(storage.BufferUnit)
6665
require.True(t, is)
67-
bd.New(sdk.NewGoRoutines(), 1, math.MaxFloat64)
66+
bd.New(sdk.NewGoRoutines(), storage.AbstractUnitConfig{})
6867
err := bd.Init(ctx, &storage.NFSBufferConfiguration{
6968
Host: nfsHost,
7069
TargetPartition: nfsTargetPath,

engine/cdn/storage/storageunit.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
9191
if !is {
9292
return nil, sdk.WithStack(fmt.Errorf("redis driver is not a buffer unit driver"))
9393
}
94-
bd.New(gorts, 1, math.MaxFloat64)
94+
bd.New(gorts, AbstractUnitConfig{syncBandwidth: math.MaxFloat64, syncParrallel: 1})
9595
if err := bd.Init(ctx, bu.Redis, bu.BufferType); err != nil {
9696
return nil, err
9797
}
@@ -106,7 +106,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
106106
if !is {
107107
return nil, sdk.WithStack(fmt.Errorf("local driver is not a buffer unit driver"))
108108
}
109-
bd.New(gorts, 1, math.MaxFloat64)
109+
bd.New(gorts, AbstractUnitConfig{syncBandwidth: math.MaxFloat64, syncParrallel: 1})
110110
if err := bd.Init(ctx, bu.Local, bu.BufferType); err != nil {
111111
return nil, err
112112
}
@@ -121,7 +121,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
121121
if !is {
122122
return nil, sdk.WithStack(fmt.Errorf("nfs buffer driver is not a buffer unit driver"))
123123
}
124-
bd.New(gorts, 1, math.MaxFloat64)
124+
bd.New(gorts, AbstractUnitConfig{syncBandwidth: math.MaxFloat64, syncParrallel: 1})
125125
if err := bd.Init(ctx, bu.Nfs, bu.BufferType); err != nil {
126126
return nil, err
127127
}
@@ -175,7 +175,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
175175
if !is {
176176
return nil, sdk.WithStack(fmt.Errorf("cds driver is not a storage unit driver"))
177177
}
178-
sd.New(gorts, cfg.SyncParallel, float64(cfg.SyncBandwidth)*1024*1024) // convert from MBytes to Bytes
178+
sd.New(gorts, AbstractUnitConfig{syncBandwidth: float64(cfg.SyncBandwidth) * 1024 * 1024, syncParrallel: cfg.SyncParallel, disableSync: cfg.DisableSync}) // convert from MBytes to Bytes
179179

180180
if err := sd.Init(ctx, cfg.CDS); err != nil {
181181
return nil, err
@@ -191,7 +191,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
191191
if !is {
192192
return nil, sdk.WithStack(fmt.Errorf("local driver is not a storage unit driver"))
193193
}
194-
sd.New(gorts, cfg.SyncParallel, float64(cfg.SyncBandwidth)*1024*1024) // convert from MBytes to Bytes
194+
sd.New(gorts, AbstractUnitConfig{syncBandwidth: float64(cfg.SyncBandwidth) * 1024 * 1024, syncParrallel: cfg.SyncParallel, disableSync: cfg.DisableSync}) // convert from MBytes to Bytes
195195

196196
if err := sd.Init(ctx, cfg.Local); err != nil {
197197
return nil, err
@@ -204,7 +204,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
204204
if !is {
205205
return nil, sdk.WithStack(fmt.Errorf("swift driver is not a storage unit driver"))
206206
}
207-
sd.New(gorts, cfg.SyncParallel, float64(cfg.SyncBandwidth)*1024*1024) // convert from MBytes to Bytes
207+
sd.New(gorts, AbstractUnitConfig{syncBandwidth: float64(cfg.SyncBandwidth) * 1024 * 1024, syncParrallel: cfg.SyncParallel, disableSync: cfg.DisableSync}) // convert from MBytes to Bytes
208208

209209
if err := sd.Init(ctx, cfg.Swift); err != nil {
210210
return nil, err
@@ -217,7 +217,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
217217
if !is {
218218
return nil, sdk.WithStack(fmt.Errorf("webdav driver is not a storage unit driver"))
219219
}
220-
sd.New(gorts, cfg.SyncParallel, float64(cfg.SyncBandwidth)*1024*1024) // convert from MBytes to Bytes
220+
sd.New(gorts, AbstractUnitConfig{syncBandwidth: float64(cfg.SyncBandwidth) * 1024 * 1024, syncParrallel: cfg.SyncParallel, disableSync: cfg.DisableSync}) // convert from MBytes to Bytes
221221

222222
if err := sd.Init(ctx, cfg.Webdav); err != nil {
223223
return nil, err
@@ -230,7 +230,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
230230
if !is {
231231
return nil, sdk.WithStack(fmt.Errorf("s3 driver is not a storage unit driver"))
232232
}
233-
sd.New(gorts, cfg.SyncParallel, float64(cfg.SyncBandwidth)*1024*1024) // convert from MBytes to Bytes
233+
sd.New(gorts, AbstractUnitConfig{syncBandwidth: float64(cfg.SyncBandwidth) * 1024 * 1024, syncParrallel: cfg.SyncParallel, disableSync: cfg.DisableSync}) // convert from MBytes to Bytes
234234

235235
if err := sd.Init(ctx, cfg.S3); err != nil {
236236
return nil, err
@@ -280,6 +280,9 @@ func (r *RunningStorageUnits) PushInSyncQueue(ctx context.Context, itemID string
280280
return
281281
}
282282
for _, sto := range r.Storages {
283+
if !sto.CanSync() {
284+
continue
285+
}
283286
if err := r.cache.ScoredSetAdd(ctx, cache.Key(KeyBackendSync, sto.Name()), itemID, float64(created.Unix())); err != nil {
284287
log.Info(ctx, "storeLogs> cannot push item %s into scoredset for unit %s", itemID, sto.Name())
285288
continue
@@ -290,6 +293,9 @@ func (r *RunningStorageUnits) PushInSyncQueue(ctx context.Context, itemID string
290293
func (r *RunningStorageUnits) Start(ctx context.Context, gorts *sdk.GoRoutines) {
291294
// Get Unknown items
292295
for _, s := range r.Storages {
296+
if !s.CanSync() {
297+
continue
298+
}
293299
if err := r.FillWithUnknownItems(ctx, s, r.config.SyncNbElements); err != nil {
294300
log.Error(ctx, "Start> unable to get unknown items: %v", err)
295301
}
@@ -298,6 +304,9 @@ func (r *RunningStorageUnits) Start(ctx context.Context, gorts *sdk.GoRoutines)
298304
// Start the sync processes
299305
for i := range r.Storages {
300306
s := r.Storages[i]
307+
if !s.CanSync() {
308+
continue
309+
}
301310
for x := 0; x < cap(s.SyncItemChannel()); x++ {
302311
gorts.Run(ctx, fmt.Sprintf("RunningStorageUnits.process.%s.%d", s.Name(), x),
303312
func(ctx context.Context) {
@@ -381,6 +390,9 @@ func (r *RunningStorageUnits) Start(ctx context.Context, gorts *sdk.GoRoutines)
381390
wg := sync.WaitGroup{}
382391
for i := range r.Storages {
383392
s := r.Storages[i]
393+
if !s.CanSync() {
394+
continue
395+
}
384396
gorts.Exec(ctx, "RunningStorageUnits.run."+s.Name(),
385397
func(ctx context.Context) {
386398
wg.Add(1)

engine/cdn/storage/types.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ var (
4343
type Interface interface {
4444
Name() string
4545
ID() string
46-
New(gorts *sdk.GoRoutines, syncParrallel int64, syncBandwidth float64)
46+
New(gorts *sdk.GoRoutines, config AbstractUnitConfig)
4747
Set(u sdk.CDNUnit)
4848
ItemExists(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, i sdk.CDNItem) (bool, error)
4949
Status(ctx context.Context) []sdk.MonitoringStatusLine
@@ -56,13 +56,18 @@ type AbstractUnit struct {
5656
u sdk.CDNUnit
5757
syncChan chan string
5858
syncBandwidth float64
59+
disableSync bool
5960
}
6061

6162
func (a *AbstractUnit) ExistsInDatabase(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, id string) (*sdk.CDNItemUnit, error) {
6263
query := gorpmapper.NewQuery("SELECT * FROM storage_unit_item WHERE unit_id = $1 and item_id = $2 LIMIT 1").Args(a.ID(), id)
6364
return getItemUnit(ctx, m, db, query, gorpmapper.GetOptions.WithDecryption)
6465
}
6566

67+
func (a *AbstractUnit) CanSync() bool {
68+
return !a.disableSync
69+
}
70+
6671
func (a *AbstractUnit) Name() string {
6772
return a.u.Name
6873
}
@@ -73,13 +78,14 @@ func (a *AbstractUnit) ID() string {
7378

7479
func (a *AbstractUnit) Set(u sdk.CDNUnit) { a.u = u }
7580

76-
func (a *AbstractUnit) New(gorts *sdk.GoRoutines, syncParrallel int64, syncBandwidth float64) {
81+
func (a *AbstractUnit) New(gorts *sdk.GoRoutines, config AbstractUnitConfig) {
7782
a.GoRoutines = gorts
78-
a.syncChan = make(chan string, syncParrallel)
79-
if syncBandwidth <= 0 {
80-
syncBandwidth = math.MaxFloat64
83+
a.syncChan = make(chan string, config.syncParrallel)
84+
if config.syncBandwidth <= 0 {
85+
config.syncBandwidth = math.MaxFloat64
8186
}
82-
a.syncBandwidth = syncBandwidth / float64(syncParrallel)
87+
a.syncBandwidth = config.syncBandwidth / float64(config.syncParrallel)
88+
a.disableSync = config.disableSync
8389
}
8490

8591
func (a *AbstractUnit) SyncItemChannel() chan string { return a.syncChan }
@@ -116,13 +122,20 @@ type FileBufferUnit interface {
116122
Write(i sdk.CDNItemUnit, r io.Reader, w io.Writer) error
117123
}
118124

125+
type AbstractUnitConfig struct {
126+
syncParrallel int64
127+
syncBandwidth float64
128+
disableSync bool
129+
}
130+
119131
type StorageUnit interface {
120132
Interface
121133
Unit
122134
Init(ctx context.Context, cfg interface{}) error
123135
SyncItemChannel() chan string
124136
NewWriter(ctx context.Context, i sdk.CDNItemUnit) (io.WriteCloser, error)
125137
Write(i sdk.CDNItemUnit, r io.Reader, w io.Writer) error
138+
CanSync() bool
126139
}
127140

128141
type StorageUnitWithLocator interface {
@@ -155,6 +168,7 @@ const (
155168
type StorageConfiguration struct {
156169
SyncParallel int64 `toml:"syncParallel" json:"sync_parallel" comment:"number of parallel sync processes"`
157170
SyncBandwidth int64 `toml:"syncBandwidth" json:"sync_bandwidth" comment:"global bandwith shared by the sync processes (in Mb)"`
171+
DisableSync bool `toml:"disableSync" json:"disable_sync" comment:"flag to disabled backend synchronization"`
158172
Local *LocalStorageConfiguration `toml:"local" json:"local,omitempty" mapstructure:"local"`
159173
Swift *SwiftStorageConfiguration `toml:"swift" json:"swift,omitempty" mapstructure:"swift"`
160174
Webdav *WebdavStorageConfiguration `toml:"webdav" json:"webdav,omitempty" mapstructure:"webdav"`

engine/cdn/storage/webdav/webdav.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func init() {
3232
storage.RegisterDriver("webdav", new(Webdav))
3333
}
3434

35-
func (s *Webdav) Init(ctx context.Context, cfg interface{}) error {
35+
func (s *Webdav) Init(_ context.Context, cfg interface{}) error {
3636
config, is := cfg.(*storage.WebdavStorageConfiguration)
3737
if !is {
3838
return sdk.WithStack(fmt.Errorf("invalid configuration: %T", cfg))

0 commit comments

Comments
 (0)