1
0
mirror of https://github.com/gluster/glusterd2.git synced 2026-02-05 12:45:38 +01:00

Import sunrpc into pkg

This is an as-is import of sunrpc codec without introducing any
functional code changes. There are few opportunities for cleanup and
optimizations and that'll be done in future.

Signed-off-by: Prashanth Pai <ppai@redhat.com>
This commit is contained in:
Prashanth Pai
2018-02-09 11:14:48 +05:30
parent 8390f3078d
commit 23e5a679c5
27 changed files with 1070 additions and 25 deletions

7
Gopkg.lock generated
View File

@@ -209,11 +209,6 @@
packages = ["difflib"]
revision = "d8ed2627bdf02c080bf22230dbb337003b7aba2d"
[[projects]]
name = "github.com/prashanthpai/sunrpc"
packages = ["."]
revision = "8b1e9b0138a37e93705639161d846baed18290c7"
[[projects]]
name = "github.com/prometheus/client_golang"
packages = ["prometheus"]
@@ -348,6 +343,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "dc95f6a3bf9f2aa98d30b085c01eefb0ad1023a1f3e67d64563d64558d7871f1"
inputs-digest = "0c5e9e4c4e8c562ceb86c47fa341c2d7677db5672a010bdbdbaf6d97b2caf33e"
solver-name = "gps-cdcl"
solver-version = 1

View File

@@ -58,9 +58,6 @@
name = "github.com/pelletier/go-toml"
version = "1.0.0"
[[constraint]]
name = "github.com/prashanthpai/sunrpc"
[[constraint]]
name = "github.com/rasky/go-xdr"

View File

