Skip to content

Commit

Permalink
Extract Aspire.Hosting.Kafka.Tests project (#4910)
Browse files Browse the repository at this point in the history
* Extract Aspire.Hosting.Kafka.Tests project

Contributes to #3185
Contributes to #4294
  • Loading branch information
eerhardt authored Jul 16, 2024
1 parent 4e9b1b6 commit 4374062
Show file tree
Hide file tree
Showing 14 changed files with 242 additions and 140 deletions.
7 changes: 7 additions & 0 deletions Aspire.sln
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Hosting.PostgreSQL.T
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Hosting.Qdrant.Tests", "tests\Aspire.Hosting.Qdrant.Tests\Aspire.Hosting.Qdrant.Tests.csproj", "{8E2AA85E-C351-47B4-AF91-58557FAD5840}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Hosting.Kafka.Tests", "tests\Aspire.Hosting.Kafka.Tests\Aspire.Hosting.Kafka.Tests.csproj", "{0A83AA67-221E-44B4-9BA9-DC64DC17949E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1338,6 +1340,10 @@ Global
{7425E5B2-BC47-4521-AC40-B8CECA329E08}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7425E5B2-BC47-4521-AC40-B8CECA329E08}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7425E5B2-BC47-4521-AC40-B8CECA329E08}.Release|Any CPU.Build.0 = Release|Any CPU
{0A83AA67-221E-44B4-9BA9-DC64DC17949E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0A83AA67-221E-44B4-9BA9-DC64DC17949E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0A83AA67-221E-44B4-9BA9-DC64DC17949E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0A83AA67-221E-44B4-9BA9-DC64DC17949E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1583,6 +1589,7 @@ Global
{1BC02557-B78B-48CE-9D3C-488A6B7672F4} = {830A89EC-4029-4753-B25A-068BAE37DEC7}
{8E2AA85E-C351-47B4-AF91-58557FAD5840} = {830A89EC-4029-4753-B25A-068BAE37DEC7}
{7425E5B2-BC47-4521-AC40-B8CECA329E08} = {830A89EC-4029-4753-B25A-068BAE37DEC7}
{0A83AA67-221E-44B4-9BA9-DC64DC17949E} = {830A89EC-4029-4753-B25A-068BAE37DEC7}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6DCEDFEC-988E-4CB3-B45B-191EB5086E0C}
Expand Down
4 changes: 1 addition & 3 deletions tests/Aspire.EndToEnd.Tests/IntegrationServicesFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ public Task DumpComponentLogsAsync(TestResourceNames resource, ITestOutputHelper
TestResourceNames.cosmos or TestResourceNames.efcosmos => "cosmos",
TestResourceNames.eventhubs => "eventhubs",
TestResourceNames.garnet => "garnet",
TestResourceNames.kafka => "kafka",
TestResourceNames.milvus => "milvus",
TestResourceNames.mongodb => "mongodb",
TestResourceNames.mysql or TestResourceNames.efmysql => "mysql",
Expand Down Expand Up @@ -151,8 +150,7 @@ private static TestResourceNames GetResourcesToSkip()
"oracle" => TestResourceNames.oracledatabase,
"cosmos" => TestResourceNames.cosmos | TestResourceNames.efcosmos,
"eventhubs" => TestResourceNames.eventhubs,
"basicservices" => TestResourceNames.kafka
| TestResourceNames.mongodb
"basicservices" => TestResourceNames.mongodb
| TestResourceNames.rabbitmq
| TestResourceNames.redis
| TestResourceNames.garnet
Expand Down
17 changes: 0 additions & 17 deletions tests/Aspire.EndToEnd.Tests/IntegrationServicesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,6 @@ public Task VerifyCosmosComponentWorks(TestResourceNames resourceName)
return VerifyComponentWorks(resourceName);
}

