From c8510d6755c374e86ffd15b93f9b2d90ee192897 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Fri, 22 Sep 2023 13:01:47 +0200 Subject: [PATCH] Add stream subject transform Signed-off-by: Tomasz Pietrek --- controllers/jetstream/stream.go | 59 ++++++++++------ deploy/crds.yml | 10 +++ .../apis/jetstream/v1beta2/streamtypes.go | 70 ++++++++++--------- .../v1beta2/zz_generated.deepcopy.go | 21 ++++++ .../generated/applyconfiguration/utils.go | 2 + 5 files changed, 110 insertions(+), 52 deletions(-) diff --git a/controllers/jetstream/stream.go b/controllers/jetstream/stream.go index 32cf65cd..1ac013dd 100644 --- a/controllers/jetstream/stream.go +++ b/controllers/jetstream/stream.go @@ -405,6 +405,16 @@ func createStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e })) } + if spec.SubjectTransform != nil { + opts = append(opts, func(o *api.StreamConfig) error { + o.SubjectTransform = &jsmapi.SubjectTransformConfig{ + Source: spec.SubjectTransform.Source, + Destination: spec.SubjectTransform.Dest, + } + return nil + }) + } + if spec.AllowDirect { opts = append(opts, jsm.AllowDirect()) } @@ -454,27 +464,36 @@ func updateStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e return err } + var subjectTransform *jsmapi.SubjectTransformConfig + if spec.SubjectTransform != nil { + subjectTransform = &jsmapi.SubjectTransformConfig{ + Source: spec.SubjectTransform.Source, + Destination: spec.SubjectTransform.Dest, + } + } + config := jsmapi.StreamConfig{ - Name: spec.Name, - Description: spec.Description, - Retention: retention, - Subjects: spec.Subjects, - MaxConsumers: spec.MaxConsumers, - MaxMsgs: int64(spec.MaxMsgs), - MaxBytes: int64(spec.MaxBytes), - MaxMsgsPer: int64(spec.MaxMsgsPerSubject), - MaxAge: maxAge, - MaxMsgSize: int32(spec.MaxMsgSize), - Storage: storage, - Discard: discard, - DiscardNewPer: spec.DiscardPerSubject, - Replicas: spec.Replicas, - NoAck: spec.NoAck, - Duplicates: duplicates, - AllowDirect: spec.AllowDirect, - DenyDelete: spec.DenyDelete, - RollupAllowed: spec.AllowRollup, - FirstSeq: spec.FirstSequence, + Name: spec.Name, + Description: spec.Description, + Retention: retention, + Subjects: spec.Subjects, + MaxConsumers: spec.MaxConsumers, + MaxMsgs: int64(spec.MaxMsgs), + MaxBytes: int64(spec.MaxBytes), + MaxMsgsPer: int64(spec.MaxMsgsPerSubject), + MaxAge: maxAge, + MaxMsgSize: int32(spec.MaxMsgSize), + Storage: storage, + Discard: discard, + DiscardNewPer: spec.DiscardPerSubject, + Replicas: spec.Replicas, + NoAck: spec.NoAck, + Duplicates: duplicates, + AllowDirect: spec.AllowDirect, + DenyDelete: spec.DenyDelete, + RollupAllowed: spec.AllowRollup, + FirstSeq: spec.FirstSequence, + SubjectTransform: subjectTransform, } if spec.Republish != nil { config.RePublish = &jsmapi.RePublish{ diff --git a/deploy/crds.yml b/deploy/crds.yml index e577d36b..f295e919 100644 --- a/deploy/crds.yml +++ b/deploy/crds.yml @@ -200,6 +200,16 @@ spec: - s2 - none default: '' + subjectTransform: + description: SubjectTransform is for applying a subject transform (to matching messages) when a new message is received + type: object + properties: + source: + type: string + description: Source subject + dest: + type: string + description: Destination subject to transform to preventDelete: description: When true, the managed Stream will not be deleted when the resource is deleted type: boolean diff --git a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go index da2b0cdf..961801b7 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go @@ -22,38 +22,44 @@ func (s *Stream) GetSpec() interface{} { // StreamSpec is the spec for a Stream resource type StreamSpec struct { - Account string `json:"account"` - AllowDirect bool `json:"allowDirect"` - AllowRollup bool `json:"allowRollup"` - Creds string `json:"creds"` - DenyDelete bool `json:"denyDelete"` - Description string `json:"description"` - DiscardPerSubject bool `json:"discardPerSubject"` - PreventDelete bool `json:"preventDelete"` - PreventUpdate bool `json:"preventUpdate"` - Discard string `json:"discard"` - DuplicateWindow string `json:"duplicateWindow"` - MaxAge string `json:"maxAge"` - MaxBytes int `json:"maxBytes"` - MaxConsumers int `json:"maxConsumers"` - MaxMsgs int `json:"maxMsgs"` - MaxMsgSize int `json:"maxMsgSize"` - MaxMsgsPerSubject int `json:"maxMsgsPerSubject"` - Mirror *StreamSource `json:"mirror"` - Name string `json:"name"` - Nkey string `json:"nkey"` - NoAck bool `json:"noAck"` - Placement *StreamPlacement `json:"placement"` - Replicas int `json:"replicas"` - Republish *RePublish `json:"republish"` - FirstSequence uint64 `json:"firstSequence"` - Compression string `json:"compression"` - Retention string `json:"retention"` - Servers []string `json:"servers"` - Sources []*StreamSource `json:"sources"` - Storage string `json:"storage"` - Subjects []string `json:"subjects"` - TLS TLS `json:"tls"` + Account string `json:"account"` + AllowDirect bool `json:"allowDirect"` + AllowRollup bool `json:"allowRollup"` + Creds string `json:"creds"` + DenyDelete bool `json:"denyDelete"` + Description string `json:"description"` + DiscardPerSubject bool `json:"discardPerSubject"` + PreventDelete bool `json:"preventDelete"` + PreventUpdate bool `json:"preventUpdate"` + Discard string `json:"discard"` + DuplicateWindow string `json:"duplicateWindow"` + MaxAge string `json:"maxAge"` + MaxBytes int `json:"maxBytes"` + MaxConsumers int `json:"maxConsumers"` + MaxMsgs int `json:"maxMsgs"` + MaxMsgSize int `json:"maxMsgSize"` + MaxMsgsPerSubject int `json:"maxMsgsPerSubject"` + Mirror *StreamSource `json:"mirror"` + Name string `json:"name"` + Nkey string `json:"nkey"` + NoAck bool `json:"noAck"` + Placement *StreamPlacement `json:"placement"` + Replicas int `json:"replicas"` + Republish *RePublish `json:"republish"` + SubjectTransform *SubjectTransform `json:subjectTransform` + FirstSequence uint64 `json:"firstSequence"` + Compression string `json:"compression"` + Retention string `json:"retention"` + Servers []string `json:"servers"` + Sources []*StreamSource `json:"sources"` + Storage string `json:"storage"` + Subjects []string `json:"subjects"` + TLS TLS `json:"tls"` +} + +type SubjectTransform struct { + Source string `json:"source"` + Dest string `json:"dest"` } type StreamPlacement struct { diff --git a/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go b/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go index eb17c9b4..d6f17ddc 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go @@ -425,6 +425,11 @@ func (in *StreamSpec) DeepCopyInto(out *StreamSpec) { *out = new(RePublish) **out = **in } + if in.SubjectTransform != nil { + in, out := &in.SubjectTransform, &out.SubjectTransform + *out = new(SubjectTransform) + **out = **in + } if in.Servers != nil { in, out := &in.Servers, &out.Servers *out = make([]string, len(*in)) @@ -460,6 +465,22 @@ func (in *StreamSpec) DeepCopy() *StreamSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SubjectTransform) DeepCopyInto(out *SubjectTransform) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubjectTransform. +func (in *SubjectTransform) DeepCopy() *SubjectTransform { + if in == nil { + return nil + } + out := new(SubjectTransform) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TLS) DeepCopyInto(out *TLS) { *out = *in diff --git a/pkg/jetstream/generated/applyconfiguration/utils.go b/pkg/jetstream/generated/applyconfiguration/utils.go index 18c5056d..4e1751e0 100644 --- a/pkg/jetstream/generated/applyconfiguration/utils.go +++ b/pkg/jetstream/generated/applyconfiguration/utils.go @@ -52,6 +52,8 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &jetstreamv1beta2.StreamSourceApplyConfiguration{} case v1beta2.SchemeGroupVersion.WithKind("StreamSpec"): return &jetstreamv1beta2.StreamSpecApplyConfiguration{} + case v1beta2.SchemeGroupVersion.WithKind("SubjectTransform"): + return &jetstreamv1beta2.SubjectTransformApplyConfiguration{} case v1beta2.SchemeGroupVersion.WithKind("TLS"): return &jetstreamv1beta2.TLSApplyConfiguration{} case v1beta2.SchemeGroupVersion.WithKind("TLSSecret"):