Apache RocketMQ 5.0 笔记

RocketMQ 5.0:云原生“消息、事件、流”实时数据处理平台,覆盖云边端一体化数据处理场景。

核心特性

  • 云原生:生与云,长与云,无限弹性扩缩,K8s友好
  • 高吞吐:万亿级吞吐保证,同时满足微服务与大数据场景
  • 流处理:提供轻量、高扩展、高性能和丰富功能的流计算引擎
  • 金融级:金融级的稳定性,广泛用于交易核心链路
  • 架构极简:零外部依赖,Shared-nothing 架构
  • 生态友好:无缝对接微服务、实时计算、数据湖等周边生态

1. 基本概念

1、消息由生产者初始化并发送到Apache RocketMQ 服务端。

2、消息按照到达Apache RocketMQ 服务端的顺序存储到主题的指定队列中。

3、消费者按照指定的订阅关系从Apache RocketMQ 服务端中获取消息并消费。

1.1. 消息

消息是 Apache RocketMQ 中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到 Apache RocketMQ 服务端,服务端按照相关语义将消息投递到消费端进行消费。

RocketMQ 消息构成非常简单,如下所示:

  • topic:表示要发送的消息的主题
  • body:表示消息的存储内容
  • properties:表示消息属性
  • transactionId:会在事务消息中使用

消息内部属性

字段名 必填 说明
主题名称

当前消息所属的主题的名称。集群内全局唯一。

消息体 消息体
消息类型

Normal:普通消息,消息本身无特殊语义,消息之间也没有任何关联。

FIFO:顺序消息,Apache RocketMQ 通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。

Delay:定时/延时消息,通过指定延时时间控制消息生产后不要立即投递,而是在延时间隔后才对消费者可见。

Transaction:事务消息,Apache RocketMQ 支持分布式事务消息,支持应用数据库更新和消息调用的事务一致性保障。

过滤标签Tag 方便服务器过滤使用,消费者可通过Tag对消息进行过滤,仅接收指定标签的消息。目前只支持每个消息设置一个。
索引Key 消息的索引键,可通过设置不同的Key区分消息和快速查找消息。
定时时间   定时场景下,消息触发延时投递的毫秒级时间戳。
消费重试次数 否   消息消费失败后,Apache RocketMQ 服务端重新投递的次数。每次重试后,重试次数加1。
业务自定义属性   生产者可以自定义设置的扩展信息。

系统默认的消息最大限制如下:

  • 普通和顺序消息:4 MB
  • 事务和定时或延时消息:64 KB

1.2. Tag

Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。使用 Tag 可以实现对 Topic 中的消息进行过滤。

提示:

  • Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
  • Tag:消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性。

Topic 和 Tag 的关系如下图所示:

什么时候该用 Topic,什么时候该用 Tag?

可以从以下几个方面进行判断:

  • 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
  • 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
  • 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
  • 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。

通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。

1.3. Keys

Apache RocketMQ 每个消息可以在业务层面的设置唯一标识码 keys 字段,方便将来定位消息丢失问题。 Broker 端会为每个消息创建索引(哈希索引),应用可以通过 topic、key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。

// 订单Id
String orderId = "20034568923546";
message.setKeys(orderId);

1.4. 队列

一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上。

队列天然具备顺序性,即消息按照进入队列的顺序写入存储,同一队列间的消息天然存在顺序关系,队列头部为最早写入的消息,队列尾部为最新写入的消息。消息在队列中的位置和消息之间的顺序通过位点(Offset)进行标记管理。

Apache RocketMQ 默认提供消息可靠存储机制,所有发送成功的消息都被持久化存储到队列中,配合生产者和消费者客户端的调用可实现至少投递一次的可靠性语义。

Apache RocketMQ 队列模型和Kafka的分区(Partition)模型类似。在 Apache RocketMQ 消息收发模型中,队列属于主题的一部分,虽然所有的消息资源以主题粒度管理,但实际的操作实现是面向队列。例如,生产者指定某个主题,向主题内发送消息,但实际消息发送到该主题下的某个队列中。

Apache RocketMQ 中通过修改队列数量,以此实现横向的水平扩容和缩容。

一般来说一条消息,如果没有重复发送(比如因为服务端没有响应而进行重试),则只会存在在 Topic 的其中一个队列中,消息在队列中按照先进先出的原则存储,每条消息会有自己的位点,每个队列会统计当前消息的总条数,这个称为最大位点 MaxOffset;队列的起始位置对应的位置叫做起始位点 MinOffset。队列可以提升消息发送和消费的并发度。

注意:按照实际业务消耗设置队列数,队列数量的设置应遵循少用够用原则,避免随意增加队列数量。

1.5. 生产者

生产者(Producer)就是消息的发送者,Apache RocketMQ 拥有丰富的消息类型,可以支持不同的应用场景,在不同的场景中,需要使用不同的消息进行发送。比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略,此时就需要用到延迟消息;电商场景中,业务上要求同一订单的消息保持严格顺序,此时就要用到顺序消息。在日志处理场景中,可以接受的比较大的发送延迟,但对吞吐量的要求很高,希望每秒能处理百万条日志,此时可以使用批量消息。在银行扣款的场景中,要保持上游的扣款操作和下游的短信通知保持一致,此时就要使用事务消息。

注意:不要在同一个主题内使用多种消息类型

生产者通常被集成在业务系统中,将业务消息按照要求封装成 Apache RocketMQ 的消息(Message)并发送至服务端。

生产者和主题的关系为多对多关系,即同一个生产者可以向多个主题发送消息,同一个主题也可以接收多个生产者的消息。

注意:不建议频繁创建和销毁生产者

Producer p = ProducerBuilder.build();
for (int i =0;i<n;i++){
    Message m= MessageBuilder.build();
    p.send(m);
}
p.shutdown();

1.6. 消费者与消费者组

如果多个消费者设置了相同的Consumer Group,我们认为这些消费者在同一个消费组内。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。

