mirror of
https://github.com/lxc/incus.git
synced 2026-02-05 09:46:19 +01:00
172 lines
3.9 KiB
Go
172 lines
3.9 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/lxc/incus/v6/internal/server/events"
|
|
"github.com/lxc/incus/v6/internal/server/response"
|
|
"github.com/lxc/incus/v6/shared/api"
|
|
"github.com/lxc/incus/v6/shared/logger"
|
|
"github.com/lxc/incus/v6/shared/ws"
|
|
)
|
|
|
|
var eventsCmd = APIEndpoint{
|
|
Path: "events",
|
|
|
|
Get: APIEndpointAction{Handler: eventsGet},
|
|
Post: APIEndpointAction{Handler: eventsPost},
|
|
}
|
|
|
|
type eventsServe struct {
|
|
req *http.Request
|
|
d *Daemon
|
|
}
|
|
|
|
func (r *eventsServe) Render(w http.ResponseWriter) error {
|
|
return eventsSocket(r.d, r.req, w)
|
|
}
|
|
|
|
func (r *eventsServe) String() string {
|
|
return "event handler"
|
|
}
|
|
|
|
// Code returns the HTTP code.
|
|
func (r *eventsServe) Code() int {
|
|
return http.StatusOK
|
|
}
|
|
|
|
func eventsSocket(d *Daemon, r *http.Request, w http.ResponseWriter) error {
|
|
typeStr := r.FormValue("type")
|
|
if typeStr == "" {
|
|
// We add 'config' here to allow listeners on /dev/incus/sock to receive config changes.
|
|
typeStr = "logging,operation,lifecycle,config,device"
|
|
}
|
|
|
|
var listenerConnection events.EventListenerConnection
|
|
|
|
// If the client has not requested a websocket connection then fallback to long polling event stream mode.
|
|
if r.Header.Get("Upgrade") == "websocket" {
|
|
// Upgrade the connection to websocket
|
|
conn, err := ws.Upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() { _ = conn.Close() }() // Ensure listener below ends when this function ends.
|
|
|
|
listenerConnection = events.NewWebsocketListenerConnection(conn)
|
|
} else {
|
|
h, ok := w.(http.Hijacker)
|
|
if !ok {
|
|
return errors.New("Missing implemented http.Hijacker interface")
|
|
}
|
|
|
|
conn, _, err := h.Hijack()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() { _ = conn.Close() }() // Ensure listener below ends when this function ends.
|
|
|
|
listenerConnection, err = events.NewStreamListenerConnection(conn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// As we don't know which project we are in, subscribe to events from all projects.
|
|
listener, err := d.events.AddListener("", true, nil, listenerConnection, strings.Split(typeStr, ","), nil, nil, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
listener.Wait(r.Context())
|
|
|
|
return nil
|
|
}
|
|
|
|
func eventsGet(d *Daemon, r *http.Request) response.Response {
|
|
return &eventsServe{req: r, d: d}
|
|
}
|
|
|
|
func eventsPost(d *Daemon, r *http.Request) response.Response {
|
|
var event api.Event
|
|
|
|
err := json.NewDecoder(r.Body).Decode(&event)
|
|
if err != nil {
|
|
return response.InternalError(err)
|
|
}
|
|
|
|
err = d.events.Send("", event.Type, event.Metadata)
|
|
if err != nil {
|
|
return response.InternalError(err)
|
|
}
|
|
|
|
// Handle device related actions locally.
|
|
go eventsProcess(d, event)
|
|
|
|
return response.SyncResponse(true, nil)
|
|
}
|
|
|
|
func eventsProcess(d *Daemon, event api.Event) {
|
|
// As we only handle mounts, skip if disabled.
|
|
if d.Features != nil && !d.Features["mounts"] {
|
|
return
|
|
}
|
|
|
|
// We currently only need to react to device events.
|
|
if event.Type != "device" {
|
|
return
|
|
}
|
|
|
|
type deviceEvent struct {
|
|
Action string `json:"action"`
|
|
Config map[string]string `json:"config"`
|
|
Name string `json:"name"`
|
|
}
|
|
|
|
e := deviceEvent{}
|
|
err := json.Unmarshal(event.Metadata, &e)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// We only handle disk hotplug.
|
|
if e.Config["type"] != "disk" {
|
|
return
|
|
}
|
|
|
|
// And only for path based devices.
|
|
if e.Config["path"] == "" {
|
|
return
|
|
}
|
|
|
|
mntSource := "incus_" + e.Name
|
|
|
|
if e.Action == "added" {
|
|
// Attempt to perform the mount.
|
|
for range 20 {
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
err = osMountShared(mntSource, e.Config["path"], "virtiofs", nil)
|
|
if err == nil {
|
|
break
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
logger.Infof("Failed to mount hotplug %q (Type: %q) to %q", mntSource, "virtiofs", e.Config["path"])
|
|
return
|
|
}
|
|
|
|
logger.Infof("Mounted hotplug %q (Type: %q) to %q", mntSource, "virtiofs", e.Config["path"])
|
|
} else if e.Action == "removed" {
|
|
// Attempt to unmount the disk.
|
|
_ = osUmount(mntSource, e.Config["path"], "virtiofs")
|
|
}
|
|
}
|