mirror of
https://github.com/gluster/glusterd2.git
synced 2026-02-05 12:45:38 +01:00
events: Expose API to view the Events
- Increased the TTL to 10 minutes and made configurable. - Exposed the new API to view Events - Added Geo-replication Events - `GET /v1/events` now returns list of events in sorted order Updates: #418 Signed-off-by: Aravinda VK <avishwan@redhat.com>
This commit is contained in:
@@ -3,6 +3,7 @@ package peercommands
|
||||
import (
|
||||
"github.com/gluster/glusterd2/glusterd2/events"
|
||||
"github.com/gluster/glusterd2/glusterd2/peer"
|
||||
"github.com/gluster/glusterd2/pkg/api"
|
||||
)
|
||||
|
||||
type peerEvent string
|
||||
@@ -12,7 +13,7 @@ const (
|
||||
eventPeerRemoved = "peer.removed"
|
||||
)
|
||||
|
||||
func newPeerEvent(e peerEvent, p *peer.Peer) *events.Event {
|
||||
func newPeerEvent(e peerEvent, p *peer.Peer) *api.Event {
|
||||
data := map[string]string{
|
||||
"peer.id": p.ID.String(),
|
||||
"peer.name": p.Name,
|
||||
|
||||
@@ -3,6 +3,7 @@ package volumecommands
|
||||
import (
|
||||
"github.com/gluster/glusterd2/glusterd2/events"
|
||||
"github.com/gluster/glusterd2/glusterd2/volume"
|
||||
"github.com/gluster/glusterd2/pkg/api"
|
||||
)
|
||||
|
||||
type volumeEvent string
|
||||
@@ -14,7 +15,7 @@ const (
|
||||
eventVolumeDeleted = "volume.deleted"
|
||||
)
|
||||
|
||||
func newVolumeEvent(e volumeEvent, v *volume.Volinfo) *events.Event {
|
||||
func newVolumeEvent(e volumeEvent, v *volume.Volinfo) *api.Event {
|
||||
data := map[string]string{
|
||||
"volume.name": v.Name,
|
||||
"volume.id": v.ID.String(),
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"strconv"
|
||||
|
||||
"github.com/gluster/glusterd2/glusterd2/events"
|
||||
"github.com/gluster/glusterd2/pkg/api"
|
||||
)
|
||||
|
||||
type daemonEvent string
|
||||
@@ -21,7 +22,7 @@ const (
|
||||
)
|
||||
|
||||
// newEvent returns an event of given type with daemon data filled
|
||||
func newEvent(d Daemon, e daemonEvent, pid int) *events.Event {
|
||||
func newEvent(d Daemon, e daemonEvent, pid int) *api.Event {
|
||||
data := map[string]string{
|
||||
"name": d.Name(),
|
||||
"id": d.ID(),
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/gluster/glusterd2/pkg/api"
|
||||
)
|
||||
|
||||
// Handler defines the event handler interface.
|
||||
@@ -13,7 +15,7 @@ type Handler interface {
|
||||
// Handle is the function that gets called when an event occurs.
|
||||
// Handle needs to be thread safe, as it can be called concurrently when
|
||||
// multiple events arrive at the same time.
|
||||
Handle(*Event)
|
||||
Handle(*api.Event)
|
||||
// Events should returns a list of events that the handler is interested in.
|
||||
// Return an empty list if interested in all events.
|
||||
Events() []string
|
||||
@@ -24,7 +26,7 @@ type HandlerID uint64
|
||||
|
||||
// handler implements the Handler interface around a standalone Handle function
|
||||
type handler struct {
|
||||
handle func(*Event)
|
||||
handle func(*api.Event)
|
||||
events []string
|
||||
}
|
||||
|
||||
@@ -33,16 +35,16 @@ var (
|
||||
wg sync.WaitGroup
|
||||
|
||||
sync.RWMutex
|
||||
chans map[HandlerID]chan<- *Event
|
||||
chans map[HandlerID]chan<- *api.Event
|
||||
next HandlerID
|
||||
}
|
||||
)
|
||||
|
||||
func init() {
|
||||
handlers.chans = make(map[HandlerID]chan<- *Event)
|
||||
handlers.chans = make(map[HandlerID]chan<- *api.Event)
|
||||
}
|
||||
|
||||
func addHandler(c chan<- *Event) HandlerID {
|
||||
func addHandler(c chan<- *api.Event) HandlerID {
|
||||
handlers.Lock()
|
||||
defer handlers.Unlock()
|
||||
|
||||
@@ -53,7 +55,7 @@ func addHandler(c chan<- *Event) HandlerID {
|
||||
return id
|
||||
}
|
||||
|
||||
func delHandler(id HandlerID) chan<- *Event {
|
||||
func delHandler(id HandlerID) chan<- *api.Event {
|
||||
handlers.Lock()
|
||||
defer handlers.Unlock()
|
||||
|
||||
@@ -67,7 +69,7 @@ func delHandler(id HandlerID) chan<- *Event {
|
||||
|
||||
// Register a Handler to be called when the events happen.
|
||||
func Register(h Handler) HandlerID {
|
||||
in := make(chan *Event)
|
||||
in := make(chan *api.Event)
|
||||
id := addHandler(in)
|
||||
|
||||
handlers.wg.Add(1)
|
||||
@@ -88,7 +90,7 @@ func Unregister(id HandlerID) {
|
||||
}
|
||||
}
|
||||
|
||||
func handleEvents(in <-chan *Event, h Handler) {
|
||||
func handleEvents(in <-chan *api.Event, h Handler) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
events := normalizeEvents(h.Events())
|
||||
@@ -117,7 +119,7 @@ func normalizeEvents(events []string) []string {
|
||||
|
||||
// interested returns true if given event is found in the events list
|
||||
// Returns true if found or if list is empty
|
||||
func interested(e *Event, events []string) bool {
|
||||
func interested(e *api.Event, events []string) bool {
|
||||
if len(events) == 0 {
|
||||
return true
|
||||
}
|
||||
@@ -141,11 +143,11 @@ func stopHandlers() error {
|
||||
|
||||
// NewHandler returns a Handler wrapping the provided Handle function, and the interested events.
|
||||
// If no events are provided, the handler is interested in all events.
|
||||
func NewHandler(handle func(*Event), events ...string) Handler {
|
||||
func NewHandler(handle func(*api.Event), events ...string) Handler {
|
||||
return &handler{handle, events}
|
||||
}
|
||||
|
||||
func (h *handler) Handle(e *Event) {
|
||||
func (h *handler) Handle(e *api.Event) {
|
||||
h.handle(e)
|
||||
}
|
||||
|
||||
|
||||
@@ -5,43 +5,26 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gluster/glusterd2/glusterd2/gdctx"
|
||||
"github.com/gluster/glusterd2/pkg/api"
|
||||
|
||||
"github.com/pborman/uuid"
|
||||
)
|
||||
|
||||
// Event represents an event in GD2
|
||||
type Event struct {
|
||||
// ID is a unique event ID
|
||||
ID uuid.UUID `json:"id"`
|
||||
// Name is the the name of the event
|
||||
Name string `json:"name"`
|
||||
// Data is any additional data attached to the event.
|
||||
Data map[string]string `json:"data,omitempty"`
|
||||
// global should be set to true to broadcast event to the full GD2 cluster.
|
||||
// If not event is only broadcast in the local node
|
||||
global bool
|
||||
// Origin is used when broadcasting global events to prevent origin nodes
|
||||
// rebroadcasting a global event. Event generators need not set this.
|
||||
Origin uuid.UUID `json:"origin"`
|
||||
// Timestamp is the time when the event was created
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
// New returns a new Event with given information
|
||||
// Set global to true if event should be broadast across cluster
|
||||
func New(name string, data map[string]string, global bool) *Event {
|
||||
return &Event{
|
||||
func New(name string, data map[string]string, global bool) *api.Event {
|
||||
return &api.Event{
|
||||
ID: uuid.NewRandom(),
|
||||
Name: strings.ToLower(name),
|
||||
Data: data,
|
||||
global: global,
|
||||
Global: global,
|
||||
Origin: gdctx.MyUUID,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast broadcasts events to all registered event handlers
|
||||
func Broadcast(e *Event) error {
|
||||
func Broadcast(e *api.Event) error {
|
||||
handlers.RLock()
|
||||
defer handlers.RUnlock()
|
||||
|
||||
|
||||
@@ -6,14 +6,17 @@ import (
|
||||
|
||||
"github.com/gluster/glusterd2/glusterd2/gdctx"
|
||||
"github.com/gluster/glusterd2/glusterd2/store"
|
||||
"github.com/gluster/glusterd2/pkg/api"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/pborman/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
config "github.com/spf13/viper"
|
||||
)
|
||||
|
||||
const (
|
||||
eventsPrefix = "events/"
|
||||
eventsPrefix = "events/"
|
||||
defaultEventsTTL int64 = 600
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -25,8 +28,8 @@ var (
|
||||
)
|
||||
|
||||
// globalHandler listens for events that are global and broadcasts them across the cluster
|
||||
func globalHandler(ev *Event) {
|
||||
if !ev.global {
|
||||
func globalHandler(ev *api.Event) {
|
||||
if !ev.Global {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -41,9 +44,13 @@ func globalHandler(ev *Event) {
|
||||
}
|
||||
|
||||
// Putting event with a TTL so that we don't have stale events lingering in store
|
||||
// Using a TTL of 10 seconds should allow all members in the cluster to receive event
|
||||
// TODO: Allow timeout to be customizable
|
||||
l, err := store.Store.Grant(store.Store.Ctx(), 10)
|
||||
// Using a TTL of 10 minutes(configurable) should allow all members in the
|
||||
// cluster to receive event
|
||||
eventsttl := config.GetInt64("eventsttl")
|
||||
if eventsttl == 0 {
|
||||
eventsttl = defaultEventsTTL
|
||||
}
|
||||
l, err := store.Store.Grant(store.Store.Ctx(), eventsttl)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"event.id": ev.ID.String(),
|
||||
@@ -72,7 +79,7 @@ func globalListener(glStop chan struct{}) {
|
||||
return
|
||||
}
|
||||
for _, sev := range resp.Events {
|
||||
var ev Event
|
||||
var ev api.Event
|
||||
if err := json.Unmarshal(sev.Kv.Value, &ev); err != nil {
|
||||
log.WithField("event.id", string(sev.Kv.Key)).WithError(err).Error("could not unmarshal global event")
|
||||
continue
|
||||
|
||||
@@ -7,9 +7,10 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
|
||||
"github.com/gluster/glusterd2/pkg/api"
|
||||
"github.com/gluster/glusterd2/pkg/logging"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
config "github.com/spf13/viper"
|
||||
)
|
||||
|
||||
@@ -22,7 +23,7 @@ type eventLogger struct {
|
||||
wc io.WriteCloser
|
||||
}
|
||||
|
||||
func (l *eventLogger) Handle(e *Event) {
|
||||
func (l *eventLogger) Handle(e *api.Event) {
|
||||
|
||||
b := new(bytes.Buffer)
|
||||
if err := json.NewEncoder(b).Encode(e); err != nil {
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"os"
|
||||
"os/exec"
|
||||
|
||||
"github.com/gluster/glusterd2/pkg/api"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -17,7 +19,7 @@ const dbusScript = "/usr/libexec/ganesha/dbus-send.sh"
|
||||
|
||||
type ganesha struct{}
|
||||
|
||||
func (g *ganesha) Handle(e *Event) {
|
||||
func (g *ganesha) Handle(e *api.Event) {
|
||||
var option string
|
||||
if e.Name == eventVolumeStarted {
|
||||
option = "on"
|
||||
|
||||
25
pkg/api/events.go
Normal file
25
pkg/api/events.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/pborman/uuid"
|
||||
)
|
||||
|
||||
// Event represents an event in GD2
|
||||
type Event struct {
|
||||
// ID is a unique event ID
|
||||
ID uuid.UUID `json:"id"`
|
||||
// Name is the the name of the event
|
||||
Name string `json:"name"`
|
||||
// Data is any additional data attached to the event.
|
||||
Data map[string]string `json:"data,omitempty"`
|
||||
// global should be set to true to broadcast event to the full GD2 cluster.
|
||||
// If not event is only broadcast in the local node
|
||||
Global bool `json:"-"`
|
||||
// Origin is used when broadcasting global events to prevent origin nodes
|
||||
// rebroadcasting a global event. Event generators need not set this.
|
||||
Origin uuid.UUID `json:"origin"`
|
||||
// Timestamp is the time when the event was created
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
@@ -1,4 +1,11 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"github.com/gluster/glusterd2/pkg/api"
|
||||
)
|
||||
|
||||
// WebhookList holds list of webhooks containing just its URL
|
||||
type WebhookList []string
|
||||
|
||||
// EventList holds list of events happened in last 10 mins(configurable)
|
||||
type EventList []api.Event
|
||||
|
||||
@@ -40,6 +40,12 @@ func (p *Plugin) RestRoutes() route.Routes {
|
||||
Pattern: "/events/webhook",
|
||||
Version: 1,
|
||||
HandlerFunc: webhookListHandler},
|
||||
route.Route{
|
||||
Name: "EventsList",
|
||||
Method: "GET",
|
||||
Pattern: "/events",
|
||||
Version: 1,
|
||||
HandlerFunc: eventsListHandler},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -108,3 +108,17 @@ func webhookListHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
restutils.SendHTTPResponse(ctx, w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func eventsListHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
events, err := GetEventsList()
|
||||
if err != nil {
|
||||
restutils.SendHTTPError(
|
||||
ctx, w, http.StatusInternalServerError,
|
||||
"Could not retrive events list")
|
||||
return
|
||||
}
|
||||
|
||||
restutils.SendHTTPResponse(ctx, w, http.StatusOK, events)
|
||||
}
|
||||
|
||||
@@ -3,10 +3,12 @@ package events
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/gluster/glusterd2/glusterd2/store"
|
||||
"github.com/gluster/glusterd2/pkg/api"
|
||||
eventsapi "github.com/gluster/glusterd2/plugins/events/api"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@@ -14,6 +16,7 @@ import (
|
||||
|
||||
const (
|
||||
webhookPrefix string = "config/events/webhooks/"
|
||||
eventsPrefix = "events/"
|
||||
)
|
||||
|
||||
func webhookExists(webhook eventsapi.Webhook) (bool, error) {
|
||||
@@ -73,3 +76,31 @@ func deleteWebhook(webhook eventsapi.Webhook) error {
|
||||
_, e := store.Store.Delete(context.TODO(), webhookPrefix+strings.Replace(webhook.URL, "/", "|", -1))
|
||||
return e
|
||||
}
|
||||
|
||||
// GetEventsList returns list of Events recorded in last few minutes
|
||||
func GetEventsList() ([]*api.Event, error) {
|
||||
resp, e := store.Store.Get(context.TODO(), eventsPrefix, clientv3.WithPrefix())
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
|
||||
events := make([]*api.Event, len(resp.Kvs))
|
||||
|
||||
for i, kv := range resp.Kvs {
|
||||
var ev api.Event
|
||||
|
||||
if err := json.Unmarshal(kv.Value, &ev); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"event": string(kv.Key),
|
||||
"error": err,
|
||||
}).Error("Failed to unmarshal event")
|
||||
continue
|
||||
}
|
||||
|
||||
events[i] = &ev
|
||||
}
|
||||
// Sort based on Event Timestamp
|
||||
sort.Slice(events, func(i, j int) bool { return int64(events[j].Timestamp.Sub(events[i].Timestamp)) > 0 })
|
||||
|
||||
return events, nil
|
||||
}
|
||||
|
||||
44
plugins/georeplication/events.go
Normal file
44
plugins/georeplication/events.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package georeplication
|
||||
|
||||
import (
|
||||
"github.com/gluster/glusterd2/glusterd2/events"
|
||||
"github.com/gluster/glusterd2/pkg/api"
|
||||
georepapi "github.com/gluster/glusterd2/plugins/georeplication/api"
|
||||
)
|
||||
|
||||
type georepEvent string
|
||||
|
||||
const (
|
||||
eventGeorepCreated georepEvent = "georep.created"
|
||||
eventGeorepStarted = "georep.started"
|
||||
eventGeorepStopped = "georep.stopped"
|
||||
eventGeorepDeleted = "georep.deleted"
|
||||
eventGeorepPaused = "georep.paused"
|
||||
eventGeorepResumed = "georep.resumed"
|
||||
eventGeorepConfigSet = "georep.config.set"
|
||||
eventGeorepConfigReset = "georep.config.reset"
|
||||
)
|
||||
|
||||
func newGeorepEvent(e georepEvent, session *georepapi.GeorepSession, extra *map[string]string) *api.Event {
|
||||
data := make(map[string]string)
|
||||
|
||||
if session != nil {
|
||||
data = map[string]string{
|
||||
"master.name": session.MasterVol,
|
||||
"master.id": session.MasterID.String(),
|
||||
"remote.name": session.RemoteVol,
|
||||
"remote.id": session.RemoteID.String(),
|
||||
"remote.host": session.RemoteHosts[0].Hostname,
|
||||
"remote.peerid": session.RemoteHosts[0].NodeID.String(),
|
||||
"remote.user": session.RemoteUser,
|
||||
}
|
||||
}
|
||||
|
||||
if extra != nil {
|
||||
for k, v := range *extra {
|
||||
data[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
return events.New(string(e), data, true)
|
||||
}
|
||||
@@ -8,7 +8,9 @@ import (
|
||||
"net/http"
|
||||
"os/exec"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/gluster/glusterd2/glusterd2/events"
|
||||
"github.com/gluster/glusterd2/glusterd2/gdctx"
|
||||
restutils "github.com/gluster/glusterd2/glusterd2/servers/rest/utils"
|
||||
"github.com/gluster/glusterd2/glusterd2/transaction"
|
||||
@@ -211,6 +213,8 @@ func georepCreateHandler(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
events.Broadcast(newGeorepEvent(eventGeorepCreated, geoSession, nil))
|
||||
|
||||
restutils.SendHTTPResponse(ctx, w, http.StatusCreated, geoSession)
|
||||
}
|
||||
|
||||
@@ -290,19 +294,24 @@ func georepActionHandler(w http.ResponseWriter, r *http.Request, action actionTy
|
||||
|
||||
doFunc := ""
|
||||
stateToSet := ""
|
||||
var eventToSet georepEvent
|
||||
switch action {
|
||||
case actionStart:
|
||||
doFunc = "georeplication-start.Commit"
|
||||
stateToSet = georepapi.GeorepStatusStarted
|
||||
eventToSet = eventGeorepStarted
|
||||
case actionPause:
|
||||
doFunc = "georeplication-pause.Commit"
|
||||
stateToSet = georepapi.GeorepStatusPaused
|
||||
eventToSet = eventGeorepPaused
|
||||
case actionResume:
|
||||
doFunc = "georeplication-resume.Commit"
|
||||
stateToSet = georepapi.GeorepStatusStarted
|
||||
eventToSet = eventGeorepResumed
|
||||
case actionStop:
|
||||
doFunc = "georeplication-stop.Commit"
|
||||
stateToSet = georepapi.GeorepStatusStopped
|
||||
eventToSet = eventGeorepStopped
|
||||
default:
|
||||
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, "Unknown action")
|
||||
return
|
||||
@@ -348,6 +357,8 @@ func georepActionHandler(w http.ResponseWriter, r *http.Request, action actionTy
|
||||
return
|
||||
}
|
||||
|
||||
events.Broadcast(newGeorepEvent(eventToSet, geoSession, nil))
|
||||
|
||||
restutils.SendHTTPResponse(ctx, w, http.StatusOK, geoSession)
|
||||
}
|
||||
|
||||
@@ -441,6 +452,7 @@ func georepDeleteHandler(w http.ResponseWriter, r *http.Request) {
|
||||
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, e)
|
||||
return
|
||||
}
|
||||
events.Broadcast(newGeorepEvent(eventGeorepDeleted, geoSession, nil))
|
||||
|
||||
restutils.SendHTTPResponse(ctx, w, http.StatusOK, nil)
|
||||
}
|
||||
@@ -810,6 +822,16 @@ func georepConfigSetHandler(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
var allopts []string
|
||||
for k, v := range req {
|
||||
allopts = append(allopts, k+"="+v)
|
||||
}
|
||||
setOpts := map[string]string{
|
||||
"options": strings.Join(allopts, ","),
|
||||
}
|
||||
|
||||
events.Broadcast(newGeorepEvent(eventGeorepConfigSet, geoSession, &setOpts))
|
||||
|
||||
restutils.SendHTTPResponse(ctx, w, http.StatusOK, geoSession.Options)
|
||||
}
|
||||
|
||||
@@ -938,6 +960,10 @@ func georepConfigResetHandler(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
events.Broadcast(newGeorepEvent(eventGeorepConfigReset, geoSession,
|
||||
&map[string]string{"options": strings.Join(req, ",")},
|
||||
))
|
||||
|
||||
restutils.SendHTTPResponse(ctx, w, http.StatusOK, geoSession.Options)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user