在 Apache RocketMQ 有两种消费模式,分别是:

  • 集群消费模式:当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。
  • 广播消费模式:当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。

负载均衡

RocketMQ的负载均衡策略与Kafka极其类似,几乎一毛一样

集群模式下,同一个消费组内的消费者会分担收到的全量消息,这里的分配策略是怎样的?如果扩容消费者是否一定能提升消费能力?

Apache RocketMQ 提供了多种集群模式下的分配策略,包括平均分配策略、机房优先分配策略、一致性hash分配策略等,可以通过如下代码进行设置相应负载均衡策略。

consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());

默认的分配策略是平均分配,这也是最常见的策略。平均分配策略下消费组内的消费者会按照类似分页的策略均摊消费。

在平均分配的算法下,可以通过增加消费者的数量来提高消费的并行度。比如下图中,通过增加消费者来提高消费能力。

但也不是一味地增加消费者就能提升消费能力的,比如下图中Topic的总队列数小于消费者的数量时,消费者将分配不到队列,即使消费者再多也无法提升消费能力。

1.7. 消费者分类

如上图所示, Apache RocketMQ 的消费者处理消息时主要经过以下阶段:消息获取--->消息处理--->消费状态提交。

针对以上几个阶段,Apache RocketMQ 提供了不同的消费者类型: PushConsumer 、SimpleConsumer 和 PullConsumer。这几种类型的消费者通过不同的实现方式和接口可满足您在不同业务场景下的消费需求。具体差异如下:

注:在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。

PushConsumer

PushConsumers是一种高度封装的消费者类型,消费消息仅通过消费监听器处理业务并返回消费结果。消息的获取、消费状态提交以及消费重试都通过 Apache RocketMQ 的客户端SDK完成。

SimpleConsumer

SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。

补充:

rocketmq-client中定义的:

  • DefaultMQProducer
  • DefaultMQPushConsumer
  • DefaultLitePullConsumer

rocketmq-client-java中定义的:

  • Producer
  • PushConsumer
  • SimpleConsumer

1.8. 消费位点

消息是按到达Apache RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。一条消息被某个消费者消费完成后不会立即从队列中删除,Apache RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点

如上图所示,在Apache RocketMQ中每个队列都会记录自己的最小位点、最大位点。针对于消费组,还有消费位点的概念,在集群模式下,消费位点是由客户端提给交服务端保存的,在广播模式下,消费位点是由客户端自己保存的。一般情况下消费位点正常更新,不会出现消息重复,但如果消费者发生崩溃或有新的消费者加入群组,就会触发重平衡,重平衡完成后,每个消费者可能会分配到新的队列,而不是之前处理的队列。为了能继续之前的工作,消费者需要读取每个队列最后一次的提交的消费位点,然后从消费位点处继续拉取消息。但在实际执行过程中,由于客户端提交给服务端的消费位点并不是实时的,所以重平衡就可能会导致消息少量重复。

1.9. 订阅关系

一个订阅关系指的是指定某个消费者分组对于某个主题的订阅。

不同消费者分组对于同一个主题的订阅相互独立如下图所示,消费者分组Group A和消费者分组Group B分别以不同的订阅关系订阅了同一个主题Topic A,这两个订阅关系互相独立,可以各自定义,不受影响。

同一个消费者分组对于不同主题的订阅也相互独立如下图所示,消费者分组Group A订阅了两个主题Topic A和Topic B,对于Group A中的消费者来说,订阅的Topic A为一个订阅关系,订阅的Topic B为另外一个订阅关系,且这两个订阅关系互相独立,可以各自定义,不受影响。

2. 消息类型

1、顺序消息(FIFO):这类消息必须设置 message group,这种类型的消息需要与FIFO消费者组一起使用

2、延迟消息(DELAY):消息被发送后不会立即对消费者可见,这种类型的消息必须设置delivery timestamp以决定对消费者可见的时间;

3、事务消息(TRANSACTIONAL):将一个或多个消息的发布包装到一个事务中,提供提交/回滚方法来决定消息的可见性;

4、普通消息(NORMAL):默认类型

不同的类型是互斥的,当意味着要发布的消息不能同时是FIFO类型和DELAY类型。实际上,主题的类型决定了消息的类型。例如,FIFO主题不允许发布其他类型的消息。

2.1. 普通消息

普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。

典型场景一:微服务异步解耦

如上图所示,以在线的电商交易场景为例,上游订单系统将用户下单支付这一业务事件封装成独立的普通消息并发送至Apache RocketMQ服务端,下游按需从服务端订阅消息并按照本地消费逻辑处理下游任务。每个消息之间都是相互独立的,且不需要产生关联。

典型场景二:数据集成传输

如上图所示,以离线的日志收集场景为例,通过埋点组件收集前端应用的相关操作日志,并转发到 Apache RocketMQ 。每条消息都是一段日志数据,Apache RocketMQ 不做任何处理,只需要将日志数据可靠投递到下游的存储系统和分析系统即可,后续功能由后端应用完成。

2.2. 顺序消息

应用场景

在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用 Apache RocketMQ 的顺序消息可以有效保证数据传输的顺序性。

典型场景一:撮合交易

以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。

典型场景二:数据实时增量同步

以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过 Apache RocketMQ 传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。

功能原理

顺序消息是 Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。 相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。

Apache RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。

基于消息组的顺序判定逻辑,支持按照业务逻辑做细粒度拆分,可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力。

如何保证消息的顺序性?

Apache RocketMQ 的消息的顺序性分为两部分,生产顺序性和消费顺序性。

1、生产顺序性

如需保证消息生产的顺序性,则必须满足以下条件:

  • 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
  • 串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

  • 相同消息组的消息按照先后顺序被存储在同一个队列。
  • 不同消息组的消息可以混合在同一个队列中,且不保证连续。

2、消费顺序性

如需保证消息消费的顺序性,则必须满足以下条件:

  • 投递顺序:Apache RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收---处理---应答的语义处理消息,避免因异步处理导致消息乱序。
  • 有限重试:Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序。

