1
0
mirror of https://github.com/lxc/incus.git synced 2026-02-05 09:46:19 +01:00
Files
incus/cmd/incusd/migrate_instance.go
2025-05-23 01:46:13 -04:00

297 lines
8.9 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"io"
"net/url"
"os/exec"
"strings"
"time"
"github.com/lxc/incus/v6/internal/server/instance"
"github.com/lxc/incus/v6/internal/server/instance/instancetype"
"github.com/lxc/incus/v6/internal/server/instance/operationlock"
"github.com/lxc/incus/v6/internal/server/migration"
"github.com/lxc/incus/v6/internal/server/operations"
internalUtil "github.com/lxc/incus/v6/internal/util"
"github.com/lxc/incus/v6/shared/api"
"github.com/lxc/incus/v6/shared/logger"
)
// newMigrationSource builds the source-side migration handler for inst.
//
// A connection entry is registered for the control and filesystem channels,
// plus a state channel when the migration is live (stateful was requested and
// the instance is currently running). When pushTarget is set and carries an
// operation URL, every entry uses the secret supplied by the target along with
// a websocket dialer and a "wss://.../websocket" URL derived from that
// operation URL. Otherwise every entry gets a newly generated random secret
// and no dialer or URL.
//
// A live migration of a container returns migration.ErrNoLiveMigrationSource
// when the "criu" executable cannot be located.
func newMigrationSource(inst instance.Instance, stateful bool, instanceOnly bool, allowInconsistent bool, clusterMoveSourceName string, storagePool string, pushTarget *api.InstancePostTarget) (*migrationSourceWs, error) {
	src := &migrationSourceWs{
		migrationFields: migrationFields{
			instance:          inst,
			allowInconsistent: allowInconsistent,
			storagePool:       storagePool,
		},
		clusterMoveSourceName: clusterMoveSourceName,
	}

	// Record the push details only when the caller supplied a target.
	if pushTarget != nil {
		src.pushCertificate = pushTarget.Certificate
		src.pushOperationURL = pushTarget.Operation
		src.pushSecrets = pushTarget.Websockets
	}

	src.instanceOnly = instanceOnly

	// Control and filesystem channels are always needed.
	channels := []string{api.SecretNameControl, api.SecretNameFilesystem}

	if stateful && inst.IsRunning() {
		// Live migration of containers depends on the criu executable.
		if inst.Type() == instancetype.Container {
			if _, err := exec.LookPath("criu"); err != nil {
				return nil, migration.ErrNoLiveMigrationSource
			}
		}

		src.live = true
		channels = append(channels, api.SecretNameState)
	}

	pushing := src.pushOperationURL != ""
	src.conns = make(map[string]*migrationConn, len(channels))

	for _, name := range channels {
		if !pushing {
			// Pull mode: generate our own secret for this channel.
			secret, err := internalUtil.RandomHexString(32)
			if err != nil {
				return nil, fmt.Errorf("Failed creating migration source secret for %q connection: %w", name, err)
			}

			src.conns[name] = newMigrationConn(secret, nil, nil)

			continue
		}

		// Push mode: the target must have handed us a secret for this channel.
		targetSecret := src.pushSecrets[name]
		if targetSecret == "" {
			return nil, fmt.Errorf("Expected %q connection secret missing from migration source target request", name)
		}

		dialer, err := setupWebsocketDialer(src.pushCertificate)
		if err != nil {
			return nil, fmt.Errorf("Failed setting up websocket dialer for migration source %q connection: %w", name, err)
		}

		wsAddr := "wss://" + strings.TrimPrefix(src.pushOperationURL, "https://") + "/websocket"

		wsURL, err := url.Parse(wsAddr)
		if err != nil {
			return nil, fmt.Errorf("Failed parsing websocket URL for migration source %q connection: %w", name, err)
		}

		src.conns[name] = newMigrationConn(targetSecret, dialer, wsURL)
	}

	return src, nil
}
// do runs the source side of the migration on behalf of migrateOp.
//
// It waits up to 30 seconds for the control connection to be established,
// attaches migrateOp to the instance and then calls the instance's
// MigrateSend with accessors for the control, state and filesystem channels.
// s.disconnect is called when do returns after the control connection was
// obtained.
//
// If MigrateSend fails, the error is logged, sent through s.sendControl and
// returned wrapped.
func (s *migrationSourceWs) do(migrateOp *operations.Operation) error {
	l := logger.AddContext(logger.Ctx{"project": s.instance.Project().Name, "instance": s.instance.Name(), "live": s.live, "clusterMoveSourceName": s.clusterMoveSourceName, "push": s.pushOperationURL != ""})

	// This timeout only bounds the wait for the control connection; the
	// state and filesystem accessors below receive their own context.
	ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
	defer cancel()

	l.Debug("Waiting for migration control connection on source")
	_, err := s.conns[api.SecretNameControl].WebSocket(ctx)
	if err != nil {
		return fmt.Errorf("Failed waiting for migration control connection on source: %w", err)
	}

	l.Debug("Migration control connection established on source")

	// Deferred calls run in reverse order: disconnect first, then the log line.
	defer l.Debug("Migration channels disconnected on source")
	defer s.disconnect()

	// stateConnFunc returns the state channel, which is only registered for
	// live migrations.
	stateConnFunc := func(ctx context.Context) (io.ReadWriteCloser, error) {
		conn := s.conns[api.SecretNameState]
		if conn == nil {
			return nil, errors.New("Migration source state connection not initialized")
		}

		wsConn, err := conn.WebsocketIO(ctx)
		if err != nil {
			return nil, fmt.Errorf("Failed getting migration source state connection: %w", err)
		}

		return wsConn, nil
	}

	// filesystemConnFunc returns the filesystem channel.
	filesystemConnFunc := func(ctx context.Context) (io.ReadWriteCloser, error) {
		conn := s.conns[api.SecretNameFilesystem]
		if conn == nil {
			return nil, errors.New("Migration source filesystem connection not initialized")
		}

		wsConn, err := conn.WebsocketIO(ctx)
		if err != nil {
			return nil, fmt.Errorf("Failed getting migration source filesystem connection: %w", err)
		}

		return wsConn, nil
	}

	s.instance.SetOperation(migrateOp)
	err = s.instance.MigrateSend(instance.MigrateSendArgs{
		MigrateArgs: instance.MigrateArgs{
			ControlSend:    s.send,
			ControlReceive: s.recv,
			StateConn:      stateConnFunc,
			FilesystemConn: filesystemConnFunc,
			Snapshots:      !s.instanceOnly,
			Live:           s.live,
			Disconnect: func() {
				// Close every channel except control.
				for connName, conn := range s.conns {
					if connName != api.SecretNameControl {
						conn.Close()
					}
				}
			},
			ClusterMoveSourceName: s.clusterMoveSourceName,
			StoragePool:           s.storagePool,
		},
		AllowInconsistent: s.allowInconsistent,
	})
	if err != nil {
		l.Error("Failed migration on source", logger.Ctx{"err": err})
		errMsg := fmt.Errorf("Failed migration on source: %w", err)
		s.sendControl(errMsg)

		return errMsg
	}

	return nil
}
// newMigrationSink builds the target-side migration handler described by args.
//
// A connection entry is registered for the control and filesystem channels,
// plus a state channel when args.Live is set. In pull mode (args.Push false)
// every entry uses the matching secret from args.Secrets, args.Dialer and a
// "wss://.../websocket" URL derived from args.URL. In push mode every entry
// gets a newly generated random secret and no dialer or URL.
//
// A live migration of a container returns migration.ErrNoLiveMigrationTarget
// when the "criu" executable cannot be located.
func newMigrationSink(args *migrationSinkArgs) (*migrationSink, error) {
	target := &migrationSink{
		migrationFields: migrationFields{
			instance:     args.Instance,
			instanceOnly: args.InstanceOnly,
			live:         args.Live,
			storagePool:  args.StoragePool,
		},
		url:                   args.URL,
		clusterMoveSourceName: args.ClusterMoveSourceName,
		push:                  args.Push,
		refresh:               args.Refresh,
		refreshExcludeOlder:   args.RefreshExcludeOlder,
	}

	// Control and filesystem channels are always needed.
	channels := []string{api.SecretNameControl, api.SecretNameFilesystem}

	if target.live {
		// Live migration of containers depends on the criu executable.
		if target.instance.Type() == instancetype.Container {
			if _, err := exec.LookPath("criu"); err != nil {
				return nil, migration.ErrNoLiveMigrationTarget
			}
		}

		channels = append(channels, api.SecretNameState)
	}

	target.conns = make(map[string]*migrationConn, len(channels))

	for _, name := range channels {
		if target.push {
			// Push mode: generate our own secret for this channel.
			secret, err := internalUtil.RandomHexString(32)
			if err != nil {
				return nil, fmt.Errorf("Failed creating migration sink secret for %q connection: %w", name, err)
			}

			target.conns[name] = newMigrationConn(secret, nil, nil)

			continue
		}

		// Pull mode: the request must carry a secret for this channel.
		sourceSecret := args.Secrets[name]
		if sourceSecret == "" {
			return nil, fmt.Errorf("Expected %q connection secret missing from migration sink target request", name)
		}

		wsAddr := "wss://" + strings.TrimPrefix(args.URL, "https://") + "/websocket"

		wsURL, err := url.Parse(wsAddr)
		if err != nil {
			return nil, fmt.Errorf("Failed parsing websocket URL for migration sink %q connection: %w", name, err)
		}

		target.conns[name] = newMigrationConn(sourceSecret, args.Dialer, wsURL)
	}

	return target, nil
}
// do runs the target side of the migration under the instance operation instOp.
//
// It waits up to 30 seconds for the control connection to be established and
// then calls the instance's MigrateReceive with accessors for the control,
// state and filesystem channels. c.disconnect is deferred only when the sink
// is in push mode.
//
// If MigrateReceive fails, the error is logged, sent through c.sendControl
// and returned wrapped.
func (c *migrationSink) do(instOp *operationlock.InstanceOperation) error {
	l := logger.AddContext(logger.Ctx{"project": c.instance.Project().Name, "instance": c.instance.Name(), "live": c.live, "clusterMoveSourceName": c.clusterMoveSourceName, "push": c.push})

	// This timeout only bounds the wait for the control connection; the
	// state and filesystem accessors below receive their own context.
	ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
	defer cancel()

	l.Debug("Waiting for migration control connection on target")
	_, err := c.conns[api.SecretNameControl].WebSocket(ctx)
	if err != nil {
		return fmt.Errorf("Failed waiting for migration control connection on target: %w", err)
	}

	l.Debug("Migration control connection established on target")
	defer l.Debug("Migration channels disconnected on target")

	if c.push {
		defer c.disconnect()
	}

	// stateConnFunc returns the state channel, which is only registered for
	// live migrations.
	stateConnFunc := func(ctx context.Context) (io.ReadWriteCloser, error) {
		conn := c.conns[api.SecretNameState]
		if conn == nil {
			return nil, errors.New("Migration target state connection not initialized")
		}

		wsConn, err := conn.WebsocketIO(ctx)
		if err != nil {
			return nil, fmt.Errorf("Failed getting migration target state connection: %w", err)
		}

		return wsConn, nil
	}

	// filesystemConnFunc returns the filesystem channel.
	filesystemConnFunc := func(ctx context.Context) (io.ReadWriteCloser, error) {
		conn := c.conns[api.SecretNameFilesystem]
		if conn == nil {
			return nil, errors.New("Migration target filesystem connection not initialized")
		}

		wsConn, err := conn.WebsocketIO(ctx)
		if err != nil {
			return nil, fmt.Errorf("Failed getting migration target filesystem connection: %w", err)
		}

		return wsConn, nil
	}

	err = c.instance.MigrateReceive(instance.MigrateReceiveArgs{
		MigrateArgs: instance.MigrateArgs{
			ControlSend:    c.send,
			ControlReceive: c.recv,
			StateConn:      stateConnFunc,
			FilesystemConn: filesystemConnFunc,
			Snapshots:      !c.instanceOnly,
			Live:           c.live,
			Disconnect: func() {
				// Close every channel except control.
				for connName, conn := range c.conns {
					if connName != api.SecretNameControl {
						conn.Close()
					}
				}
			},
			ClusterMoveSourceName: c.clusterMoveSourceName,
			StoragePool:           c.storagePool,
		},
		InstanceOperation:   instOp,
		Refresh:             c.refresh,
		RefreshExcludeOlder: c.refreshExcludeOlder,
	})
	if err != nil {
		l.Error("Failed migration on target", logger.Ctx{"err": err})
		errMsg := fmt.Errorf("Failed migration on target: %w", err)
		c.sendControl(errMsg)

		return errMsg
	}

	return nil
}