1
0
mirror of https://github.com/lxc/incus.git synced 2026-02-05 09:46:19 +01:00
Files
incus/client/operations.go
2025-05-23 01:39:12 -04:00

378 lines
7.9 KiB
Go

package incus
import (
"context"
"encoding/json"
"errors"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/lxc/incus/v6/shared/api"
)
// The Operation type represents an ongoing Incus operation (asynchronous processing).
type operation struct {
api.Operation
r *ProtocolIncus
listener *EventListener
handlerReady bool
handlerLock sync.Mutex
skipListener bool
chActive chan bool
}
// AddHandler adds a function to be called whenever an event is received.
func (op *operation) AddHandler(function func(api.Operation)) (*EventTarget, error) {
if op.skipListener {
return nil, errors.New("Cannot add handler, client operation does not support event listeners")
}
// Make sure we have a listener setup
err := op.setupListener()
if err != nil {
return nil, err
}
// Make sure we're not racing with ourselves
op.handlerLock.Lock()
defer op.handlerLock.Unlock()
// If we're done already, just return
if op.StatusCode.IsFinal() {
return nil, nil
}
// Wrap the function to filter unwanted messages
wrapped := func(event api.Event) {
op.handlerLock.Lock()
newOp := api.Operation{}
err := json.Unmarshal(event.Metadata, &newOp)
if err != nil || newOp.ID != op.ID {
op.handlerLock.Unlock()
return
}
op.handlerLock.Unlock()
function(newOp)
}
return op.listener.AddHandler([]string{"operation"}, wrapped)
}
// Cancel will request that Incus cancels the operation (if supported).
func (op *operation) Cancel() error {
return op.r.DeleteOperation(op.ID)
}
// Get returns the API operation struct.
func (op *operation) Get() api.Operation {
return op.Operation
}
// GetWebsocket returns a raw websocket connection from the operation.
func (op *operation) GetWebsocket(secret string) (*websocket.Conn, error) {
return op.r.GetOperationWebsocket(op.ID, secret)
}
// RemoveHandler removes a function to be called whenever an event is received.
func (op *operation) RemoveHandler(target *EventTarget) error {
if op.skipListener {
return errors.New("Cannot remove handler, client operation does not support event listeners")
}
// Make sure we're not racing with ourselves
op.handlerLock.Lock()
defer op.handlerLock.Unlock()
// If the listener is gone, just return
if op.listener == nil {
return nil
}
return op.listener.RemoveHandler(target)
}
// Refresh pulls the current version of the operation and updates the struct.
func (op *operation) Refresh() error {
// Get the current version of the operation
newOp, _, err := op.r.GetOperation(op.ID)
if err != nil {
return err
}
// Update the operation struct
op.Operation = *newOp
return nil
}
// Wait lets you wait until the operation reaches a final state.
func (op *operation) Wait() error {
return op.WaitContext(context.Background())
}
// WaitContext lets you wait until the operation reaches a final state with context.Context.
func (op *operation) WaitContext(ctx context.Context) error {
if op.skipListener {
timeout := -1
deadline, ok := ctx.Deadline()
if ok {
timeout = int(time.Until(deadline).Seconds())
}
opAPI, _, err := op.r.GetOperationWait(op.ID, timeout)
if err != nil {
return err
}
op.Operation = *opAPI
if opAPI.Err != "" {
return errors.New(opAPI.Err)
}
return nil
}
op.handlerLock.Lock()
// Check if not done already
if op.StatusCode.IsFinal() {
if op.Err != "" {
op.handlerLock.Unlock()
return errors.New(op.Err)
}
op.handlerLock.Unlock()
return nil
}
op.handlerLock.Unlock()
// Make sure we have a listener setup
err := op.setupListener()
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-op.chActive:
}
// We're done, parse the result
if op.Err != "" {
return errors.New(op.Err)
}
return nil
}
// setupListener initiates an event listener for an operation and manages updates to the operation's state.
// It adds handlers to process events, monitors the listener for completion or errors,
// and triggers a manual refresh of the operation's state to prevent race conditions.
func (op *operation) setupListener() error {
if op.skipListener {
return errors.New("Cannot set up event listener, client operation does not support event listeners")
}
// Make sure we're not racing with ourselves
op.handlerLock.Lock()
defer op.handlerLock.Unlock()
// We already have a listener setup
if op.handlerReady {
return nil
}
op.handlerReady = true
// Get a new listener
if op.listener == nil {
listener, err := op.r.GetEvents()
if err != nil {
return err
}
op.listener = listener
}
// Setup the handler
chReady := make(chan bool)
_, err := op.listener.AddHandler([]string{"operation"}, func(event api.Event) {
<-chReady
// We don't want concurrency while processing events
op.handlerLock.Lock()
defer op.handlerLock.Unlock()
// Check if we're done already (because of another event)
if op.listener == nil {
return
}
// Get an operation struct out of this data
newOp := api.Operation{}
err := json.Unmarshal(event.Metadata, &newOp)
if err != nil || newOp.ID != op.ID {
return
}
// Update the struct
op.Operation = newOp
// And check if we're done
if op.StatusCode.IsFinal() {
op.listener.Disconnect()
op.listener = nil
close(op.chActive)
return
}
})
if err != nil {
op.listener.Disconnect()
op.listener = nil
close(op.chActive)
close(chReady)
return err
}
// Monitor event listener
go func() {
<-chReady
// We don't want concurrency while accessing the listener
op.handlerLock.Lock()
// Check if we're done already (because of another event)
listener := op.listener
if listener == nil {
op.handlerLock.Unlock()
return
}
op.handlerLock.Unlock()
// Wait for the listener or operation to be done
select {
case <-listener.ctx.Done():
op.handlerLock.Lock()
if op.listener != nil {
op.Err = listener.err.Error()
close(op.chActive)
}
op.handlerLock.Unlock()
case <-op.chActive:
return
}
}()
// And do a manual refresh to avoid races
err = op.Refresh()
if err != nil {
op.listener.Disconnect()
op.listener = nil
close(op.chActive)
close(chReady)
return err
}
// Check if not done already
if op.StatusCode.IsFinal() {
op.listener.Disconnect()
op.listener = nil
close(op.chActive)
close(chReady)
if op.Err != "" {
return errors.New(op.Err)
}
return nil
}
// Start processing background updates
close(chReady)
return nil
}
// The remoteOperation type represents an ongoing Incus operation between two servers.
type remoteOperation struct {
targetOp Operation
handlers []func(api.Operation)
handlerLock sync.Mutex
chDone chan bool
chPost chan bool
err error
}
// AddHandler adds a function to be called whenever an event is received.
func (op *remoteOperation) AddHandler(function func(api.Operation)) (*EventTarget, error) {
var err error
var target *EventTarget
op.handlerLock.Lock()
defer op.handlerLock.Unlock()
// Attach to the existing target operation
if op.targetOp != nil {
target, err = op.targetOp.AddHandler(function)
if err != nil {
return nil, err
}
} else {
// Generate a mock EventTarget
target = &EventTarget{
function: func(api.Event) { function(api.Operation{}) },
types: []string{"operation"},
}
}
// Add the handler to our list
op.handlers = append(op.handlers, function)
return target, nil
}
// CancelTarget attempts to cancel the target operation.
func (op *remoteOperation) CancelTarget() error {
if op.targetOp == nil {
return errors.New("No associated target operation")
}
return op.targetOp.Cancel()
}
// GetTarget returns the target operation.
func (op *remoteOperation) GetTarget() (*api.Operation, error) {
if op.targetOp == nil {
return nil, errors.New("No associated target operation")
}
opAPI := op.targetOp.Get()
return &opAPI, nil
}
// Wait lets you wait until the operation reaches a final state.
func (op *remoteOperation) Wait() error {
<-op.chDone
if op.chPost != nil {
<-op.chPost
}
return op.err
}