diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index fc815bace0..b1cc603c7a 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -7,6 +7,7 @@ import ( "io/fs" "math/big" "net" + "net/http" "os" "os/signal" "path/filepath" @@ -233,8 +234,10 @@ func (c *cfg) CurrentEpoch() uint64 { return c.networkState.CurrentEpoch() } type cfgGRPC struct { mu sync.Mutex + gs *grpc.Server + mux *http.ServeMux listeners []net.Listener - servers []*grpc.Server + servers []*http.Server // serviceRegistrators stores functions that register gRPC service // implementations into a gRPC server. @@ -247,10 +250,7 @@ func (g *cfgGRPC) registerService(f func(*grpc.Server)) { g.mu.Lock() defer g.mu.Unlock() - g.serviceRegistrators = append(g.serviceRegistrators, f) - for _, srv := range g.servers { - f(srv) - } + f(g.gs) } type cfgMeta struct { diff --git a/cmd/neofs-node/grpc.go b/cmd/neofs-node/grpc.go index 20008e933c..046199d61c 100644 --- a/cmd/neofs-node/grpc.go +++ b/cmd/neofs-node/grpc.go @@ -9,7 +9,9 @@ import ( "io" "math" "net" + "net/http" "os" + "strings" "time" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" @@ -19,7 +21,6 @@ import ( "go.uber.org/zap" "golang.org/x/net/netutil" "google.golang.org/grpc" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/resolver" ) @@ -106,11 +107,11 @@ func initGRPC(c *cfg) { // at the time of shutdown (including those created by reload). c.onShutdown(func() { c.cfgGRPC.mu.Lock() - srvs := make([]*grpc.Server, len(c.cfgGRPC.servers)) + srvs := make([]*http.Server, len(c.cfgGRPC.servers)) copy(srvs, c.cfgGRPC.servers) c.cfgGRPC.mu.Unlock() for _, srv := range srvs { - stopGRPC("NeoFS Public API", srv, c.log) + srv.Close() } }) } @@ -157,18 +158,6 @@ func buildGRPCServers(c *cfg, maxRecvMsgSizeOpt grpc.ServerOption) error { if len(c.appCfg.GRPC) == 0 { return errors.New("could not listen to any gRPC endpoints") } - for _, sc := range c.appCfg.GRPC { - srv, lis, err := buildSingleGRPCServer(c, sc, maxRecvMsgSizeOpt) - if err != nil { - return err - } - c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis) - c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv) - } - return nil -} - -func buildSingleGRPCServer(c *cfg, sc grpcconfig.GRPC, maxRecvMsgSizeOpt grpc.ServerOption) (*grpc.Server, net.Listener, error) { serverOpts := []grpc.ServerOption{ grpc.MaxSendMsgSize(maxMsgSize), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ @@ -185,30 +174,58 @@ func buildSingleGRPCServer(c *cfg, sc grpcconfig.GRPC, maxRecvMsgSizeOpt grpc.Se serverOpts = append(serverOpts, maxRecvMsgSizeOpt) } + c.cfgGRPC.gs = grpc.NewServer(serverOpts...) + c.cfgGRPC.mux = http.NewServeMux() + + for _, sc := range c.appCfg.GRPC { + srv, lis, err := buildSingleGRPCServer(c, muxHandler{c.cfgGRPC.gs, c.cfgGRPC.mux}, sc, maxRecvMsgSizeOpt) + if err != nil { + return err + } + c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis) + c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv) + } + return nil +} + +type muxHandler struct { + gs *grpc.Server + mux *http.ServeMux +} + +func (h muxHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc") { + h.gs.ServeHTTP(w, r) + } else { + h.mux.ServeHTTP(w, r) + } +} + +func buildSingleGRPCServer(c *cfg, h http.Handler, sc grpcconfig.GRPC, maxRecvMsgSizeOpt grpc.ServerOption) (*http.Server, net.Listener, error) { + var protos http.Protocols + + protos.SetHTTP1(true) + protos.SetHTTP2(true) + protos.SetUnencryptedHTTP2(true) + + var srv = &http.Server{ + Handler: h, + Protocols: &protos, + } + tlsCfg := sc.TLS if tlsCfg.Key != "" { certFile, keyFile := tlsCfg.Certificate, tlsCfg.Key - if _, err := tls.LoadX509KeyPair(certFile, keyFile); err != nil { + crt, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { c.log.Error("could not read certificate from file", zap.Error(err)) return nil, nil, err } - - // read certificate from disk on each handshake to pick up renewals automatically. - creds := credentials.NewTLS(&tls.Config{ - GetConfigForClient: func(*tls.ClientHelloInfo) (*tls.Config, error) { - cert, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return nil, fmt.Errorf("reload TLS certificate: %w", err) - } - return &tls.Config{ - Certificates: []tls.Certificate{cert}, - }, nil - }, - }) - - serverOpts = append(serverOpts, grpc.Creds(creds)) + srv.TLSConfig = &tls.Config{ + Certificates: []tls.Certificate{crt}, + } } lis, err := net.Listen("tcp", sc.Endpoint) @@ -221,7 +238,7 @@ func buildSingleGRPCServer(c *cfg, sc grpcconfig.GRPC, maxRecvMsgSizeOpt grpc.Se lis = netutil.LimitListener(lis, connLimit) } - return grpc.NewServer(serverOpts...), lis, nil + return srv, lis, nil } // reloadGRPC performs a fine-grained reload: only gRPC servers whose @@ -239,7 +256,7 @@ func reloadGRPC(c *cfg, oldCfg grpcConfigSnapshot) error { defer c.cfgGRPC.mu.Unlock() type serverEntry struct { - srv *grpc.Server + srv *http.Server lis net.Listener snap grpcServerSnapshot } @@ -254,11 +271,11 @@ func reloadGRPC(c *cfg, oldCfg grpcConfigSnapshot) error { } } - newServers := make([]*grpc.Server, 0, len(newCfg)) + newServers := make([]*http.Server, 0, len(newCfg)) newListeners := make([]net.Listener, 0, len(newCfg)) // freshServers/freshListeners hold only newly created servers that need // service registration and must start serving. - var freshServers []*grpc.Server + var freshServers []*http.Server var freshListeners []net.Listener for _, newSnap := range newCfg { @@ -269,10 +286,10 @@ func reloadGRPC(c *cfg, oldCfg grpcConfigSnapshot) error { newListeners = append(newListeners, old.lis) continue } - stopGRPC("NeoFS Public API", old.srv, c.log) + old.srv.Close() } - srv, lis, err := buildSingleGRPCServer(c, newSnap.GRPC, maxRecvMsgSizeOpt) + srv, lis, err := buildSingleGRPCServer(c, muxHandler{c.cfgGRPC.gs, c.cfgGRPC.mux}, newSnap.GRPC, maxRecvMsgSizeOpt) if err != nil { c.log.Error("failed to start gRPC server", zap.String("endpoint", newSnap.Endpoint), zap.Error(err)) @@ -286,7 +303,7 @@ func reloadGRPC(c *cfg, oldCfg grpcConfigSnapshot) error { // stop servers that were removed from the config entirely for _, entry := range oldByEndpoint { - stopGRPC("NeoFS Public API", entry.srv, c.log) + entry.srv.Close() } if len(newServers) == 0 { @@ -296,20 +313,15 @@ func reloadGRPC(c *cfg, oldCfg grpcConfigSnapshot) error { c.cfgGRPC.servers = newServers c.cfgGRPC.listeners = newListeners - for _, reg := range c.cfgGRPC.serviceRegistrators { - for _, srv := range freshServers { - reg(srv) - } - } - serveGRPCList(c, freshServers, freshListeners) + serveGRPCList(c, c.cfgGRPC.gs, freshServers, freshListeners) return nil } func serveGRPC(c *cfg) { - serveGRPCList(c, c.cfgGRPC.servers, c.cfgGRPC.listeners) + serveGRPCList(c, c.cfgGRPC.gs, c.cfgGRPC.servers, c.cfgGRPC.listeners) } -func serveGRPCList(c *cfg, servers []*grpc.Server, listeners []net.Listener) { +func serveGRPCList(c *cfg, gs *grpc.Server, servers []*http.Server, listeners []net.Listener) { for i := range servers { srv := servers[i] lis := listeners[i] @@ -325,7 +337,13 @@ func serveGRPCList(c *cfg, servers []*grpc.Server, listeners []net.Listener) { zap.Stringer("endpoint", lis.Addr()), ) - if err := srv.Serve(lis); err != nil { + var err error + if srv.TLSConfig == nil { + err = srv.Serve(lis) + } else { + err = srv.ServeTLS(lis, "", "") + } + if err != nil { c.log.Error("gRPC server failed", zap.Stringer("endpoint", lis.Addr()), zap.Error(err)) } }) diff --git a/cmd/neofs-node/netmap.go b/cmd/neofs-node/netmap.go index dd647e3b4c..b5b9d3db63 100644 --- a/cmd/neofs-node/netmap.go +++ b/cmd/neofs-node/netmap.go @@ -154,9 +154,7 @@ func initNetmapService(c *cfg) { server := netmapService.New(&c.key.PrivateKey, c) - for _, srv := range c.cfgGRPC.servers { - protonetmap.RegisterNetmapServiceServer(srv, server) - } + protonetmap.RegisterNetmapServiceServer(c.cfgGRPC.gs, server) addNewEpochNotificationHandler(c, func(ev event.Event) { c.cfgNetmap.state.setCurrentEpoch(ev.(netmapEvent.NewEpoch).EpochNumber()) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 8e70c94131..987938226a 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -348,6 +348,8 @@ func initObjectService(c *cfg) { server := objectService.New(objSvc, mNumber, c.cfgObject.pool.search, fsChain, storage, c.metaService, c.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc, coreConstructor) os.server = server + c.cfgGRPC.mux.HandleFunc("/get", server.GetHTTP) + svcDesc := protoobject.ObjectService_ServiceDesc svcDesc.Methods = slices.Clone(protoobject.ObjectService_ServiceDesc.Methods) diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index eb44a377c0..de7e733a9c 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -3,6 +3,7 @@ package client import ( "context" "io" + "net/http" "github.com/nspcc-dev/neofs-sdk-go/client" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -37,4 +38,6 @@ type MultiAddressClient interface { // ForAnyGRPCConn executes op over gRPC connections to given multi-address // endpoint-by-endpoint until success. ForAnyGRPCConn(context.Context, func(context.Context, *grpc.ClientConn) error) error + + ForAnyHTTPClient(context.Context, func(context.Context, *http.Client, string) error) error } diff --git a/pkg/network/cache/clients.go b/pkg/network/cache/clients.go index f17afdcac0..d10a0a4fb4 100644 --- a/pkg/network/cache/clients.go +++ b/pkg/network/cache/clients.go @@ -8,6 +8,7 @@ import ( "io" "iter" "maps" + "net/http" "slices" "sync" "time" @@ -175,8 +176,17 @@ func (x *Clients) syncWithNetmapSN(ctx context.Context, sn netmap.NodeInfo) erro return nil } +func newHTTPClient() *http.Client { + tr := http.DefaultTransport.(*http.Transport).Clone() + tr.Proxy = nil + tr.Protocols = new(http.Protocols) + tr.Protocols.SetHTTP1(true) + return &http.Client{Transport: tr} +} + func (x *Clients) initConnections(ctx context.Context, pub []byte, addrs iter.Seq[string]) (*connections, error) { m := make(map[string]*client.Client) + mh := make(map[string]*http.Client) l := x.log.With(zap.String("public key", hex.EncodeToString(pub))) for s := range addrs { l.Info("initializing connection to the SN...", zap.String("address", s)) @@ -190,12 +200,14 @@ func (x *Clients) initConnections(ctx context.Context, pub []byte, addrs iter.Se } l.Info("connection to the SN successfully initialized", zap.String("address", s)) m[s] = c + mh[s] = newHTTPClient() } var hexKey = hex.EncodeToString(pub) return &connections{ log: x.log.With(zap.String("SN public key", hexKey)), nodeID: hexKey, m: m, + mh: mh, }, nil } @@ -260,6 +272,7 @@ type connections struct { mtx sync.RWMutex m map[string]*client.Client // keys are multiaddrs + mh map[string]*http.Client // keys are multiaddrs } func (x *connections) closeAll() { @@ -278,6 +291,12 @@ func (x *connections) all(f func(ma string, c *client.Client) bool) { x.mtx.RUnlock() } +func (x *connections) allHTTP(yield func(ma string, c *http.Client) bool) { + x.mtx.RLock() + maps.All(x.mh)(yield) + x.mtx.RUnlock() +} + func (x *connections) forAny(ctx context.Context, f func(context.Context, *client.Client) error) error { var firstErr error for ma, c := range x.all { @@ -295,12 +314,38 @@ func (x *connections) forAny(ctx context.Context, f func(context.Context, *clien return newMultiEndpointError(x.nodeID, firstErr) } +func (x *connections) forAnyHTTP(ctx context.Context, f func(context.Context, *http.Client, string) error) error { + var firstErr error + for ma, c := range x.allHTTP { + // FIXME: pending removal in #3982. + var a network.Address + if err := a.FromString(ma); err != nil { + return fmt.Errorf("parse network address %q: %w", ma, err) + } + err := f(ctx, c, a.URIAddr()) + if err == nil { + return nil + } + if !isTempError(err) { + return newEndpointError(ma, err) + } + if firstErr == nil { + firstErr = newEndpointError(ma, err) + } + } + return newMultiEndpointError(x.nodeID, firstErr) +} + func (x *connections) ForAnyGRPCConn(ctx context.Context, f func(context.Context, *grpc.ClientConn) error) error { return x.forAny(ctx, func(ctx context.Context, c *client.Client) error { return f(ctx, c.Conn()) }) } +func (x *connections) ForAnyHTTPClient(ctx context.Context, f func(context.Context, *http.Client, string) error) error { + return x.forAnyHTTP(ctx, f) +} + func (x *connections) ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, opts client.PrmObjectPutInit) (client.ObjectWriter, error) { var res client.ObjectWriter return res, x.forAny(ctx, func(ctx context.Context, c *client.Client) error { diff --git a/pkg/services/object/get.go b/pkg/services/object/get.go index eb6a174f7f..bd47b8dd18 100644 --- a/pkg/services/object/get.go +++ b/pkg/services/object/get.go @@ -7,6 +7,8 @@ import ( "errors" "fmt" "io" + "net/http" + "strings" iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf" "github.com/nspcc-dev/neofs-node/internal/protobuf/protoscan" @@ -18,6 +20,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/mem" "google.golang.org/protobuf/encoding/protowire" + "google.golang.org/protobuf/proto" ) type getStreamProgress struct { @@ -25,6 +28,51 @@ type getStreamProgress struct { readPayload int } +func continueHTTP(ctx context.Context, w io.Writer, req *protoobject.GetRequest, buf []byte, conn *http.Client, pref string) error { + switch { + case strings.HasPrefix(pref, "http://"), strings.HasPrefix(pref, "https://"): + case strings.HasPrefix(pref, "grpcs://"): + pref = "https://" + strings.TrimPrefix(pref, "grpcs://") + case strings.HasPrefix(pref, "grpc://"): + pref = "http://" + strings.TrimPrefix(pref, "grpc://") + default: + pref = "http://" + pref + } + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, pref+"/get", bytes.NewBuffer(buf)) + if err != nil { + return err + } + httpReq.Header.Set("Content-Type", "application/protobuf") + httpReq.ContentLength = int64(len(buf)) + r, err := conn.Do(httpReq) + if err != nil { + return err + } + defer r.Body.Close() + if r.StatusCode != http.StatusOK { + return getHTTPStatusError(r) + } + _, err = io.Copy(w, r.Body) + return err +} + +func getHTTPStatusError(r *http.Response) error { + body, err := io.ReadAll(io.LimitReader(r.Body, 64*1024)) + if err != nil { + return fmt.Errorf("read forwarded peer error response %s: %w", r.Status, err) + } + if len(body) > 0 { + var st protostatus.Status + if err := proto.Unmarshal(body, &st); err == nil { + if stErr := apistatus.ToError(&st); stErr != nil { + return stErr + } + return fmt.Errorf("forwarded peer returned status %s: %s", r.Status, st.GetMessage()) + } + } + return fmt.Errorf("forwarded peer returned status %s", r.Status) +} + // returns: // - nil on completed object transmission // - [object.SplitInfoError]/nil on split info response and unset/set raw flag in request diff --git a/pkg/services/object/put/service_test.go b/pkg/services/object/put/service_test.go index 708a53feda..8b2b432ad4 100644 --- a/pkg/services/object/put/service_test.go +++ b/pkg/services/object/put/service_test.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "math" + "net/http" "slices" "strconv" "sync" @@ -1125,6 +1126,10 @@ func (m *serviceClient) ForAnyGRPCConn(context.Context, func(context.Context, *g panic("unimplemented") } +func (m *serviceClient) ForAnyHTTPClient(context.Context, func(context.Context, *http.Client, string) error) error { + panic("unimplemented") +} + type testPayloadStream Streamer func (x *testPayloadStream) Write(p []byte) (int, error) { diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index f1d14ec77e..77342f14bc 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -10,7 +10,9 @@ import ( "fmt" "hash" "io" + "net/http" "slices" + "strconv" "strings" "sync" "time" @@ -52,6 +54,7 @@ import ( grpccodes "google.golang.org/grpc/codes" "google.golang.org/grpc/mem" grpcstatus "google.golang.org/grpc/status" + "google.golang.org/protobuf/encoding/protowire" "google.golang.org/protobuf/proto" ) @@ -850,6 +853,7 @@ func (s *Server) sendStatusGetResponse(stream protoobject.ObjectService_GetServe type getStream struct { base protoobject.ObjectService_GetServer + w io.Writer srv *Server reqInfo aclsvc.RequestInfo @@ -919,6 +923,206 @@ func (s *getStream) WriteChunk(chunk []byte) error { return nil } +func writeHTTPGetError(w http.ResponseWriter, httpCode int, err error) { + w.Header().Set("Content-Type", "application/protobuf") + if err != nil { + if st := util.ToStatus(err); st != nil { + if b, mErr := proto.Marshal(st); mErr == nil { + w.Header().Set("Content-Length", strconv.Itoa(len(b))) + w.WriteHeader(httpCode) + _, _ = w.Write(b) + return + } + } + } + w.WriteHeader(httpCode) +} + +type httpGetResponseWriter struct { + http.ResponseWriter + wroteHeader bool +} + +func (w *httpGetResponseWriter) WriteHeader(code int) { + if w.wroteHeader { + return + } + w.wroteHeader = true + w.ResponseWriter.WriteHeader(code) +} + +func (w *httpGetResponseWriter) Write(p []byte) (int, error) { + if !w.wroteHeader { + w.wroteHeader = true + } + return w.ResponseWriter.Write(p) +} + +func (w *httpGetResponseWriter) Written() bool { + return w.wroteHeader +} + +func (s *Server) GetHTTP(w http.ResponseWriter, r *http.Request) { + trackedW := &httpGetResponseWriter{ResponseWriter: w} + w = trackedW + + var ( + err error + recheckEACL bool + t = time.Now() + ) + defer func() { s.pushOpExecResult(stat.MethodObjectGet, err, t) }() + + var req = new(protoobject.GetRequest) + + buf, err := io.ReadAll(io.LimitReader(r.Body, 64*1024)) + if err != nil { + writeHTTPGetError(w, 500, err) + return + } + if len(buf) == 0 { + bad := new(apistatus.BadRequest) + bad.SetMessage("empty request body") + err = bad + writeHTTPGetError(w, 400, err) + return + } + if err = proto.Unmarshal(buf, req); err != nil { + bad := new(apistatus.BadRequest) + bad.SetMessage("malformed request: " + err.Error()) + err = bad + writeHTTPGetError(w, 400, err) + return + } + + needSignResp := needSignGetResponse(req) + + if err = icrypto.VerifyRequestSignatures(req); err != nil { + writeHTTPGetError(w, 403, err) + return + } + + if s.fsChain.LocalNodeUnderMaintenance() { + err = new(apistatus.NodeUnderMaintenance) + writeHTTPGetError(w, 503, err) + return + } + + reqInfo, err := s.reqInfoProc.GetRequestToInfo(req) + if err != nil { + if !errors.Is(err, apistatus.Error) { + var bad = new(apistatus.BadRequest) + bad.SetMessage(err.Error()) + err = bad // defer + } + writeHTTPGetError(w, 500, err) + return + } + if !s.aclChecker.CheckBasicACL(reqInfo) { + err = basicACLErr(reqInfo) // needed for defer + writeHTTPGetError(w, 403, err) + return + } + err = s.aclChecker.CheckEACL(req, reqInfo) + if err != nil { + if !errors.Is(err, aclsvc.ErrNotMatched) { + err = eACLErr(reqInfo, err) // needed for defer + writeHTTPGetError(w, 403, err) + return + } + recheckEACL = true + } + + p, err := convertGetPrmHTTP(s.signer, reqInfo.Container, req, buf, &getStream{ + w: w, + srv: s, + reqInfo: reqInfo, + recheckEACL: recheckEACL, + signResponse: needSignResp, + }) + if err != nil { + if !errors.Is(err, apistatus.Error) { + var bad = new(apistatus.BadRequest) + bad.SetMessage(err.Error()) + err = bad // defer + } + writeHTTPGetError(w, 500, err) + return + } + + // TODO: consider optimization + // We could acquire ~256K buffer (like for chunks) if storage would try to read it full. + // Then small objects would fit into a single buffer, and for large ones it'd be possible to + // encode the first chunk response using the heading buffer. + hdrRespBuf, hdrBuf := getBufferForHeadResponse() + defer hdrRespBuf.Free() + + hdrLen := -1 + var stream io.ReadCloser + defer func() { + if stream != nil { + stream.Close() + } + }() + + p.WithBuffer(hdrBuf, func(ln int, s io.ReadCloser) { hdrLen, stream = ln, s }) + + err = s.handlers.Get(r.Context(), p) + if err != nil { + httpCode := 500 + if errors.Is(err, apistatus.ErrBadRequest) { + httpCode = 400 + } else if errors.Is(err, apistatus.ErrObjectNotFound) { + httpCode = 404 + } else if errors.Is(err, apistatus.ErrObjectAccessDenied) { + httpCode = 403 + } else if errors.Is(err, apistatus.ErrNodeUnderMaintenance) { + httpCode = 503 + } + if !trackedW.Written() { + writeHTTPGetError(w, httpCode, err) + } + return + } + + if hdrLen < 0 { + err = errors.New("internal: empty get response") + if !trackedW.Written() { + writeHTTPGetError(w, 500, err) + } + return + } + + idf, sigf, hdrf, err := iobject.GetNonPayloadFieldBounds(hdrBuf[:hdrLen]) + if err != nil { + if !trackedW.Written() { + writeHTTPGetError(w, 500, err) + } + return + } + + if recheckEACL { // previous check didn't match, but we have a header now. + err = s.aclChecker.CheckEACL(hdrBuf[hdrf.ValueFrom:hdrf.To], reqInfo) + if err != nil && !errors.Is(err, aclsvc.ErrNotMatched) { // Not matched -> follow basic ACL. + err = eACLErr(reqInfo, err) // defer + if !trackedW.Written() { + writeHTTPGetError(w, 403, err) + } + return + } + } + + pldFldOff := max(idf.To, sigf.To, hdrf.To) + + err = s.copyHTTPStream(w, hdrBuf, hdrLen, stream, pldFldOff) // defer + if err != nil { + if !trackedW.Written() { + writeHTTPGetError(w, 500, err) + } + return + } +} + func (s *Server) Get(req *protoobject.GetRequest, gStream protoobject.ObjectService_GetServer) error { var ( err error @@ -1220,6 +1424,144 @@ func convertGetPrm(signer ecdsa.PrivateKey, cnr container.Container, req *protoo return p, nil } +type httpWriter struct { + io.Writer +} + +func (w httpWriter) WriteHeader(hdr *object.Object) error { + var b [32]byte + + pref := protowire.AppendTag(b[:0], 3, protowire.BytesType) + hdrBin := hdr.CutPayload().Marshal() + pref = protowire.AppendVarint(pref, uint64(len(hdrBin))) + + _, err := w.Write(pref) + if err != nil { + return err + } + _, err = w.Write(hdrBin) + if err != nil { + return err + } + + pref = protowire.AppendTag(b[:0], 4, protowire.BytesType) + pref = protowire.AppendVarint(pref, hdr.PayloadSize()) + + _, err = w.Write(pref) + if err != nil { + return err + } + return nil +} + +func (w httpWriter) WriteChunk(data []byte) error { + _, err := w.Write(data) + return err +} + +// converts original request into parameters accepted by the internal handler. +// Note that the stream is untouched within this call, errors are not reported +// into it. +func convertGetPrmHTTP(signer ecdsa.PrivateKey, cnr container.Container, req *protoobject.GetRequest, buf []byte, stream *getStream) (getsvc.Prm, error) { + body := req.GetBody() + ma := body.GetAddress() + if ma == nil { // includes nil body + return getsvc.Prm{}, errors.New("missing object address") + } + + var addr oid.Address + if err := addr.FromProtoMessage(ma); err != nil { + return getsvc.Prm{}, fmt.Errorf("invalid object address: %w", err) + } + + cp, err := objutil.CommonPrmFromRequest(req) + if err != nil { + return getsvc.Prm{}, err + } + + var p getsvc.Prm + p.SetCommonParameters(cp) + p.WithAddress(addr) + p.WithContainer(cnr) + p.WithRawFlag(body.Raw) + p.SetObjectWriter(httpWriter{stream.w}) + if cp.LocalOnly() { + return p, nil + } + + var onceResign sync.Once + meta := req.GetMetaHeader() + if meta == nil { + return getsvc.Prm{}, errors.New("missing meta header") + } + + p.SetRequestForwarder(func(ctx context.Context, c client.MultiAddressClient) error { + var resignErr error + onceResign.Do(func() { + req.MetaHeader = &protosession.RequestMetaHeader{ + Ttl: meta.GetTtl() - 1, + Origin: meta, + } + req.VerifyHeader, resignErr = neofscrypto.SignRequestWithBuffer(neofsecdsa.Signer(signer), req, nil) + if resignErr != nil { + return + } + buf, resignErr = proto.MarshalOptions{}.MarshalAppend(buf[:0], req) + }) + if resignErr != nil { + return resignErr + } + return c.ForAnyHTTPClient(ctx, func(ctx context.Context, conn *http.Client, pref string) error { + return continueHTTP(ctx, stream.w, req, buf, conn, pref) // TODO: log error + }) + }) + return p, nil +} + +func (s *Server) copyHTTPStream(w io.Writer, hdrBuf []byte, hdrLen int, stream io.Reader, pldFldOff int) error { + payloadSize := uint64(0) + payloadValueStart := hdrLen + if pldFldOff < hdrLen { + num, typ, tagSz := protowire.ConsumeTag(hdrBuf[pldFldOff:hdrLen]) + if tagSz < 0 || num != 4 || typ != protowire.BytesType { + return fmt.Errorf("bad payload field tag at offset %d", pldFldOff) + } + var lenSz int + payloadSize, lenSz = protowire.ConsumeVarint(hdrBuf[pldFldOff+tagSz : hdrLen]) + if lenSz < 0 { + return errors.New("bad payload length varint") + } + payloadValueStart = pldFldOff + tagSz + lenSz + } + + var pref [16]byte + p := protowire.AppendTag(pref[:0], 3, protowire.BytesType) + p = protowire.AppendVarint(p, uint64(pldFldOff)) + if _, err := w.Write(p); err != nil { + return err + } + if _, err := w.Write(hdrBuf[:pldFldOff]); err != nil { + return err + } + + p = protowire.AppendTag(pref[:0], 4, protowire.BytesType) + p = protowire.AppendVarint(p, payloadSize) + if _, err := w.Write(p); err != nil { + return err + } + if payloadValueStart < hdrLen { + if _, err := w.Write(hdrBuf[payloadValueStart:hdrLen]); err != nil { + return err + } + } + if stream != nil { + if _, err := io.Copy(w, stream); err != nil { + return err + } + } + return nil +} + type getProxyContext struct { req *protoobject.GetRequest reqOID oid.ID diff --git a/pkg/services/object/server_test.go b/pkg/services/object/server_test.go index 9c66ae890d..dfd585a856 100644 --- a/pkg/services/object/server_test.go +++ b/pkg/services/object/server_test.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "io" + "net/http" "path/filepath" "sync/atomic" "testing" @@ -819,6 +820,10 @@ func (emptyRemoteNode) ForAnyGRPCConn(context.Context, func(context.Context, *gr return errors.New("any transport error") } +func (emptyRemoteNode) ForAnyHTTPClient(context.Context, func(context.Context, *http.Client, string) error) error { + return errors.New("any transport error") +} + type mockGRPCConn struct { unimplementedConn conn *grpc.ClientConn @@ -827,3 +832,7 @@ type mockGRPCConn struct { func (x *mockGRPCConn) ForAnyGRPCConn(ctx context.Context, f func(context.Context, *grpc.ClientConn) error) error { return f(ctx, x.conn) } + +func (x *mockGRPCConn) ForAnyHTTPClient(context.Context, func(context.Context, *http.Client, string) error) error { + return errors.New("unimplemented") +}