反应式地持久化Cassandra数据

要对Cassandra数据库进行反应式持久化,首先需要在项目构建文件中添加以下starter依赖。这个依赖关系会代替我们之前使用的Mongo或R2DBC依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-cassandra-reactive</artifactId>
</dependency>

然后,需要声明Cassandra键空间(keyspace)的细节和模式管理方式。在application.yml文件中,添加以下的代码:

spring:
  data:
    rest:
      base-path: /data-api
    cassandra:
      keyspace-name: tacocloud
      schema-action: recreate
      local-datacenter: datacenter1

这与第4章中使用非反应式Cassandra存储库的YAML配置大同小异。需要注意keyspace-name属性,因为我们需要在Cassandra集群中创建一个使用该名称的键空间。

我们需要在本地机器上运行一个Cassandra集群并监听9042端口。最简单的方法是使用Docker,如下所示:

$ docker network create cassandra-net
$ docker run --name my-cassandra --network cassandra-net \
         -p 9042:9042 -d cassandra:latest

如果Cassandra 集群运行在其他机器或端口上,我们需要在 application.yml 中指定联系点(contact point)和端口,这些内容我们在第4章已经讲解过。如果需要创建键空间,请运行CQL shell并使用create keyspace命令,如下所示:

$ docker run -it --network cassandra-net --rm cassandra cqlsh my-cassandra
cqlsh> create keyspace tacocloud
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

我们现在已经有了Cassandra集群和名为tacocloud的键空间,而且项目中包含了Spring Data Cassandra Reactive starter依赖,接下来就可以定义领域类了。

定义使用Cassandra持久化的领域类

与Mongo的持久化一样,无论选择反应式还是非反应式的Cassandra持久化,领域类的定义都毫无区别。我们使用的Ingredient、Taco和TacoOrder领域类都与第4章中创建的相同。程序清单13.17展示了基于Cassandra注解的Ingredient类。

程序清单13.17 添加Cassandra持久化注解的Ingredient类
package tacos;

import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PRIVATE, force = true)
@Table("ingredients")
public class Ingredient {

  @PrimaryKey
  private String id;
  private String name;
  private Type type;

  public enum Type {
    WRAP, PROTEIN, VEGGIES, CHEESE, SAUCE
  }

}

至于Taco类,它添加Cassandra持久化注解之后如程序清单13.18所示。

程序清单13.18 添加Cassandra持久化注解的Taco类
package tacos;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;

import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;

import org.springframework.data.cassandra.core.cql.Ordering;
import org.springframework.data.cassandra.core.cql.PrimaryKeyType;
import org.springframework.data.cassandra.core.mapping.Column;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn;
import org.springframework.data.cassandra.core.mapping.Table;
import org.springframework.data.rest.core.annotation.RestResource;

import com.datastax.oss.driver.api.core.uuid.Uuids;

import lombok.Data;

@Data
@RestResource(rel = "tacos", path = "tacos")
@Table("tacos")
public class Taco {

  @PrimaryKeyColumn(type = PrimaryKeyType.PARTITIONED)
  private UUID id = Uuids.timeBased();

  @NotNull
  @Size(min = 5, message = "Name must be at least 5 characters long")
  private String name;

  @PrimaryKeyColumn(type = PrimaryKeyType.CLUSTERED,
                     ordering = Ordering.DESCENDING)
  private Date createdAt = new Date();

  @Size(min = 1, message = "You must choose at least 1 ingredient")
  @Column("ingredients")
  private List<IngredientUDT> ingredients = new ArrayList<>();

  public void addIngredient(Ingredient ingredient) {
      this.ingredients.add(new IngredientUDT(ingredient.getName(),
     ingredient.getType()));
  }

}

由于Taco通过一个用户自定义(user-defined)类型来引用Ingredient对象,所以我们还需要一个IngredientUDT类,如程序清单13.19所示。

程序清单13.19 添加Cassandra持久化注解的IngredientUDT类
package tacos;

import org.springframework.data.cassandra.core.mapping.UserDefinedType;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PRIVATE, force = true)
@UserDefinedType("ingredient")
public class IngredientUDT {
  private String name;
  private Ingredient.Type type;
}

最后,TacoOrder类添加Cassandra持久化注解之后如程序清单13.20所示。

程序清单13.20 添加Cassandra持久化注解的TacoOrder类
package tacos;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;

import org.springframework.data.cassandra.core.mapping.Column;
import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;

import com.datastax.oss.driver.api.core.uuid.Uuids;

import lombok.Data;

@Data
@Table("tacoorders")
public class TacoOrder implements Serializable {
  private static final long serialVersionUID = 1L;

  @PrimaryKey
  private UUID id = Uuids.timeBased();
  private Date placedAt = new Date();

  @Column("user")
  private UserUDT user;

  private String deliveryName;

  private String deliveryStreet;

  private String deliveryCity;

  private String deliveryState;

  private String deliveryZip;

  private String ccNumber;

  private String ccExpiration;

  private String ccCVV;
  @Column("tacos")
  private List<TacoUDT> tacos = new ArrayList<>();

  public void addTaco(Taco taco) {
    this.addTaco(new TacoUDT(taco.getName(), taco.getIngredients()));
  }

  public void addTaco(TacoUDT tacoUDT) {
    this.tacos.add(tacoUDT);
  }

}

就像Taco通过用户自定义类型引用Ingredient,TacoOrder会通过TacoUDT类引用Taco,如程序清单13.21所示。

