反应式地持久化Cassandra数据
要对Cassandra数据库进行反应式持久化,首先需要在项目构建文件中添加以下starter依赖。这个依赖关系会代替我们之前使用的Mongo或R2DBC依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra-reactive</artifactId>
</dependency>
然后,需要声明Cassandra键空间(keyspace)的细节和模式管理方式。在application.yml文件中,添加以下的代码:
spring:
data:
rest:
base-path: /data-api
cassandra:
keyspace-name: tacocloud
schema-action: recreate
local-datacenter: datacenter1
这与第4章中使用非反应式Cassandra存储库的YAML配置大同小异。需要注意keyspace-name属性,因为我们需要在Cassandra集群中创建一个使用该名称的键空间。
我们需要在本地机器上运行一个Cassandra集群并监听9042端口。最简单的方法是使用Docker,如下所示:
$ docker network create cassandra-net
$ docker run --name my-cassandra --network cassandra-net \
-p 9042:9042 -d cassandra:latest
如果Cassandra 集群运行在其他机器或端口上,我们需要在 application.yml 中指定联系点(contact point)和端口,这些内容我们在第4章已经讲解过。如果需要创建键空间,请运行CQL shell并使用create keyspace命令,如下所示:
$ docker run -it --network cassandra-net --rm cassandra cqlsh my-cassandra
cqlsh> create keyspace tacocloud
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};
我们现在已经有了Cassandra集群和名为tacocloud的键空间,而且项目中包含了Spring Data Cassandra Reactive starter依赖,接下来就可以定义领域类了。
定义使用Cassandra持久化的领域类
与Mongo的持久化一样,无论选择反应式还是非反应式的Cassandra持久化,领域类的定义都毫无区别。我们使用的Ingredient、Taco和TacoOrder领域类都与第4章中创建的相同。程序清单13.17展示了基于Cassandra注解的Ingredient类。
package tacos;
import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PRIVATE, force = true)
@Table("ingredients")
public class Ingredient {
@PrimaryKey
private String id;
private String name;
private Type type;
public enum Type {
WRAP, PROTEIN, VEGGIES, CHEESE, SAUCE
}
}
至于Taco类,它添加Cassandra持久化注解之后如程序清单13.18所示。
package tacos;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import org.springframework.data.cassandra.core.cql.Ordering;
import org.springframework.data.cassandra.core.cql.PrimaryKeyType;
import org.springframework.data.cassandra.core.mapping.Column;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn;
import org.springframework.data.cassandra.core.mapping.Table;
import org.springframework.data.rest.core.annotation.RestResource;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import lombok.Data;
@Data
@RestResource(rel = "tacos", path = "tacos")
@Table("tacos")
public class Taco {
@PrimaryKeyColumn(type = PrimaryKeyType.PARTITIONED)
private UUID id = Uuids.timeBased();
@NotNull
@Size(min = 5, message = "Name must be at least 5 characters long")
private String name;
@PrimaryKeyColumn(type = PrimaryKeyType.CLUSTERED,
ordering = Ordering.DESCENDING)
private Date createdAt = new Date();
@Size(min = 1, message = "You must choose at least 1 ingredient")
@Column("ingredients")
private List<IngredientUDT> ingredients = new ArrayList<>();
public void addIngredient(Ingredient ingredient) {
this.ingredients.add(new IngredientUDT(ingredient.getName(),
ingredient.getType()));
}
}
由于Taco通过一个用户自定义(user-defined)类型来引用Ingredient对象,所以我们还需要一个IngredientUDT类,如程序清单13.19所示。
package tacos;
import org.springframework.data.cassandra.core.mapping.UserDefinedType;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PRIVATE, force = true)
@UserDefinedType("ingredient")
public class IngredientUDT {
private String name;
private Ingredient.Type type;
}
最后,TacoOrder类添加Cassandra持久化注解之后如程序清单13.20所示。
package tacos;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import org.springframework.data.cassandra.core.mapping.Column;
import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import lombok.Data;
@Data
@Table("tacoorders")
public class TacoOrder implements Serializable {
private static final long serialVersionUID = 1L;
@PrimaryKey
private UUID id = Uuids.timeBased();
private Date placedAt = new Date();
@Column("user")
private UserUDT user;
private String deliveryName;
private String deliveryStreet;
private String deliveryCity;
private String deliveryState;
private String deliveryZip;
private String ccNumber;
private String ccExpiration;
private String ccCVV;
@Column("tacos")
private List<TacoUDT> tacos = new ArrayList<>();
public void addTaco(Taco taco) {
this.addTaco(new TacoUDT(taco.getName(), taco.getIngredients()));
}
public void addTaco(TacoUDT tacoUDT) {
this.tacos.add(tacoUDT);
}
}
就像Taco通过用户自定义类型引用Ingredient,TacoOrder会通过TacoUDT类引用Taco,如程序清单13.21所示。
package tacos;
import java.util.List;
import org.springframework.data.cassandra.core.mapping.UserDefinedType;
import lombok.Data;
@Data
@UserDefinedType("taco")
public class TacoUDT {
private final String name;
private final List<IngredientUDT> ingredients;
}
需要重申,这些模型类与非反应式的代码是完全相同的。我在这里再次展示它们,是为了让你不必翻到第4章来查阅。
现在,我们来创建持久化这些对象的存储库。
创建反应式Cassandra存储库
你可能已经在期待反应式Cassandra存储库和对应的非反应式存储库大同小异了——毕竟,如果真的是这样,那就太好了!你应该已经感受到了,无论存储库是否是反应式的,Spring Data都会尽可能保持编程模型相似。
你可能已经料想到,使存储库变为反应式的关键点在于接口要扩展ReactiveCrudRepository,如IngredientRepository接口所示:
package tacos.data;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import tacos.Ingredient;
public interface IngredientRepository
extends ReactiveCrudRepository<Ingredient, String> {
}
显而易见,OrderRepository接口也是如此:
package tacos.data;
import java.util.UUID;
import org.springframework.data.domain.Pageable;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
import tacos.TacoOrder;
import tacos.User;
public interface OrderRepository
extends ReactiveCrudRepository<TacoOrder, UUID> {
Flux<TacoOrder> findByUserOrderByPlacedAtDesc(
User user, Pageable pageable);
}
实际上,这些存储库不仅让人联想到它们的非反应式版本,而且与我们已经编写的MongoDB存储库也没有很大的区别。除了Cassandra对TacoOrder使用UUID而不是String作为ID类型,它们几乎是相同的。这再次证明了Spring Data项目(在可行的前提下)采用的一致性原则。
我们通过编写几个测试来验证它们能够正常运行,从而结束我们对反应式Cassandra存储库的研究。
测试反应式Cassandra存储库
测试反应式 Cassandra 存储库的方式与测试反应式 MongoDB 存储库很相似,这可能也在你的意料之中。例如,请观察程序清单13.22中的IngredientRepositoryTest,你是否能发现它与程序清单13.15的区别?
package tacos.data;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.cassandra
.DataCassandraTest;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import tacos.Ingredient;
import tacos.Ingredient.Type;
@DataCassandraTest
public class IngredientRepositoryTest {
@Autowired
IngredientRepository ingredientRepo;
@BeforeEach
public void setup() {
Flux<Ingredient> deleteAndInsert = ingredientRepo.deleteAll()
.thenMany(ingredientRepo.saveAll(
Flux.just(
new Ingredient("FLTO", "Flour Tortilla", Type.WRAP),
new Ingredient("GRBF", "Ground Beef", Type.PROTEIN),
new Ingredient("CHED", "Cheddar Cheese", Type.CHEESE)
)));
StepVerifier.create(deleteAndInsert)
.expectNextCount(3)
.verifyComplete();
}
@Test
public void shouldSaveAndFetchIngredients() {
StepVerifier.create(ingredientRepo.findAll())
.recordWith(ArrayList::new)
.thenConsumeWhile(x -> true)
.consumeRecordedWith(ingredients -> {
assertThat(ingredients).hasSize(3);
assertThat(ingredients).contains(
new Ingredient("FLTO", "Flour Tortilla", Type.WRAP));
assertThat(ingredients).contains(
new Ingredient("GRBF", "Ground Beef", Type.PROTEIN));
assertThat(ingredients).contains(
new Ingredient("CHED", "Cheddar Cheese", Type.CHEESE));
})
.verifyComplete();
StepVerifier.create(ingredientRepo.findById("FLTO"))
.assertNext(ingredient -> {
ingredient.equals(new Ingredient("FLTO", "Flour Tortilla",
Type.WRAP));
});
}
}
发现它们的差异了吗?MongoDB版本的测试使用的是@DataMongoTest注解,而这个Cassandra版本使用的是@DataCassandraTest注解。仅此而已!除此之外,这些测试都是相同的。
OrderRepositoryTest的情况也是如此,除了使用@DataCassandraTest替换@DataMongoTest,其他部分都是相同的:
@DataCassandraTest
public class OrderRepositoryTest {
...
}
各个Spring Data项目之间的一致性甚至延伸到了测试的编写方式。这样一来,如果我们的众多项目使用了不同的持久化技术,在它们之间进行切换就会非常容易,不需要我们过多考虑它们分别是如何开发的。
小结
-
Spring Data支持各种数据库类型的反应式持久化操作,包括关系型数据库(使用R2DBC)、MongoDB和Cassandra。
-
Spring Data R2DBC为关系型数据库的持久化操作提供了一个反应式方案,但它目前还不能直接支持领域类中的关联。
-
由于缺乏对直接关联的支持,Spring Data R2DBC存储库需要我们采用不同的方法来设计领域模型和相关的数据库表。
-
Spring Data MongoDB和Spring Data Cassandra为编写MongoDB和Cassandra数据库的反应式存储库提供了几乎相同的编程模型。
-
借助Spring Data测试注解和StepVerifier,我们可以测试由Spring应用上下文自动创建的反应式存储库。