@@ -1,7 +1,7 @@
package brick
import (
"github.com/prashanthpai/sunrpc"
"github.com/gluster/glusterd2/pkg/sunrpc"
)
const (

View File

@@ -6,7 +6,7 @@ import (
"net/rpc"
"sync"
"github.com/prashanthpai/sunrpc"
"github.com/gluster/glusterd2/pkg/sunrpc"
log "github.com/sirupsen/logrus"
)

View File

@@ -2,7 +2,7 @@ package plugin
import (
"github.com/gluster/glusterd2/glusterd2/servers/rest/route"
"github.com/prashanthpai/sunrpc"
"github.com/gluster/glusterd2/pkg/sunrpc"
)
// GlusterdPlugin is an interface that every Glusterd plugin will

View File

@@ -1,7 +1,7 @@
package pmap
import (
"github.com/prashanthpai/sunrpc"
"github.com/gluster/glusterd2/pkg/sunrpc"
)
const (

View File

@@ -6,9 +6,9 @@ import (
"sync/atomic"
"github.com/gluster/glusterd2/glusterd2/transaction"
"github.com/gluster/glusterd2/pkg/sunrpc"
"github.com/gluster/glusterd2/pkg/utils"
"github.com/prashanthpai/sunrpc"
"github.com/rasky/go-xdr/xdr2"
log "github.com/sirupsen/logrus"
)

View File

@@ -1,7 +1,7 @@
package sunrpc
import (
"github.com/prashanthpai/sunrpc"
"github.com/gluster/glusterd2/pkg/sunrpc"
)
const (

View File

@@ -10,8 +10,8 @@ import (
"github.com/gluster/glusterd2/glusterd2/servers/sunrpc/dict"
"github.com/gluster/glusterd2/glusterd2/store"
"github.com/gluster/glusterd2/glusterd2/volume"
"github.com/gluster/glusterd2/pkg/sunrpc"
"github.com/prashanthpai/sunrpc"
log "github.com/sirupsen/logrus"
)

View File

@@ -5,7 +5,8 @@ import (
"reflect"
"strings"
"github.com/prashanthpai/sunrpc"
"github.com/gluster/glusterd2/pkg/sunrpc"
log "github.com/sirupsen/logrus"
)

View File

@@ -11,8 +11,8 @@ import (
"github.com/gluster/glusterd2/glusterd2/plugin"
"github.com/gluster/glusterd2/glusterd2/pmap"
"github.com/gluster/glusterd2/pkg/sunrpc"
"github.com/prashanthpai/sunrpc"
log "github.com/sirupsen/logrus"
"github.com/soheilhy/cmux"
config "github.com/spf13/viper"

194
pkg/sunrpc/clientcodec.go Normal file
View File

@@ -0,0 +1,194 @@
package sunrpc
import (
"bytes"
"io"
"net"
"net/rpc"
"sync"
"github.com/rasky/go-xdr/xdr2"
)
type clientCodec struct {
conn io.ReadWriteCloser // network connection
recordReader io.Reader // reader for RPC record
notifyClose chan<- io.ReadWriteCloser
// Sun RPC responses include Seq (XID) but not ServiceMethod (procedure
// number). Go package net/rpc expects both. So we save ServiceMethod
// when sending the request and look it up when filling rpc.Response
mutex sync.Mutex // protects pending
pending map[uint64]string // maps Seq (XID) to ServiceMethod
}
// NewClientCodec returns a new rpc.ClientCodec using Sun RPC on conn.
// If a non-nil channel is passed as second argument, the conn is sent on
// that channel when Close() is called on conn.
func NewClientCodec(conn io.ReadWriteCloser, notifyClose chan<- io.ReadWriteCloser) rpc.ClientCodec {
return &clientCodec{
conn: conn,
notifyClose: notifyClose,
pending: make(map[uint64]string),
}
}
// NewClient returns a new rpc.Client which internally uses Sun RPC codec
func NewClient(conn io.ReadWriteCloser) *rpc.Client {
return rpc.NewClientWithCodec(NewClientCodec(conn, nil))
}
// Dial connects to a Sun-RPC server at the specified network address
func Dial(network, address string) (*rpc.Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), err
}
func (c *clientCodec) WriteRequest(req *rpc.Request, param interface{}) error {
// rpc.Request.Seq is initialized (from 0) and incremented by net/rpc
// package on each call. This is unit64. But XID as per RFC should
// really be uint32. This increment should be capped till maxOf(uint32)
procedureID, ok := GetProcedureID(req.ServiceMethod)
if !ok {
return ErrProcUnavail
}
c.mutex.Lock()
c.pending[req.Seq] = req.ServiceMethod
c.mutex.Unlock()
// Encapsulate rpc.Request.Seq and rpc.Request.ServiceMethod
call := RPCMsg{
Xid: uint32(req.Seq),
Type: Call,
CBody: CallBody{
RPCVersion: RPCProtocolVersion,
Program: procedureID.ProgramNumber,
Version: procedureID.ProgramVersion,
Procedure: procedureID.ProcedureNumber,
},
}
payload := new(bytes.Buffer)
if _, err := xdr.Marshal(payload, &call); err != nil {
return err
}
if param != nil {
// Marshall actual params/args of the remote procedure
if _, err := xdr.Marshal(payload, &param); err != nil {
return err
}
}
// Write payload to network
_, err := WriteFullRecord(c.conn, payload.Bytes())
if err != nil {
if err == io.EOF && c.notifyClose != nil {
c.notifyClose <- c.conn
}
return err
}
return nil
}
func (c *clientCodec) checkReplyForErr(reply *RPCMsg) error {
if reply.Type != Reply {
return ErrInvalidRPCMessageType
}
switch reply.RBody.Stat {
case MsgAccepted:
switch reply.RBody.Areply.Stat {
case Success:
case ProgMismatch:
return ErrProgMismatch{
reply.RBody.Areply.MismatchInfo.Low,
reply.RBody.Areply.MismatchInfo.High}
case ProgUnavail:
return ErrProgUnavail
case ProcUnavail:
return ErrProcUnavail
case GarbageArgs:
return ErrGarbageArgs
case SystemErr:
return ErrSystemErr
default:
return ErrInvalidMsgAccepted
}
case MsgDenied:
switch reply.RBody.Rreply.Stat {
case RPCMismatch:
return ErrRPCMismatch{
reply.RBody.Rreply.MismatchInfo.Low,
reply.RBody.Rreply.MismatchInfo.High}
case AuthError:
return ErrAuthError
default:
return ErrInvalidMsgDeniedType
}
default:
return ErrInvalidRPCRepyType
}
return nil
}
func (c *clientCodec) ReadResponseHeader(resp *rpc.Response) error {
// Read entire RPC message from network
record, err := ReadFullRecord(c.conn)
if err != nil {
if err == io.EOF && c.notifyClose != nil {
c.notifyClose <- c.conn
}
return err
}
c.recordReader = bytes.NewReader(record)
// Unmarshal record as RPC reply
var reply RPCMsg
if _, err = xdr.Unmarshal(c.recordReader, &reply); err != nil {
return err
}
// Unpack rpc.Request.Seq and set rpc.Request.ServiceMethod
resp.Seq = uint64(reply.Xid)
c.mutex.Lock()
resp.ServiceMethod = c.pending[resp.Seq]
delete(c.pending, resp.Seq)
c.mutex.Unlock()
if err := c.checkReplyForErr(&reply); err != nil {
return err
}
return nil
}
func (c *clientCodec) ReadResponseBody(result interface{}) error {
if result == nil {
// read and drain it out ?
return nil
}
if _, err := xdr.Unmarshal(c.recordReader, &result); err != nil {
return err
}
return nil
}
func (c *clientCodec) Close() error {
return c.conn.Close()
}

View File

@@ -0,0 +1,74 @@
package sunrpc
import (
"bytes"
"encoding/binary"
"io"
"unsafe"
"github.com/rasky/go-xdr/xdr2"
)
type rpcHeader struct {
Xid uint32
MsgType int32
RPCVersion uint32
Program uint32
Version uint32
Procedure uint32
}
var minFragmentSize = unsafe.Sizeof(rpcHeader{})
var maxRPCReadSize = 4 + minFragmentSize // 28 bytes
// CmuxMatcher reads 28 bytes of the request to guess if the request is
// a Sun RPC Call. You can also match RPC requests targeted at specific
// program and version by passing variable params (hack for lack of function
// overloading)
func CmuxMatcher(progAndVersion ...uint32) func(io.Reader) bool {
return func(reader io.Reader) bool {
// read from connection
buf := make([]byte, maxRPCReadSize)
bytesRead, err := io.ReadFull(reader, buf)
if err != nil || bytesRead != int(maxRPCReadSize) {
return false
}
bufReader := bytes.NewReader(buf)
// validate fragment size
var fragmentHeader uint32
err = binary.Read(bufReader, binary.BigEndian, &fragmentHeader)
if err != nil {
return false
}
fragmentSize := getFragmentSize(fragmentHeader)
if fragmentSize < uint32(minFragmentSize) || fragmentSize > uint32(maxRecordFragmentSize) {
return false
}
// validate RPC call
var header rpcHeader
bytesRead, err = xdr.Unmarshal(bufReader, &header)
if err != nil || bytesRead != int(minFragmentSize) {
return false
}
if header.MsgType != int32(Call) {
return false
}
if header.RPCVersion != RPCProtocolVersion {
return false
}
if header.Version == uint32(0) {
return false
}
// match specific program number and version
if len(progAndVersion) == 2 {
if progAndVersion[0] != header.Program || progAndVersion[1] != header.Version {
return false
}
}
return true
}
}

54
pkg/sunrpc/errors.go Normal file
View File

@@ -0,0 +1,54 @@
package sunrpc
import (
"errors"
"fmt"
)
// Internal errors
var (
ErrInvalidFragmentSize = errors.New("The RPC fragment size is invalid")
ErrRPCMessageSizeExceeded = errors.New("The RPC message size is too big")
)
// RPC errors
// ErrRPCMismatch contains the lowest and highest version of RPC protocol
// supported by the remote server
type ErrRPCMismatch struct {
Low uint32
High uint32
}
func (e ErrRPCMismatch) Error() string {
return fmt.Sprintf("RPC version not supported by server. Lowest and highest supported versions are %d and %d respectively", e.Low, e.High)
}
// ErrProgMismatch contains the lowest and highest version of program version
// supported by the remote program
type ErrProgMismatch struct {
Low uint32
High uint32
}
func (e ErrProgMismatch) Error() string {
return fmt.Sprintf("Program version not supported. Lowest and highest supported versions are %d and %d respectively", e.Low, e.High)
}
// Given that the remote server accepted the RPC call, following errors
// represent error status of an attempt to call remote procedure
var (
ErrProgUnavail = errors.New("Remote server has not exported program")
ErrProcUnavail = errors.New("Remote server has no such procedure")
ErrGarbageArgs = errors.New("Remote procedure cannot decode params")
ErrSystemErr = errors.New("System error on remote server")
)
// These errors represent invalid replies from server and auth rejection.
var (
ErrInvalidRPCMessageType = errors.New("Invalid RPC message type received")
ErrInvalidRPCRepyType = errors.New("Invalid RPC reply received. Reply type should be MsgAccepted or MsgDenied")
ErrInvalidMsgDeniedType = errors.New("Invalid MsgDenied reply. Possible values are RPCMismatch and AuthError")
ErrInvalidMsgAccepted = errors.New("Invalid MsgAccepted reply received")
ErrAuthError = errors.New("Remote server rejected identity of the caller")
)

137
pkg/sunrpc/message_types.go Normal file
View File

@@ -0,0 +1,137 @@
package sunrpc
// RPCProtocolVersion is the version of RPC protocol as described in RFC 5531
const RPCProtocolVersion = 2
// As per XDR (RFC 4506):
// Enumerations have the same representation as 32 bit signed integers.
// MsgType is an enumeration representing the type of RPC message
type MsgType int32
// A RPC message can be of two types: call or reply
const (
Call MsgType = 0
Reply MsgType = 1
)
// ReplyStat is an enumeration representing the type of reply
type ReplyStat int32
// A reply to a call message can take two forms: the message was either
// accepted or rejected
const (
MsgAccepted ReplyStat = 0
MsgDenied ReplyStat = 1
)
// AcceptStat is an enumeration representing the status of procedure called
type AcceptStat int32
// Given that a call message was accepted, the following is the status of an
// attempt to call a remote procedure
const (
Success AcceptStat = iota // RPC executed successfully
ProgUnavail // Remote hasn't exported the program
ProgMismatch // Remote can't support version number
ProcUnavail // Program can't support procedure
GarbageArgs // Procedure can't decode params
SystemErr // Other errors
)
// RejectStat is an enumeration representing the reason for rejection
type RejectStat int32
// Why call was rejected
const (
RPCMismatch RejectStat = 0 // RPC version number != 2
AuthError RejectStat = 1 // Remote can't authenticate caller
)
// AuthStat represents the reason for authentication failure
type AuthStat int32
// Why authentication failed
const (
AuthOk AuthStat = iota // Success
AuthBadcred // Bad credential (seal broken)
AuthRejectedcred // Client must begin new session
AuthBadverf // Bad verifier (seal broken)
AuthRejectedVerf // Verifier expired or replayed
AuthTooweak // Rejected for security reasons
AuthInvalidresp // Bogus response verifier
AuthFailed // Reason unknown
)
// AuthFlavor represents the type of authentication used
type AuthFlavor int32
// Sun-assigned authentication flavor numbers
const (
AuthNone AuthFlavor = iota // No authentication
AuthSys // Unix style (uid+gids)
AuthShort // Short hand unix style
AuthDh // DES style (encrypted timestamp)
AuthKerb // Keberos Auth
AuthRSA // RSA authentication
RPCsecGss // GSS-based RPC security
)
// OpaqueAuth is a structure with AuthFlavor enumeration followed by up to
// 400 bytes that are opaque to (uninterpreted by) the RPC protocol
// implementation.
type OpaqueAuth struct {
Flavor AuthFlavor
Body []byte
}
// CallBody represents the body of a RPC Call
type CallBody struct {
RPCVersion uint32 // must be equal to 2
Program uint32 // Remote program
Version uint32 // Remote program's version
Procedure uint32 // Procedure number
Cred OpaqueAuth // Authentication credential
Verf OpaqueAuth // Authentication verifier
}
// MismatchReply is used in ProgMismatch and RPCMismatch cases to denote
// lowest and highest version of RPC version or program version supported
type MismatchReply struct {
Low uint32
High uint32
}
// AcceptedReply contains reply accepted by the RPC server. Note that there
// could be an error even though the call was accepted.
type AcceptedReply struct {
Verf OpaqueAuth
Stat AcceptStat `xdr:"union"`
MismatchInfo MismatchReply `xdr:"unioncase=2"` // ProgMismatch
// procedure-specific results start here
}
// RejectedReply represents a reply to a call rejected by the RPC server. The
/// call can be ejected for two reasons: either the server is not running a
// compatible version of the RPC protocol (RPCMismatch) or the server rejects
// the identity of the caller (AuthError)
type RejectedReply struct {
Stat RejectStat `xdr:"union"`
MismatchInfo MismatchReply `xdr:"unioncase=0"` // RPCMismatch
AuthStat AuthStat `xdr:"unioncase=1"` // AuthError
}
// ReplyBody represents a generic RPC reply to a `Call`
type ReplyBody struct {
Stat ReplyStat `xdr:"union"`
Areply AcceptedReply `xdr:"unioncase=0"`
Rreply RejectedReply `xdr:"unioncase=1"`
}
// RPCMsg represents a complete RPC message (call or reply)
type RPCMsg struct {
Xid uint32
Type MsgType `xdr:"union"`
CBody CallBody `xdr:"unioncase=0"`
RBody ReplyBody `xdr:"unioncase=1"`
}

179
pkg/sunrpc/portmapper.go Normal file
View File

@@ -0,0 +1,179 @@
package sunrpc
import (
"errors"
"net"
"net/rpc"
"strconv"
"sync"
)
const (
pmapPort = 111
portmapperProgramNumber = 100000
portmapperProgramVersion = 2
)
// Protocol is a type representing the protocol (TCP or UDP) over which the
// program/server being registered listens on.
type Protocol uint32
const (
// IPProtoTCP is the protocol number for TCP/IP
IPProtoTCP Protocol = 6
// IPProtoUDP is the protocol number for UDP/IP
IPProtoUDP Protocol = 17
)
var defaultAddress = "127.0.0.1:" + strconv.Itoa(pmapPort)
// PortMapping is a mapping between (program, version, protocol) to port number
type PortMapping struct {
Program uint32
Version uint32
Protocol uint32
Port uint32
}
var registryInit sync.Once
func initRegistry() {
procedureID := ProcedureID{
ProgramNumber: portmapperProgramNumber,
ProgramVersion: portmapperProgramVersion,
}
// This is ordered as per procedure number
remoteProcedures := [6]string{
"Pmap.ProcNull", "Pmap.ProcSet", "Pmap.ProcUnset",
"Pmap.ProcGetPort", "Pmap.ProcDump", "Pmap.ProcCallIt"}
for id, procName := range remoteProcedures {
procedureID.ProcedureNumber = uint32(id)
_ = RegisterProcedure(Procedure{procedureID, procName}, true)
}
}
func initPmapClient(host string) *rpc.Client {
if host == "" {
host = defaultAddress
}
registryInit.Do(initRegistry)
conn, err := net.Dial("tcp", host)
if err != nil {
return nil
}
return rpc.NewClientWithCodec(NewClientCodec(conn, nil))
}
// PmapSet creates port mapping of the program specified. It return true on
// success and false otherwise.
func PmapSet(programNumber, programVersion uint32, protocol Protocol, port uint32) (bool, error) {
var result bool
client := initPmapClient("")
if client == nil {
return result, errors.New("Could not create pmap client")
}
defer client.Close()
mapping := &PortMapping{
Program: programNumber,
Version: programVersion,
Protocol: uint32(protocol),
Port: port,
}
err := client.Call("Pmap.ProcSet", mapping, &result)
return result, err
}
// PmapUnset will unregister the program specified. It returns true on success
// and false otherwise.
func PmapUnset(programNumber, programVersion uint32) (bool, error) {
var result bool
client := initPmapClient("")
if client == nil {
return result, errors.New("Could not create pmap client")
}
defer client.Close()
mapping := &PortMapping{
Program: programNumber,
Version: programVersion,
}
err := client.Call("Pmap.ProcUnset", mapping, &result)
return result, err
}
// PmapGetPort returns the port number on which the program specified is
// awaiting call requests. If host is empty string, localhost is used.
func PmapGetPort(host string, programNumber, programVersion uint32, protocol Protocol) (uint32, error) {
var port uint32
client := initPmapClient(host)
if client == nil {
return port, errors.New("Could not create pmap client")
}
defer client.Close()
mapping := &PortMapping{
Program: programNumber,
Version: programVersion,
Protocol: uint32(protocol),
}
err := client.Call("Pmap.ProcGetPort", mapping, &port)
return port, err
}
type portMappingList struct {
Map PortMapping
Next *portMappingList `xdr:"optional"`
}
type getMapsReply struct {
Next *portMappingList `xdr:"optional"`
}
// PmapGetMaps returns a list of PortMapping entries present in portmapper's
// database. If host is empty string, localhost is used.
func PmapGetMaps(host string) ([]PortMapping, error) {
var mappings []PortMapping
var result getMapsReply
client := initPmapClient(host)
if client == nil {
return nil, errors.New("Could not create pmap client")
}
defer client.Close()
err := client.Call("Pmap.ProcDump", nil, &result)
if err != nil {
return nil, err
}
if result.Next != nil {
trav := result.Next
for {
entry := PortMapping(trav.Map)
mappings = append(mappings, entry)
trav = trav.Next
if trav == nil {
break
}
}
}
return mappings, nil
}

View File

@@ -0,0 +1,133 @@
package sunrpc
import (
"errors"
"fmt"
"strings"
"sync"
"unicode"
"unicode/utf8"
)
/*
From RFC 5531:
The RPC call message has three unsigned-integer fields -- remote
program number, remote program version number, and remote procedure
number -- that uniquely identify the procedure to be called.
*/
// ProcedureID uniquely identifies a remote procedure
type ProcedureID struct {
ProgramNumber uint32
ProgramVersion uint32
ProcedureNumber uint32
}
// Procedure represents a ProcedureID and name pair.
type Procedure struct {
ID ProcedureID
Name string
}
// pMap is looked up in ServerCodec to map ProcedureID to method name.
// rMap is looked up in ClientCodec to map method name to ProcedureID.
var procedureRegistry = struct {
sync.RWMutex
pMap map[ProcedureID]string
rMap map[string]ProcedureID
}{
pMap: make(map[ProcedureID]string),
rMap: make(map[string]ProcedureID),
}
func isExported(name string) bool {
firstRune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(firstRune)
}
func isValidProcedureName(procedureName string) bool {
// procedureName must be of the format 'T.MethodName' to satisfy
// criteria set by 'net/rpc' package for remote functions.
procedureTypeName := strings.Split(procedureName, ".")
if len(procedureTypeName) != 2 {
return false
}
for _, name := range procedureTypeName {
if !isExported(name) {
return false
}
}
return true
}
// RegisterProcedure will register the procedure in the registry.
func RegisterProcedure(procedure Procedure, validateProcName bool) error {
if validateProcName && !isValidProcedureName(procedure.Name) {
return errors.New("Invalid procedure name")
}
procedureRegistry.Lock()
defer procedureRegistry.Unlock()
procedureRegistry.pMap[procedure.ID] = procedure.Name
procedureRegistry.rMap[procedure.Name] = procedure.ID
return nil
}
// GetProcedureName will return a string containing procedure name and a bool
// value which is set to true only if the procedure is found in registry.
func GetProcedureName(procedureID ProcedureID) (string, bool) {
procedureRegistry.RLock()
defer procedureRegistry.RUnlock()
procedureName, ok := procedureRegistry.pMap[procedureID]
return procedureName, ok
}
// GetProcedureID will return ProcedureID given the procedure name. It also
// returns a bool which is set to true only if the procedure is found in
// the registry.
func GetProcedureID(procedureName string) (ProcedureID, bool) {
procedureRegistry.RLock()
defer procedureRegistry.RUnlock()
procedureID, ok := procedureRegistry.rMap[procedureName]
return procedureID, ok
}
// RemoveProcedure takes a string or ProcedureID struct as argument and deletes
// the corresponding procedure from procedure registry.
func RemoveProcedure(procedure interface{}) {
procedureRegistry.Lock()
defer procedureRegistry.Unlock()
switch p := procedure.(type) {
case string:
procedureID, ok := procedureRegistry.rMap[p]
if ok {
delete(procedureRegistry.pMap, procedureID)
delete(procedureRegistry.rMap, p)
}
case ProcedureID:
procedureName, ok := procedureRegistry.pMap[p]
if ok {
delete(procedureRegistry.pMap, p)
delete(procedureRegistry.rMap, procedureName)
}
}
}
// DumpProcedureRegistry will print the entire procedure map.
// Use this for logging/debugging.
func DumpProcedureRegistry() {
procedureRegistry.RLock()
defer procedureRegistry.RUnlock()
for key, value := range procedureRegistry.rMap {
fmt.Printf("%s : %+v\n", key, value)
}
}

View File

@@ -0,0 +1,10 @@
package sunrpc
// Program is an interface that every RPC program can implement and
// use internally for convenience during procedure registration
type Program interface {
Name() string
Number() uint32
Version() uint32
Procedures() []Procedure
}

131
pkg/sunrpc/record.go Normal file
View File

@@ -0,0 +1,131 @@
package sunrpc
import (
"bytes"
"encoding/binary"
"io"
)
/*
From RFC 5531 (https://tools.ietf.org/html/rfc5531)
A record is composed of one or more record fragments. A record
fragment is a four-byte header followed by 0 to (2**31) - 1 bytes of
fragment data. The bytes encode an unsigned binary number; as with
XDR integers, the byte order is from highest to lowest. The number
encodes two values -- a boolean that indicates whether the fragment
is the last fragment of the record (bit value 1 implies the fragment
is the last fragment) and a 31-bit unsigned binary value that is the
length in bytes of the fragment's data. The boolean value is the
highest-order bit of the header; the length is the 31 low-order bits.
*/
const (
// This is maximum size in bytes for an individual record fragment.
// The entire RPC message (record) has no size restriction imposed
// by RFC 5531. Refer: include/linux/sunrpc/msg_prot.h
maxRecordFragmentSize = (1 << 31) - 1
// Max size of RPC message that a client is allowed to send.
maxRecordSize = 1 * 1024 * 1024
)
func isLastFragment(fragmentHeader uint32) bool {
return (fragmentHeader >> 31) == 1
}
func getFragmentSize(fragmentHeader uint32) uint32 {
return fragmentHeader &^ (1 << 31)
}
func createFragmentHeader(size uint32, lastFragment bool) uint32 {
fragmentHeader := size &^ (1 << 31)
if lastFragment {
fragmentHeader |= (1 << 31)
}
return fragmentHeader
}
func minOf(a, b int64) int64 {
if a < b {
return a
}
return b
}
// WriteFullRecord writes the fully formed RPC message reply to network
// by breaking it into one or more record fragments.
func WriteFullRecord(conn io.Writer, data []byte) (int64, error) {
dataSize := int64(len(data))
var totalBytesWritten int64
var lastFragment bool
fragmentHeaderBytes := make([]byte, 4)
for {
remainingBytes := dataSize - totalBytesWritten
if remainingBytes <= maxRecordFragmentSize {
lastFragment = true
}
fragmentSize := uint32(minOf(maxRecordFragmentSize, remainingBytes))
// Create fragment header
binary.BigEndian.PutUint32(fragmentHeaderBytes, createFragmentHeader(fragmentSize, lastFragment))
// Write fragment header and fragment body to network
bytesWritten, err := conn.Write(append(fragmentHeaderBytes, data[totalBytesWritten:fragmentSize]...))
if err != nil {
return int64(totalBytesWritten), err
}
totalBytesWritten += int64(bytesWritten)
if lastFragment {
break
}
}
return totalBytesWritten, nil
}
// ReadFullRecord reads the entire RPC message from network and returns a
// a []byte sequence which contains the record.
func ReadFullRecord(conn io.Reader) ([]byte, error) {
// In almost all cases, RPC message contain only one fragment which
// is not too big in size. But set a cap on buffer size to prevent
// rogue clients from filling up memory.
record := bytes.NewBuffer(make([]byte, 0, maxRecordSize))
var fragmentHeader uint32
for {
// Read record fragment header
err := binary.Read(conn, binary.BigEndian, &fragmentHeader)
if err != nil {
return nil, err
}
fragmentSize := getFragmentSize(fragmentHeader)
if fragmentSize > maxRecordFragmentSize {
return nil, ErrInvalidFragmentSize
}
if int(fragmentSize) > (record.Cap() - record.Len()) {
return nil, ErrRPCMessageSizeExceeded
}
// Copy fragment body (data) from network to buffer
bytesCopied, err := io.CopyN(record, conn, int64(fragmentSize))
if err != nil || (bytesCopied != int64(fragmentSize)) {
return nil, err
}
if isLastFragment(fragmentHeader) {
break
}
}
return record.Bytes(), nil
}

140
pkg/sunrpc/servercodec.go Normal file
View File

@@ -0,0 +1,140 @@
package sunrpc
import (
"bytes"
"io"
"log"
"net/rpc"
"github.com/rasky/go-xdr/xdr2"
)
type serverCodec struct {
conn io.ReadWriteCloser
closed bool
notifyClose chan<- io.ReadWriteCloser
recordReader io.Reader
}
// NewServerCodec returns a new rpc.ServerCodec using Sun RPC on conn.
// If a non-nil channel is passed as second argument, the conn is sent on
// that channel when Close() is called on conn.
func NewServerCodec(conn io.ReadWriteCloser, notifyClose chan<- io.ReadWriteCloser) rpc.ServerCodec {
return &serverCodec{conn: conn, notifyClose: notifyClose}
}
func (c *serverCodec) ReadRequestHeader(req *rpc.Request) error {
// NOTE:
// Errors returned by this function aren't relayed back to the client
// as WriteResponse() isn't called. The net/rpc package will call
// c.Close() when this function returns an error.
// Read entire RPC message from network
record, err := ReadFullRecord(c.conn)
if err != nil {
if err != io.EOF {
log.Println(err)
}
return err
}
c.recordReader = bytes.NewReader(record)
// Unmarshall RPC message
var call RPCMsg
_, err = xdr.Unmarshal(c.recordReader, &call)
if err != nil {
log.Println(err)
return err
}
if call.Type != Call {
log.Println(ErrInvalidRPCMessageType)
return ErrInvalidRPCMessageType
}
// Set req.Seq and req.ServiceMethod
req.Seq = uint64(call.Xid)
procedureID := ProcedureID{call.CBody.Program, call.CBody.Version, call.CBody.Procedure}
procedureName, ok := GetProcedureName(procedureID)
if ok {
req.ServiceMethod = procedureName
} else {
// Due to our simpler map implementation, we cannot distinguish
// between ErrProgUnavail and ErrProcUnavail
log.Printf("%s: %+v\n", ErrProcUnavail, procedureID)
return ErrProcUnavail
}
return nil
}
func (c *serverCodec) ReadRequestBody(funcArgs interface{}) error {
if funcArgs == nil {
return nil
}
if _, err := xdr.Unmarshal(c.recordReader, &funcArgs); err != nil {
c.Close()
return err
}
return nil
}
func (c *serverCodec) WriteResponse(resp *rpc.Response, result interface{}) error {
if resp.Error != "" {
// The remote function returned error (shouldn't really happen)
log.Println(resp.Error)
}
var buf bytes.Buffer
reply := RPCMsg{
Xid: uint32(resp.Seq),
Type: Reply,
RBody: ReplyBody{
Stat: MsgAccepted,
Areply: AcceptedReply{
Stat: Success,
},
},
}
if _, err := xdr.Marshal(&buf, reply); err != nil {
c.Close()
return err
}
// Marshal and fill procedure-specific reply into the buffer
if _, err := xdr.Marshal(&buf, result); err != nil {
c.Close()
return err
}
// Write buffer contents to network
if _, err := WriteFullRecord(c.conn, buf.Bytes()); err != nil {
c.Close()
return err
}
return nil
}
func (c *serverCodec) Close() error {
if c.closed {
return nil
}
err := c.conn.Close()
if err == nil {
c.closed = true
if c.notifyClose != nil {
c.notifyClose <- c.conn
}
}
return err
}

View File

@@ -3,7 +3,7 @@ package bitrot
import (
"github.com/gluster/glusterd2/glusterd2/servers/rest/route"
"github.com/gluster/glusterd2/glusterd2/transaction"
"github.com/prashanthpai/sunrpc"
"github.com/gluster/glusterd2/pkg/sunrpc"
)
// Plugin is a structure which implements GlusterdPlugin interface

View File

@@ -2,7 +2,7 @@ package events
import (
"github.com/gluster/glusterd2/glusterd2/servers/rest/route"
"github.com/prashanthpai/sunrpc"
"github.com/gluster/glusterd2/pkg/sunrpc"
)
// Plugin is a structure which implements GlusterdPlugin interface

View File

@@ -3,7 +3,7 @@ package georeplication
import (
"github.com/gluster/glusterd2/glusterd2/servers/rest/route"
"github.com/gluster/glusterd2/glusterd2/transaction"
"github.com/prashanthpai/sunrpc"
"github.com/gluster/glusterd2/pkg/sunrpc"
)
// Plugin is a structure which implements GlusterdPlugin interface

View File

@@ -3,7 +3,7 @@ package glustershd
import (
"github.com/gluster/glusterd2/glusterd2/servers/rest/route"
"github.com/gluster/glusterd2/glusterd2/transaction"
"github.com/prashanthpai/sunrpc"
"github.com/gluster/glusterd2/pkg/sunrpc"
)
// Plugin is a structure which implements GlusterdPlugin interface

View File

@@ -2,7 +2,7 @@ package hello
import (
"github.com/gluster/glusterd2/glusterd2/servers/rest/route"
"github.com/prashanthpai/sunrpc"
"github.com/gluster/glusterd2/pkg/sunrpc"
)
// Plugin is a structure which implements GlusterdPlugin interface

View File

@@ -2,7 +2,7 @@ package quota
import (
"github.com/gluster/glusterd2/glusterd2/servers/rest/route"
"github.com/prashanthpai/sunrpc"
"github.com/gluster/glusterd2/pkg/sunrpc"
)
// Plugin is a structure which implements GlusterdPlugin interface

View File

@@ -40,7 +40,7 @@ INIT_GO_TMPL = """package $name
import (
"github.com/gluster/glusterd2/glusterd2/servers/rest/route"
"github.com/prashanthpai/sunrpc"
"github.com/gluster/glusterd2/pkg/sunrpc"
)
// Plugin is a structure which implements GlusterdPlugin interface