summaryrefslogtreecommitdiff
path: root/vendor/github.com/hashicorp/go-plugin/rpc_client.go
blob: 29f9bf063e7bb7c4ff17ad797a206a634bc0f4d3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package plugin

import (
	"fmt"
	"io"
	"net"
	"net/rpc"

	"github.com/hashicorp/yamux"
)

// RPCClient connects to an RPCServer over net/rpc to dispense plugin types.
type RPCClient struct {
	broker  *MuxBroker
	control *rpc.Client
	plugins map[string]Plugin

	// These are the streams used for the various stdout/err overrides
	stdout, stderr net.Conn
}

// NewRPCClient creates a client from an already-open connection-like value.
// Dial is typically used instead.
func NewRPCClient(conn io.ReadWriteCloser, plugins map[string]Plugin) (*RPCClient, error) {
	// Create the yamux client so we can multiplex
	mux, err := yamux.Client(conn, nil)
	if err != nil {
		conn.Close()
		return nil, err
	}

	// Connect to the control stream.
	control, err := mux.Open()
	if err != nil {
		mux.Close()
		return nil, err
	}

	// Connect stdout, stderr streams
	stdstream := make([]net.Conn, 2)
	for i, _ := range stdstream {
		stdstream[i], err = mux.Open()
		if err != nil {
			mux.Close()
			return nil, err
		}
	}

	// Create the broker and start it up
	broker := newMuxBroker(mux)
	go broker.Run()

	// Build the client using our broker and control channel.
	return &RPCClient{
		broker:  broker,
		control: rpc.NewClient(control),
		plugins: plugins,
		stdout:  stdstream[0],
		stderr:  stdstream[1],
	}, nil
}

// SyncStreams should be called to enable syncing of stdout,
// stderr with the plugin.
//
// This will return immediately and the syncing will continue to happen
// in the background. You do not need to launch this in a goroutine itself.
//
// This should never be called multiple times.
func (c *RPCClient) SyncStreams(stdout io.Writer, stderr io.Writer) error {
	go copyStream("stdout", stdout, c.stdout)
	go copyStream("stderr", stderr, c.stderr)
	return nil
}

// Close closes the connection. The client is no longer usable after this
// is called.
func (c *RPCClient) Close() error {
	// Call the control channel and ask it to gracefully exit. If this
	// errors, then we save it so that we always return an error but we
	// want to try to close the other channels anyways.
	var empty struct{}
	returnErr := c.control.Call("Control.Quit", true, &empty)

	// Close the other streams we have
	if err := c.control.Close(); err != nil {
		return err
	}
	if err := c.stdout.Close(); err != nil {
		return err
	}
	if err := c.stderr.Close(); err != nil {
		return err
	}
	if err := c.broker.Close(); err != nil {
		return err
	}

	// Return back the error we got from Control.Quit. This is very important
	// since we MUST return non-nil error if this fails so that Client.Kill
	// will properly try a process.Kill.
	return returnErr
}

func (c *RPCClient) Dispense(name string) (interface{}, error) {
	p, ok := c.plugins[name]
	if !ok {
		return nil, fmt.Errorf("unknown plugin type: %s", name)
	}

	var id uint32
	if err := c.control.Call(
		"Dispenser.Dispense", name, &id); err != nil {
		return nil, err
	}

	conn, err := c.broker.Dial(id)
	if err != nil {
		return nil, err
	}

	return p.Client(c.broker, rpc.NewClient(conn))
}