MQ系列11:如何保证消息可靠性传输(除夕奉上)

MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的发送模式
MQ系列6:消息的消费
MQ系列7:消息通信,追求极致性能
MQ系列8:数据存储,消息队列的高可用保障
MQ系列9:高可用架构分析
MQ系列10:如何保证消息幂等性消费

1 介绍

这篇我们来说说 MQ 消息的可靠性传输。可靠性传输其实包含两种情况:一种是重复消费的情况,我们上一篇的幂等性消费解决的就是这个问题;另外一种是消息丢失的情况的,要确保我们生产的消息一定最终会得到消费。这时候就要从消息执行的几个阶段去保证,每一个阶段都不能出现问题。
image

2 消息生产阶段

消息生产阶段指的是消息从生产到消息发送出去,经过网络传输,再到达Broker服务器并被接收的这整个阶段,我们需要一个健壮的确认机制(ACK)来保证消息传递的可靠性。如果说消息被接收到之后可以反馈给消息生产方去确认,那这个过程就比较完美了。

  • 消息创建和发送事务性原则保证,要么成功,要么不成功
  • 同步发送时,处理好返回值,如果发生异常,则进行异常捕捉并处理。
  • 异步发送时,处理好回调的工作,如果发生异常,则进行异常捕捉并处理。
  • 异常/超时重试机制:如果长时间收不到确认返回结果,则需要进行重试;如果返回的结果是异常的,也可以有限的进行重试。
    超时重试和异常重试需要谨慎使用,重试次数也要谨慎斟酌。建议只对消息丢失、错误、丢失特别敏感的时候使用,如果过度使用,反而可能造成请求堆积,队列阻塞。
    image

3 消息服务器处理阶段

Broker作为消息服务器,主要用于消息收发的操作。一般情况下只要消息服务正常运行,并依赖数据持久化能力,丢消息的可能行就比较小。
但是在很多场景下,为了提升消息队列的效率,为了提升吞吐能力,在没有确定完成持久化动作(刷盘)之前,就会把确认消息返回。即只要消息进行
Commit了,那就是成功的。但是如果还没持久化成功便发生了宕机,那就有存在消息丢失的风险。可以参照如下优化:

  • 单节点模式下的Broker,优化Broker参数,在收到消息并持久化到磁盘之后才把确认消息返回给生产者 Producer。下面以RocketMQ为例子介绍配置优化手段:
    • 如果是RabbitMQ,则将Message的delivermode设置为2,exchange持久化动作操作完成之后才返回确认消息,确保消息不丢失;
    • 将 flushDiskType 设置为 SYNC_FLUSH,这是同步刷盘的意思,那就要求把这个动作同步完成之后才算消息发送成功。
  • 上面说的是单节点模式,如果配置了集群模式,一般是多副本,则要求确认消息要发到 一半以上(N/2 + 1)的节点并得到响应。这样Producer才算真正发送成功。
    image

4 消息消费阶段

消息存储到了Broker之后,剩下的就是消息消费了。消息消费阶段跟生产阶段大概一致,都是使用确认机制来保证消息的可靠性和传输的。
当Consumer从Broker拉取到消息之后,开始消费消息,执行业务的的逻辑程序,业务程序执行成功后,才给Broker发送消费确认响应。
如果没成功或者消息在发送中途丢失,就没有确认响应,这样的话,在下一轮消息拉取的时候,Broker依旧会返回这一条消费数据给你,避免网络抖动原因或者Consumer在执行消费出错导致丢失。

4.1 消费分区的策略模式

多个消费者消费用一个分区,我们经常会出现这种情况:同一个Consumer Group 里面有多个Consumer,比如Comsumer A 拉走了某一批数据,但是还没返回确认消息,Consumer B 又过来要 拉数据了,Broker要怎么判定呢?
这边举个例子:Consumer A 拉取 index = 106 位置的数据,但是还没返回消费完成的确认信息,这时候消费位置依然是 index = 10086,如果 Consumer B 也过拉取数据,则

  • Broker接收确认信息的时间未超时(比如配置为5s),则说明Consumer A还在消费中,回绝了Consumer B的请求。
  • Broker接收确认信息的时间已超时(比如配置为5s),则说明Consumer A消费失败了,返回 index = 106 位置的消息数据给 Consumer B。
    所以,多个消费者消费同一个分区,要严格按照顺序消费,具体可以参考官网的介绍,很详细。

