声明一个简单的集成流

通常来讲, Spring Integration可以创建集成流,通过集成流,应用程序能够接收或向应用程序之外的资源发送数据。应用程序可能集成的资源之一就是文件系统。因此,Spring Integration的很多组件都有读入和写入文件的通道适配器(channel adapter)。

为了熟悉Spring Integration,我们会创建一个集成流,这个流会写入数据到文件系统中。首先,需要添加Spring Integration到项目的构建文件中。对于Maven构建来讲,必要的依赖如下所示:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
</dependency>

第一项依赖是Spring Integration的Spring Boot starter。不管我们与哪种流进行交互,对于Spring Integration流的开发来讲,这个依赖都是必需的。与所有的Spring Boot starter一样,在Initializr表单中,这个依赖也可以通过复选框选择。

第二项依赖是Spring Integration的文件端点模块。这个模块是与外部系统集成的20余个模块之一。我们会在10.2.9小节中更加详细地讨论端点模块。但是目前,我们只需要知道,文件端点模块提供了将文件从文件系统导入集成流和将流中的数据写入文件系统的能力。

接下来,我们需要为应用创建一种方法,让它能够发送数据到集成流中,这样它才能写入文件。为了实现这一点,我们需要创建一个网关接口,这样的网关接口如程序清单10.1所示。

程序清单10.1 将方法调用转换成消息的消息网关接口
package sia6;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "textInChannel")    ⇽---声明消息网关
public interface FileWriterGateway {

  void writeToFile(
       @Header(FileHeaders.FILENAME) String filename,    ⇽---写入文件
       String data);

}

尽管这只是一个很简单的Java接口,但是关于FileWriterGateway,有很多东西需要介绍。我们首先看到,它使用了@MessagingGateway注解。这个注解会告诉Spring Integration要在运行时生成该接口的实现,这与Spring Data在运行时生成存储库接口的实现非常类似。其他地方的代码在希望写入文件时将会调用它。

@MessagingGateway的defaultRequestChannel属性表明接口方法调用时所返回的消息要发送至给定的消息通道(message channel)。在本例中,我们声明调用writeToFile()所形成的消息应该发送至名为textInChannel的通道中。

对于writeToFile()方法来说,它以String类型的形式接受一个文件名,另外一个String包含了要写入文件的文本。关于这个方法的签名,还需要注意filename参数上带有@Header。在本例中,@Header注解表明传递给filename的值应该包含在消息头信息中(通过FileHeaders.FILENAME声明,它将会被解析成file_name),而不是放到消息载荷(payload)中。

现在,我们已经有了消息网关,接下来就需要配置集成流了。尽管我们往构建文件中添加的Spring Integration starter依赖能够启用Spring Integration的自动配置功能,但是满足应用需求的流定义则需要我们自行编写额外的配置。在声明集成流方面,我们有3种配置方案可供选择:

  • XML配置;

  • Java配置;

  • 使用DSL的Java配置。

我们会依次了解Spring Integration的这3种配置风格,从较为老式的XML配置开始。

使用XML定义集成流

尽管在本书中,我尽量避免使用XML配置,但是Spring Integration有使用XML定义集成流的漫长历史。所以,我认为至少展现一个XML定义集成流的样例还是很有价值的。程序清单10.2展现了如何使用XML配置示例集成流。

程序清单10.2 使用Spring XML配置定义集成流
<?xml version="1.0" encoding = "UTF-8"?><beans
     xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-file="http://www.springframework.org/schema/integration/file"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    http://www.springframework.org/schema/integration/file/spring-
     integration-file.xsd">

    <int:channel id="textInChannel" />    ⇽---声明textInChannel

    <int:transformer id="upperCase"
        input-channel="textInChannel"
        output-channel="fileWriterChannel"
        expression="payload.toUpperCase()" />    ⇽---转换文本

    <int:channel id="fileWriterChannel" />    ⇽---声明fileWriterChannel

    <int-file:outbound-channel-adapter id="writer"
        channel="fileWriterChannel"
        directory="/tmp/sia6/files"
        mode="APPEND"
        append-new-line="true" />    ⇽---将文本写入文件

</beans>

