140 lines
4.0 KiB
Go
140 lines
4.0 KiB
Go
package controller
|
|
|
|
import (
|
|
"net"
|
|
"sort"
|
|
"strings"
|
|
|
|
"github.com/caddyserver/ingress/internal/k8s"
|
|
"go.uber.org/zap"
|
|
"gopkg.in/go-playground/pool.v3"
|
|
networkingv1 "k8s.io/api/networking/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
)
|
|
|
|
// dispatchSync is run every syncInterval duration to sync ingress source address fields.
|
|
func (c *CaddyController) dispatchSync() {
|
|
c.syncQueue.Add(SyncStatusAction{})
|
|
}
|
|
|
|
// SyncStatusAction provides an implementation of the action interface.
|
|
type SyncStatusAction struct {
|
|
}
|
|
|
|
// handle is run when a syncStatusAction appears in the queue.
|
|
func (r SyncStatusAction) handle(c *CaddyController) error {
|
|
return c.syncStatus(c.resourceStore.Ingresses)
|
|
}
|
|
|
|
// syncStatus ensures that the ingress source address points to this ingress controller's IP address.
|
|
func (c *CaddyController) syncStatus(ings []*networkingv1.Ingress) error {
|
|
addrs, err := k8s.GetAddresses(c.resourceStore.CurrentPod, c.kubeClient)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.logger.Debugf("Syncing %d Ingress resources source addresses", len(ings))
|
|
c.updateIngStatuses(sliceToLoadBalancerIngress(addrs), ings)
|
|
|
|
return nil
|
|
}
|
|
|
|
// updateIngStatuses starts a queue and adds all monitored ingresses to update their status source address to the on
|
|
// that the ingress controller is running on. This is called by the syncStatus queue.
|
|
func (c *CaddyController) updateIngStatuses(controllerAddresses []networkingv1.IngressLoadBalancerIngress, ings []*networkingv1.Ingress) {
|
|
p := pool.NewLimited(10)
|
|
defer p.Close()
|
|
|
|
batch := p.Batch()
|
|
sort.SliceStable(controllerAddresses, lessLoadBalancerIngress(controllerAddresses))
|
|
|
|
for _, ing := range ings {
|
|
curIPs := ing.Status.LoadBalancer.Ingress
|
|
sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
|
|
|
|
// check to see if ingresses source address does not match the ingress controller's.
|
|
if ingressSliceEqual(curIPs, controllerAddresses) {
|
|
c.logger.Debugf("skipping update of Ingress %v/%v (no change)", ing.Namespace, ing.Name)
|
|
continue
|
|
}
|
|
|
|
batch.Queue(runUpdate(c.logger, ing, controllerAddresses, c.kubeClient))
|
|
}
|
|
|
|
batch.QueueComplete()
|
|
batch.WaitAll()
|
|
}
|
|
|
|
// runUpdate updates the ingress status field.
|
|
func runUpdate(logger *zap.SugaredLogger, ing *networkingv1.Ingress, status []networkingv1.IngressLoadBalancerIngress, client *kubernetes.Clientset) pool.WorkFunc {
|
|
return func(wu pool.WorkUnit) (interface{}, error) {
|
|
if wu.IsCancelled() {
|
|
return nil, nil
|
|
}
|
|
|
|
updated, err := k8s.UpdateIngressStatus(client, ing, status)
|
|
if err != nil {
|
|
logger.Warnf("error updating ingress rule: %v", err)
|
|
} else {
|
|
logger.Debugf(
|
|
"updating Ingress %v/%v status from %v to %v",
|
|
ing.Namespace,
|
|
ing.Name,
|
|
ing.Status.LoadBalancer.Ingress,
|
|
updated.Status.LoadBalancer.Ingress,
|
|
)
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
}
|
|
|
|
// ingressSliceEqual determines if the ingress source matches the ingress controller's.
|
|
func ingressSliceEqual(lhs, rhs []networkingv1.IngressLoadBalancerIngress) bool {
|
|
if len(lhs) != len(rhs) {
|
|
return false
|
|
}
|
|
|
|
for i := range lhs {
|
|
if lhs[i].IP != rhs[i].IP {
|
|
return false
|
|
}
|
|
if lhs[i].Hostname != rhs[i].Hostname {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// lessLoadBalancerIngress is a sorting function for ingress hostnames.
|
|
func lessLoadBalancerIngress(addrs []networkingv1.IngressLoadBalancerIngress) func(int, int) bool {
|
|
return func(a, b int) bool {
|
|
switch strings.Compare(addrs[a].Hostname, addrs[b].Hostname) {
|
|
case -1:
|
|
return true
|
|
case 1:
|
|
return false
|
|
}
|
|
return addrs[a].IP < addrs[b].IP
|
|
}
|
|
}
|
|
|
|
// sliceToLoadBalancerIngress converts a slice of IP and/or hostnames to LoadBalancerIngress
|
|
func sliceToLoadBalancerIngress(endpoints []string) []networkingv1.IngressLoadBalancerIngress {
|
|
lbi := []networkingv1.IngressLoadBalancerIngress{}
|
|
for _, ep := range endpoints {
|
|
if net.ParseIP(ep) == nil {
|
|
lbi = append(lbi, networkingv1.IngressLoadBalancerIngress{Hostname: ep})
|
|
} else {
|
|
lbi = append(lbi, networkingv1.IngressLoadBalancerIngress{IP: ep})
|
|
}
|
|
}
|
|
|
|
sort.SliceStable(lbi, func(a, b int) bool {
|
|
return lbi[a].IP < lbi[b].IP
|
|
})
|
|
|
|
return lbi
|
|
}
|