>>分享流行的Java框架以及开源软件,对孙卫琴的《精通Spring:Java Web开发技术详解》提供技术支持 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 20264 个阅读者 刷新本主题
 * 贴子主题:  微服务中的Kafka与Micronaut 回复文章 点赞(0)  收藏  
作者:Jacky    发表时间:2020-03-08 15:20:57     消息  查看  搜索  好友  邮件  复制  引用

                                                                                                

微服务中的Kafka与Micronaut

今天,我们将通过   Apache Kafka  主题构建一些彼此异步通信的微服务。我们使用   Micronaut  框架,它为与   Kafka  集成提供专门的库。让我们简要介绍一下示例系统的体系结构。我们有四个微型服务:   订单服务  ,   行程服务  ,   司机服务  和   乘客服务  。这些应用程序的实现非常简单。它们都有内存存储,并连接到同一个   Kafka  实例。

     我们系统的主要目标是为客户安排行程。订单服务应用程序还充当网关。它接收来自客户的请求,保存历史记录并将事件发送到   orders  主题。所有其他微服务都在监听   orders  这个主题,并处理   order-service  发送的订单。每个微服务都有自己的专用主题,其中发送包含更改信息的事件。此类事件由其他一些微服务接收。架构如下图所示。

     点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

     在阅读本文之前,有必要熟悉一下   Micronaut  框架。您可以阅读之前的一篇文章,该文章描述了通过   REST API构建微服务通信的过程  :使用microaut框架构建微服务的快速指南

   1 运行Kafka

要在本地机器上运行   Apache Kafka  ,我们可以使用它的Docker映像。最新的镜像是由https://hub.docker.com/u/wurstmeister共享的。在启动   Kafka  容器之前,我们必须启动   kafka  所用使用的   ZooKeeper  服务器。如果在   Windows  上运行   Docker  ,其虚拟机的默认地址是   192.168.99.100  。它还必须设置为   Kafka  容器的环境。

        Zookeeper  和   Kafka  容器都将在同一个网络中启动。在docker中运行Zookeeper以   zookeeper  的名称提供服务,并在暴露   2181  端口。   Kafka  容器需要在环境变量使用   KAFKA_ZOOKEEPER_CONNECT  的地址。    

$ docker network create kafka
$ docker run -d --name zookeeper --network kafka -p 2181:2181 wurstmeister/zookeeper
$ docker run -d --name kafka -p 9092:9092 --network kafka --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 wurstmeister/kafka

   2 添加micronaut-kafka依赖

使用   Kafka  构建的   microaut  应用程序可以在HTTP服务器存在的情况下启动,也可以在不存在HTTP服务器的情况下启动。要启用   Micronaut  Kafka  ,需要添加   micronaut-kafka  库到依赖项。如果您想暴露   HTTP API  ,您还应该添加   micronaut-http-server-netty  :    

<dependency>
    <groupId>io.micronaut.configuration</groupId>
    <artifactId>micronaut-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-http-server-netty</artifactId>
</dependency>

   3 构建订单微服务

    订单微服务  是唯一一个启动嵌入式HTTP服务器并暴露   REST API  的应用程序。这就是为什么我们可以为   Kafka  提供内置   Micronaut  健康检查。要做到这一点,我们首先应该添加   micronaut-management  依赖:    

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-management</artifactId>
</dependency>

     为了方便起见,我们将通过在   application.yml  中定义以下配置来启用所有管理端点并禁用它们的HTTP身份验证。    

endpoints:
  all:
    enabled: true
    sensitive: false

      现在,可以在地址栏http://localhost:8080/health下使用   health check  。我们的示例应用程序还将暴露   添加新订单  和   列出所有以前创建的订单  的简单   REST API  。下面是暴露这些端点的   Micronaut  控制器实现:    

@Controller("orders")
public class OrderController {

    @Inject
    OrderInMemoryRepository repository;
    @Inject
    OrderClient client;

    @Post
    public Order add(@Body Order order) {
        order = repository.add(order);
        client.send(order);
        return order;
    }

    @Get
    public Set<Order> findAll() {
        return repository.findAll();
    }

}

     每个微服务都使用内存存储库实现。以下是   订单微服务(Order-Service)  中的存储库实现:    

@Singleton
public class OrderInMemoryRepository {

    private Set<Order> orders = new HashSet<>();

    public Order add(Order order) {
        order.setId((long) (orders.size()   1));
        orders.add(order);
        return order;
    }

    public void update(Order order) {
        orders.remove(order);
        orders.add(order);
    }