4.2 消费重试和死信队列

在RocketMQ中,当消息第一次消费失败时,消息队列会自动进行消息重试,达到最大重试次数(可配置阈值,比如5)后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,消息队列RocketMQ版不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。这种无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
可以使用单独的作业服务进行独立处理,比如重新发送死信消息进行消费,避免消息漏处理导致业务服务可用性问题。

image

5 总结

总得来说:MQ可以从三个角度来分析:生产者丢数据、消息队列服务器(Broker)丢数据、消费者丢数据
生产者丢数据:RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
消息队列服务丢数据:开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。
消费者丢数据:与生产者基本一直,等消费完成并接收到confirm才能确认是消费成功。超时或者失败则重试,重试超过指定阈值的时候,计入死信队列并独立处理。

架构与思维公众号 架构与思维·公众号:撰稿者为bat、字节的几位高阶研发/架构。不做广告、不卖课、不要打赏,只分享优质技术 ★ 加公众号获取学习资料和面试集锦 码字不易,欢迎关注,欢迎转载 作者:翁智华 出处:http://www.cnblogs.com/wzh2010/ 本文采用「CC BY 4.0」知识共享协议进行许可,转载请注明作者及出处。
本文转载于网络 如有侵权请联系删除

相关文章

  • Java 中 3 种常见的 IO 模型

    知识背景操作系统:为了保证操作系统的稳定性和安全性,一个进程的地址空间被分为用户空间和内核空间;用户空间不能直接访问内核空间,要想访问必须进行系统调用;IO操作只有内核空间才能完成,所以用户进程需要进行系统调用;所以用户空间仅仅是发起系统调用请求,真正的IO操作执行是由内核空间完成的。常见的IO模型:同步阻塞IO⭐同步非阻塞IOIO多路复用⭐信号驱动IO异步IO⭐其中带有星号的模型为java中常见的3种模型,下面将分别介绍。BIOBIO即BlockingI/O;字面意思就可以看出它属于同步阻塞IO。如下图,应用程序发出一个read调用,内核空间需要经历准备数据的几个阶段,准备好之后返回数据给应用程序。期间如果另一个应用程序也需要read调用,那么它必须等待;这就是阻塞。BIO最大的特点就是一次只能处理一个调用,这在高并发的场景下肯定是不行的。示例代码客户端:publicclassIOClient{ publicstaticvoidmain(String[]args){ //创建多个线程,模拟多个客户端连接服务端 newThread(()->{ try{ Socketsocket

  • 「类与对象」说一说isa指针

    概要在讲isa指针前,我们先来看一道经典的面试题:对象的isa指针指向哪里?复制看到这道题,心中可能朦朦胧胧有些答案,也可能不太确定,抑或说不明白。那咱就带着这个问题,揭开isa指针的神秘面纱。这篇文章主要就isa的作用、数据结构以及如何优化等方面进行讲解。isa的作用在文章NSObject对象的分类中,详细讲解了isa指针的指向、如何寻找实例方法和类方法以及如何通过isa指针找到类对象和元类对象的。基本总结一下,instance对象的isa指向instance对象所对应的Class对象,Class对象的isa指向Class对象所对应的MetaClass对象。isa结构变化在ARM32位的时候,isa的类型是Class类型的,直接存储着实例对象或者类对象的地址,具体结构如下所示:typedefstructobjc_class*Class; typedefstructobjc_object{ Classisa; }*id;复制在ARM64结构下,isa的类型变成了共用体(union),使用了位域去存储更多信息。isa_t结构如下所示:unionisa_t { Classcls; uint

  • 还在使用kill -9 pid结束spring boot项目吗?那你已经落伍了!

    作者:流星007 blog.csdn.net/qq_33220089/article/details/105708331kill-9pid???kill可将指定的信息送至程序。预设的信息为SIGTERM(15),可将指定程序终止。若仍无法终止该程序,可使用SIGKILL(9)信息尝试强制删除程序。程序或工作的编号可利用ps指令或jobs指令查看(这段话来自菜鸟教程)。讲的这个复杂,简单点来说就是用来杀死linux中的进程,啥?你问我啥是进程?请自行百度。我相信很多人都用过kill-9pid这个命令,彻底杀死进程的意思,一般情况我们使用它没有上面问题,但是在我们项目中使用它就有可能存在致命的问题。kill-9pid带来的问题由于kill-9属于暴力删除,所以会给程序带来比较严重的后果,那究竟会带来什么后果呢?举个栗子:转账功能,再给两个账户进行加钱扣钱的时候突然断电了?这个时候会发生什么事情?对于InnoDB存储引擎来说,没有什么损失,因为它支持事务,但是对于MyISAM引擎来说那简直就是灾难,为什么?假如给A账户扣了钱,现在需要将B账户加钱,这个时候停电了,就会造成,A的钱被扣了,但

  • RDMA网络下重思数据库高可用

    RDMA网络下重思数据库高可用摘要高可用数据库系统常常使用用数据复制来达到容错的目的。Active-passive和active-active复制算法都是严重依赖于时延,网络常常成为性能的主要瓶颈。从某种意义上说,这些技术旨在最小化副本之间的网络通信。然而,下一代网络的出现,以期高吞吐低延迟的特性,使得需要重视这些假设。首先提出,现代RDMA网络使得瓶颈转向CPU,因此现代网络优化的复制技术不再是最优选择。提出了一个新高可用机制active-memory复制,充分利用RDMA达到消除在复制中处理多余工作的目的。使用active-memory,所有replica都将CPU能力专用于执行新事物,而不是复制的冗余计算。当出现故障时,active-memory通过基于RDMA的undo机制,维护高可用和数据正确性。实验表明,active-memory比第二种协议在RDMA网络上快2倍。引言任何传统数据库系统都有一个关键功能:高可用。单机情况下,故障会导致数据库服务不可用并且会造成数据丢失。高可用通常通过分布式数据复制来完成。主机上update会复制到备机从而当主机故障时可以被备机替代。传统分布

  • 有了 elseif 为什么还要 switch case

    引出你有没有想过既然有了ifelseif,为什么还要设计一个switchcase的语法出来呢?按理说,一个语言的设计角度来说,关键词越少越好吧,而且多出来一种选择分支也没有看出太大用处.以下几种switchcase均可以写成ifelse的形式(java代码)://形式一 switch(a){ case1: break; case2: break; default: break; } if(a==1){ }elseif(a==2){ }else{} //形式二 switch(a){ case1: case2: //dosomething break; default: break; } if(a==1||a==2){ }else{} 复制经过多次尝试,所有的switchcase都可以转换成ifelse.那当初涉及语言的大佬为什么要多此一举搞了这么一个switchcase出来呢?是何用意?甚至还要加一个时不时就忘记的break.分析虽然我们已然回不到当初设计高级语言的哪个年代,但是要继承胡先生的思想:"大胆假设,小心求证".为了了解为什么要设计出switchcase

  • Java和Python哪个更适合初学者的问题

    我个人不是很喜欢讨论这个问题,为什么呢,每个人都学习能力不一样,你要是不行,哪个对于你也不简单。客观分析,这两种语言都在程序员的工具箱中都占有一席之地。学习哪个并不重要,个人觉得对于入门级同学来说,你要学的是“编程思想的基础知识“,而不是”特定的编程语言“。那这两门语言有区别吗?当然有,对于有一定编程经验的人来说,哪个更顺滑当然能感觉都到。非要比比呢,那就比比。同样都功能,用Java和Python编写这么看你觉得哪个简单?如果是你纯小白,我相信你看这两个哪个也不简单。 进一步分析你玩儿的最6的语言就是最好的语言能够更好地表达自己思路的语言不一定是最简单的编程语言,相反你玩儿的最6都那个就是最好的语言。 编程就像爱情,你越熟练,你就越能用这种语言创造价值。别误会我的意思,我并不是让你应该爱上任何编程语言,但是,鼓励你爱上编程的艺术。学习基本概念非常重要,许多程序员甚至不知道设计模式是什么。这就解释了为什么公司里有人编写代码低效bug多的代码。编程语言就是程序员的工具例如,锤子和扳手的使用方式截然不同,螺丝刀与钢锯的用途不同,假设现在是午夜,工匠想把钉子钉在墙上,但他唯一的工具是扳手,螺丝

  • Python有哪些技术上的优点?比其他语言好在哪儿?

    导读:本文是对Python一些最优的技术特性的快速介绍,以解答初学者关心的问题并消除他们最常见的疑惑。作者:马克·卢茨(MarkLutz)如需转载请联系大数据(ID:hzdashuju)01Python有哪些技术上的优点1.面向对象和函数式从根本上讲,Python是一种面向对象的语言。它的类模型支持多态、运算符重载和多重继承等高级概念,并且以Python特有的简洁的语法和类型为背景,OOP十分易于使用。事实上,即使你不懂这些术语,仍会发现学习Python比学习其他OOP语言要容易得多。除了作为一种强大的代码组织和重用手段以外,Python的OOP本质使它成为其他面向对象系统语言的理想脚本工具。例如,通过适当的粘接代码,Python程序可以对C++、Java和C#的类进行子类的定制。OOP只是Python的一个选择而已,这一点非常重要。即使不能立马成为一个面向对象高手,但你同样可以继续深入学习。就像C++一样,Python既支持面向对象编程也支持面向过程编程的模式。如果条件允许,其面向对象的工具可以立即派上用场。这对策略开发模式十分有用,该模式常用于软件开发的设计阶段。除了最初的过程式(

  • R语言之词云:wordcloud&wordcloud2安装及参数说明

    一、wordcloud安装说明     install.packages("wordcloud"); 二、wordcloud2安装说明     install.packages("devtools");         devtools::install_github("lchiffon/wordcloud2",type="source")      错误提示:     解决办法:     install.packages("htmltools"); 三、wordcloud参数说明3.1函数原型 wordcloud(words,freq,scale=c(4,.5),min.freq=3,max.words=Inf,random.order=TRUE,random.color=FALSE,rot.per=.1,colors="black",ordered.colors=FALSE,use.r.layout=FALSE,...)3.2常用参数 (1)words——关键词列

  • git subtree pull 错误 Working tree has modifications

    gitsubtree是不错的东西,用于git管理子项目。本文记录我遇到问题和翻译网上的答案。当我开始pull的时候,使用下面的代码gitsubtreepull--prefix=<本地子项目目录><远程库仓库地址|远程库别名><分支>--squash复制其中--squash参数是把子项目的记录合成一次commit提交到主项目,这样主项目只是合并一次commit记录。但是在我执行这句代码的时候,出现下面的错误Workingtreehasmodifications.Cannotadd.复制当我检查本地是否有没提交的保存时候,没有找到gitstatus复制这个问题是因为gitdiff-indexHEAD返回结果,即使本地没提交,解决这个问题很简单。切换到本地另一个分支然后切换回来,这样就可以解决gitcheckout其他分支 gitcheckoutmaster复制如果执行了上面的命令还无法使用,请告诉我。Gitsubtree管理子项目包使用小结https://stackoverflow.com/a/18608538/6116637

  • 看看上下文映射的清晰视图

    在我之前的文章中,我详细讨论了有界上下文以及如何处理域的复杂性。最好将域划分为几个子域,并将它们映射到不同的有界上下文,其中每个业务实体/值对象在该上下文中都具有一定的含义,因此业务的每个利益相关者(产品所有者,开发人员,架构师和赞助商)都理解上下文和具有适当分类标准的实体。当我们在商业利益相关者之间以统一的语言讨论域对象时,就不会对命名造成混淆。在有界上下文中,我们正确地定义了一个业务模型,根据业务领域创建了不同的上下文,但一个功能总是跨越多个业务实体,这些实体位于不同的有界上下文/域中,因此了解有界上下文之间的关系非常重要,架构业务解决方案上下文映射是一种技术,通过这种技术,我们可以可视化不同上下文之间的关系,集成架构师可以选择最佳的集成模式来与其他上下文进行通信。为什么上下文映射在设计解决方案时如此重要借助UML图,架构师可以了解不同部分与其他部分的通信方式。它为架构师提供了不同上下文之间通信的视图。这很好,但是上下文映射出现在UML图之前;这有助于可视化关系的本质,并且基于这种性质,架构师可以决定应该采用什么样的技术解决方案。上下文映射可视化的最好部分是它讨论了关系的性质。它不

  • “大话架构”阿里架构师分享Java程序员需要突破的技术要点一、源码分析二、分布式架构三、微服务四、性能优化五、Java工程化

    一、源码分析源码分析是一种临界知识,掌握了这种临界知识,能不变应万变,源码分析对于很多人来说很枯燥,生涩难懂。源码阅读,我觉得最核心有三点:技术基础+强烈的求知欲+耐心。我认为是阅读源码的最核心驱动力。我见到绝大多数程序员,对学习的态度,基本上就是这几个层次(很偏激哦):1、只关注项目本身,不懂就baidu一下。2、除了做好项目,还会阅读和项目有关的技术书籍,看wikipedia。3、除了阅读和项目相关的书外,还会阅读IT行业的书,比如学Java时,还会去了解函数语言,如LISP。4、找一些开源项目看看,大量试用第三方框架,还会写写demo。5、阅读基础框架、J2EE规范、Debug服务器内核。大多数程序都是第1种,到第5种不光需要浓厚的兴趣,还需要勇气:我能读懂吗?其实,你能够读懂的耐心,真的很重要。因为你极少看到阅读源码的指导性文章或书籍,也没有人要求或建议你读。你读的过程中经常会卡住,而一卡主可能就陷进了迷宫。这时,你需要做的,可能是暂时中断一下,再从外围看看它:如API结构、框架的设计图。下图是我总结出目前最应该学习的源码知识点:“大话架构”阿里架构师分享的Java程序员需要突

  • 喜迎国庆 | 图扑数字孪生军演,构建跨域作战体系

    前言 战争造成的伤害是无法磨灭的。有人提出“战役元宇宙”的概念,让人们在虚拟世界中体验战争的创伤,期望唤醒参与者对和平的向往。随着扩展现实、数字孪生、3D渲染、高速网络、区块链等技术的发展及终端设备的迭代,让“战役元宇宙”有望照进现实,为练兵备战提供虚实结合、物网相联的时空环境。 系统分析 1995年9月,美军在巴拿马举行了名为FuertasDefensas的军事演习,开启了信息化战争条件下的战场态势可视化先河。2021年5月的巴以冲突中,以色列代号为“城墙卫士”的行动被称为世界上第一场“人工智能战争”。人工智能、建模与仿真技术等综合集成构建的人机智能推演和创新系统,推动建模仿真在基础理论与方法、顶层架构、装备全生命周期、军事演习、作战训练与保障等诸多方面取得重要进展。实兵演习在可视化技术的加持下,逐渐有了多维度、全感官、沉浸式的互联网体验,加快了“战场元宇宙”的演进。   军演实时环境呈现受限于战场数据链带宽限制和敌方的电磁干扰,在无法实时传送战场真实情况的条件下,战场态势可视化系统多采用仿真技术对战场环境和敌我态势进行还原,通过仿真沙盘模拟战局。图扑软件基于地理空间数据

  • aop计时,超时输出日志

    一个正在顺着生活规律挺近的青年,首先应注意,自己的才能和愿望与事业相衡。——培根 代码如下:packagecom.ruben.simplestreamquery.aop; importcom.alibaba.ttl.TransmittableThreadLocal; importlombok.extern.slf4j.Slf4j; importorg.aspectj.lang.ProceedingJoinPoint; importorg.aspectj.lang.annotation.Around; importorg.aspectj.lang.annotation.Aspect; importorg.springframework.stereotype.Component; importorg.springframework.web.context.request.RequestContextHolder; importorg.springframework.web.context.request.ServletRequestAttributes; importjavax.ser

  • 45.继续求多项式

    #include<stdio.h> intmain() { intn,i,j,sum; while(scanf("%d",&n)!=EOF){ sum=0; for(i=1;i<=n;i++){ for(j=1;j<=i;j++) sum+=j; } printf("%d\n",sum); } return0; }复制   C语言从零开始,我们都是新人,我们一起成长

  • SQL行转列

    createtable#tTemp ( iIDintIDENTITY, cTechParaNamevarchar(64)null, cTechParaName2varchar(64)NULL, iSerialvarchar(32) ) insertinto#tTemp(cTechParaName,cTechParaName2,iSerial) selectcTechParaName,isnull(cTechParaName2,cTechParaName)ascTechParaName2,iSerial fromtEqaTechParaDefinewherecEquiSortCode='0001'orderbyiSerial selectcTechParaName,isnull(cTechParaName2,cTechParaName)ascTechParaName2,iSerial fromtEqaTechParaDefinewherecEquiSortCode='0001'orderbyiSerial createtable#tColumn_Temp ( iIDint, ) ins

  • poj3468(A Simple Problem with Integers)

    题目地址:ASimpleProblemwithIntegers   题目大意:    给你N个数进行两种操作,Q,代表查询区间的和,C代表是在区间内所有的值都增加val值。结果可能大于2^32。   解题思路:    线段数,区间更新,区间求和。   代码: 1#include<algorithm> 2#include<iostream> 3#include<sstream> 4#include<cstdlib> 5#include<cstring> 6#include<cstdio> 7#include<string> 8#include<bitset> 9#include<vector> 10#include<queue> 11#include<stack> 12#include<cmath> 13#include<list> 14//

  • Python之旅:MySQL系列

    第一篇:初识数据库 第二篇:库操作 第三篇:表操作 第四篇:数据操作 第五篇:索引原理与慢查询优化 第六篇:数据备份、pymysql模块 第七篇:视图、触发器、事务、存储过程、函数 第八篇:ORM框架SQLAlchemy 作者:墨颜丶——在南方的北方少年郎 签名:当你的才华还撑不起你的野心的时候,你就应该静下心来学习 出处:http://www.cnblogs.com/moyand/ 本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

  • 《电子科技大学公开课:网络时代的信息安全》

    1.   拿不到、看不见、赖不掉   攻得进、守得住、打不垮

  • Spring Security实现禁止用户重复登陆(配置及原理)

    系统使用了SpringSecurity做权限管理,现在对于系统的用户,需要改动配置,实现无法多地登陆。   一、SpringMVC项目,配置如下: 首先在修改Security相关的XML,我这里是spring-security.xml,修改UsernamePasswordAuthenticationFilter相关Bean的构造配置 加入 <propertyname="sessionAuthenticationStrategy"ref="sas"/>复制 新增sas的Bean及其相关配置 <beanid="sas"class="org.springframework.security.web.authentication.session.CompositeSessionAuthenticationStrategy"> <constructor-arg> <list> <beanclass="org.springframework.security.web.authentication.session.Concurr

  • SQLSERVER 自增列,值突然增大1000

    SQLSERVER自增列,值突然增大1000https://blog.csdn.net/lichxi1002/article/details/40074247  

  • 在线考试————随机出题

    //获取前台页面的值 //随机出现题目 @RequestMapping(value="/queryQuestion") publicStringqueryQuestion(TestQuestionques,HttpServletRequestrequest){ //获取前台输入个数数组写死直接尽心个获取比较 //获取所有的题目主干 //List<Map<String,Object>>radioCounts= //disposeService.queryQuestions(); //存放option表的内容 List<Map<String,Object>>radioOptionCount=disposeService.queryOption(); //抽取单选 List<Map<String,Object>>danXuan=disposeService.queryQuestionDanxuan(); //抽取多选 List<Map<String,Obje

相关推荐

推荐阅读