273 lines
7.5 KiB
Go
273 lines
7.5 KiB
Go
package controller
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/caddyserver/caddy/v2"
|
|
"github.com/caddyserver/certmagic"
|
|
"github.com/caddyserver/ingress/internal/k8s"
|
|
"github.com/caddyserver/ingress/pkg/store"
|
|
"go.uber.org/zap"
|
|
networkingv1 "k8s.io/api/networking/v1"
|
|
"k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/informers"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/util/workqueue"
|
|
|
|
// load required caddy plugins
|
|
_ "github.com/caddyserver/caddy/v2/modules/caddyhttp/proxyprotocol"
|
|
_ "github.com/caddyserver/caddy/v2/modules/caddyhttp/reverseproxy"
|
|
_ "github.com/caddyserver/caddy/v2/modules/caddytls"
|
|
_ "github.com/caddyserver/caddy/v2/modules/caddytls/standardstek"
|
|
_ "github.com/caddyserver/caddy/v2/modules/metrics"
|
|
_ "github.com/caddyserver/ingress/pkg/storage"
|
|
)
|
|
|
|
const (
|
|
// how often we should attempt to keep ingress resource's source address in sync
|
|
syncInterval = time.Second * 30
|
|
|
|
// how often we resync informers resources (besides receiving updates)
|
|
resourcesSyncInterval = time.Hour * 1
|
|
)
|
|
|
|
// Action is an interface for ingress actions.
|
|
type Action interface {
|
|
handle(c *CaddyController) error
|
|
}
|
|
|
|
// Informer defines the required SharedIndexInformers that interact with the API server.
|
|
type Informer struct {
|
|
Ingress cache.SharedIndexInformer
|
|
ConfigMap cache.SharedIndexInformer
|
|
TLSSecret cache.SharedIndexInformer
|
|
}
|
|
|
|
// InformerFactory contains shared informer factory
|
|
// We need to type of factory:
|
|
// - One used to watch resources in the Pod namespaces (caddy config, secrets...)
|
|
// - Another one for Ingress resources in the selected namespace
|
|
type InformerFactory struct {
|
|
PodNamespace informers.SharedInformerFactory
|
|
WatchedNamespace informers.SharedInformerFactory
|
|
}
|
|
|
|
type Converter interface {
|
|
ConvertToCaddyConfig(store *store.Store) (interface{}, error)
|
|
}
|
|
|
|
// CaddyController represents a caddy ingress controller.
|
|
type CaddyController struct {
|
|
resourceStore *store.Store
|
|
|
|
kubeClient *kubernetes.Clientset
|
|
|
|
logger *zap.SugaredLogger
|
|
|
|
// main queue syncing ingresses, configmaps, ... with caddy
|
|
syncQueue workqueue.RateLimitingInterface
|
|
|
|
// informer factories
|
|
factories *InformerFactory
|
|
|
|
// informer contains the cache Informers
|
|
informers *Informer
|
|
|
|
// save last applied caddy config
|
|
lastAppliedConfig []byte
|
|
|
|
converter Converter
|
|
|
|
stopChan chan struct{}
|
|
}
|
|
|
|
func NewCaddyController(
|
|
logger *zap.SugaredLogger,
|
|
kubeClient *kubernetes.Clientset,
|
|
opts store.Options,
|
|
converter Converter,
|
|
stopChan chan struct{},
|
|
) *CaddyController {
|
|
controller := &CaddyController{
|
|
logger: logger,
|
|
kubeClient: kubeClient,
|
|
converter: converter,
|
|
stopChan: stopChan,
|
|
syncQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
|
|
informers: &Informer{},
|
|
factories: &InformerFactory{},
|
|
}
|
|
|
|
podInfo, err := k8s.GetPodDetails(kubeClient)
|
|
if err != nil {
|
|
logger.Fatalf("Unexpected error obtaining pod information: %v", err)
|
|
}
|
|
|
|
// Create informer factories
|
|
controller.factories.PodNamespace = informers.NewSharedInformerFactoryWithOptions(
|
|
kubeClient,
|
|
resourcesSyncInterval,
|
|
informers.WithNamespace(podInfo.Namespace),
|
|
)
|
|
controller.factories.WatchedNamespace = informers.NewSharedInformerFactoryWithOptions(
|
|
kubeClient,
|
|
resourcesSyncInterval,
|
|
informers.WithNamespace(opts.WatchNamespace),
|
|
)
|
|
|
|
// Watch ingress resources in selected namespaces
|
|
ingressParams := k8s.IngressParams{
|
|
InformerFactory: controller.factories.WatchedNamespace,
|
|
ClassName: opts.ClassName,
|
|
ClassNameRequired: opts.ClassNameRequired,
|
|
}
|
|
controller.informers.Ingress = k8s.WatchIngresses(ingressParams, k8s.IngressHandlers{
|
|
AddFunc: controller.onIngressAdded,
|
|
UpdateFunc: controller.onIngressUpdated,
|
|
DeleteFunc: controller.onIngressDeleted,
|
|
})
|
|
|
|
// Watch Configmap in the pod's namespace for global options
|
|
cmOptionsParams := k8s.ConfigMapParams{
|
|
Namespace: podInfo.Namespace,
|
|
InformerFactory: controller.factories.PodNamespace,
|
|
ConfigMapName: opts.ConfigMapName,
|
|
}
|
|
controller.informers.ConfigMap = k8s.WatchConfigMaps(cmOptionsParams, k8s.ConfigMapHandlers{
|
|
AddFunc: controller.onConfigMapAdded,
|
|
UpdateFunc: controller.onConfigMapUpdated,
|
|
DeleteFunc: controller.onConfigMapDeleted,
|
|
})
|
|
|
|
// Create resource store
|
|
controller.resourceStore = store.NewStore(opts, podInfo)
|
|
|
|
return controller
|
|
}
|
|
|
|
// Shutdown stops the caddy controller.
|
|
func (c *CaddyController) Shutdown() error {
|
|
// remove this ingress controller's ip from ingress resources.
|
|
c.updateIngStatuses([]networkingv1.IngressLoadBalancerIngress{{}}, c.resourceStore.Ingresses)
|
|
|
|
if err := caddy.Stop(); err != nil {
|
|
c.logger.Error("failed to stop caddy server", zap.Error(err))
|
|
return err
|
|
}
|
|
certmagic.CleanUpOwnLocks(context.TODO(), c.logger.Desugar())
|
|
return nil
|
|
}
|
|
|
|
// Run method starts the ingress controller.
|
|
func (c *CaddyController) Run() {
|
|
defer runtime.HandleCrash()
|
|
defer c.syncQueue.ShutDown()
|
|
|
|
// start informers where we listen to new / updated resources
|
|
go c.informers.ConfigMap.Run(c.stopChan)
|
|
go c.informers.Ingress.Run(c.stopChan)
|
|
|
|
// wait for all involved caches to be synced before processing items
|
|
// from the queue
|
|
if !cache.WaitForCacheSync(c.stopChan,
|
|
c.informers.ConfigMap.HasSynced,
|
|
c.informers.Ingress.HasSynced,
|
|
) {
|
|
runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
|
|
}
|
|
|
|
// start processing events for syncing ingress resources
|
|
go wait.Until(c.runWorker, time.Second, c.stopChan)
|
|
|
|
// start ingress status syncher and run every syncInterval
|
|
go wait.Until(c.dispatchSync, syncInterval, c.stopChan)
|
|
|
|
// wait for SIGTERM
|
|
<-c.stopChan
|
|
c.logger.Info("stopping ingress controller")
|
|
|
|
var exitCode int
|
|
err := c.Shutdown()
|
|
if err != nil {
|
|
c.logger.Error("could not shutdown ingress controller properly, " + err.Error())
|
|
exitCode = 1
|
|
}
|
|
|
|
os.Exit(exitCode)
|
|
}
|
|
|
|
// runWorker processes items in the event queue.
|
|
func (c *CaddyController) runWorker() {
|
|
for c.processNextItem() {
|
|
}
|
|
}
|
|
|
|
// processNextItem determines if there is an ingress item in the event queue and processes it.
|
|
func (c *CaddyController) processNextItem() bool {
|
|
// Wait until there is a new item in the working queue
|
|
action, quit := c.syncQueue.Get()
|
|
if quit {
|
|
return false
|
|
}
|
|
|
|
// Tell the queue that we are done with processing this key. This unblocks the key for other workers
|
|
// This allows safe parallel processing because two ingresses with the same key are never processed in
|
|
// parallel.
|
|
defer c.syncQueue.Done(action)
|
|
|
|
// Invoke the method containing the business logic
|
|
err := action.(Action).handle(c)
|
|
if err != nil {
|
|
c.handleErr(err, action)
|
|
return true
|
|
}
|
|
|
|
err = c.reloadCaddy()
|
|
if err != nil {
|
|
c.logger.Error("could not reload caddy: " + err.Error())
|
|
return true
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// handleErrs reports errors received from queue actions.
|
|
//
|
|
//goland:noinspection GoUnusedParameter
|
|
func (c *CaddyController) handleErr(err error, action interface{}) {
|
|
c.logger.Error(err.Error())
|
|
}
|
|
|
|
// reloadCaddy generate a caddy config from controller's store
|
|
func (c *CaddyController) reloadCaddy() error {
|
|
config, err := c.converter.ConvertToCaddyConfig(c.resourceStore)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
j, err := json.Marshal(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if bytes.Equal(c.lastAppliedConfig, j) {
|
|
c.logger.Debug("caddy config did not change, skipping reload")
|
|
return nil
|
|
}
|
|
|
|
c.logger.Debug("reloading caddy with config", string(j))
|
|
err = caddy.Load(j, false)
|
|
if err != nil {
|
|
return fmt.Errorf("could not reload caddy config %v", err.Error())
|
|
}
|
|
c.lastAppliedConfig = j
|
|
return nil
|
|
}
|