【RocketMQ】消息拉模式分析

RocketMQ有两种获取消息的方式,分别为推模式和拉模式。

推模式
推模式在【RocketMQ】消息的拉取一文中已经讲过,虽然从名字上看起来是消息到达Broker后推送给消费者,实际上还是需要消费向Broker发送拉取请求获取消息内容,推模式对应的消息消费实现类为DefaultMQPushConsumerImpl,回顾一下推模式下的消息消费过程:

  1. 消费者在启动的时候做一些初始化工作,它会创建MQClientInstance并进行启动;
  2. MQClientInstance中引用了消息拉取服务PullMessageService和负载均衡服务RebalanceService,它们都继承了ServiceThread,MQClientInstance在启动后也会对它们进行启动,所以消息拉取线程和负载均衡线程也就启动了;
  3. 负载均衡服务启动后,会对该消费者订阅的主题进行负载均衡,为消费者分配消息队列,并创建PullRequest拉取请求,用于拉取消息;
  4. PullMessageService中等待阻塞队列中PullRequest拉取请求的到来,接着会调用DefaultMQPushConsumerImplpullMessage方法进行消息拉取;
  5. 消费者向Broker发送拉取消息的请求,从Broker拉取消息;
  6. 消费者对Broker返回的响应数据进行处理,解析消息进行消费;

推模式下进行消息消费的例子:

@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerTest {
    private String consumerGroup;
    private String topic = "FooBar";
    private String brokerName = "BrokerA";
    private MQClientInstance mQClientFactory;

    @Mock
    private MQClientAPIImpl mQClientAPIImpl;
    private static DefaultMQPushConsumer pushConsumer;

    @Before
    public void init() throws Exception {
        // ...
        // 消费者组
        consumerGroup = "FooBarGroup" + System.currentTimeMillis();
        // 实例化DefaultMQPushConsumer
        pushConsumer = new DefaultMQPushConsumer(consumerGroup);
        pushConsumer.setNamesrvAddr("127.0.0.1:9876");
        // 设置拉取间隔
        pushConsumer.setPullInterval(60 * 1000);
        // 注册消息监听器
        pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                Optional.ofNullable(result).orElse(new ArrayList<MessageExt>()).stream().forEach(x-> {
                    // 处理消息
                    System.out.println(new String(x.getBody()));
                });
                return null;
            }
        });
        // ...
        // 设置订阅的主题
        pushConsumer.subscribe(topic, "*");
        // 启动消费者
        pushConsumer.start();
    }
}

消息推模式的详细过程可参考【RocketMQ】消息的拉取,接下来我们看一下拉模式。

拉模式
首先来看一下拉模式下进行消息消费的例子,拉模式下需要消费者不断调用poll方法获取消息,底层是一个阻塞队列,如果队列中没有数据,会进入等待直到队列中增加了数据:

 private void testPull() {
        // 创建DefaultLitePullConsumer
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumerGroup");;
        try {
            litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
            litePullConsumer.subscribe("LitePullConsumerTest", "*");
            litePullConsumer.start();
            litePullConsumer.setPollTimeoutMillis(20 * 1000);
            while(true) {
                // 获取消息
                List<MessageExt> result = litePullConsumer.poll();
                Optional.ofNullable(result).orElse(new ArrayList<MessageExt>()).stream().forEach(x-> {
                    // 处理消息
                    System.out.println(new String(x.getBody()));
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            litePullConsumer.shutdown();
        }
    }

推模式与拉模式的区别
对比上面推模式进行消费的例子,从使用方式上来讲,推模式不需要消费者主动去拉取消息,只需要注册消息监听器,当有消息到达时,触发consumeMessage方法进行消息消费,从表面上看就像是Broker主动推送给消费者一样,所以叫做推模式,尽管底层还是需要消费者发起拉取请求向Broker拉取消息

拉模式在使用方式上,需要消费者主动调用poll方法获取消息,从表面上看消费者需要不断主动进行消息拉取,所以叫做拉模式。

拉模式实现原理

拉模式下对应的消息拉取实现类为DefaultLitePullConsumerImpl,在DefaultLitePullConsumerDefaultMQPullConsumer被标注了@Deprecated,已不推荐使用)的构造函数中,可以看到对其进行了实例化,并在start方进行了启动:

public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
    // 拉模式下默认的消息拉取实现类
    private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;

    public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
        this.namespace = namespace;
        this.consumerGroup = consumerGroup;
        // 创建DefaultLitePullConsumerImpl
        defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
    }

    @Override
    public void start() throws MQClientException {
        setTraceDispatcher();
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        // 启动DefaultLitePullConsumerImpl
        this.defaultLitePullConsumerImpl.start();
        // ...
    }
}

