1
0
mirror of https://github.com/gluster/glusterd2.git synced 2026-02-05 12:45:38 +01:00

blockvolume: Code re-org to fix race condition and cleanup bhv

The block host volume will still exist even when the blocks are all
deleted. Manually deleting block host volume will need another step
of unmounting. With this patch we auto-delete the block host volume
if there are no blocks.

The availablility check of bhv free space is not done within lock, hence
there is a possiblity that the available space has changed by the
time we decide to create the volume. This patch also fixes the race
condition.

Signed-off-by: Poornima G <pgurusid@redhat.com>
This commit is contained in:
Poornima G
2019-03-09 12:25:00 +05:30
committed by Aravinda VK
parent 9054f74a6b
commit 54ce5f6f7a
15 changed files with 335 additions and 144 deletions

View File

@@ -60,13 +60,13 @@ var defaultGroupOptions = map[string]*api.OptionGroup{
{Name: "performance/readdir-ahead", OnValue: "off"},
{Name: "performance/write-behind.strict-O_DIRECT", OnValue: "on"},
{Name: "protocol/client.filter-O_DIRECT", OnValue: "disable"},
{Name: "cluster/replicate.eager-lock", OnValue: "disable"},
{Name: "cluster/replicate.eager-lock", OnValue: "enable"},
{Name: "cluster/replicate.quorum-type", OnValue: "auto"},
{Name: "cluster/replicate.data-self-heal-algorithm", OnValue: "full"},
{Name: "cluster/replicate.locking-scheme", OnValue: "granular"},
{Name: "cluster/replicate.shd-max-threads", OnValue: "8"},
{Name: "cluster/replicate.shd-wait-qlength", OnValue: "10000"},
{Name: "features/shard", OnValue: "on"},
{Name: "features/shard", OnValue: "off"},
{Name: "features/shard.shard-block-size", OnValue: "64MB"},
{Name: "user.cifs", OnValue: "off"},
{Name: "protocol/server.rpc-auth-allow-insecure", OnValue: "on"},

View File

@@ -1,6 +1,8 @@
package volumecommands
import (
"context"
"errors"
"fmt"
"net/http"
@@ -37,39 +39,46 @@ func registerVolDeleteStepFuncs() {
}
func volumeDeleteHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := gdctx.GetReqLogger(ctx)
volname := mux.Vars(r)["volname"]
volinfo, status, err := DeleteVolume(ctx, volname)
if err != nil {
restutils.SendHTTPError(ctx, w, status, err)
return
}
events.Broadcast(volume.NewEvent(volume.EventVolumeDeleted, volinfo))
restutils.SendHTTPResponse(ctx, w, http.StatusNoContent, nil)
}
// DeleteVolume deletes the volume
func DeleteVolume(ctx context.Context, volname string) (*volume.Volinfo, int, error) {
logger := gdctx.GetReqLogger(ctx)
ctx, span := trace.StartSpan(ctx, "/volumeDeleteHandler")
defer span.End()
txn, err := transactionv2.NewTxnWithLocks(ctx, volname)
if err != nil {
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
return
return nil, status, err
}
defer txn.Done()
volinfo, err := volume.GetVolume(volname)
if err != nil {
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
return
return nil, status, err
}
if volinfo.State == volume.VolStarted {
errMsg := "Volume must be in stopped state before deleting."
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, errMsg)
return
return nil, http.StatusBadRequest, errors.New("volume must be in stopped state before deleting")
}
if len(volinfo.SnapList) > 0 {
errMsg := fmt.Sprintf("Cannot delete Volume %s ,as it has %d snapshots.", volname, len(volinfo.SnapList))
restutils.SendHTTPError(ctx, w, http.StatusFailedDependency, errMsg)
return
err = fmt.Errorf("cannot delete Volume %s ,as it has %d snapshots", volname, len(volinfo.SnapList))
return nil, http.StatusFailedDependency, err
}
bricksAutoProvisioned := volinfo.IsAutoProvisioned() || volinfo.IsSnapshotProvisioned()
@@ -87,8 +96,7 @@ func volumeDeleteHandler(w http.ResponseWriter, r *http.Request) {
}
if err := txn.Ctx.Set("volinfo", volinfo); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
return nil, http.StatusInternalServerError, err
}
span.AddAttributes(
@@ -99,11 +107,8 @@ func volumeDeleteHandler(w http.ResponseWriter, r *http.Request) {
if err := txn.Do(); err != nil {
logger.WithError(err).WithField(
"volume", volname).Error("transaction to delete volume failed")
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
return nil, http.StatusInternalServerError, err
}
events.Broadcast(volume.NewEvent(volume.EventVolumeDeleted, volinfo))
restutils.SendHTTPResponse(ctx, w, http.StatusNoContent, nil)
return volinfo, http.StatusNoContent, nil
}

View File

@@ -1,6 +1,7 @@
package volumecommands
import (
"context"
"net/http"
"os"
@@ -118,35 +119,44 @@ func registerVolStopStepFuncs() {
func volumeStopHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
volname := mux.Vars(r)["volname"]
volinfo, status, err := StopVolume(ctx, volname)
if err != nil {
restutils.SendHTTPError(ctx, w, status, err)
return
}
events.Broadcast(volume.NewEvent(volume.EventVolumeStopped, volinfo))
resp := createVolumeStopResp(volinfo)
restutils.SendHTTPResponse(ctx, w, http.StatusOK, resp)
}
// StopVolume will stop the volume
func StopVolume(ctx context.Context, volname string) (*volume.Volinfo, int, error) {
logger := gdctx.GetReqLogger(ctx)
ctx, span := trace.StartSpan(ctx, "/volumeStopHandler")
defer span.End()
logger := gdctx.GetReqLogger(ctx)
volname := mux.Vars(r)["volname"]
txn, err := transactionv2.NewTxnWithLocks(ctx, volname)
if err != nil {
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
return
return nil, status, err
}
defer txn.Done()
volinfo, err := volume.GetVolume(volname)
if err != nil {
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
return
return nil, status, err
}
if volinfo.State == volume.VolStopped {
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, errors.ErrVolAlreadyStopped)
return
return nil, http.StatusBadRequest, errors.ErrVolAlreadyStopped
}
if volinfo.State != volume.VolStarted {
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, errors.ErrVolNotStarted)
return
return nil, http.StatusBadRequest, errors.ErrVolNotStarted
}
txn.Steps = []*transaction.Step{
@@ -168,15 +178,13 @@ func volumeStopHandler(w http.ResponseWriter, r *http.Request) {
}
if err := txn.Ctx.Set("oldvolinfo", volinfo); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
return nil, http.StatusInternalServerError, err
}
volinfo.State = volume.VolStopped
if err := txn.Ctx.Set("volinfo", volinfo); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
return nil, http.StatusInternalServerError, err
}
span.AddAttributes(
@@ -187,14 +195,10 @@ func volumeStopHandler(w http.ResponseWriter, r *http.Request) {
if err := txn.Do(); err != nil {
logger.WithError(err).WithField(
"volume", volname).Error("transaction to stop volume failed")
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
return nil, http.StatusInternalServerError, err
}
events.Broadcast(volume.NewEvent(volume.EventVolumeStopped, volinfo))
resp := createVolumeStopResp(volinfo)
restutils.SendHTTPResponse(ctx, w, http.StatusOK, resp)
return volinfo, http.StatusOK, nil
}
func createVolumeStopResp(v *volume.Volinfo) *api.VolumeStopResp {

View File

@@ -38,6 +38,7 @@ var ClusterOptMap = map[string]*ClusterOption{
"auto-create-block-hosting-volumes": {"auto-create-block-hosting-volumes", "true", OptionTypeBool, nil},
"block-hosting-volume-replica-count": {"block-hosting-volume-replica-count", "3", OptionTypeInt, nil},
"block-hosting-volume-type": {"block-hosting-volume-type", "Replicate", OptionTypeStr, nil},
"auto-delete-block-hosting-volumes": {"auto-delete-block-hosting-volumes", "false", OptionTypeBool, nil},
}
// RegisterClusterOpValidationFunc registers a validation function for provided

View File

@@ -9,6 +9,7 @@ import (
"net"
"os"
"os/exec"
"path"
"strings"
"syscall"
"time"
@@ -67,7 +68,11 @@ func MountVolume(name string, mountpoint string, mntOptns string) error {
buffer.WriteString(fmt.Sprintf(" --volfile-server %s", shost))
buffer.WriteString(fmt.Sprintf(" --volfile-server-port %s", sport))
buffer.WriteString(fmt.Sprintf(" --volfile-id %s", name))
buffer.WriteString(" --log-file /dev/null")
mountpointWithoutSlash := strings.Trim(strings.Replace(mountpoint, "/", "-", -1), "-")
logfilepath := path.Join(config.GetString("logdir"), "glusterfs", mountpointWithoutSlash)
buffer.WriteString(" --log-file " + logfilepath)
buffer.WriteString(mntOptns)
buffer.WriteString(" " + mountpoint)

View File

@@ -9,4 +9,6 @@ const (
BlockHosting = "block-hosting"
// BlockPrefix is the prefix of the volume metadata which will contain BlockPrefix + blockname as the key and size of the block as value.
BlockPrefix = "block-vol:"
// BlockHostMarkedForPrune is a metadata that is set to indicate that the bhv is being deleted as it has no block volumes present
BlockHostMarkedForPrune = "_block-hosting-marked-for-prune"
)

View File

@@ -20,6 +20,7 @@ var (
type Provider interface {
CreateBlockVolume(name string, size uint64, hostVolume string, options ...BlockVolOption) (BlockVolume, error)
DeleteBlockVolume(name string, options ...BlockVolOption) error
GetAndDeleteBlockVolume(name string, options ...BlockVolOption) (BlockVolume, error)
GetBlockVolume(id string) (BlockVolume, error)
BlockVolumes() []BlockVolume
ProviderName() string

View File

@@ -2,7 +2,6 @@ package glusterblock
import (
"context"
"errors"
"github.com/gluster/glusterd2/glusterd2/volume"
"github.com/gluster/glusterd2/pkg/size"
"github.com/gluster/glusterd2/plugins/blockvolume/blockprovider"
@@ -11,6 +10,7 @@ import (
"github.com/gluster/gluster-block-restapi/client"
"github.com/gluster/gluster-block-restapi/pkg/api"
"github.com/gluster/glusterd2/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
@@ -78,11 +78,6 @@ func (g *GlusterBlock) CreateBlockVolume(name string, size uint64, hostVolume st
return nil, err
}
resizeFunc := func(blockHostingAvailableSize, blockSize uint64) uint64 { return blockHostingAvailableSize - blockSize }
if err = hostvol.ResizeBlockHostingVolume(hostVolume, size, resizeFunc); err != nil {
logger.WithError(err).Error("failed in updating hostvolume _block-hosting-available-size metadata")
}
return &BlockVolume{
hostVolume: hostVolume,
name: name,
@@ -95,31 +90,40 @@ func (g *GlusterBlock) CreateBlockVolume(name string, size uint64, hostVolume st
}, err
}
// GetAndDeleteBlockVolume deletes a gluster block volume of give name
func (g *GlusterBlock) GetAndDeleteBlockVolume(name string, options ...blockprovider.BlockVolOption) (blockprovider.BlockVolume, error) {
blkVol, err := g.deleteBlock(name, options...)
if err != nil {
log.WithError(err).Errorf("couldn't delete block volume :%s", name)
return nil, err
}
return blkVol, nil
}
// DeleteBlockVolume deletes a gluster block volume of give name
func (g *GlusterBlock) DeleteBlockVolume(name string, options ...blockprovider.BlockVolOption) error {
_, err := g.deleteBlock(name, options...)
if err != nil {
log.WithError(err).Errorf("couldn't delete block volume :%s", name)
return err
}
return nil
}
// deleteBlock deletes a gluster block volume of give name
func (g *GlusterBlock) deleteBlock(name string, options ...blockprovider.BlockVolOption) (blockprovider.BlockVolume, error) {
var (
blockVolOpts = &blockprovider.BlockVolumeOptions{}
hostVol string
)
blockVolOpts.ApplyOpts(options...)
blockVols := g.BlockVolumes()
for _, blockVol := range blockVols {
if blockVol.Name() == name {
hostVol = blockVol.HostVolume()
break
}
}
if hostVol == "" {
return errors.New("block volume not found")
}
blockInfo, err := g.client.BlockVolumeInfo(hostVol, name)
if err != nil {
return err
blockInfo, err := g.GetBlockVolume(name)
if err != nil || blockInfo == nil {
return nil, errors.ErrBlockVolNotFound
}
req := &api.BlockVolumeDeleteReq{
@@ -127,20 +131,21 @@ func (g *GlusterBlock) DeleteBlockVolume(name string, options ...blockprovider.B
Force: blockVolOpts.ForceDelete,
}
if err := g.client.DeleteBlockVolume(hostVol, name, req); err != nil {
return err
err = g.client.DeleteBlockVolume(blockInfo.HostVolume(), name, req)
if err != nil {
return nil, err
}
resizeFunc := func(blockHostingAvailableSize, blockSize uint64) uint64 { return blockHostingAvailableSize + blockSize }
if err = hostvol.ResizeBlockHostingVolume(hostVol, blockInfo.Size, resizeFunc); err != nil {
if err = hostvol.ResizeBlockHostingVolume(blockInfo.HostVolume(), blockInfo.Size, resizeFunc); err != nil {
log.WithFields(log.Fields{
"error": err,
"size": blockInfo.Size,
}).Error("error in resizing the block hosting volume")
}
return err
return blockInfo, err
}
// GetBlockVolume gives info about a gluster block volume
@@ -158,7 +163,7 @@ func (g *GlusterBlock) GetBlockVolume(name string) (blockprovider.BlockVolume, e
}
if blockVolume == nil {
return nil, errors.New("block volume not found")
return nil, errors.ErrBlockVolNotFound
}
blockInfo, err := g.client.BlockVolumeInfo(blockVolume.HostVolume(), blockVolume.Name())

View File

@@ -7,12 +7,10 @@ import (
"strconv"
"strings"
"github.com/gluster/glusterd2/glusterd2/transaction"
"github.com/gluster/glusterd2/glusterd2/volume"
"github.com/gluster/glusterd2/pkg/errors"
"github.com/gluster/glusterd2/pkg/utils"
"github.com/gluster/glusterd2/plugins/blockvolume/blockprovider"
"github.com/gluster/glusterd2/plugins/blockvolume/hostvol"
log "github.com/sirupsen/logrus"
config "github.com/spf13/viper"
@@ -75,7 +73,6 @@ func mountHost(g *GlusterVirtBlk, hostVolume string) (string, error) {
func (g *GlusterVirtBlk) CreateBlockVolume(name string, size uint64, hostVolume string, options ...blockprovider.BlockVolOption) (blockprovider.BlockVolume, error) {
var (
blockVolOpts = &blockprovider.BlockVolumeOptions{}
clusterLocks = transaction.Locks{}
)
blockVolOpts.ApplyOpts(options...)
@@ -108,32 +105,6 @@ func (g *GlusterVirtBlk) CreateBlockVolume(name string, size uint64, hostVolume
}
}
if err = clusterLocks.Lock(hostVolume); err != nil {
logger.WithError(err).Error("error in acquiring cluster lock")
return nil, err
}
defer clusterLocks.UnLock(context.Background())
volInfo, err := volume.GetVolume(hostVolume)
if err != nil {
logger.WithError(err).Errorf("failed to get host volume info %s", hostVolume)
return nil, err
}
resizeFunc := func(blockHostingAvailableSize, blockSize uint64) uint64 { return blockHostingAvailableSize - blockSize }
if err = hostvol.UpdateBlockHostingVolumeSize(volInfo, size, resizeFunc); err != nil {
logger.WithError(err).Error("failed in updating hostvolume _block-hosting-available-size metadata")
return nil, err
}
key := volume.BlockPrefix + name
val := strconv.FormatUint(size, 10)
volInfo.Metadata[key] = val
if err := volume.AddOrUpdateVolume(volInfo); err != nil {
logger.WithError(err).Error("failed in updating volume info to store")
return nil, err
}
return &BlockVolume{
hostVolume: hostVolume,
name: name,
@@ -141,65 +112,56 @@ func (g *GlusterVirtBlk) CreateBlockVolume(name string, size uint64, hostVolume
}, nil
}
// GetAndDeleteBlockVolume deletes a gluster block volume of give name
func (g *GlusterVirtBlk) GetAndDeleteBlockVolume(name string, options ...blockprovider.BlockVolOption) (blockprovider.BlockVolume, error) {
blkVol, err := g.deleteBlock(name, options...)
if err != nil {
log.WithError(err).Errorf("couldn't delete block volume :%s", name)
return nil, err
}
return blkVol, nil
}
// DeleteBlockVolume deletes a gluster block volume of give name
func (g *GlusterVirtBlk) DeleteBlockVolume(name string, options ...blockprovider.BlockVolOption) error {
_, err := g.deleteBlock(name, options...)
if err != nil {
log.WithError(err).Errorf("couldn't delete block volume :%s", name)
return err
}
return nil
}
// DeleteBlockVolume deletes a gluster block volume of give name
func (g *GlusterVirtBlk) deleteBlock(name string, options ...blockprovider.BlockVolOption) (blockprovider.BlockVolume, error) {
var (
blockVolOpts = &blockprovider.BlockVolumeOptions{}
clusterLocks = transaction.Locks{}
)
blockVolOpts.ApplyOpts(options...)
blkVol, err := g.GetBlockVolume(name)
if err != nil || blkVol == nil {
return errors.ErrBlockVolNotFound
return nil, errors.ErrBlockVolNotFound
}
hostName := blkVol.HostVolume()
hostDir, err := mountHost(g, hostName)
if err != nil {
log.WithError(err).Errorf("error mounting block hosting volume :%s", hostName)
return err
return nil, err
}
blockFileName := hostDir + "/" + name
err = os.Remove(blockFileName)
if err != nil {
log.WithError(err).Errorf("error removing block :%s", blockFileName)
return err
return nil, err
}
if err := clusterLocks.Lock(hostName); err != nil {
log.WithError(err).Error("error in acquiring cluster lock")
return err
}
defer clusterLocks.UnLock(context.Background())
hostVol, err := volume.GetVolume(hostName)
if err != nil {
log.WithError(err).Error("failed to get block host vol info")
return err
}
for k := range hostVol.Metadata {
if k == (volume.BlockPrefix + name) {
delete(hostVol.Metadata, k)
}
}
resizeFunc := func(blockHostingAvailableSize, blockSize uint64) uint64 { return blockHostingAvailableSize + blockSize }
if err = hostvol.UpdateBlockHostingVolumeSize(hostVol, blkVol.Size(), resizeFunc); err != nil {
log.WithFields(log.Fields{
"error": err,
"size": blkVol.Size(),
}).Error("error in resizing the block hosting volume")
}
if err := volume.AddOrUpdateVolume(hostVol); err != nil {
log.WithError(err).Error("failed in updating volume info to store")
return err
}
return err
return blkVol, nil
}
// GetBlockVolume gives info about a gluster block volume

View File

@@ -75,7 +75,7 @@ func WithHosts(hosts []string) BlockVolOption {
func WithBlockType(blockType string) BlockVolOption {
return func(options *BlockVolumeOptions) {
if blockType == "" {
options.BlockType = "xfs"
options.BlockType = "ext4"
} else {
options.BlockType = blockType
}

View File

@@ -40,7 +40,7 @@ func (b *BlockVolume) CreateVolume(w http.ResponseWriter, r *http.Request) {
return
}
hostVolInfo, err := b.hostVolManager.GetOrCreateHostingVolume(req.HostingVolume, req.Size, &req.HostVolumeInfo)
hostVolInfo, err := b.hostVolManager.GetOrCreateHostingVolume(req.HostingVolume, req.Name, req.Size, &req.HostVolumeInfo)
if err != nil {
utils.SendHTTPError(r.Context(), w, http.StatusInternalServerError, err)
return
@@ -48,6 +48,7 @@ func (b *BlockVolume) CreateVolume(w http.ResponseWriter, r *http.Request) {
blockVol, err := blockProvider.CreateBlockVolume(req.Name, req.Size, hostVolInfo.Name, opts...)
if err != nil {
_ = b.hostVolManager.DeleteBlockInfoFromBHV(hostVolInfo.Name, req.Name, req.Size)
utils.SendHTTPError(r.Context(), w, http.StatusInternalServerError, err)
return
}
@@ -74,7 +75,14 @@ func (b *BlockVolume) DeleteVolume(w http.ResponseWriter, r *http.Request) {
return
}
if err := blockProvider.DeleteBlockVolume(pathParams["name"]); err != nil {
blkVol, err := blockProvider.GetAndDeleteBlockVolume(pathParams["name"])
if err != nil {
utils.SendHTTPError(r.Context(), w, http.StatusInternalServerError, err)
return
}
err = b.hostVolManager.DeleteBlockInfoFromBHV(blkVol.HostVolume(), pathParams["name"], blkVol.Size())
if err != nil {
utils.SendHTTPError(r.Context(), w, http.StatusInternalServerError, err)
return
}

View File

@@ -7,11 +7,16 @@ import (
"path"
"strconv"
"strings"
"syscall"
"time"
"github.com/gluster/glusterd2/glusterd2/commands/volumes"
"github.com/gluster/glusterd2/glusterd2/gdctx"
"github.com/gluster/glusterd2/glusterd2/peer"
"github.com/gluster/glusterd2/glusterd2/transaction"
"github.com/gluster/glusterd2/glusterd2/volume"
"github.com/gluster/glusterd2/plugins/blockvolume/api"
config "github.com/spf13/viper"
log "github.com/sirupsen/logrus"
)
@@ -23,7 +28,8 @@ const (
// HostingVolumeManager provides methods for host volume management
type HostingVolumeManager interface {
GetHostingVolumesInUse() []*volume.Volinfo
GetOrCreateHostingVolume(name string, minSizeLimit uint64, hostVolumeInfo *api.HostVolumeInfo) (*volume.Volinfo, error)
GetOrCreateHostingVolume(name string, blkName string, minSizeLimit uint64, hostVolumeInfo *api.HostVolumeInfo) (*volume.Volinfo, error)
DeleteBlockInfoFromBHV(hostVol string, blkName string, size uint64) error
}
// GlusterVolManager is a concrete implementation of HostingVolumeManager
@@ -55,7 +61,7 @@ func (g *GlusterVolManager) GetHostingVolumesInUse() []*volume.Volinfo {
// GetOrCreateHostingVolume will returns volume details for a given volume name and having a minimum size of `minSizeLimit`.
// If volume name is not provided then it will create a gluster volume with default size for hosting gluster block.
func (g *GlusterVolManager) GetOrCreateHostingVolume(name string, minSizeLimit uint64, hostVolumeInfo *api.HostVolumeInfo) (*volume.Volinfo, error) {
func (g *GlusterVolManager) GetOrCreateHostingVolume(name string, blkName string, minSizeLimit uint64, hostVolumeInfo *api.HostVolumeInfo) (*volume.Volinfo, error) {
var (
volInfo *volume.Volinfo
clusterLocks = transaction.Locks{}
@@ -95,6 +101,10 @@ func (g *GlusterVolManager) GetOrCreateHostingVolume(name string, minSizeLimit u
// If HostingVolume is not specified. List all available volumes and see if any volume is
// available with Metadata:block-hosting=yes
// TODO: Since this is not done within volume lock, this volumes' available size might have been
// changed by the time we actually reserve the size in updateBhvInfoAndSize(). This can lead
// updateBhvInfoAndSize() to fail with no space. We do not retry block create in this case,
// the application can retry to workaround this race.
if name == "" {
vInfo, err := GetExistingBlockHostingVolume(minSizeLimit, g.hostVolOpts)
if err != nil {
@@ -115,6 +125,34 @@ func (g *GlusterVolManager) GetOrCreateHostingVolume(name string, minSizeLimit u
volInfo = vInfo
}
if err = clusterLocks.Lock(volInfo.Name); err != nil {
log.WithError(err).Error("error in acquiring cluster lock")
return nil, err
}
defer clusterLocks.UnLock(context.Background())
volInfo, err = g.updateBhvInfoAndSize(volInfo.Name, blkName, minSizeLimit)
if err != nil {
log.WithError(err).Error("error in obtaining block host volume")
return nil, err
}
return volInfo, nil
}
// updateBhvInfoAndSize will set the block host vol info in metadata and also reserve the size required for creating the new block in the input hostvolume
func (g *GlusterVolManager) updateBhvInfoAndSize(hostVolume string, blkName string, minSizeLimit uint64) (*volume.Volinfo, error) {
volInfo, err := volume.GetVolume(hostVolume)
if err != nil {
log.WithError(err).Errorf("failed to get host volume info %s", hostVolume)
return nil, err
}
if volInfo.Metadata[volume.BlockHostMarkedForPrune] == "true" {
return nil, errors.New("block host volume masked for prune, retry")
}
if _, found := volInfo.Metadata[volume.BlockHosting]; !found {
volInfo.Metadata[volume.BlockHosting] = "yes"
}
@@ -143,9 +181,157 @@ func (g *GlusterVolManager) GetOrCreateHostingVolume(name string, minSizeLimit u
return nil, errors.New("volume has not been started")
}
key := volume.BlockPrefix + blkName
val := strconv.FormatUint(minSizeLimit, 10)
volInfo.Metadata[key] = val
resizeFunc := func(blockHostingAvailableSize, blockSize uint64) uint64 { return blockHostingAvailableSize - blockSize }
if err = UpdateBlockHostingVolumeSize(volInfo, minSizeLimit, resizeFunc); err != nil {
log.WithError(err).Error("failed in updating hostvolume _block-hosting-available-size metadata")
return nil, err
}
// Note that any further error exit conditions should undo the above hostsize change
if err := volume.AddOrUpdateVolume(volInfo); err != nil {
log.WithError(err).Error("failed in updating volume info to store")
}
return volInfo, nil
}
// DeleteBlockInfoFromBHV resets the available space on the bhv and also deletes the block entry in the metadata of the bhv
// In this function, if the bhv is empty i.e. there are no blocks, then the bhv delete is initiated
func (g *GlusterVolManager) DeleteBlockInfoFromBHV(hostVol string, blkName string, size uint64) error {
var (
clusterLocks = transaction.Locks{}
prune = false
)
if err := clusterLocks.Lock(hostVol); err != nil {
log.WithError(err).Error("error in acquiring cluster lock")
return err
}
volInfo, err := volume.GetVolume(hostVol)
if err != nil {
log.WithError(err).Errorf("failed to get host volume info %s", hostVol)
clusterLocks.UnLock(context.Background())
return err
}
for k := range volInfo.Metadata {
if k == (volume.BlockPrefix + blkName) {
delete(volInfo.Metadata, k)
}
}
resizeFunc := func(blockHostingAvailableSize, blockSize uint64) uint64 { return blockHostingAvailableSize + blockSize }
if err = UpdateBlockHostingVolumeSize(volInfo, size, resizeFunc); err != nil {
log.WithFields(log.Fields{
"error": err,
"size": size,
}).Error("error in resizing the block hosting volume")
}
// TODO: Also make sure volInfo.Metadata[volume.BlockPrefix*] has no keys left
availableSizeInBytes, err := strconv.ParseUint(volInfo.Metadata[volume.BlockHostingAvailableSize], 10, 64)
if err != nil {
clusterLocks.UnLock(context.Background())
return err
}
if availableSizeInBytes == volInfo.Capacity {
if g.hostVolOpts.AutoDelete {
volInfo.Metadata[volume.BlockHostMarkedForPrune] = "true"
prune = true
}
}
if err := volume.AddOrUpdateVolume(volInfo); err != nil {
log.WithError(err).Error("failed in updating volume info to store")
clusterLocks.UnLock(context.Background())
return err
}
clusterLocks.UnLock(context.Background())
if prune == true {
err = g.pruneBHV(volInfo.Name, blkName, size)
log.WithError(err).Errorf("failed to prune block host volume %s after deleting block %s", volInfo.Name, blkName)
}
return nil
}
// RegisterBHVstepFunctions registers the functions for the transaction
func RegisterBHVstepFunctions() {
var sfs = []struct {
name string
sf transaction.StepFunc
}{
{"bhv.unmount", BhvUnmount},
}
for _, sf := range sfs {
transaction.RegisterStepFunc(sf.sf, sf.name)
}
}
// BhvUnmount unmount the block host volume
func BhvUnmount(c transaction.TxnCtx) error {
var hostVol string
if err := c.Get("bhvName", &hostVol); err != nil {
return err
}
mntPath := path.Join(config.GetString("rundir"), "/blockvolume/", hostVol)
_ = syscall.Unmount(mntPath, syscall.MNT_FORCE)
return nil
}
// pruneBHV deletes the block host volume that is marked for deletion
func (g *GlusterVolManager) pruneBHV(hostVol string, blkName string, size uint64) error {
var (
ctx = gdctx.WithReqLogger(context.Background(), log.StandardLogger())
)
if !g.hostVolOpts.AutoDelete {
return nil
}
logger := gdctx.GetReqLogger(ctx)
logger.Info("Unmounting and deleting block host volume:%s", hostVol)
allNodes, err := peer.GetPeerIDs()
if err != nil {
log.WithError(err).Error("error in getting peerIDs")
return err
}
txn := transaction.NewTxn(ctx)
txn.Steps = []*transaction.Step{
{
DoFunc: "bhv.unmount",
Nodes: allNodes,
},
}
txn.Ctx.Set("bhvName", hostVol)
// Some nodes may not be up, which is okay.
txn.DontCheckAlive = true
txn.DisableRollback = true
_ = txn.Do()
txn.Done()
_, _, err = volumecommands.StopVolume(ctx, hostVol)
if err != nil {
log.WithError(err).Error("error in stopping auto created block hosting volume")
return err
}
_, _, err = volumecommands.DeleteVolume(ctx, hostVol)
if err != nil {
log.WithError(err).Error("error in auto deleting block hosting volume")
return err
}
return nil
}

View File

@@ -19,6 +19,7 @@ const (
defaultHostVolType = "Replicate"
defaultHostVolReplicaCount = 3
hostVolautoCreate = true
hostVolautoDelete = true
)
// HostingVolumeOptions holds various information which will be used in creating hosting volume
@@ -27,6 +28,7 @@ type HostingVolumeOptions struct {
Type string
ReplicaCount int
AutoCreate bool
AutoDelete bool
ThinArbPath string
ShardSize uint64
}
@@ -37,6 +39,7 @@ func newHostingVolumeOptions() *HostingVolumeOptions {
Type: defaultHostVolType,
ReplicaCount: defaultHostVolReplicaCount,
AutoCreate: hostVolautoCreate,
AutoDelete: hostVolautoDelete,
}
}
@@ -117,6 +120,14 @@ func (h *HostingVolumeOptions) SetFromClusterOptions() {
h.AutoCreate = val
}
}
autoDelete, err := options.GetClusterOption("auto-delete-block-hosting-volumes")
if err == nil {
if val, err := strconv.ParseBool(autoDelete); err == nil {
h.AutoDelete = val
}
}
h.ThinArbPath = ""
h.ShardSize = 0
}

View File

@@ -30,7 +30,7 @@ func BlockSizeFilter(size uint64) volume.Filter {
continue
}
if availableSizeInBytes, err := strconv.ParseUint(availableSize, 10, 64); err == nil && availableSizeInBytes > size {
if availableSizeInBytes, err := strconv.ParseUint(availableSize, 10, 64); err == nil && availableSizeInBytes >= size {
volumes = append(volumes, volinfo)
}
}
@@ -82,7 +82,7 @@ func CreateAndStartHostingVolume(req *api.VolCreateReq) (*volume.Volinfo, error)
}
vInfo.Metadata[volume.BlockHostingVolumeAutoCreated] = "yes"
log.WithField("name", vInfo.Name).Debug("host volume created and started successfully")
log.WithField("name", vInfo.Name).Info("host volume created and started successfully")
return vInfo, nil
}
@@ -127,6 +127,8 @@ func UpdateBlockHostingVolumeSize(volInfo *volume.Volinfo, blockSize interface{}
return errors.New("block-hosting-available-size metadata not found for volume")
}
err := errors.New("Test")
availableSizeInBytes, err := strconv.ParseUint(volInfo.Metadata[volume.BlockHostingAvailableSize], 10, 64)
if err != nil {
return err

View File

@@ -60,9 +60,8 @@ func (b *BlockVolume) RestRoutes() route.Routes {
}
// RegisterStepFuncs registers all step functions
// Here it is a no-op method
func (*BlockVolume) RegisterStepFuncs() {
hostvol.RegisterBHVstepFunctions()
}
// Init will initialize the underlying HostVolume manager only once.