From f54767cf4e07c1183a041ca4c93a05b4dcba343f Mon Sep 17 00:00:00 2001 From: Maxym Fugol Date: Mon, 24 Apr 2023 12:54:37 +0300 Subject: [PATCH 1/2] add support for discardNew --- controllers/jetstream/stream.go | 5 +++++ deploy/crds.yml | 4 ++++ pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go | 1 + 3 files changed, 10 insertions(+) diff --git a/controllers/jetstream/stream.go b/controllers/jetstream/stream.go index fff06994..ccb66a32 100644 --- a/controllers/jetstream/stream.go +++ b/controllers/jetstream/stream.go @@ -409,6 +409,10 @@ func createStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e opts = append(opts, jsm.DenyDelete()) } + if spec.DiscardPerSubject { + opts = append(opts, jsm.DiscardNewPerSubject()) + } + _, err = c.NewStream(ctx, spec.Name, opts) return err } @@ -451,6 +455,7 @@ func updateStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e MaxMsgSize: int32(spec.MaxMsgSize), Storage: storage, Discard: discard, + DiscardNewPer: spec.DiscardPerSubject, Replicas: spec.Replicas, NoAck: spec.NoAck, Duplicates: duplicates, diff --git a/deploy/crds.yml b/deploy/crds.yml index a93f39fb..15c3d0c1 100644 --- a/deploy/crds.yml +++ b/deploy/crds.yml @@ -209,6 +209,10 @@ spec: description: When true, restricts the ability to delete messages from a stream via the API. Cannot be changed once set to true. type: boolean default: false + discardPerSubject: + description: Allows to discard messages on a subject basis. + type: boolean + default: false status: type: object properties: diff --git a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go index 2f3cdff4..b7f719dd 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go @@ -28,6 +28,7 @@ type StreamSpec struct { Creds string `json:"creds"` DenyDelete bool `json:"denyDelete"` Description string `json:"description"` + DiscardPerSubject bool `json:"discardPerSubject"` PreventDelete bool `json:"preventDelete"` PreventUpdate bool `json:"preventUpdate"` Discard string `json:"discard"` From d7272ba14b5ca03b46ddc78a4dad5a61d635483d Mon Sep 17 00:00:00 2001 From: Maxym Fugol Date: Wed, 26 Apr 2023 13:05:36 +0300 Subject: [PATCH 2/2] specify MaxMsgsPer for stream updates --- controllers/jetstream/stream.go | 1 + 1 file changed, 1 insertion(+) diff --git a/controllers/jetstream/stream.go b/controllers/jetstream/stream.go index ccb66a32..4235449d 100644 --- a/controllers/jetstream/stream.go +++ b/controllers/jetstream/stream.go @@ -451,6 +451,7 @@ func updateStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e MaxConsumers: spec.MaxConsumers, MaxMsgs: int64(spec.MaxMsgs), MaxBytes: int64(spec.MaxBytes), + MaxMsgsPer: int64(spec.MaxMsgsPerSubject), MaxAge: maxAge, MaxMsgSize: int32(spec.MaxMsgSize), Storage: storage,