diff --git a/CHANGELOG.md b/CHANGELOG.md index eebf6d3e22..1b340f104e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Changelog for NeoFS Node - Optimized EC GET request execution (#3996) - Optimized GET/HEAD/SEARCH/RANGE request signing when forwarding (#4008, #4021) - Optimized ranged GET in FSTree (#4016) +- Split object assembly now prefetches child objects concurrently while streaming (#4014) ### Removed diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index 531ead9e34..5eab6d95a2 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -159,6 +159,21 @@ func (exec *execCtx) initFromChild(obj oid.ID) (*oid.ID, []oid.ID) { func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []object.Range, checkRight bool) { withRng := len(rngs) > 0 && exec.ctxRange() != nil + if len(children) > 1 && prefetchWindow > 1 { + at := func(i int) (oid.ID, *object.Range, bool) { + if i >= len(children) { + return oid.ID{}, nil, false + } + if withRng { + return children[i], &rngs[i], true + } + return children[i], nil, true + } + + exec.statusError = exec.streamChildrenPipelined(at, !withRng && checkRight) + return + } + for i := range children { var r *object.Range if withRng { diff --git a/pkg/services/object/get/assemble_pipeline.go b/pkg/services/object/get/assemble_pipeline.go new file mode 100644 index 0000000000..66488fa1c0 --- /dev/null +++ b/pkg/services/object/get/assemble_pipeline.go @@ -0,0 +1,120 @@ +package getsvc + +import ( + "context" + "io" + "sync" + + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" +) + +// prefetchWindow is the split-assembly look-ahead: one child streams to the +// client while up to prefetchWindow-1 others are fetched concurrently. +// value <=1 disables pipelining and falls back to the serial path. +var prefetchWindow = 2 + +// childSlot carries one child's stream and the producer's result. +type childSlot struct { + ready chan struct{} + rc io.ReadCloser + hdr *object.Object + se statusError +} + +// streamChildrenPipelined fetches children concurrently, up to prefetchWindow in flight, +// and writes their payloads to the client writer strictly in index order. at(i) +// yields the i-th child ID and its sub-range; ok=false means there are no more +// children. +func (exec *execCtx) streamChildrenPipelined(at func(i int) (oid.ID, *object.Range, bool), checkHdr bool) statusError { + ring := make([]*childSlot, prefetchWindow) + + gCtx, cancel := context.WithCancel(exec.context()) + defer cancel() + + var wg sync.WaitGroup + launch := func(i int) bool { + id, rng, ok := at(i) + if !ok { + return false + } + + s := &childSlot{ready: make(chan struct{})} + ring[i%len(ring)] = s + wg.Go(func() { + defer close(s.ready) + s.hdr, s.rc, s.se = exec.fetchChildStream(gCtx, id, rng) + }) + return true + } + + launched := 0 + for launched < prefetchWindow && launch(launched) { + launched++ + } + if launched == 0 { + return statusError{status: statusOK} + } + if launched < len(ring) { + ring = ring[:launched] + } + + buf := bufferPool.Get() + defer bufferPool.Put(buf) + dst := exec.prm.objWriter + + result := statusError{status: statusOK} + for i := 0; i < launched; i++ { + if se := exec.streamChildSlot(ring[i%len(ring)], dst, *buf.(*[]byte), checkHdr); se.status != statusOK { + result = se + break + } + + if launch(launched) { + launched++ + } + } + + cancel() + closeChildSlots(ring) + wg.Wait() + return result +} + +func (exec *execCtx) streamChildSlot(s *childSlot, dst ChunkWriter, buf []byte, checkHdr bool) statusError { + <-s.ready + + if s.se.status != statusOK { + return s.se + } + + if checkHdr && s.hdr != nil && !exec.isChild(s.hdr) { + exec.log.Debug("parent address in child object differs") + } + + _, copyErr := copyPayloadStreamBuffer(dst, s.rc, buf) + closeErr := s.rc.Close() + s.rc = nil + + if copyErr == nil && closeErr != nil { + copyErr = closeErr + } + if copyErr != nil { + return statusError{status: statusUndefined, err: copyErr} + } + + return statusError{status: statusOK} +} + +func closeChildSlots(slots []*childSlot) { + for _, s := range slots { + if s == nil { + continue + } + <-s.ready + if s.rc != nil { + _ = s.rc.Close() + s.rc = nil + } + } +} diff --git a/pkg/services/object/get/assemble_pipeline_test.go b/pkg/services/object/get/assemble_pipeline_test.go new file mode 100644 index 0000000000..614227b40c --- /dev/null +++ b/pkg/services/object/get/assemble_pipeline_test.go @@ -0,0 +1,228 @@ +package getsvc + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +type splitScene struct { + svc *Service + addr oid.Address + payload []byte + childIDs []oid.ID + c1, c2 *testClient +} + +const splitChildPayloadLen = 10 + +func buildSplitScene(t testing.TB, ver, n int, delay time.Duration) *splitScene { + idCnr := cidtest.ID() + + addr := oidtest.Address() + addr.SetContainer(idCnr) + addr.SetObject(oidtest.ID()) + + srcObj := generateObject(addr, nil, nil) + + ns, as := testNodeMatrix(t, []int{2}) + + children, childIDs, payload := generateChain(n, idCnr) + srcObj.SetPayload(payload) + srcObj.SetPayloadSize(uint64(len(payload))) + children[len(children)-1].SetParent(srcObj) + + linkID := oidtest.ID() + linkAddr := oid.NewAddress(idCnr, linkID) + + splitInfo := object.NewSplitInfo() + splitInfo.SetLink(linkID) + + var linkObj *object.Object + switch ver { + case 1: + splitInfo.SetSplitID(object.NewSplitID()) + linkObj = generateObject(linkAddr, nil, nil, childIDs...) + case 2: + splitInfo.SetFirstPart(childIDs[0]) + linkObj = generateObject(linkAddr, nil, nil) + linkObj.SetFirstID(childIDs[0]) + linkChildren := make([]object.MeasuredObject, len(children)) + for i := range children { + children[i].SetParentID(addr.Object()) + linkChildren[i].SetObjectID(children[i].GetID()) + linkChildren[i].SetObjectSize(uint32(children[i].PayloadSize())) + } + var link object.Link + link.SetObjects(linkChildren) + linkObj.WriteLink(link) + } + linkObj.SetParentID(addr.Object()) + linkObj.SetParent(srcObj) + + c1 := newTestClient() + c2 := newTestClient() + c2.delay = delay + + c1.addResult(addr, nil, errors.New("any error")) + c2.addResult(addr, nil, object.NewSplitInfoError(splitInfo)) + + c1.addResult(linkAddr, nil, errors.New("any error")) + c2.addResult(linkAddr, linkObj, nil) + + vectors := map[oid.Address][][]netmap.NodeInfo{ + addr: ns, + linkAddr: ns, + } + + for i := range children { + var childAddr oid.Address + childAddr.SetContainer(idCnr) + childAddr.SetObject(childIDs[i]) + c1.addResult(childAddr, nil, errors.New("any error")) + c2.addResult(childAddr, children[i], nil) + vectors[childAddr] = ns + } + + svc := &Service{cfg: new(cfg)} + svc.log = zap.NewNop() + svc.localStorage = newTestStorage() + svc.neoFSNet = &testNeoFS{vectors: vectors} + svc.clientCache = &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c1, + as[0][1]: c2, + }, + } + + return &splitScene{svc: svc, addr: addr, payload: payload, childIDs: childIDs, c1: c1, c2: c2} +} + +func (sc *splitScene) childAddr(i int) oid.Address { + return oid.NewAddress(sc.addr.Container(), sc.childIDs[i]) +} + +func getInto(ctx context.Context, svc *Service, addr oid.Address, w ObjectWriter) error { + var p Prm + p.SetObjectWriter(w) + p.common = new(util.CommonPrm).WithLocalOnly(false) + p.WithAddress(addr) + return svc.Get(ctx, p) +} + +func getFull(ctx context.Context, svc *Service, addr oid.Address) (*SimpleObjectWriter, error) { + w := NewSimpleObjectWriter() + return w, getInto(ctx, svc, addr, w) +} + +func setPrefetchWindow(t testing.TB, w int) { + old := prefetchWindow + prefetchWindow = w + t.Cleanup(func() { prefetchWindow = old }) +} + +func TestAssembleSplitPipelined(t *testing.T) { + ctx := context.Background() + + for _, sv := range []struct { + name string + ver int + }{ + {"V1", 1}, + {"V2", 2}, + } { + t.Run(sv.name, func(t *testing.T) { + t.Run("ordered output across windows", func(t *testing.T) { + for _, win := range []int{1, 2, 3, 8} { + t.Run(fmt.Sprintf("window=%d", win), func(t *testing.T) { + setPrefetchWindow(t, win) + + sc := buildSplitScene(t, sv.ver, 8, 2*time.Millisecond) + + w, err := getFull(ctx, sc.svc, sc.addr) + require.NoError(t, err) + require.Equal(t, sc.payload, w.Object().Payload()) + }) + } + }) + + t.Run("remote failure discards later children", func(t *testing.T) { + const n = 6 + const failIdx = 3 + + sc := buildSplitScene(t, sv.ver, n, time.Millisecond) + sc.c2.addResult(sc.childAddr(failIdx), nil, apistatus.ErrObjectNotFound) + + w, err := getFull(ctx, sc.svc, sc.addr) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + require.Equal(t, sc.payload[:failIdx*splitChildPayloadLen], w.Object().Payload()) + }) + + t.Run("virtual child is reported, not assembled recursively", func(t *testing.T) { + setPrefetchWindow(t, 3) + + const failIdx = 1 + sc := buildSplitScene(t, sv.ver, 6, time.Millisecond) + sc.c2.addResult(sc.childAddr(failIdx), nil, object.NewSplitInfoError(object.NewSplitInfo())) + + w, err := getFull(ctx, sc.svc, sc.addr) + var splitErr *object.SplitInfoError + require.ErrorAs(t, err, &splitErr) + require.Equal(t, sc.payload[:failIdx*splitChildPayloadLen], w.Object().Payload()) + }) + + t.Run("client write failure aborts cleanly", func(t *testing.T) { + setPrefetchWindow(t, 4) + + sc := buildSplitScene(t, sv.ver, 8, 2*time.Millisecond) + w := &trackingWriter{failAfterChunks: 3, err: errors.New("client gone")} + + require.Error(t, getInto(ctx, sc.svc, sc.addr, w)) + }) + + t.Run("context cancel mid-stream", func(t *testing.T) { + sc := buildSplitScene(t, sv.ver, 16, 20*time.Millisecond) + + cctx, cancel := context.WithCancel(ctx) + go func() { + time.Sleep(30 * time.Millisecond) + cancel() + }() + + _, err := getFull(cctx, sc.svc, sc.addr) + require.Error(t, err) + }) + }) + } +} + +func BenchmarkAssembleSplitObject(b *testing.B) { + ctx := context.Background() + + for _, win := range []int{1, 2, 4} { + b.Run(fmt.Sprintf("window=%d", win), func(b *testing.B) { + setPrefetchWindow(b, win) + + sc := buildSplitScene(b, 1, 16, time.Millisecond) + + b.ReportAllocs() + b.ResetTimer() + for range b.N { + _, err := getFull(ctx, sc.svc, sc.addr) + require.NoError(b, err) + } + }) + } +} diff --git a/pkg/services/object/get/assembly_v2.go b/pkg/services/object/get/assembly_v2.go index 10f81f71b9..eae452b730 100644 --- a/pkg/services/object/get/assembly_v2.go +++ b/pkg/services/object/get/assembly_v2.go @@ -10,7 +10,10 @@ import ( "go.uber.org/zap" ) -var errNoLinkNoLastPart = errors.New("no link and no last part in split info") +var ( + errNoLinkNoLastPart = errors.New("no link and no last part in split info") + errWrongChildHeader = errors.New("wrong child header") +) func (exec *execCtx) processV2Split(si *object.SplitInfo) { if si.GetFirstPart().IsZero() { @@ -74,7 +77,7 @@ func (exec *execCtx) processV2Link(linkID oid.ID) bool { linkObj := w.Object() if !exec.isChild(linkObj) { exec.status = statusUndefined - exec.err = errors.New("wrong child header") + exec.err = errWrongChildHeader exec.log.Debug("parent address in link object differs") return false } @@ -135,24 +138,39 @@ func (exec *execCtx) rangeFromLink(link object.Link) bool { children := link.Objects() rng := exec.ctxRange() first, firstOffset, last, lastBound := requiredChildren(rng.GetOffset(), rng.GetLength(), children) + n := last - first + 1 - for i := first; i <= last; i++ { - child := children[i] + at := func(i int) (oid.ID, *object.Range, bool) { + if i >= n { + return oid.ID{}, nil, false + } - var rngPerChild *object.Range - if i == first || i == last { - rngPerChild = new(object.Range) + idx := first + i + child := children[idx] - if i == first { + var rngPerChild object.Range + if idx == first || idx == last { + if idx == first { rngPerChild.SetOffset(firstOffset) rngPerChild.SetLength(uint64(child.ObjectSize()) - firstOffset) } - if i == last { + if idx == last { rngPerChild.SetLength(lastBound - rngPerChild.GetOffset()) } + return child.ObjectID(), &rngPerChild, true } - if !exec.copyChild(child.ObjectID(), rngPerChild, false) { + return child.ObjectID(), nil, true + } + + if n > 1 && prefetchWindow > 1 { + exec.statusError = exec.streamChildrenPipelined(at, false) + return true + } + + for i := range n { + id, r, _ := at(i) + if !exec.copyChild(id, r, false) { return true } } diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index fe13462f7c..7b42367f8b 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -1,6 +1,7 @@ package getsvc import ( + "bytes" "context" "crypto/ecdsa" "errors" @@ -75,6 +76,18 @@ type execCtx struct { payloadOnly bool legacyRange bool + + // collectOnly keeps one fetched stream and skips writing/assembly. + // Virtual children are reported to the caller, not assembled recursively. + collectOnly bool + + // collectDst receives collectOnly result from Service.get. + collectDst *collectResult +} + +type collectResult struct { + hdr *object.Object + rc io.ReadCloser } type execOption func(*execCtx) @@ -119,6 +132,13 @@ func withLogger(l *zap.Logger) execOption { } } +func withCollectOnlyResult(dst *collectResult) execOption { + return func(ctx *execCtx) { + ctx.collectOnly = true + ctx.collectDst = dst + } +} + func withPreSortedContainerNodes(nodeLists [][]netmap.NodeInfo, repRules []uint) execOption { return func(ctx *execCtx) { ctx.nodeLists = nodeLists @@ -267,6 +287,48 @@ func (exec *execCtx) headOnly() bool { return exec.head } +// fetchChildStream fetches a single physical child stream. Virtual children are +// returned as statusVIRTUAL for the caller to handle. +func (exec *execCtx) fetchChildStream(ctx context.Context, id oid.ID, rng *object.Range) (*object.Object, io.ReadCloser, statusError) { + log := exec.log + if rng != nil { + log = log.With(zap.String("child range", prettyRange(rng))) + } + + p := exec.prm + // keep concurrent fetches race-free + if p.common != nil { + c := *p.common + c.WithLocalOnly(false) + p.common = &c + } + p.SetRange(rng) + p.addr.SetContainer(exec.containerID()) + p.addr.SetObject(id) + + var res collectResult + se := exec.svc.get(ctx, p.commonPrm, + withPayloadRange(rng), + withPayloadOnly(exec.payloadOnly), + withLegacyRange(exec.legacyRange), + withLogger(log), + withCollectOnlyResult(&res), + ) + if se.status != statusOK { + return res.hdr, nil, se + } + + if res.rc == nil { + var payload []byte + if res.hdr != nil { + payload = res.hdr.Payload() + } + res.rc = io.NopCloser(bytes.NewReader(payload)) + } + + return res.hdr, res.rc, se +} + // copyChild fetches child object payload and streams it directly into current exec writer. // Returns true if full payload (or requested range) was successfully written and, if requested, header validated. func (exec *execCtx) copyChild(id oid.ID, rng *object.Range, withHdr bool) bool { @@ -278,7 +340,12 @@ func (exec *execCtx) copyChild(id oid.ID, rng *object.Range, withHdr bool) bool childWriter := newDirectChildWriter(exec.prm.objWriter) p := exec.prm - p.common = p.common.WithLocalOnly(false) + // keep concurrent fetches race-free + if p.common != nil { + c := *p.common + c.WithLocalOnly(false) + p.common = &c + } p.objWriter = childWriter p.SetRange(rng) p.addr.SetContainer(exec.containerID()) @@ -300,7 +367,7 @@ func (exec *execCtx) copyChild(id oid.ID, rng *object.Range, withHdr bool) bool } if !exec.isChild(hdr) { exec.status = statusUndefined - exec.err = errors.New("wrong child header") + exec.err = errWrongChildHeader exec.log.Debug("parent address in child object differs") } } diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 3ada8a0e7a..0a66e0ea4f 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -294,6 +294,11 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st exec.execute() //nolint:contextcheck // It is in fact passed via execCtx + if exec.collectDst != nil { + exec.collectDst.hdr = exec.collectedHeader + exec.collectDst.rc = exec.collectedReader + } + return exec.statusError } @@ -313,6 +318,9 @@ func (exec *execCtx) analyzeStatus(execCnr bool) { exec.log.Debug("operation finished successfully") case statusVIRTUAL: exec.log.Debug("requested object is virtual") + if exec.collectOnly { + return + } exec.assemble() if errors.Is(exec.err, errNoLinkNoLastPart) && execCnr { exec.executeOnContainer() diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 8cb89f7dd6..0a2baa5ab0 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -10,6 +10,7 @@ import ( "strconv" "sync/atomic" "testing" + "time" iec "github.com/nspcc-dev/neofs-node/internal/ec" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -45,6 +46,8 @@ type testClientCache struct { } type testClient struct { + delay time.Duration + results map[oid.Address]struct { obj *object.Object err error @@ -94,6 +97,10 @@ func newTestClient() *testClient { } func (c *testClient) getObject(exec *execCtx) (*object.Object, io.ReadCloser, error) { + if c.delay > 0 { + time.Sleep(c.delay) + } + v, ok := c.results[exec.address()] if !ok { var errNotFound apistatus.ObjectNotFound diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index 401e757414..438d11e36d 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -32,7 +32,7 @@ func (exec *execCtx) executeLocal() { exec.status = statusOK exec.err = nil - if exec.collectedHeader != nil || exec.collectedReader != nil { + if !exec.collectOnly && (exec.collectedHeader != nil || exec.collectedReader != nil) { exec.writeCollectedObject() } case errors.Is(err, apistatus.Error): diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index fb6106886e..d5f5950fe9 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -45,7 +45,9 @@ func (exec *execCtx) processNode(info netmap.NodeInfo) bool { if obj != nil || reader != nil { exec.collectedHeader = obj exec.collectedReader = reader - exec.writeCollectedObject() + if !exec.collectOnly { + exec.writeCollectedObject() + } } case errors.Is(err, apistatus.Error) && !errors.Is(err, apistatus.ErrObjectNotFound): exec.status = statusAPIResponse