summaryrefslogtreecommitdiff
path: root/vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/server.go')
-rw-r--r--vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/server.go543
1 files changed, 324 insertions, 219 deletions
diff --git a/vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/server.go b/vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/server.go
index 985226d6..42733e22 100644
--- a/vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/server.go
+++ b/vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/server.go
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2014, Google Inc.
- * All rights reserved.
+ * Copyright 2014 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -53,12 +38,19 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
+ "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
+ "google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
"google.golang.org/grpc/transport"
)
+const (
+ defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
+ defaultServerMaxSendMessageSize = 1024 * 1024 * 4
+)
+
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
// MethodDesc represents an RPC service's method specification.
@@ -94,6 +86,7 @@ type Server struct {
mu sync.Mutex // guards following
lis map[net.Listener]bool
conns map[io.Closer]bool
+ serve bool
drain bool
ctx context.Context
cancel context.CancelFunc
@@ -105,24 +98,63 @@ type Server struct {
}
type options struct {
- creds credentials.TransportCredentials
- codec Codec
- cp Compressor
- dc Decompressor
- maxMsgSize int
- unaryInt UnaryServerInterceptor
- streamInt StreamServerInterceptor
- inTapHandle tap.ServerInHandle
- statsHandler stats.Handler
- maxConcurrentStreams uint32
- useHandlerImpl bool // use http.Handler-based server
+ creds credentials.TransportCredentials
+ codec Codec
+ cp Compressor
+ dc Decompressor
+ unaryInt UnaryServerInterceptor
+ streamInt StreamServerInterceptor
+ inTapHandle tap.ServerInHandle
+ statsHandler stats.Handler
+ maxConcurrentStreams uint32
+ maxReceiveMessageSize int
+ maxSendMessageSize int
+ useHandlerImpl bool // use http.Handler-based server
+ unknownStreamDesc *StreamDesc
+ keepaliveParams keepalive.ServerParameters
+ keepalivePolicy keepalive.EnforcementPolicy
+ initialWindowSize int32
+ initialConnWindowSize int32
}
-var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
+var defaultServerOptions = options{
+ maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
+ maxSendMessageSize: defaultServerMaxSendMessageSize,
+}
-// A ServerOption sets options.
+// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)
+// InitialWindowSize returns a ServerOption that sets window size for stream.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func InitialWindowSize(s int32) ServerOption {
+ return func(o *options) {
+ o.initialWindowSize = s
+ }
+}
+
+// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func InitialConnWindowSize(s int32) ServerOption {
+ return func(o *options) {
+ o.initialConnWindowSize = s
+ }
+}
+
+// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
+func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
+ return func(o *options) {
+ o.keepaliveParams = kp
+ }
+}
+
+// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
+func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
+ return func(o *options) {
+ o.keepalivePolicy = kep
+ }
+}
+
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
func CustomCodec(codec Codec) ServerOption {
return func(o *options) {
@@ -144,11 +176,25 @@ func RPCDecompressor(dc Decompressor) ServerOption {
}
}
-// MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages.
-// If this is not set, gRPC uses the default 4MB.
+// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
+// If this is not set, gRPC uses the default limit. Deprecated: use MaxRecvMsgSize instead.
func MaxMsgSize(m int) ServerOption {
+ return MaxRecvMsgSize(m)
+}
+
+// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
+// If this is not set, gRPC uses the default 4MB.
+func MaxRecvMsgSize(m int) ServerOption {
return func(o *options) {
- o.maxMsgSize = m
+ o.maxReceiveMessageSize = m
+ }
+}
+
+// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
+// If this is not set, gRPC uses the default 4MB.
+func MaxSendMsgSize(m int) ServerOption {
+ return func(o *options) {
+ o.maxSendMessageSize = m
}
}
@@ -173,7 +219,7 @@ func Creds(c credentials.TransportCredentials) ServerOption {
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
return func(o *options) {
if o.unaryInt != nil {
- panic("The unary server interceptor has been set.")
+ panic("The unary server interceptor was already set and may not be reset.")
}
o.unaryInt = i
}
@@ -184,7 +230,7 @@ func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
return func(o *options) {
if o.streamInt != nil {
- panic("The stream server interceptor has been set.")
+ panic("The stream server interceptor was already set and may not be reset.")
}
o.streamInt = i
}
@@ -195,7 +241,7 @@ func StreamInterceptor(i StreamServerInterceptor) ServerOption {
func InTapHandle(h tap.ServerInHandle) ServerOption {
return func(o *options) {
if o.inTapHandle != nil {
- panic("The tap handle has been set.")
+ panic("The tap handle was already set and may not be reset.")
}
o.inTapHandle = h
}
@@ -208,11 +254,28 @@ func StatsHandler(h stats.Handler) ServerOption {
}
}
+// UnknownServiceHandler returns a ServerOption that allows for adding a custom
+// unknown service handler. The provided method is a bidi-streaming RPC service
+// handler that will be invoked instead of returning the "unimplemented" gRPC
+// error whenever a request is received for an unregistered service or method.
+// The handling function has full access to the Context of the request and the
+// stream, and the invocation passes through interceptors.
+func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
+ return func(o *options) {
+ o.unknownStreamDesc = &StreamDesc{
+ StreamName: "unknown_service_handler",
+ Handler: streamHandler,
+ // We need to assume that the users of the streamHandler will want to use both.
+ ClientStreams: true,
+ ServerStreams: true,
+ }
+ }
+}
+
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
- var opts options
- opts.maxMsgSize = defaultMaxMsgSize
+ opts := defaultServerOptions
for _, o := range opt {
o(&opts)
}
@@ -251,8 +314,8 @@ func (s *Server) errorf(format string, a ...interface{}) {
}
}
-// RegisterService register a service and its implementation to the gRPC
-// server. Called from the IDL generated code. This must be called before
+// RegisterService registers a service and its implementation to the gRPC
+// server. It is called from the IDL generated code. This must be called before
// invoking Serve.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
ht := reflect.TypeOf(sd.HandlerType).Elem()
@@ -267,6 +330,9 @@ func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
+ if s.serve {
+ grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
+ }
if _, ok := s.m[sd.ServiceName]; ok {
grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
@@ -297,7 +363,7 @@ type MethodInfo struct {
IsServerStream bool
}
-// ServiceInfo contains unary RPC method info, streaming RPC methid info and metadata for a service.
+// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
Methods []MethodInfo
// Metadata is the metadata specified in ServiceDesc when registering service.
@@ -355,6 +421,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
+ s.serve = true
if s.lis == nil {
s.mu.Unlock()
lis.Close()
@@ -390,10 +457,12 @@ func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("Accept error: %v; retrying in %v", err, tempDelay)
s.mu.Unlock()
+ timer := time.NewTimer(tempDelay)
select {
- case <-time.After(tempDelay):
+ case <-timer.C:
case <-s.ctx.Done():
}
+ timer.Stop()
continue
}
s.mu.Lock()
@@ -416,7 +485,7 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
- grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
+ grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
// If serverHandShake returns ErrConnDispatched, keep rawConn open.
if err != credentials.ErrConnDispatched {
rawConn.Close()
@@ -446,10 +515,14 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
// transport.NewServerTransport).
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
config := &transport.ServerConfig{
- MaxStreams: s.opts.maxConcurrentStreams,
- AuthInfo: authInfo,
- InTapHandle: s.opts.inTapHandle,
- StatsHandler: s.opts.statsHandler,
+ MaxStreams: s.opts.maxConcurrentStreams,
+ AuthInfo: authInfo,
+ InTapHandle: s.opts.inTapHandle,
+ StatsHandler: s.opts.statsHandler,
+ KeepaliveParams: s.opts.keepaliveParams,
+ KeepalivePolicy: s.opts.keepalivePolicy,
+ InitialWindowSize: s.opts.initialWindowSize,
+ InitialConnWindowSize: s.opts.initialConnWindowSize,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
@@ -457,7 +530,7 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
s.mu.Unlock()
c.Close()
- grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
+ grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
return
}
if !s.addConn(st) {
@@ -515,6 +588,30 @@ func (s *Server) serveUsingHandler(conn net.Conn) {
})
}
+// ServeHTTP implements the Go standard library's http.Handler
+// interface by responding to the gRPC request r, by looking up
+// the requested gRPC method in the gRPC server s.
+//
+// The provided HTTP request must have arrived on an HTTP/2
+// connection. When using the Go standard library's server,
+// practically this means that the Request must also have arrived
+// over TLS.
+//
+// To share one port (such as 443 for https) between gRPC and an
+// existing http.Handler, use a root http.Handler such as:
+//
+// if r.ProtoMajor == 2 && strings.HasPrefix(
+// r.Header.Get("Content-Type"), "application/grpc") {
+// grpcServer.ServeHTTP(w, r)
+// } else {
+// yourMux.ServeHTTP(w, r)
+// }
+//
+// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
+// separate from grpc-go's HTTP/2 server. Performance and features may vary
+// between the two paths. ServeHTTP does not support some gRPC features
+// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
+// and subject to change.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
st, err := transport.NewServerHandlerTransport(w, r)
if err != nil {
@@ -581,14 +678,11 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
}
p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
if err != nil {
- // This typically indicates a fatal issue (e.g., memory
- // corruption or hardware faults) the application program
- // cannot handle.
- //
- // TODO(zhaoq): There exist other options also such as only closing the
- // faulty stream locally and remotely (Other streams can keep going). Find
- // the optimal option.
- grpclog.Fatalf("grpc: Server failed to encode response %v", err)
+ grpclog.Errorln("grpc: server failed to encode response: ", err)
+ return err
+ }
+ if len(p) > s.opts.maxSendMessageSize {
+ return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(p), s.opts.maxSendMessageSize)
}
err = t.Write(stream, p, opts)
if err == nil && outPayload != nil {
@@ -605,9 +699,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
BeginTime: time.Now(),
}
sh.HandleRPC(stream.Context(), begin)
- }
- defer func() {
- if sh != nil {
+ defer func() {
end := &stats.End{
EndTime: time.Now(),
}
@@ -615,8 +707,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
end.Error = toRPCErr(err)
}
sh.HandleRPC(stream.Context(), end)
- }
- }()
+ }()
+ }
if trInfo != nil {
defer trInfo.tr.Finish()
trInfo.firstLine.client = false
@@ -633,136 +725,137 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
stream.SetSendCompress(s.opts.cp.Type())
}
p := &parser{r: stream}
- for {
- pf, req, err := p.recvMsg(s.opts.maxMsgSize)
- if err == io.EOF {
- // The entire stream is done (for unary RPC only).
- return err
- }
- if err == io.ErrUnexpectedEOF {
- err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
- }
- if err != nil {
- switch err := err.(type) {
- case *rpcError:
- if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
+ pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
+ if err == io.EOF {
+ // The entire stream is done (for unary RPC only).
+ return err
+ }
+ if err == io.ErrUnexpectedEOF {
+ err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
+ }
+ if err != nil {
+ if st, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, st); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+ } else {
+ switch st := err.(type) {
case transport.ConnectionError:
// Nothing to do here.
case transport.StreamError:
- if e := t.WriteStatus(stream, err.Code, err.Desc); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
default:
- panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
+ panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
}
- return err
}
+ return err
+ }
- if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
- switch err := err.(type) {
- case *rpcError:
- if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- return err
- default:
- if e := t.WriteStatus(stream, codes.Internal, err.Error()); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
+ if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
+ if st, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, st); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
+ return err
}
- var inPayload *stats.InPayload
- if sh != nil {
- inPayload = &stats.InPayload{
- RecvTime: time.Now(),
- }
+ if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
- statusCode := codes.OK
- statusDesc := ""
- df := func(v interface{}) error {
- if inPayload != nil {
- inPayload.WireLength = len(req)
- }
- if pf == compressionMade {
- var err error
- req, err = s.opts.dc.Do(bytes.NewReader(req))
- if err != nil {
- if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
- }
- return Errorf(codes.Internal, err.Error())
- }
- }
- if len(req) > s.opts.maxMsgSize {
- // TODO: Revisit the error code. Currently keep it consistent with
- // java implementation.
- statusCode = codes.Internal
- statusDesc = fmt.Sprintf("grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
- }
- if err := s.opts.codec.Unmarshal(req, v); err != nil {
- return err
- }
- if inPayload != nil {
- inPayload.Payload = v
- inPayload.Data = req
- inPayload.Length = len(req)
- sh.HandleRPC(stream.Context(), inPayload)
- }
- if trInfo != nil {
- trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
- }
- return nil
+
+ // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
+ }
+ var inPayload *stats.InPayload
+ if sh != nil {
+ inPayload = &stats.InPayload{
+ RecvTime: time.Now(),
}
- reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
- if appErr != nil {
- if err, ok := appErr.(*rpcError); ok {
- statusCode = err.code
- statusDesc = err.desc
- } else {
- statusCode = convertCode(appErr)
- statusDesc = appErr.Error()
- }
- if trInfo != nil && statusCode != codes.OK {
- trInfo.tr.LazyLog(stringer(statusDesc), true)
- trInfo.tr.SetError()
- }
- if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
+ }
+ df := func(v interface{}) error {
+ if inPayload != nil {
+ inPayload.WireLength = len(req)
+ }
+ if pf == compressionMade {
+ var err error
+ req, err = s.opts.dc.Do(bytes.NewReader(req))
+ if err != nil {
+ return Errorf(codes.Internal, err.Error())
}
- return Errorf(statusCode, statusDesc)
+ }
+ if len(req) > s.opts.maxReceiveMessageSize {
+ // TODO: Revisit the error code. Currently keep it consistent with
+ // java implementation.
+ return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
+ }
+ if err := s.opts.codec.Unmarshal(req, v); err != nil {
+ return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
+ }
+ if inPayload != nil {
+ inPayload.Payload = v
+ inPayload.Data = req
+ inPayload.Length = len(req)
+ sh.HandleRPC(stream.Context(), inPayload)
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
+ }
+ return nil
+ }
+ reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
+ if appErr != nil {
+ appStatus, ok := status.FromError(appErr)
+ if !ok {
+ // Convert appErr if it is not a grpc status error.
+ appErr = status.Error(convertCode(appErr), appErr.Error())
+ appStatus, _ = status.FromError(appErr)
}
if trInfo != nil {
- trInfo.tr.LazyLog(stringer("OK"), false)
+ trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
+ trInfo.tr.SetError()
}
- opts := &transport.Options{
- Last: true,
- Delay: false,
+ if e := t.WriteStatus(stream, appStatus); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
+ }
+ return appErr
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(stringer("OK"), false)
+ }
+ opts := &transport.Options{
+ Last: true,
+ Delay: false,
+ }
+ if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
+ if err == io.EOF {
+ // The entire stream is done (for unary RPC only).
+ return err
}
- if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
- switch err := err.(type) {
+ if s, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, s); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
+ }
+ } else {
+ switch st := err.(type) {
case transport.ConnectionError:
// Nothing to do here.
case transport.StreamError:
- statusCode = err.Code
- statusDesc = err.Desc
+ if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
default:
- statusCode = codes.Unknown
- statusDesc = err.Error()
+ panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
}
- return err
}
- if trInfo != nil {
- trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
- }
- errWrite := t.WriteStatus(stream, statusCode, statusDesc)
- if statusCode != codes.OK {
- return Errorf(statusCode, statusDesc)
- }
- return errWrite
+ return err
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
}
+ // TODO: Should we be logging if writing status failed here, like above?
+ // Should the logging be in WriteStatus? Should we ignore the WriteStatus
+ // error or allow the stats handler to see it?
+ return t.WriteStatus(stream, status.New(codes.OK, ""))
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
@@ -772,9 +865,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
BeginTime: time.Now(),
}
sh.HandleRPC(stream.Context(), begin)
- }
- defer func() {
- if sh != nil {
+ defer func() {
end := &stats.End{
EndTime: time.Now(),
}
@@ -782,21 +873,22 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
end.Error = toRPCErr(err)
}
sh.HandleRPC(stream.Context(), end)
- }
- }()
+ }()
+ }
if s.opts.cp != nil {
stream.SetSendCompress(s.opts.cp.Type())
}
ss := &serverStream{
- t: t,
- s: stream,
- p: &parser{r: stream},
- codec: s.opts.codec,
- cp: s.opts.cp,
- dc: s.opts.dc,
- maxMsgSize: s.opts.maxMsgSize,
- trInfo: trInfo,
- statsHandler: sh,
+ t: t,
+ s: stream,
+ p: &parser{r: stream},
+ codec: s.opts.codec,
+ cp: s.opts.cp,
+ dc: s.opts.dc,
+ maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
+ maxSendMessageSize: s.opts.maxSendMessageSize,
+ trInfo: trInfo,
+ statsHandler: sh,
}
if ss.cp != nil {
ss.cbuf = new(bytes.Buffer)
@@ -815,43 +907,47 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}()
}
var appErr error
+ var server interface{}
+ if srv != nil {
+ server = srv.server
+ }
if s.opts.streamInt == nil {
- appErr = sd.Handler(srv.server, ss)
+ appErr = sd.Handler(server, ss)
} else {
info := &StreamServerInfo{
FullMethod: stream.Method(),
IsClientStream: sd.ClientStreams,
IsServerStream: sd.ServerStreams,
}
- appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
+ appErr = s.opts.streamInt(server, ss, info, sd.Handler)
}
if appErr != nil {
- if err, ok := appErr.(*rpcError); ok {
- ss.statusCode = err.code
- ss.statusDesc = err.desc
- } else if err, ok := appErr.(transport.StreamError); ok {
- ss.statusCode = err.Code
- ss.statusDesc = err.Desc
- } else {
- ss.statusCode = convertCode(appErr)
- ss.statusDesc = appErr.Error()
+ appStatus, ok := status.FromError(appErr)
+ if !ok {
+ switch err := appErr.(type) {
+ case transport.StreamError:
+ appStatus = status.New(err.Code, err.Desc)
+ default:
+ appStatus = status.New(convertCode(appErr), appErr.Error())
+ }
+ appErr = appStatus.Err()
+ }
+ if trInfo != nil {
+ ss.mu.Lock()
+ ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
+ ss.trInfo.tr.SetError()
+ ss.mu.Unlock()
}
+ t.WriteStatus(ss.s, appStatus)
+ // TODO: Should we log an error from WriteStatus here and below?
+ return appErr
}
if trInfo != nil {
ss.mu.Lock()
- if ss.statusCode != codes.OK {
- ss.trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
- ss.trInfo.tr.SetError()
- } else {
- ss.trInfo.tr.LazyLog(stringer("OK"), false)
- }
+ ss.trInfo.tr.LazyLog(stringer("OK"), false)
ss.mu.Unlock()
}
- errWrite := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
- if ss.statusCode != codes.OK {
- return Errorf(ss.statusCode, ss.statusDesc)
- }
- return errWrite
+ return t.WriteStatus(ss.s, status.New(codes.OK, ""))
}
@@ -867,12 +963,12 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
- if err := t.WriteStatus(stream, codes.InvalidArgument, errDesc); err != nil {
+ if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
- grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
+ grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
@@ -883,17 +979,21 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
method := sm[pos+1:]
srv, ok := s.m[service]
if !ok {
+ if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
+ s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
+ return
+ }
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("unknown service %v", service)
- if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
+ if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
- grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
+ grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
@@ -913,13 +1013,17 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
trInfo.tr.SetError()
}
+ if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
+ s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
+ return
+ }
errDesc := fmt.Sprintf("unknown method %v", method)
- if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
+ if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
- grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
+ grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
@@ -957,8 +1061,9 @@ func (s *Server) Stop() {
s.mu.Unlock()
}
-// GracefulStop stops the gRPC server gracefully. It stops the server to accept new
-// connections and RPCs and blocks until all the pending RPCs are finished.
+// GracefulStop stops the gRPC server gracefully. It stops the server from
+// accepting new connections and RPCs and blocks until all the pending RPCs are
+// finished.
func (s *Server) GracefulStop() {
s.mu.Lock()
defer s.mu.Unlock()