    public Optional<Order> findByTripIdAndType(Long tripId, OrderType type) {
        return orders.stream().filter(order -> order.getTripId().equals(tripId) && order.getType() == type).findAny();
    }

    public Optional<Order> findNewestByUserIdAndType(Long userId, OrderType type) {
        return orders.stream().filter(order -> order.getUserId().equals(userId) && order.getType() == type)
                .max(Comparator.comparing(Order::getId));
    }

    public Set<Order> findAll() {
        return orders;
    }

}

     内存存储库存储   Order  对象实例。   Order  对象还被发送到名为   orders  的Kafka主题。下面是   Order  类的实现:    

public class Order {

    private Long id;
    private LocalDateTime createdAt;
    private OrderType type;
    private Long userId;
    private Long tripId;
    private float currentLocationX;
    private float currentLocationY;
    private OrderStatus status;
    
    // ... GETTERS AND SETTERS
}

   4 使用Kafka异步通信

现在,让我们想一个可以通过示例系统实现的用例——   添加新的行程  。

     我们创建了   OrderType.NEW_TRIP  类型的新订单。在此之后,(1)   订单服务  创建一个订单并将其发送到   orders  主题。订单由三个微服务接收:   司机服务  、   乘客服务  和   行程服务  。(2)所有这些应用程序都处理这个新订单。   乘客服务  应用程序检查乘客帐户上是否有足够的资金。如果没有,它就取消了行程,否则它什么也做不了。   司机服务  正在寻找最近可用的司机,(3)   行程服务  创建和存储新的行程。   司机服务  和   行程服务  都将事件发送到它们的主题(   drivers  ,    trips  ),其中包含相关更改的信息。

     每一个事件可以被其他   microservices  访问,例如,(4)   行程服务  侦听来自   司机服务  的事件,以便为行程分配一个新的司机

     下图说明了在添加新的行程时,我们的微服务之间的通信过程。点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小现在,让我们继续讨论实现细节。

   4.1 发送订单

首先,我们需要创建Kafka 客户端,负责向主题发送消息。我们创建的一个接口,命名为   OrderClient  ,为它添加   @KafkaClient  并声明用于发送消息的一个或多个方法。每个方法都应该通过   @Topic  注解设置目标主题名称。对于方法参数,我们可以使用三个注解   @KafkaKey  、   @Body  或   @Header  。   @KafkaKey  用于分区,这是我们的示例应用程序所需要的。在下面可用的客户端实现中,我们只使用   @Body  注解。    

@KafkaClient
public interface OrderClient {

    @Topic("orders")
    void send(@Body Order order);

}

   4.2 接收订单

一旦客户端发送了一个订单,它就会被监听   orders   主题的所有其他微服务接收。下面是   司机服务  中的监听器实现。监听器类   OrderListener  应该添加   @KafkaListener  注解。我们可以声明   groupId  作为一个注解参数,以防止单个应用程序的多个实例接收相同的消息。然后,我们声明用于处理传入消息的方法。与客户端方法相同,应该通过   @Topic  注解设置目标主题名称,因为我们正在监听   Order  对象,所以应该使用   @Body  注解——与对应的客户端方法相同。    

@KafkaListener(groupId = "driver")
public class OrderListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private DriverService service;

    public OrderListener(DriverService service) {
        this.service = service;
    }

    @Topic("orders")
    public void receive(@Body Order order) {
        LOGGER.info("Received: {}", order);
        switch (order.getType()) {
            case NEW_TRIP -> service.processNewTripOrder(order);
        }
    }

}

   4.3 发送到其他主题

现在,让我们看一下   司机服务  中的   processNewTripOrder  方法。   DriverService  注入两个不同的   Kafka Client  bean:    OrderClient  和   DriverClient  。当处理新订单时,它将试图寻找与发送订单的乘客最近的司机。找到他之后,将该司机的状态更改为   UNAVAILABLE  ,并将带有   Driver  对象的事件发送到   drivers  主题。    

@Singleton
public class DriverService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class);

    private DriverClient client;
    private OrderClient orderClient;
    private DriverInMemoryRepository repository;

    public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) {
        this.client = client;
        this.orderClient = orderClient;
        this.repository = repository;
    }

    public void processNewTripOrder(Order order) {
        LOGGER.info("Processing: {}", order);
        Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY());
        driver.ifPresent(driverLocal -> {
            driverLocal.setStatus(DriverStatus.UNAVAILABLE);
            repository.updateDriver(driverLocal);
            client.send(driverLocal, String.valueOf(order.getId()));
            LOGGER.info("Message sent: {}", driverLocal);
        });
    }
    
    // ...
}

     这是   Kafka Client  在   司机服务  中的实现,用于向   driver  主题发送消息。因为我们需要将   Driver  与   Order   关联起来,所以我们使用   @Header  注解 的   orderId  参数。没有必要把它包括到   Driver  类中,将其分配给监听器端的正确行程。    

