创建电子邮件集成流
我们决定让Taco Cloud允许客户通过电子邮件提交taco设计和创建订单。我们发放传单并在报纸上刊登广告,邀请每个人通过电子邮件发送taco订单。这非常成功!但是,令人遗憾的是,它过于成功了。有太多的电子邮件涌了进来,我们不得不申请雇佣别人阅读所有的电子邮件并将订单提交到订单系统中。
在本节,我们会实现一个集成流,它会轮询Taco Cloud的taco订单的收件箱、解析电子邮件中的订单细节,并将订单提交给Taco Cloud处理。简言之,在我们所创建的集成流中,入站通道适配器将会使用email端点模块将Taco Cloud收件箱中的电子邮件摄取到集成流中。
集成流的下一步是将电子邮件解析为订单对象,这些订单对象会传递给另一个处理器,从而将订单提交至Taco Cloud的REST API中。在这里,我们会像处理其他订单那样处理它们。首先,我们定义一个简单的配置属性类,它会捕获并处理Taco Cloud的电子邮件中的特定信息:
package tacos.email;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;
@Data
@ConfigurationProperties(prefix = "tacocloud.email")
@Component
public class EmailProperties {
private String username;
private String password;
private String host;
private String mailbox;
private long pollRate = 30000;
public String getImapUrl() {
return String.format("imaps://%s:%s@%s/%s",
this.username, this.password, this.host, this.mailbox);
}
}
我们可以看到,EmailProperties会捕获生成IMAP URL的属性。这个流会使用这个URL连接Taco Cloud电子邮件服务器并轮询电子邮件。捕获的属性包括电子邮件用户的用户名和密码、IMAP服务器的主机、要轮询的邮箱,以及轮询的频率(默认为30秒)。
EmailProperties在类级别使用了@ConfigurationProperties注解,并将prefix属性设置为tacocloud.email。这意味着我们可以在application.yml文件中按照下述方式配置使用电子邮件的详细信息:
tacocloud:
email:
host: imap.tacocloud.com
mailbox: INBOX
username: taco-in-flow
password: 1L0v3T4c0s
poll-rate: 10000
当然,这里显示的电子邮件服务器的配置是虚构的。你需要将其调整为你使用的电子邮件服务器。
另外,你可能还会在IDE中看到“unknown property”警告。这是因为IDE会尝试寻找它所需要的元数据,以便了解这些属性的含义。这些警告不会影响实际的代码,如果愿意,你可以忽略它们,也可以在构建文件中添加如下的依赖项(或在Spring Initializr的选项中选中“Spring Configuration Processor”进行添加),从而让它们消失:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
这个依赖提供了为自定义配置属性(比如我们为配置电子邮件服务器细节所定义的属性)自动生成元数据的支持。
现在,使用EmailProperties来配置集成流。我们想要创建的流大致如图10.10所示。

