From 55e681c593e4ba460138dba15de8f8a70a73e6e3 Mon Sep 17 00:00:00 2001 From: Leonard Liubich Date: Fri, 26 Jun 2026 17:52:23 +0300 Subject: [PATCH] storage: Replace shard pool with BBR load limiter Refs #3985. Signed-off-by: Leonard Liubich --- CHANGELOG.md | 2 + cmd/neofs-node/config.go | 1 - cmd/neofs-node/config/engine/config_test.go | 4 - cmd/neofs-node/config/engine/storage.go | 9 -- cmd/neofs-node/storage.go | 2 - config/example/node.env | 2 - config/example/node.json | 2 - config/example/node.yaml | 2 - docs/storage-node-configuration.md | 2 - pkg/local_object_storage/engine/bbr/bbr.go | 120 ++++++++++++++++++ .../engine/bbr/bbr_bench_test.go | 15 +++ pkg/local_object_storage/engine/control.go | 27 +--- pkg/local_object_storage/engine/engine.go | 36 +----- .../engine/engine_test.go | 2 +- pkg/local_object_storage/engine/error_test.go | 2 +- .../engine/evacuate_test.go | 2 +- pkg/local_object_storage/engine/put.go | 81 ++++++------ pkg/local_object_storage/engine/shards.go | 14 +- 18 files changed, 190 insertions(+), 135 deletions(-) create mode 100644 pkg/local_object_storage/engine/bbr/bbr.go create mode 100644 pkg/local_object_storage/engine/bbr/bbr_bench_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 59cb3b3d89..035de90335 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Changelog for NeoFS Node - Optimized GET/HEAD/SEARCH/RANGE request signing when forwarding (#4008, #4021) - Optimized ranged GET in FSTree (#4016) - Split object assembly now prefetches child objects concurrently while streaming (#4014, #4042) +- Replaced limited shard PUT queue with an adaptive load limiter (#XXX) ### Removed @@ -26,6 +27,7 @@ Changelog for NeoFS Node - NeoGo dependency to 0.120.0 (#4018) ### Updating from v0.53.0 +Drop SN `storage.shard_pool_size` and `storage.put_retry_timeout` options, they are no-op now. ## [0.53.0] - 2026-05-21 - Sido diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index cba66be937..922dcbb936 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -709,7 +709,6 @@ func (c *cfg) configWatcher(ctx context.Context) { for _, optsWithID := range c.shardOpts() { rcfg.AddShard(optsWithID.configID, optsWithID.shOpts) } - rcfg.SetShardPoolSize(uint32(c.appCfg.Storage.ShardPoolSize)) err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg) if err != nil { diff --git a/cmd/neofs-node/config/engine/config_test.go b/cmd/neofs-node/config/engine/config_test.go index f6ed60e8ee..9ce49f91e4 100644 --- a/cmd/neofs-node/config/engine/config_test.go +++ b/cmd/neofs-node/config/engine/config_test.go @@ -33,9 +33,7 @@ func TestEngineSection(t *testing.T) { require.False(t, handlerCalled) require.EqualValues(t, 0, empty.Storage.ShardROErrorThreshold) - require.EqualValues(t, engineconfig.ShardPoolSizeDefault, empty.Storage.ShardPoolSize) require.EqualValues(t, mode.ReadWrite, empty.Storage.Default.Mode) - require.Zero(t, empty.Storage.PutRetryTimeout) }) const path = "../../../../config/example/node" @@ -44,8 +42,6 @@ func TestEngineSection(t *testing.T) { num := 0 require.EqualValues(t, 100, c.Storage.ShardROErrorThreshold) - require.EqualValues(t, 15, c.Storage.ShardPoolSize) - require.EqualValues(t, 5*time.Second, c.Storage.PutRetryTimeout) require.EqualValues(t, true, c.Storage.IgnoreUninitedShards) err := engineconfig.IterateShards(&c.Storage, true, func(sc *shardconfig.ShardDetails) error { diff --git a/cmd/neofs-node/config/engine/storage.go b/cmd/neofs-node/config/engine/storage.go index 3441a0f96e..5fb6ed6f96 100644 --- a/cmd/neofs-node/config/engine/storage.go +++ b/cmd/neofs-node/config/engine/storage.go @@ -6,12 +6,6 @@ import ( shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" ) -const ( - // ShardPoolSizeDefault is the default value of routine pool size per-shard to - // process object PUT operations in a storage engine. - ShardPoolSizeDefault = 20 -) - // Storage contains configuration for the storage engine. type Storage struct { ShardPoolSize int `mapstructure:"shard_pool_size"` @@ -26,9 +20,6 @@ type Storage struct { // If some of fields are not set or have invalid values, they will be // set to default values. func (s *Storage) Normalize() { - if s.ShardPoolSize == 0 { - s.ShardPoolSize = ShardPoolSizeDefault - } for i := range s.ShardList { s.ShardList[i].Normalize(s.Default) } diff --git a/cmd/neofs-node/storage.go b/cmd/neofs-node/storage.go index fa11602a75..70d45d71d4 100644 --- a/cmd/neofs-node/storage.go +++ b/cmd/neofs-node/storage.go @@ -22,11 +22,9 @@ import ( func initLocalStorage(c *cfg) { ls := engine.New([]engine.Option{ - engine.WithShardPoolSize(uint32(c.appCfg.Storage.ShardPoolSize)), engine.WithErrorThreshold(uint32(c.appCfg.Storage.ShardROErrorThreshold)), engine.WithLogger(c.log), engine.WithIgnoreUninitedShards(c.appCfg.Storage.IgnoreUninitedShards), - engine.WithObjectPutRetryTimeout(c.appCfg.Storage.PutRetryTimeout), engine.WithContainersSource(c.cnrSrc), engine.WithMetrics(c.metricsCollector), }...) diff --git a/config/example/node.env b/config/example/node.env index 647c395251..67e0653de8 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -84,8 +84,6 @@ NEOFS_OBJECT_PUT_POOL_SIZE_REMOTE=100 NEOFS_OBJECT_SEARCH_POOL_SIZE=50 # Storage engine section -NEOFS_STORAGE_SHARD_POOL_SIZE=15 -NEOFS_STORAGE_PUT_RETRY_TIMEOUT=5s NEOFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100 NEOFS_STORAGE_IGNORE_UNINITED_SHARDS=true ## 0 shard diff --git a/config/example/node.json b/config/example/node.json index 7a9c5bbdb7..a39344efd0 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -121,9 +121,7 @@ } }, "storage": { - "shard_pool_size": 15, "shard_ro_error_threshold": 100, - "put_retry_timeout": "5s", "ignore_uninited_shards": true, "shards": [ { diff --git a/config/example/node.yaml b/config/example/node.yaml index 2756647130..49ee03433e 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -108,8 +108,6 @@ metadata: storage: # note: shard configuration can be omitted for relay node (see `node.relay`) - shard_pool_size: 15 # size of per-shard worker pools used for PUT operations - put_retry_timeout: 5s # object PUT retry timeout shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors) ignore_uninited_shards: true # do we need to ignore uninited shards (default: false, fail on any shard failure) diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index eb2f904d71..0a867fac45 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -142,10 +142,8 @@ Local storage engine configuration. | Parameter | Type | Default value | Description | |----------------------------|--------------------------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. | | `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. | | `ignore_uninited_shards` | `bool` | `false` | Flag that specifies whether uninited shards should be ignored. | -| `put_retry_deadline` | `duration` | `0` | If an object cannot be PUT to storage, node tries to PUT it to the best shard for it (according to placement sorting) and only to it for this long before operation error is returned. Defalt value does not apply any retry policy at all. | | `shard_defaults` | [Shard config](#shards-config) | | Configuration for default values in shards. | | `shards` | [Shard config](#shards-config) | | Configuration for seprate shards. | diff --git a/pkg/local_object_storage/engine/bbr/bbr.go b/pkg/local_object_storage/engine/bbr/bbr.go new file mode 100644 index 0000000000..86dcd5ba73 --- /dev/null +++ b/pkg/local_object_storage/engine/bbr/bbr.go @@ -0,0 +1,120 @@ +package bbr + +import ( + "fmt" + "math" + "sync/atomic" + "time" +) + +// alias to distinguish from byte units. +type blocks = int64 + +// TODO: comments. +const ( + blockSize = 64 << 10 + minLimit = 256 << 20 / blockSize + initialSpeed = 100 << 20 / blockSize + increaseAlpha = 0.2 + decreaseAlpha = 0.05 + speedDeviationThreshold = 1 << 20 / blockSize + targetLatency = 0.03 // 30ms +) + +// TODO: docs. +type LoadLimiter struct { + speed atomic.Uint64 + limit atomic.Uint64 + inflight atomic.Int64 +} + +// TODO: docs. +func NewLoadLimiter() *LoadLimiter { + var res LoadLimiter + res.speed.Store(math.Float64bits(initialSpeed / blockSize)) + return &res +} + +// TODO: docs. +func (x *LoadLimiter) Write(sizeBytes int64, writeFn func() error) (bool, error) { + if sizeBytes < 0 { + panic(fmt.Sprintf("negative size %d", sizeBytes)) + } + return x.write(calculateBlocks(sizeBytes), writeFn) +} + +func (x *LoadLimiter) write(size blocks, writeFn func() error) (bool, error) { + if !x.acquire(size) { + return true, nil + } + + startTime := time.Now() + + err := writeFn() + + duration := time.Since(startTime) + + x.release(size) + + if err != nil { + return false, err + } + + x.handleMeasurement(size, duration) + + return false, nil +} + +func (x *LoadLimiter) acquire(size blocks) bool { + newInflight := x.inflight.Add(size) + limit := max(x.limit.Load(), minLimit) + + if newInflight > size && uint64(newInflight) > limit { + x.release(size) + return false + } + + return true +} + +func (x *LoadLimiter) release(size blocks) { + x.inflight.Add(-size) +} + +func (x *LoadLimiter) handleMeasurement(size blocks, duration time.Duration) { + if duration <= 0 { + return + } + + opSpeed := float64(size) / duration.Seconds() + + prevSpeedBits := x.speed.Load() + prevSpeed := math.Float64frombits(prevSpeedBits) + + if math.Abs(opSpeed-prevSpeed) < speedDeviationThreshold { + return + } + + var alpha float64 + if opSpeed > prevSpeed { + alpha = increaseAlpha + } else { + alpha = decreaseAlpha + } + + newSpeed := (prevSpeed * (1 - alpha)) + (opSpeed * alpha) + + if !x.speed.CompareAndSwap(prevSpeedBits, math.Float64bits(newSpeed)) { + return + } + + limit := uint64(math.Floor(newSpeed * targetLatency)) + x.limit.Store(limit) +} + +func calculateBlocks(sizeBytes int64) blocks { + if sizeBytes <= blockSize { + return 1 + } + return (sizeBytes + blockSize - 1) / blockSize +} diff --git a/pkg/local_object_storage/engine/bbr/bbr_bench_test.go b/pkg/local_object_storage/engine/bbr/bbr_bench_test.go new file mode 100644 index 0000000000..5902ba4c29 --- /dev/null +++ b/pkg/local_object_storage/engine/bbr/bbr_bench_test.go @@ -0,0 +1,15 @@ +package bbr_test + +import ( + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine/bbr" +) + +func BenchmarkExps(b *testing.B) { + loadLimiter := bbr.NewLoadLimiter() + + for b.Loop() { + _, _ = loadLimiter.Write(64<<20, func() error { return nil }) + } +} diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index a014d5a50d..40bd71ddf5 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -58,7 +58,7 @@ func (e *StorageEngine) Close() error { } // closes all shards. Never returns an error, shard errors are logged. -func (e *StorageEngine) close(releasePools bool) error { +func (e *StorageEngine) close() error { e.mtx.RLock() defer e.mtx.RUnlock() @@ -69,9 +69,6 @@ func (e *StorageEngine) close(releasePools bool) error { zap.Error(err), ) } - if releasePools { - close(sh.putCh) - } } return nil @@ -100,7 +97,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error { return e.open() } } else if prevErr == nil { // ok -> block - return e.close(errors.Is(err, errClosed)) + return e.close() } // otherwise do nothing @@ -137,7 +134,6 @@ func (e *StorageEngine) ResumeExecution() error { type ReConfiguration struct { errorsThreshold uint32 - shardPoolSize uint32 shards map[string][]shard.Option // meta path -> shard opts } @@ -148,11 +144,6 @@ func (rCfg *ReConfiguration) SetErrorsThreshold(errorsThreshold uint32) { rCfg.errorsThreshold = errorsThreshold } -// SetShardPoolSize sets a size of worker pool for each shard. -func (rCfg *ReConfiguration) SetShardPoolSize(shardPoolSize uint32) { - rCfg.shardPoolSize = shardPoolSize -} - // AddShard adds a shard for the reconfiguration. // Shard identifier is calculated from paths used in blobstor. func (rCfg *ReConfiguration) AddShard(id string, opts []shard.Option) { @@ -169,20 +160,6 @@ func (rCfg *ReConfiguration) AddShard(id string, opts []shard.Option) { // Reload reloads StorageEngine's configuration in runtime. func (e *StorageEngine) Reload(rcfg ReConfiguration) error { - e.mtx.Lock() - if rcfg.shardPoolSize != e.shardPoolSize { - e.shardPoolSize = rcfg.shardPoolSize - for id, sh := range e.shards { - close(sh.putCh) - sh.putCh = make(chan putTask) - for range e.shardPoolSize { - go sh.shardPutThread() - } - e.shards[id] = sh - } - } - e.mtx.Unlock() - type reloadInfo struct { sh *shard.Shard opts []shard.Option diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index f1fb025a95..bd8bcebe61 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -5,11 +5,11 @@ import ( "io" "sync" "sync/atomic" - "time" iec "github.com/nspcc-dev/neofs-node/internal/ec" containercore "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine/bbr" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" @@ -57,19 +57,12 @@ type shardInterface interface { ReadECPartHeader(cid.ID, oid.ID, iec.PartInfo, []byte) (int, error) } -type putTask struct { - addr oid.Address - obj *object.Object - objBin []byte - retCh chan error -} - type shardWrapper struct { errorCount *atomic.Uint32 *shard.Shard - shardIface shardInterface // TODO: make Shard a shardInterface - putCh chan putTask - engine *StorageEngine + shardIface shardInterface // TODO: make Shard a shardInterface + loadLimiter *bbr.LoadLimiter + engine *StorageEngine } type setModeRequest struct { @@ -228,9 +221,6 @@ type cfg struct { metrics MetricRegister - objectPutTimeout time.Duration - shardPoolSize uint32 - containerSource containercore.Source isIgnoreUninitedShards bool @@ -239,8 +229,6 @@ type cfg struct { func defaultCfg() *cfg { return &cfg{ log: zap.L(), - - shardPoolSize: 20, } } @@ -276,13 +264,6 @@ func WithMetrics(v MetricRegister) Option { } } -// WithShardPoolSize returns option to specify size of worker pool for each shard. -func WithShardPoolSize(sz uint32) Option { - return func(c *cfg) { - c.shardPoolSize = sz - } -} - // WithErrorThreshold returns an option to specify size amount of errors after which // shard is moved to read-only mode. func WithErrorThreshold(sz uint32) Option { @@ -304,12 +285,3 @@ func WithIgnoreUninitedShards(flag bool) Option { c.isIgnoreUninitedShards = flag } } - -// WithObjectPutRetryTimeout return an option to specify time for object PUT operation. -// It does not stop any disk operation, only affects retryes policy. Zero value -// is acceptable and means no retry on any shard. -func WithObjectPutRetryTimeout(t time.Duration) Option { - return func(c *cfg) { - c.objectPutTimeout = t - } -} diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index c3ce9fa8a5..dfebb4e6fd 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -39,7 +39,7 @@ func (s epochState) CurrentEpoch() uint64 { } func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine { - engine := New(WithObjectPutRetryTimeout(100 * time.Millisecond)) + engine := New() for _, s := range shards { err := engine.addShard(s) diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index a261a0ede3..4d658882e9 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -27,7 +27,7 @@ func newEngine(t testing.TB, dir string, opts ...Option) (*StorageEngine, string dir = t.TempDir() } - e := New(append([]Option{WithShardPoolSize(1)}, opts...)...) + e := New(opts...) var ids [2]common.ID var err error diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 7ed2dc0ed6..8a157688ea 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -26,7 +26,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng dir = t.TempDir() e = New( WithLogger(zaptest.NewLogger(t)), - WithShardPoolSize(uint32(objPerShard))) + ) err error ids = make([]common.ID, shardNum) ) diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 4b091cf08e..2241d0ca5a 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "time" iec "github.com/nspcc-dev/neofs-node/internal/ec" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" @@ -102,61 +101,59 @@ func (e *StorageEngine) Put(ctx context.Context, obj *object.Object, objBin []by // putToShard puts object to sh. // Returns error from shard put or errOverloaded (when shard pool can't accept // the task) or errExists (if object is already stored there). -func (e *StorageEngine) putToShard(ctx context.Context, sh shardWrapper, addr oid.Address, obj *object.Object, objBin []byte) error { - var ( - exitCh = make(chan error) - pCtx, pCancel = context.WithTimeout(ctx, e.objectPutTimeout+time.Millisecond) // 1ms to avoid zero value. - ) - defer pCancel() +func (e *StorageEngine) putToShard(_ context.Context, sh shardWrapper, addr oid.Address, obj *object.Object, objBin []byte) error { + var size int64 + if objBin != nil { + size = int64(len(objBin)) + } else { + // TODO: support direct size getter + size = int64(obj.ProtoMessage().MarshaledSize()) + } - select { - case sh.putCh <- putTask{addr: addr, obj: obj, objBin: objBin, retCh: exitCh}: - case <-pCtx.Done(): + overload, err := sh.loadLimiter.Write(size, func() error { + return sh.putObject(addr, obj, objBin) + }) + if overload { return errOverloaded } - - err := <-exitCh return err } -func (sh *shardWrapper) shardPutThread() { +func (sh *shardWrapper) putObject(addr oid.Address, obj *object.Object, objBin []byte) error { var id = sh.ID().String() - for t := range sh.putCh { - exists, err := sh.Exists(t.addr, false) - if err != nil { - sh.engine.log.Warn("object put: check object existence", - zap.Stringer("addr", t.addr), - zap.String("shard", id), - zap.Error(err)) + exists, err := sh.Exists(addr, false) + if err != nil { + sh.engine.log.Warn("object put: check object existence", + zap.Stringer("addr", addr), + zap.String("shard", id), + zap.Error(err)) - if shard.IsErrObjectExpired(err) { - // object is already found but - // expired => do nothing with it - err = errExists - } - t.retCh <- err - continue // this is not ErrAlreadyRemoved error so we can go to the next task + if shard.IsErrObjectExpired(err) { + // object is already found but + // expired => do nothing with it + err = errExists } + return err + } - if exists { - t.retCh <- errExists - continue - } + if exists { + return errExists + } - err = sh.Put(t.obj, t.objBin) - if err != nil { - if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, common.ErrReadOnly) || - errors.Is(err, common.ErrNoSpace) { - sh.engine.log.Warn("could not put object to shard", - zap.String("shard_id", id), - zap.Error(err)) - } else { - sh.engine.reportShardError(*sh, "could not put object to shard", err) - } + err = sh.Put(obj, objBin) + if err != nil { + if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, common.ErrReadOnly) || + errors.Is(err, common.ErrNoSpace) { + sh.engine.log.Warn("could not put object to shard", + zap.String("shard_id", id), + zap.Error(err)) + } else { + sh.engine.reportShardError(*sh, "could not put object to shard", err) } - t.retCh <- err } + + return err } // broadcastObject stores object on ALL shards to ensure it's available everywhere. diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 26cd6d2b69..9dc67697d0 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -9,6 +9,7 @@ import ( "github.com/nspcc-dev/hrw/v2" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine/bbr" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" @@ -129,14 +130,10 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error { } var shw = shardWrapper{ - errorCount: new(atomic.Uint32), - Shard: sh, - putCh: make(chan putTask), - engine: e, - } - - for range e.shardPoolSize { - go shw.shardPutThread() + errorCount: new(atomic.Uint32), + Shard: sh, + loadLimiter: bbr.NewLoadLimiter(), + engine: e, } e.shards[strID] = shw @@ -176,7 +173,6 @@ func (e *StorageEngine) removeShards(ids ...string) { zap.Error(err), ) } - close(sh.putCh) } }