diff --git a/CHANGELOG.md b/CHANGELOG.md index 59cb3b3d89..a513fb5ca9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,8 @@ Changelog for NeoFS Node - Data race in RANGE response buffer (#4013) - Incorrect access denial during EACL rechecks for payload-only GET (#4024, #4029) - neofs-ir does not fill `neofs_ir_state_epoch` metrics on startup and does not keep it actual after FS chain connection loss (#4035) -- Ranged object reads from FSTree sometimes returning more payload bytes than requested (#4046) +- Ranged object reads from FSTree sometimes returning more payload bytes than requested (#4046, #4051) +- Incorrect remote EC part ranges used during policer checks (#4051) ### Changed - Optimized EC GET request execution (#3996) diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 62c6c9b0ad..45b35a1fee 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -640,6 +640,9 @@ func (t *FSTree) shiftPayloadRangeStream(prefix []byte, pldLen uint64, pldFldOff } if ln <= uint64(len(prefix)) { + if stream != nil { + stream.Close() + } return nopCloser(bytes.NewReader(prefix[:ln])), nil } @@ -676,6 +679,11 @@ func (t *FSTree) shiftPayloadRangeStream(prefix []byte, pldLen uint64, pldFldOff } prefix = prefix[off:] + if ln <= uint64(len(prefix)) { + stream.Close() + return nopCloser(bytes.NewReader(prefix[:ln])), nil + } + return newPrefixedReadSeekCloser(prefix, &limitedFileReader{ReadSeekCloser: stream, limit: int64(ln) - int64(len(prefix))}), nil } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index de742eab85..78ec34f074 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -70,50 +70,106 @@ func testReadPayloadRange(t *testing.T, fst *FSTree) { }) } -func TestFSTree_ReadPayloadRangeLimitsEmptyPrefixStream(t *testing.T) { - fst := setupFSTree(t) - payload := []byte("payload") +func TestFSTree_PayloadRangeStreamsLimitBufferedPayload(t *testing.T) { + const prefixLen = iobject.NonPayloadFieldsBufferLength + const readBufLen = 2 * iobject.NonPayloadFieldsBufferLength + + readers := []struct { + name string + read func(*FSTree, oid.Address, uint64, uint64) (io.ReadCloser, error) + }{ + { + name: "ReadPayloadRange", + read: func(fst *FSTree, addr oid.Address, off, ln uint64) (io.ReadCloser, error) { + return fst.ReadPayloadRange(addr, off, ln, make([]byte, readBufLen)) + }, + }, + { + name: "GetRangeStream", + read: func(fst *FSTree, addr oid.Address, off, ln uint64) (io.ReadCloser, error) { + _, stream, err := fst.GetRangeStream(addr, off, ln) + return stream, err + }, + }, + } + + for _, tc := range []struct { + name string + payload []byte + off, ln uint64 + padHeaderToPayloadOffset bool + }{ + { + name: "empty payload prefix stream", + payload: []byte("payload"), + ln: 3, + padHeaderToPayloadOffset: true, + }, + { + name: "range inside payload prefix stream", + payload: testutil.RandByteSlice(2 * prefixLen), + off: 222, + ln: 378, + }, + } { + t.Run(tc.name, func(t *testing.T) { + fst := setupFSTree(t) + objWire := objectWireForPayloadRangeStreamTest(t, tc.payload, tc.padHeaderToPayloadOffset) + + addr := oidtest.Address() + require.NoError(t, fst.Put(addr, objWire)) + + for _, reader := range readers { + t.Run(reader.name, func(t *testing.T) { + stream, err := reader.read(fst, addr, tc.off, tc.ln) + require.NoError(t, err) + defer stream.Close() + + actual, err := io.ReadAll(stream) + require.NoError(t, err) + require.Equal(t, tc.payload[tc.off:tc.off+tc.ln], actual) + }) + } + }) + } +} + +func objectWireForPayloadRangeStreamTest(t *testing.T, payload []byte, padHeaderToPayloadOffset bool) []byte { + t.Helper() baseHeader := protowire.AppendTag(nil, protoobject.FieldHeaderPayloadLength, protowire.VarintType) baseHeader = protowire.AppendVarint(baseHeader, uint64(len(payload))) - // keep payload bytes out of the initially buffered prefix while preserving valid protobuf wire - const prefixLen = iobject.NonPayloadFieldsBufferLength - const readBufLen = 2 * iobject.NonPayloadFieldsBufferLength - payloadPrefixLen := protowire.SizeTag(protoobject.FieldObjectPayload) + protowire.SizeVarint(uint64(len(payload))) - headerPaddingFieldNum := protowire.Number(protoobject.FieldHeaderPayloadLength + 1) - headerPaddingLen := prefixLen - for { - headerPaddingFieldLen := protowire.SizeTag(headerPaddingFieldNum) + protowire.SizeVarint(uint64(headerPaddingLen)) + headerPaddingLen - headerLen := len(baseHeader) + headerPaddingFieldLen - n := prefixLen - protowire.SizeTag(protoobject.FieldObjectHeader) - protowire.SizeVarint(uint64(headerLen)) - len(baseHeader) - protowire.SizeTag(headerPaddingFieldNum) - protowire.SizeVarint(uint64(headerPaddingLen)) - payloadPrefixLen - require.GreaterOrEqual(t, n, 0) - if n == headerPaddingLen { - break + if padHeaderToPayloadOffset { + // keep payload bytes out of the initially buffered prefix while preserving valid protobuf wire + const prefixLen = iobject.NonPayloadFieldsBufferLength + payloadPrefixLen := protowire.SizeTag(protoobject.FieldObjectPayload) + protowire.SizeVarint(uint64(len(payload))) + headerPaddingFieldNum := protowire.Number(protoobject.FieldHeaderPayloadLength + 1) + headerPaddingLen := prefixLen + for { + headerPaddingFieldLen := protowire.SizeTag(headerPaddingFieldNum) + protowire.SizeVarint(uint64(headerPaddingLen)) + headerPaddingLen + headerLen := len(baseHeader) + headerPaddingFieldLen + n := prefixLen - protowire.SizeTag(protoobject.FieldObjectHeader) - protowire.SizeVarint(uint64(headerLen)) - len(baseHeader) - protowire.SizeTag(headerPaddingFieldNum) - protowire.SizeVarint(uint64(headerPaddingLen)) - payloadPrefixLen + require.GreaterOrEqual(t, n, 0) + if n == headerPaddingLen { + break + } + headerPaddingLen = n } - headerPaddingLen = n - } - baseHeader = protowire.AppendTag(baseHeader, headerPaddingFieldNum, protowire.BytesType) - baseHeader = protowire.AppendBytes(baseHeader, make([]byte, headerPaddingLen)) + baseHeader = protowire.AppendTag(baseHeader, headerPaddingFieldNum, protowire.BytesType) + baseHeader = protowire.AppendBytes(baseHeader, make([]byte, headerPaddingLen)) + } objWire := protowire.AppendTag(nil, protoobject.FieldObjectHeader, protowire.BytesType) objWire = protowire.AppendBytes(objWire, baseHeader) objWire = protowire.AppendTag(objWire, protoobject.FieldObjectPayload, protowire.BytesType) objWire = protowire.AppendVarint(objWire, uint64(len(payload))) - require.Len(t, objWire, prefixLen) - objWire = append(objWire, payload...) - - addr := oidtest.Address() - require.NoError(t, fst.Put(addr, objWire)) - - stream, err := fst.ReadPayloadRange(addr, 0, 3, make([]byte, readBufLen)) - require.NoError(t, err) - defer stream.Close() + if padHeaderToPayloadOffset { + require.Len(t, objWire, iobject.NonPayloadFieldsBufferLength) + } - actual, err := io.ReadAll(stream) - require.NoError(t, err) - require.Equal(t, payload[:3], actual) + return append(objWire, payload...) } func testGetRangeStreamFunc(t *testing.T, fst *FSTree, fn func(fst *FSTree, addr oid.Address, off, ln uint64) (io.ReadCloser, error)) { diff --git a/pkg/services/object/head/remote.go b/pkg/services/object/head/remote.go index 114450ae31..c98a46ccdc 100644 --- a/pkg/services/object/head/remote.go +++ b/pkg/services/object/head/remote.go @@ -112,7 +112,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob } // GetRange requests object payload range from the remote node. -func (h *RemoteHeader) GetRange(ctx context.Context, node netmap.NodeInfo, cnr cid.ID, id oid.ID, ln, off uint64, xs []string) (io.ReadCloser, error) { +func (h *RemoteHeader) GetRange(ctx context.Context, node netmap.NodeInfo, cnr cid.ID, id oid.ID, off, ln uint64, xs []string) (io.ReadCloser, error) { key, err := h.keyStorage.GetKey(nil) if err != nil { return nil, fmt.Errorf("get local SN private key: %w", err) diff --git a/pkg/services/policer/policer.go b/pkg/services/policer/policer.go index c588a2fdfd..13cd6302da 100644 --- a/pkg/services/policer/policer.go +++ b/pkg/services/policer/policer.go @@ -39,7 +39,7 @@ type localStorage interface { // interface of [headsvc.RemoteHeader] used by [Policer] for overriding in tests. type apiConnections interface { headObject(context.Context, netmap.NodeInfo, oid.Address, bool, []string) (object.Object, error) - GetRange(ctx context.Context, node netmap.NodeInfo, cnr cid.ID, id oid.ID, ln, off uint64, xs []string) (io.ReadCloser, error) + GetRange(ctx context.Context, node netmap.NodeInfo, cnr cid.ID, id oid.ID, off, ln uint64, xs []string) (io.ReadCloser, error) } // Policer represents the utility that verifies