将程序清单10.2中的XML拆分讲解一下。

  • 我们首先配置了一个名为textInChannel的通道。可以发现,它就是FileWriterGateway的请求通道。当FileWriterGateway的writeToFile()方法被调用的时候,结果形成的消息会发布到这个通道上。

  • 我们还配置了一个转换器(transformer),它会从textInChannel接收消息。它使用Spring表达式语言(Spring Expression Language, SpEL)为消息载荷调用toUpperCase()方法。进行大写操作之后的结果会发布到fileWriterChannel上。

  • 随后,我们配置了名为fileWriterChannel的通道。这个通道会作为一根导线,将转换器与出站通道适配器(outbound channel adapter)连接在一起。

  • 最后,我们使用int-file命名空间配置了出站通道适配器。这个XML命名空间是由Spring Integration的文件模块提供的,实现文件写入的功能。按照我们的配置,它从fileWriterChannel接收消息,并将消息的载荷写入一个文件,这个文件的名称是由消息头信息中的file_name属性指定的,而存入的目录则是由这里的directory属性指定的。如果文件已经存在,会以新行的方式进行追加文件内容,而不会覆盖原文件。

图10.1使用Enterprise Integration Patterns中的图形元素样式阐述了这个流。

image 2024 03 13 23 58 49 138
Figure 1. 图10.1 文件写入器的集成流

这个流包含了5个组件:一个网关、两个通道、一个转换器和一个通道适配器。能够组装到集成流中的组件有很多,这只是其中很少的一部分。我们会在10.2节讨论这些组件以及Spring Integration支持的其他组件。

如果想要在Spring Boot应用中使用XML配置,需要将XML作为源导入Spring应用。最简单的实现方式就是在应用的某个Java配置类上使用Spring的@ImportResource注解:

@Configuration
@ImportResource("classpath:/filewriter-config.xml")
public class FileWriterIntegrationConfig { ... }

尽管基于XML的配置能够很好地用于Spring Integration,但是大多数的开发人员对于XML的使用越来越谨慎。(正如我所言,在本书中,我会尽量避免使用XML配置。)现在,我们抛开尖括号,看一下Spring Integration的Java配置风格。

使用Java配置集成流

大多数的现代Spring应用程序都会避免使用XML配置,而更加青睐Java配置。实际上,在Spring Boot应用中,Java配置是自动化配置功能更自然的补充形式。因此,如果要为Spring Boot应用添加集成流,最好使用Java来定义流程。

程序清单10.3展示了使用Java配置编写集成流的一个样例。这里的代码依然是功能相同的文件写入集成流,但是这次我们使用Java来实现。

程序清单10.3 使用Java配置来定义集成流
package sia6;

import java.io.File;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.transformer.GenericTransformer;

@Configuration
public class FileWriterIntegrationConfig {

  @Bean
  @Transformer(inputChannel = "textInChannel",    ⇽---声明转换器
               outputChannel = "fileWriterChannel")
  public GenericTransformer<String, String> upperCaseTransformer() {
    return text -> text.toUpperCase();
  }

  @Bean
  @ServiceActivator(inputChannel = "fileWriterChannel")
  public FileWritingMessageHandler fileWriter() {    ⇽---声明文件写入器
    FileWritingMessageHandler handler =
        new FileWritingMessageHandler(new File("/tmp/sia6/files"));
    handler.setExpectReply(false);
    handler.setFileExistsMode(FileExistsMode.APPEND);
    handler.setAppendNewLine(true);
    return handler;
  }

}

在Java配置中,我们声明了两个bean:一个转换器和一个文件写入消息处理器。这里的转换器是GenericTransformer。因为GenericTransformer是函数式接口,所以我们可以使用lambda表达式为其提供实现,这里调用了消息文本的toUpperCase()方法。我们为转换器bean使用了@Transformer注解,这样会将其声明成集成流中的一个转换器。它接受来自textInChannel通道的消息,然后将消息写入名为fileWriterChannel的通道。

而负责文件写入的bean则使用了@ServiceActivator注解,表明它会接受来自fileWriterChannel的消息,并且会将消息传递给FileWritingMessageHandler实例所定义的服务。FileWritingMessageHandler是一个消息处理器,可以将消息的载荷写入特定目录下的文件,而文件的名称是通过消息的file_name头信息指定的。与XML样例类似,FileWritingMessageHandler也配置为以新行的方式为文件追加内容。