生产顺序性和消费顺序性组合

如果消息需要严格按照先进先出(FIFO)的原则处理,即先发送的先消费、后发送的后消费,则必须要同时满足生产顺序性和消费顺序性。

一般业务场景下,同一个生产者可能对接多个下游消费者,不一定所有的消费者业务都需要顺序消费,您可以将生产顺序性和消费顺序性进行差异化组合,应用于不同的业务场景。例如发送顺序消息,但使用非顺序的并发消费方式来提高吞吐能力。更多组合方式如下表所示:

生产顺序 消费顺序 顺序性效果
设置消息组,保证消息顺序发送。 顺序消费 按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。
设置消息组,保证消息顺序发送。 并发消费 并发消费,尽可能按时间顺序处理。
未设置消息组,消息乱序发送。 顺序消费 按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。
未设置消息组,消息乱序发送。 并发消费 并发消费,尽可能按照时间顺序处理。

2.3. 定时/延时消息

注:定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。

应用场景

在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。使用 Apache RocketMQ 的定时消息可以简化定时调度任务的开发逻

辑,实现高性能、可扩展、高可靠的定时触发能力。

典型场景一:分布式定时调度

在分布式定时调度场景下,需要实现各类精度的定时任务,例如每天5点执行文件清理,每隔2分钟触发一次消息推送等需求。基于 Apache RocketMQ 的定时消息可以封装出多种类型的定时触发器。

典型场景二:任务超时处理

以电商交易场景为例,订单下单后暂未支付,此时不可以直接关闭订单,而是需要等待一段时间后才能关闭订单。使用 Apache RocketMQ 定时消息可以实现超时任务的检查触发。

基于定时消息的超时任务处理具备如下优势:

  • 精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。
  • 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。 Apache RocketMQ 的定时消息具有高并发和水平扩展的能力。

功能原理

定时时间设置原则

Apache RocketMQ 定时消息设置的定时时间是一个预期触发的系统时间戳,延时时间也需要转换成当前系统时间后的某一个时间戳,而不是一段延时时长。

投递等级

Apache RocketMQ 一共支持18个等级的延迟投递,具体时间如下:

2.4. 事务消息

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
  • 积分系统状态变更:变更用户积分,更新用户积分表。
  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

