summaryrefslogtreecommitdiff
path: root/vendor/github.com/hashicorp/go-plugin/rpc_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/hashicorp/go-plugin/rpc_server.go')
-rw-r--r--vendor/github.com/hashicorp/go-plugin/rpc_server.go185
1 files changed, 185 insertions, 0 deletions
diff --git a/vendor/github.com/hashicorp/go-plugin/rpc_server.go b/vendor/github.com/hashicorp/go-plugin/rpc_server.go
new file mode 100644
index 00000000..3984dc89
--- /dev/null
+++ b/vendor/github.com/hashicorp/go-plugin/rpc_server.go
@@ -0,0 +1,185 @@
+package plugin
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "net"
+ "net/rpc"
+ "sync"
+
+ "github.com/hashicorp/yamux"
+)
+
+// RPCServer listens for network connections and then dispenses interface
+// implementations over net/rpc.
+//
+// After setting the fields below, they shouldn't be read again directly
+// from the structure which may be reading/writing them concurrently.
+type RPCServer struct {
+ Plugins map[string]Plugin
+
+ // Stdout, Stderr are what this server will use instead of the
+ // normal stdin/out/err. This is because due to the multi-process nature
+ // of our plugin system, we can't use the normal process values so we
+ // make our own custom one we pipe across.
+ Stdout io.Reader
+ Stderr io.Reader
+
+ // DoneCh should be set to a non-nil channel that will be closed
+ // when the control requests the RPC server to end.
+ DoneCh chan<- struct{}
+
+ lock sync.Mutex
+}
+
+// Accept accepts connections on a listener and serves requests for
+// each incoming connection. Accept blocks; the caller typically invokes
+// it in a go statement.
+func (s *RPCServer) Accept(lis net.Listener) {
+ for {
+ conn, err := lis.Accept()
+ if err != nil {
+ log.Printf("[ERR] plugin: plugin server: %s", err)
+ return
+ }
+
+ go s.ServeConn(conn)
+ }
+}
+
+// ServeConn runs a single connection.
+//
+// ServeConn blocks, serving the connection until the client hangs up.
+func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
+ // First create the yamux server to wrap this connection
+ mux, err := yamux.Server(conn, nil)
+ if err != nil {
+ conn.Close()
+ log.Printf("[ERR] plugin: error creating yamux server: %s", err)
+ return
+ }
+
+ // Accept the control connection
+ control, err := mux.Accept()
+ if err != nil {
+ mux.Close()
+ if err != io.EOF {
+ log.Printf("[ERR] plugin: error accepting control connection: %s", err)
+ }
+
+ return
+ }
+
+ // Connect the stdstreams (in, out, err)
+ stdstream := make([]net.Conn, 2)
+ for i, _ := range stdstream {
+ stdstream[i], err = mux.Accept()
+ if err != nil {
+ mux.Close()
+ log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
+ return
+ }
+ }
+
+ // Copy std streams out to the proper place
+ go copyStream("stdout", stdstream[0], s.Stdout)
+ go copyStream("stderr", stdstream[1], s.Stderr)
+
+ // Create the broker and start it up
+ broker := newMuxBroker(mux)
+ go broker.Run()
+
+ // Use the control connection to build the dispenser and serve the
+ // connection.
+ server := rpc.NewServer()
+ server.RegisterName("Control", &controlServer{
+ server: s,
+ })
+ server.RegisterName("Dispenser", &dispenseServer{
+ broker: broker,
+ plugins: s.Plugins,
+ })
+ server.ServeConn(control)
+}
+
+// done is called internally by the control server to trigger the
+// doneCh to close which is listened to by the main process to cleanly
+// exit.
+func (s *RPCServer) done() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if s.DoneCh != nil {
+ close(s.DoneCh)
+ s.DoneCh = nil
+ }
+}
+
+// dispenseServer dispenses variousinterface implementations for Terraform.
+type controlServer struct {
+ server *RPCServer
+}
+
+func (c *controlServer) Quit(
+ null bool, response *struct{}) error {
+ // End the server
+ c.server.done()
+
+ // Always return true
+ *response = struct{}{}
+
+ return nil
+}
+
+// dispenseServer dispenses variousinterface implementations for Terraform.
+type dispenseServer struct {
+ broker *MuxBroker
+ plugins map[string]Plugin
+}
+
+func (d *dispenseServer) Dispense(
+ name string, response *uint32) error {
+ // Find the function to create this implementation
+ p, ok := d.plugins[name]
+ if !ok {
+ return fmt.Errorf("unknown plugin type: %s", name)
+ }
+
+ // Create the implementation first so we know if there is an error.
+ impl, err := p.Server(d.broker)
+ if err != nil {
+ // We turn the error into an errors error so that it works across RPC
+ return errors.New(err.Error())
+ }
+
+ // Reserve an ID for our implementation
+ id := d.broker.NextId()
+ *response = id
+
+ // Run the rest in a goroutine since it can only happen once this RPC
+ // call returns. We wait for a connection for the plugin implementation
+ // and serve it.
+ go func() {
+ conn, err := d.broker.Accept(id)
+ if err != nil {
+ log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)
+ return
+ }
+
+ serve(conn, "Plugin", impl)
+ }()
+
+ return nil
+}
+
+func serve(conn io.ReadWriteCloser, name string, v interface{}) {
+ server := rpc.NewServer()
+ if err := server.RegisterName(name, v); err != nil {
+ log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
+ return
+ }
+
+ server.ServeConn(conn)
+}