与消息推模式类似,DefaultLitePullConsumerImpl的start的方法主要做一些初始化的工作:

  1. 初始化客户端实例对象mQClientFactory,对应实现类为MQClientInstance,拉取服务线程、负载均衡线程都是通过MQClientInstance启动的;
  2. 初始化负载均衡类,拉模式对应的负载均衡类为RebalanceLitePullImpl
  3. 创建消息拉取API对象PullAPIWrapper,用于向Broker发送拉取消息的请求;
  4. 初始化消息拉取偏移量;
  5. 启动一些定时任务;
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultLitePullConsumer.changeInstanceNameToPID();
                }
                // 初始化MQClientInstance
                initMQClientFactory();
                // 初始化负载均衡
                initRebalanceImpl();
                // 初始化消息拉取API对象
                initPullAPIWrapper();
                // 初始化拉取偏移量
                initOffsetStore();
                // 启动MQClientInstance
                mQClientFactory.start();
                // 启动一些定时任务
                startScheduleTask();
                this.serviceState = ServiceState.RUNNING;
                log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
                operateAfterRunning();
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
    }
}

负载均衡

拉取模式对应的负载均衡类为RebalanceLitePullImpl(推模式使用的是RebalanceService),在initRebalanceImpl方法中设置了消费者组、消费模式、分配策略等信息:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    
    // 实例化,拉模式使用的是RebalanceLitePullImpl
    private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);

    private void initRebalanceImpl() {
        // 设置消费者组
        this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
        // 设置消费模式
        this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
        // 设置分配策略
        this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
        // 设置mQClientFactory
        this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    }
}

在【RocketMQ】消息的拉取一文中已经讲到过,消费者启动后会进行负载均衡,对每个主题进行负载均衡,拉模式下处理逻辑也是如此,所以这里跳过中间的过程,进入到rebalanceByTopic方法,可以负载均衡之后如果消费者负载的ProcessQueue发生了变化,会调用messageQueueChanged方法触发变更事件:

public abstract class RebalanceImpl {
     private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // ...
            }
            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                // ...
                if (mqSet != null && cidAll != null) {
                    // ...
                    try {
                        // 分配消息队列
                        allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    // 更新处理队列
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        // 触发变更事件
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }
}

触发消息队列变更事件

RebalanceLitePullImplmessageQueueChanged方法中又调用了MessageQueueListenermessageQueueChanged方法触发消息队列改变事件:

public class RebalanceLitePullImpl extends RebalanceImpl {
    @Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
        if (messageQueueListener != null) {
            try {
                // 触发改变事件
                messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
            } catch (Throwable e) {
                log.error("messageQueueChanged exception", e);
            }
        }
    }
}

MessageQueueListenerImplDefaultLitePullConsumerImpl的内部类,在messageQueueChanged方法中,不管是广播模式还是集群模式,都会调用updatePullTask更新拉取任务:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    class MessageQueueListenerImpl implements MessageQueueListener {
        @Override
        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
            MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
            switch (messageModel) {
                case BROADCASTING:
                    updateAssignedMessageQueue(topic, mqAll);
                    updatePullTask(topic, mqAll); // 更新拉取任务
                    break;
                case CLUSTERING:
                    updateAssignedMessageQueue(topic, mqDivided);
                    updatePullTask(topic, mqDivided); // 更新拉取任务
                    break;
                default:
                    break;
            }
        }
    }
}

更新拉取任务

在updatePullTask方法中,从拉取任务表taskTable中取出了所有的拉取任务进行遍历,taskTable中记录了之前分配的拉取任务,负载均衡之后可能发生变化,所以需要对其进行更新,这一步主要是处理原先分配给当前消费者的消息队列,在负载均衡之后不再由当前消费者负责,所以需要从taskTable中删除,之后调用startPullTask启动拉取任务:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();

    private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
        // 从拉取任务表中获取之前分配的消息队列进行遍历
        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
            // 如果与重新进行负载均衡的主题一致
            if (next.getKey().getTopic().equals(topic)) {
                // 如果重新分配的消息队列集合中不包含此消息独立
                if (!mqNewSet.contains(next.getKey())) {
                    next.getValue().setCancelled(true);
                    // 从任务表移除
                    it.remove();
                }
            }
        }
        // 启动拉取任务
        startPullTask(mqNewSet);
    }
}