使用普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。而基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。(PS:重点是两阶段提交

事务消息处理流程

1、生产者将消息发送至Apache RocketMQ服务端。

2、Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

3、生产者开始执行本地事务逻辑。

4、生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

  • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
  • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

5、在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

6、生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

7、生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

3. 机制

3.1. 消息发送重试机制

Apache RocketMQ 客户端连接服务端发起消息发送请求时,可能会因为网络故障、服务异常等原因导致调用失败。为保证消息的可靠性, Apache RocketMQ 在客户端SDK中内置请求重试逻辑,尝试通过重试发送达到最终调用成功的效果。

同步发送和异步发送模式均支持消息发送重试。

重试触发条件:

  • 客户端消息发送请求调用失败或请求超时
  • 网络异常造成连接失败或请求超时
  • 服务端节点处于重启或下线等状态造成连接失败
  • 服务端运行慢造成请求超时
  • 服务端返回失败错误码

重试流程:

生产者在初始化时设置消息发送最大重试次数,当出现上述触发条件的场景时,生产者客户端会按照设置的重试次数一直重试发送消息,直到消息发送成功或达到最大重试次数重试结束,并在最后一次重试失败后返回调用错误响应。

  • 同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败,抛出错误码和异常。
  • 异步发送:调用线程不会阻塞,但调用结果会通过异常事件或者成功事件返回。

重试间隔

  • 除服务端返回系统流控错误场景,其他触发条件触发重试后,均会立即进行重试,无等待间隔。
  • 若由于服务端返回流控错误触发重试,系统会按照指数退避策略进行延迟重试。指数退避算法通过以下参数控制重试行为:
    • INITIAL_BACKOFF: 第一次失败重试前后需等待多久,默认值:1秒
    • MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6
    • JITTER :随机抖动因子,默认值:0.2
    • MAX_BACKOFF :等待间隔时间上限,默认值:120秒
    • MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20秒 

3.2. 消息流控机制

消息流控指的是系统容量或水位过高, Apache RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力。

触发条件

  • 存储压力大:消费者分组的初始消费位点为当前队列的最大消费位点。
  • 服务端请求任务排队溢出:若消费者消费能力不足,导致队列中有大量堆积消息,当堆积消息超过一定数量后会触发消息流控,减少下游消费系统压力。

流控行为

当系统触发消息发送流控时,客户端会收到系统限流错误和异常,错误码信息如下:

  • reply-code:530
  • reply-text:TOO_MANY_REQUESTS

3.3. 消费重试

消费者出现异常,消费某条消息失败时, Apache RocketMQ 会根据消费重试策略重新投递该消息。消费重试主要解决的是业务处理逻辑失败导致的消费完整性问题,是一种为业务兜底的策略,不应该被用做业务流程控制。

推荐使用消息重试场景如下:

  • 业务处理失败,且失败原因跟当前的消息内容相关,比如该消息对应的事务状态还未获取到,预期一段时间后可执行成功。
  • 消费失败的原因不会导致连续性,即当前消息消费失败是一个小概率事件,不是常态化的失败,后面的消息大概率会消费成功。此时可以对当前消息进行重试,避免进程阻塞。

消费重试策略

消费重试指的是,消费者在消费某条消息失败后,Apache RocketMQ 服务端会根据重试策略重新消费该消息,超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中。

消息重试的触发条件

  • 消费失败,包括消费者返回消息失败状态标识或抛出非预期异常。
  • 消息处理超时,包括在PushConsumer中排队超时。

重试策略差异

3.4. 消费进度

消息位点(Offset)

消息是按到达服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。

任意一个消息队列在逻辑上都是无限存储,即消息位点会从0到Long.MAX无限增加。通过主题、队列和位点就可以定位任意一条消息的位置,具体关系如下图所示:

Apache RocketMQ 定义队列中最早一条消息的位点为最小消息位点(MinOffset);最新一条消息的位点为最大消息位点(MaxOffset)。虽然消息队列逻辑上是无限存储,但由于服务端物理节点的存储空间有限, Apache RocketMQ 会滚动删除队列中存储最早的消息。因此,消息的最小消费位点和最大消费位点会一直递增变化。

消费位点(ConsumerOffset)

Apache RocketMQ 领域模型为发布订阅模式,每个主题的队列都可以被多个消费者分组订阅。若某条消息被某个消费者消费后直接被删除,则其他订阅了该主题的消费者将无法消费该消息。

因此,Apache RocketMQ 通过消费位点管理消息的消费进度。每条消息被某个消费者消费完成后不会立即在队列中删除,Apache RocketMQ 会基于每个消费者分组维护一份消费记录,该记录指定消费者分组消费某一个队列时,消费过的最新一条消息的位点,即消费位点。

当消费者客户端离线,又再次重新上线时,会严格按照服务端保存的消费进度继续处理消息。如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。

注:消费位点的保存和恢复是基于 Apache RocketMQ 服务端的存储实现,和任何消费者无关。

队列中消息位点MinOffset、MaxOffset和每个消费者分组的消费位点ConsumerOffset的关系如下:

ConsumerOffset≤MaxOffset:

  • 当消费速度和生产速度一致,且全部消息都处理完成时,最大消息位点和消费位点相同,即ConsumerOffset=MaxOffset
  • 当消费速度较慢小于生产速度时,队列中会有部分消息未消费,此时消费位点小于最大消息位点,即ConsumerOffset<MaxOffset,两者之差就是该队列中堆积的消息量

ConsumerOffset≥MinOffset:

  • 正常情况下有效的消费位点ConsumerOffset必然大于等于最小消息位点MinOffset。消费位点小于最小消息位点时是无效的,相当于消费者要消费的消息已经从队列中删除了,是无法消费到的,此时服务端会将消费位点强制纠正到合法的消息位点。

消费位点初始值

消费位点初始值指的是消费者分组首次启动消费者消费消息时服务端保存的消费位点的初始值。Apache RocketMQ 定义消费位点的初始值为消费者首次获取消息时,该时刻队列中的最大消息位点。相当于消费者将从队列中最新的消息开始消费。

3.5. 消息存储机制

Apache RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。

4. 架构

4.1. 技术架构

RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer和Consumer仍然可以动态感知Broker的路由的信息。
  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。

4.2. 部署架构

RocketMQ 网络部署特点:

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

结合部署架构图,描述集群工作流程:

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

5. 客户端

在编写客户端代码时,首先准备一个简单的环境,可以选用Local模式。这里不多介绍,只说一句,启动broker的时候可以-c指定配置文件,启动完以后通过jps查看进程

通过mqadmin命令创建并查看主题

mqadmin updateTopic -n localhost:9876 -b 172.16.52.116:10911 -t TEST_TOPIC
mqadmin topicList -n localhost:9876

具体命令参数,参见  http://rocketmq.apache.org/zh/docs/deploymentOperations/16admintool/

也可以通过RocketMQ Dashboard创建主题

5.1. rocketmq-client

引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.0.0</version>
</dependency>

代码片段

public class AppTest extends TestCase {

    private String producerGroupName = "MyProducerGroup";
    private String consumerGroupName = "MyConsumerGroup";

    /**
     * 发送同步消息
     */
    @Test
    public void testSyncProducer() throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        // 发送消息
        Message message = new Message("TEST_TOPIC", "A", "UserID12345", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult);
        // 关闭Producer实例
        producer.shutdown();
    }

    /**
     * 发送异步消息
     */
    @Test
    public void testAsyncProducer() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        Message msg = new Message("TEST_TOPIC", "B", "OrderID12346", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // SendCallback接收异步返回结果的回调
        producer.send(msg, new SendCallback() {
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });
        //  等待5秒
        TimeUnit.SECONDS.sleep(5);
    }

    /**
     * 单向发送消息
     * 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
     */
    @Test
    public void testOnewayProducer() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TEST_TOPIC", "C", "OrderID12348", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 发送单向消息,没有任何返回结果
        producer.sendOneway(msg);
    }

    /**
     * 消费消息
     */
    @Test
    public void testConsumer() throws Exception {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroupName);
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TEST_TOPIC", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.start();

        while (true) {
            List<MessageExt> messageExts = consumer.poll();
            if (messageExts.isEmpty()) {
                continue;
            }
            messageExts.forEach(msg -> {
                System.out.println(String.format("MsgId: %s, MsgBody: %s", msg.getMsgId(), new String(msg.getBody())));
            });
            consumer.commitSync();
        }
    }
}

5.2. rocketmq-spring-boot-starter

依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

application.yml

配置项详见 org.apache.rocketmq.spring.autoconfigure.RocketMQProperties

rocketmq:
  name-server: localhost:9876
  producer:
    group: MyProducerGroup
    send-message-timeout: 10000
  consumer:
    group: MyConsumerGroup

发送消息

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author: ChengJianSheng
 * @Date: 2023/1/18
 */
@RestController
@RequestMapping("/message")
public class MessageController {

    private String springTopic = "SPRING_TOPIC";
    private String userTopic = "USER_TOPIC";
    private String orderTopic = "ORDER_TOPIC";
    private String extTopic = "EXT_TOPIC";
    private String reqTopic = "REQ_TOPIC";
    private String objTopic = "OBJECT_TOPIC";


    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send")
    public String send() {

        SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello World");

        Message message = MessageBuilder.withPayload("Hello World!2222".getBytes()).build();
        sendResult = rocketMQTemplate.syncSend(springTopic, message);

        message = MessageBuilder.withPayload("Hello, World! I'm from spring message").build();
        sendResult = rocketMQTemplate.syncSend(springTopic, message);


        sendResult = rocketMQTemplate.syncSend(userTopic, new User("zhangsan", 20));

        message = MessageBuilder.withPayload(new User("lisi", 21))
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)
                .build();
        sendResult = rocketMQTemplate.syncSend(userTopic, message);


