Skip to content
This repository has been archived by the owner on May 13, 2024. It is now read-only.

Commit

Permalink
wip: [skip ci] gateway status transition
Browse files Browse the repository at this point in the history
Signed-off-by: Lin Yang <reaver@flomesh.io>
  • Loading branch information
reaver-flomesh committed Jun 8, 2023
1 parent 1067279 commit 3950bad
Show file tree
Hide file tree
Showing 20 changed files with 452 additions and 79 deletions.
156 changes: 154 additions & 2 deletions controllers/gateway/v1beta1/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,13 @@ func (r *gatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
}

// 5. after all status of gateways in the namespace have been updated successfully
// 5. update listener status of this gateway no matter it's accepted or not
result, err := r.updateListenerStatus(ctx, gateway)
if err != nil {
return result, err
}

// 6. after all status of gateways in the namespace have been updated successfully
// list all gateways in the namespace and deploy/redeploy the effective one
activeGateway, result, err := r.findActiveGateway(ctx, gateway)
if err != nil {
Expand All @@ -211,7 +217,7 @@ func (r *gatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
activeGateways[gateway.Namespace] = activeGateway
}

// 6. update addresses of Gateway status if any IP is allocated
// 7. update addresses of Gateway status if any IP is allocated
if activeGateway != nil {
lbSvc := &corev1.Service{}
key := client.ObjectKey{
Expand Down Expand Up @@ -245,6 +251,152 @@ func (r *gatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}

type gatewayStatus struct {
FullName types.NamespacedName
Conditions map[gwv1beta1.GatewayConditionType]metav1.Condition
ListenerStatus map[gwv1beta1.PortNumber]*gwv1beta1.ListenerStatus
}

func (r *gatewayReconciler) updateListenerStatus(ctx context.Context, gateway *gwv1beta1.Gateway) (ctrl.Result, error) {
if len(gateway.Annotations) == 0 {
gateway.Annotations = make(map[string]string)
}

oldHash := gateway.Annotations["gateway.flomesh.io/listeners-hash"]
hash := util.SimpleHash(gateway.Spec.Listeners)

if oldHash != hash {
gateway.Annotations["gateway.flomesh.io/listeners-hash"] = hash
if err := r.fctx.Update(ctx, gateway); err != nil {
return ctrl.Result{}, err
}

existingListenerStatus := make(map[gwv1beta1.SectionName]gwv1beta1.ListenerStatus)
for _, status := range gateway.Status.Listeners {
existingListenerStatus[status.Name] = status
}

gateway.Status.Listeners = nil
listenerStatus := make([]gwv1beta1.ListenerStatus, 0)
for _, listener := range gateway.Spec.Listeners {
status, ok := existingListenerStatus[listener.Name]
if ok {
// update existing status
programmedConditionExists := false
acceptedConditionExists := false
for _, cond := range status.Conditions {
if cond.Type == string(gwv1beta1.ListenerConditionProgrammed) {
programmedConditionExists = true
}
if cond.Type == string(gwv1beta1.ListenerConditionAccepted) {
acceptedConditionExists = true
}
}

if !programmedConditionExists {
metautil.SetStatusCondition(&status.Conditions, metav1.Condition{
Type: string(gwv1beta1.ListenerConditionProgrammed),
Status: metav1.ConditionFalse,
ObservedGeneration: gateway.Generation,
LastTransitionTime: metav1.Time{Time: time.Now()},
Reason: string(gwv1beta1.ListenerReasonInvalid),
Message: fmt.Sprintf("Invalid listener %q[:%d]", listener.Name, listener.Port),
})
}

if !acceptedConditionExists {
metautil.SetStatusCondition(&status.Conditions, metav1.Condition{
Type: string(gwv1beta1.ListenerConditionAccepted),
Status: metav1.ConditionTrue,
ObservedGeneration: gateway.Generation,
LastTransitionTime: metav1.Time{Time: time.Now()},
Reason: string(gwv1beta1.ListenerReasonAccepted),
Message: fmt.Sprintf("listener %q[:%d] is accepted.", listener.Name, listener.Port),
})
}
} else {
// create new status
status = gwv1beta1.ListenerStatus{
Name: listener.Name,
SupportedKinds: supportedKindsByProtocol(listener.Protocol),
Conditions: []metav1.Condition{
{
Type: string(gwv1beta1.ListenerConditionAccepted),
Status: metav1.ConditionTrue,
ObservedGeneration: gateway.Generation,
LastTransitionTime: metav1.Time{Time: time.Now()},
Reason: string(gwv1beta1.ListenerReasonAccepted),
Message: fmt.Sprintf("listener %q[:%d] is accepted.", listener.Name, listener.Port),
},
{
Type: string(gwv1beta1.ListenerConditionProgrammed),
Status: metav1.ConditionTrue,
ObservedGeneration: gateway.Generation,
LastTransitionTime: metav1.Time{Time: time.Now()},
Reason: string(gwv1beta1.ListenerReasonProgrammed),
Message: fmt.Sprintf("Valid listener %q[:%d]", listener.Name, listener.Port),
},
},
}
}

listenerStatus = append(listenerStatus, status)
}

if len(listenerStatus) > 0 {
gateway.Status.Listeners = listenerStatus
if err := r.fctx.Status().Update(ctx, gateway); err != nil {
return ctrl.Result{}, err
}
}
}

return ctrl.Result{}, nil
}

func supportedKindsByProtocol(protocol gwv1beta1.ProtocolType) []gwv1beta1.RouteGroupKind {
switch protocol {
case gwv1beta1.HTTPProtocolType, gwv1beta1.HTTPSProtocolType:
return []gwv1beta1.RouteGroupKind{
{
Group: utils.GroupPointer("gateway.networking.k8s.io"),
Kind: "HTTPRoute",
},
{
Group: utils.GroupPointer("gateway.networking.k8s.io"),
Kind: "GRPCRoute",
},
}
case gwv1beta1.TLSProtocolType:
return []gwv1beta1.RouteGroupKind{
{
Group: utils.GroupPointer("gateway.networking.k8s.io"),
Kind: "TLSRoute",
},
{
Group: utils.GroupPointer("gateway.networking.k8s.io"),
Kind: "TCPRoute",
},
}
case gwv1beta1.TCPProtocolType:
return []gwv1beta1.RouteGroupKind{
{
Group: utils.GroupPointer("gateway.networking.k8s.io"),
Kind: "TCPRoute",
},
}
case gwv1beta1.UDPProtocolType:
return []gwv1beta1.RouteGroupKind{
{
Group: utils.GroupPointer("gateway.networking.k8s.io"),
Kind: "UDPRoute",
},
}
}

return nil
}

func gatewayAddresses(activeGateway *gwv1beta1.Gateway, lbSvc *corev1.Service) []gwv1beta1.GatewayAddress {
existingIPs := gatewayIPs(activeGateway)
expectedIPs := lbIPs(lbSvc)
Expand Down
25 changes: 17 additions & 8 deletions controllers/gateway/v1beta1/gatewayclass_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func (r *gatewayClassReconciler) Reconcile(ctx context.Context, req ctrl.Request
ObjectMeta: metav1.ObjectMeta{
Namespace: req.Namespace,
Name: req.Name,
}})
}},
)
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
Expand All @@ -88,6 +89,13 @@ func (r *gatewayClassReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}

