Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ Changelog for NeoFS Node
- Data race in RANGE response buffer (#4013)
- Incorrect access denial during EACL rechecks for payload-only GET (#4024, #4029)
- neofs-ir does not fill `neofs_ir_state_epoch` metrics on startup and does not keep it actual after FS chain connection loss (#4035)
- Ranged object reads from FSTree sometimes returning more payload bytes than requested (#4046)
- Ranged object reads from FSTree sometimes returning more payload bytes than requested (#4046, #4051)
- Incorrect remote EC part ranges used during policer checks (#4051)

### Changed
- Optimized EC GET request execution (#3996)
Expand Down
8 changes: 8 additions & 0 deletions pkg/local_object_storage/blobstor/fstree/fstree.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,9 @@ func (t *FSTree) shiftPayloadRangeStream(prefix []byte, pldLen uint64, pldFldOff
}

if ln <= uint64(len(prefix)) {
if stream != nil {
stream.Close()
}
return nopCloser(bytes.NewReader(prefix[:ln])), nil
}

Expand Down Expand Up @@ -676,6 +679,11 @@ func (t *FSTree) shiftPayloadRangeStream(prefix []byte, pldLen uint64, pldFldOff
}

prefix = prefix[off:]
if ln <= uint64(len(prefix)) {
stream.Close()
return nopCloser(bytes.NewReader(prefix[:ln])), nil
Comment thread
roman-khimov marked this conversation as resolved.
}

return newPrefixedReadSeekCloser(prefix, &limitedFileReader{ReadSeekCloser: stream, limit: int64(ln) - int64(len(prefix))}), nil
}

Expand Down
120 changes: 88 additions & 32 deletions pkg/local_object_storage/blobstor/fstree/fstree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,50 +70,106 @@ func testReadPayloadRange(t *testing.T, fst *FSTree) {
})
}

func TestFSTree_ReadPayloadRangeLimitsEmptyPrefixStream(t *testing.T) {
fst := setupFSTree(t)
payload := []byte("payload")
func TestFSTree_PayloadRangeStreamsLimitBufferedPayload(t *testing.T) {
const prefixLen = iobject.NonPayloadFieldsBufferLength
const readBufLen = 2 * iobject.NonPayloadFieldsBufferLength

readers := []struct {
name string
read func(*FSTree, oid.Address, uint64, uint64) (io.ReadCloser, error)
}{
{
name: "ReadPayloadRange",
read: func(fst *FSTree, addr oid.Address, off, ln uint64) (io.ReadCloser, error) {
return fst.ReadPayloadRange(addr, off, ln, make([]byte, readBufLen))
},
},
{
name: "GetRangeStream",
read: func(fst *FSTree, addr oid.Address, off, ln uint64) (io.ReadCloser, error) {
_, stream, err := fst.GetRangeStream(addr, off, ln)
return stream, err
},
},
}

for _, tc := range []struct {
name string
payload []byte
off, ln uint64
padHeaderToPayloadOffset bool
}{
{
name: "empty payload prefix stream",
payload: []byte("payload"),
ln: 3,
padHeaderToPayloadOffset: true,
},
{
name: "range inside payload prefix stream",
payload: testutil.RandByteSlice(2 * prefixLen),
off: 222,
ln: 378,
},
} {
t.Run(tc.name, func(t *testing.T) {
fst := setupFSTree(t)
objWire := objectWireForPayloadRangeStreamTest(t, tc.payload, tc.padHeaderToPayloadOffset)

addr := oidtest.Address()
require.NoError(t, fst.Put(addr, objWire))

for _, reader := range readers {
t.Run(reader.name, func(t *testing.T) {
stream, err := reader.read(fst, addr, tc.off, tc.ln)
require.NoError(t, err)
defer stream.Close()

actual, err := io.ReadAll(stream)
require.NoError(t, err)
require.Equal(t, tc.payload[tc.off:tc.off+tc.ln], actual)
})
}
})
}
}

func objectWireForPayloadRangeStreamTest(t *testing.T, payload []byte, padHeaderToPayloadOffset bool) []byte {
t.Helper()

baseHeader := protowire.AppendTag(nil, protoobject.FieldHeaderPayloadLength, protowire.VarintType)
baseHeader = protowire.AppendVarint(baseHeader, uint64(len(payload)))

// keep payload bytes out of the initially buffered prefix while preserving valid protobuf wire
const prefixLen = iobject.NonPayloadFieldsBufferLength
const readBufLen = 2 * iobject.NonPayloadFieldsBufferLength
payloadPrefixLen := protowire.SizeTag(protoobject.FieldObjectPayload) + protowire.SizeVarint(uint64(len(payload)))
headerPaddingFieldNum := protowire.Number(protoobject.FieldHeaderPayloadLength + 1)
headerPaddingLen := prefixLen
for {
headerPaddingFieldLen := protowire.SizeTag(headerPaddingFieldNum) + protowire.SizeVarint(uint64(headerPaddingLen)) + headerPaddingLen
headerLen := len(baseHeader) + headerPaddingFieldLen
n := prefixLen - protowire.SizeTag(protoobject.FieldObjectHeader) - protowire.SizeVarint(uint64(headerLen)) - len(baseHeader) - protowire.SizeTag(headerPaddingFieldNum) - protowire.SizeVarint(uint64(headerPaddingLen)) - payloadPrefixLen
require.GreaterOrEqual(t, n, 0)
if n == headerPaddingLen {
break
if padHeaderToPayloadOffset {
// keep payload bytes out of the initially buffered prefix while preserving valid protobuf wire
const prefixLen = iobject.NonPayloadFieldsBufferLength
payloadPrefixLen := protowire.SizeTag(protoobject.FieldObjectPayload) + protowire.SizeVarint(uint64(len(payload)))
headerPaddingFieldNum := protowire.Number(protoobject.FieldHeaderPayloadLength + 1)
headerPaddingLen := prefixLen
for {
headerPaddingFieldLen := protowire.SizeTag(headerPaddingFieldNum) + protowire.SizeVarint(uint64(headerPaddingLen)) + headerPaddingLen
headerLen := len(baseHeader) + headerPaddingFieldLen
n := prefixLen - protowire.SizeTag(protoobject.FieldObjectHeader) - protowire.SizeVarint(uint64(headerLen)) - len(baseHeader) - protowire.SizeTag(headerPaddingFieldNum) - protowire.SizeVarint(uint64(headerPaddingLen)) - payloadPrefixLen
require.GreaterOrEqual(t, n, 0)
if n == headerPaddingLen {
break
}
headerPaddingLen = n
}
headerPaddingLen = n
}

baseHeader = protowire.AppendTag(baseHeader, headerPaddingFieldNum, protowire.BytesType)
baseHeader = protowire.AppendBytes(baseHeader, make([]byte, headerPaddingLen))
baseHeader = protowire.AppendTag(baseHeader, headerPaddingFieldNum, protowire.BytesType)
baseHeader = protowire.AppendBytes(baseHeader, make([]byte, headerPaddingLen))
}

objWire := protowire.AppendTag(nil, protoobject.FieldObjectHeader, protowire.BytesType)
objWire = protowire.AppendBytes(objWire, baseHeader)
objWire = protowire.AppendTag(objWire, protoobject.FieldObjectPayload, protowire.BytesType)
objWire = protowire.AppendVarint(objWire, uint64(len(payload)))
require.Len(t, objWire, prefixLen)
objWire = append(objWire, payload...)

addr := oidtest.Address()
require.NoError(t, fst.Put(addr, objWire))

stream, err := fst.ReadPayloadRange(addr, 0, 3, make([]byte, readBufLen))
require.NoError(t, err)
defer stream.Close()
if padHeaderToPayloadOffset {
require.Len(t, objWire, iobject.NonPayloadFieldsBufferLength)
}

actual, err := io.ReadAll(stream)
require.NoError(t, err)
require.Equal(t, payload[:3], actual)
return append(objWire, payload...)
}

func testGetRangeStreamFunc(t *testing.T, fst *FSTree, fn func(fst *FSTree, addr oid.Address, off, ln uint64) (io.ReadCloser, error)) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/object/head/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob
}

// GetRange requests object payload range from the remote node.
func (h *RemoteHeader) GetRange(ctx context.Context, node netmap.NodeInfo, cnr cid.ID, id oid.ID, ln, off uint64, xs []string) (io.ReadCloser, error) {
func (h *RemoteHeader) GetRange(ctx context.Context, node netmap.NodeInfo, cnr cid.ID, id oid.ID, off, ln uint64, xs []string) (io.ReadCloser, error) {
key, err := h.keyStorage.GetKey(nil)
if err != nil {
return nil, fmt.Errorf("get local SN private key: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/policer/policer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type localStorage interface {
// interface of [headsvc.RemoteHeader] used by [Policer] for overriding in tests.
type apiConnections interface {
headObject(context.Context, netmap.NodeInfo, oid.Address, bool, []string) (object.Object, error)
GetRange(ctx context.Context, node netmap.NodeInfo, cnr cid.ID, id oid.ID, ln, off uint64, xs []string) (io.ReadCloser, error)
GetRange(ctx context.Context, node netmap.NodeInfo, cnr cid.ID, id oid.ID, off, ln uint64, xs []string) (io.ReadCloser, error)
}

// Policer represents the utility that verifies
Expand Down
Loading