diff --git a/controllers/jetstream/stream.go b/controllers/jetstream/stream.go index fff06994..4235449d 100644 --- a/controllers/jetstream/stream.go +++ b/controllers/jetstream/stream.go @@ -409,6 +409,10 @@ func createStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e opts = append(opts, jsm.DenyDelete()) } + if spec.DiscardPerSubject { + opts = append(opts, jsm.DiscardNewPerSubject()) + } + _, err = c.NewStream(ctx, spec.Name, opts) return err } @@ -447,10 +451,12 @@ func updateStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e MaxConsumers: spec.MaxConsumers, MaxMsgs: int64(spec.MaxMsgs), MaxBytes: int64(spec.MaxBytes), + MaxMsgsPer: int64(spec.MaxMsgsPerSubject), MaxAge: maxAge, MaxMsgSize: int32(spec.MaxMsgSize), Storage: storage, Discard: discard, + DiscardNewPer: spec.DiscardPerSubject, Replicas: spec.Replicas, NoAck: spec.NoAck, Duplicates: duplicates, diff --git a/deploy/crds.yml b/deploy/crds.yml index a93f39fb..15c3d0c1 100644 --- a/deploy/crds.yml +++ b/deploy/crds.yml @@ -209,6 +209,10 @@ spec: description: When true, restricts the ability to delete messages from a stream via the API. Cannot be changed once set to true. type: boolean default: false + discardPerSubject: + description: Allows to discard messages on a subject basis. + type: boolean + default: false status: type: object properties: diff --git a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go index 2f3cdff4..b7f719dd 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go @@ -28,6 +28,7 @@ type StreamSpec struct { Creds string `json:"creds"` DenyDelete bool `json:"denyDelete"` Description string `json:"description"` + DiscardPerSubject bool `json:"discardPerSubject"` PreventDelete bool `json:"preventDelete"` PreventUpdate bool `json:"preventUpdate"` Discard string `json:"discard"`