        rocketMQTemplate.asyncSend(orderTopic, new Order("oid1234", "4.56"), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("async onSucess SendResult=%s %n", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.printf("async onException Throwable=%s %n", throwable);
            }
        });

        rocketMQTemplate.convertAndSend(extTopic + ":tag0", "I'm from tag0");
        rocketMQTemplate.convertAndSend(extTopic + ":tag1", "I'm from tag1");

        String replyString = rocketMQTemplate.sendAndReceive(reqTopic, "request string", String.class);
        System.out.printf("receive %s %n", replyString);
        User replyUser = rocketMQTemplate.sendAndReceive(objTopic, new User("wangwu", 21), User.class);
        System.out.printf("receive %s %n", replyUser);

        return "ok";
    }
}

接收消息

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "SPRING_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class StringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.printf("------- StringConsumer received: %s \n", message);
    }
}

@Component
@RocketMQMessageListener(topic = "ORDER_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class OrderConsumer implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order message) {
        System.out.printf("------- OrderConsumer received: %s [orderId : %s]\n", message, message.getOrderNo());
    }
}

@Component
@RocketMQMessageListener(topic = "USER_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class UserConsumer implements RocketMQListener<User>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(User message) {
        System.out.printf("------ UserConsumer received: %s ; age: %s ; name: %s \n", message, message.getAge(), message.getName());
    }
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    }
}

@Component
@RocketMQMessageListener(topic = "REQ_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
    @Override
    public String onMessage(String message) {
        System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
        return "reply string";
    }
}

@Component
@RocketMQMessageListener(topic = "OBJECT_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User> {
    @Override
    public User onMessage(User message) {
        System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", message);
        return new User("tom", 8);
    }
}

@Component
@RocketMQMessageListener(topic = "EXT_TOPIC", selectorExpression = "tag0||tag1", consumerGroup = "${rocketmq.consumer.group}")
public class MessageExtConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        System.out.printf("------- MessageExtConsumer received message, msgId: %s, body:%s \n", message.getMsgId(), new String(message.getBody()));
    }
}

6. 文档

http://rocketmq.apache.org/zh/

http://rocketmq.apache.org/zh/docs/deploymentOperations/15deploy/

http://github.com/apache/rocketmq/tree/rocketmq-all-5.0.0/docs/cn

http://github.com/apache/rocketmq/blob/rocketmq-all-5.0.0/docs/cn/architecture.md

http://github.com/apache/rocketmq/blob/rocketmq-all-5.0.0/docs/cn/RocketMQ_Example.md

http://github.com/apache/rocketmq-dashboard

http://github.com/apache/rocketmq-spring

http://github.com/apache/rocketmq

 

本文转载于网络 如有侵权请联系删除

