1
0
mirror of https://github.com/gluster/glusterd2.git synced 2026-02-05 12:45:38 +01:00

snapshot:clone Implement snapshot clone

This PR includes functional changes, client api changes,
Test cases.

Signed-off-by: Mohammed Rafi KC <rkavunga@redhat.com>
This commit is contained in:
Mohammed Rafi KC
2018-06-14 17:34:35 +05:30
committed by Kaushal M
parent f0381c6329
commit e22c0e07dc
13 changed files with 560 additions and 56 deletions

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/gluster/glusterd2/glusterd2/snapshot/lvm"
"github.com/gluster/glusterd2/glusterd2/volume"
config "github.com/spf13/viper"
)
@@ -186,13 +187,32 @@ func Cleanup(prefix string, brickCount int) {
brickPrefix = prefix
exec.Command("pkill", "gluster").Output()
time.Sleep(2 * time.Second)
time.Sleep(3 * time.Second)
snapDirPrefix := config.GetString("rundir") + "/snaps/"
cloneDirPrefix := config.GetString("rundir") + "/clones/"
mtabEntries, err := volume.GetMounts()
if err != nil {
return
}
for _, m := range mtabEntries {
if strings.HasPrefix(m.MntDir, snapDirPrefix) || strings.HasPrefix(m.MntDir, cloneDirPrefix) {
//Remove any dangling mount pounts
exec.Command("umount", "-f", "-l", m.MntDir).Output()
}
}
snapDirPrefix := config.GetString("rundir") + "/snaps/*"
//Remove any dangling snapshot mount pounts
exec.Command("umount", snapDirPrefix).Output()
deleteVHD(brickCount, true)
deleteLV(brickCount, true)
vg := fmt.Sprintf("%s_vg_", lvmPrefix)
out, err := exec.Command(lvm.LVSCommand, "--noheadings", "-o", "vg_name").Output()
for _, entry := range strings.Split(string(out), "\n") {
if strings.HasPrefix(entry, vg) {
exec.Command(lvm.RemoveCommand, "-f", entry)
}
}
os.RemoveAll(brickPrefix)
}

View File

