Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Changelog for NeoFS Node
- Optimized EC GET request execution (#3996)
- Optimized GET/HEAD/SEARCH/RANGE request signing when forwarding (#4008, #4021)
- Optimized ranged GET in FSTree (#4016)
- Split object assembly now prefetches child objects concurrently while streaming (#4014)

### Removed

Expand Down
15 changes: 15 additions & 0 deletions pkg/services/object/get/assemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,21 @@ func (exec *execCtx) initFromChild(obj oid.ID) (*oid.ID, []oid.ID) {
func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []object.Range, checkRight bool) {
withRng := len(rngs) > 0 && exec.ctxRange() != nil

if len(children) > 1 && prefetchWindow > 1 {
at := func(i int) (oid.ID, *object.Range, bool) {
if i >= len(children) {
return oid.ID{}, nil, false
}
if withRng {
return children[i], &rngs[i], true
}
return children[i], nil, true
}

exec.statusError = exec.streamChildrenPipelined(at, !withRng && checkRight)
return
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What worries me a bit is that everything down below effectively becomes a dead code. Most likely it can be dropped in future.

for i := range children {
var r *object.Range
if withRng {
Expand Down
120 changes: 120 additions & 0 deletions pkg/services/object/get/assemble_pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package getsvc

import (
"context"
"io"
"sync"

"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// prefetchWindow is the split-assembly look-ahead: one child streams to the
// client while up to prefetchWindow-1 others are fetched concurrently.
// value <=1 disables pipelining and falls back to the serial path.
var prefetchWindow = 2
Comment thread
roman-khimov marked this conversation as resolved.

// childSlot carries one child's stream and the producer's result.
type childSlot struct {
ready chan struct{}
rc io.ReadCloser
hdr *object.Object
se statusError
}

// streamChildrenPipelined fetches children concurrently, up to prefetchWindow in flight,
// and writes their payloads to the client writer strictly in index order. at(i)
// yields the i-th child ID and its sub-range; ok=false means there are no more
// children.
func (exec *execCtx) streamChildrenPipelined(at func(i int) (oid.ID, *object.Range, bool), checkHdr bool) statusError {
ring := make([]*childSlot, prefetchWindow)

gCtx, cancel := context.WithCancel(exec.context())
defer cancel()

var wg sync.WaitGroup
launch := func(i int) bool {
id, rng, ok := at(i)
if !ok {
return false
}

s := &childSlot{ready: make(chan struct{})}
ring[i%len(ring)] = s
wg.Go(func() {
defer close(s.ready)
s.hdr, s.rc, s.se = exec.fetchChildStream(gCtx, id, rng)
})
return true
}

launched := 0
for launched < prefetchWindow && launch(launched) {
launched++
}
if launched == 0 {
return statusError{status: statusOK}
}
if launched < len(ring) {
ring = ring[:launched]
}

buf := bufferPool.Get()
defer bufferPool.Put(buf)
dst := exec.prm.objWriter

result := statusError{status: statusOK}
for i := 0; i < launched; i++ {
if se := exec.streamChildSlot(ring[i%len(ring)], dst, *buf.(*[]byte), checkHdr); se.status != statusOK {
result = se
break
}

if launch(launched) {
launched++
}
}

cancel()
closeChildSlots(ring)
wg.Wait()
return result
}

func (exec *execCtx) streamChildSlot(s *childSlot, dst ChunkWriter, buf []byte, checkHdr bool) statusError {
<-s.ready

if s.se.status != statusOK {
return s.se
}

if checkHdr && s.hdr != nil && !exec.isChild(s.hdr) {
exec.log.Debug("parent address in child object differs")
}

_, copyErr := copyPayloadStreamBuffer(dst, s.rc, buf)
closeErr := s.rc.Close()
s.rc = nil

if copyErr == nil && closeErr != nil {
copyErr = closeErr
}
if copyErr != nil {
return statusError{status: statusUndefined, err: copyErr}
}

return statusError{status: statusOK}
}

func closeChildSlots(slots []*childSlot) {
for _, s := range slots {
if s == nil {
continue
}
<-s.ready
if s.rc != nil {
_ = s.rc.Close()
s.rc = nil
}
}
}
228 changes: 228 additions & 0 deletions pkg/services/object/get/assemble_pipeline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package getsvc

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

type splitScene struct {
svc *Service
addr oid.Address
payload []byte
childIDs []oid.ID
c1, c2 *testClient
}

const splitChildPayloadLen = 10

func buildSplitScene(t testing.TB, ver, n int, delay time.Duration) *splitScene {
idCnr := cidtest.ID()

addr := oidtest.Address()
addr.SetContainer(idCnr)
addr.SetObject(oidtest.ID())

srcObj := generateObject(addr, nil, nil)

ns, as := testNodeMatrix(t, []int{2})

children, childIDs, payload := generateChain(n, idCnr)
srcObj.SetPayload(payload)
srcObj.SetPayloadSize(uint64(len(payload)))
children[len(children)-1].SetParent(srcObj)

linkID := oidtest.ID()
linkAddr := oid.NewAddress(idCnr, linkID)

splitInfo := object.NewSplitInfo()
splitInfo.SetLink(linkID)

var linkObj *object.Object
switch ver {
case 1:
splitInfo.SetSplitID(object.NewSplitID())
linkObj = generateObject(linkAddr, nil, nil, childIDs...)
case 2:
splitInfo.SetFirstPart(childIDs[0])
linkObj = generateObject(linkAddr, nil, nil)
linkObj.SetFirstID(childIDs[0])
linkChildren := make([]object.MeasuredObject, len(children))
for i := range children {
children[i].SetParentID(addr.Object())
linkChildren[i].SetObjectID(children[i].GetID())
linkChildren[i].SetObjectSize(uint32(children[i].PayloadSize()))
}
var link object.Link
link.SetObjects(linkChildren)
linkObj.WriteLink(link)
}
linkObj.SetParentID(addr.Object())
linkObj.SetParent(srcObj)

c1 := newTestClient()
c2 := newTestClient()
c2.delay = delay

c1.addResult(addr, nil, errors.New("any error"))
c2.addResult(addr, nil, object.NewSplitInfoError(splitInfo))

c1.addResult(linkAddr, nil, errors.New("any error"))
c2.addResult(linkAddr, linkObj, nil)

vectors := map[oid.Address][][]netmap.NodeInfo{
addr: ns,
linkAddr: ns,
}

for i := range children {
var childAddr oid.Address
childAddr.SetContainer(idCnr)
childAddr.SetObject(childIDs[i])
c1.addResult(childAddr, nil, errors.New("any error"))
c2.addResult(childAddr, children[i], nil)
vectors[childAddr] = ns
}

svc := &Service{cfg: new(cfg)}
svc.log = zap.NewNop()
svc.localStorage = newTestStorage()
svc.neoFSNet = &testNeoFS{vectors: vectors}
svc.clientCache = &testClientCache{
clients: map[string]*testClient{
as[0][0]: c1,
as[0][1]: c2,
},
}

return &splitScene{svc: svc, addr: addr, payload: payload, childIDs: childIDs, c1: c1, c2: c2}
}

func (sc *splitScene) childAddr(i int) oid.Address {
return oid.NewAddress(sc.addr.Container(), sc.childIDs[i])
}

func getInto(ctx context.Context, svc *Service, addr oid.Address, w ObjectWriter) error {
var p Prm
p.SetObjectWriter(w)
p.common = new(util.CommonPrm).WithLocalOnly(false)
p.WithAddress(addr)
return svc.Get(ctx, p)
}

func getFull(ctx context.Context, svc *Service, addr oid.Address) (*SimpleObjectWriter, error) {
w := NewSimpleObjectWriter()
return w, getInto(ctx, svc, addr, w)
}

func setPrefetchWindow(t testing.TB, w int) {
old := prefetchWindow
prefetchWindow = w
t.Cleanup(func() { prefetchWindow = old })
}

func TestAssembleSplitPipelined(t *testing.T) {
ctx := context.Background()

for _, sv := range []struct {
name string
ver int
}{
{"V1", 1},
{"V2", 2},
} {
t.Run(sv.name, func(t *testing.T) {
t.Run("ordered output across windows", func(t *testing.T) {
for _, win := range []int{1, 2, 3, 8} {
t.Run(fmt.Sprintf("window=%d", win), func(t *testing.T) {
setPrefetchWindow(t, win)

sc := buildSplitScene(t, sv.ver, 8, 2*time.Millisecond)

w, err := getFull(ctx, sc.svc, sc.addr)
require.NoError(t, err)
require.Equal(t, sc.payload, w.Object().Payload())
})
}
})

t.Run("remote failure discards later children", func(t *testing.T) {
const n = 6
const failIdx = 3

sc := buildSplitScene(t, sv.ver, n, time.Millisecond)
sc.c2.addResult(sc.childAddr(failIdx), nil, apistatus.ErrObjectNotFound)

w, err := getFull(ctx, sc.svc, sc.addr)
require.ErrorIs(t, err, apistatus.ErrObjectNotFound)
require.Equal(t, sc.payload[:failIdx*splitChildPayloadLen], w.Object().Payload())
})

t.Run("virtual child is reported, not assembled recursively", func(t *testing.T) {
setPrefetchWindow(t, 3)

const failIdx = 1
sc := buildSplitScene(t, sv.ver, 6, time.Millisecond)
sc.c2.addResult(sc.childAddr(failIdx), nil, object.NewSplitInfoError(object.NewSplitInfo()))

w, err := getFull(ctx, sc.svc, sc.addr)
var splitErr *object.SplitInfoError
require.ErrorAs(t, err, &splitErr)
require.Equal(t, sc.payload[:failIdx*splitChildPayloadLen], w.Object().Payload())
})

t.Run("client write failure aborts cleanly", func(t *testing.T) {
setPrefetchWindow(t, 4)

sc := buildSplitScene(t, sv.ver, 8, 2*time.Millisecond)
w := &trackingWriter{failAfterChunks: 3, err: errors.New("client gone")}

require.Error(t, getInto(ctx, sc.svc, sc.addr, w))
})

t.Run("context cancel mid-stream", func(t *testing.T) {
sc := buildSplitScene(t, sv.ver, 16, 20*time.Millisecond)

cctx, cancel := context.WithCancel(ctx)
go func() {
time.Sleep(30 * time.Millisecond)
cancel()
}()

_, err := getFull(cctx, sc.svc, sc.addr)
require.Error(t, err)
})
})
}
}

func BenchmarkAssembleSplitObject(b *testing.B) {
ctx := context.Background()

for _, win := range []int{1, 2, 4} {
b.Run(fmt.Sprintf("window=%d", win), func(b *testing.B) {
setPrefetchWindow(b, win)

sc := buildSplitScene(b, 1, 16, time.Millisecond)

b.ReportAllocs()
b.ResetTimer()
for range b.N {
_, err := getFull(ctx, sc.svc, sc.addr)
require.NoError(b, err)
}
})
}
}
Loading
Loading