提交拉取任务

startPullTask方法入参中传入的是负载均衡后重新分配的消息队列集合,在startPullTask中会对重新分配的集合进行遍历,如果taskTable中不包含某个消息队列,就构建PullTaskImpl对象,加入taskTable,这一步主要是处理负载均衡后新增的消息队列,为其构建PullTaskImpl加入到taskTable,之后将拉取消息的任务PullTaskImpl提交到线程池周期性的执行:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {

    private void startPullTask(Collection<MessageQueue> mqSet) {
        // 遍历最新分配的消息队列集合
        for (MessageQueue messageQueue : mqSet) {
            // 如果任务表中不包含
            if (!this.taskTable.containsKey(messageQueue)) {
                // 创建拉取任务
                PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
                // 加入到任务表
                this.taskTable.put(messageQueue, pullTask);
                // 将任务提交到线程池定时执行
                this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
            }
        }
    }
}

拉取消息

PullTaskImpl继承了Runnable,在run方法中的处理逻辑如下:

  1. 获取消息队列对应处理队列ProcessQueue;
  2. 获取消息拉取偏移量,也就是从何处开始拉取消息;
  3. 调用pull方法进行消息拉取;
  4. 判断拉取结果,如果拉取到了消息,将拉取到的结果封装为ConsumeRequest进行提交,也就是放到了阻塞队列中,后续消费者从队列中获取数据进行消费;
   public class PullTaskImpl implements Runnable {
        private final MessageQueue messageQueue;
        private volatile boolean cancelled = false;
        private Thread currentThread;

        @Override
        public void run() {
            // 如果未取消
            if (!this.isCancelled()) {
                this.currentThread = Thread.currentThread();
                // ...
                // 获取消息队列对应的ProcessQueue
                ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
                // ...  跳过一系列校验
                long offset = 0L;
                try {
                    // 获取拉取偏移量
                    offset = nextPullOffset(messageQueue);
                } catch (Exception e) {
                    log.error("Failed to get next pull offset", e);
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
                    return;
                }

                if (this.isCancelled() || processQueue.isDropped()) {
                    return;
                }
                long pullDelayTimeMills = 0;
                try {
                    SubscriptionData subscriptionData;
                    // 获取主题
                    String topic = this.messageQueue.getTopic();
                    // 获取主题对应的订阅信息SubscriptionData
                    if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                        subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
                    } else {
                        subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
                    }
                    // 拉取消息
                    PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
                    if (this.isCancelled() || processQueue.isDropped()) {
                        return;
                    }
                    // 判断拉取结果
                    switch (pullResult.getPullStatus()) {
                        case FOUND: // 如果获取到了数据
                            final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
                            synchronized (objLock) { // 加锁
                                if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
                                    processQueue.putMessage(pullResult.getMsgFoundList());
                                    // 将拉取结果封装为ConsumeRequest,提交消费请求
                                    submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
                                }
                            }
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("The pull request offset illegal, {}", pullResult.toString());
                            break;
                        default:
                            break;
                    }
                    updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
                } catch (InterruptedException interruptedException) {
                    log.warn("Polling thread was interrupted.", interruptedException);
                } catch (Throwable e) {
                    pullDelayTimeMills = pullTimeDelayMillsWhenException;
                    log.error("An error occurred in pull message process.", e);
                }
                // ...
            }
        }
    }

submitConsumeRequest方法中可以看到将创建的ConsumeRequest对象放入了阻塞队列consumeRequestCache中:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    // 阻塞队列
    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();

    private void submitConsumeRequest(ConsumeRequest consumeRequest) {
        try {
            // 放入阻塞队列consumeRequestCache中
            consumeRequestCache.put(consumeRequest);
        } catch (InterruptedException e) {
            log.error("Submit consumeRequest error", e);
        }
    }
}

消息消费