相关文章

  • Flask asyncio 异步处理请求

    来自:​​​​​​MakingFlaskasyncandQuartsync(pgjones.dev)示例:fromflaskimportFlask,jsonify,has_request_context,copy_current_request_context,requestfromfunctoolsimportwrapsfromconcurrent.futuresimportFuture,ThreadPoolExecutorimportasynciodefrun_async(func):@wraps(func)def_wrapper(*args,**kwargs):call_result=Future()def_run():loop=asyncio.new_event_loop()try:result=loop.run_until_complete(func(*args,**kwargs))exceptExceptionaserror:call_result.set_exception(error)else:call_result.set_result(result)finally:

  • 什么是反向代理?Nginx反向代理如何配置?

    nginx在日常工作中是一个不可缺少的服务,其中使用nginx做的事情最多的就是反反向代理,今天笔者带大家详细学习一下nginx反向代理。让我们直接开始!安装nginx在开始本文前,请先保证你的服务器已经安装好nginx,安装nginx非常简单。本文我将以ubantu系统为例,那么怎么在ubantu系统安装nginx呢?只需要一条命令即可:apt-getinstallnginx复制这里我就不带大家安装了,安装好后,执行一条命令:nginx-v复制如果出现以下结果:则代表你的nginx安装成功了,我这里安装的是1.14.0的版本。用apt-getinstallnginx命令安装完nginx后,有几个默认的目录需要大家记一下:主程序:/usr/sbin/nginx配置文件:/etc/nginx静态文件:/usr/share/nginx日志文件:/var/log/nginx这几个目录记好,后面配置的时候可能会用到。当然了,用源码部署的方式安装你就可以自己定义路径了。Nginx几个重要的命令启动:./usr/sbin/nginx或者servicenginxstart重启:./usr/sbin/

  • 通过通过案例带你轻松玩转JMeter连载(3)

    1.2录制HTTPS协议如果被测试软件是HTTP协议,我们应该如何通过JMeter本身自带的录制工具来录制呢?1)在第4.1.1-2的第9)步认真读一下上面的内容。在JMeter的根目录下产生了一个认证文件,有效期为7天。2)我们到%JMETER_HOME%/bin目录下会找到ApacheJMeterTemporaryRootCA.crt这个文件。3)在明命令行中输入mmc命令,打开控制台,如图22所示。图22控制台4)在控制台窗口中点击菜单“文件(F)->添加和删除管理单元”,如图4-23所示。图4-23添加和删除管理单元 5)点击左边的“证书”添加到右边,弹出如图4-24窗口。图4-24证书管理单元 6)点击【完成】按钮,如图4-25所示。图4-25添加“证书”到管理单元7)点击【确定】按键。8)展开“证书-当前用户”,选择“受信任的根证书颁发机构”。右击,在弹出菜单中选择“所有任务->导入(I)…”如图4-26所示。4-26控制台管理证书 9)在接下来的菜单,如图4-27所示。显示【下一步】按键。图4-27证书管理向导10)在接下来的窗口中选择浏览。选择%JMETER

  • PREDATOR: 低重叠三维点云的配准方法(CVPR2021)

    代码地址:在公众号「3D视觉工坊」,后台回复「PREDATOR」,即可直接下载。图1PREDATOR的将注意力集中在重叠区域,并选择该区域的显著点,以便在低重叠情况下仍能进行鲁棒配准。针对的问题:1.实际应用中很多情况点云是低重叠的。例如在狭窄的走廊上移动时或者在密集的建筑区域,森林等处移动,又例如考虑到数据采集的昂贵成本,点云采集往往追求低次数的扫描,只有必要的重叠。2.目前绝大多数的评价数据集都是高重叠率的点云数据,但当两个点云之间的重叠低于30%时,即使是最知名的方法的配准性能也会迅速恶化。重要的贡献:1.分析为什么现有的配准体系在低重叠制度下会崩溃2.提出一种新颖的重叠注意块,允许两个点云之间的早期信息交换,并将后续步骤集中在重叠区域上。3.提出一种改进特征点描述符的方案,调节点云时也同时在对应点云上进行调节。4.提出了一种新颖的损失函数来训练匹配度分数,有助于更好地提取兴趣点,使兴趣点具有更强的可重复性。算法原理:作者所提出的PREDATOR是一个双流编解码器网络。该网络的实现使用的是KPConvstyle点卷积的残差块,但是这个主干架构是不可知的,当然也可以用其他的三维卷积

  • Go 语言原生的 json 包有什么问题?如何更好地处理 JSON 数据?

    Go的“玩家”们看到这个题目可能会很疑惑——对于JSON而言,Go原生库encoding/json已经是提供了足够舒适的JSON处理工具,广受Go开发者的好评。它还能有什么问题?但是,实际上在业务开发过程中,我们遇到了不少原生json做不好甚至是做不到的问题,还真是不能完全满足我们的要求。那么,如果不用它用什么?它又有什么问题吗?使用第三方库的原因是什么?如何选型?性能如何?不过呢,在抛出具体问题之前,我们先来尽可能简单地了解一下Go目前在处理JSON中常用的一些库,以及对这些库的测试数据分析。如果读者觉得下面的文字太长了,也可以直接跳到结论部分。部分常用的GoJSON解析库Go原生encoding/json这应该是广大Go程序员最熟悉的库了,使用json.Unmarshal和json.Marshal函数,可以轻松将JSON格式的二进制数据反序列化到指定的Go结构体中,以及将Go结构体序列化为二进制流。而对于未知结构或不确定结构的数据,则支持将二进制反序列化到map[string]interface{}类型中,使用KV的模式进行数据的存取。这里我提两个大家可能不会留意到的额外特性:js

  • 以太坊Layer 2 扩容,V神为什么偏爱ZK rollup ?

    来源|巴比特资讯注:原文作者为以太坊创始人VitalikButerin。很多区块链应用最具价值的特征之一就是信任最小化(trustlessness):应用能够以预期的方式继续运行,而无需依赖特定参与者以特定的方式进行操作(即使他们的兴趣可能改变,并促使他们在未来以某种不同的意外方式行事)。区块链应用从来都不是完全无需信任的,但有些应用要比其它应用更接近于无需信任。如果我们朝着信任最小化的方向发展,那我们要能够比较不同程度的信任。首先,我对信任的简单定义是:信任是对他人行为的任何假设的运用。如果在新冠大流行之前,你走在街上而没有确保与陌生人保持两米的距离,他们也不会突然掏出刀子来捅你,这是一种信任:双方都相信人们很少会完全精神错乱,相信管理法律体系的人会继续提供强有力的措施来反对这种行为。当你运行由他人编写的一段代码时,你可以相信他们诚实地编写了这段代码(无论是出于正派观念还是出于维护名誉的经济利益),或者至少是有足够的人检查该代码,并发现到漏洞。不亲自去种植自己的食物,也是一种信任的体现:你要相信足够多的人会意识到种植食物符合他们的利益,以便将其出售给你。你可以信任不同规模的人群,并且

  • kubernetes中那些不为存储数据而存在的volume

    这kubernetes中,这类Volume不是为了存放数据,也不是用来做数据交换,而是为容器提供预先定义好的数据。所以从容器角度来看,这类Volume就像是被投射进容器一样。到目前为止,kubernetes支持4种这类Volume: (1)、Secret (2)、ConfigMap (3)、DownloadAPI (4)、ServiceAccountTokenSecretSecret的作用是把Pod想要访问的加密数据存放到Etcd中,然后可以在Pod容器中通过挂载的方式访问Secret里保存的信息。一旦Secret被创建,就可以通过下面三种方式使用它: (1)在创建Pod时,通过为Pod指定ServiceAccount来自动使用该Secret。 (2)通过挂载该Secret到Pod来使用它。 (3)在Docker镜像下载时使用,通过指定Pod的spc.ImagePullSecrets来引用它。例如,我们定义下面这个Pod:apiVersion:v1 kind:Pod metadata: name:pod-volume-test labels: app:pod-volume-test s

  • 重磅:2019中科院JCR分区正式出炉,PANS、NC重返一区

    2019年中科院分区JCR于12月16日新鲜出炉。版权归中国科学院文献情报中心科学计量中心,网址:http://www.fenqubiao.com。今年,分区表共设18个大类,176个小类(JCR学科分类体系JournalRanking)。在原有自然科学期刊的基础上,新增社会科学期刊(SSCI所收录的期刊)。材料科学新设置为一单独大类,预计有373本期刊将归属在材料科学大类。最多的仍然是医学大类,有3554本,工程技术大类有1188本。其中,一区共有1946本。综合类的1区期刊有7本;生物类的1区有76本;医学领域的1区有219本(本数据来源于iNature整理)。其中去年因离开一区而引起轩然大波的PNAS及NatureCommunications又重新回到一区。中科院JCR期刊分区(又称分区表、分区数据)是中国科学院文献情报中心世界科学前沿分析中心的科学研究成果。分区表设计的思路始于2000年之初,旨在纠正当时国内科研界对不同学科期刊影响因子数值差异的忽视。自2004年发布之后,分区表为我国科研、教育机构的管理人员、科研工作者提供了一份评价国际学术期刊影响力的参考数据,得到了全国各地

  • 用sphinx给PHP加个给力的搜索功能

    最近工作上需要实现搜索功能,尝试了几种方案。虽然最终线上部署的还是最low的方案,但是中间的过程还是比较有意思的。业务上根据关键字查找内容。关键字的出处多来源于标题,文章描述等。主要实现方式有一些几种,各个方式各有利弊,需要权衡。like模糊查询标题和描述,使用或条件查询like查询估计是最常用的方式了,也是最容易实现的方式。业务代码少,逻辑清晰,准确率也高。不用其他额外操作(比如分词)。但是有个非常致命的问题,那就是效率。效率非常低,特别是在数据量大的情况。测试过程中,在224256行数据中,对3749个字进行like查询,执行总时间长达4003秒。相当于每个查询需要花费1.06秒的查询时间。生成关键字表,使用关键字表进行查询对数据内容的标题和内容进行分词,把各个分词结果关联该内容。查询的时候根据查询关键字进行匹配。因为不是模糊搜索,所以可以使用数据库的索引,加快搜索速度。但是效果依赖于分词,以及用户输入关键词匹配程度。例如标题内容"2018年12月7日美国会通过加拿大边境墙预算",分词内容"2018年12月7日/美国/会/通过/加拿大/边境/墙/预算&

  • iOS开发常用之消息相关

    消息相关消息推送客户端SGPushDemo-消息推送客户端Orbiter-消息推送客户端:iOS推送通知注册。PushDemo-客户端消息接收消息代码,IOS开发之----IOS8推送消息注册,分分钟搞定IOS远程消息推送。消息推送服务端javapns源代码-消息推送的java服务端代码,注意:DeviceToken中间不能有空格。pushMeBaby-Mac端消息推送端代码,注意:DeviceToken中间要有空格。通知相关JSQNotificationObserverKit-一款轻量,易用的通知发送及响应框架类库。作者是知名开源项目JSQMessagesViewController(Objective-C版即时聊天)的作者JesseSquires。GLPubSub-一个简短实用的NSNotificationCenter的封装。Homeoff-用swift写了一个模仿Launcher通知中心快捷方式的应用。支持20个应用,并增加了一个返回到桌面来解放Home键的功能。JDStatusBarNotification-在状态栏顶部显示通知。可以自定义颜色字体以及动画。支持进度显示以及显示

  • JHipster生成微服务架构的应用栈(五)- 容器编排示例

    本系列文章演示如何用JHipster生成一个微服务架构风格的应用栈。 环境需求:安装好JHipster开发环境的CentOS7.4(参考这里) 应用栈名称:appstack 认证微服务:uaa 业务微服务:microservice1 网关微服务:gateway 实体名:role 主机IP:192.168.220.1201前提条件1.1已经生成微服务架构的应用栈请参考这个系列的前4篇文章。1.2安装DockerCompose推荐版本:1.21.2完整安装说明,请参考这里1.3创建一个编排目录在命令行,进入appstack目录,创建一个子目录docker-compose,现在整个应用栈的目录结构是这样的:--appstack |--uaa |--microservice1 |--gateway |--docker-compose复制1.4公共镜像预先下载openjdk:8-jre-alpine容器镜像,能提高后续工作的效率。2构建微服务的镜像2.1构建所有微服务的镜像注意:编写本文时使用的JHipster版本为5.1.0,镜像构建命令为:'dockerfile:build

  • 触控屏可以丢掉了,未来我们可以用思维控制任何东西

    据CNET报道,只用大脑思维来控制你身边的事物,这种能力好像只存在于某些科幻小说中。尽管在今天,这种想法听起来仍然有许多未来主义色彩。虽然如此,但是世界各地的神经学家仍然在努力研究怎样为大脑创造一个数字界面,并且在最近几年取得了非常大的进展。尽管这项技术现在还处在起步阶段,但是放弃触控屏、利用大脑思维控制周围的世界似乎已经不再遥不可及。在很多情况下,脑机接口(BCI)当前只针对那些受到了严重伤害的人,这些伤害使他们部分甚至是完全瘫痪,包括丹尼斯·德格拉伊(DennisDegray)。2007年,德格拉伊在雨中滑倒之后,发现自己脖子以下陷入了瘫痪。但是在今年的早些时候,斯坦福大学的神经学家成功地帮助他打破了使用电脑打字的世界纪录。斯坦福大学的神经外科医生揭秘·亨德森(JaimieHenderson)提供帮助是成功的一部分原因。他在德格拉伊的大脑中植入了两个很小的电极阵列这些电极阵列监测大脑活动,并且对运动皮层神经元所发射的复杂电信号进行解码。运动皮层是人体运动的控制中心,这些解码之后的信号被用来与虚拟键盘进行交互。德格拉伊通过使用这种办法,可以只用大脑在一分钟内准确地输出39个字符。毫无

  • Leetcode 166 Fraction to Recurring Decimal

    Giventwointegersrepresentingthenumeratoranddenominatorofafraction,returnthefractioninstringformat.Ifthefractionalpartisrepeating,enclosetherepeatingpartinparentheses.Forexample,Givennumerator=1,denominator=2,return"0.5".Givennumerator=2,denominator=1,return"2".Givennumerator=2,denominator=3,return"0.(6)".分数转小数。模拟小数除法,狂WA不止,还是不够细!1.考虑结果正负,2.全部转为正数,因为负数会不便于中间处理,3.用map保存余数和商对应位置的关系,出现重复余数直接插入括号结束.4.用longlong,INT_MIN变成正值会炸5.我用stringstream进行数转string,看到有人这么做:res+=to_strin

  • pycharm 设置环境_pycharm 虚拟环境

    大家好,又见面了,我是你们的朋友全栈君。 今天有点小收获,做一点积累吧。pycharm使用的是2018.3.1专业版,python使用的是3.7.1首先是新建工程,打开pycharm之后,面对窗口如下:选择createnewproject,接下来会弹出如下窗口这个页面有两个选项,Newenvironmentusing这个选项是建立一个虚拟的python运行环境,目录就是之前自己设置的项目目录下的venv(virtuleenvironment简称),这个虚拟环境可以包含你运行本工程需要的支持包,并可以在这个虚拟的环境中安装新的支持包,这能给你建立一个相对独立的python环境,这个新建虚拟环境的下面两个子选项的意思是继承全局的site-package,含义就是将自己python路径下的site-package链接到你的venv下面以供使用(在venv下面生成几个配置文件,可以链接到你的本地python/Lib/site-package),第二个子选项的含义是你在这个工程中使用的python解释器可以对其他工程可见。第二个选项的意思是使用自己安装的python编译器去运行此工程,当然已经安

  • Spring容器管理对象和new对象

    https://blog.csdn.net/weixin_30296405/article/details/95652402

  • CentOS 使用yum命令安装出现错误提示”could not retrieve mirrorlist http://mirrorlist.centos.org

    CentOS使用yum命令安装出现错误提示”couldnotretrievemirrorlisthttp://mirrorlist.centos.org这个错误, 在网上找了好多,但是都不正确。最后终于解决了。 总结下, 1.在命令提示符中输入:vi/etc/sysconfig/network-scripts/ifcfg-eth0 用vi打开这个文件后,将ONBOOT和MM_CONTROLLED改成如下 ONBOOT=yes MM_CONTROLLED=no 复制 2.重启,reboot 3.在试试yum命令,应该可以了 你的能量超乎你想象_________

  • 单工,半双工,全双工通信区别

     

  • 391. Perfect Rectangle

    问题: 给定多个小矩形,rec[i]={x1,y1,x2,y2} (x1,y1)代表小矩形的↙️左下顶点坐标 (x2,y2)代表小矩形的↗️右上顶点坐标 这些小矩形是否能组成一个完美矩形。 要求:不能存在重叠or空余的内部空间。 Example1: Input:rectangles=[[1,1,3,3],[3,1,4,2],[3,2,4,4],[1,3,2,4],[2,3,3,4]] Output:true Explanation:All5rectanglestogetherformanexactcoverofarectangularregion. Example2: Input:rectangles=[[1,1,2,3],[1,3,2,4],[3,1,4,2],[3,2,4,4]] Output:false Explanation:Becausethereisagapbetweenthetworectangularregions. Example3: Input:rectangles=[[1,1,3,3],[3,1,4,2],[1,3,2,4],[3,2,4,4]] Out

  • MWeb 1.4 新功能介绍二:静态博客功能增强

    MWeb比较有特色的是一键生成静态博客功能,然后从MWeb最开始规划要做静态博客生成功能时,我就希望MWeb的静态博客生成功能在易用的同时,还要有很强大的扩展性。 比如说能自己增加网站公告,这个公告可以随时更换和取消。再比如说能自己为网站增加广告,而且可以增加多个广告位,google的或者自定的广告都可以。这些之前都可以通过修改模板解决,但是有些是解决不了的:比如说把文章列表做成像thevergehttp://www.theverge.com/reviews的页的效果,就是列表中有设定的特色图片,有自定的简介,有评分,有价格时会显示价格等。比如说设定SEO的一些信息,设定分享到Facebook的一些信息。又比如说播客类的网站,要生成Apple规格的RSS。 1.4版的静态博客增强就是解决这些问题的。我一直考虑如何介绍这个功能,然后因为之前比特新声的主播之一郝海龙随意问了一下是否支持生成Podcast的RSS。所以我想来想去,还是介绍用MWeb自带的greyshade网站模板做一个可以写博客又可以做为播客的新模板好了。 这个新模板的Demo网址是:http://coderforar

  • ReentrantLock 公平锁源码 第1篇

    ReentrantLock1 这篇还是接着ReentrantLock的公平锁,没看过第0篇的可以先去看上一篇https://www.cnblogs.com/sunankang/p/16456342.html 这篇就以问题为导向,先提出问题,然后根据问题去看代码 确保能唤醒排队的线程? A,B两线程,A线程执行完业务释放锁过程中B线程添加进了链表,如何保证B线程能正常醒来 现在假设A线程走完tryAcuqire后获取到锁,执行业务代码,最后unlock()tryAcquire代码就不进去看了,上篇讲过了现在只需关注两个点 lock方法中的acquireQueued用来park unlock方法中的release用来unpark 首先来看park的条件是啥 publicfinalvoidacquire(intarg){ if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg)) selfInterrupt(); } 复制 进入acquireQueued方法 finalbooleanacquireQ

  • programming review (c++): (2)binary tree, BFS, DFS, recursive, non-recursive

    1.二叉树定义 //Definitionforabinarytreenode. structTreeNode{ intval; TreeNode*left; TreeNode*right; TreeNode(intx):val(x),left(NULL),right(NULL){} }; 复制   2.遍历 a.递归先序: //递归先序:中左右。PS:中序-左中右,后序-左右中,调换cout的位置即可 voidNLR(TreeNode*T) { if(T!=NULL){    cout<<T->val; NLR(T->left); NLR(T->right); } return; }复制  PS: 递归变量传递的几种方式: 参数里,比如可以以此标记每个node的深度; return,适合做累计操作,例子: intmaxDepth(TreeNode*root)//求最大深度:反过来算+max,符合逻辑 { returnroot==NULL?0:max(maxDepth(root->left),maxDepth(root-

相关推荐

推荐阅读