有两种方案可以定义这个流。
-
在Taco Cloud应用中进行定义:在流的结束点,服务激活器要调用我们之前定义的创建订单的存储库。
-
在单独的应用中进行定义:在流的结束点,服务激活器要发送POST请求到Taco Cloud API以提交taco订单。
方案的选择会影响服务激活器的实现方式,但对流本身的影响并不大。但是,因为我们需要一些表示taco、订单和配料的类型,并且它们与Taco Cloud主应用可能会略有差异,所以我们会在单独的应用中定义集成流,避免与已有的领域类型相混淆。
我们可以选择使用XML配置、Java配置或Java DSL来定义流。我喜欢DSL的优雅,所以在这里会使用这种方案。如果你想要一些额外的挑战,可以选择其他配置风格编写流的定义。现在,我们看一下电子邮件订单流的Java DSL配置,如程序清单10.5所示。
package tacos.email;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.mail.dsl.Mail;
@Configuration
public class TacoOrderEmailIntegrationConfig {
@Bean
public IntegrationFlow tacoOrderEmailFlow(
EmailProperties emailProps,
EmailToOrderTransformer emailToOrderTransformer,
OrderSubmitMessageHandler orderSubmitHandler) {
return IntegrationFlows
.from(Mail.imapInboundAdapter(emailProps.getImapUrl()),
e -> e.poller(
Pollers.fixedDelay(emailProps.getPollRate())))
.transform(emailToOrderTransformer)
.handle(orderSubmitHandler)
.get();
}
}
根据tacoOrderEmailFlow()方法的定义,电子邮件订单流由3个不同的组件组成。
-
IMAP 电子邮件入站通道适配器:这个通道适配器是使用IMAP URL创建的,而URL则是根据EmailProperties的getImapUrl()方法创建的,它会根据EmailProperties中设置的pollRate属性进行轮询。传入的电子邮件会传递给一个通道,然后连接到转换器。
-
将电子邮件转换成订单对象的转换器:转换器是通过EmailToOrderTransformer实现的,它会注入tacoOrderEmailFlow()方法。转换所形成的订单会通过另一个通道传递给最后一个组件。
-
处理器(作为出站通道适配器):处理器接受订单对象并将其提交至Taco Cloud的REST API。
只有将电子邮件端点模块作为依赖项添加到项目构建文件中,才能调用Mail.imapInboundAdapter()。Maven依赖如下所示:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mail</artifactId>
</dependency>
EmailToOrderTransformer是Spring Integration Transformer接口的实现。它扩展了AbstractMailMessageTransformer,如程序清单10.6所示。
package tacos.email;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.internet.InternetAddress;
import org.apache.commons.text.similarity.LevenshteinDistance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.mail.transformer
.AbstractMailMessageTransformer;
import org.springframework.integration.support
.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class EmailToOrderTransformer
extends AbstractMailMessageTransformer<EmailOrder> {
private static Logger log =
LoggerFactory.getLogger(EmailToOrderTransformer.class);
private static final String SUBJECT_KEYWORDS = "TACO ORDER";
@Override
protected AbstractIntegrationMessageBuilder<EmailOrder>
doTransform(Message mailMessage) throws Exception {
EmailOrder tacoOrder = processPayload(mailMessage);
return MessageBuilder.withPayload(tacoOrder);
}
private EmailOrder processPayload(Message mailMessage) {
try {
String subject = mailMessage.getSubject();
if (subject.toUpperCase().contains(SUBJECT_KEYWORDS)) {
String email =
((InternetAddress) mailMessage.getFrom()[0]).getAddress();
String content = mailMessage.getContent().toString();
return parseEmailToOrder(email, content);
}
} catch (MessagingException e) {
log.error("MessagingException: {}", e);
} catch (IOException e) {
log.error("IOException: {}", e);
}
return null;
}
private EmailOrder parseEmailToOrder(String email, String content) {
EmailOrder order = new EmailOrder(email);
String[] lines = content.split("\\r?\\n");
for (String line : lines) {
if (line.trim().length() > 0 && line.contains(":")) {
String[] lineSplit = line.split(":");
String tacoName = lineSplit[0].trim();
String ingredients = lineSplit[1].trim();
String[] ingredientsSplit = ingredients.split(",");
List<String> ingredientCodes = new ArrayList<>();
for (String ingredientName : ingredientsSplit) {
String code = lookupIngredientCode(ingredientName.trim());
if (code != null) {
ingredientCodes.add(code);
}
}
Taco taco = new Taco(tacoName);
taco.setIngredients(ingredientCodes);
order.addTaco(taco);
}
}
return order;
}
private String lookupIngredientCode(String ingredientName) {
for (Ingredient ingredient : ALL_INGREDIENTS) {
String ucIngredientName = ingredientName.toUpperCase();
if (LevenshteinDistance.getDefaultInstance()
.apply(ucIngredientName, ingredient.getName()) < 3 ||
ucIngredientName.contains(ingredient.getName()) ||
ingredient.getName().contains(ucIngredientName)) {
return ingredient.getCode();
}
}
return null;
}
private static Ingredient[] ALL_INGREDIENTS = new Ingredient[] {
new Ingredient("FLTO", "FLOUR TORTILLA"),
new Ingredient("COTO", "CORN TORTILLA"),
new Ingredient("GRBF", "GROUND BEEF"),
new Ingredient("CARN", "CARNITAS"),
new Ingredient("TMTO", "TOMATOES"),
new Ingredient("LETC", "LETTUCE"),
new Ingredient("CHED", "CHEDDAR"),
new Ingredient("JACK", "MONTERREY JACK"),
new Ingredient("SLSA", "SALSA"),
new Ingredient("SRCR", "SOUR CREAM")
};
}
AbstractMailMessageTransformer是一个很便利的基类,适用于处理载荷为电子邮件的消息。它会抽取传入消息中的电子邮件信息,并将其放到一个Message对象中,这个对象会传递给doTransform()方法。
在doTransform()方法中,我们将Message对象传递给一个名为processPayload()的private方法,这个方法会将电子邮件解析为EmailOrder对象。这个Order对象尽管和主Taco Cloud应用中的TacoOrder对象有些相似,但二者并不完全相同,这里的对象更加简单一些:
package tacos.email;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;
@Data
public class EmailOrder {
private final String email;
private List<Taco> tacos = new ArrayList<>();
public void addTaco(Taco taco) {
this.tacos.add(taco);
}
}
这个EmailOrder类不包含客户完整的投递信息和账单信息,只携带了客户的电子邮件地址,这是通过传入的电子邮件获取到的。
将电子邮件解析成订单是一项非常重要的任务。实际上,即便最简单的实现也需要几十行代码。这些代码对于进一步讨论Spring Integration和转换器的实现并没有任何助益。所以,为了节省空间,我在这里省略了processPayload()方法的细节。
EmailToOrderTransformer做的最后一件事情就是返回一个MessageBuilder,并让消息的载荷中包含EmailOrder对象。MessageBuilder生成的消息会发送至集成流的最后一个组件:将订单提交至Taco Cloud API的消息处理器。OrderSubmitMessageHandler实现了Spring Integration的GenericHandler接口,它会处理带有EmailOrder载荷的消息,如程序清单10.7所示。
package tacos.email;
import org.springframework.integration.handler.GenericHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
@Component
public class OrderSubmitMessageHandler
implements GenericHandler<EmailOrder> {
private RestTemplate rest;
private ApiProperties apiProps;
public OrderSubmitMessageHandler(ApiProperties apiProps, RestTemplate rest) {
this.apiProps = apiProps;
this.rest = rest;
}
@Override
public Object handle(EmailOrder order, MessageHeaders headers) {
rest.postForObject(apiProps.getUrl(), order, String.class);
return null;
}
}
为了满足GenericHandler接口的要求,OrderSubmitMessageHandler重写了handle()方法,接收传入的EmailOrder对象,并使用注入的RestTemplate利用POST请求将EmailOrder提交至ApiProperties对象指定的URL。最后,handle()方法返回null,表明这个处理器是流的终点。
这里使用ApiProperties避免在postForObject()时硬编码URL。它是一个配置属性类,如下所示:
package tacos.email;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;
@Data
@ConfigurationProperties(prefix = "tacocloud.api")
@Component
public class ApiProperties {
private String url;
}
在application.yml中,Taco Cloud API的URL可能会配置如下:
tacocloud:
api:
url: http://localhost:8080/orders/fromEmail
为了让这个应用能够使用RestTemplate并自动注入OrderSubmitMessageHandler,我们需要在项目的构建文件中添加Spring Boot web starter依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
这不仅会将RestTemplate添加到类路径中,还会触发Spring MVC的自动配置功能。作为独立的Spring Integration流,这个应用并不需要Spring MVC,更不需要自动配置提供的嵌入式Tomcat。所以,我们可以在application.yml中通过如下的配置条目禁用Spring MVC的自动配置:
spring:
main:
web-application-type: none
spring.main.web-application-type属性可以设置为servlet、reactive或none。当Spring MVC位于类路径之中时,自动配置功能会将其设置为servlet。但是,我们在这里将其重写为none,所以Spring MVC和Tomcat不会自动配置(我会在第12章介绍反应式Web应用是什么样子的)。