From 23e5a679c52aa17f550e94fcfe164fcdbed1dd7e Mon Sep 17 00:00:00 2001 From: Prashanth Pai Date: Fri, 9 Feb 2018 11:14:48 +0530 Subject: [PATCH] Import sunrpc into pkg This is an as-is import of sunrpc codec without introducing any functional code changes. There are few opportunities for cleanup and optimizations and that'll be done in future. Signed-off-by: Prashanth Pai --- Gopkg.lock | 7 +- Gopkg.toml | 3 - glusterd2/brick/brick_rpc_prog.go | 2 +- glusterd2/daemon/connection.go | 2 +- glusterd2/plugin/common.go | 2 +- glusterd2/pmap/rpc_prog.go | 2 +- glusterd2/servers/sunrpc/callback.go | 2 +- glusterd2/servers/sunrpc/dump_prog.go | 2 +- glusterd2/servers/sunrpc/handshake_prog.go | 2 +- glusterd2/servers/sunrpc/program.go | 3 +- glusterd2/servers/sunrpc/server.go | 2 +- pkg/sunrpc/clientcodec.go | 194 +++++++++++++++++++++ pkg/sunrpc/cmux_matcher.go | 74 ++++++++ pkg/sunrpc/errors.go | 54 ++++++ pkg/sunrpc/message_types.go | 137 +++++++++++++++ pkg/sunrpc/portmapper.go | 179 +++++++++++++++++++ pkg/sunrpc/procedure_registry.go | 133 ++++++++++++++ pkg/sunrpc/program_types.go | 10 ++ pkg/sunrpc/record.go | 131 ++++++++++++++ pkg/sunrpc/servercodec.go | 140 +++++++++++++++ plugins/bitrot/init.go | 2 +- plugins/events/init.go | 2 +- plugins/georeplication/init.go | 2 +- plugins/glustershd/init.go | 2 +- plugins/hello/init.go | 2 +- plugins/quota/init.go | 2 +- scripts/new-gd2-plugin.py | 2 +- 27 files changed, 1070 insertions(+), 25 deletions(-) create mode 100644 pkg/sunrpc/clientcodec.go create mode 100644 pkg/sunrpc/cmux_matcher.go create mode 100644 pkg/sunrpc/errors.go create mode 100644 pkg/sunrpc/message_types.go create mode 100644 pkg/sunrpc/portmapper.go create mode 100644 pkg/sunrpc/procedure_registry.go create mode 100644 pkg/sunrpc/program_types.go create mode 100644 pkg/sunrpc/record.go create mode 100644 pkg/sunrpc/servercodec.go diff --git a/Gopkg.lock b/Gopkg.lock index 1e761b09..e2f7fa3c 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -209,11 +209,6 @@ packages = ["difflib"] revision = "d8ed2627bdf02c080bf22230dbb337003b7aba2d" -[[projects]] - name = "github.com/prashanthpai/sunrpc" - packages = ["."] - revision = "8b1e9b0138a37e93705639161d846baed18290c7" - [[projects]] name = "github.com/prometheus/client_golang" packages = ["prometheus"] @@ -348,6 +343,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "dc95f6a3bf9f2aa98d30b085c01eefb0ad1023a1f3e67d64563d64558d7871f1" + inputs-digest = "0c5e9e4c4e8c562ceb86c47fa341c2d7677db5672a010bdbdbaf6d97b2caf33e" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 5a79cdec..659058b6 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -58,9 +58,6 @@ name = "github.com/pelletier/go-toml" version = "1.0.0" -[[constraint]] - name = "github.com/prashanthpai/sunrpc" - [[constraint]] name = "github.com/rasky/go-xdr" diff --git a/glusterd2/brick/brick_rpc_prog.go b/glusterd2/brick/brick_rpc_prog.go index d884fb17..facf05c8 100644 --- a/glusterd2/brick/brick_rpc_prog.go +++ b/glusterd2/brick/brick_rpc_prog.go @@ -1,7 +1,7 @@ package brick import ( - "github.com/prashanthpai/sunrpc" + "github.com/gluster/glusterd2/pkg/sunrpc" ) const ( diff --git a/glusterd2/daemon/connection.go b/glusterd2/daemon/connection.go index db9acec4..caf8368e 100644 --- a/glusterd2/daemon/connection.go +++ b/glusterd2/daemon/connection.go @@ -6,7 +6,7 @@ import ( "net/rpc" "sync" - "github.com/prashanthpai/sunrpc" + "github.com/gluster/glusterd2/pkg/sunrpc" log "github.com/sirupsen/logrus" ) diff --git a/glusterd2/plugin/common.go b/glusterd2/plugin/common.go index dbcd5a43..48ce1b51 100644 --- a/glusterd2/plugin/common.go +++ b/glusterd2/plugin/common.go @@ -2,7 +2,7 @@ package plugin import ( "github.com/gluster/glusterd2/glusterd2/servers/rest/route" - "github.com/prashanthpai/sunrpc" + "github.com/gluster/glusterd2/pkg/sunrpc" ) // GlusterdPlugin is an interface that every Glusterd plugin will diff --git a/glusterd2/pmap/rpc_prog.go b/glusterd2/pmap/rpc_prog.go index 9e0362df..a856099d 100644 --- a/glusterd2/pmap/rpc_prog.go +++ b/glusterd2/pmap/rpc_prog.go @@ -1,7 +1,7 @@ package pmap import ( - "github.com/prashanthpai/sunrpc" + "github.com/gluster/glusterd2/pkg/sunrpc" ) const ( diff --git a/glusterd2/servers/sunrpc/callback.go b/glusterd2/servers/sunrpc/callback.go index b31ec206..3ee33a45 100644 --- a/glusterd2/servers/sunrpc/callback.go +++ b/glusterd2/servers/sunrpc/callback.go @@ -6,9 +6,9 @@ import ( "sync/atomic" "github.com/gluster/glusterd2/glusterd2/transaction" + "github.com/gluster/glusterd2/pkg/sunrpc" "github.com/gluster/glusterd2/pkg/utils" - "github.com/prashanthpai/sunrpc" "github.com/rasky/go-xdr/xdr2" log "github.com/sirupsen/logrus" ) diff --git a/glusterd2/servers/sunrpc/dump_prog.go b/glusterd2/servers/sunrpc/dump_prog.go index fb803669..689195c7 100644 --- a/glusterd2/servers/sunrpc/dump_prog.go +++ b/glusterd2/servers/sunrpc/dump_prog.go @@ -1,7 +1,7 @@ package sunrpc import ( - "github.com/prashanthpai/sunrpc" + "github.com/gluster/glusterd2/pkg/sunrpc" ) const ( diff --git a/glusterd2/servers/sunrpc/handshake_prog.go b/glusterd2/servers/sunrpc/handshake_prog.go index 2bf95fd6..f2e8325a 100644 --- a/glusterd2/servers/sunrpc/handshake_prog.go +++ b/glusterd2/servers/sunrpc/handshake_prog.go @@ -10,8 +10,8 @@ import ( "github.com/gluster/glusterd2/glusterd2/servers/sunrpc/dict" "github.com/gluster/glusterd2/glusterd2/store" "github.com/gluster/glusterd2/glusterd2/volume" + "github.com/gluster/glusterd2/pkg/sunrpc" - "github.com/prashanthpai/sunrpc" log "github.com/sirupsen/logrus" ) diff --git a/glusterd2/servers/sunrpc/program.go b/glusterd2/servers/sunrpc/program.go index 75bfa5a2..dbd5f6a8 100644 --- a/glusterd2/servers/sunrpc/program.go +++ b/glusterd2/servers/sunrpc/program.go @@ -5,7 +5,8 @@ import ( "reflect" "strings" - "github.com/prashanthpai/sunrpc" + "github.com/gluster/glusterd2/pkg/sunrpc" + log "github.com/sirupsen/logrus" ) diff --git a/glusterd2/servers/sunrpc/server.go b/glusterd2/servers/sunrpc/server.go index 63594b95..4583fb15 100644 --- a/glusterd2/servers/sunrpc/server.go +++ b/glusterd2/servers/sunrpc/server.go @@ -11,8 +11,8 @@ import ( "github.com/gluster/glusterd2/glusterd2/plugin" "github.com/gluster/glusterd2/glusterd2/pmap" + "github.com/gluster/glusterd2/pkg/sunrpc" - "github.com/prashanthpai/sunrpc" log "github.com/sirupsen/logrus" "github.com/soheilhy/cmux" config "github.com/spf13/viper" diff --git a/pkg/sunrpc/clientcodec.go b/pkg/sunrpc/clientcodec.go new file mode 100644 index 00000000..be2971c8 --- /dev/null +++ b/pkg/sunrpc/clientcodec.go @@ -0,0 +1,194 @@ +package sunrpc + +import ( + "bytes" + "io" + "net" + "net/rpc" + "sync" + + "github.com/rasky/go-xdr/xdr2" +) + +type clientCodec struct { + conn io.ReadWriteCloser // network connection + recordReader io.Reader // reader for RPC record + notifyClose chan<- io.ReadWriteCloser + + // Sun RPC responses include Seq (XID) but not ServiceMethod (procedure + // number). Go package net/rpc expects both. So we save ServiceMethod + // when sending the request and look it up when filling rpc.Response + mutex sync.Mutex // protects pending + pending map[uint64]string // maps Seq (XID) to ServiceMethod +} + +// NewClientCodec returns a new rpc.ClientCodec using Sun RPC on conn. +// If a non-nil channel is passed as second argument, the conn is sent on +// that channel when Close() is called on conn. +func NewClientCodec(conn io.ReadWriteCloser, notifyClose chan<- io.ReadWriteCloser) rpc.ClientCodec { + return &clientCodec{ + conn: conn, + notifyClose: notifyClose, + pending: make(map[uint64]string), + } +} + +// NewClient returns a new rpc.Client which internally uses Sun RPC codec +func NewClient(conn io.ReadWriteCloser) *rpc.Client { + return rpc.NewClientWithCodec(NewClientCodec(conn, nil)) +} + +// Dial connects to a Sun-RPC server at the specified network address +func Dial(network, address string) (*rpc.Client, error) { + conn, err := net.Dial(network, address) + if err != nil { + return nil, err + } + return NewClient(conn), err +} + +func (c *clientCodec) WriteRequest(req *rpc.Request, param interface{}) error { + + // rpc.Request.Seq is initialized (from 0) and incremented by net/rpc + // package on each call. This is unit64. But XID as per RFC should + // really be uint32. This increment should be capped till maxOf(uint32) + + procedureID, ok := GetProcedureID(req.ServiceMethod) + if !ok { + return ErrProcUnavail + } + + c.mutex.Lock() + c.pending[req.Seq] = req.ServiceMethod + c.mutex.Unlock() + + // Encapsulate rpc.Request.Seq and rpc.Request.ServiceMethod + call := RPCMsg{ + Xid: uint32(req.Seq), + Type: Call, + CBody: CallBody{ + RPCVersion: RPCProtocolVersion, + Program: procedureID.ProgramNumber, + Version: procedureID.ProgramVersion, + Procedure: procedureID.ProcedureNumber, + }, + } + + payload := new(bytes.Buffer) + + if _, err := xdr.Marshal(payload, &call); err != nil { + return err + } + + if param != nil { + // Marshall actual params/args of the remote procedure + if _, err := xdr.Marshal(payload, ¶m); err != nil { + return err + } + } + + // Write payload to network + _, err := WriteFullRecord(c.conn, payload.Bytes()) + if err != nil { + if err == io.EOF && c.notifyClose != nil { + c.notifyClose <- c.conn + } + return err + } + + return nil +} + +func (c *clientCodec) checkReplyForErr(reply *RPCMsg) error { + + if reply.Type != Reply { + return ErrInvalidRPCMessageType + } + + switch reply.RBody.Stat { + case MsgAccepted: + switch reply.RBody.Areply.Stat { + case Success: + case ProgMismatch: + return ErrProgMismatch{ + reply.RBody.Areply.MismatchInfo.Low, + reply.RBody.Areply.MismatchInfo.High} + case ProgUnavail: + return ErrProgUnavail + case ProcUnavail: + return ErrProcUnavail + case GarbageArgs: + return ErrGarbageArgs + case SystemErr: + return ErrSystemErr + default: + return ErrInvalidMsgAccepted + } + case MsgDenied: + switch reply.RBody.Rreply.Stat { + case RPCMismatch: + return ErrRPCMismatch{ + reply.RBody.Rreply.MismatchInfo.Low, + reply.RBody.Rreply.MismatchInfo.High} + case AuthError: + return ErrAuthError + default: + return ErrInvalidMsgDeniedType + } + default: + return ErrInvalidRPCRepyType + } + + return nil +} + +func (c *clientCodec) ReadResponseHeader(resp *rpc.Response) error { + + // Read entire RPC message from network + record, err := ReadFullRecord(c.conn) + if err != nil { + if err == io.EOF && c.notifyClose != nil { + c.notifyClose <- c.conn + } + return err + } + + c.recordReader = bytes.NewReader(record) + + // Unmarshal record as RPC reply + var reply RPCMsg + if _, err = xdr.Unmarshal(c.recordReader, &reply); err != nil { + return err + } + + // Unpack rpc.Request.Seq and set rpc.Request.ServiceMethod + resp.Seq = uint64(reply.Xid) + c.mutex.Lock() + resp.ServiceMethod = c.pending[resp.Seq] + delete(c.pending, resp.Seq) + c.mutex.Unlock() + + if err := c.checkReplyForErr(&reply); err != nil { + return err + } + + return nil +} + +func (c *clientCodec) ReadResponseBody(result interface{}) error { + + if result == nil { + // read and drain it out ? + return nil + } + + if _, err := xdr.Unmarshal(c.recordReader, &result); err != nil { + return err + } + + return nil +} + +func (c *clientCodec) Close() error { + return c.conn.Close() +} diff --git a/pkg/sunrpc/cmux_matcher.go b/pkg/sunrpc/cmux_matcher.go new file mode 100644 index 00000000..e0cad6ff --- /dev/null +++ b/pkg/sunrpc/cmux_matcher.go @@ -0,0 +1,74 @@ +package sunrpc + +import ( + "bytes" + "encoding/binary" + "io" + "unsafe" + + "github.com/rasky/go-xdr/xdr2" +) + +type rpcHeader struct { + Xid uint32 + MsgType int32 + RPCVersion uint32 + Program uint32 + Version uint32 + Procedure uint32 +} + +var minFragmentSize = unsafe.Sizeof(rpcHeader{}) +var maxRPCReadSize = 4 + minFragmentSize // 28 bytes + +// CmuxMatcher reads 28 bytes of the request to guess if the request is +// a Sun RPC Call. You can also match RPC requests targeted at specific +// program and version by passing variable params (hack for lack of function +// overloading) +func CmuxMatcher(progAndVersion ...uint32) func(io.Reader) bool { + return func(reader io.Reader) bool { + // read from connection + buf := make([]byte, maxRPCReadSize) + bytesRead, err := io.ReadFull(reader, buf) + if err != nil || bytesRead != int(maxRPCReadSize) { + return false + } + bufReader := bytes.NewReader(buf) + + // validate fragment size + var fragmentHeader uint32 + err = binary.Read(bufReader, binary.BigEndian, &fragmentHeader) + if err != nil { + return false + } + fragmentSize := getFragmentSize(fragmentHeader) + if fragmentSize < uint32(minFragmentSize) || fragmentSize > uint32(maxRecordFragmentSize) { + return false + } + + // validate RPC call + var header rpcHeader + bytesRead, err = xdr.Unmarshal(bufReader, &header) + if err != nil || bytesRead != int(minFragmentSize) { + return false + } + if header.MsgType != int32(Call) { + return false + } + if header.RPCVersion != RPCProtocolVersion { + return false + } + if header.Version == uint32(0) { + return false + } + + // match specific program number and version + if len(progAndVersion) == 2 { + if progAndVersion[0] != header.Program || progAndVersion[1] != header.Version { + return false + } + } + + return true + } +} diff --git a/pkg/sunrpc/errors.go b/pkg/sunrpc/errors.go new file mode 100644 index 00000000..a99a59e9 --- /dev/null +++ b/pkg/sunrpc/errors.go @@ -0,0 +1,54 @@ +package sunrpc + +import ( + "errors" + "fmt" +) + +// Internal errors +var ( + ErrInvalidFragmentSize = errors.New("The RPC fragment size is invalid") + ErrRPCMessageSizeExceeded = errors.New("The RPC message size is too big") +) + +// RPC errors + +// ErrRPCMismatch contains the lowest and highest version of RPC protocol +// supported by the remote server +type ErrRPCMismatch struct { + Low uint32 + High uint32 +} + +func (e ErrRPCMismatch) Error() string { + return fmt.Sprintf("RPC version not supported by server. Lowest and highest supported versions are %d and %d respectively", e.Low, e.High) +} + +// ErrProgMismatch contains the lowest and highest version of program version +// supported by the remote program +type ErrProgMismatch struct { + Low uint32 + High uint32 +} + +func (e ErrProgMismatch) Error() string { + return fmt.Sprintf("Program version not supported. Lowest and highest supported versions are %d and %d respectively", e.Low, e.High) +} + +// Given that the remote server accepted the RPC call, following errors +// represent error status of an attempt to call remote procedure +var ( + ErrProgUnavail = errors.New("Remote server has not exported program") + ErrProcUnavail = errors.New("Remote server has no such procedure") + ErrGarbageArgs = errors.New("Remote procedure cannot decode params") + ErrSystemErr = errors.New("System error on remote server") +) + +// These errors represent invalid replies from server and auth rejection. +var ( + ErrInvalidRPCMessageType = errors.New("Invalid RPC message type received") + ErrInvalidRPCRepyType = errors.New("Invalid RPC reply received. Reply type should be MsgAccepted or MsgDenied") + ErrInvalidMsgDeniedType = errors.New("Invalid MsgDenied reply. Possible values are RPCMismatch and AuthError") + ErrInvalidMsgAccepted = errors.New("Invalid MsgAccepted reply received") + ErrAuthError = errors.New("Remote server rejected identity of the caller") +) diff --git a/pkg/sunrpc/message_types.go b/pkg/sunrpc/message_types.go new file mode 100644 index 00000000..cd989a1e --- /dev/null +++ b/pkg/sunrpc/message_types.go @@ -0,0 +1,137 @@ +package sunrpc + +// RPCProtocolVersion is the version of RPC protocol as described in RFC 5531 +const RPCProtocolVersion = 2 + +// As per XDR (RFC 4506): +// Enumerations have the same representation as 32 bit signed integers. + +// MsgType is an enumeration representing the type of RPC message +type MsgType int32 + +// A RPC message can be of two types: call or reply +const ( + Call MsgType = 0 + Reply MsgType = 1 +) + +// ReplyStat is an enumeration representing the type of reply +type ReplyStat int32 + +// A reply to a call message can take two forms: the message was either +// accepted or rejected +const ( + MsgAccepted ReplyStat = 0 + MsgDenied ReplyStat = 1 +) + +// AcceptStat is an enumeration representing the status of procedure called +type AcceptStat int32 + +// Given that a call message was accepted, the following is the status of an +// attempt to call a remote procedure +const ( + Success AcceptStat = iota // RPC executed successfully + ProgUnavail // Remote hasn't exported the program + ProgMismatch // Remote can't support version number + ProcUnavail // Program can't support procedure + GarbageArgs // Procedure can't decode params + SystemErr // Other errors +) + +// RejectStat is an enumeration representing the reason for rejection +type RejectStat int32 + +// Why call was rejected +const ( + RPCMismatch RejectStat = 0 // RPC version number != 2 + AuthError RejectStat = 1 // Remote can't authenticate caller +) + +// AuthStat represents the reason for authentication failure +type AuthStat int32 + +// Why authentication failed +const ( + AuthOk AuthStat = iota // Success + AuthBadcred // Bad credential (seal broken) + AuthRejectedcred // Client must begin new session + AuthBadverf // Bad verifier (seal broken) + AuthRejectedVerf // Verifier expired or replayed + AuthTooweak // Rejected for security reasons + AuthInvalidresp // Bogus response verifier + AuthFailed // Reason unknown +) + +// AuthFlavor represents the type of authentication used +type AuthFlavor int32 + +// Sun-assigned authentication flavor numbers +const ( + AuthNone AuthFlavor = iota // No authentication + AuthSys // Unix style (uid+gids) + AuthShort // Short hand unix style + AuthDh // DES style (encrypted timestamp) + AuthKerb // Keberos Auth + AuthRSA // RSA authentication + RPCsecGss // GSS-based RPC security +) + +// OpaqueAuth is a structure with AuthFlavor enumeration followed by up to +// 400 bytes that are opaque to (uninterpreted by) the RPC protocol +// implementation. +type OpaqueAuth struct { + Flavor AuthFlavor + Body []byte +} + +// CallBody represents the body of a RPC Call +type CallBody struct { + RPCVersion uint32 // must be equal to 2 + Program uint32 // Remote program + Version uint32 // Remote program's version + Procedure uint32 // Procedure number + Cred OpaqueAuth // Authentication credential + Verf OpaqueAuth // Authentication verifier +} + +// MismatchReply is used in ProgMismatch and RPCMismatch cases to denote +// lowest and highest version of RPC version or program version supported +type MismatchReply struct { + Low uint32 + High uint32 +} + +// AcceptedReply contains reply accepted by the RPC server. Note that there +// could be an error even though the call was accepted. +type AcceptedReply struct { + Verf OpaqueAuth + Stat AcceptStat `xdr:"union"` + MismatchInfo MismatchReply `xdr:"unioncase=2"` // ProgMismatch + // procedure-specific results start here +} + +// RejectedReply represents a reply to a call rejected by the RPC server. The +/// call can be ejected for two reasons: either the server is not running a +// compatible version of the RPC protocol (RPCMismatch) or the server rejects +// the identity of the caller (AuthError) +type RejectedReply struct { + Stat RejectStat `xdr:"union"` + MismatchInfo MismatchReply `xdr:"unioncase=0"` // RPCMismatch + AuthStat AuthStat `xdr:"unioncase=1"` // AuthError +} + +// ReplyBody represents a generic RPC reply to a `Call` +type ReplyBody struct { + Stat ReplyStat `xdr:"union"` + Areply AcceptedReply `xdr:"unioncase=0"` + Rreply RejectedReply `xdr:"unioncase=1"` +} + +// RPCMsg represents a complete RPC message (call or reply) +type RPCMsg struct { + Xid uint32 + Type MsgType `xdr:"union"` + CBody CallBody `xdr:"unioncase=0"` + RBody ReplyBody `xdr:"unioncase=1"` +} diff --git a/pkg/sunrpc/portmapper.go b/pkg/sunrpc/portmapper.go new file mode 100644 index 00000000..9f118a6c --- /dev/null +++ b/pkg/sunrpc/portmapper.go @@ -0,0 +1,179 @@ +package sunrpc + +import ( + "errors" + "net" + "net/rpc" + "strconv" + "sync" +) + +const ( + pmapPort = 111 + portmapperProgramNumber = 100000 + portmapperProgramVersion = 2 +) + +// Protocol is a type representing the protocol (TCP or UDP) over which the +// program/server being registered listens on. +type Protocol uint32 + +const ( + // IPProtoTCP is the protocol number for TCP/IP + IPProtoTCP Protocol = 6 + // IPProtoUDP is the protocol number for UDP/IP + IPProtoUDP Protocol = 17 +) + +var defaultAddress = "127.0.0.1:" + strconv.Itoa(pmapPort) + +// PortMapping is a mapping between (program, version, protocol) to port number +type PortMapping struct { + Program uint32 + Version uint32 + Protocol uint32 + Port uint32 +} + +var registryInit sync.Once + +func initRegistry() { + + procedureID := ProcedureID{ + ProgramNumber: portmapperProgramNumber, + ProgramVersion: portmapperProgramVersion, + } + + // This is ordered as per procedure number + remoteProcedures := [6]string{ + "Pmap.ProcNull", "Pmap.ProcSet", "Pmap.ProcUnset", + "Pmap.ProcGetPort", "Pmap.ProcDump", "Pmap.ProcCallIt"} + + for id, procName := range remoteProcedures { + procedureID.ProcedureNumber = uint32(id) + _ = RegisterProcedure(Procedure{procedureID, procName}, true) + } +} + +func initPmapClient(host string) *rpc.Client { + if host == "" { + host = defaultAddress + } + + registryInit.Do(initRegistry) + + conn, err := net.Dial("tcp", host) + if err != nil { + return nil + } + + return rpc.NewClientWithCodec(NewClientCodec(conn, nil)) +} + +// PmapSet creates port mapping of the program specified. It return true on +// success and false otherwise. +func PmapSet(programNumber, programVersion uint32, protocol Protocol, port uint32) (bool, error) { + + var result bool + + client := initPmapClient("") + if client == nil { + return result, errors.New("Could not create pmap client") + } + defer client.Close() + + mapping := &PortMapping{ + Program: programNumber, + Version: programVersion, + Protocol: uint32(protocol), + Port: port, + } + + err := client.Call("Pmap.ProcSet", mapping, &result) + return result, err +} + +// PmapUnset will unregister the program specified. It returns true on success +// and false otherwise. +func PmapUnset(programNumber, programVersion uint32) (bool, error) { + + var result bool + + client := initPmapClient("") + if client == nil { + return result, errors.New("Could not create pmap client") + } + defer client.Close() + + mapping := &PortMapping{ + Program: programNumber, + Version: programVersion, + } + + err := client.Call("Pmap.ProcUnset", mapping, &result) + return result, err +} + +// PmapGetPort returns the port number on which the program specified is +// awaiting call requests. If host is empty string, localhost is used. +func PmapGetPort(host string, programNumber, programVersion uint32, protocol Protocol) (uint32, error) { + + var port uint32 + + client := initPmapClient(host) + if client == nil { + return port, errors.New("Could not create pmap client") + } + defer client.Close() + + mapping := &PortMapping{ + Program: programNumber, + Version: programVersion, + Protocol: uint32(protocol), + } + + err := client.Call("Pmap.ProcGetPort", mapping, &port) + return port, err +} + +type portMappingList struct { + Map PortMapping + Next *portMappingList `xdr:"optional"` +} + +type getMapsReply struct { + Next *portMappingList `xdr:"optional"` +} + +// PmapGetMaps returns a list of PortMapping entries present in portmapper's +// database. If host is empty string, localhost is used. +func PmapGetMaps(host string) ([]PortMapping, error) { + + var mappings []PortMapping + var result getMapsReply + + client := initPmapClient(host) + if client == nil { + return nil, errors.New("Could not create pmap client") + } + defer client.Close() + + err := client.Call("Pmap.ProcDump", nil, &result) + if err != nil { + return nil, err + } + + if result.Next != nil { + trav := result.Next + for { + entry := PortMapping(trav.Map) + mappings = append(mappings, entry) + trav = trav.Next + if trav == nil { + break + } + } + } + + return mappings, nil +} diff --git a/pkg/sunrpc/procedure_registry.go b/pkg/sunrpc/procedure_registry.go new file mode 100644 index 00000000..1704c59f --- /dev/null +++ b/pkg/sunrpc/procedure_registry.go @@ -0,0 +1,133 @@ +package sunrpc + +import ( + "errors" + "fmt" + "strings" + "sync" + "unicode" + "unicode/utf8" +) + +/* +From RFC 5531: + The RPC call message has three unsigned-integer fields -- remote + program number, remote program version number, and remote procedure + number -- that uniquely identify the procedure to be called. +*/ + +// ProcedureID uniquely identifies a remote procedure +type ProcedureID struct { + ProgramNumber uint32 + ProgramVersion uint32 + ProcedureNumber uint32 +} + +// Procedure represents a ProcedureID and name pair. +type Procedure struct { + ID ProcedureID + Name string +} + +// pMap is looked up in ServerCodec to map ProcedureID to method name. +// rMap is looked up in ClientCodec to map method name to ProcedureID. +var procedureRegistry = struct { + sync.RWMutex + pMap map[ProcedureID]string + rMap map[string]ProcedureID +}{ + pMap: make(map[ProcedureID]string), + rMap: make(map[string]ProcedureID), +} + +func isExported(name string) bool { + firstRune, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(firstRune) +} + +func isValidProcedureName(procedureName string) bool { + // procedureName must be of the format 'T.MethodName' to satisfy + // criteria set by 'net/rpc' package for remote functions. + + procedureTypeName := strings.Split(procedureName, ".") + if len(procedureTypeName) != 2 { + return false + } + + for _, name := range procedureTypeName { + if !isExported(name) { + return false + } + } + + return true +} + +// RegisterProcedure will register the procedure in the registry. +func RegisterProcedure(procedure Procedure, validateProcName bool) error { + + if validateProcName && !isValidProcedureName(procedure.Name) { + return errors.New("Invalid procedure name") + } + + procedureRegistry.Lock() + defer procedureRegistry.Unlock() + + procedureRegistry.pMap[procedure.ID] = procedure.Name + procedureRegistry.rMap[procedure.Name] = procedure.ID + return nil +} + +// GetProcedureName will return a string containing procedure name and a bool +// value which is set to true only if the procedure is found in registry. +func GetProcedureName(procedureID ProcedureID) (string, bool) { + procedureRegistry.RLock() + defer procedureRegistry.RUnlock() + + procedureName, ok := procedureRegistry.pMap[procedureID] + return procedureName, ok +} + +// GetProcedureID will return ProcedureID given the procedure name. It also +// returns a bool which is set to true only if the procedure is found in +// the registry. +func GetProcedureID(procedureName string) (ProcedureID, bool) { + procedureRegistry.RLock() + defer procedureRegistry.RUnlock() + + procedureID, ok := procedureRegistry.rMap[procedureName] + return procedureID, ok +} + +// RemoveProcedure takes a string or ProcedureID struct as argument and deletes +// the corresponding procedure from procedure registry. +func RemoveProcedure(procedure interface{}) { + procedureRegistry.Lock() + defer procedureRegistry.Unlock() + + switch p := procedure.(type) { + case string: + procedureID, ok := procedureRegistry.rMap[p] + if ok { + delete(procedureRegistry.pMap, procedureID) + delete(procedureRegistry.rMap, p) + } + case ProcedureID: + procedureName, ok := procedureRegistry.pMap[p] + if ok { + delete(procedureRegistry.pMap, p) + delete(procedureRegistry.rMap, procedureName) + } + } +} + +// DumpProcedureRegistry will print the entire procedure map. +// Use this for logging/debugging. +func DumpProcedureRegistry() { + procedureRegistry.RLock() + defer procedureRegistry.RUnlock() + + for key, value := range procedureRegistry.rMap { + fmt.Printf("%s : %+v\n", key, value) + } +} diff --git a/pkg/sunrpc/program_types.go b/pkg/sunrpc/program_types.go new file mode 100644 index 00000000..86aee9d1 --- /dev/null +++ b/pkg/sunrpc/program_types.go @@ -0,0 +1,10 @@ +package sunrpc + +// Program is an interface that every RPC program can implement and +// use internally for convenience during procedure registration +type Program interface { + Name() string + Number() uint32 + Version() uint32 + Procedures() []Procedure +} diff --git a/pkg/sunrpc/record.go b/pkg/sunrpc/record.go new file mode 100644 index 00000000..fa29104f --- /dev/null +++ b/pkg/sunrpc/record.go @@ -0,0 +1,131 @@ +package sunrpc + +import ( + "bytes" + "encoding/binary" + "io" +) + +/* +From RFC 5531 (https://tools.ietf.org/html/rfc5531) + + A record is composed of one or more record fragments. A record + fragment is a four-byte header followed by 0 to (2**31) - 1 bytes of + fragment data. The bytes encode an unsigned binary number; as with + XDR integers, the byte order is from highest to lowest. The number + encodes two values -- a boolean that indicates whether the fragment + is the last fragment of the record (bit value 1 implies the fragment + is the last fragment) and a 31-bit unsigned binary value that is the + length in bytes of the fragment's data. The boolean value is the + highest-order bit of the header; the length is the 31 low-order bits. +*/ + +const ( + // This is maximum size in bytes for an individual record fragment. + // The entire RPC message (record) has no size restriction imposed + // by RFC 5531. Refer: include/linux/sunrpc/msg_prot.h + maxRecordFragmentSize = (1 << 31) - 1 + + // Max size of RPC message that a client is allowed to send. + maxRecordSize = 1 * 1024 * 1024 +) + +func isLastFragment(fragmentHeader uint32) bool { + return (fragmentHeader >> 31) == 1 +} + +func getFragmentSize(fragmentHeader uint32) uint32 { + return fragmentHeader &^ (1 << 31) +} + +func createFragmentHeader(size uint32, lastFragment bool) uint32 { + + fragmentHeader := size &^ (1 << 31) + + if lastFragment { + fragmentHeader |= (1 << 31) + } + + return fragmentHeader +} + +func minOf(a, b int64) int64 { + if a < b { + return a + } + return b +} + +// WriteFullRecord writes the fully formed RPC message reply to network +// by breaking it into one or more record fragments. +func WriteFullRecord(conn io.Writer, data []byte) (int64, error) { + + dataSize := int64(len(data)) + + var totalBytesWritten int64 + var lastFragment bool + + fragmentHeaderBytes := make([]byte, 4) + for { + remainingBytes := dataSize - totalBytesWritten + if remainingBytes <= maxRecordFragmentSize { + lastFragment = true + } + fragmentSize := uint32(minOf(maxRecordFragmentSize, remainingBytes)) + + // Create fragment header + binary.BigEndian.PutUint32(fragmentHeaderBytes, createFragmentHeader(fragmentSize, lastFragment)) + + // Write fragment header and fragment body to network + bytesWritten, err := conn.Write(append(fragmentHeaderBytes, data[totalBytesWritten:fragmentSize]...)) + if err != nil { + return int64(totalBytesWritten), err + } + totalBytesWritten += int64(bytesWritten) + + if lastFragment { + break + } + } + + return totalBytesWritten, nil +} + +// ReadFullRecord reads the entire RPC message from network and returns a +// a []byte sequence which contains the record. +func ReadFullRecord(conn io.Reader) ([]byte, error) { + + // In almost all cases, RPC message contain only one fragment which + // is not too big in size. But set a cap on buffer size to prevent + // rogue clients from filling up memory. + record := bytes.NewBuffer(make([]byte, 0, maxRecordSize)) + var fragmentHeader uint32 + for { + // Read record fragment header + err := binary.Read(conn, binary.BigEndian, &fragmentHeader) + if err != nil { + return nil, err + } + + fragmentSize := getFragmentSize(fragmentHeader) + if fragmentSize > maxRecordFragmentSize { + return nil, ErrInvalidFragmentSize + } + + if int(fragmentSize) > (record.Cap() - record.Len()) { + return nil, ErrRPCMessageSizeExceeded + } + + // Copy fragment body (data) from network to buffer + bytesCopied, err := io.CopyN(record, conn, int64(fragmentSize)) + if err != nil || (bytesCopied != int64(fragmentSize)) { + return nil, err + } + + if isLastFragment(fragmentHeader) { + break + } + } + + return record.Bytes(), nil +} diff --git a/pkg/sunrpc/servercodec.go b/pkg/sunrpc/servercodec.go new file mode 100644 index 00000000..86125a46 --- /dev/null +++ b/pkg/sunrpc/servercodec.go @@ -0,0 +1,140 @@ +package sunrpc + +import ( + "bytes" + "io" + "log" + "net/rpc" + + "github.com/rasky/go-xdr/xdr2" +) + +type serverCodec struct { + conn io.ReadWriteCloser + closed bool + notifyClose chan<- io.ReadWriteCloser + recordReader io.Reader +} + +// NewServerCodec returns a new rpc.ServerCodec using Sun RPC on conn. +// If a non-nil channel is passed as second argument, the conn is sent on +// that channel when Close() is called on conn. +func NewServerCodec(conn io.ReadWriteCloser, notifyClose chan<- io.ReadWriteCloser) rpc.ServerCodec { + return &serverCodec{conn: conn, notifyClose: notifyClose} +} + +func (c *serverCodec) ReadRequestHeader(req *rpc.Request) error { + // NOTE: + // Errors returned by this function aren't relayed back to the client + // as WriteResponse() isn't called. The net/rpc package will call + // c.Close() when this function returns an error. + + // Read entire RPC message from network + record, err := ReadFullRecord(c.conn) + if err != nil { + if err != io.EOF { + log.Println(err) + } + return err + } + + c.recordReader = bytes.NewReader(record) + + // Unmarshall RPC message + var call RPCMsg + _, err = xdr.Unmarshal(c.recordReader, &call) + if err != nil { + log.Println(err) + return err + } + + if call.Type != Call { + log.Println(ErrInvalidRPCMessageType) + return ErrInvalidRPCMessageType + } + + // Set req.Seq and req.ServiceMethod + req.Seq = uint64(call.Xid) + procedureID := ProcedureID{call.CBody.Program, call.CBody.Version, call.CBody.Procedure} + procedureName, ok := GetProcedureName(procedureID) + if ok { + req.ServiceMethod = procedureName + } else { + // Due to our simpler map implementation, we cannot distinguish + // between ErrProgUnavail and ErrProcUnavail + log.Printf("%s: %+v\n", ErrProcUnavail, procedureID) + return ErrProcUnavail + } + + return nil +} + +func (c *serverCodec) ReadRequestBody(funcArgs interface{}) error { + + if funcArgs == nil { + return nil + } + + if _, err := xdr.Unmarshal(c.recordReader, &funcArgs); err != nil { + c.Close() + return err + } + + return nil +} + +func (c *serverCodec) WriteResponse(resp *rpc.Response, result interface{}) error { + + if resp.Error != "" { + // The remote function returned error (shouldn't really happen) + log.Println(resp.Error) + } + + var buf bytes.Buffer + + reply := RPCMsg{ + Xid: uint32(resp.Seq), + Type: Reply, + RBody: ReplyBody{ + Stat: MsgAccepted, + Areply: AcceptedReply{ + Stat: Success, + }, + }, + } + + if _, err := xdr.Marshal(&buf, reply); err != nil { + c.Close() + return err + } + + // Marshal and fill procedure-specific reply into the buffer + if _, err := xdr.Marshal(&buf, result); err != nil { + c.Close() + return err + } + + // Write buffer contents to network + if _, err := WriteFullRecord(c.conn, buf.Bytes()); err != nil { + c.Close() + return err + } + + return nil +} + +func (c *serverCodec) Close() error { + if c.closed { + return nil + } + + err := c.conn.Close() + if err == nil { + c.closed = true + if c.notifyClose != nil { + c.notifyClose <- c.conn + } + } + + return err +} diff --git a/plugins/bitrot/init.go b/plugins/bitrot/init.go index 93a37e8a..d964b208 100644 --- a/plugins/bitrot/init.go +++ b/plugins/bitrot/init.go @@ -3,7 +3,7 @@ package bitrot import ( "github.com/gluster/glusterd2/glusterd2/servers/rest/route" "github.com/gluster/glusterd2/glusterd2/transaction" - "github.com/prashanthpai/sunrpc" + "github.com/gluster/glusterd2/pkg/sunrpc" ) // Plugin is a structure which implements GlusterdPlugin interface diff --git a/plugins/events/init.go b/plugins/events/init.go index ec8571ae..f16c1424 100644 --- a/plugins/events/init.go +++ b/plugins/events/init.go @@ -2,7 +2,7 @@ package events import ( "github.com/gluster/glusterd2/glusterd2/servers/rest/route" - "github.com/prashanthpai/sunrpc" + "github.com/gluster/glusterd2/pkg/sunrpc" ) // Plugin is a structure which implements GlusterdPlugin interface diff --git a/plugins/georeplication/init.go b/plugins/georeplication/init.go index fbeac919..37d502d4 100644 --- a/plugins/georeplication/init.go +++ b/plugins/georeplication/init.go @@ -3,7 +3,7 @@ package georeplication import ( "github.com/gluster/glusterd2/glusterd2/servers/rest/route" "github.com/gluster/glusterd2/glusterd2/transaction" - "github.com/prashanthpai/sunrpc" + "github.com/gluster/glusterd2/pkg/sunrpc" ) // Plugin is a structure which implements GlusterdPlugin interface diff --git a/plugins/glustershd/init.go b/plugins/glustershd/init.go index 3487c33a..a4b4c60a 100644 --- a/plugins/glustershd/init.go +++ b/plugins/glustershd/init.go @@ -3,7 +3,7 @@ package glustershd import ( "github.com/gluster/glusterd2/glusterd2/servers/rest/route" "github.com/gluster/glusterd2/glusterd2/transaction" - "github.com/prashanthpai/sunrpc" + "github.com/gluster/glusterd2/pkg/sunrpc" ) // Plugin is a structure which implements GlusterdPlugin interface diff --git a/plugins/hello/init.go b/plugins/hello/init.go index 1c34796e..dafb0ab4 100644 --- a/plugins/hello/init.go +++ b/plugins/hello/init.go @@ -2,7 +2,7 @@ package hello import ( "github.com/gluster/glusterd2/glusterd2/servers/rest/route" - "github.com/prashanthpai/sunrpc" + "github.com/gluster/glusterd2/pkg/sunrpc" ) // Plugin is a structure which implements GlusterdPlugin interface diff --git a/plugins/quota/init.go b/plugins/quota/init.go index 41de93bf..52c64a0d 100644 --- a/plugins/quota/init.go +++ b/plugins/quota/init.go @@ -2,7 +2,7 @@ package quota import ( "github.com/gluster/glusterd2/glusterd2/servers/rest/route" - "github.com/prashanthpai/sunrpc" + "github.com/gluster/glusterd2/pkg/sunrpc" ) // Plugin is a structure which implements GlusterdPlugin interface diff --git a/scripts/new-gd2-plugin.py b/scripts/new-gd2-plugin.py index 164c80c2..0bbc2a94 100755 --- a/scripts/new-gd2-plugin.py +++ b/scripts/new-gd2-plugin.py @@ -40,7 +40,7 @@ INIT_GO_TMPL = """package $name import ( "github.com/gluster/glusterd2/glusterd2/servers/rest/route" - "github.com/prashanthpai/sunrpc" + "github.com/gluster/glusterd2/pkg/sunrpc" ) // Plugin is a structure which implements GlusterdPlugin interface