FileWritingMessageHandler bean的一个独特之处在于它调用了setExpectReply(false)方法,能够通过这个方法告知服务激活器(service activator)不要期望存在答复通道(reply channel,通过这样的通道,我们可以将某个值返回到流中的上游组件)。如果我们不调用setExpectReply(false),那么文件写入bean的默认值是true,尽管管道的功能和预期一样,但是在日志中会看到一些错误信息,提示我们没有设置答复通道。

你会发现,我们在这里没有必要显式声明通道。如果名为textInChannel和fileWriterChannel的bean不存在,这两个通道将会自动创建。但是,如果想要更加精确地控制通道如何配置,可以按照如下的方式显式构建这些bean:

@Bean
public MessageChannel textInChannel() {
  return new DirectChannel();
}
...
@Bean
public MessageChannel fileWriterChannel() {
  return new DirectChannel();
}

基于Java的配置方案可能更易于阅读、更简洁,也符合我在本书中倡导的纯Java配置风格。但是,如果使用Spring Integration的Java DSL配置风格,配置过程可以更加流畅。

使用Spring Integration的DSL配置

我们再次尝试文件写入集成流的定义。这一次,我们依然使用Java进行定义,但是会使用Spring Integration的Java DSL。我们不再将流中的每个组件都声明为单独的bean,而是使用一个bean来定义整个流,如程序清单10.4所示。

程序清单10.4 为集成流的设计提供一个流畅的API
package sia6;

import java.io.File;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.support.FileExistsMode;
@Configuration
public class FileWriterIntegrationConfig {

  @Bean
  public IntegrationFlow fileWriterFlow() {
    return IntegrationFlows
        .from(MessageChannels.direct("textInChannel"))    ⇽---入站通道
        .<String, String>transform(t -> t.toUpperCase())    ⇽---声明转换器
        .handle(Files    ⇽---处理文件写入
             .outboundAdapter(new File("/tmp/sia6/files"))
             .fileExistsMode(FileExistsMode.APPEND)
             .appendNewLine(true))
         .get();
  }

}

这种新的配置方式在一个bean方法中定义了整个流,做到了尽可能简洁。IntegrationFlows类初始化构建器API,我们可以通过这个API来定义流。

在程序清单10.4中,我们首先从名为textInchannel的通道接收消息,然后,消息进入一个转换器,这个转换器会将消息载荷转换成大写形式。在转换器之后,消息会交由出站通道适配器处理,这个适配器是由Spring Integration file模块的Files类型创建的。最后,通过对get()的调用返回要构建的IntegrationFlow。简言之,这个bean方法定义了与XML和Java配置样例相同的集成流。

你可能已经发现,与Java配置样例类似,我们不需要显式声明通道bean。我们引用了textInChannel,如果该名字对应的通道不存在,Spring Integration会自动创建它。不过,我们也可以显式声明bean。

对于连接转换器和出站通道适配器的通道,我们甚至没有通过名字引用它。如果需要显式配置通道,可以在流定义的时候,通过调用channel()来引用它的名称:

@Bean
public IntegrationFlow fileWriterFlow() {
  return IntegrationFlows
      .from(MessageChannels.direct("textInChannel"))
      .<String, String>transform(t -> t.toUpperCase())
      .channel(MessageChannels.direct("FileWriterChannel"))
      .handle(Files
          .outboundAdapter(new File("/tmp/sia6/files"))
          .fileExistsMode(FileExistsMode.APPEND)
          .appendNewLine(true))
      .get();
}

使用Spring Integration的Java DSL(与其他的fluent API类似)时,必须要巧妙地使用空格来保持可读性。在这里的样例中,我小心翼翼地使用缩进来保证代码块的可读性。对于更长、更复杂的流,我们甚至可以考虑将流的一部分抽取到单独的方法或子流中,以实现更好的可读性。

现在,我们已经看到了如何使用3种不同的方式来定义一个简单的流,接下来,我们回过头来看一下Spring Integration的全景。