Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ Changelog for NeoFS Node
- Optimized GET/HEAD/SEARCH/RANGE request signing when forwarding (#4008, #4021)
- Optimized ranged GET in FSTree (#4016)
- Split object assembly now prefetches child objects concurrently while streaming (#4014, #4042)
- Replaced limited shard PUT queue with an adaptive load limiter (#XXX)

### Removed

### Updated
- NeoGo dependency to 0.120.0 (#4018)

### Updating from v0.53.0
Drop SN `storage.shard_pool_size` and `storage.put_retry_timeout` options, they are no-op now.

## [0.53.0] - 2026-05-21 - Sido

Expand Down
1 change: 0 additions & 1 deletion cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,6 @@ func (c *cfg) configWatcher(ctx context.Context) {
for _, optsWithID := range c.shardOpts() {
rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
}
rcfg.SetShardPoolSize(uint32(c.appCfg.Storage.ShardPoolSize))

err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions cmd/neofs-node/config/engine/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ func TestEngineSection(t *testing.T) {
require.False(t, handlerCalled)

require.EqualValues(t, 0, empty.Storage.ShardROErrorThreshold)
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, empty.Storage.ShardPoolSize)
require.EqualValues(t, mode.ReadWrite, empty.Storage.Default.Mode)
require.Zero(t, empty.Storage.PutRetryTimeout)
})

const path = "../../../../config/example/node"
Expand All @@ -44,8 +42,6 @@ func TestEngineSection(t *testing.T) {
num := 0

require.EqualValues(t, 100, c.Storage.ShardROErrorThreshold)
require.EqualValues(t, 15, c.Storage.ShardPoolSize)
require.EqualValues(t, 5*time.Second, c.Storage.PutRetryTimeout)
require.EqualValues(t, true, c.Storage.IgnoreUninitedShards)

err := engineconfig.IterateShards(&c.Storage, true, func(sc *shardconfig.ShardDetails) error {
Expand Down
9 changes: 0 additions & 9 deletions cmd/neofs-node/config/engine/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@ import (
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
)

const (
// ShardPoolSizeDefault is the default value of routine pool size per-shard to
// process object PUT operations in a storage engine.
ShardPoolSizeDefault = 20
)

// Storage contains configuration for the storage engine.
type Storage struct {
ShardPoolSize int `mapstructure:"shard_pool_size"`
Expand All @@ -26,9 +20,6 @@ type Storage struct {
// If some of fields are not set or have invalid values, they will be
// set to default values.
func (s *Storage) Normalize() {
if s.ShardPoolSize == 0 {
s.ShardPoolSize = ShardPoolSizeDefault
}
for i := range s.ShardList {
s.ShardList[i].Normalize(s.Default)
}
Expand Down
2 changes: 0 additions & 2 deletions cmd/neofs-node/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (

func initLocalStorage(c *cfg) {
ls := engine.New([]engine.Option{
engine.WithShardPoolSize(uint32(c.appCfg.Storage.ShardPoolSize)),
engine.WithErrorThreshold(uint32(c.appCfg.Storage.ShardROErrorThreshold)),
engine.WithLogger(c.log),
engine.WithIgnoreUninitedShards(c.appCfg.Storage.IgnoreUninitedShards),
engine.WithObjectPutRetryTimeout(c.appCfg.Storage.PutRetryTimeout),
engine.WithContainersSource(c.cnrSrc),
engine.WithMetrics(c.metricsCollector),
}...)
Expand Down
2 changes: 0 additions & 2 deletions config/example/node.env
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ NEOFS_OBJECT_PUT_POOL_SIZE_REMOTE=100
NEOFS_OBJECT_SEARCH_POOL_SIZE=50

# Storage engine section
NEOFS_STORAGE_SHARD_POOL_SIZE=15
NEOFS_STORAGE_PUT_RETRY_TIMEOUT=5s
NEOFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
NEOFS_STORAGE_IGNORE_UNINITED_SHARDS=true
## 0 shard
Expand Down
2 changes: 0 additions & 2 deletions config/example/node.json
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,7 @@
}
},
"storage": {
"shard_pool_size": 15,
"shard_ro_error_threshold": 100,
"put_retry_timeout": "5s",
"ignore_uninited_shards": true,
"shards": [
{
Expand Down
2 changes: 0 additions & 2 deletions config/example/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ metadata:

storage:
# note: shard configuration can be omitted for relay node (see `node.relay`)
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
put_retry_timeout: 5s # object PUT retry timeout
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
ignore_uninited_shards: true # do we need to ignore uninited shards (default: false, fail on any shard failure)

Expand Down
2 changes: 0 additions & 2 deletions docs/storage-node-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,8 @@ Local storage engine configuration.

| Parameter | Type | Default value | Description |
|----------------------------|--------------------------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
| `ignore_uninited_shards` | `bool` | `false` | Flag that specifies whether uninited shards should be ignored. |
| `put_retry_deadline` | `duration` | `0` | If an object cannot be PUT to storage, node tries to PUT it to the best shard for it (according to placement sorting) and only to it for this long before operation error is returned. Defalt value does not apply any retry policy at all. |
| `shard_defaults` | [Shard config](#shards-config) | | Configuration for default values in shards. |
| `shards` | [Shard config](#shards-config) | | Configuration for seprate shards. |

Expand Down
120 changes: 120 additions & 0 deletions pkg/local_object_storage/engine/bbr/bbr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package bbr

import (
"fmt"
"math"
"sync/atomic"
"time"
)

// alias to distinguish from byte units.
type blocks = int64

// TODO: comments.
const (
blockSize = 64 << 10
minLimit = 256 << 20 / blockSize
initialSpeed = 100 << 20 / blockSize
increaseAlpha = 0.2
decreaseAlpha = 0.05
speedDeviationThreshold = 1 << 20 / blockSize
targetLatency = 0.03 // 30ms
)

// TODO: docs.
type LoadLimiter struct {
speed atomic.Uint64
limit atomic.Uint64
inflight atomic.Int64
}

// TODO: docs.
func NewLoadLimiter() *LoadLimiter {
var res LoadLimiter
res.speed.Store(math.Float64bits(initialSpeed / blockSize))
return &res
}

// TODO: docs.
func (x *LoadLimiter) Write(sizeBytes int64, writeFn func() error) (bool, error) {
if sizeBytes < 0 {
panic(fmt.Sprintf("negative size %d", sizeBytes))
}
return x.write(calculateBlocks(sizeBytes), writeFn)
}

func (x *LoadLimiter) write(size blocks, writeFn func() error) (bool, error) {
if !x.acquire(size) {
return true, nil
}

startTime := time.Now()

err := writeFn()

duration := time.Since(startTime)

x.release(size)

if err != nil {
return false, err
}

x.handleMeasurement(size, duration)

return false, nil
}

func (x *LoadLimiter) acquire(size blocks) bool {
newInflight := x.inflight.Add(size)
limit := max(x.limit.Load(), minLimit)

if newInflight > size && uint64(newInflight) > limit {
x.release(size)
return false
}

return true
}

func (x *LoadLimiter) release(size blocks) {
x.inflight.Add(-size)
}

func (x *LoadLimiter) handleMeasurement(size blocks, duration time.Duration) {
if duration <= 0 {
return
}

opSpeed := float64(size) / duration.Seconds()

prevSpeedBits := x.speed.Load()
prevSpeed := math.Float64frombits(prevSpeedBits)

if math.Abs(opSpeed-prevSpeed) < speedDeviationThreshold {
return
}

var alpha float64
if opSpeed > prevSpeed {
alpha = increaseAlpha
} else {
alpha = decreaseAlpha
}

newSpeed := (prevSpeed * (1 - alpha)) + (opSpeed * alpha)

if !x.speed.CompareAndSwap(prevSpeedBits, math.Float64bits(newSpeed)) {
return
}

limit := uint64(math.Floor(newSpeed * targetLatency))
x.limit.Store(limit)
}

func calculateBlocks(sizeBytes int64) blocks {
if sizeBytes <= blockSize {
return 1
}
return (sizeBytes + blockSize - 1) / blockSize
}
15 changes: 15 additions & 0 deletions pkg/local_object_storage/engine/bbr/bbr_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package bbr_test

import (
"testing"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine/bbr"
)

func BenchmarkExps(b *testing.B) {
loadLimiter := bbr.NewLoadLimiter()

for b.Loop() {
_, _ = loadLimiter.Write(64<<20, func() error { return nil })
}
}
27 changes: 2 additions & 25 deletions pkg/local_object_storage/engine/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (e *StorageEngine) Close() error {
}

// closes all shards. Never returns an error, shard errors are logged.
func (e *StorageEngine) close(releasePools bool) error {
func (e *StorageEngine) close() error {
e.mtx.RLock()
defer e.mtx.RUnlock()

Expand All @@ -69,9 +69,6 @@ func (e *StorageEngine) close(releasePools bool) error {
zap.Error(err),
)
}
if releasePools {
close(sh.putCh)
}
}

return nil
Expand Down Expand Up @@ -100,7 +97,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error {
return e.open()
}
} else if prevErr == nil { // ok -> block
return e.close(errors.Is(err, errClosed))
return e.close()
}

// otherwise do nothing
Expand Down Expand Up @@ -137,7 +134,6 @@ func (e *StorageEngine) ResumeExecution() error {

type ReConfiguration struct {
errorsThreshold uint32
shardPoolSize uint32

shards map[string][]shard.Option // meta path -> shard opts
}
Expand All @@ -148,11 +144,6 @@ func (rCfg *ReConfiguration) SetErrorsThreshold(errorsThreshold uint32) {
rCfg.errorsThreshold = errorsThreshold
}

// SetShardPoolSize sets a size of worker pool for each shard.
func (rCfg *ReConfiguration) SetShardPoolSize(shardPoolSize uint32) {
rCfg.shardPoolSize = shardPoolSize
}

// AddShard adds a shard for the reconfiguration.
// Shard identifier is calculated from paths used in blobstor.
func (rCfg *ReConfiguration) AddShard(id string, opts []shard.Option) {
Expand All @@ -169,20 +160,6 @@ func (rCfg *ReConfiguration) AddShard(id string, opts []shard.Option) {

// Reload reloads StorageEngine's configuration in runtime.
func (e *StorageEngine) Reload(rcfg ReConfiguration) error {
e.mtx.Lock()
if rcfg.shardPoolSize != e.shardPoolSize {
e.shardPoolSize = rcfg.shardPoolSize
for id, sh := range e.shards {
close(sh.putCh)
sh.putCh = make(chan putTask)
for range e.shardPoolSize {
go sh.shardPutThread()
}
e.shards[id] = sh
}
}
e.mtx.Unlock()

type reloadInfo struct {
sh *shard.Shard
opts []shard.Option
Expand Down
Loading
Loading