@@ -20,6 +20,7 @@ import (
var (
snapname = "snaptest"
clonename = "clonevolume"
snapTestName string
)
@@ -34,8 +35,9 @@ func TestSnapshot(t *testing.T) {
prefix := fmt.Sprintf("%s/%s/bricks/", baseWorkdir, snapTestName)
lvmtest.Cleanup(prefix, brickCount)
defer lvmtest.Cleanup(prefix, brickCount)
defer func() {
lvmtest.Cleanup(prefix, brickCount)
}()
gds, err = setupCluster("./config/1.toml", "./config/2.toml")
r.Nil(err)
defer teardownCluster(gds)
@@ -71,6 +73,7 @@ func TestSnapshot(t *testing.T) {
t.Run("List", testSnapshotList)
t.Run("StatusAndForceActivate", testSnapshotStatusForceActivate)
t.Run("Info", testSnapshotInfo)
t.Run("Clone", testSnapshotClone)
t.Run("Restore", testSnapshotRestore)
t.Run("MountRestoredVolume", testRestoredVolumeMount)
t.Run("Deactivate", testSnapshotDeactivate)
@@ -133,6 +136,30 @@ func testSnapshotCreate(t *testing.T) {
}
func testSnapshotClone(t *testing.T) {
r := require.New(t)
snapshotCloneReq := api.SnapCloneReq{
CloneName: clonename,
}
_, err := client.SnapshotClone(snapname, snapshotCloneReq)
r.Nil(err, "snapshot clone failed")
err = client.VolumeStart(clonename, true)
r.Nil(err, "Failed to start cloned volume")
volumes, err := client.Volumes("")
r.Nil(err)
r.Len(volumes, 2)
r.Nil(client.VolumeStop(clonename), "volume stop failed")
r.Nil(client.VolumeDelete(clonename))
volumes, err = client.Volumes("")
r.Nil(err)
r.Len(volumes, 1)
}
func testSnapshotList(t *testing.T) {
r := require.New(t)

View File

@@ -0,0 +1,52 @@
package cmd
import (
"fmt"
"github.com/gluster/glusterd2/pkg/api"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
const (
snapshotCloneHelpShort = "Clone a Gluster Snapshot"
snapshotCloneHelpLong = "Clone a Gluster snapshot into a new volume. This new volume will be a writable copy of snapshot."
)
var (
snapshotCloneCmd = &cobra.Command{
Use: "clone <clonename> <snapname>",
Short: snapshotCloneHelpShort,
Long: snapshotCloneHelpLong,
Args: cobra.MinimumNArgs(2),
Run: snapshotCloneCmdRun,
}
)
func init() {
snapshotCmd.AddCommand(snapshotCloneCmd)
}
func snapshotCloneCmdRun(cmd *cobra.Command, args []string) {
clonename := args[0]
snapname := args[1]
req := api.SnapCloneReq{
CloneName: clonename,
}
vol, err := client.SnapshotClone(snapname, req)
if err != nil {
if verbose {
log.WithFields(log.Fields{
"clonename": clonename,
"snapshot": snapname,
"error": err.Error(),
}).Error("snapshot clone failed")
}
failure("Failed to clone Snapshot", err, 1)
}
fmt.Printf("New Volume %s cloned from Snapshot %s\n", vol.Name, snapname)
fmt.Println("Clone Volume ID: ", vol.ID)
}

View File

@@ -90,11 +90,6 @@ func (c *Command) Routes() route.Routes {
}
}
func snapshotCloneHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
restutils.SendHTTPResponse(ctx, w, http.StatusNotImplemented, "Snapshot Clone")
}
func snapshotConfigGetHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
restutils.SendHTTPResponse(ctx, w, http.StatusNotImplemented, "Snapshot Config Get")
@@ -119,5 +114,6 @@ func (c *Command) RegisterStepFuncs() {
registerSnapDeleteStepFuncs()
registerSnapshotStatusStepFuncs()
registerSnapRestoreStepFuncs()
registerSnapCloneStepFuncs()
return
}

View File

@@ -0,0 +1,387 @@
package snapshotcommands
import (
"errors"
"fmt"
"net/http"
"strings"
"github.com/gluster/glusterd2/glusterd2/gdctx"
restutils "github.com/gluster/glusterd2/glusterd2/servers/rest/utils"
"github.com/gluster/glusterd2/glusterd2/snapshot"
"github.com/gluster/glusterd2/glusterd2/snapshot/lvm"
"github.com/gluster/glusterd2/glusterd2/transaction"
volgen2 "github.com/gluster/glusterd2/glusterd2/volgen2"
"github.com/gluster/glusterd2/glusterd2/volume"
"github.com/gluster/glusterd2/pkg/api"
"github.com/gorilla/mux"
"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
config "github.com/spf13/viper"
)
func undoSnapshotClone(c transaction.TxnCtx) error {
var volinfo volume.Volinfo
if err := c.Get("volinfo", &volinfo); err != nil {
return err
}
for _, b := range volinfo.GetLocalBricks() {
snapshot.UmountBrick(b)
if err := lvm.RemoveBrickSnapshot(b.MountInfo.DevicePath); err != nil {
c.Logger().WithError(err).WithField(
"brick", b.Path).Debug("Failed to remove snapshotted LVM")
return err
}
}
return nil
}
func undoStoreSnapshotClone(c transaction.TxnCtx) error {
var vol volume.Volinfo
if err := c.Get("volinfo", &vol); err != nil {
return err
}
// TODO Delete volfile from etcd properly
volume.DeleteVolume(vol.Name)
return nil
}
func storeSnapshotClone(c transaction.TxnCtx) error {
var vol volume.Volinfo
if err := c.Get("volinfo", &vol); err != nil {
return err
}
if err := volume.AddOrUpdateVolumeFunc(&vol); err != nil {
c.Logger().WithError(err).WithField(
"volume", vol.Name).Debug("storeVolume: failed to store Volinfo")
return err
}
if err := volgen2.Generate(); err != nil {
c.Logger().WithError(err).WithField(
"volume", vol.Name).Debug("generateVolfiles: failed to generate volfiles")
return err
}
return nil
}
func takeSnapshotClone(c transaction.TxnCtx) error {
var snapname string
var newVol volume.Volinfo
if err := c.Get("snapname", &snapname); err != nil {
return err
}
snapinfo, err := snapshot.GetSnapshot(snapname)
if err != nil {
return err
}
if err := c.Get("volinfo", &newVol); err != nil {
return err
}
snapVol := &snapinfo.SnapVolinfo
if err = takeVolumeSnapshots(&newVol, snapVol); err != nil {
return err
}
for _, b := range newVol.GetLocalBricks() {
if err := snapshot.MountSnapBrickDirectory(&newVol, &b); err != nil {
return err
}
}
return nil
}
func populateCloneBrickMountData(volinfo *volume.Volinfo, name string) (map[string]snapshot.BrickMountData, error) {
nodeData := make(map[string]snapshot.BrickMountData)
brickCount := 0
for _, b := range volinfo.GetLocalBricks() {
brickCount++
mountRoot, err := volume.GetBrickMountRoot(b.Path)
if err != nil {
return nil, err
}
mountDir := b.Path[len(mountRoot):]
mntInfo, err := volume.GetBrickMountInfo(mountRoot)
if err != nil {
log.WithError(err).WithField(
"brick", b.Path,
).Error("Failed to mount information")
return nil, err
}
vG, err := lvm.GetVgName(mntInfo.FsName)
if err != nil {
log.WithError(err).WithField(
"brick", b.Path,
).Error("Failed to get vg name")
return nil, err
}
devicePath := fmt.Sprintf("/dev/%s/%s_%s_%d", vG, "clone", name, brickCount)
nodeID := strings.Replace(b.ID.String(), "-", "", -1)
nodeData[b.String()] = snapshot.BrickMountData{
MountDir: mountDir,
DevicePath: devicePath,
FsType: mntInfo.MntType,
MntOpts: updateMntOps(mntInfo.MntType, mntInfo.MntOpts),
Path: snapshotCloneBrickCreate(volinfo.Name, name, nodeID, mountDir, brickCount),
}
// Store the results in transaction context. This will be consumed by
// the node that initiated the transaction.
}
return nodeData, nil
}
func validateSnapClone(c transaction.TxnCtx) error {
var statusStr []string
var err error
var snapname, clonename string
var nodeData map[string]snapshot.BrickMountData
if err = lvm.CommonPrevalidation(lvm.CreateCommand); err != nil {
log.WithError(err).WithField(
"command", lvm.CreateCommand,
).Error("Failed to find lvm packages")
return err
}
if err := c.Get("snapname", &snapname); err != nil {
return err
}
if err := c.Get("clonename", &clonename); err != nil {
return err
}
snapinfo, err := snapshot.GetSnapshot(snapname)
if err != nil {
return err
}
volinfo := &snapinfo.SnapVolinfo
brickStatuses, err := volume.CheckBricksStatus(volinfo)
if err != nil {
return err
}
for _, brickStatus := range brickStatuses {
if brickStatus.Online == false {
statusStr = append(statusStr, brickStatus.Info.String())
}
}
if statusStr != nil {
log.WithError(err).WithField(
"Bricks", statusStr,
).Error("Bricks are offline")
return errors.New("One or more brick is offline")
}
if nodeData, err = populateCloneBrickMountData(volinfo, clonename); err != nil {
return err
}
c.SetNodeResult(gdctx.MyUUID, snapshot.NodeDataTxnKey, &nodeData)
//Quorum check ?
return nil
}
func createCloneVolinfo(c transaction.TxnCtx) error {
var clonename, snapname string
nodeData := make(map[string]snapshot.BrickMountData)
if err := c.Get("snapname", &snapname); err != nil {
return err
}
if err := c.Get("clonename", &clonename); err != nil {
return err
}
snapinfo, err := snapshot.GetSnapshot(snapname)
if err != nil {
return err
}
volinfo := &snapinfo.SnapVolinfo
for _, node := range volinfo.Nodes() {
tmp := make(map[string]snapshot.BrickMountData)
err := c.GetNodeResult(node, snapshot.NodeDataTxnKey, &tmp)
if err != nil {
return err
}
for k, v := range tmp {
nodeData[k] = v
}
}
newVol := new(volume.Volinfo)
duplicateVolinfo(volinfo, newVol)
for key, value := range snapinfo.OptionChange {
newVol.Options[key] = value
}
newVol.State = volume.VolCreated
newVol.GraphMap = volinfo.GraphMap
newVol.ID = uuid.NewRandom()
newVol.Name = clonename
newVol.VolfileID = clonename
err = createSnapSubvols(newVol, volinfo, nodeData)
if err != nil {
log.WithFields(log.Fields{
"snapshot": snapname,
"volume name": clonename,
}).Error("Failed to create clone volinfo")
return err
}
err = c.Set("volinfo", newVol)
if err != nil {
return err
}
return nil
}
func snapshotCloneBrickCreate(snapName, cloneName, nodeID, mountDir string, brickCount int) string {
cloneDirPrefix := config.GetString("rundir") + "/clones/"
brickPath := fmt.Sprintf("%s%s/%s/%s/brick%d%s", cloneDirPrefix, snapName, nodeID, cloneName, brickCount, mountDir)
return brickPath
}
func registerSnapCloneStepFuncs() {
var sfs = []struct {
name string
sf transaction.StepFunc
}{
{"snap-clone.Validate", validateSnapClone},
{"snap-clone.CreateCloneVolinfo", createCloneVolinfo},
{"snap-clone.TakeBrickSnapshots", takeSnapshotClone},
{"snap-clone.UndoBrickSnapshots", undoSnapshotClone},
{"snap-clone.StoreSnapshot", storeSnapshotClone},
{"snap-clone.UndoStoreSnapshotOnClone", undoStoreSnapshotClone},
}
for _, sf := range sfs {
transaction.RegisterStepFunc(sf.sf, sf.name)
}
}
func snapshotCloneHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req := new(api.SnapCloneReq)
logger := gdctx.GetReqLogger(ctx)
snapname := mux.Vars(r)["snapname"]
if snapname == "" {
restutils.SendHTTPError(ctx, w, http.StatusUnprocessableEntity, errors.New("Snapshot name should not be empty"))
return
}
if err := restutils.UnmarshalRequest(r, &req); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusUnprocessableEntity, err)
return
}
txn, err := transaction.NewTxnWithLocks(ctx, req.CloneName, snapname)
if err != nil {
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
return
}
defer txn.Done()
snapinfo, err := snapshot.GetSnapshot(snapname)
if err != nil {
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
return
}
snapVol := &snapinfo.SnapVolinfo
if volume.Exists(req.CloneName) {
errMsg := "A volume with the same clone name exist."
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, errMsg)
return
}
if snapVol.State != volume.VolStarted {
errMsg := "Snapshot must be in started state before cloning."
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, errMsg)
return
}
txn.Nodes = snapVol.Nodes()
txn.Steps = []*transaction.Step{
{
DoFunc: "snap-clone.Validate",
Nodes: txn.Nodes,
},
{
DoFunc: "snap-clone.CreateCloneVolinfo",
Nodes: []uuid.UUID{gdctx.MyUUID},
},
{
DoFunc: "snap-clone.TakeBrickSnapshots",
UndoFunc: "snap-clone.UndoBrickSnapshots",
Nodes: txn.Nodes,
},
{
DoFunc: "snap-clone.StoreSnapshot",
UndoFunc: "snap-clone.UndoStoreSnapshotOnClone",
Nodes: []uuid.UUID{gdctx.MyUUID},
},
}
err = txn.Ctx.Set("snapname", &snapname)
if err != nil {
logger.WithError(err).Error("failed to set request in transaction context")
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
}
err = txn.Ctx.Set("clonename", &req.CloneName)
if err != nil {
logger.WithError(err).Error("failed to set request in transaction context")
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
}
err = txn.Do()
if err != nil {
logger.WithError(err).Error("snapshot clone transaction failed")
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
return
}
txn.Ctx.Logger().WithField("CloneName", req.CloneName).Info("new volume cloned from snapshot")
vol, err := volume.GetVolume(req.CloneName)
if err != nil {
// FIXME: If volume was created successfully in the txn above and
// then the store goes down by the time we reach here, what do
// we return to the client ?
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
}
resp := createVolumeCreateResp(vol)
restutils.SendHTTPResponse(ctx, w, http.StatusCreated, resp)
}
func createVolumeCreateResp(v *volume.Volinfo) *api.VolumeCreateResp {
return (*api.VolumeCreateResp)(volume.CreateVolumeInfoResp(v))
}

