Don't return an error from dial if the balancer returns no initial servers #1112

Merged · 2 commits · Mar 21, 2017
68 changes: 33 additions & 35 deletions clientconn.go
@@ -78,7 +78,6 @@ var (
errConnClosing = errors.New("grpc: the connection is closing")
// errConnUnavailable indicates that the connection is unavailable.
errConnUnavailable = errors.New("grpc: the connection is unavailable")
errNoAddr = errors.New("grpc: there is no address available to dial")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@@ -339,17 +338,13 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
cc.authority = target[:colonPos]
}
var ok bool
waitC := make(chan error, 1)
go func() {
var addrs []Address
defer close(waitC)
if cc.dopts.balancer == nil && cc.sc.LB != nil {
cc.dopts.balancer = cc.sc.LB
}
if cc.dopts.balancer == nil {
// Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
} else {
if cc.dopts.balancer != nil {
var credsClone credentials.TransportCredentials
if creds != nil {
credsClone = creds.Clone()
@@ -362,24 +357,22 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return
}
ch := cc.dopts.balancer.Notify()
if ch == nil {
// There is no name resolver installed.
addrs = append(addrs, Address{Addr: target})
} else {
addrs, ok = <-ch
if !ok || len(addrs) == 0 {
waitC <- errNoAddr
return
if ch != nil {
if cc.dopts.block {
doneChan := make(chan struct{})
go cc.lbWatcher(doneChan)
Contributor commented:
In blocking mode, we need a mechanism for lbWatcher to send an error back (if there is one) to DialContext so that users can be informed about it.
Also, we need to discuss which error lbWatcher should return, since there could be multiple errors while connecting to multiple servers.
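One possible mechanism (a hypothetical sketch, not part of this PR) is to replace doneChan with a buffered error channel so the watcher can report the outcome of the first address batch back to DialContext. The lbWatcherWithErr name, the errChan parameter, and the keep-the-first-error policy are all assumptions for illustration; for brevity the sketch also skips the add/del bookkeeping the real lbWatcher performs. Which of several per-server errors to surface is exactly the open question raised above.

// Hypothetical sketch, not part of this PR: report the outcome of the
// first address batch back to DialContext over errChan. The channel
// must be buffered so the send cannot block the watcher.
func (cc *ClientConn) lbWatcherWithErr(errChan chan error) {
	for addrs := range cc.dopts.balancer.Notify() {
		var firstErr error
		connected := false
		for _, a := range addrs {
			if err := cc.resetAddrConn(a, true, nil); err != nil {
				if firstErr == nil {
					firstErr = err // keep the first error; which one to surface is the open question
				}
				continue
			}
			connected = true
		}
		if errChan != nil {
			if connected {
				errChan <- nil // success: unblock DialContext with no error
			} else {
				errChan <- firstErr
			}
			errChan = nil // report at most once
		}
	}
}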

<-doneChan
} else {
go cc.lbWatcher(nil)
}
}
}
for _, a := range addrs {
if err := cc.resetAddrConn(a, false, nil); err != nil {
waitC <- err
return
}
}
close(waitC)
// No balancer, or no resolver within the balancer. Connect directly.
if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil {
waitC <- err
return
}
}()
select {
case <-ctx.Done():
@@ -390,15 +383,10 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}

// If balancer is nil or balancer.Notify() is nil, ok will be false here.
// The lbWatcher goroutine will not be created.
if ok {
go cc.lbWatcher()
}

if cc.dopts.scChan != nil {
go cc.scWatcher()
}

return cc, nil
}

@@ -449,7 +437,10 @@ type ClientConn struct {
conns map[Address]*addrConn
}

func (cc *ClientConn) lbWatcher() {
// lbWatcher watches the Notify channel of the balancer in cc and manages
// connections accordingly. If doneChan is not nil, it is closed after the
// first successful connection is made.
func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
for addrs := range cc.dopts.balancer.Notify() {
var (
add []Address // Addresses need to setup connections.
@@ -476,7 +467,15 @@ func (cc *ClientConn) lbWatcher() {
}
cc.mu.Unlock()
for _, a := range add {
cc.resetAddrConn(a, true, nil)
if doneChan != nil {
err := cc.resetAddrConn(a, true, nil)
if err == nil {
close(doneChan)
doneChan = nil
}
} else {
cc.resetAddrConn(a, false, nil)
}
}
for _, c := range del {
c.tearDown(errConnDrain)
@@ -505,7 +504,7 @@ func (cc *ClientConn) scWatcher() {
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
ac := &addrConn{
cc: cc,
addr: addr,
@@ -555,8 +554,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
stale.tearDown(tearDownErr)
}
}
// skipWait may overwrite the decision in ac.dopts.block.
if ac.dopts.block && !skipWait {
if block {
if err := ac.resetTransport(false); err != nil {
if err != errConnClosing {
// Tear down ac and delete it from cc.conns.
@@ -857,9 +855,9 @@ func (ac *addrConn) transportMonitor() {
// In both cases, a new ac is created.
select {
case <-t.Error():
ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
default:
ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
ac.cc.resetAddrConn(ac.addr, false, errConnDrain)
}
return
case <-t.Error():
@@ -868,7 +866,7 @@
t.Close()
return
case <-t.GoAway():
ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
return
default:
}
41 changes: 41 additions & 0 deletions clientconn_test.go
@@ -253,3 +253,44 @@ func TestDialWithBlockErrorOnNonTemporaryErrorDialer(t *testing.T) {
t.Fatalf("Dial(%q) = %v, want %v", "", err, context.DeadlineExceeded)
}
}

// emptyBalancer is a Balancer that notifies an empty set of servers.
type emptyBalancer struct {
ch chan []Address
}

func newEmptyBalancer() Balancer {
return &emptyBalancer{ch: make(chan []Address, 1)}
}
func (b *emptyBalancer) Start(_ string, _ BalancerConfig) error {
b.ch <- nil
return nil
}
func (b *emptyBalancer) Up(_ Address) func(error) {
return nil
}
func (b *emptyBalancer) Get(_ context.Context, _ BalancerGetOptions) (Address, func(), error) {
return Address{}, nil, nil
}
func (b *emptyBalancer) Notify() <-chan []Address {
return b.ch
}
func (b *emptyBalancer) Close() error {
close(b.ch)
return nil
}

func TestNonblockingDialWithEmptyBalancer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
dialDone := make(chan struct{})
go func() {
conn, err := DialContext(ctx, "Non-Existent.Server:80", WithInsecure(), WithBalancer(newEmptyBalancer()))
if err != nil {
t.Fatalf("unexpected error dialing connection: %v", err)
}
conn.Close()
close(dialDone)
}()
<-dialDone
cancel()
}
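
For context, here is a minimal caller-side sketch of the behavior this test pins down: with this change, a non-blocking dial through a balancer that has produced no servers returns a usable *ClientConn instead of errNoAddr, and RPCs simply fail until the balancer's Notify channel delivers addresses. The sketch reimplements the empty balancer against the exported grpc.Balancer API so it is self-contained; the target address is arbitrary and the whole program is an illustration, not part of this PR.

package main

import (
	"log"

	"golang.org/x/net/context"

	"google.golang.org/grpc"
)

// emptyBalancer mirrors the test helper above, written against the
// exported grpc.Balancer API so this example compiles outside the
// grpc package.
type emptyBalancer struct {
	ch chan []grpc.Address
}

func newEmptyBalancer() grpc.Balancer {
	return &emptyBalancer{ch: make(chan []grpc.Address, 1)}
}
func (b *emptyBalancer) Start(_ string, _ grpc.BalancerConfig) error {
	b.ch <- nil // immediately notify an empty server list
	return nil
}
func (b *emptyBalancer) Up(_ grpc.Address) func(error) { return nil }
func (b *emptyBalancer) Get(_ context.Context, _ grpc.BalancerGetOptions) (grpc.Address, func(), error) {
	return grpc.Address{}, nil, nil
}
func (b *emptyBalancer) Notify() <-chan []grpc.Address { return b.ch }
func (b *emptyBalancer) Close() error {
	close(b.ch)
	return nil
}

func main() {
	// Non-blocking dial. Before this PR the empty initial server list
	// surfaced as errNoAddr; after it, Dial succeeds and RPCs fail
	// until the balancer supplies real addresses.
	conn, err := grpc.Dial("Non-Existent.Server:80", grpc.WithInsecure(), grpc.WithBalancer(newEmptyBalancer()))
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
}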
56 changes: 27 additions & 29 deletions grpclb/grpclb_test.go
@@ -332,25 +332,19 @@ func TestDropRequest(t *testing.T) {
if err != nil {
t.Fatalf("Failed to generate the port number %v", err)
}
var bes []*lbpb.Server
be := &lbpb.Server{
IpAddress: []byte(beAddr1[0]),
Port: int32(bePort1),
LoadBalanceToken: lbToken,
DropRequest: true,
}
bes = append(bes, be)
be = &lbpb.Server{
IpAddress: []byte(beAddr2[0]),
Port: int32(bePort2),
LoadBalanceToken: lbToken,
DropRequest: false,
}
bes = append(bes, be)
sl := &lbpb.ServerList{
Servers: bes,
}
sls := []*lbpb.ServerList{sl}
sls := []*lbpb.ServerList{{
Servers: []*lbpb.Server{{
IpAddress: []byte(beAddr1[0]),
Port: int32(bePort1),
LoadBalanceToken: lbToken,
DropRequest: true,
}, {
IpAddress: []byte(beAddr2[0]),
Port: int32(bePort2),
LoadBalanceToken: lbToken,
DropRequest: false,
}},
}}
intervals := []time.Duration{0}
ls := newRemoteBalancer(sls, intervals)
lbpb.RegisterLoadBalancerServer(lb, ls)
@@ -371,20 +365,24 @@
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
// The 1st fail-fast RPC should fail because the 1st backend has DropRequest set to true.
helloC := hwpb.NewGreeterClient(cc)
if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); grpc.Code(err) != codes.Unavailable {
t.Fatalf("%v.SayHello(_, _) = _, %v, want _, %s", helloC, err, codes.Unavailable)
}
// The 2nd fail-fast RPC should succeed since it chooses the non-drop-request backend according
// to the round robin policy.
if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); err != nil {
t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", helloC, err)
}
// The 3rd non-fail-fast RPC should succeed.
// The 1st, non-fail-fast RPC should succeed. This ensures both server
// connections are made, because the first one has DropRequest set to true.
if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}, grpc.FailFast(false)); err != nil {
t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", helloC, err)
}
for i := 0; i < 3; i++ {
// Odd fail-fast RPCs should fail, because the 1st backend has DropRequest
// set to true.
if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); grpc.Code(err) != codes.Unavailable {
t.Fatalf("%v.SayHello(_, _) = _, %v, want _, %s", helloC, err, codes.Unavailable)
}
// Even fail-fast RPCs should succeed since they choose the
// non-drop-request backend according to the round robin policy.
if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); err != nil {
t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", helloC, err)
}
}
cc.Close()
}
