mirror of
https://github.com/gluster/glusterd2.git
synced 2026-02-06 06:45:35 +01:00
rpc: Add infra to provide RPC programs access to net.Conn
To allow RPC programs to get access to underlying connection, this "hack" creates one rpc.Server instance per client. While this has very minimal overhead, it amplifies the logging problem we've had with server.Register(). RPC programs can now access underlying connection using GetConn() method. Primary consumer of this shall be the pmap package which needs this infra to detect disconnections and prune its internal mapping accordingly. The 'cleaner' alternative is to fork net/rpc library to make it accept RPC programs with methods that accept net.Conn and/or context.Context Signed-off-by: Prashanth Pai <ppai@redhat.com>
This commit is contained in:
@@ -1,7 +1,11 @@
|
||||
package pmap
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"github.com/gluster/glusterd2/pkg/sunrpc"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -24,6 +28,7 @@ type GfPortmap struct {
|
||||
progNum uint32
|
||||
progVersion uint32
|
||||
procedures []sunrpc.Procedure
|
||||
conn net.Conn
|
||||
}
|
||||
|
||||
// NewGfPortmap returns a new instance of GfPortmap type
|
||||
@@ -67,6 +72,16 @@ func (p *GfPortmap) Procedures() []sunrpc.Procedure {
|
||||
return p.procedures
|
||||
}
|
||||
|
||||
// GetConn returns the underlying net.Conn.
|
||||
func (p *GfPortmap) GetConn() net.Conn {
|
||||
return p.conn
|
||||
}
|
||||
|
||||
// SetConn returns stores the net.Conn instance provided.
|
||||
func (p *GfPortmap) SetConn(conn net.Conn) {
|
||||
p.conn = conn
|
||||
}
|
||||
|
||||
// PortByBrickReq is sent by the glusterfs client
|
||||
type PortByBrickReq struct {
|
||||
Brick string
|
||||
@@ -109,8 +124,17 @@ type SignInRsp struct {
|
||||
// SignIn stores the brick and port mapping in registry
|
||||
func (p *GfPortmap) SignIn(args *SignInReq, reply *SignInRsp) error {
|
||||
|
||||
// FIXME: Xprt (net.Conn instance) isn't available here yet.
|
||||
// Passing nil for now.
|
||||
var address string
|
||||
if p.GetConn() != nil {
|
||||
address = p.GetConn().RemoteAddr().String()
|
||||
}
|
||||
log.WithFields(log.Fields{
|
||||
"address": address,
|
||||
"brick": args.Brick,
|
||||
"port": args.Port,
|
||||
}).Debug("brick signed in")
|
||||
|
||||
// TODO: Store net.Conn instance in pmap
|
||||
registryBind(args.Port, args.Brick, GfPmapPortBrickserver, nil)
|
||||
|
||||
return nil
|
||||
@@ -132,8 +156,17 @@ type SignOutRsp struct {
|
||||
// SignOut removes the brick and port mapping in registry
|
||||
func (p *GfPortmap) SignOut(args *SignOutReq, reply *SignOutRsp) error {
|
||||
|
||||
// FIXME: Xprt (net.Conn instance) isn't available here yet.
|
||||
// Passing nil for now.
|
||||
var address string
|
||||
if p.GetConn() != nil {
|
||||
address = p.GetConn().RemoteAddr().String()
|
||||
}
|
||||
log.WithFields(log.Fields{
|
||||
"address": address,
|
||||
"brick": args.Brick,
|
||||
"port": args.Port,
|
||||
}).Debug("brick signed out")
|
||||
|
||||
// TODO: Store net.Conn instance in pmap
|
||||
registryRemove(args.Port, args.Brick, GfPmapPortBrickserver, nil)
|
||||
|
||||
return nil
|
||||
|
||||
14
glusterd2/servers/sunrpc/conn_hack.go
Normal file
14
glusterd2/servers/sunrpc/conn_hack.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package sunrpc
|
||||
|
||||
import (
|
||||
"net"
|
||||
)
|
||||
|
||||
// Conn is an interface that RPC programs can implement that has setter and
|
||||
// getter methods that set and return the underlying net.Conn connection
|
||||
// object. This is a hack/workaround because net/rpc doesn't provide RPC
|
||||
// programs access to net.Conn or context.Context.
|
||||
type Conn interface {
|
||||
GetConn() net.Conn
|
||||
SetConn(net.Conn)
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package sunrpc
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
@@ -73,6 +74,16 @@ func (p *GfHandshake) Procedures() []sunrpc.Procedure {
|
||||
return p.procedures
|
||||
}
|
||||
|
||||
// GetConn returns the underlying net.Conn.
|
||||
func (p *GfHandshake) GetConn() net.Conn {
|
||||
return p.conn
|
||||
}
|
||||
|
||||
// SetConn returns stores the net.Conn instance provided.
|
||||
func (p *GfHandshake) SetConn(conn net.Conn) {
|
||||
p.conn = conn
|
||||
}
|
||||
|
||||
// GfGetspecReq is sent by glusterfs client and primarily contains volume name.
|
||||
// Xdata field is a serialized gluster dict containing op version.
|
||||
type GfGetspecReq struct {
|
||||
@@ -109,6 +120,11 @@ func (p *GfHandshake) ServerGetspec(args *GfGetspecReq, reply *GfGetspecRsp) err
|
||||
log.WithError(err).Error("ServerGetspec(): dict.Unserialize() failed")
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"client": p.GetConn().RemoteAddr().String(),
|
||||
"volfile-id": args.Key,
|
||||
}).Debug("client wants volfile")
|
||||
|
||||
// Get Volfile from store
|
||||
volfileID := strings.TrimPrefix(args.Key, "/")
|
||||
resp, err := store.Get(context.TODO(), volfilePrefix+volfileID)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package sunrpc
|
||||
|
||||
import (
|
||||
"net/rpc"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
@@ -16,9 +16,11 @@ type genericProgram struct {
|
||||
progNum uint32
|
||||
progVersion uint32
|
||||
procedures []sunrpc.Procedure
|
||||
conn net.Conn
|
||||
}
|
||||
|
||||
func registerProgram(server *rpc.Server, program sunrpc.Program) error {
|
||||
// registerProcedures creates procedure number to procedure name mappings for sunrpc codec
|
||||
func registerProcedures(program sunrpc.Program) error {
|
||||
logger := log.WithFields(log.Fields{
|
||||
"program": program.Name(),
|
||||
"prognum": program.Number(),
|
||||
@@ -27,16 +29,6 @@ func registerProgram(server *rpc.Server, program sunrpc.Program) error {
|
||||
|
||||
logger.Debug("registering sunrpc program")
|
||||
|
||||
// NOTE: This will throw some benign log messages complaining about
|
||||
// signatures of methods in Program interface. rpc.Server.Register()
|
||||
// expects all methods of program to be of the kind:
|
||||
// func (t *T) MethodName(argType T1, replyType *T2) error
|
||||
// These log entries (INFO) can be ignored.
|
||||
err := server.Register(program)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create procedure number to procedure name mappings for sunrpc codec
|
||||
typeName := reflect.Indirect(reflect.ValueOf(program)).Type().Name()
|
||||
for _, procedure := range program.Procedures() {
|
||||
@@ -48,12 +40,11 @@ func registerProgram(server *rpc.Server, program sunrpc.Program) error {
|
||||
if !strings.HasPrefix(procedure.Name, typeName+".") {
|
||||
procedure.Name = typeName + "." + procedure.Name
|
||||
}
|
||||
err = sunrpc.RegisterProcedure(
|
||||
if err := sunrpc.RegisterProcedure(
|
||||
sunrpc.Procedure{
|
||||
ID: procedure.ID,
|
||||
Name: procedure.Name,
|
||||
}, true)
|
||||
if err != nil {
|
||||
}, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package sunrpc
|
||||
|
||||
import (
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/rpc"
|
||||
@@ -23,9 +24,14 @@ var (
|
||||
clientCount = expvar.NewInt("sunrpc_clients_connected")
|
||||
)
|
||||
|
||||
var programsList = []sunrpc.Program{
|
||||
newGfHandshake(),
|
||||
newGfDump(),
|
||||
pmap.NewGfPortmap(),
|
||||
}
|
||||
|
||||
// SunRPC implements a suture service
|
||||
type SunRPC struct {
|
||||
server *rpc.Server
|
||||
tcpListener net.Listener
|
||||
tcpStopCh chan struct{}
|
||||
unixListener net.Listener
|
||||
@@ -33,8 +39,6 @@ type SunRPC struct {
|
||||
notifyCloseCh chan io.ReadWriteCloser
|
||||
}
|
||||
|
||||
var programsList []sunrpc.Program
|
||||
|
||||
// clientsList is global as it needs to be accessed by RPC procedures
|
||||
// that notify connected clients.
|
||||
var clientsList = struct {
|
||||
@@ -58,7 +62,6 @@ func NewMuxed(m cmux.CMux) *SunRPC {
|
||||
uL.(*net.UnixListener).SetUnlinkOnClose(true)
|
||||
|
||||
srv := &SunRPC{
|
||||
server: rpc.NewServer(),
|
||||
tcpListener: m.Match(sunrpc.CmuxMatcher()),
|
||||
unixListener: uL,
|
||||
tcpStopCh: make(chan struct{}),
|
||||
@@ -66,14 +69,8 @@ func NewMuxed(m cmux.CMux) *SunRPC {
|
||||
notifyCloseCh: make(chan io.ReadWriteCloser, 10),
|
||||
}
|
||||
|
||||
programsList = []sunrpc.Program{
|
||||
newGfHandshake(),
|
||||
newGfDump(),
|
||||
pmap.NewGfPortmap(),
|
||||
}
|
||||
|
||||
for _, prog := range programsList {
|
||||
err := registerProgram(srv.server, prog)
|
||||
err := registerProcedures(prog)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("program", prog.Name()).Error("could not register SunRPC program")
|
||||
return nil
|
||||
@@ -132,14 +129,34 @@ func (s *SunRPC) acceptLoop(stopCh chan struct{}, l net.Listener, wg *sync.WaitG
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
logger.WithField("address", conn.RemoteAddr().String()).Info("client connected")
|
||||
clientCount.Add(1)
|
||||
clientsList.Lock()
|
||||
clientsList.c[conn] = struct{}{}
|
||||
clientsList.Unlock()
|
||||
|
||||
// Create one rpc.Server instance per client. This is a
|
||||
// workaround to allow RPC programs to access underlying
|
||||
// net.Conn object and has minimal overhead. See:
|
||||
// https://groups.google.com/d/msg/golang-nuts/Gt-1ikXovCA/aK8r9MAftDQJ
|
||||
server := rpc.NewServer()
|
||||
|
||||
for _, p := range programsList {
|
||||
if v, ok := p.(Conn); ok {
|
||||
v.SetConn(conn)
|
||||
}
|
||||
// server.Register() throws some benign but very
|
||||
// annoying log messages complaining about signatures
|
||||
// of methods. These logs can be safely ignored. See:
|
||||
// https://github.com/golang/go/issues/19957
|
||||
if err := server.Register(p); err != nil {
|
||||
panic(fmt.Sprintf("rpc.Register failed: %s", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
session := sunrpc.NewServerCodec(conn, s.notifyCloseCh)
|
||||
go s.server.ServeCodec(session)
|
||||
go server.ServeCodec(session)
|
||||
sessions = append(sessions, session)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user