程序清单13.21 添加Cassandra持久化注解的TacoUDT类
package tacos;

import java.util.List;

import org.springframework.data.cassandra.core.mapping.UserDefinedType;

import lombok.Data;

@Data
@UserDefinedType("taco")
public class TacoUDT {

  private final String name;
  private final List<IngredientUDT> ingredients;

}

需要重申,这些模型类与非反应式的代码是完全相同的。我在这里再次展示它们,是为了让你不必翻到第4章来查阅。

现在,我们来创建持久化这些对象的存储库。

创建反应式Cassandra存储库

你可能已经在期待反应式Cassandra存储库和对应的非反应式存储库大同小异了——毕竟,如果真的是这样,那就太好了!你应该已经感受到了,无论存储库是否是反应式的,Spring Data都会尽可能保持编程模型相似。

你可能已经料想到,使存储库变为反应式的关键点在于接口要扩展ReactiveCrudRepository,如IngredientRepository接口所示:

package tacos.data;

import org.springframework.data.repository.reactive.ReactiveCrudRepository;

import tacos.Ingredient;

public interface IngredientRepository
         extends ReactiveCrudRepository<Ingredient, String> {

}

显而易见,OrderRepository接口也是如此:

package tacos.data;

import java.util.UUID;

import org.springframework.data.domain.Pageable;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;

import reactor.core.publisher.Flux;
import tacos.TacoOrder;
import tacos.User;

public interface OrderRepository
         extends ReactiveCrudRepository<TacoOrder, UUID> {

  Flux<TacoOrder> findByUserOrderByPlacedAtDesc(
          User user, Pageable pageable);

}

实际上,这些存储库不仅让人联想到它们的非反应式版本,而且与我们已经编写的MongoDB存储库也没有很大的区别。除了Cassandra对TacoOrder使用UUID而不是String作为ID类型,它们几乎是相同的。这再次证明了Spring Data项目(在可行的前提下)采用的一致性原则。

我们通过编写几个测试来验证它们能够正常运行,从而结束我们对反应式Cassandra存储库的研究。

测试反应式Cassandra存储库

测试反应式 Cassandra 存储库的方式与测试反应式 MongoDB 存储库很相似,这可能也在你的意料之中。例如,请观察程序清单13.22中的IngredientRepositoryTest,你是否能发现它与程序清单13.15的区别?

程序清单13.22 测试Cassandra IngredientRepository
package tacos.data;

import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.cassandra
     .DataCassandraTest;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import tacos.Ingredient;
import tacos.Ingredient.Type;

@DataCassandraTest
public class IngredientRepositoryTest {

  @Autowired
  IngredientRepository ingredientRepo;

  @BeforeEach
  public void setup() {
      Flux<Ingredient> deleteAndInsert = ingredientRepo.deleteAll()
          .thenMany(ingredientRepo.saveAll(
              Flux.just(
                  new Ingredient("FLTO", "Flour Tortilla", Type.WRAP),
                  new Ingredient("GRBF", "Ground Beef", Type.PROTEIN),
                  new Ingredient("CHED", "Cheddar Cheese", Type.CHEESE)
           )));

      StepVerifier.create(deleteAndInsert)
                  .expectNextCount(3)
                  .verifyComplete();
  }

  @Test
  public void shouldSaveAndFetchIngredients() {

      StepVerifier.create(ingredientRepo.findAll())
          .recordWith(ArrayList::new)
          .thenConsumeWhile(x -> true)
          .consumeRecordedWith(ingredients -> {
            assertThat(ingredients).hasSize(3);
            assertThat(ingredients).contains(
                new Ingredient("FLTO", "Flour Tortilla", Type.WRAP));
            assertThat(ingredients).contains(
                new Ingredient("GRBF", "Ground Beef", Type.PROTEIN));
            assertThat(ingredients).contains(
                new Ingredient("CHED", "Cheddar Cheese", Type.CHEESE));
          })
          .verifyComplete();

       StepVerifier.create(ingredientRepo.findById("FLTO"))
            .assertNext(ingredient -> {
                ingredient.equals(new Ingredient("FLTO", "Flour Tortilla",
       Type.WRAP));
         });
  }
}

发现它们的差异了吗?MongoDB版本的测试使用的是@DataMongoTest注解,而这个Cassandra版本使用的是@DataCassandraTest注解。仅此而已!除此之外,这些测试都是相同的。

OrderRepositoryTest的情况也是如此,除了使用@DataCassandraTest替换@DataMongoTest,其他部分都是相同的:

@DataCassandraTest
public class OrderRepositoryTest {
   ...
}

各个Spring Data项目之间的一致性甚至延伸到了测试的编写方式。这样一来,如果我们的众多项目使用了不同的持久化技术,在它们之间进行切换就会非常容易,不需要我们过多考虑它们分别是如何开发的。

小结

  • Spring Data支持各种数据库类型的反应式持久化操作,包括关系型数据库(使用R2DBC)、MongoDB和Cassandra。

  • Spring Data R2DBC为关系型数据库的持久化操作提供了一个反应式方案,但它目前还不能直接支持领域类中的关联。

  • 由于缺乏对直接关联的支持,Spring Data R2DBC存储库需要我们采用不同的方法来设计领域模型和相关的数据库表。

  • Spring Data MongoDB和Spring Data Cassandra为编写MongoDB和Cassandra数据库的反应式存储库提供了几乎相同的编程模型。

  • 借助Spring Data测试注解和StepVerifier,我们可以测试由Spring应用上下文自动创建的反应式存储库。