// Accept all GatewayClasses those ControllerName is flomesh.io/gateway-controller
r.setAcceptedStatus(gatewayClass)
result, err := r.updateStatus(ctx, gatewayClass, gwv1beta1.GatewayClassConditionStatusAccepted)
if err != nil {
return result, err
}

gatewayClassList, err := r.fctx.K8sAPI.GatewayAPIClient.GatewayV1beta1().
GatewayClasses().
List(ctx, metav1.ListOptions{})
Expand All @@ -96,13 +104,6 @@ func (r *gatewayClassReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, err
}

// Accept all GatewayClasses those ControllerName is flomesh.io/gateway-controller
r.setAcceptedStatus(gatewayClass)
result, err := r.updateStatus(ctx, gatewayClass, gwv1beta1.GatewayClassConditionStatusAccepted)
if err != nil {
return result, err
}

// If there's multiple GatewayClasses, the oldest is set to active and the rest are set to inactive
for _, class := range r.setActiveStatus(gatewayClassList) {
result, err := r.updateStatus(ctx, class, gateway.GatewayClassConditionStatusActive)
Expand All @@ -111,6 +112,14 @@ func (r *gatewayClassReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
}

// As status of all GatewayClasses have been updated, just send the event
r.fctx.EventHandler.OnAdd(&gwv1beta1.GatewayClass{
ObjectMeta: metav1.ObjectMeta{
Namespace: req.Namespace,
Name: req.Name,
}},
)

return ctrl.Result{}, nil
}