在前面的例子中,可以看到消费者是调用poll方法获取数据的,进入到poll方法中,可以看到是从consumeRequestCache中获取消费请求的,然后从中解析出消息内容返回:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    
    public synchronized List<MessageExt> poll(long timeout) {
        try {
            // ...
            long endTime = System.currentTimeMillis() + timeout;
            // 从consumeRequestCache中获取数据进行处理
            ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            // ...
            if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
                // 获取消息内容
                List<MessageExt> messages = consumeRequest.getMessageExts();
                long offset = consumeRequest.getProcessQueue().removeMessage(messages);
                assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
                this.resetTopic(messages);
                // 返回消息内容
                return messages;
            }
        } catch (InterruptedException ignore) {
        }
        return Collections.emptyList();
    }
}

参考

RocketMQ源码分析之pull模式consumer

RocketMQ版本:4.9.3

本文转载于网络 如有侵权请联系删除

相关文章

  • 翻译 | 我在 React-Native app开发中曾经犯过的11个错误

    经过差不多一年的ReactNative的开发后,我决定把我自打新手开始所犯的错误总结一下.1.错误的预计真的!开始设想的ReactNative(RN)的应用是完全错误的.彻底的错误.1、你需要单独考虑iOS和Android版本的布局.当然,有很多的组件是可以重用的,但是他们有不同的布局考虑.甚至他们之间的应用结构页面也都是不同的.2、当你在预测form的时候-你最好要一并考虑一下数据验证层.例如,当你使用ReactNative开发应用程序的时候,你会比使用Cordova时写更多的代码.3、如果你需要在已经已经开发完毕,并且已经有后端(所以,你可以使用现存的API)的webapp基础上创建一个app-要确保检查每个后端提供的数据点.因为你需要在app中处理逻辑,编码应该要恰如其分.理解数据库的结构,实体之间的连接关系等等.如果你理解了数据库的结构,你可以正确的规划你的reduxstore(后面会讲到).(译注:分离关注点,引入了Redux,React的逻辑处理权交到了Redux手中.意识到这一点对于Redux和React的结合使用非常重要.)2.尽量使用已经构建好的组件(buttons,

  • EasyNVR录像回看进度条拖动回导致画面卡住不动是什么原因?

    目前EasyNVR作为TSINGSEE青犀视频开发的稳定可靠的智能安防监控平台,具备视频采集、直播、转码、分发等能力,其中在录像功能方面,不仅可以调取录像视频直接回放,还可以将录像文件通过接口调用下载。有用户反馈在使用EasyNVR录像回看功能时,进度条偶尔会卡住。通过排查发现,是在使用时间轴功能时,将进度条拖动到靠前位置,画面则会卡住不动,但进度条时间仍在运行。切换录像格式为列表视图查看录像文件播放都比较正常。没有发生和时间轴模式相似的问题,确定视频源没有问题,下一步就是排查播放器问题。使用新版播放器尝试后可以正常使用,但版本还在测试中,暂未上线。此处就是因为该用户现场版本号使用的是比较早的版本,可以通过替换成最新版来解决该问题。该版本将播放器问题处理后即将上线,如有相似问题可等待后续版本更新,或联系技术人员获取最新版。EasyNVR视频智能安防监控系统支持直接下载部署测试,如果有兴趣可以直接下载,测试期间二次开发接口仍可调用,支持定制和修改及功能的拓展。此外,EasyPlayer播放器项目还包括EasyPlayerRTSP、EasyPlayerRTMP、EasyPlayerPro和

  • "Activity top position already set to onTop=false"解决方案[未验证]

    最近bugly上一直报这个错---java.lang.IllegalStateException ---ActivitytoppositionalreadysettoonTop=false ---android.app.ActivityThread.handleTopResumedActivityChanged(ActivityThread.java:4623)复制而且报错的android手机都是10及以上。经过初步查找,找到了出错的代码在这里@Override publicvoidhandleTopResumedActivityChanged(IBindertoken,booleanonTop,Stringreason){ ActivityClientRecordr=mActivities.get(token); if(r==null||r.activity==null){ Slog.w(TAG,"Notfoundtargetactivitytoreportpositionchangefortoken:"+token); return; } if(DEBUG_OR

  • 防护ddos无从下手?了解ddos原理轻松应对危机

    近几年,大规模的DDoS攻击事件在全球范围内发生了很多次,再次造成了轰动,如何防护DDoS由此也引起了大众的重点关注。虽然很多互联网企业都建立了一定的本地DDoS防御措施及运营商级的DDoS监测清洗服务,但是物联网飞速发展,而且进行攻击的成本越来越低,衍生的新型攻击手段层出不穷,DDoS攻击逐渐形成了产业链,许多互联网企业都为此头痛不已。 防护ddos无从下手?了解ddos原理轻松应对危机那么想要采取防护防护DDoS措施,势必要先了解DDoS的原理,结合借鉴自身和他人网络安全运维经验理清DDoS攻击防护思路,制定适合的防护方案。DDoS分布式拒绝服务,主要利用Internet上现有机器及系统的漏洞,攻占大量联网主机,使其成为攻击者的代理。当被控制的机器达到一定数量后,攻击者通过发送指令操纵这些攻击机同时向目标主机或网络发起DoS攻击,大量消耗其网络带和系统资源,导致该网络或系统瘫痪或停止提供正常的网络服务。防护DDoS从原理来说就是需要从所有流量中区分出正常流量和恶意攻击流量,然后过滤点攻击流量,避免其占用服务器资源为正常流量服务。按这个道理来说,只要成功过滤恶意流量,就能是DDoS攻

  • 流量控制--7.总结

    8.流量控制的规则、准则和方法8.1.Linux流量控制的通用规则可以使用如下通用规则来学习Linux流量控制。可以使用tcng或tc进行初始化配置Linux下的流量控制结构。任何执行整流功能的路由器都应该成为链路的瓶颈,并且应该调整为略低于最大可用的链路带宽。通过整流可以防止在其他路由器中形成队列,从而最大程度地控制到整流设备的报文的延迟/延期。一个设备可以对其传输的流量进行调整。由于已经在输入接口上接收到流量,因此无法调整这类流量。解决此问题的传统方法是使用ingress策略。每个接口必须包含一个qdisc。当没有明确附加其他qdisc时会使用默认的qdisc(pfifo_fastqdisc)。如果一个接口附加了classfulqdiscs,但没有任何子类,这种情况会无意义地消耗CPU。任何新创建的类都包含一个FIFO。可以使用任何qdisc来替换这个qdisc。当一个子类附加到该类时,会隐式地删除FIFOqdisc。直接附加到rootqdisc的类可以模拟虚拟电路。可以在类或任意一种classfulqdiscs上附加过滤器。8.2.处理已知带宽的链路当一个链路已知带宽时,HTB是

  • Nginx线程池浅析

    Nginx通过使用多路复用IO(如Linux的epoll、FreeBSD的kqueue等)技术很好的解决了c10k问题,但前提是Nginx的请求不能有阻塞操作,否则将会导致整个Nginx进程停止服务。但很多时候阻塞操作是不可避免的,例如客户端请求静态文件时,由于磁盘IO可能会导致进程阻塞,所以将会导致Nginx的性能下降。为了解决这个问题,Nginx在1.7.11版本中实现了线程池机制。下面我们将会分析Nginx是怎么通过线程池来解决阻塞操作问题。启用线程池功能要使用线程池功能,首先需要在配置文件中添加如下配置项:location/{ root/html; thread_pooldefaultthreads=32max_queue=65536; aiothreads=default; }复制上面定义了一个名为“default”,包含32个线程,任务队列最多支持65536个请求的线程池。如果任务队列过载,Nginx将输出如下错误日志并拒绝请求:threadpool"default"queueoverflow:Ntaskswaiting如果出现上面的错误,说明线程池的负

  • 任你旋转跳跃不停歇,也能完美呈现3D姿态估计 | 代码开源

    鱼羊十三发自凹非寺 量子位报道|公众号QbitAI让AI通过预测,捕捉你「左手画龙,右手画彩虹」的动作,对于AI理解人类行为至关重要。想要做到这一点,人体运动数据不可或缺,但实际上,真实的3D运动数据恰恰是稀缺资源。现在,来自马克斯·普朗克智能系统研究所的一项研究,利用对抗学习框架,在缺少真实3D标签的情况下,也能对实时视频里的人体运动,做出运动学上的合理预测。就像这样,奔跑、跳跃都能跟得上:并且,相比前辈,这一名为VIBE的方法更懂人心,连胳膊要抬几度,都计算得明明白白。 那么,VIBE到底是如何做到的?对抗学习框架关键创新,在于采用了对抗学习框架。一方面,利用时间(temporal)生成网络,预估视频序列中每个帧的SMPL人体模型参数。注:SMPL,即ASkinnedMulti-PersonLinearModel,马普所提出的一种人体建模方法。具体来说,给定一个单人视频作为输入,使用预先训练的CNN提取每个帧的特征。训练双向门控循环单元组成的时间编码器,输出包含过去和将来帧中信息的潜在变量。然后,利用这些特征对SMPL人体模型的参数进行回归。另一方面,运动鉴别器能够以SMPL格式访

  • Spring5 源码学习 (7) ConfigurationClassPostProcessor (下)

    ConfigurationClassPostProcessor继承了BeanDefinitionRegistryPostProcessor接口,它实现了postProcessBeanDefinitionRegistry和其父类的BeanFactoryPostProcessor#postProcessBeanFactory方法。关于postProcessBeanDefinitionRegistry方法的解析可以参看:Spring5源码学习(5)ConfigurationClassPostProcessor(上)。现在我们来看一下ConfigurationClassPostProcessor#postProcessBeanFactory方法的源码。ConfigurationClassPostProcessor#postProcessBeanFactory调用时机ConfigurationClassPostProcessor#postProcessBeanFactory方法也在refresh();方法中执行invokeBeanFactoryPostProcessors(beanFactory

  • Lockdoor Framework:一套自带大量网络安全资源的渗透测试框架

    博客&文章Reddit: https://www.reddit.com/r/cybersecurity/comments/d4hthh/lockdoor_a_penetration_testing_framework_with/ Medium.com: https://medium.com/@SofianeHamlaoui/lockdoor-framework-a-penetration-testing-framework-with-cyber-security-resources-sofiane-22fbb7942378 XploitLab: https://xploitlab.com/lockdoor-framework-penetration-testing-framework-with-cyber-security-resources/ StationX: https://www.stationx.net/threat-intelligence-17th-september/ KelvinSecurity; https://blog.kelvinsecurity.com

  • 设计模式的征途—23.解释器(Interpreter)模式

    虽然目前计算机编程语言有好几百种,但有时人们还是希望用一些简单的语言来实现特定的操作,只需要向计算机输入一个句子或文件,就能按照预定的文法规则来对句子或文件进行解释。例如,我们想要只输入一个加法/减法表达式,它就能够计算出表达式结果。例如输入“1+2+3-4+1”时,将输出计算结果为3。像C++,Java或C#都无法直接解释类似这样的字符串,因此用户必须自定义一套文法规则来实现对这些语句的解释,即设计一个自定义语言。如果所基于的编程语言是面向对象语言,此时可以使用解释器模式实现自定义语言。解释器模式(Interpreter)学习难度:★★★★★使用频率:★☆☆☆☆一、格式化指令的需求背景Background:M公司开发了一套简单的基于字符界面的格式化指令,可以根据输入的指令在字符界面输出一些格式化内容,例如输入“LOOP2PRINT杨过SPACESPACEPRINT小龙女BREAKENDPRINT郭靖SPACESPACEPRINT黄蓉”,将输出以下结果: 其中,关键词LOOP表示循环,后面的数字表示循环次数;PRINT表示打印,后面的字符串表示打印的内容;SPACE表示空格;BREA

  • 当Impala碰到由Hive生成的timestamp数据

    温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。Fayson的github:https://github.com/fayson/cdhproject提示:代码块部分可以左右滑动查看噢1.文档编写目的默认情况下,Impala不会使用本地时区存储timestamp,以避免因为时区问题导致的其他故障。无论是写入还是读取数据,或者通过诸如from_unixtime()或unix_timestamp()之类的函数转换为Unix时间戳或者从Unix时间转换时。要将timestamp值转换为date或者time,我们一般使用from_utc_timestamp()来进行转换,但是对于Impala来说如果想转换为OS的本地时区,一般你还要带上时区参数比如CST,为了方便你也可以在Impala的配置中加上--use_local_tz_for_unix_timestamp_conversions=true。这一点我们在《Hive中的Timestamp类型日期与Impala中显示不一致分析》和《Hive中的Timestamp类型日期与Impala中显示不一致分析(补充)》文章中都进行过分析。当

  • 学界 | Yoshua Bengio与MIT发表新论文:深度学习中的泛化

    选自arXiv机器之心编译参与:路雪、刘晓坤日前,MIT和Bengio发表新论文,探讨深度学习中的泛化。该论文解释了深度学习能够实现较好泛化的原因,并提出了一系列新型正则化方法。机器之心对该论文进行了编译。论文链接:https://arxiv.org/abs/1710.05468本论文解释了为什么深度学习在面临容量过大、算法不稳定、非鲁棒和尖点等问题时仍能实现较好的泛化。基于理论的探索,该论文提出了一系列新的正则化方法。实验证明,即使其中最简单的方法也可以将基础模型在MNIST和CIFAR-10上的表现提升到业内最佳水平。此外,本文提出了数据依赖性(data-dependent)和数据独立性的泛化保证,它们提高了收敛速度。我们的研究引出了一系列新方向。1引言一些经典的理论研究把泛化能力归功于小容量模型类别的使用(Mohrietal.,2012)。从与小容量相关的紧凑表示(compactrepresentation)的角度来看,深度模型类别在展示特定的自然目标函数时比浅层的模型类别具有指数优势(Pascanuetal.,2014;Montufaretal.,2014;Livnietal.

  • Jumpserver docker-compose 随手记

    wget 或 gitclone   docker build -t  jumpserver:v1  .   #构建镜像  dockerimages   vim jumpserver-dockercompose.yml version:'3' networks: jumpserver: services: mysql: image:mysql:5.7 container_name:jms__mysql restart:always version:'3' networks: jumpserver: services: mysql: image:mysql:5.7 version:'3' networks: jumpserver: services: mysql: image:mysql:5.7 container_name:jms__mysql restart:always tty:true environmen

  • Springboot源码——应用程序上下文分析

      前两篇(SpringMVC源码——RootWebApplicationContext 和SpringMVC源码——ServletWebApplicationContext)讲述了springmvc项目创建上下文的过程,这一篇带大家了解一下springboot项目创建上下文的过程。 SpringApplication引导类 SpringApplication类用于启动或者引导springboot项目,直接应用在javamain方法中。 publicSpringApplication(ResourceLoaderresourceLoader,Class<?>...primarySources){ this.resourceLoader=resourceLoader; Assert.notNull(primarySources,"PrimarySourcesmustnotbenull"); this.primarySources=newLinkedHashSet<>(Arrays.asList(primarySources)); //判断当前web应用

  • BGP原理介绍-2

    7.BGP发布路由的策略   BGP发布路由时采用如下策略:   ·     存在多条有效路由时,BGP发言者只将最优路由发布给对等体。如果配置了advertise-rib-active命令,则BGP发布IP路由表中的最优路由;否则,发布BGP路由表中的最优路由。 ·     BGP发言者只把自己使用的路由发布给对等体。 ·     BGP发言者会将从EBGP获得的路由发布给它的所有BGP对等体(包括EBGP对等体和IBGP对等体)。 ·     BGP发言者会将从IBGP获得的路由发布给它的EBGP对等体,但不会发布给它的IBGP对等体。 ·     会话一旦建立,BGP发言者将把满足上述条件的所有BGP路由发布给新对等体。之后,BGP发言者只在路由变化时,向对等体发布更新的路由。 8.BGP负载分

  • 技术洞察是技术战略成功的关键

    什么是技术洞察?  所谓技术洞察,简称(TI,TechnologyInsight),是根据市场发展趋势和客户需求,以及技术的生命周期,对某项技术发展趋势进行判断和预测,并明确未来3~5年的技术战略和战略控制点、重大的技术投资方向,完成技术战略规划的制订,并最终进行技术战略解码,为公司整体战略创造价值。如下图:    随着市场竞争越来越激烈,很多技术领先型或创新型企业都在进行技术创新,创新的源头就来自于技术洞察的结果,如果对新技术的判断趋势不准确,就会导致公司产品失去方向,在市场上失去竞争力。因此,对于新技术的洞察能力是至关重要的。如原来知名的企业UT斯达康,由于技术规划偏向于小灵通和IPTV技术,导致公司在几年后被竞争对手淘汰,像柯达胶卷和诺基亚手机,均是由于技术落后于市场,没有及时对市场需要的技术进行洞察而导致错失方向,国内的案例更多,现在像海底捞,金噪子,老干妈等传统企业,没有及时进行技术的创新,导致现在面临业绩大幅下滑的困难与挑战。在国产替代,自主研发的大环境下,技术洞察至关重要。 谁来做技术洞察?  参与技术洞察的成员应包

  • php 计算上一个月的今天 PHP 计算几个月前的今天

    PHP计算几个月前的今天    下面第一个方法基本全覆盖了所需功能 /* *根据指定时间计算指定前N个月的今天 **/ functionlastMonth($nowT,$i){ $lastM1=date('n',strtotime("-".$i."month",strtotime("firstdayof0month",$nowT))); $lastM2=date('n',strtotime("-".$i."month",$nowT)); if($lastM1!=$lastM2){ $expectD=date('Y-m-d',strtotime("lastdayof-".$i."month",$nowT)); }else{ $expectD=date('Y-m-d',strtotime("-".$i."month",$nowT)); } return$expectD; }复制     下面这个方法只是适用于调取上个月的今天 /** *计算上一个月的今天,如果上个月没有今天,则返回上一个月的最后一天 *@paramtype$time

  • 国外最好用的WordPress主机推荐

    在网站运营过程中,Wordpress主机是非常重要的一部分,选择一个好的主机有助于提升SEO和商品销售,在网络上有各种各样的免费主机、付费主机、共享主机、独立主机等,在本教程中,我们将教大家选择最合适的Wordpress主机空间   WordPress主机的要求 首先我们需要知道,Wordpress对主机有哪些要求,这里其实官方已经说的很清楚了,WordPressRequirements,这里面说了Wordpress建议的环境,不过因为种种原因,比如主题、插件的兼容性,对代码升级的改造有时候会有大量的工作要完成,大部分主题都不能够在语言升级后立即适配,所以要适当的进行降级,关于挑选Wordpress主机的要求WP主题站主要提供以下几点建议: PHP最好支持7.0及以上,因为PHP7.0性能相对之前有大幅的提升,并且大部分主题都已经适配7.0,选择7.0版本即可 MySQL5.6及以上,同样是为了性能,这个大部分都能满足需求 支持HTTPS,非常重要,https的网站能够获得更好的排名,而且会在浏览器中展示安全标识,容易获得访客的信任 能够满足以上需求,基本上就属于合格的W

  • 将博客搬至CSDN

    将博客搬至CSDN

  • Django类视图

      假设有这样一种情况,前端页面请求的某个地址,即有get请求,也有post请求,或者还有别的一些请求,这种情况我们使用函数视图是怎么定义的呢? fromdjango.httpimportHttpResponse defindex_page(request): ifrequest.method=='GET': returnHttpResponse("<p>这是一个get请求</p>") elifrequest.method=='POST': returnHttpResponse("<p>这是一个post请求</p>") elifrequest.method=='DELETE': returnHttpResponse("<p>这是一个delete请求</p>")复制   上面这种视图的定义方式未免太过繁琐,我们可以通过类视图的方式去定义,往往在项目中用的最多的也是类视图,类视图需要继承views模块下的View类,并且视图里面的实例方法均使用小写: fromdjango.httpimportHttpRes

  • 9/10

    队名:我头发呢队 组长博客 杰(组长) 过去两天完成了哪些任务 继续翻阅MaterialDesign2文档 重构了UI原型 程序员鼓励师 接下来的计划 页面编辑 还剩下哪些任务 继续爬…… 燃尽图 有哪些困难 进度缓慢 有哪些收获和疑问 眼睛有点累,找出规律好累 跃安 过去两天完成了哪些任务 配置了Androidstudio环境,学习安卓界面开发 尝试了音乐源的爬取。 接下来的计划 专心完成界面开发任务 还剩下哪些任务 与爬取音乐源的队友沟通如何整合两方面的工作 有哪些收获和疑问 安卓界面的开发是第一次遇到很多的困难也是在所难免的,但是了解了我们经常使用的app是如何完成的,还是挺高兴的。 写代码的过程中有很多都不会,容易出错。 裕翔 过去两天完成了哪些任务 阅读『安卓学习之路』(未读完) 配置Python环境 配置Androidstudio 接下来的计划 学习Python和java 学习,如何使用Androidstudio 有哪些困难 安卓开发还在摸索 有哪些收获和疑问 对安卓的开发了解有了进步 佳炜 过去两天完成了哪些任务 界面制作 试图、控件的操作学习 接下来

相关推荐

推荐阅读