[Fact]
[Trait("scenario", "basicservices")]
public Task KafkaComponentCanProduceAndConsume()
=> RunTestAsync(async() =>
{
_integrationServicesFixture.EnsureAppHasResources(TestResourceNames.kafka);
string topic = $"topic-{Guid.NewGuid()}";
var response = await _integrationServicesFixture.IntegrationServiceA.HttpGetAsync("http", $"/kafka/produce/{topic}");
var responseContent = await response.Content.ReadAsStringAsync();
Assert.True(response.IsSuccessStatusCode, responseContent);
response = await _integrationServicesFixture.IntegrationServiceA.HttpGetAsync("http", $"/kafka/consume/{topic}");
responseContent = await response.Content.ReadAsStringAsync();
Assert.True(response.IsSuccessStatusCode, responseContent);
});

[Fact]
// Include all the scenarios here so this test gets run for all of them.
[Trait("scenario", "cosmos")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Net.Sockets;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Utils;
using Microsoft.Extensions.DependencyInjection;
using System.Net.Sockets;
using Xunit;

namespace Aspire.Hosting.Tests.Kafka;
namespace Aspire.Hosting.Kafka.Tests;

public class AddKafkaTests
{
[Fact]
Expand Down
18 changes: 18 additions & 0 deletions tests/Aspire.Hosting.Kafka.Tests/Aspire.Hosting.Kafka.Tests.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>$(NetCurrent)</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Aspire.Hosting.Kafka\Aspire.Hosting.Kafka.csproj" />
<ProjectReference Include="..\..\src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj" />
<ProjectReference Include="..\Aspire.Hosting.Tests\Aspire.Hosting.Tests.csproj" />
</ItemGroup>

<ItemGroup>
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Kafka\KafkaContainerImageTags.cs" />
<Compile Include="$(SharedDir)VolumeNameGenerator.cs" Link="Utils\VolumeNameGenerator.cs" />
</ItemGroup>

</Project>
211 changes: 211 additions & 0 deletions tests/Aspire.Hosting.Kafka.Tests/KafkaFunctionalTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Components.Common.Tests;
using Aspire.Hosting.Utils;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;

namespace Aspire.Hosting.Kafka.Tests;

public class KafkaFunctionalTests(ITestOutputHelper testOutputHelper)
{
[Fact]
[RequiresDocker]
public async Task VerifyKafkaResource()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));

var builder = CreateDistributedApplicationBuilder();

var kafka = builder.AddKafka("kafka");

using var app = builder.Build();

await app.StartAsync();

var hb = Host.CreateApplicationBuilder();

hb.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
{
[$"ConnectionStrings:{kafka.Resource.Name}"] = await kafka.Resource.ConnectionStringExpression.GetValueAsync(default)
});

hb.AddKafkaProducer<string, string>("kafka");
hb.AddKafkaConsumer<string, string>("kafka", consumerBuilder =>
{
consumerBuilder.Config.GroupId = "aspire-consumer-group";
consumerBuilder.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
});

using var host = hb.Build();

await host.StartAsync();

var topic = "test-topic";
var producer = host.Services.GetRequiredService<IProducer<string, string>>();
for (var i = 0; i < 10; i++)
{
await producer.ProduceAsync(topic, new Message<string, string> { Key = "test-key", Value = $"test-value{i}" });
}

var consumer = host.Services.GetRequiredService<IConsumer<string, string>>();
consumer.Subscribe(topic);
for (var i = 0; i < 10; i++)
{
var result = consumer.Consume(cts.Token);

Assert.Equal($"test-key", result.Message.Key);
Assert.Equal($"test-value{i}", result.Message.Value);
}
}

[Theory]
[ActiveIssue("https://github.com/dotnet/aspire/issues/4909")]
[InlineData(true)]
[InlineData(false)]
[RequiresDocker]
public async Task WithDataShouldPersistStateBetweenUsages(bool useVolume)
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
var topic = "test-topic";
string? volumeName = null;
string? bindMountPath = null;

