summaryrefslogtreecommitdiff
path: root/vendor/github.com/hashicorp/go-plugin/rpc_server.go
blob: 3984dc891ba6fa5afaf2db1dca4666f7895ad4fc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package plugin

import (
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"net/rpc"
	"sync"

	"github.com/hashicorp/yamux"
)

// RPCServer listens for network connections and then dispenses interface
// implementations over net/rpc.
//
// The exported fields are configuration: set them before calling Accept or
// ServeConn and do not read or modify them directly afterwards, because the
// server's own goroutines may be accessing them concurrently.
type RPCServer struct {
	// Plugins is the set of plugins that can be dispensed, keyed by the
	// name a client passes to the Dispenser's Dispense call.
	Plugins map[string]Plugin

	// Stdout, Stderr are what this server will use instead of the
	// normal stdin/out/err. This is because due to the multi-process nature
	// of our plugin system, we can't use the normal process values so we
	// make our own custom one we pipe across.
	//
	// ServeConn hands each reader to copyStream together with the matching
	// multiplexed stream accepted from the client.
	// NOTE(review): presumably copyStream copies from the reader into the
	// stream; confirm against its definition.
	Stdout io.Reader
	Stderr io.Reader

	// DoneCh should be set to a non-nil channel that will be closed
	// when the control requests the RPC server to end. It is closed at
	// most once; after closing, the field is reset to nil.
	DoneCh chan<- struct{}

	// lock guards DoneCh inside done.
	lock sync.Mutex
}

// Accept pulls connections off lis and hands each one to ServeConn on its
// own goroutine. It blocks until lis.Accept reports an error, logs that
// error, and returns; callers typically run it with a go statement.
func (s *RPCServer) Accept(lis net.Listener) {
	conn, err := lis.Accept()
	for ; err == nil; conn, err = lis.Accept() {
		// Each connection is served independently so a slow client
		// cannot hold up the accept loop.
		go s.ServeConn(conn)
	}

	// The first accept failure ends the loop.
	log.Printf("[ERR] plugin: plugin server: %s", err)
}

// ServeConn runs a single connection.
//
// The connection is wrapped in a yamux session. The first stream accepted
// on that session is the control connection, the next two are the stdout
// and stderr streams (in that order), and further streams are managed by a
// MuxBroker. The "Control" and "Dispenser" RPC services are then served
// over the control connection.
//
// ServeConn blocks, serving the connection until the client hangs up. Any
// setup failure is logged (except io.EOF while waiting for the control
// stream) and tears down the session before returning.
func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
	// First create the yamux server to wrap this connection
	mux, err := yamux.Server(conn, nil)
	if err != nil {
		conn.Close()
		log.Printf("[ERR] plugin: error creating yamux server: %s", err)
		return
	}

	// Accept the control connection
	control, err := mux.Accept()
	if err != nil {
		mux.Close()
		// io.EOF is not logged: the client went away before opening the
		// control stream.
		if err != io.EOF {
			log.Printf("[ERR] plugin: error accepting control connection: %s", err)
		}

		return
	}

	// Accept the std streams: stdout first, then stderr.
	stdstream := make([]net.Conn, 2)
	for i := range stdstream {
		stdstream[i], err = mux.Accept()
		if err != nil {
			mux.Close()
			log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
			return
		}
	}

	// Copy std streams out to the proper place
	go copyStream("stdout", stdstream[0], s.Stdout)
	go copyStream("stderr", stdstream[1], s.Stderr)

	// Create the broker and start it up
	broker := newMuxBroker(mux)
	go broker.Run()

	// Use the control connection to build the dispenser and serve the
	// connection. Registration errors are not expected for these fixed
	// receiver types, but if one occurs the session is closed rather than
	// serving a control connection with a missing service.
	server := rpc.NewServer()
	if err := server.RegisterName("Control", &controlServer{
		server: s,
	}); err != nil {
		mux.Close()
		log.Printf("[ERR] plugin: error registering control server: %s", err)
		return
	}
	if err := server.RegisterName("Dispenser", &dispenseServer{
		broker:  broker,
		plugins: s.Plugins,
	}); err != nil {
		mux.Close()
		log.Printf("[ERR] plugin: error registering dispense server: %s", err)
		return
	}
	server.ServeConn(control)
}

// done closes DoneCh, if one is set, so that whoever is receiving on it
// learns the server was asked to stop. It is invoked by the control
// server. The field is reset to nil under the lock, which makes repeated
// calls harmless: only the first one closes the channel.
func (s *RPCServer) done() {
	s.lock.Lock()
	defer s.lock.Unlock()

	ch := s.DoneCh
	if ch == nil {
		// Never configured, or already closed by an earlier call.
		return
	}

	s.DoneCh = nil
	close(ch)
}

// controlServer is the RPC receiver for control requests against an
// RPCServer, such as asking it to quit.
type controlServer struct {
	// server is the RPCServer that control requests act upon.
	server *RPCServer
}

// Quit is the RPC method a client calls to ask the server to end. It
// signals the owning RPCServer via done and always succeeds.
//
// The null argument is ignored; it is presumably there only to satisfy
// the net/rpc method signature. The response carries no information.
func (c *controlServer) Quit(null bool, response *struct{}) error {
	// Signal shutdown first, then fill in the (empty) reply.
	c.server.done()
	*response = struct{}{}
	return nil
}

// dispenseServer is the RPC receiver that dispenses plugin interface
// implementations to the client by name.
type dispenseServer struct {
	// broker reserves stream IDs and accepts the per-plugin connections.
	broker *MuxBroker
	// plugins maps a plugin name to the Plugin that builds its server side.
	plugins map[string]Plugin
}

// Dispense is the RPC method that hands out the plugin registered under
// name. On success *response holds a broker stream ID; the server then
// waits in the background for a connection on that ID and serves the
// plugin implementation over it under the RPC service name "Plugin".
//
// An error is returned when name is not a registered plugin or when the
// plugin fails to build its server-side implementation.
func (d *dispenseServer) Dispense(name string, response *uint32) error {
	// Look up the plugin that knows how to build this implementation.
	plug, known := d.plugins[name]
	if !known {
		return fmt.Errorf("unknown plugin type: %s", name)
	}

	// Build the implementation up front so a failure is reported to the
	// caller instead of surfacing later in the background goroutine.
	impl, buildErr := plug.Server(d.broker)
	if buildErr != nil {
		// Flatten to a plain errors value so it works across RPC.
		return errors.New(buildErr.Error())
	}

	// Reserve the stream ID the client should connect on.
	streamID := d.broker.NextId()
	*response = streamID

	// The client can only connect once this call has returned, so the
	// accept-and-serve step has to run asynchronously.
	go func() {
		conn, acceptErr := d.broker.Accept(streamID)
		if acceptErr == nil {
			serve(conn, "Plugin", impl)
			return
		}

		log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, acceptErr)
	}()

	return nil
}

func serve(conn io.ReadWriteCloser, name string, v interface{}) {
	server := rpc.NewServer()
	if err := server.RegisterName(name, v); err != nil {
		log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
		return
	}

	server.ServeConn(conn)
}