diff --git a/pkg/agent/cluster.go b/pkg/agent/cluster.go index 308fa410d5..aa514c5202 100644 --- a/pkg/agent/cluster.go +++ b/pkg/agent/cluster.go @@ -75,10 +75,8 @@ func NewCluster(ctx context.Context, assetDir, rendezvousIP, kubeconfigPath, ssh logrus.Fatal(err) } - restclient, err := NewNodeZeroRestClient(ctx, rendezvousIP, sshKey, authToken) - if err != nil { - logrus.Fatal(err) - } + restclient := NewNodeZeroRestClient(ctx, rendezvousIP, sshKey, authToken) + kubeclient, err := NewClusterKubeAPIClient(ctx, kubeconfigPath) if err != nil { logrus.Fatal(err) diff --git a/pkg/agent/monitoraddnodes.go b/pkg/agent/monitoraddnodes.go index 8f8929225a..8fd767ac53 100644 --- a/pkg/agent/monitoraddnodes.go +++ b/pkg/agent/monitoraddnodes.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "strings" + "sync" "time" "github.com/pkg/errors" @@ -68,16 +69,34 @@ func (mon *addNodeMonitor) logStatus(status string) { logrus.Infof("Node %s: %s", mon.nodeIPAddress, status) } -// MonitorAddNodes waits for the a node to be added to the cluster +// MonitorAddNodes display the progress of one or more nodes being +// added to a cluster. ipAddresses is an array of IP addresses to be +// monitored. clusters is an array of their corresponding initialized Cluster +// struct used to interact with the assisted-service and k8s APIs. +func MonitorAddNodes(clusters []*Cluster, ipAddresses []string) { + var wg sync.WaitGroup + + for i, ipAddress := range ipAddresses { + wg.Add(1) + + go MonitorSingleNode(clusters[i], ipAddress, &wg) + } + + wg.Wait() +} + +// MonitorSingleNode waits for the a node to be added to the cluster // and reports its status until it becomes Ready. -func MonitorAddNodes(cluster *Cluster, nodeIPAddress string) error { +func MonitorSingleNode(cluster *Cluster, nodeIPAddress string, wg *sync.WaitGroup) { + defer wg.Done() timeout := 90 * time.Minute waitContext, cancel := context.WithTimeout(cluster.Ctx, timeout) defer cancel() mon, err := newAddNodeMonitor(nodeIPAddress, cluster) if err != nil { - return err + logrus.Errorf("could not initialize node monitor for node %v: %v", nodeIPAddress, err) + return } wait.Until(func() { @@ -111,7 +130,7 @@ func MonitorAddNodes(cluster *Cluster, nodeIPAddress string) error { hasJoined, isReady, err := mon.nodeHasJoinedClusterAndIsReady() if err != nil { - logrus.Debugf("nodeHasJoinedClusterAndIsReady returned err: %v", err) + logrus.Debugf("Node %v joined cluster and is ready check returned err: %v", nodeIPAddress, err) } if !mon.status.NodeJoinedCluster && hasJoined { @@ -133,7 +152,7 @@ func MonitorAddNodes(cluster *Cluster, nodeIPAddress string) error { if mon.cluster.API.Rest.IsRestAPILive() { _, err = cluster.MonitorStatusFromAssistedService() if err != nil { - logrus.Warnf("Node %s: %s", nodeIPAddress, err) + logrus.Warnf("error fetching status from assisted-service for node %s: %s", nodeIPAddress, err) } } }, 5*time.Second, waitContext.Done()) @@ -144,11 +163,9 @@ func MonitorAddNodes(cluster *Cluster, nodeIPAddress string) error { cancel() } if errors.Is(waitErr, context.DeadlineExceeded) { - return errors.Wrap(waitErr, "monitor-add-nodes process timed out") + mon.logStatus(fmt.Sprintf("Node monitoring timed out after %v minutes", timeout)) } } - - return nil } func (mon *addNodeMonitor) nodeHasJoinedClusterAndIsReady() (bool, bool, error) { diff --git a/pkg/agent/rest.go b/pkg/agent/rest.go index 1e69120de6..813f1afdfa 100644 --- a/pkg/agent/rest.go +++ b/pkg/agent/rest.go @@ -33,7 +33,7 @@ type NodeZeroRestClient struct { } // NewNodeZeroRestClient Initialize a new rest client to interact with the Agent Rest API on node zero. -func NewNodeZeroRestClient(ctx context.Context, rendezvousIP, sshKey, token string) (*NodeZeroRestClient, error) { +func NewNodeZeroRestClient(ctx context.Context, rendezvousIP, sshKey, token string) *NodeZeroRestClient { restClient := &NodeZeroRestClient{} // Get SSH Keys which can be used to determine if Rest API failures are due to network connectivity issues @@ -57,7 +57,7 @@ func NewNodeZeroRestClient(ctx context.Context, rendezvousIP, sshKey, token stri restClient.config = config restClient.NodeZeroIP = rendezvousIP - return restClient, nil + return restClient } // FindRendezvouIPAndSSHKeyFromAssetStore returns the rendezvousIP and public ssh key. diff --git a/pkg/nodejoiner/monitoraddnodes.go b/pkg/nodejoiner/monitoraddnodes.go index fab9febf3e..1c6e0dc1ca 100644 --- a/pkg/nodejoiner/monitoraddnodes.go +++ b/pkg/nodejoiner/monitoraddnodes.go @@ -3,8 +3,6 @@ package nodejoiner import ( "context" - "github.com/sirupsen/logrus" - agentpkg "github.com/openshift/installer/pkg/agent" "github.com/openshift/installer/pkg/asset/agent/workflow" ) @@ -16,15 +14,18 @@ func NewMonitorAddNodesCommand(directory, kubeconfigPath string, ips []string) e return err } - cluster, err := agentpkg.NewCluster(context.Background(), "", ips[0], kubeconfigPath, "", workflow.AgentWorkflowTypeAddNodes) - if err != nil { - // TODO exit code enumerate - logrus.Exit(1) - } + // sshKey is not required parameter for monitor-add-nodes + sshKey := "" - if err != nil { - return err + clusters := []*agentpkg.Cluster{} + for _, ip := range ips { + cluster, err := agentpkg.NewCluster(context.Background(), directory, ip, kubeconfigPath, sshKey, workflow.AgentWorkflowTypeAddNodes) + if err != nil { + return err + } + clusters = append(clusters, cluster) } + agentpkg.MonitorAddNodes(clusters, ips) - return agentpkg.MonitorAddNodes(cluster, ips[0]) + return nil }