try
{
var builder1 = CreateDistributedApplicationBuilder();
var kafka1 = builder1.AddKafka("kafka");

if (useVolume)
{
// Use a deterministic volume name to prevent them from exhausting the machines if deletion fails
volumeName = VolumeNameGenerator.CreateVolumeName(kafka1, nameof(WithDataShouldPersistStateBetweenUsages));

// if the volume already exists (because of a crashing previous run), try to delete it
DockerUtils.AttemptDeleteDockerVolume(volumeName);
kafka1.WithDataVolume(volumeName);
}
else
{
bindMountPath = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
kafka1.WithDataBindMount(bindMountPath);
}

using (var app = builder1.Build())
{
await app.StartAsync();
try
{
var hb = Host.CreateApplicationBuilder();

hb.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
{
[$"ConnectionStrings:{kafka1.Resource.Name}"] = await kafka1.Resource.ConnectionStringExpression.GetValueAsync(default)
});

hb.AddKafkaProducer<string, string>("kafka");

using (var host = hb.Build())
{
await host.StartAsync();

var producer = host.Services.GetRequiredService<IProducer<string, string>>();
for (var i = 0; i < 10; i++)
{
await producer.ProduceAsync(topic, new Message<string, string> { Key = "test-key", Value = $"test-value{i}" });
}
}
}
finally
{
// Stops the container, or the Volume/mount would still be in use
await app.StopAsync();
}
}

var builder2 = CreateDistributedApplicationBuilder();
var kafka2 = builder2.AddKafka("kafka");

if (useVolume)
{
kafka2.WithDataVolume(volumeName);
}
else
{
kafka2.WithDataBindMount(bindMountPath!);
}

using (var app = builder2.Build())
{
await app.StartAsync();
try
{
var hb = Host.CreateApplicationBuilder();

hb.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
{
[$"ConnectionStrings:{kafka2.Resource.Name}"] = await kafka2.Resource.ConnectionStringExpression.GetValueAsync(default)
});

hb.AddKafkaConsumer<string, string>("kafka", consumerBuilder =>
{
consumerBuilder.Config.GroupId = "aspire-consumer-group";
consumerBuilder.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
});

using (var host = hb.Build())
{
await host.StartAsync();

var consumer = host.Services.GetRequiredService<IConsumer<string, string>>();
consumer.Subscribe(topic);
for (var i = 0; i < 10; i++)
{
var result = consumer.Consume(cts.Token);

Assert.Equal($"test-key", result.Message.Key);
Assert.Equal($"test-value{i}", result.Message.Value);
}
}
}
finally
{
// Stops the container, or the Volume/mount would still be in use
await app.StopAsync();
}
}
}
finally
{
if (volumeName is not null)
{
DockerUtils.AttemptDeleteDockerVolume(volumeName);
}

if (bindMountPath is not null)
{
try
{
File.Delete(bindMountPath);
}
catch
{
// Don't fail test if we can't clean the temporary folder
}
}
}
}

private TestDistributedApplicationBuilder CreateDistributedApplicationBuilder()
{
var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry();
builder.Services.AddXunitLogging(testOutputHelper);
return builder;
}
}
2 changes: 0 additions & 2 deletions tests/Aspire.Hosting.Tests/Aspire.Hosting.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
<ProjectReference Include="..\..\src\Aspire.Hosting.MongoDB\Aspire.Hosting.MongoDB.csproj" IsAspireProjectResource="false" />
<ProjectReference Include="..\..\src\Aspire.Hosting.MySql\Aspire.Hosting.MySql.csproj" IsAspireProjectResource="false" />
<ProjectReference Include="..\..\src\Aspire.Hosting.Nats\Aspire.Hosting.Nats.csproj" IsAspireProjectResource="false" />
<ProjectReference Include="..\..\src\Aspire.Hosting.Qdrant\Aspire.Hosting.Qdrant.csproj" IsAspireProjectResource="false" />
<ProjectReference Include="..\..\src\Aspire.Hosting.Testing\Aspire.Hosting.Testing.csproj" />
<ProjectReference Include="..\..\src\Aspire.Hosting.Python\Aspire.Hosting.Python.csproj" IsAspireProjectResource="false" />
<ProjectReference Include="..\..\src\Aspire.Hosting.Valkey\Aspire.Hosting.Valkey.csproj" IsAspireProjectResource="false" />
Expand All @@ -55,7 +54,6 @@

