Skip to content

Commit

Permalink
feat(msk-alpha): MSK Kafka versions 2.8.2.tiered and 3.5.1 and Storag…
Browse files Browse the repository at this point in the history
…eMode property (#27560)

This patch adds support for the `tiered` `storage mode` and Kafka versions `2.8.2.tiered` & `3.5.1` in the `aws-msk-alpha` package.

---

### Changes

- added Kafka versions `2.8.2.tiered` & `3.5.1`.
- added `storageMode` L1 construct property to the L2 msk
- added unit and integ tests for `Kafka versions and 'storageMode`
- updated test versions to latest supported Kafka version as desired

Ref:
- [aws: MSK supported Kafka versions](https://docs.aws.amazon.com/msk/latest/developerguide/supported-kafka-versions.html)

---

Closes #27551

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
chrispidcock authored Dec 1, 2023
1 parent 71b4648 commit f9f15fa
Show file tree
Hide file tree
Showing 36 changed files with 36,194 additions and 251 deletions.
24 changes: 22 additions & 2 deletions packages/@aws-cdk/aws-msk-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ new CfnOutput(this, 'ZookeeperConnectionTls', { value: cluster.zookeeperConnecti
To import an existing MSK cluster into your CDK app use the `.fromClusterArn()` method.

```ts
const cluster = msk.Cluster.fromClusterArn(this, 'Cluster',
const cluster = msk.Cluster.fromClusterArn(this, 'Cluster',
'arn:aws:kafka:us-west-2:1234567890:cluster/a-cluster/11111111-1111-1111-1111-111111111111-1',
);
```
Expand Down Expand Up @@ -146,7 +146,7 @@ const cluster = new msk.Cluster(this, 'cluster', {

### SASL/IAM + TLS

Enable client authentication with [IAM](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html)
Enable client authentication with [IAM](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html)
as well as enable client authentication with TLS by setting the `certificateAuthorityArns` property to reference your ACM Private CA. [More info on Private CAs.](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html)

```ts
Expand Down Expand Up @@ -210,3 +210,23 @@ in the `cdk.json` file.
}
```

## Storage Mode

You can configure an MSK cluster storage mode using the `storageMode` property.

Tiered storage is a low-cost storage tier for Amazon MSK that scales to virtually unlimited storage,
making it cost-effective to build streaming data applications.

> Visit [Tiered storage](https://docs.aws.amazon.com/msk/latest/developerguide/msk-tiered-storage.html) for more details.
```ts
declare const vpc: ec2.Vpc;
declare const bucket: s3.IBucket;

const cluster = new msk.Cluster(this, 'cluster', {
clusterName: 'myCluster',
kafkaVersion: msk.KafkaVersion.V2_8_2_TIERED,
vpc,
storageMode: msk.StorageMode.TIERED,
});
```
17 changes: 17 additions & 0 deletions packages/@aws-cdk/aws-msk-alpha/lib/cluster-version.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ export class KafkaVersion {
*/
public static readonly V2_8_1 = KafkaVersion.of('2.8.1');

/**
* AWS MSK Kafka version 2.8.2.tiered
*/
public static readonly V2_8_2_TIERED = KafkaVersion.of('2.8.2.tiered');

/**
* Kafka version 3.1.1
*/
Expand All @@ -101,6 +106,11 @@ export class KafkaVersion {
*/
public static readonly V3_4_0 = KafkaVersion.of('3.4.0');

/**
* Kafka version 3.5.1
*/
public static readonly V3_5_1 = KafkaVersion.of('3.5.1');

/**
* Custom cluster version
* @param version custom version number
Expand All @@ -114,4 +124,11 @@ export class KafkaVersion {
* @param version cluster version number
*/
private constructor(public readonly version: string) {}

/**
* Checks if the cluster version supports tiered storage mode.
*/
public isTieredStorageCompatible() {
return this.version.endsWith('.tiered');
};
}
63 changes: 63 additions & 0 deletions packages/@aws-cdk/aws-msk-alpha/lib/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,25 @@ export interface ClusterProps {
* The physical name of the cluster.
*/
readonly clusterName: string;

/**
* The version of Apache Kafka.
*/
readonly kafkaVersion: KafkaVersion;

/**
* Number of Apache Kafka brokers deployed in each Availability Zone.
*
* @default 1
*/
readonly numberOfBrokerNodes?: number;

/**
* Defines the virtual networking environment for this cluster.
* Must have at least 2 subnets in two different AZs.
*/
readonly vpc: ec2.IVpc;

/**
* Where to place the nodes within the VPC.
* Amazon MSK distributes the broker nodes evenly across the subnets that you specify.
Expand All @@ -83,57 +87,74 @@ export interface ClusterProps {
* @default - the Vpc default strategy if not specified.
*/
readonly vpcSubnets?: ec2.SubnetSelection;

/**
* The EC2 instance type that you want Amazon MSK to use when it creates your brokers.
*
* @see https://docs.aws.amazon.com/msk/latest/developerguide/msk-create-cluster.html#broker-instance-types
* @default kafka.m5.large
*/
readonly instanceType?: ec2.InstanceType;

/**
* The AWS security groups to associate with the elastic network interfaces in order to specify who can
* connect to and communicate with the Amazon MSK cluster.
*
* @default - create new security group
*/
readonly securityGroups?: ec2.ISecurityGroup[];

/**
* Information about storage volumes attached to MSK broker nodes.
*
* @default - 1000 GiB EBS volume
*/
readonly ebsStorageInfo?: EbsStorageInfo;

/**
* This controls storage mode for supported storage tiers.
*
* @default - StorageMode.LOCAL
* @see https://docs.aws.amazon.com/msk/latest/developerguide/msk-tiered-storage.html
*/
readonly storageMode?: StorageMode;

/**
* The Amazon MSK configuration to use for the cluster.
*
* @default - none
*/
readonly configurationInfo?: ClusterConfigurationInfo;

/**
* Cluster monitoring configuration.
*
* @default - DEFAULT monitoring level
*/
readonly monitoring?: MonitoringConfiguration;

/**
* Configure your MSK cluster to send broker logs to different destination types.
*
* @default - disabled
*/
readonly logging?: BrokerLogging;

/**
* Config details for encryption in transit.
*
* @default - enabled
*/
readonly encryptionInTransit?: EncryptionInTransitConfig;

/**
* Configuration properties for client authentication.
* MSK supports using private TLS certificates or SASL/SCRAM to authenticate the identity of clients.
*
* @default - disabled
*/
readonly clientAuthentication?: ClientAuthentication;

/**
* What to do when this resource is deleted from a stack.
*
Expand All @@ -152,6 +173,7 @@ export interface EbsStorageInfo {
* @default 1000
*/
readonly volumeSize?: number;

/**
* The AWS KMS key for encrypting data at rest.
*
Expand All @@ -160,6 +182,21 @@ export interface EbsStorageInfo {
readonly encryptionKey?: kms.IKey;
}

/**
* The storage mode for the cluster brokers.
*/
export enum StorageMode {
/**
* Local storage mode utilizes network attached EBS storage.
*/
LOCAL = 'LOCAL',

/**
* Tiered storage mode utilizes EBS storage and Tiered storage.
*/
TIERED = 'TIERED',
}

/**
* The Amazon MSK configuration to use for the cluster.
* Note: There is currently no Cloudformation Resource to create a Configuration
Expand All @@ -170,6 +207,7 @@ export interface ClusterConfigurationInfo {
* For example, arn:aws:kafka:us-east-1:123456789012:configuration/example-configuration-name/abcdabcd-1234-abcd-1234-abcd123e8e8e-1.
*/
readonly arn: string;

/**
* The revision of the Amazon MSK configuration to use.
*/
Expand All @@ -186,14 +224,17 @@ export enum ClusterMonitoringLevel {
* Default metrics are the essential metrics to monitor.
*/
DEFAULT = 'DEFAULT',

/**
* Per Broker metrics give you metrics at the broker level.
*/
PER_BROKER = 'PER_BROKER',

/**
* Per Topic Per Broker metrics help you understand volume at the topic level.
*/
PER_TOPIC_PER_BROKER = 'PER_TOPIC_PER_BROKER',

/**
* Per Topic Per Partition metrics help you understand consumer group lag at the topic partition level.
*/
Expand All @@ -210,12 +251,14 @@ export interface MonitoringConfiguration {
* @default DEFAULT
*/
readonly clusterMonitoringLevel?: ClusterMonitoringLevel;

/**
* Indicates whether you want to enable or disable the JMX Exporter.
*
* @default false
*/
readonly enablePrometheusJmxExporter?: boolean;

/**
* Indicates whether you want to enable or disable the Prometheus Node Exporter.
*
Expand All @@ -236,12 +279,14 @@ export interface BrokerLogging {
* @default - disabled
*/
readonly firehoseDeliveryStreamName?: string;

/**
* The CloudWatch Logs group that is the destination for broker logs.
*
* @default - disabled
*/
readonly cloudwatchLogGroup?: logs.ILogGroup;

/**
* Details of the Amazon S3 destination for broker logs.
*
Expand All @@ -258,6 +303,7 @@ export interface S3LoggingConfiguration {
* The S3 bucket that is the destination for broker logs.
*/
readonly bucket: s3.IBucket;

/**
* The S3 prefix that is the destination for broker logs.
*
Expand All @@ -274,10 +320,12 @@ export enum ClientBrokerEncryption {
* TLS means that client-broker communication is enabled with TLS only.
*/
TLS = 'TLS',

/**
* TLS_PLAINTEXT means that client-broker communication is enabled for both TLS-encrypted, as well as plaintext data.
*/
TLS_PLAINTEXT = 'TLS_PLAINTEXT',

/**
* PLAINTEXT means that client-broker communication is enabled in plaintext only.
*/
Expand All @@ -296,6 +344,7 @@ export interface EncryptionInTransitConfig {
* @default - TLS
*/
readonly clientBroker?: ClientBrokerEncryption;

/**
* Indicates that data communication among the broker nodes of the cluster is encrypted.
*
Expand All @@ -314,12 +363,14 @@ export interface SaslAuthProps {
* @default false
*/
readonly scram?: boolean;

/**
* Enable IAM access control.
*
* @default false
*/
readonly iam?: boolean;

/**
* KMS Key to encrypt SASL/SCRAM secrets.
*
Expand Down Expand Up @@ -486,6 +537,17 @@ export class Cluster extends ClusterBase {
ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.LARGE),
);

if (props.storageMode && props.storageMode === StorageMode.TIERED) {
if (!props.kafkaVersion.isTieredStorageCompatible()) {
throw Error(`To deploy a tiered cluster you must select a compatible Kafka version, got ${props.kafkaVersion.version}`);
}
if (instanceType === this.mskInstanceType(
ec2.InstanceType.of(ec2.InstanceClass.T3, ec2.InstanceSize.SMALL),
)) {
throw Error('Tiered storage doesn\'t support broker type t3.small');
}
}

const encryptionAtRest = props.ebsStorageInfo?.encryptionKey
? {
dataVolumeKmsKeyId:
Expand Down Expand Up @@ -683,6 +745,7 @@ export class Cluster extends ClusterBase {
configurationInfo: props.configurationInfo,
enhancedMonitoring: props.monitoring?.clusterMonitoringLevel,
openMonitoring: openMonitoring,
storageMode: props.storageMode,
loggingInfo: loggingInfo,
clientAuthentication: clientAuthentication,
});
Expand Down
Loading

0 comments on commit f9f15fa

Please sign in to comment.