Expand Down
30 changes: 29 additions & 1 deletion controllers/gateway/v1beta1/httproute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

Expand Down Expand Up @@ -84,9 +88,19 @@ func (r *httpRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if len(activeGateways) > 0 {
//activeGateway.Spec.GatewayClassName
for _, gw := range activeGateways {
httpRoute.Status.Parents = nil

for _, parentRef := range httpRoute.Spec.ParentRefs {
if utils.IsRefToGateway(parentRef, utils.ObjectKey(gw)) {

for _, listener := range gw.Spec.Listeners {

}
}
}
//gw.Spec.Listeners[0].Hostname
//httpRoute.Spec.Hostnames[]
httpRoute.Status.Parents = nil

for _, ref := range httpRoute.Spec.ParentRefs {
//if ref.Group ==

Expand All @@ -107,5 +121,19 @@ func (r *httpRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
func (r *httpRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&gwv1beta1.HTTPRoute{}).
//Watches(
//&source.Kind{Type: &gwv1beta1.Gateway{}},
//handler.EnqueueRequestsFromMapFunc(r.gatewayToRoutes),
//).
Complete(r)
}

//func (r *httpRouteReconciler) gatewayToRoutes(obj client.Object) []reconcile.Request {
// gateway, ok := obj.(*gwv1beta1.Gateway)
// if !ok {
// klog.Errorf("unexpected object type: %T", obj)
// return nil
// }
//
//
//}
Empty file removed docs/.gitkeep
Empty file.
4 changes: 4 additions & 0 deletions docs/gateway-config-json.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
| | Enum | | |
|------------------------------|----------------------------|---|---|
| .Listeners[].Protocol | HTTP, HTTPS, TLS, TCP, UDP | | |
| .Listeners[].TLS.TLSModeType | Terminate, Passthrough | | |
9 changes: 5 additions & 4 deletions pkg/event/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
gw "github.com/flomesh-io/fsm-classic/pkg/gateway"
"github.com/google/go-cmp/cmp"
"k8s.io/kubernetes/pkg/util/async"
"sigs.k8s.io/controller-runtime/pkg/client"
"time"
)

Expand Down Expand Up @@ -64,25 +65,25 @@ func NewEventHandler(config EventHandlerConfig) EventHandler {
return handler
}

func (e *FsmEventHandler) OnAdd(obj interface{}) {
func (e *FsmEventHandler) OnAdd(obj client.Object) {
if e.onChange(nil, obj) {
e.Sync()
}
}

func (e *FsmEventHandler) OnUpdate(oldObj, newObj interface{}) {
func (e *FsmEventHandler) OnUpdate(oldObj, newObj client.Object) {
if e.onChange(oldObj, newObj) {
e.Sync()
}
}

func (e *FsmEventHandler) OnDelete(obj interface{}) {
func (e *FsmEventHandler) OnDelete(obj client.Object) {
if e.onChange(obj, nil) {
e.Sync()
}
}

func (e *FsmEventHandler) onChange(oldObj, newObj interface{}) bool {
func (e *FsmEventHandler) onChange(oldObj, newObj client.Object) bool {
if newObj == nil {
return e.cache.Delete(oldObj)
} else {
Expand Down
Loading

0 comments on commit 3950bad

Please sign in to comment.