Open steverao opened 2 years ago
@steverao Hello everyone, I used testcontainers in the project I participated in the flink-hackthon before, this is the relevant code: flink-hackthon-test
Background of the project: lightweight-operator
I think we can use generic inheritance to classify a type of Container for Containers of different modules
In this picture, GenernicContainer is the parent container of testcontainer, and there are many submodules DebeziumMongoDbContainer, CSContainer, DebeziumMysqlContainer under it. If we need to write the code of DebeziumMysqlContainer, it is as follows:
public class DebeziumContainer <SELF extends DebeziumContainer<SELF>> extends ChaosContainer<SELF> {
private static final String DEBEZIUM_VERSION = ContainerImageVersions.getStableVersion("debezium/connect");
private static final int KAFKA_CONNECT_PORT = 8083;
private static final String TEST_PROPERTY_PREFIX = "debezium.test.";
public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
protected static final ObjectMapper MAPPER = new ObjectMapper();
protected static final OkHttpClient CLIENT = new OkHttpClient();
public DebeziumContainer(final DockerImageName containerImage,String clusterName) {
super(containerImage, clusterName);
defaultConfig();
}
public DebeziumContainer(final String containerImageName,String clusterName) {
super(DockerImageName.parse(containerImageName), clusterName);
defaultConfig();
}
private void defaultConfig() {
setWaitStrategy(
new LogMessageWaitStrategy()
.withRegEx(".*Session key updated.*")
.withStartupTimeout(Duration.of(60, SECONDS)));
withEnv("GROUP_ID", "1");
withEnv("CONFIG_STORAGE_TOPIC", "debezium_connect_config");
withEnv("OFFSET_STORAGE_TOPIC", "debezium_connect_offsets");
withEnv("STATUS_STORAGE_TOPIC", "debezium_connect_status");
withEnv("CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE", "false");
withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "false");
withExposedPorts(KAFKA_CONNECT_PORT);
}
.....
}
test
@Test
public void testKafkaStart(){
try (
KafkaContainer kafkaContainer = new KafkaContainer(KAFKA_TEST_IMAGE)
.withEmbeddedZookeeper()
.withNetwork(network);
GenericContainer<?> zookeeper = new GenericContainer<>(ZOOKEEPER_TEST_IMAGE)
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
GenericContainer<?> application = new GenericContainer<>(DockerImageName.parse("alpine"))
.withNetwork(network)
// }
.withNetworkAliases("dummy")
.withCommand("sleep 10000")
)
{
zookeeper.start();
kafkaContainer.start();
application.start();
}
}
public void testDebeziumMySqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
GenericContainer<?> zookeeper = new GenericContainer<>(ZOOKEEPER_TEST_IMAGE)
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
KafkaContainer kafkaContainer = new KafkaContainer(KAFKA_TEST_IMAGE)
.withExternalZookeeper("zookeeper:2181")
.withStartupTimeout(Duration.ofHours(1))
.withNetworkAliases("kafka01")
.withNetwork(network);
GenericContainer<?> application = new GenericContainer<>(DockerImageName.parse("alpine"))
.withNetwork(network)
// }
.withNetworkAliases("dummy")
.withCommand("sleep 1000000000");
zookeeper.start();
// kafkaContainer.start();
//setup mysql server and init database
MySQLContainer mySQLContainer = new MySQLContainer(MYSQL_IMAGE)
.withConfigurationOverride("docker.mysql/my.cnf")
.withSetupSQL("docker.mysql/setup.sql")
.withUsername("windwheel")
.withPassword("knxy0616")
.withDatabaseName("inventory");
// mySQLContainer.start();
// setup debezium server
DebeziumContainer debeziumContainer = new DebeziumContainer(DEBEZIUM_IMAGE,"debezium-connector")
.withNetwork(network)
.withMinimumRunningDuration(Duration.ofHours(1))
// .withStartupAttempts(100)
.withStartupTimeout(Duration.ofHours(1))
.withKafka(kafkaContainer)
.dependsOn(kafkaContainer);
Startables.deepStart(Stream.of(kafkaContainer,mySQLContainer,debeziumContainer)).join();
// setup mock dynamic datasource
DebeziumMySqlByKafkaSourceTest sourceTester = new DebeziumMySqlByKafkaSourceTest(
kafkaContainer,
converterClassName);
sourceTester.getSourceConfig().put("json-with-envelope", jsonWithEnvelope);
sourceTester.setServiceContainer(mySQLContainer);
final int numEntriesToInsert = sourceTester.getNumEntriesToInsert();
sourceTester.prepareSource();
for (int i = 1; i <= numEntriesToInsert; i++) {
// prepare insert event
sourceTester.prepareInsertEvent();
log.info("inserted entry {} of {}", i, numEntriesToInsert);
// validate the source insert event
sourceTester.validateSourceResult(1, SourceTester.INSERT, converterClassName);
}
// prepare update event
sourceTester.prepareUpdateEvent();
// validate the source update event
sourceTester.validateSourceResult(numEntriesToInsert, SourceTester.UPDATE, converterClassName);
// prepare delete event
sourceTester.prepareDeleteEvent();
ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(mySQLContainer)
.with("database.server.name", "dbserver1.inventory.products");
KafkaConsumer<String, String> consumer = getConsumer(kafkaContainer);
consumer.subscribe(Arrays.asList("dbserver1."));
}
...
Similar cases can refer to pulsar-integration: pulsar-integration
1.当前SCA某个组件版本升级以后,不太容易测试该组件对SCA中其他组件依赖的冲突影响情况,这一块能力对于提高项目整体测试效率具有比较重要意义。 2.当前社区采用的方案,该方案测试用户不太全面。