分布式架构通信
分布式架构通信原理
SOA架构:
根据实际业务,把系统拆分成合适的、独立部署的模块,模块之间相互独立。
优点:分布式、松耦合、扩展灵活、可重用。
SOA架构系统中,使用Dubbo和Zookeeper进行服务间的远程通信。
优点:
Dubbo使用自定义的TCP协议,可以让请求报文体积更小,或者使用HTTP2协议,也可以减少报文 的体积,提高传输效率。
微服务架构:
SpringCloud中使用Feign解决服务之间远程通信的问题。
Feign:轻量级RESTful的HTTP服务客户端,广泛应用于Spring Cloud中。符合面向接口化的编程 习惯。
本质:封装了HTTP调用流程,类似Dubbo的服务调用。
多用于同步远程调用。
RPC主要基于TCP/UDP协议,HTTP协议是应用层协议,是构建在传输层协议TCP之上的,RPC效率 更高,RPC长连接:不必每次通信都像HTTP一样三次握手,减少网络开销;
HTTP服务开发迭代更快:在接口不多,系统与系统之间交互比较少的情况下,HTTP就显得更加方 便;相反,在接口比较多,系统与系统之间交互比较多的情况下,HTTP就没有RPC有优势。
一切都很完美!
。。。
直到你碰到了下面这个问题:
分布式同步通信的问题
电商项目中,如果后台添加商品信息,该信息放到数据库。
我们同时,需要更新搜索引擎的倒排索引
同时,假如有商品页面的静态化处理,也需要更新该页面信息
怎么解决?
方式一、可以在后台添加商品的方法中,如果数据插入数据库成功,就调用更新倒排索引的方法, 接着调用更新静态化页面的方法。
代码应该是:
Long goodsId = addGoods(goods);
if (goodsId != null) {
refreshInvertedIndex(goods);
refreshStaticPage(goods);
}
问题:
假如更新倒排索引失败,该怎么办?
假如更新静态页面失败怎么办?
解决方式:
如果更新倒排索引失败,重试
如果更新静态页面失败,重试
代码应该是这样:
public Long saveGoods() {
Long goodsId = addGoods(goods);
if (goodsId != null) {
// 调用递归的方法,实现重试
boolean indexFlag = refreshInvertedIndex(goods);
// 调用递归的方法,实现重试
boolean pageFlag = refreshStaticPage(goods);
}
}
private boolean refreshInvertedIndex(Goods goods) {
// 调用服务的方法
boolean flag = indexService.refreshIndex(goods);
if (!flag) {
refreshInvertedIndex(goods);
}
}
private boolean refreshStaticPage(Goods goods) {
// 调用服务的方法
boolean flag = staticPageService.refreshStaticPage(goods);
if (!flag) {
refreshStaticPage(goods);
}
}
以上代码在执行中的问题:
- 如果相应的更新一直失败,岂不是一直死循环直到调用栈崩溃?
- 如果相应的更新一直在重试,在重试期间,添加商品的方法调用是不是一直阻塞中?
- 如果添加商品的时候并发量很大,效率岂不是很低?
或许可以加上迭代的等待时间,迭代的次数加以限制,减少CPU消耗。
或许还可以加上多线程,同时执行更新的操作,减少执行的时间。
但是都是基于该调用一定在可见的时间内调用成功。
还是老问题:如果更新失败怎么办?
归根到底,是同步调用处理不当。这个问题在分布式架构中尤为严重。
方式二:可以先执行添加商品的方法,商品添加成功,将更新索引和更新静态页面的任务缓存到一 个公共的位置,然后由相应的服务从该位置获取任务来执行。
Long goodsId = addGoods(goods);
if (goodsId != null) {
goodsTaskService.cache(goods);
}
此时,由于添加商品仅仅是将数据插入数据库,然后将任务信息缓存,调用立刻返回。
对于添加商品方法的调用,不会存在线程阻塞,不会存在调用栈崩溃。
再考虑远一点。
由于更新倒排索引的的服务和更新静态页面的服务要从公共的缓存或者叫任务池中取出任务并执 行,它们也会有执行失败的问题,也需要重试。如果一直更新失败,也需要一个方式来处理。
比如如果更新失败,则每隔3秒钟重试一次,重试三次都失败则放弃执行。
然后将错误结果放到另一个公共的地方,等待后续的补偿,无论是手工还是自动的。
还有问题:
- 这个公共的任务池,会不会宕机?会不会服务不可用?如何解决?
- 你一定确信消息发送到任务池了吗?
- 如果在向任务池发送任务失败该如何处理?
- 如果重试的时候发送成功了,但是实际上发送了多次,更新倒排索引服务和更新静态页面服务 会不会重复执行?
- 如果重复执行,最终结果会不会不一样?
- 。。。
看来真是解决了一个问题,引进来三个问题。
如果上述的问题都由我们从0开始解决,开发难度可想而知。
分布式服务中,由于业务拆分,应用也需要拆分,甚至数据库分库分表。
但是完成一个业务处理,往往要设计到多个模块之间的协调处理。此时模块之间,服务与服务之间 以及客户端与服务端之间的通信将变得非常复杂。
分布式异步通信模式
比较典型的“生产者消费者模式”,可以跨平台、支持异构系统,通常借助消息中间件来完成。
优点:系统间解耦,并具有一定的可恢复性,支持异构系统,下游通常可并发执行,系统具备弹 性。服务解耦、流量削峰填谷等
缺点:消息中间件存在一些瓶颈和一致性问题,对于开发来讲不直观且不易调试,有额外成本。
使用异步消息模式需要注意的问题:
- 哪些业务需要同步处理,哪些业务可以异步处理?
- 如何保证消息的安全?消息是否会丢失,是否会重复?
- 请求的延迟如何能够减少?
- 消息接收的顺序是否会影响到业务流程的正常执行?
- 消息处理失败后是否需要重发?如果重发如何保证幂等性?
消息中间件简介
消息中间件概念
维基百科对消息中间件的解释:面向消息的系统(消息中间件)是在分布式系统中完成消息的发送 和接收的基础软件。
消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基 于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进 程的通信。
消息中间件就是在通信的上下游之间截断:break it,Broker
然后利用中间件解耦、异步的特性,构建弹性、可靠、稳定的系统。
体会一下:“必有歹人从中作梗”,”定有贵人从中相助“
异步处理、流量削峰、限流、缓冲、排队、最终一致性、消息驱动等需求的场景都可以使用消息中 间件。
自定义消息中间件
并发编程领域经典面试题:请使用java代码来实现“生产者消费者模式”。
BlockingQueue(阻塞队列)是java中常见的容器,在多线程编程中被广泛使用。
当队列容器已满时生产者线程被阻塞,直到队列未满后才可以继续put;
当队列容器为空时,消费者线程被阻塞,直至队列非空时才可以继续take。
KouZhao.java
package com.lagou.demo;
public class KouZhao {
private String id;
private String type;
public KouZhao(String id, String type) {
this.id = id;
this.type = type;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
@Override
public String toString() {
return "KouZhao{" +
"id='" + id + '\'' +
", type='" + type + '\'' +
'}';
}
}
Producer.java
package com.lagou.demo;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private final BlockingQueue<KouZhao> blockingQueue;
public Producer(BlockingQueue<KouZhao> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(200);
if (blockingQueue.remainingCapacity() > 0) {
KouZhao kouZhao = new KouZhao(UUID.randomUUID().toString(),
"N95");
blockingQueue.add(kouZhao);
System.out.println("我在生产口罩,当前库存是:" +
blockingQueue.size());
} else {
System.out.println("我的仓库已经堆满了" + blockingQueue.size()
+ "个口罩,快来买口罩啊!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Consumer.java
package com.lagou.demo;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
private final BlockingQueue<KouZhao> blockingQueue;
public Consumer(BlockingQueue<KouZhao> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(100);
long startTime = System.currentTimeMillis(); // 获取开始时间
KouZhao kouZhao = blockingQueue.take();
long endTime = System.currentTimeMillis(); // 获取结束时间
System.out.println("我消费了口罩:" + kouZhao + ", 等到货时我阻塞了"
+ (endTime - startTime) + "ms");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
App.java
package com.lagou.demo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class App {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<KouZhao> queue = new ArrayBlockingQueue<>(20);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
Thread.sleep(20000);
new Thread(consumer).start();
}
}
上述代码放到生产环境显然是不行的,比如没有集群,没有分布式,玩儿法太单一,不能满足企业 级应用的要求。。。
比如:
消息有没有持久化?
怎么确定消息一定能发送成功?
怎么确定消息一定能被消费成功?
高并发下的性能怎么样?
系统可靠吗?
有没有Pub/Sub模式?
有没有考虑过限流?
。。。
主流消息中间件及选型
在传统金融机构、银行、政府机构等有一些老系统还在使用IBM等厂商提供的商用MQ产品。
当前业界比较流行的开源消息中间件包括:ActiveMQ、RabbitMQ、RocketMQ、Kafka、 ZeroMQ等,其中应用最为广泛的要数RabbitMQ、RocketMQ、Kafka这三款。
Redis在某种程度上也可以是实现类似“Queue”和“Pub/Sub”的机制,严格意义上不算消息中间件。
选取原则
首先,产品应该是开源的。开源意味着如果队列使用中遇到bug,可以很快修改,而不用等待开发 者的更新。
其次,产品必须是近几年比较流行的,要有一个活跃的社区。这样遇到问题很快就可以找到解决方 法。同时流行也意味着bug较少。流行的产品一般跟周边系统兼容性比较好。
最后,作为消息队列,要具备以下几个特性:
- 消息传输的可靠性:保证消息不会丢失。
- 支持集群,包括横向扩展,单点故障都可以解决。
- 性能要好,要能够满足业务的性能需求。
RabbitMQ
RabbitMQ开始是用在电信业务的可靠通信的,也是少有的几款支持AMQP协议的产品之一。
优点:
- 轻量级,快速,部署使用方便
- 支持灵活的路由配置。RabbitMQ中,在生产者和队列之间有一个交换器模块。根据配置的路 由规则,生产者发送的消息可以发送到不同的队列中。路由规则很灵活,还可以自己实现。
- RabbitMQ的客户端支持大多数的编程语言。
缺点:
- 如果有大量消息堆积在队列中,性能会急剧下降
- RabbitMQ的性能在Kafka和RocketMQ中是最差的,每秒处理几万到几十万的消息。如果应 用要求高的性能,不要选择RabbitMQ。
- RabbitMQ是Erlang开发的,功能扩展和二次开发代价很高。
RocketMQ
RocketMQ是一个开源的消息队列,使用java实现。借鉴了Kafka的设计并做了很多改进。 RocketMQ主要用于有序,事务,流计算,消息推送,日志流处理,binlog分发等场景。经过了历次的 双11考验,性能,稳定性可可靠性没的说。
RocketMQ几乎具备了消息队列应该具备的所有特性和功能。
java开发,阅读源代码、扩展、二次开发很方便。
对电商领域的响应延迟做了很多优化。在大多数情况下,响应在毫秒级。如果应用很关注响应时 间,可以使用RocketMQ。
性能比RabbitMQ高一个数量级,每秒处理几十万的消息。
缺点:
跟周边系统的整合和兼容不是很好。
Kafka
Kafka的可靠性,稳定性和功能特性基本满足大多数的应用场景。
跟周边系统的兼容性是数一数二的,尤其是大数据和流计算领域,几乎所有相关的开源软件都支持 Kafka。
Kafka高效,可伸缩,消息持久化。支持分区、副本和容错。
Kafka是Scala和Java开发的,对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的 性能。它的异步消息的发送和接收是三个中最好的,但是跟RocketMQ拉不开数量级,每秒处理几十万 的消息。
如果是异步消息,并且开启了压缩,Kafka最终可以达到每秒处理2000w消息的级别。
但是由于是异步的和批处理的,延迟也会高,不适合电商场景。
区别
消息中间件应用场景
消息中间件的使用场景非常广泛,比如,12306购票的排队锁座,电商秒杀,大数据实时计算等。
电商秒杀案例:
比如6.18,活动从0:00开始,仅限前 200 名,秒杀即将开始时,用户会疯狂刷新 APP或者浏览器来 保证自己能够尽早的看到商品。
- 当秒杀开始前,用户在不断的刷新页面,系统应该如何应对高并发的读请求呢?
- 在秒杀开始时,大量并发用户瞬间向系统请求生成订单,扣减库存,系统应该如何应对高并发 的写请求呢?
系统应该如何应对高并发的读请求
- 使用缓存策略将请求挡在上层中的缓存中
- 能静态化的数据尽量做到静态化
- 加入限流(比如对短时间之内来自某一个用户,某一个IP、某个设备的重复请求做丢弃处理)
系统应该如何应对高并发的写请求
生成订单,扣减库存,用户这些操作不经过缓存直达数据库。如果在 1s内,有 1 万个数据连接同 时到达,系统的数据库会濒临崩溃。如何解决这个问题呢?我们可以使用 消息队列。
消息队列的作用:
- 削去秒杀场景下的峰值写流量——流量削峰
- 通过异步处理简化秒杀请求中的业务流程——异步处理
- 解耦,实现秒杀系统模块之间松耦合——解耦
削去秒杀场景下的峰值写流量
将秒杀请求暂存于消息队列,业务服务器响应用户“秒杀结果正在处理中。。。”,释放系统资源去 处理其它用户的请求。
削峰填谷,削平短暂的流量高峰,消息堆积会造成请求延迟处理,但秒杀用户对于短暂延迟有一定 容忍度。
秒杀商品有 1000 件,处理一次购买请求的时间是 500ms,那么总共就需要 500s 的时间。这时你 部署 10 个队列处理程序,那么秒杀请求的处理时间就是 50s,也就是说用户需要等待 50s 才可以看到 秒杀的结果,这是可以接受的。这时会并发 10 个请求到达数据库,并不会对数据库造成很大的压力。
通过异步处理简化秒杀请求中的业务流程
先处理主要的业务,异步处理次要的业务。
如主要流程是生成订单、扣减库存;次要流程比如购买成功之后会给用户发优惠券,增加用户的积 分。
此时秒杀只要处理生成订单,扣减库存的耗时,发放优惠券、增加用户积分异步去处理了。
解耦,实现秒杀系统模块之间松耦合
将秒杀数据同步给数据团队,有两种思路:
- 使用 HTTP 或者 RPC 同步调用,即提供一个接口,实时将数据推送给数据服务。 系统的耦合度高,如果其中一个服务有问题,可能会导致另一个服务不可用。
- 使用消息队列
将数据全部发送给消息队列,然后数据服务订阅这个消息队列,接收数据进行处理。
拉勾B端C端数据同步案例:
拉勾网站分B端和C端,B端面向企业用户,C端面向求职者。
这两个模块业务处理逻辑不同,数据库表结构不同,实际上是处于解耦的状态。
但是各自又需要对方的数据,需要共享:如
- 当C端求职者在更新简历之后,B端企业用户如何尽早看到该简历更新?
- 当B端企业用户发布新的职位需求后,C端用户如何尽早看到该职位信息?
无论是B端还是C端,都有各自的搜索引擎和缓存,B端需要获取C端的更新以更新搜索引擎和缓 存;C端需要获取B端的更新以更新C端的搜索引擎与缓存。
如何解决B端C端数据共享的问题?
解决方式:
- 同步方式:B端和C端通过RPC或WebService的方式发布服务,让对方来调用,以获取对方的 信息。求职者每更新一次简历,就调用一次B端的服务,进行数据的同步;B端企业用户每更 新职位需求,就调用C端的服务,进行数据的同步。
- 异步方式:使用消息队列,B端将更新的数据发布到消息队列,C端将更新的数据发布到消息 队列,B端订阅C端的消息队列,C端订阅B端的消息队列。
使用同步方式,B端和C端耦合比较紧密,如果其中一个服务有问题,可能会导致另一个服务不可 用。比如C端的RPC挂掉,企业用户有可能无法发布新的职位信息,因为发布了对方也看不到;B端的 RPC挂掉,求职者可能无法更新简历,因为即使简历更新了,对方也看不到。
你可能会想,可以让B端或C端在对方RPC挂掉的时候,先将该通知消息缓存起来,等对方服务恢复 之后再进行同步。
这正是引入异步方式,使用消息队列的目的。
使用消息队列的异步方式,对B端C端进行解耦,只要消息队列可用,双方都可以将需要同步的信息 发送到消息队列,对方在收到消息队列推送来的消息的时候,各自更新自己的搜索引擎,更新自己的缓 存数据。
支付宝购买电影票
如上图,用户在支付宝购买了一张电影票后很快就收到消息推送和短信(电影院地址、几号厅、座 位号、场次时间等),同时用户会积累一定的会员积分。
这里,交易系统并不需要一直等待消息送达等动作都完成后才返回成功,允许一定延迟和瞬时不一 致(最终一致性),而且后面两个动作通常可以并发执行。
如果后期监控大盘想要获取实时交易数据,只需要新增个消费者程序并订阅该消息即可,交易系统 对此并不感知,松耦合。
JMS规范和AMQP协议
JMS经典模式详解
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间 件(MOM,Message oriented Middleware)的API,用于在两个应用程序之间,或分布式系统中发送 消息,进行异步通信。与具体平台无关的API,绝大多数MOM提供商都支持。
它类似于JDBC(Java Database Connectivity)。
JMS消息:
消息是JMS中的一种类型对象,由两部分组成:报文头和消息主体。
报文头包括消息头字段和消息头属性。字段是JMS协议规定的字段,属性可以由用户按需添加。
JMS报文头全部字段:
消息主体则携带着应用程序的数据或有效负载。
根据有效负载的类型来划分,可以将消息分为几种类型:
- 简单文本(TextMessage
- 可序列化的对象(ObjectMessage)
- 属性集合(MapMessage)
- 字节流(BytesMessage)
- 原始值流(StreamMessage)
- 无有效负载的消息(Message)。
体系架构
JMS由以下元素组成:
- JMS供应商产品
JMS接口的一个实现。该产品可以是Java的JMS实现,也可以是非Java的面向消息中间件的适 配器。 - JMS Client
生产或消费基于消息的Java的应用程序或对象。 - JMS Producer
创建并发送消息的JMS客户。 - JMS Consumer
接收消息的JMS客户。 - JMS Message
包括可以在JMS客户之间传递的数据的对象 - JMS Queue
缓存消息的容器。消息的接受顺序并不一定要与消息的发送顺序相同。消息被消费后将从队列 中移除。 - JMS Topic
Pub/Sub模式。
对象模型
- ConnectionFactory 接口(连接工厂)
用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当 下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样, JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接 工厂。 - Connection 接口(连接)
连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与 JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题 到目标。 - Destination 接口(目标)
目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是 队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一 样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。 - Session 接口(会话)
表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的, 就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事 务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用 户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者 来接收消息。 - MessageConsumer 接口(消息消费者)
由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或(非阻 塞)接收队列和主题类型的消息。 - MessageProducer 接口(消息生产者)
由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个 通用的发送者,在发送消息时指定目标。 - Message 接口(消息)
是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个 消息有三个主要部分:- 消息头(必须):包含用于识别和为消息寻找路由的操作设置。
- 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创 建定制的字段和过滤器(消息选择器)。
- 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节 消息,流消息和对象消息)。
模式
Java消息服务应用程序结构支持两种模式:
- 点对点也叫队列模式
- 发布/订阅模式
1、在点对点或队列模型下
一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费 者的队列,并直接将消息发送到消费者的队列,概括为:
- 一条消息只有一个消费者获得
- 生产者无需在接收者消费该消息期间处于运行状态,接收者也同样无需在消息发送时处于运行 状态。
- 每一个成功处理的消息要么自动确认,要么由接收者手动确认。
2、发布/订阅模式
- 支持向一个特定的主题发布消息。
- 0或多个订阅者可能对接收特定消息主题的消息感兴趣。
- 发布者和订阅者彼此不知道对方。
- 多个消费者可以获得消息
在发布者和订阅者之间存在时间依赖性。
- 发布者需要建立一个主题,以便客户能够订阅。
- 订阅者必须保持持续的活动状态以接收消息,否则会丢失未上线时的消息。
- 对于持久订阅,订阅者未连接时发布的消息将在订阅者重连时重发。
传递方式
JMS有两种传递消息的方式。 标记为NON
_PERSISTENT的消息最多投递一次,而标记为PERSISTENT的消息将使用暂存后再转送 的机理投递。
如果一个JMS服务下线,持久性消息不会丢失,等该服务恢复时再传递。默认的消息传递方式是非 持久性的。使用非持久性消息可能降低内务和需要的存储器,当不需要接收所有消息时使用。
供应商
开源软件:
- Apache ActiveMQ
- RabbitMQ
- RocketMQ
- JBoss 社区所研发的 HornetQ
- Joram
- Coridan的MantaRay
- The OpenJMS Group的OpenJMS
专有的供应商包括:
- BEA的BEA WebLogic Server JMS
- TIBCO Software的EMS
- GigaSpaces Technologies的GigaSpaces
- Softwired 2006的iBus
- IONA Technologies的IONA JMS
- SeeBeyond的IQManager(2005年8月被Sun Microsystems并购)
- webMethods的JMS+-
- my-channels的Nirvana
- Sonic Software的SonicMQ
- SwiftMQ的SwiftMQ
- IBM的WebSphere MQ
JMS在应用集群中的问题
生产中应用基本上都是以集群部署的。在Queue模式下,消息的消费没有什么问题,因为不同节点 的相同应用会抢占式地消费消息,这样还能分摊负载。
如果使用Topic广播模式?对于一个消息,不同节点的相同应用都会收到该消息,进行相应的操 作,这样就重复消费了。。。
方案一:选择Queue模式,创建多个一样的Queue,每个应用消费自己的Queue。
弊端:浪费空间,生产者还需要关注下游到底有几个消费者,违反了“解耦”的初衷。
方案二:选择Topic模式,在业务上做散列,或者通过分布式锁等方式来实现不同节点间的竞争。
弊端:对业务侵入较大,不是优雅的解决方法。
ActiveMQ通过“虚拟主题”解决了这个问题。
生产中似乎需要结合这两种模式:即不同节点的相同应用间存在竞争,会部分消费(P2P),而不 同的应用都需要消费到全量的消息(Topic)模式。这样就可以避免重复消费。
JMS规范文档(jms-1_1-fr-spec.pdf)下载地址:
JMS是JEE平台的标准消息传递API。它可以在商业和开源实现中使用。每个实现都包括一个JMS服 务器,一个JMS客户端库,以及用于管理消息传递系统的其他特定于实现的组件。 JMS提供程序可以是 消息传递服务的独立实现,也可以是非JMS消息传递系统的桥梁。
JMS客户端API是标准化的,因此JMS应用程序可在供应商的实现之间移植。但是:
- 底层消息传递实现未指定,因此JMS实现之间没有互操作性。除非存在桥接技术,否则想要共 享消息传递的Java应用程序必须全部使用相同的JMS实现。
- 如果没有供应商特定的JMS客户端库来启用互操作性,则非Java应用程序将无法访问JMS。
- AMQP 0-9-1是一种消息传递协议,而不是像JMS这样的API。任何实现该协议的客户端都可以 访问支持AMQP 0-9-1的代理
- 协议级的互操作性允许以任何编程语言编写且在任何操作系统上运行的AMQP 0-9-1客户端都 可以参与消息传递系统,而无需桥接不兼容的服务器实现。
AMQP协议剖析
协议架构
AMQP全称高级消息队列协议(Advanced Message Queuing Protocol),是一种标准,类似于 JMS,兼容JMS协议。目前RabbitMQ主流支持AMQP 0-9-1,3.8.4版本支持AMQP 1.0。
AMQP中的概念
Publisher:消息发送者,将消息发送到Exchange并指定RoutingKey,以便queue可以接收到指 定的消息。
Consumer:消息消费者,从queue获取消息,一个Consumer可以订阅多个queue以从多个 queue中接收消息。
Server:一个具体的MQ服务实例,也称为Broker。
Virtual host:虚拟主机,一个Server下可以有多个虚拟主机,用于隔离不同项目,一个Virtual host通常包含多个Exchange、Message Queue。
Exchange:交换器,接收Producer发送来的消息,把消息转发到对应的Message Queue中。
Routing key:路由键,用于指定消息路由规则(Exchange将消息路由到具体的queue中),通 常需要和具体的Exchange类型、Binding的Routing key结合起来使用。
Bindings:指定了Exchange和Queue之间的绑定关系。Exchange根据消息的Routing key和 Binding配置(绑定关系、Binding、Routing key等)来决定把消息分派到哪些具体的queue中。这依 赖于Exchange类型。
Message Queue:实际存储消息的容器,并把消息传递给最终的Consumer。
AMQP 传输层架构
简要概述
AMQP是一个二进制的协议,信息被组织成数据帧,有很多类型。数据帧携带协议方法和其他信 息。所有数据帧都拥有基本相同的格式:帧头,负载,帧尾。数据帧负载的格式依赖于数据帧的类型。
我们假定有一个可靠的面向流的网络传输层(TCP/IP或等价的协议)。
在一个单一的socket连接中,可能有多个相互独立的控制线程,称为“channel”。每个数据帧使用 通道号码编号。通过数据帧的交织,不同的通道共享一个连接。对于任意给定通道,数据帧严格按照序 列传输。
我们使用小的数据类型来构造数据帧,如bit,integer,string以及字段表。数据帧的字段做了轻微 的封装,不会让传输变慢或解析困难。根据协议规范机械地生成成数据帧层相对简单。
线级别的格式被设计为可伸缩和足够通用,以支持任意的高层协议(不仅是AMQP)。我们假定 AMQP会扩展,改进以及随时间的其他变化,并要求wire-level格式支持这些变化。
数据类型
AMQP 使用的数据类型如下:
- Integers(数值范围1-8的十进制数字):用于表示大小,数量,限制等,整数类型无符号 的,可以在帧内不对齐。
- Bits(统一为8个字节):用于表示开/关值。
- Short strings:用于保存简短的文本属性,字符串个数限制为255,8个字节
- Long strings:用于保存二进制数据块。
- Field tables:包含键值对,字段值一般为字符串,整数等。
协议协商
AMQP客户端和服务端进行协议协商。意味着当客户端连接上之后,服务端会向客户端提出一些选 项,客户端必须能接收或修改。如果双方都认同协商的结果,继续进行连接的建立过程。协议协商是一 个很有用的技术手段,因为它可以让我们断言假设和前置条件。
在AMQP中,我们需要协商协议的一些特殊方面:
- 真实的协议和版本。服务器可能在同一个端口支持多个协议。
- 双方的加密参数和认证方式。这是功能层的一部分。
- 数据帧最大大小,通道数量以及其他操作限制。
对限制条件的认同可能会导致双方重新分配key的缓存,避免死锁。每个发来的数据帧要么遵守认 同的限制,也就是安全的,要么超过了限制,此时另一方出错,必须断开连接。出色地践行了“要么一 切工作正常,要么完全不工作”的RabbitMQ哲学。
协商双方认同限制到一个小的值,如下:
- 服务端必须告诉客户端它加上了什么限制。
- 客户端响应服务器,或许会要求对客户端的连接降低限制。
数据帧界定
TCP/IP是流协议,没有内置的机制用于界定数据帧。现有的协议从以下几个方面来解决:
- 每个连接发送单一数据帧。简单但是慢
- 在流中添加帧的边界。简单,但是解析很慢。
- 计算数据帧的大小,在每个数据帧头部加上该数据帧大小。这简单,快速,AMQP的选择。
AMQP客户端实现JMS客户端
RabbitMQ的JMS客户端用RabbitMQ Java客户端实现,既与JMS API兼容,也与AMQP 0-9-1协议兼 容。
局限性
RabbitMQ JMS客户端不支持某些JMS 1.1功能:
- JMS客户端不支持服务器会话。
- XA事务支持接口未实现。
- RabbitMQ JMS主题选择器插件支持主题选择器。队列选择器尚未实现。
- 支持RabbitMQ连接的SSL和套接字选项,但仅使用RabbitMQ客户端提供的(默认)SSL连接 协议。
- RabbitMQ不支持JMS NoLocal订阅功能,该功能禁止消费者接收通过消费者自己的连接发布 的消息。可以调用包含NoLocal参数的方法,但该方法将被忽略。
RabbitMQ使用amqp协议,JMS规范仅对于Java的使用作出的规定,跟其他语言无关,协议是语言 无关的,只要语言实现了该协议,就可以做客户端。如此,则不同语言之间互操作性得以保证。