<Compile Include="$(TestsSharedDir)Logging\*.cs" LinkBase="shared/Logging" />
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Garnet\GarnetContainerImageTags.cs" />
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Kafka\KafkaContainerImageTags.cs" />
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Nats\NatsContainerImageTags.cs" />
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Milvus\MilvusContainerImageTags.cs" />
<Compile Include="$(RepoRoot)src\Aspire.Hosting.MongoDB\MongoDBContainerImageTags.cs" />
Expand Down
44 changes: 0 additions & 44 deletions tests/Aspire.Hosting.Tests/ManifestGenerationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -306,25 +306,6 @@ public void EnsureAllRabbitMQManifestTypesHaveVersion0Suffix()
Assert.Equal("container.v0", server.GetProperty("type").GetString());
}

[Fact]
public void EnsureAllKafkaManifestTypesHaveVersion0Suffix()
{
using var program = CreateTestProgramJsonDocumentManifestPublisher();

program.AppBuilder.AddKafka("kafkacontainer");

// Build AppHost so that publisher can be resolved.
program.Build();
var publisher = program.GetManifestPublisher();

program.Run();

var resources = publisher.ManifestDocument.RootElement.GetProperty("resources");

var server = resources.GetProperty("kafkacontainer");
Assert.Equal("container.v0", server.GetProperty("type").GetString());
}

[Fact]
public void NodeAppIsExecutableResource()
{
Expand Down Expand Up @@ -508,7 +489,6 @@ public void VerifyTestProgramFullManifest()
"ConnectionStrings__rabbitmq": "{rabbitmq.connectionString}",
"ConnectionStrings__mymongodb": "{mymongodb.connectionString}",
"ConnectionStrings__freepdb1": "{freepdb1.connectionString}",
"ConnectionStrings__kafka": "{kafka.connectionString}",
"ConnectionStrings__cosmos": "{cosmos.connectionString}",
"ConnectionStrings__eventhubns": "{eventhubns.connectionString}",
"ConnectionStrings__milvus": "{milvus.connectionString}"
Expand Down Expand Up @@ -685,30 +665,6 @@ public void VerifyTestProgramFullManifest()
"type": "value.v0",
"connectionString": "{oracledatabase.connectionString}/freepdb1"
},
"kafka": {
"type": "container.v0",
"connectionString": "{kafka.bindings.tcp.host}:{kafka.bindings.tcp.port}",
"image": "{{TestConstants.AspireTestContainerRegistry}}/{{KafkaContainerImageTags.Image}}:{{KafkaContainerImageTags.Tag}}",
"env": {
"KAFKA_LISTENERS": "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT",
"KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://{kafka.bindings.tcp.host}:29092,PLAINTEXT_HOST://{kafka.bindings.tcp.host}:{kafka.bindings.tcp.port},PLAINTEXT_INTERNAL://{kafka.bindings.internal.host}:{kafka.bindings.internal.port}"
},
"bindings": {
"tcp": {
"scheme": "tcp",
"protocol": "tcp",
"transport": "tcp",
"targetPort": 9092
},
"internal": {
"scheme": "tcp",
"protocol": "tcp",
"transport": "tcp",
"targetPort": 9093
}
}
},
"cosmos": {
"type": "azure.bicep.v0",
"connectionString": "{cosmos.secretOutputs.connectionString}",
Expand Down
Loading

0 comments on commit 4374062

Please sign in to comment.