1
0
mirror of https://github.com/gluster/glusterd2.git synced 2026-02-06 06:45:35 +01:00

rpc: Add infra to provide RPC programs access to net.Conn

To allow RPC programs to get access to underlying connection, this
"hack" creates one rpc.Server instance per client. While this has
very minimal overhead, it amplifies the logging problem we've had
with server.Register().

RPC programs can now access underlying connection using GetConn()
method. Primary consumer of this shall be the pmap package which
needs this infra to detect disconnections and prune its internal
mapping accordingly.

The 'cleaner' alternative is to fork net/rpc library to make it
accept RPC programs with methods that accept net.Conn and/or
context.Context

Signed-off-by: Prashanth Pai <ppai@redhat.com>
This commit is contained in:
Prashanth Pai
2018-09-14 12:05:17 +05:30
parent 98f0d58aba
commit bdaa2c971c
5 changed files with 102 additions and 31 deletions

View File

@@ -1,7 +1,11 @@
package pmap
import (
"net"
"github.com/gluster/glusterd2/pkg/sunrpc"
log "github.com/sirupsen/logrus"
)
const (
@@ -24,6 +28,7 @@ type GfPortmap struct {
progNum uint32
progVersion uint32
procedures []sunrpc.Procedure
conn net.Conn
}
// NewGfPortmap returns a new instance of GfPortmap type
@@ -67,6 +72,16 @@ func (p *GfPortmap) Procedures() []sunrpc.Procedure {
return p.procedures
}
// GetConn returns the underlying net.Conn.
func (p *GfPortmap) GetConn() net.Conn {
return p.conn
}
// SetConn returns stores the net.Conn instance provided.
func (p *GfPortmap) SetConn(conn net.Conn) {
p.conn = conn
}
// PortByBrickReq is sent by the glusterfs client
type PortByBrickReq struct {
Brick string
@@ -109,8 +124,17 @@ type SignInRsp struct {
// SignIn stores the brick and port mapping in registry
func (p *GfPortmap) SignIn(args *SignInReq, reply *SignInRsp) error {
// FIXME: Xprt (net.Conn instance) isn't available here yet.
// Passing nil for now.
var address string
if p.GetConn() != nil {
address = p.GetConn().RemoteAddr().String()
}
log.WithFields(log.Fields{
"address": address,
"brick": args.Brick,
"port": args.Port,
}).Debug("brick signed in")
// TODO: Store net.Conn instance in pmap
registryBind(args.Port, args.Brick, GfPmapPortBrickserver, nil)
return nil
@@ -132,8 +156,17 @@ type SignOutRsp struct {
// SignOut removes the brick and port mapping in registry
func (p *GfPortmap) SignOut(args *SignOutReq, reply *SignOutRsp) error {
// FIXME: Xprt (net.Conn instance) isn't available here yet.
// Passing nil for now.
var address string
if p.GetConn() != nil {
address = p.GetConn().RemoteAddr().String()
}
log.WithFields(log.Fields{
"address": address,
"brick": args.Brick,
"port": args.Port,
}).Debug("brick signed out")
// TODO: Store net.Conn instance in pmap
registryRemove(args.Port, args.Brick, GfPmapPortBrickserver, nil)
return nil

View File

@@ -0,0 +1,14 @@
package sunrpc
import (
"net"
)
// Conn is an interface that RPC programs can implement that has setter and
// getter methods that set and return the underlying net.Conn connection
// object. This is a hack/workaround because net/rpc doesn't provide RPC
// programs access to net.Conn or context.Context.
type Conn interface {
GetConn() net.Conn
SetConn(net.Conn)
}

View File

@@ -3,6 +3,7 @@ package sunrpc
import (
"context"
"errors"
"net"
"strconv"
"strings"
"syscall"
@@ -73,6 +74,16 @@ func (p *GfHandshake) Procedures() []sunrpc.Procedure {
return p.procedures
}
// GetConn returns the underlying net.Conn.
func (p *GfHandshake) GetConn() net.Conn {
return p.conn
}
// SetConn returns stores the net.Conn instance provided.
func (p *GfHandshake) SetConn(conn net.Conn) {
p.conn = conn
}
// GfGetspecReq is sent by glusterfs client and primarily contains volume name.
// Xdata field is a serialized gluster dict containing op version.
type GfGetspecReq struct {
@@ -109,6 +120,11 @@ func (p *GfHandshake) ServerGetspec(args *GfGetspecReq, reply *GfGetspecRsp) err
log.WithError(err).Error("ServerGetspec(): dict.Unserialize() failed")
}
log.WithFields(log.Fields{
"client": p.GetConn().RemoteAddr().String(),
"volfile-id": args.Key,
}).Debug("client wants volfile")
// Get Volfile from store
volfileID := strings.TrimPrefix(args.Key, "/")
resp, err := store.Get(context.TODO(), volfilePrefix+volfileID)

View File

@@ -1,7 +1,7 @@
package sunrpc
import (
"net/rpc"
"net"
"reflect"
"strings"
@@ -16,9 +16,11 @@ type genericProgram struct {
progNum uint32
progVersion uint32
procedures []sunrpc.Procedure
conn net.Conn
}
func registerProgram(server *rpc.Server, program sunrpc.Program) error {
// registerProcedures creates procedure number to procedure name mappings for sunrpc codec
func registerProcedures(program sunrpc.Program) error {
logger := log.WithFields(log.Fields{
"program": program.Name(),
"prognum": program.Number(),
@@ -27,16 +29,6 @@ func registerProgram(server *rpc.Server, program sunrpc.Program) error {
logger.Debug("registering sunrpc program")
// NOTE: This will throw some benign log messages complaining about
// signatures of methods in Program interface. rpc.Server.Register()
// expects all methods of program to be of the kind:
// func (t *T) MethodName(argType T1, replyType *T2) error
// These log entries (INFO) can be ignored.
err := server.Register(program)
if err != nil {
return err
}
// Create procedure number to procedure name mappings for sunrpc codec
typeName := reflect.Indirect(reflect.ValueOf(program)).Type().Name()
for _, procedure := range program.Procedures() {
@@ -48,12 +40,11 @@ func registerProgram(server *rpc.Server, program sunrpc.Program) error {
if !strings.HasPrefix(procedure.Name, typeName+".") {
procedure.Name = typeName + "." + procedure.Name
}
err = sunrpc.RegisterProcedure(
if err := sunrpc.RegisterProcedure(
sunrpc.Procedure{
ID: procedure.ID,
Name: procedure.Name,
}, true)
if err != nil {
}, true); err != nil {
return err
}
}

View File

@@ -2,6 +2,7 @@ package sunrpc
import (
"expvar"
"fmt"
"io"
"net"
"net/rpc"
@@ -23,9 +24,14 @@ var (
clientCount = expvar.NewInt("sunrpc_clients_connected")
)
var programsList = []sunrpc.Program{
newGfHandshake(),
newGfDump(),
pmap.NewGfPortmap(),
}
// SunRPC implements a suture service
type SunRPC struct {
server *rpc.Server
tcpListener net.Listener
tcpStopCh chan struct{}
unixListener net.Listener
@@ -33,8 +39,6 @@ type SunRPC struct {
notifyCloseCh chan io.ReadWriteCloser
}
var programsList []sunrpc.Program
// clientsList is global as it needs to be accessed by RPC procedures
// that notify connected clients.
var clientsList = struct {
@@ -58,7 +62,6 @@ func NewMuxed(m cmux.CMux) *SunRPC {
uL.(*net.UnixListener).SetUnlinkOnClose(true)
srv := &SunRPC{
server: rpc.NewServer(),
tcpListener: m.Match(sunrpc.CmuxMatcher()),
unixListener: uL,
tcpStopCh: make(chan struct{}),
@@ -66,14 +69,8 @@ func NewMuxed(m cmux.CMux) *SunRPC {
notifyCloseCh: make(chan io.ReadWriteCloser, 10),
}
programsList = []sunrpc.Program{
newGfHandshake(),
newGfDump(),
pmap.NewGfPortmap(),
}
for _, prog := range programsList {
err := registerProgram(srv.server, prog)
err := registerProcedures(prog)
if err != nil {
log.WithError(err).WithField("program", prog.Name()).Error("could not register SunRPC program")
return nil
@@ -132,14 +129,34 @@ func (s *SunRPC) acceptLoop(stopCh chan struct{}, l net.Listener, wg *sync.WaitG
if err != nil {
continue
}
logger.WithField("address", conn.RemoteAddr().String()).Info("client connected")
clientCount.Add(1)
clientsList.Lock()
clientsList.c[conn] = struct{}{}
clientsList.Unlock()
// Create one rpc.Server instance per client. This is a
// workaround to allow RPC programs to access underlying
// net.Conn object and has minimal overhead. See:
// https://groups.google.com/d/msg/golang-nuts/Gt-1ikXovCA/aK8r9MAftDQJ
server := rpc.NewServer()
for _, p := range programsList {
if v, ok := p.(Conn); ok {
v.SetConn(conn)
}
// server.Register() throws some benign but very
// annoying log messages complaining about signatures
// of methods. These logs can be safely ignored. See:
// https://github.com/golang/go/issues/19957
if err := server.Register(p); err != nil {
panic(fmt.Sprintf("rpc.Register failed: %s", err.Error()))
}
}
session := sunrpc.NewServerCodec(conn, s.notifyCloseCh)
go s.server.ServeCodec(session)
go server.ServeCodec(session)
sessions = append(sessions, session)
}
}