@KafkaClient
public interface DriverClient {

    @Topic("drivers")
    void send(@Body Driver driver, @Header("Order-Id") String orderId);

}

   4.4 服务间通信

由   DriverListener  收到   @KafkaListener  在   行程服务  中声明。它监听传入到   trip  主题。接收方法的参数和客户端发送方法的类似,如下所示:    

@KafkaListener(groupId = "trip")
public class DriverListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private TripService service;

    public DriverListener(TripService service) {
        this.service = service;
    }

    @Topic("drivers")
    public void receive(@Body Driver driver, @Header("Order-Id") String orderId) {
        LOGGER.info("Received: driver->{}, header->{}", driver, orderId);
        service.processNewDriver(driver);
    }

}  

     最后一步,将   orderId  查询到的行程   Trip  与   driverId  关联,这样整个流程就结束。    

@Singleton
public class TripService {

    private static final Logger LOGGER = LoggerFactory.getLogger(TripService.class);

    private TripInMemoryRepository repository;
    private TripClient client;

    public TripService(TripInMemoryRepository repository, TripClient client) {
        this.repository = repository;
        this.client = client;
    }

        public void processNewDriver(Driver driver, String orderId) {
        LOGGER.info("Processing: {}", driver);
        Optional<Trip> trip = repository.findByOrderId(Long.valueOf(orderId));
        trip.ifPresent(tripLocal -> {
            tripLocal.setDriverId(driver.getId());
            repository.update(tripLocal);
        });
    }
    
    // ... OTHER METHODS

}

   5 跟踪

我们可以使用Micronaut Kafka轻松地启用分布式跟踪。首先,我们需要启用和配置Micronaut跟踪。要做到这一点,首先应该添加一些依赖项:    

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-tracing</artifactId>
</dependency>
<dependency>
    <groupId>io.zipkin.brave</groupId>
    <artifactId>brave-instrumentation-http</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-reporter</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.opentracing.brave</groupId>
    <artifactId>brave-opentracing</artifactId>
</dependency>
<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>0.0.16</version>
    <scope>runtime</scope>
</dependency>

     我们还需要在   application.yml  配置文件中,配置Zipkin 的追踪的地址等    

   tracing:
  zipkin:
    enabled: true
    http:
      url: http://192.168.99.100:9411
    sampler:
      probability: 1

     在启动应用程序之前,我们必须运行   Zipkin  容器:

        $ docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin    

   6 总结

在本文中,您将了解通过   Apache  Kafka  使用异步通信构建微服务架构的过程。我已经向大家展示了   Microaut Kafka  库最重要的特性,它允许您轻松地声明   Kafka  主题的生产者和消费者,为您的微服务启用   健康检查  和   分布式跟踪  。我已经为我们的系统描述了一个简单的场景的实现,包括根据客户的请求添加一个新的行程。本示例系统的整体实现,请查看GitHub上的源代码

     点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

                                                                                                            
----------------------------
原文链接:https://blog.csdn.net/Developlee/article/details/103175967

程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 flybird 在 2020-03-09 22:27:30 重新编辑]
  Java面向对象编程-->继承
  JavaWeb开发-->Web运作原理(Ⅳ)
  JSP与Hibernate开发-->立即检索和延迟检索策略
  Java网络编程-->基于UDP的数据报和套接字
  精通Spring-->计算属性和数据监听
  Vue3开发-->通过Vuex进行状态管理
  NIO的几道常见面试题
  Redis 超详细总结笔记
  99%的人都能看懂的“熔断”以及最佳实践
  阿里巴巴为什么能抗住90秒100亿?看完这篇你就明白了!
  Spring 5 webflux响应式编程 - 但时间也偷换概念
  Spring 5 webflux响应式编程 - 但时间也偷换概念
  【项目实践】后端接口统一规范的同时,如何优雅地扩展规范
  Spring数据验证 中@NotNull, @NotEmpty和@NotBlank之间的区别
  Spring MVC处理异步请求
  Spring MVC关于分页的简单实现
  SpringMVC Model、ModelMap和ModelAndView的区别和用法
  RESTful API的最佳设计原则
  Spring Boot集成shiro ehcache(使用shiro的缓存管理)
  Spring如何实现AOP,请不要再说cglib了!
  MyBatis解析和运行原理
  更多...
 IPIP: 已设置保密
楼主      
1页 2条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。