mirror of
https://github.com/openshift/installer.git
synced 2026-02-05 15:47:14 +01:00
AGENT-862: Extend monitor-add-nodes to support multiple nodes
Multiple IP addresses can now be specified for monitoring at the same time.
This commit is contained in:
@@ -75,10 +75,8 @@ func NewCluster(ctx context.Context, assetDir, rendezvousIP, kubeconfigPath, ssh
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
|
||||
restclient, err := NewNodeZeroRestClient(ctx, rendezvousIP, sshKey, authToken)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
restclient := NewNodeZeroRestClient(ctx, rendezvousIP, sshKey, authToken)
|
||||
|
||||
kubeclient, err := NewClusterKubeAPIClient(ctx, kubeconfigPath)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -68,16 +69,34 @@ func (mon *addNodeMonitor) logStatus(status string) {
|
||||
logrus.Infof("Node %s: %s", mon.nodeIPAddress, status)
|
||||
}
|
||||
|
||||
// MonitorAddNodes waits for the a node to be added to the cluster
|
||||
// MonitorAddNodes display the progress of one or more nodes being
|
||||
// added to a cluster. ipAddresses is an array of IP addresses to be
|
||||
// monitored. clusters is an array of their corresponding initialized Cluster
|
||||
// struct used to interact with the assisted-service and k8s APIs.
|
||||
func MonitorAddNodes(clusters []*Cluster, ipAddresses []string) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i, ipAddress := range ipAddresses {
|
||||
wg.Add(1)
|
||||
|
||||
go MonitorSingleNode(clusters[i], ipAddress, &wg)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// MonitorSingleNode waits for the a node to be added to the cluster
|
||||
// and reports its status until it becomes Ready.
|
||||
func MonitorAddNodes(cluster *Cluster, nodeIPAddress string) error {
|
||||
func MonitorSingleNode(cluster *Cluster, nodeIPAddress string, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
timeout := 90 * time.Minute
|
||||
waitContext, cancel := context.WithTimeout(cluster.Ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
mon, err := newAddNodeMonitor(nodeIPAddress, cluster)
|
||||
if err != nil {
|
||||
return err
|
||||
logrus.Errorf("could not initialize node monitor for node %v: %v", nodeIPAddress, err)
|
||||
return
|
||||
}
|
||||
|
||||
wait.Until(func() {
|
||||
@@ -111,7 +130,7 @@ func MonitorAddNodes(cluster *Cluster, nodeIPAddress string) error {
|
||||
|
||||
hasJoined, isReady, err := mon.nodeHasJoinedClusterAndIsReady()
|
||||
if err != nil {
|
||||
logrus.Debugf("nodeHasJoinedClusterAndIsReady returned err: %v", err)
|
||||
logrus.Debugf("Node %v joined cluster and is ready check returned err: %v", nodeIPAddress, err)
|
||||
}
|
||||
|
||||
if !mon.status.NodeJoinedCluster && hasJoined {
|
||||
@@ -133,7 +152,7 @@ func MonitorAddNodes(cluster *Cluster, nodeIPAddress string) error {
|
||||
if mon.cluster.API.Rest.IsRestAPILive() {
|
||||
_, err = cluster.MonitorStatusFromAssistedService()
|
||||
if err != nil {
|
||||
logrus.Warnf("Node %s: %s", nodeIPAddress, err)
|
||||
logrus.Warnf("error fetching status from assisted-service for node %s: %s", nodeIPAddress, err)
|
||||
}
|
||||
}
|
||||
}, 5*time.Second, waitContext.Done())
|
||||
@@ -144,11 +163,9 @@ func MonitorAddNodes(cluster *Cluster, nodeIPAddress string) error {
|
||||
cancel()
|
||||
}
|
||||
if errors.Is(waitErr, context.DeadlineExceeded) {
|
||||
return errors.Wrap(waitErr, "monitor-add-nodes process timed out")
|
||||
mon.logStatus(fmt.Sprintf("Node monitoring timed out after %v minutes", timeout))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mon *addNodeMonitor) nodeHasJoinedClusterAndIsReady() (bool, bool, error) {
|
||||
|
||||
@@ -33,7 +33,7 @@ type NodeZeroRestClient struct {
|
||||
}
|
||||
|
||||
// NewNodeZeroRestClient Initialize a new rest client to interact with the Agent Rest API on node zero.
|
||||
func NewNodeZeroRestClient(ctx context.Context, rendezvousIP, sshKey, token string) (*NodeZeroRestClient, error) {
|
||||
func NewNodeZeroRestClient(ctx context.Context, rendezvousIP, sshKey, token string) *NodeZeroRestClient {
|
||||
restClient := &NodeZeroRestClient{}
|
||||
|
||||
// Get SSH Keys which can be used to determine if Rest API failures are due to network connectivity issues
|
||||
@@ -57,7 +57,7 @@ func NewNodeZeroRestClient(ctx context.Context, rendezvousIP, sshKey, token stri
|
||||
restClient.config = config
|
||||
restClient.NodeZeroIP = rendezvousIP
|
||||
|
||||
return restClient, nil
|
||||
return restClient
|
||||
}
|
||||
|
||||
// FindRendezvouIPAndSSHKeyFromAssetStore returns the rendezvousIP and public ssh key.
|
||||
|
||||
@@ -3,8 +3,6 @@ package nodejoiner
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
agentpkg "github.com/openshift/installer/pkg/agent"
|
||||
"github.com/openshift/installer/pkg/asset/agent/workflow"
|
||||
)
|
||||
@@ -16,15 +14,18 @@ func NewMonitorAddNodesCommand(directory, kubeconfigPath string, ips []string) e
|
||||
return err
|
||||
}
|
||||
|
||||
cluster, err := agentpkg.NewCluster(context.Background(), "", ips[0], kubeconfigPath, "", workflow.AgentWorkflowTypeAddNodes)
|
||||
if err != nil {
|
||||
// TODO exit code enumerate
|
||||
logrus.Exit(1)
|
||||
}
|
||||
// sshKey is not required parameter for monitor-add-nodes
|
||||
sshKey := ""
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
clusters := []*agentpkg.Cluster{}
|
||||
for _, ip := range ips {
|
||||
cluster, err := agentpkg.NewCluster(context.Background(), directory, ip, kubeconfigPath, sshKey, workflow.AgentWorkflowTypeAddNodes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
clusters = append(clusters, cluster)
|
||||
}
|
||||
agentpkg.MonitorAddNodes(clusters, ips)
|
||||
|
||||
return agentpkg.MonitorAddNodes(cluster, ips[0])
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user