View File

@@ -288,7 +288,7 @@ func updateMntOps(FsType, MntOpts string) string {
}
return MntOpts
}
func populateBrickMountData(volinfo *volume.Volinfo, snapName string) (map[string]snapshot.BrickMountData, error) {
func populateSnapBrickMountData(volinfo *volume.Volinfo, snapName string) (map[string]snapshot.BrickMountData, error) {
nodeData := make(map[string]snapshot.BrickMountData)
brickCount := 0
@@ -317,7 +317,7 @@ func populateBrickMountData(volinfo *volume.Volinfo, snapName string) (map[strin
return nil, err
}
devicePath := fmt.Sprintf("/dev/%s/%s_%d", vG, snapName, brickCount)
devicePath := fmt.Sprintf("/dev/%s/%s_%s_%d", vG, "snap", snapName, brickCount)
nodeID := strings.Replace(b.ID.String(), "-", "", -1)
@@ -381,28 +381,16 @@ func validateSnapCreate(c transaction.TxnCtx) error {
return errors.New("one or more brick is not compatable")
}
if nodeData, err = populateBrickMountData(volinfo, req.SnapName); err != nil {
if nodeData, err = populateSnapBrickMountData(volinfo, req.SnapName); err != nil {
return err
}
c.SetNodeResult(gdctx.MyUUID, snapshot.NodeDataTxnKey, &nodeData)
//Quorum check ?
return nil
}
func takeBrickSnapshots(c transaction.TxnCtx) error {
var snapInfo snapshot.Snapinfo
if err := c.Get("snapinfo", &snapInfo); err != nil {
return err
}
snapVol := snapInfo.SnapVolinfo
volinfo, err := volume.GetVolume(snapInfo.ParentVolume)
if err != nil {
return err
}
for subvolCount, subvol := range volinfo.Subvols {
func takeVolumeSnapshots(newVol, oldVol *volume.Volinfo) error {
for subvolCount, subvol := range oldVol.Subvols {
for count, b := range subvol.Bricks {
if !uuid.Equal(b.PeerID, gdctx.MyUUID) {
continue
@@ -410,7 +398,7 @@ func takeBrickSnapshots(c transaction.TxnCtx) error {
/*
TODO : Run as a go routine
*/
snapBrick := snapVol.Subvols[subvolCount].Bricks[count]
snapBrick := newVol.Subvols[subvolCount].Bricks[count]
mountData := snapBrick.MountInfo
length := len(b.Path) - len(mountData.Mountdir)
mountRoot := b.Path[:length]
@@ -426,8 +414,12 @@ func takeBrickSnapshots(c transaction.TxnCtx) error {
}).Debug("Running snapshot create command")
if err := lvm.LVSnapshot(mntInfo.FsName, mountData.DevicePath); err != nil {
c.Logger().WithError(err).WithField(
"brick", b.Path).Error("Snapshot failed")
log.WithFields(log.Fields{
"mount device": mntInfo.FsName,
"devicePath": mountData.DevicePath,
"Path": b.Path,
}).Error("Running snapshot create command failed")
return err
}
@@ -440,16 +432,32 @@ func takeBrickSnapshots(c transaction.TxnCtx) error {
}
}
}
return nil
}
func createSnapSubvols(snapVolinfo, volinfo *volume.Volinfo, nodeData map[string]snapshot.BrickMountData) error {
func takeSnapshots(c transaction.TxnCtx) error {
var snapInfo snapshot.Snapinfo
if err := c.Get("snapinfo", &snapInfo); err != nil {
return err
}
snapVol := &snapInfo.SnapVolinfo
volinfo, err := volume.GetVolume(snapInfo.ParentVolume)
if err != nil {
return err
}
err = takeVolumeSnapshots(snapVol, volinfo)
return err
}
func createSnapSubvols(newVolinfo, origVolinfo *volume.Volinfo, nodeData map[string]snapshot.BrickMountData) error {
var err error
for idx, subvol := range volinfo.Subvols {
for idx, subvol := range origVolinfo.Subvols {
subvolType := volume.SubvolTypeToString(subvol.Type)
name := fmt.Sprintf("%s-%s-%d", snapVolinfo.Name, strings.ToLower(subvolType), idx)
name := fmt.Sprintf("%s-%s-%d", newVolinfo.Name, strings.ToLower(subvolType), idx)
s := volume.Subvol{
Name: name,
ID: uuid.NewRandom(),
@@ -474,7 +482,7 @@ func createSnapSubvols(snapVolinfo, volinfo *volume.Volinfo, nodeData map[string
bricks = append(bricks, brick)
}
s.Bricks, err = volume.NewBrickEntriesFunc(bricks, snapVolinfo.Name, snapVolinfo.ID)
s.Bricks, err = volume.NewBrickEntriesFunc(bricks, newVolinfo.Name, newVolinfo.ID)
if err != nil {
return err
}
@@ -490,7 +498,7 @@ func createSnapSubvols(snapVolinfo, volinfo *volume.Volinfo, nodeData map[string
}
}
snapVolinfo.Subvols = append(snapVolinfo.Subvols, s)
newVolinfo.Subvols = append(newVolinfo.Subvols, s)
}
return nil
@@ -504,6 +512,7 @@ func createSnapinfo(c transaction.TxnCtx) error {
"feature.deem-statfs",
"features.quota-deem-statfs",
"bitrot-stub.bitrot",
"replicate.self-heal-daemon",
}
nodeData := make(map[string]snapshot.BrickMountData)
@@ -535,7 +544,7 @@ func createSnapinfo(c transaction.TxnCtx) error {
for _, key := range ignoreOps {
snapInfo.OptionChange[key] = snapVolinfo.Options[key]
delete(snapVolinfo.Options, key)
snapVolinfo.Options[key] = "disable"
}
snapVolinfo.State = volume.VolCreated
@@ -547,7 +556,6 @@ func createSnapinfo(c transaction.TxnCtx) error {
TODO
For now disabling heal
*/
snapVolinfo.Options["replicate.self-heal-daemon"] = "off"
err = createSnapSubvols(snapVolinfo, volinfo, nodeData)
if err != nil {
@@ -650,7 +658,7 @@ func registerSnapCreateStepFuncs() {
{"snap-create.Validate", validateSnapCreate},
{"snap-create.CreateSnapinfo", createSnapinfo},
{"snap-create.ActivateBarrier", activateBarrier},
{"snap-create.TakeBrickSnapshots", takeBrickSnapshots},
{"snap-create.TakeBrickSnapshots", takeSnapshots},
{"snap-create.UndoBrickSnapshots", undoBrickSnapshots},
{"snap-create.DeactivateBarrier", deactivateBarrier},
{"snap-create.StoreSnapshot", storeSnapshot},

View File

@@ -26,7 +26,7 @@ func snapshotDelete(c transaction.TxnCtx) error {
snapVol := snapinfo.SnapVolinfo
for _, b := range snapVol.GetLocalBricks() {
if snapVol.State == volume.VolStarted {
if err := snapshot.StopBrick(b); err != nil {
if err := snapshot.StopBrick(b, c.Logger()); err != nil {
c.Logger().WithError(err).WithField(
"brick", b.Path).Warning("Failed to cleanup the brick.Earlier it might have stopped abnormaly")

View File

@@ -53,7 +53,7 @@ func snapRestore(c transaction.TxnCtx) error {
//Do a proper snapshot stop, once there is a generic way of stopping all the proceess of a volume.
for _, b := range onlineBricks {
if err = b.TerminateBrick(); err != nil {
if err = b.StopBrick(); err != nil {
if err = b.StopBrick(c.Logger()); err != nil {
return err
}
}
@@ -136,7 +136,7 @@ func undoSnapRestore(c transaction.TxnCtx) error {
return err
}
if snapVol.State == volume.VolStarted {
if err := b.StartBrick(); err != nil {
if err := b.StartBrick(c.Logger()); err != nil {
if err == errors.ErrProcessAlreadyRunning {
continue
}
@@ -144,7 +144,7 @@ func undoSnapRestore(c transaction.TxnCtx) error {
}
} else {
if err = b.TerminateBrick(); err != nil {
if err = b.StopBrick(); err != nil {
if err = b.StopBrick(c.Logger()); err != nil {
//Process might not be running,
//TODO once we have errors.ErrProcessAlreadyStopped
//check for other errors

View File

@@ -49,9 +49,9 @@ func GetSnapshots() ([]*Snapinfo, error) {
if err := json.Unmarshal(kv.Value, &snap); err != nil {
log.WithFields(log.Fields{
"volume": string(kv.Key),
"error": err,
}).Error("Failed to unmarshal volume")
"snapshot": string(kv.Key),
"error": err,
}).Error("Failed to unmarshal snapshot")
continue
}
@@ -78,13 +78,13 @@ func GetSnapshotVolumes() ([]*volume.Volinfo, error) {
func AddOrUpdateSnap(snapInfo *Snapinfo) error {
json, e := json.Marshal(snapInfo)
if e != nil {
log.WithField("error", e).Error("Failed to marshal the volinfo object")
log.WithField("error", e).Error("Failed to marshal the snapinfo object")
return e
}
_, e = gdstore.Put(context.TODO(), GetStorePath(snapInfo), string(json))
if e != nil {
log.WithError(e).Error("Couldn't add volume to store")
log.WithError(e).Error("Couldn't add snapshot to store")
return e
}
return nil
@@ -101,12 +101,12 @@ func GetSnapshot(name string) (*Snapinfo, error) {
}
if resp.Count != 1 {
log.WithField("volume", name).Error("volume not found")
return nil, errors.New("volume not found")
log.WithField("snapshot", name).Error("snapshot not found")
return nil, errors.New("Snapshot not found")
}
if e = json.Unmarshal(resp.Kvs[0].Value, &snap); e != nil {
log.WithError(e).Error("Failed to unmarshal the data into volinfo object")
log.WithError(e).Error("Failed to unmarshal the data into snapinfo object")
return nil, e
}
return &snap, nil

View File

@@ -124,7 +124,8 @@ func readMountEntry(entry string) *Mntent {
}
}
func getMounts() ([]*Mntent, error) {
//GetMounts returns all the mount point entries from /proc/mounts
func GetMounts() ([]*Mntent, error) {
content, err := ioutil.ReadFile("/proc/mounts")
if err != nil {

View File

@@ -71,7 +71,7 @@ func isBrickPathAvailable(peerID uuid.UUID, brickPath string) error {
func CheckBricksStatus(volinfo *Volinfo) ([]brick.Brickstatus, error) {
var brickStatuses []brick.Brickstatus
mtabEntries, err := getMounts()
mtabEntries, err := GetMounts()
if err != nil {
log.WithError(err).Error("Failed to read /etc/mtab file.")
return brickStatuses, err
@@ -148,7 +148,7 @@ func GetBrickMountRoot(brickPath string) (string, error) {
//GetBrickMountInfo return mount related information
func GetBrickMountInfo(mountRoot string) (*Mntent, error) {
mtabEntries, err := getMounts()
mtabEntries, err := GetMounts()
if err != nil {
return nil, err
}

View File

@@ -13,3 +13,8 @@ type SnapCreateReq struct {
type SnapActivateReq struct {
Force bool `json:"force,omitempty"`
}
//SnapCloneReq represents a request to clone a snapshot
type SnapCloneReq struct {
CloneName string `json:"clonename"`
}

View File

@@ -71,3 +71,11 @@ func (c *Client) SnapshotRestore(snapname string) (api.VolumeGetResp, error) {
err := c.post(url, nil, http.StatusOK, &resp)
return resp, err
}
// SnapshotClone creates a writable Gluster Snapshot, it will be similiar to a volume
func (c *Client) SnapshotClone(snapname string, req api.SnapCloneReq) (api.VolumeCreateResp, error) {
var vol api.VolumeCreateResp
url := fmt.Sprintf("/v1/snapshot/%s/clone", snapname)
err := c.post(url, req, http.StatusCreated, &vol)
return vol, err
}