Skip to content
This repository has been archived by the owner on May 13, 2024. It is now read-only.

Commit

Permalink
feat: managed clusters can be installed in a different namespace than…
Browse files Browse the repository at this point in the history
… the control plane (#281)

* chore: initialize version 0.2.5-alpha.2

Signed-off-by: Lin Yang <reaver@flomesh.io>

* feat: detect mesh namespace

Signed-off-by: Lin Yang <reaver@flomesh.io>

---------

Signed-off-by: Lin Yang <reaver@flomesh.io>
  • Loading branch information
reaver-flomesh committed Jul 11, 2023
1 parent 2374a94 commit 0be6a27
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 47 deletions.
6 changes: 3 additions & 3 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func main() {
controlPlaneConfigStore := config.NewStore(k8sApi)
mcClient := controlPlaneConfigStore.MeshConfig
mc := mcClient.GetConfig()
mc.Cluster.UID = getClusterUID(k8sApi)
mc.Cluster.UID = getClusterUID(k8sApi, mc)
mc, err := mcClient.UpdateConfig(mc)
if err != nil {
os.Exit(1)
Expand Down Expand Up @@ -200,8 +200,8 @@ func newK8sAPI(kubeconfig *rest.Config, args *startArgs) *kube.K8sAPI {
return api
}

func getClusterUID(api *kube.K8sAPI) string {
ns, err := api.Client.CoreV1().Namespaces().Get(context.TODO(), config.GetFsmNamespace(), metav1.GetOptions{})
func getClusterUID(api *kube.K8sAPI, mc *config.MeshConfig) string {
ns, err := api.Client.CoreV1().Namespaces().Get(context.TODO(), mc.GetMeshNamespace(), metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get fsm namespace: %s", err)
os.Exit(1)
Expand Down
6 changes: 3 additions & 3 deletions cmd/manager/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func createWebhookConfigurations(k8sApi *kube.K8sAPI, configStore *config.Store,
os.Exit(1)
}

ns := config.GetFsmNamespace()
ns := mc.GetMeshNamespace()
svcName := mc.Webhook.ServiceName
caBundle := cert.CA
webhooks.RegisterWebhooks(ns, svcName, caBundle)
Expand Down Expand Up @@ -134,8 +134,8 @@ func issueCertForWebhook(certMgr certificate.Manager, mc *config.MeshConfig) (*c
commons.DefaultCAValidityPeriod,
[]string{
mc.Webhook.ServiceName,
fmt.Sprintf("%s.%s.svc", mc.Webhook.ServiceName, config.GetFsmNamespace()),
fmt.Sprintf("%s.%s.svc.cluster.local", mc.Webhook.ServiceName, config.GetFsmNamespace()),
fmt.Sprintf("%s.%s.svc", mc.Webhook.ServiceName, mc.GetMeshNamespace()),
fmt.Sprintf("%s.%s.svc.cluster.local", mc.Webhook.ServiceName, mc.GetMeshNamespace()),
},
)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func resolveValues(object metav1.Object, mc *config.MeshConfig) (map[string]inte
overrides := []string{
"fsm.ingress.namespaced=true",
fmt.Sprintf("fsm.image.repository=%s", mc.Images.Repository),
fmt.Sprintf("fsm.namespace=%s", config.GetFsmNamespace()),
fmt.Sprintf("fsm.namespace=%s", mc.GetMeshNamespace()),
}

for _, ov := range overrides {
Expand Down
8 changes: 5 additions & 3 deletions pkg/cluster/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,23 @@ func NewConnector(ctx context.Context, broker *event.Broker, certMgr certificate
return nil, err
}

clusterCfg := config.NewStore(k8sAPI)
mc := clusterCfg.MeshConfig.GetConfig()

// checks if fsm is installed in the cluster, this's a MUST otherwise it doesn't work
_, err = k8sAPI.Client.AppsV1().
Deployments(config.GetFsmNamespace()).
Deployments(mc.GetMeshNamespace()).
Get(context.TODO(), commons.ManagerDeploymentName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
klog.Error("FSM Control Plane is not installed or not in a proper state, please check it.")
return nil, err
}

klog.Errorf("Get FSM manager component %s/%s error: %s", config.GetFsmNamespace(), commons.ManagerDeploymentName, err)
klog.Errorf("Get FSM manager component %s/%s error: %s", mc.GetMeshNamespace(), commons.ManagerDeploymentName, err)
return nil, err
}

clusterCfg := config.NewStore(k8sAPI)
connectorCache := cache.NewCache(connectorCtx, k8sAPI, clusterCfg, broker, certMgr, resyncPeriod)

if connectorCtx.ConnectorConfig.IsInCluster() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/listener/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (l basicConfigChangeListener) updateIngressControllerSpec(oldCfg *config.Me
},
)
svcList, err := l.listenerCfg.K8sApi.Client.CoreV1().
Services(config.GetFsmNamespace()).
Services(cfg.GetMeshNamespace()).
List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String()})

if err != nil {
Expand Down Expand Up @@ -127,7 +127,7 @@ func (l basicConfigChangeListener) updateIngressControllerSpec(oldCfg *config.Me

if len(service.Spec.Ports) > 0 {
if _, err := l.listenerCfg.K8sApi.Client.CoreV1().
Services(config.GetFsmNamespace()).
Services(cfg.GetMeshNamespace()).
Update(context.TODO(), service, metav1.UpdateOptions{}); err != nil {
klog.Errorf("Failed update spec of ingress-pipy service: %s", err)
}
Expand Down
90 changes: 61 additions & 29 deletions pkg/config/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
Expand All @@ -48,17 +49,18 @@ var (
)

type MeshConfig struct {
IsManaged bool `json:"isManaged"`
Repo Repo `json:"repo"`
Images Images `json:"images"`
Webhook Webhook `json:"webhook"`
Ingress Ingress `json:"ingress"`
GatewayApi GatewayApi `json:"gatewayApi"`
Certificate Certificate `json:"certificate"`
Cluster Cluster `json:"cluster"`
ServiceLB ServiceLB `json:"serviceLB"`
Logging Logging `json:"logging"`
FLB FLB `json:"flb"`
IsManaged bool `json:"isManaged"`
Repo Repo `json:"repo"`
Images Images `json:"images"`
Webhook Webhook `json:"webhook"`
Ingress Ingress `json:"ingress"`
GatewayApi GatewayApi `json:"gatewayApi"`
Certificate Certificate `json:"certificate"`
Cluster Cluster `json:"cluster"`
ServiceLB ServiceLB `json:"serviceLB"`
Logging Logging `json:"logging"`
FLB FLB `json:"flb"`
MeshNamespace string `json:"-"`
}

type Repo struct {
Expand Down Expand Up @@ -142,21 +144,45 @@ type Logging struct {
type MeshConfigClient struct {
k8sApi *kube.K8sAPI
cmLister v1.ConfigMapNamespaceLister
meshNs string
}

func NewMeshConfigClient(k8sApi *kube.K8sAPI) *MeshConfigClient {
informerFactory := informers.NewSharedInformerFactoryWithOptions(k8sApi.Client, 60*time.Second, informers.WithNamespace(GetFsmNamespace()))
configmapLister := informerFactory.Core().V1().ConfigMaps().Lister().ConfigMaps(GetFsmNamespace())
configmapInformer := informerFactory.Core().V1().ConfigMaps().Informer()
go configmapInformer.Run(wait.NeverStop)
managers, err := k8sApi.Client.AppsV1().
Deployments(corev1.NamespaceAll).
List(context.TODO(), metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(
map[string]string{
"app.kubernetes.io/component": commons.ManagerDeploymentName,
"app.kubernetes.io/instance": commons.ManagerDeploymentName,
},
).String(),
})

if !k8scache.WaitForCacheSync(wait.NeverStop, configmapInformer.HasSynced) {
runtime.HandleError(fmt.Errorf("timed out waiting for configmap to sync"))
if err != nil {
panic(err)
}

return &MeshConfigClient{
k8sApi: k8sApi,
cmLister: configmapLister,
switch len(managers.Items) {
case 1:
mgr := managers.Items[0]
meshNs := mgr.Namespace
informerFactory := informers.NewSharedInformerFactoryWithOptions(k8sApi.Client, 60*time.Second, informers.WithNamespace(meshNs))
configmapLister := informerFactory.Core().V1().ConfigMaps().Lister().ConfigMaps(meshNs)
configmapInformer := informerFactory.Core().V1().ConfigMaps().Informer()
go configmapInformer.Run(wait.NeverStop)

if !k8scache.WaitForCacheSync(wait.NeverStop, configmapInformer.HasSynced) {
runtime.HandleError(fmt.Errorf("timed out waiting for configmap to sync"))
}

return &MeshConfigClient{
k8sApi: k8sApi,
cmLister: configmapLister,
meshNs: meshNs,
}
default:
panic(fmt.Sprintf("There's total %d %s in the cluster, should be ONLY ONE.", len(managers.Items), commons.ManagerDeploymentName))
}
}

Expand Down Expand Up @@ -201,7 +227,11 @@ func (o *MeshConfig) GetCaBundleNamespace() string {
return o.Certificate.CaBundleNamespace
}

return GetFsmNamespace()
return o.GetMeshNamespace()
}

func (o *MeshConfig) GetMeshNamespace() string {
return o.MeshNamespace
}

func (o *MeshConfig) NamespacedIngressCodebasePath(namespace string) string {
Expand Down Expand Up @@ -303,21 +333,21 @@ func (c *MeshConfigClient) UpdateConfig(config *MeshConfig) (*MeshConfig, error)

cm := c.getConfigMap()
if cm == nil {
return nil, fmt.Errorf("config map '%s/fsm-mesh-config' is not found", GetFsmNamespace())
return nil, fmt.Errorf("config map '%s/fsm-mesh-config' is not found", c.meshNs)
}
cm.Data[commons.MeshConfigJsonName] = config.ToJson()

cm, err = c.k8sApi.Client.CoreV1().
ConfigMaps(GetFsmNamespace()).
ConfigMaps(c.meshNs).
Update(context.TODO(), cm, metav1.UpdateOptions{})

if err != nil {
msg := fmt.Sprintf("Update ConfigMap %s/fsm-mesh-config error, %s", GetFsmNamespace(), err)
msg := fmt.Sprintf("Update ConfigMap %s/fsm-mesh-config error, %s", c.meshNs, err)
klog.Errorf(msg)
return nil, fmt.Errorf(msg)
}

klog.V(5).Infof("After updating, ConfigMap %s/fsm-mesh-config = %#v", GetFsmNamespace(), cm)
klog.V(5).Infof("After updating, ConfigMap %s/fsm-mesh-config = %#v", c.meshNs, cm)

return ParseMeshConfig(cm)
}
Expand All @@ -329,15 +359,15 @@ func (c *MeshConfigClient) getConfigMap() *corev1.ConfigMap {
// it takes time to sync, perhaps still not in the local store yet
if apierrors.IsNotFound(err) {
cm, err = c.k8sApi.Client.CoreV1().
ConfigMaps(GetFsmNamespace()).
ConfigMaps(c.meshNs).
Get(context.TODO(), commons.MeshConfigName, metav1.GetOptions{})

if err != nil {
klog.Errorf("Get ConfigMap %s/fsm-mesh-config from API server error, %s", GetFsmNamespace(), err.Error())
klog.Errorf("Get ConfigMap %s/fsm-mesh-config from API server error, %s", c.meshNs, err.Error())
return nil
}
} else {
klog.Errorf("Get ConfigMap %s/fsm-mesh-config error, %s", GetFsmNamespace(), err.Error())
klog.Errorf("Get ConfigMap %s/fsm-mesh-config error, %s", c.meshNs, err.Error())
return nil
}
}
Expand All @@ -348,7 +378,7 @@ func (c *MeshConfigClient) getConfigMap() *corev1.ConfigMap {
func ParseMeshConfig(cm *corev1.ConfigMap) (*MeshConfig, error) {
cfgJson, ok := cm.Data[commons.MeshConfigJsonName]
if !ok {
msg := fmt.Sprintf("Config file mesh_config.json not found, please check ConfigMap %s/fsm-mesh-config.", GetFsmNamespace())
msg := fmt.Sprintf("Config file mesh_config.json not found, please check ConfigMap %s/fsm-mesh-config.", cm.Namespace)
klog.Errorf(msg)
return nil, fmt.Errorf(msg)
}
Expand All @@ -370,5 +400,7 @@ func ParseMeshConfig(cm *corev1.ConfigMap) (*MeshConfig, error) {
return nil, err
}

cfg.MeshNamespace = cm.Namespace

return &cfg, nil
}
2 changes: 1 addition & 1 deletion pkg/config/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (

var (
meshMetadata FsmMetadata
DefaultWatchedConfigMaps = sets.String{}
DefaultWatchedConfigMaps = sets.Set[string]{}
)

func init() {
Expand Down
10 changes: 5 additions & 5 deletions pkg/config/utils/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,20 @@ func getLoggingSecret(api *kube.K8sAPI, mc *config.MeshConfig) (*corev1.Secret,
if mc.Logging.Enabled {
secretName := mc.Logging.SecretName
secret, err := api.Client.CoreV1().
Secrets(config.GetFsmNamespace()).
Secrets(mc.GetMeshNamespace()).
Get(context.TODO(), secretName, metav1.GetOptions{})

if err != nil {
if errors.IsNotFound(err) {
secret, err = api.Client.CoreV1().
Secrets(config.GetFsmNamespace()).
Secrets(mc.GetMeshNamespace()).
Create(
context.TODO(),
&corev1.Secret{
TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Name: secretName,
Namespace: config.GetFsmNamespace(),
Namespace: mc.GetMeshNamespace(),
},
Data: map[string][]byte{
"url": []byte("http://localhost:8123/ping"),
Expand All @@ -138,14 +138,14 @@ func getLoggingSecret(api *kube.K8sAPI, mc *config.MeshConfig) (*corev1.Secret,
)

if err != nil {
klog.Errorf("failed to create Secret %s/%s: %s", config.GetFsmNamespace(), secretName, err)
klog.Errorf("failed to create Secret %s/%s: %s", mc.GetMeshNamespace(), secretName, err)
return nil, err
}

return secret, nil
}

klog.Errorf("failed to get Secret %s/%s: %s", config.GetFsmNamespace(), secretName, err)
klog.Errorf("failed to get Secret %s/%s: %s", mc.GetMeshNamespace(), secretName, err)
return nil, err
}

Expand Down

0 comments on commit 0be6a27

Please sign in to comment.