summaryrefslogtreecommitdiff
path: root/vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/clientconn.go601
1 files changed, 397 insertions, 204 deletions
diff --git a/vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/clientconn.go b/vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/clientconn.go
index 146166a7..e3f6cb19 100644
--- a/vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/github.com/hashicorp/terraform/vendor/google.golang.org/grpc/clientconn.go
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2014, Google Inc.
- * All rights reserved.
+ * Copyright 2014 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -35,7 +20,6 @@ package grpc
import (
"errors"
- "fmt"
"net"
"strings"
"sync"
@@ -43,8 +27,10 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
+ "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -55,8 +41,7 @@ var (
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
- // DEPRECATED: Please use context.DeadlineExceeded instead. This error will be
- // removed in Q1 2017.
+ // DEPRECATED: Please use context.DeadlineExceeded instead.
ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
// errNoTransportSecurity indicates that there is no transport security
@@ -78,7 +63,8 @@ var (
errConnClosing = errors.New("grpc: the connection is closing")
// errConnUnavailable indicates that the connection is unavailable.
errConnUnavailable = errors.New("grpc: the connection is unavailable")
- errNoAddr = errors.New("grpc: there is no address available to dial")
+ // errBalancerClosed indicates that the balancer is closed.
+ errBalancerClosed = errors.New("grpc: balancer is closed")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@@ -86,23 +72,57 @@ var (
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
- unaryInt UnaryClientInterceptor
- streamInt StreamClientInterceptor
- codec Codec
- cp Compressor
- dc Decompressor
- bs backoffStrategy
- balancer Balancer
- block bool
- insecure bool
- timeout time.Duration
- scChan <-chan ServiceConfig
- copts transport.ConnectOptions
+ unaryInt UnaryClientInterceptor
+ streamInt StreamClientInterceptor
+ codec Codec
+ cp Compressor
+ dc Decompressor
+ bs backoffStrategy
+ balancer Balancer
+ block bool
+ insecure bool
+ timeout time.Duration
+ scChan <-chan ServiceConfig
+ copts transport.ConnectOptions
+ callOptions []CallOption
}
+const (
+ defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
+ defaultClientMaxSendMessageSize = 1024 * 1024 * 4
+)
+
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)
+// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func WithInitialWindowSize(s int32) DialOption {
+ return func(o *dialOptions) {
+ o.copts.InitialWindowSize = s
+ }
+}
+
+// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func WithInitialConnWindowSize(s int32) DialOption {
+ return func(o *dialOptions) {
+ o.copts.InitialConnWindowSize = s
+ }
+}
+
+// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
+func WithMaxMsgSize(s int) DialOption {
+ return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
+}
+
+// WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
+func WithDefaultCallOptions(cos ...CallOption) DialOption {
+ return func(o *dialOptions) {
+ o.callOptions = append(o.callOptions, cos...)
+ }
+}
+
// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
func WithCodec(c Codec) DialOption {
return func(o *dialOptions) {
@@ -194,7 +214,7 @@ func WithTransportCredentials(creds credentials.TransportCredentials) DialOption
}
// WithPerRPCCredentials returns a DialOption which sets
-// credentials which will place auth state on each outbound RPC.
+// credentials and places auth state on each outbound RPC.
func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
return func(o *dialOptions) {
o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
@@ -203,6 +223,7 @@ func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
// initially. This is valid if and only if WithBlock() is present.
+// Deprecated: use DialContext and context.WithTimeout instead.
func WithTimeout(d time.Duration) DialOption {
return func(o *dialOptions) {
o.timeout = d
@@ -231,7 +252,7 @@ func WithStatsHandler(h stats.Handler) DialOption {
}
}
-// FailOnNonTempDialError returns a DialOption that specified if gRPC fails on non-temporary dial errors.
+// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
// address and won't try to reconnect.
// The default value of FailOnNonTempDialError is false.
@@ -249,6 +270,13 @@ func WithUserAgent(s string) DialOption {
}
}
+// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
+func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
+ return func(o *dialOptions) {
+ o.copts.KeepaliveParams = kp
+ }
+}
+
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
@@ -263,25 +291,52 @@ func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
}
}
+// WithAuthority returns a DialOption that specifies the value to be used as
+// the :authority pseudo-header. This value only works with WithInsecure and
+// has no effect if TransportCredentials are present.
+func WithAuthority(a string) DialOption {
+ return func(o *dialOptions) {
+ o.copts.Authority = a
+ }
+}
+
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
// DialContext creates a client connection to the given target. ctx can be used to
-// cancel or expire the pending connecting. Once this function returns, the
+// cancel or expire the pending connection. Once this function returns, the
// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
// to terminate all the pending operations after this function returns.
-// This is the EXPERIMENTAL API.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
+ csMgr: &connectivityStateManager{},
conns: make(map[Address]*addrConn),
}
+ cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
+
for _, opt := range opts {
opt(&cc.dopts)
}
+ cc.mkp = cc.dopts.copts.KeepaliveParams
+
+ if cc.dopts.copts.Dialer == nil {
+ cc.dopts.copts.Dialer = newProxyDialer(
+ func(ctx context.Context, addr string) (net.Conn, error) {
+ return dialContext(ctx, "tcp", addr)
+ },
+ )
+ }
+
+ if cc.dopts.copts.UserAgent != "" {
+ cc.dopts.copts.UserAgent += " " + grpcUA
+ } else {
+ cc.dopts.copts.UserAgent = grpcUA
+ }
+
if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
@@ -300,15 +355,16 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}()
+ scSet := false
if cc.dopts.scChan != nil {
- // Wait for the initial service config.
+ // Try to get an initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = sc
+ scSet = true
}
- case <-ctx.Done():
- return nil, ctx.Err()
+ default:
}
}
// Set defaults.
@@ -321,54 +377,47 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
+ } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
+ cc.authority = cc.dopts.copts.Authority
} else {
- colonPos := strings.LastIndex(target, ":")
- if colonPos == -1 {
- colonPos = len(target)
- }
- cc.authority = target[:colonPos]
+ cc.authority = target
}
- var ok bool
waitC := make(chan error, 1)
go func() {
- var addrs []Address
+ defer close(waitC)
if cc.dopts.balancer == nil && cc.sc.LB != nil {
cc.dopts.balancer = cc.sc.LB
}
- if cc.dopts.balancer == nil {
- // Connect to target directly if balancer is nil.
- addrs = append(addrs, Address{Addr: target})
- } else {
+ if cc.dopts.balancer != nil {
var credsClone credentials.TransportCredentials
if creds != nil {
credsClone = creds.Clone()
}
config := BalancerConfig{
DialCreds: credsClone,
+ Dialer: cc.dopts.copts.Dialer,
}
if err := cc.dopts.balancer.Start(target, config); err != nil {
waitC <- err
return
}
ch := cc.dopts.balancer.Notify()
- if ch == nil {
- // There is no name resolver installed.
- addrs = append(addrs, Address{Addr: target})
- } else {
- addrs, ok = <-ch
- if !ok || len(addrs) == 0 {
- waitC <- errNoAddr
- return
+ if ch != nil {
+ if cc.dopts.block {
+ doneChan := make(chan struct{})
+ go cc.lbWatcher(doneChan)
+ <-doneChan
+ } else {
+ go cc.lbWatcher(nil)
}
- }
- }
- for _, a := range addrs {
- if err := cc.resetAddrConn(a, false, nil); err != nil {
- waitC <- err
return
}
}
- close(waitC)
+ // No balancer, or no resolver within the balancer. Connect directly.
+ if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil {
+ waitC <- err
+ return
+ }
}()
select {
case <-ctx.Done():
@@ -378,50 +427,113 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return nil, err
}
}
-
- // If balancer is nil or balancer.Notify() is nil, ok will be false here.
- // The lbWatcher goroutine will not be created.
- if ok {
- go cc.lbWatcher()
+ if cc.dopts.scChan != nil && !scSet {
+ // Blocking wait for the initial service config.
+ select {
+ case sc, ok := <-cc.dopts.scChan:
+ if ok {
+ cc.sc = sc
+ }
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
}
-
if cc.dopts.scChan != nil {
go cc.scWatcher()
}
+
return cc, nil
}
-// ConnectivityState indicates the state of a client connection.
-type ConnectivityState int
+// connectivityStateEvaluator gets updated by addrConns when their
+// states transition, based on which it evaluates the state of
+// ClientConn.
+// Note: This code will eventually sit in the balancer in the new design.
+type connectivityStateEvaluator struct {
+ csMgr *connectivityStateManager
+ mu sync.Mutex
+ numReady uint64 // Number of addrConns in ready state.
+ numConnecting uint64 // Number of addrConns in connecting state.
+ numTransientFailure uint64 // Number of addrConns in transientFailure.
+}
-const (
- // Idle indicates the ClientConn is idle.
- Idle ConnectivityState = iota
- // Connecting indicates the ClienConn is connecting.
- Connecting
- // Ready indicates the ClientConn is ready for work.
- Ready
- // TransientFailure indicates the ClientConn has seen a failure but expects to recover.
- TransientFailure
- // Shutdown indicates the ClientConn has started shutting down.
- Shutdown
-)
+// recordTransition records state change happening in every addrConn and based on
+// that it evaluates what state the ClientConn is in.
+// It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states,
+// Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection
+// before any addrConn is created ClientConn is in idle state. In the end when ClientConn
+// closes it is in connectivity.Shutdown state.
+// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
+func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) {
+ cse.mu.Lock()
+ defer cse.mu.Unlock()
-func (s ConnectivityState) String() string {
- switch s {
- case Idle:
- return "IDLE"
- case Connecting:
- return "CONNECTING"
- case Ready:
- return "READY"
- case TransientFailure:
- return "TRANSIENT_FAILURE"
- case Shutdown:
- return "SHUTDOWN"
- default:
- panic(fmt.Sprintf("unknown connectivity state: %d", s))
+ // Update counters.
+ for idx, state := range []connectivity.State{oldState, newState} {
+ updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
+ switch state {
+ case connectivity.Ready:
+ cse.numReady += updateVal
+ case connectivity.Connecting:
+ cse.numConnecting += updateVal
+ case connectivity.TransientFailure:
+ cse.numTransientFailure += updateVal
+ }
}
+
+ // Evaluate.
+ if cse.numReady > 0 {
+ cse.csMgr.updateState(connectivity.Ready)
+ return
+ }
+ if cse.numConnecting > 0 {
+ cse.csMgr.updateState(connectivity.Connecting)
+ return
+ }
+ cse.csMgr.updateState(connectivity.TransientFailure)
+}
+
+// connectivityStateManager keeps the connectivity.State of ClientConn.
+// This struct will eventually be exported so the balancers can access it.
+type connectivityStateManager struct {
+ mu sync.Mutex
+ state connectivity.State
+ notifyChan chan struct{}
+}
+
+// updateState updates the connectivity.State of ClientConn.
+// If there's a change it notifies goroutines waiting on state change to
+// happen.
+func (csm *connectivityStateManager) updateState(state connectivity.State) {
+ csm.mu.Lock()
+ defer csm.mu.Unlock()
+ if csm.state == connectivity.Shutdown {
+ return
+ }
+ if csm.state == state {
+ return
+ }
+ csm.state = state
+ if csm.notifyChan != nil {
+ // There are other goroutines waiting on this channel.
+ close(csm.notifyChan)
+ csm.notifyChan = nil
+ }
+}
+
+func (csm *connectivityStateManager) getState() connectivity.State {
+ csm.mu.Lock()
+ defer csm.mu.Unlock()
+ return csm.state
+}
+
+func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
+ csm.mu.Lock()
+ defer csm.mu.Unlock()
+ if csm.notifyChan == nil {
+ csm.notifyChan = make(chan struct{})
+ }
+ return csm.notifyChan
}
// ClientConn represents a client connection to an RPC server.
@@ -432,13 +544,51 @@ type ClientConn struct {
target string
authority string
dopts dialOptions
+ csMgr *connectivityStateManager
+ csEvltr *connectivityStateEvaluator // This will eventually be part of balancer.
mu sync.RWMutex
sc ServiceConfig
conns map[Address]*addrConn
+ // Keepalive parameter can be updated if a GoAway is received.
+ mkp keepalive.ClientParameters
}
-func (cc *ClientConn) lbWatcher() {
+// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
+// ctx expires. A true value is returned in former case and false in latter.
+// This is an EXPERIMENTAL API.
+func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
+ ch := cc.csMgr.getNotifyChan()
+ if cc.csMgr.getState() != sourceState {
+ return true
+ }
+ select {
+ case <-ctx.Done():
+ return false
+ case <-ch:
+ return true
+ }
+}
+
+// GetState returns the connectivity.State of ClientConn.
+// This is an EXPERIMENTAL API.
+func (cc *ClientConn) GetState() connectivity.State {
+ return cc.csMgr.getState()
+}
+
+// lbWatcher watches the Notify channel of the balancer in cc and manages
+// connections accordingly. If doneChan is not nil, it is closed after the
+// first successfull connection is made.
+func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
+ defer func() {
+ // In case channel from cc.dopts.balancer.Notify() gets closed before a
+ // successful connection gets established, don't forget to notify the
+ // caller.
+ if doneChan != nil {
+ close(doneChan)
+ }
+ }()
+
for addrs := range cc.dopts.balancer.Notify() {
var (
add []Address // Addresses need to setup connections.
@@ -465,7 +615,19 @@ func (cc *ClientConn) lbWatcher() {
}
cc.mu.Unlock()
for _, a := range add {
- cc.resetAddrConn(a, true, nil)
+ var err error
+ if doneChan != nil {
+ err = cc.resetAddrConn(a, true, nil)
+ if err == nil {
+ close(doneChan)
+ doneChan = nil
+ }
+ } else {
+ err = cc.resetAddrConn(a, false, nil)
+ }
+ if err != nil {
+ grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
+ }
}
for _, c := range del {
c.tearDown(errConnDrain)
@@ -494,14 +656,18 @@ func (cc *ClientConn) scWatcher() {
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
-func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
+//
+// We should never need to replace an addrConn with a new one. This function is only used
+// as newAddrConn to create new addrConn.
+// TODO rename this function and clean up the code.
+func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
ac := &addrConn{
cc: cc,
addr: addr,
dopts: cc.dopts,
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
- ac.stateCV = sync.NewCond(&ac.mu)
+ ac.csEvltr = cc.csEvltr
if EnableTracing {
ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
}
@@ -530,10 +696,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
cc.mu.Unlock()
if stale != nil {
// There is an addrConn alive on ac.addr already. This could be due to
- // 1) a buggy Balancer notifies duplicated Addresses;
- // 2) goaway was received, a new ac will replace the old ac.
- // The old ac should be deleted from cc.conns, but the
- // underlying transport should drain rather than close.
+ // a buggy Balancer that reports duplicated Addresses.
if tearDownErr == nil {
// tearDownErr is nil if resetAddrConn is called by
// 1) Dial
@@ -544,8 +707,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
stale.tearDown(tearDownErr)
}
}
- // skipWait may overwrite the decision in ac.dopts.block.
- if ac.dopts.block && !skipWait {
+ if block {
if err := ac.resetTransport(false); err != nil {
if err != errConnClosing {
// Tear down ac and delete it from cc.conns.
@@ -565,7 +727,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
// Start a goroutine connecting to the server asynchronously.
go func() {
if err := ac.resetTransport(false); err != nil {
- grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
+ grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
@@ -578,12 +740,23 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
return nil
}
-// TODO: Avoid the locking here.
-func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) {
+// GetMethodConfig gets the method config of the input method.
+// If there's an exact match for input method (i.e. /service/method), we return
+// the corresponding MethodConfig.
+// If there isn't an exact match for the input method, we look for the default config
+// under the service (i.e /service/). If there is a default MethodConfig for
+// the serivce, we return it.
+// Otherwise, we return an empty MethodConfig.
+func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
+ // TODO: Avoid the locking here.
cc.mu.RLock()
defer cc.mu.RUnlock()
- m, ok = cc.sc.Methods[method]
- return
+ m, ok := cc.sc.Methods[method]
+ if !ok {
+ i := strings.LastIndex(method, "/")
+ m, _ = cc.sc.Methods[method[:i+1]]
+ }
+ return m
}
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
@@ -624,6 +797,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
}
if !ok {
if put != nil {
+ updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
put()
}
return nil, nil, errConnClosing
@@ -631,6 +805,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
if err != nil {
if put != nil {
+ updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
put()
}
return nil, nil, err
@@ -649,6 +824,7 @@ func (cc *ClientConn) Close() error {
}
conns := cc.conns
cc.conns = nil
+ cc.csMgr.updateState(connectivity.Shutdown)
cc.mu.Unlock()
if cc.dopts.balancer != nil {
cc.dopts.balancer.Close()
@@ -669,10 +845,11 @@ type addrConn struct {
dopts dialOptions
events trace.EventLog
- mu sync.Mutex
- state ConnectivityState
- stateCV *sync.Cond
- down func(error) // the handler called when a connection is down.
+ csEvltr *connectivityStateEvaluator
+
+ mu sync.Mutex
+ state connectivity.State
+ down func(error) // the handler called when a connection is down.
// ready is closed and becomes nil when a new transport is up or failed
// due to timeout.
ready chan struct{}
@@ -682,6 +859,20 @@ type addrConn struct {
tearDownErr error
}
+// adjustParams updates parameters used to create transports upon
+// receiving a GoAway.
+func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
+ switch r {
+ case transport.TooManyPings:
+ v := 2 * ac.dopts.copts.KeepaliveParams.Time
+ ac.cc.mu.Lock()
+ if v > ac.cc.mkp.Time {
+ ac.cc.mkp.Time = v
+ }
+ ac.cc.mu.Unlock()
+ }
+}
+
// printf records an event in ac's event log, unless ac has been closed.
// REQUIRES ac.mu is held.
func (ac *addrConn) printf(format string, a ...interface{}) {
@@ -698,62 +889,41 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
}
}
-// getState returns the connectivity state of the Conn
-func (ac *addrConn) getState() ConnectivityState {
- ac.mu.Lock()
- defer ac.mu.Unlock()
- return ac.state
-}
-
-// waitForStateChange blocks until the state changes to something other than the sourceState.
-func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
+// resetTransport recreates a transport to the address for ac.
+// For the old transport:
+// - if drain is true, it will be gracefully closed.
+// - otherwise, it will be closed.
+func (ac *addrConn) resetTransport(drain bool) error {
ac.mu.Lock()
- defer ac.mu.Unlock()
- if sourceState != ac.state {
- return ac.state, nil
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return errConnClosing
}
- done := make(chan struct{})
- var err error
- go func() {
- select {
- case <-ctx.Done():
- ac.mu.Lock()
- err = ctx.Err()
- ac.stateCV.Broadcast()
- ac.mu.Unlock()
- case <-done:
- }
- }()
- defer close(done)
- for sourceState == ac.state {
- ac.stateCV.Wait()
- if err != nil {
- return ac.state, err
- }
+ ac.printf("connecting")
+ if ac.down != nil {
+ ac.down(downErrorf(false, true, "%v", errNetworkIO))
+ ac.down = nil
}
- return ac.state, nil
-}
-
-func (ac *addrConn) resetTransport(closeTransport bool) error {
+ oldState := ac.state
+ ac.state = connectivity.Connecting
+ ac.csEvltr.recordTransition(oldState, ac.state)
+ t := ac.transport
+ ac.transport = nil
+ ac.mu.Unlock()
+ if t != nil && !drain {
+ t.Close()
+ }
+ ac.cc.mu.RLock()
+ ac.dopts.copts.KeepaliveParams = ac.cc.mkp
+ ac.cc.mu.RUnlock()
for retries := 0; ; retries++ {
ac.mu.Lock()
- ac.printf("connecting")
- if ac.state == Shutdown {
+ if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
}
- if ac.down != nil {
- ac.down(downErrorf(false, true, "%v", errNetworkIO))
- ac.down = nil
- }
- ac.state = Connecting
- ac.stateCV.Broadcast()
- t := ac.transport
ac.mu.Unlock()
- if closeTransport && t != nil {
- t.Close()
- }
sleepTime := ac.dopts.bs.backoff(retries)
timeout := minConnectTimeout
if timeout < sleepTime {
@@ -766,45 +936,51 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
Metadata: ac.addr.Metadata,
}
newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
+ // Don't call cancel in success path due to a race in Go 1.6:
+ // https://github.com/golang/go/issues/15078.
if err != nil {
cancel()
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
return err
}
- grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
+ grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
ac.mu.Lock()
- if ac.state == Shutdown {
+ if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
}
ac.errorf("transient failure: %v", err)
- ac.state = TransientFailure
- ac.stateCV.Broadcast()
+ oldState = ac.state
+ ac.state = connectivity.TransientFailure
+ ac.csEvltr.recordTransition(oldState, ac.state)
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
ac.mu.Unlock()
- closeTransport = false
+ timer := time.NewTimer(sleepTime - time.Since(connectTime))
select {
- case <-time.After(sleepTime - time.Since(connectTime)):
+ case <-timer.C:
case <-ac.ctx.Done():
+ timer.Stop()
return ac.ctx.Err()
}
+ timer.Stop()
continue
}
ac.mu.Lock()
ac.printf("ready")
- if ac.state == Shutdown {
+ if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
newTransport.Close()
return errConnClosing
}
- ac.state = Ready
- ac.stateCV.Broadcast()
+ oldState = ac.state
+ ac.state = connectivity.Ready
+ ac.csEvltr.recordTransition(oldState, ac.state)
ac.transport = newTransport
if ac.ready != nil {
close(ac.ready)
@@ -836,43 +1012,59 @@ func (ac *addrConn) transportMonitor() {
}
return
case <-t.GoAway():
- // If GoAway happens without any network I/O error, ac is closed without shutting down the
- // underlying transport (the transport will be closed when all the pending RPCs finished or
- // failed.).
- // If GoAway and some network I/O error happen concurrently, ac and its underlying transport
- // are closed.
- // In both cases, a new ac is created.
+ ac.adjustParams(t.GetGoAwayReason())
+ // If GoAway happens without any network I/O error, the underlying transport
+ // will be gracefully closed, and a new transport will be created.
+ // (The transport will be closed when all the pending RPCs finished or failed.)
+ // If GoAway and some network I/O error happen concurrently, the underlying transport
+ // will be closed, and a new transport will be created.
+ var drain bool
select {
case <-t.Error():
- ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
default:
- ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
+ drain = true
+ }
+ if err := ac.resetTransport(drain); err != nil {
+ grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
+ if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
+ ac.tearDown(err)
+ }
+ return
}
- return
case <-t.Error():
select {
case <-ac.ctx.Done():
t.Close()
return
case <-t.GoAway():
- ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
- return
+ ac.adjustParams(t.GetGoAwayReason())
+ if err := ac.resetTransport(false); err != nil {
+ grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
+ if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
+ ac.tearDown(err)
+ }
+ return
+ }
default:
}
ac.mu.Lock()
- if ac.state == Shutdown {
+ if ac.state == connectivity.Shutdown {
// ac has been shutdown.
ac.mu.Unlock()
return
}
- ac.state = TransientFailure
- ac.stateCV.Broadcast()
+ oldState := ac.state
+ ac.state = connectivity.TransientFailure
+ ac.csEvltr.recordTransition(oldState, ac.state)
ac.mu.Unlock()
- if err := ac.resetTransport(true); err != nil {
+ if err := ac.resetTransport(false); err != nil {
+ grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
ac.mu.Lock()
ac.printf("transport exiting: %v", err)
ac.mu.Unlock()
- grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
+ grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
@@ -884,12 +1076,12 @@ func (ac *addrConn) transportMonitor() {
}
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
-// iv) transport is in TransientFailure and there is a balancer/failfast is true.
+// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
for {
ac.mu.Lock()
switch {
- case ac.state == Shutdown:
+ case ac.state == connectivity.Shutdown:
if failfast || !hasBalancer {
// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
err := ac.tearDownErr
@@ -898,11 +1090,11 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans
}
ac.mu.Unlock()
return nil, errConnClosing
- case ac.state == Ready:
+ case ac.state == connectivity.Ready:
ct := ac.transport
ac.mu.Unlock()
return ct, nil
- case ac.state == TransientFailure:
+ case ac.state == connectivity.TransientFailure:
if failfast || hasBalancer {
ac.mu.Unlock()
return nil, errConnUnavailable
@@ -944,12 +1136,13 @@ func (ac *addrConn) tearDown(err error) {
// address removal and GoAway.
ac.transport.GracefulClose()
}
- if ac.state == Shutdown {
+ if ac.state == connectivity.Shutdown {
return
}
- ac.state = Shutdown
+ oldState := ac.state
+ ac.state = connectivity.Shutdown
ac.tearDownErr = err
- ac.stateCV.Broadcast()
+ ac.csEvltr.recordTransition(oldState, ac.state)
if ac.events != nil {
ac.events.Finish()
ac.events = nil