本篇文章旨在分析SOFAJRaft中jraft-example模块的启动过程,由于SOFAJRaft在持续开源的过程中,所以无法保证示例代码永远是最新的,要是有较大的变动或者纰漏、错误的地方,欢迎大家留言讨论。
@Author:Akai-yuan
更新时间:2023年1月20日
public static void main(final String[] args) throws IOException {
if (args.length != 4) {
System.out
.println("Usage : java com.alipay.sofa.jraft.example.counter.CounterServer {dataPath} {groupId} {serverId} {initConf}");
System.out
.println("Example: java com.alipay.sofa.jraft.example.counter.CounterServer /tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
System.exit(1);
}
//日志存储路径
final String dataPath = args[0];
//SOFAJRaft集群的名字
final String groupId = args[1];
//当前节点的ip和端口
final String serverIdStr = args[2];
//集群节点的ip和端口
final String initConfStr = args[3];
final NodeOptions nodeOptions = new NodeOptions();
// for test, modify some params
// 设置选举超时时间为 1 秒
nodeOptions.setElectionTimeoutMs(1000);
// 关闭 CLI 服务
nodeOptions.setDisableCli(false);
// 每隔30秒做一次 snapshot
nodeOptions.setSnapshotIntervalSecs(30);
// 解析参数
final PeerId serverId = new PeerId();
if (!serverId.parse(serverIdStr)) {
throw new IllegalArgumentException("Fail to parse serverId:" + serverIdStr);
}
final Configuration initConf = new Configuration();
//将raft分组加入到Configuration的peers数组中
if (!initConf.parse(initConfStr)) {
throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr);
}
// 设置初始集群配置
nodeOptions.setInitialConf(initConf);
// 启动raft server
final CounterServer counterServer = new CounterServer(dataPath, groupId, serverId, nodeOptions);
System.out.println("Started counter server at port:"
+ counterServer.getNode().getNodeId().getPeerId().getPort());
// GrpcServer need block to prevent process exit
CounterGrpcHelper.blockUntilShutdown();
}
我们在启动CounterServer的main方法的时候,会将传入的String[]类型参数args分别转化为日志存储的路径、SOFAJRaft集群的名字、当前节点的ip和端口、集群节点的ip和端口,并设值到NodeOptions中,作为当前节点启动的参数。
引子:在main方法中,我们可以看到,程序将String类型参数转换成了PeerId对象,那么接下来我们需要探究转换的具体过程。
在转换当前节点并初始化为一个PeerId对象的过程中,调用了PeerId中的parse方法:
public boolean parse(final String s) {
if (StringUtils.isEmpty(s)) {
return false;
}
final String[] tmps = Utils.parsePeerId(s);
if (tmps.length < 2 || tmps.length > 4) {
return false;
}
try {
final int port = Integer.parseInt(tmps[1]);
this.endpoint = new Endpoint(tmps[0], port);
switch (tmps.length) {
case 3:
this.idx = Integer.parseInt(tmps[2]);
break;
case 4:
if (tmps[2].equals("")) {
this.idx = 0;
} else {
this.idx = Integer.parseInt(tmps[2]);
}
this.priority = Integer.parseInt(tmps[3]);
break;
default:
break;
}
this.str = null;
return true;
} catch (final Exception e) {
LOG.error("Parse peer from string failed: {}.", s, e);
return false;
}
}
该方法内部又调用了工具类Utils.parsePeerId,最终达到的效果如下:
其中,a、b分别对应IP和Port端口号,组成了PeerId的EndPoint属性;c指代idx【同一地址中的索引,默认值为0】;d指代priority优先级【节点的本地优先级值,如果节点不支持优先级选择,则该值为-1】。
PeerId.parse("a:b") = new PeerId("a", "b", 0 , -1)
PeerId.parse("a:b:c") = new PeerId("a", "b", "c", -1)
PeerId.parse("a:b::d") = new PeerId("a", "b", 0, "d")
PeerId.parse("a:b:c:d") = new PeerId("a", "b", "c", "d")
引子:在main方法中,我们可以看到,进行初步的参数解析后,调用了CountServer的构造器,要说这个构造器,第一次看里面的步骤确实会感觉挺复杂的,接下来我们一起分析一下源码。
CountServer构造器的源码如下:
public CounterServer(final String dataPath, final String groupId, final PeerId serverId,
final NodeOptions nodeOptions) throws IOException {
// 初始化raft data path, 它包含日志、元数据、快照
FileUtils.forceMkdir(new File(dataPath));
// 这里让 raft RPC 和业务 RPC 使用同一个 RPC server, 通常也可以分开
final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint());
// GrpcServer need init marshaller
CounterGrpcHelper.initGRpc();
CounterGrpcHelper.setRpcServer(rpcServer);
// 注册业务处理器
CounterService counterService = new CounterServiceImpl(this);
rpcServer.registerProcessor(new GetValueRequestProcessor(counterService));
rpcServer.registerProcessor(new IncrementAndGetRequestProcessor(counterService));
// 初始化状态机
this.fsm = new CounterStateMachine();
// 设置状态机到启动参数
nodeOptions.setFsm(this.fsm);
// 设置存储路径 (包含日志、元数据、快照)
// 日志(必须)
nodeOptions.setLogUri(dataPath + File.separator + "log");
// 元数据(必须)
nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");
// 快照(可选, 一般都推荐)
nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
// 初始化 raft group 服务框架
this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
// 启动
this.node = this.raftGroupService.start();
}
接下来仔细说说CountServer的构造器里面具体做了什么。
引子:CountServer构造器中调用的RaftRpcServerFactory.createRaftRpcServer()方法,底层到底是如何构造出一个RpcServer的呢,接下来会和大家讨论createRaftRpcServer()方法的具体实现
首先请看RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint())方法:
createRaftRpcServer方法目前有createRaftRpcServer(final Endpoint endpoint)和
createRaftRpcServer(final Endpoint endpoint, final Executor raftExecutor,final Executor cliExecutor)两个重载方法,其实不管哪个方法,本质上实现过程都有如下两个步骤:
(1)首先调用了GrpcRaftRpcFactory的createRpcServer方法,这里涉及gRpc构建server的底层知识,有时间会再写一篇文章探究一下gRpc,这里可以简单理解为构建了一个rpc服务端。该方法实现如下:
public RpcServer createRpcServer(final Endpoint endpoint, final ConfigHelper<RpcServer> helper) {
final int port = Requires.requireNonNull(endpoint, "endpoint").getPort();
Requires.requireTrue(port > 0 && port < 0xFFFF, "port out of range:" + port);
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
final Server server = ServerBuilder.forPort(port) //
.fallbackHandlerRegistry(handlerRegistry) //
.directExecutor() //
.maxInboundMessageSize(RPC_MAX_INBOUND_MESSAGE_SIZE) //
.build();
final RpcServer rpcServer = new GrpcServer(server, handlerRegistry, this.parserClasses, getMarshallerRegistry());
if (helper != null) {
helper.config(rpcServer);
}
return rpcServer;
}
(2)紧接着调用addRaftRequestProcessors,这个方法为RpcServer添加RAFT和CLI服务核心请求处理器,关于RpcProcessor这个实体类,会在后面的文章中具体分析,这里可以先"不求甚解"。
//添加RAFT和CLI服务请求处理器
public static void addRaftRequestProcessors(final RpcServer rpcServer, final Executor raftExecutor,
final Executor cliExecutor) {
// 添加raft核心处理器
final AppendEntriesRequestProcessor appendEntriesRequestProcessor = new AppendEntriesRequestProcessor(
raftExecutor);
rpcServer.registerConnectionClosedEventListener(appendEntriesRequestProcessor);
rpcServer.registerProcessor(appendEntriesRequestProcessor);
rpcServer.registerProcessor(new GetFileRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new InstallSnapshotRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new RequestVoteRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new PingRequestProcessor());
rpcServer.registerProcessor(new TimeoutNowRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new ReadIndexRequestProcessor(raftExecutor));
// 添加raft cli服务处理器
rpcServer.registerProcessor(new AddPeerRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new RemovePeerRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new ResetPeerRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new ChangePeersRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new GetLeaderRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new SnapshotRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new TransferLeaderRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new GetPeersRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new AddLearnersRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new RemoveLearnersRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new ResetLearnersRequestProcessor(cliExecutor));
}
CountServer构造器在初步创建RpcServer后,调用了CounterGrpcHelper.initGRpc()和CounterGrpcHelper.setRpcServer(rpcServer)两个方法,接下来和大家分析这两个方法的实现过程
首先请看initGRpc方法:
RpcFactoryHelper.rpcFactory()实际是调用了GrpcRaftRpcFactory(因为GrpcRaftRpcFactory实现了RaftRpcFactory接口),GrpcRaftRpcFactory中维护了一个ConcurrentHashMap<String, Message> parserClasses 其中【key为各种请求/响应实体的名称,value为对应请求/响应的实例】。
然后通过反射获取到MarshallerHelper的registerRespInstance方法,实际上MarshallerHelper里面维护了一个ConcurrentHashMap<String, Message> messages 其中【key为请求实体的名称,value为对应响应的实例】
public static void initGRpc() {
if ("com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory".equals(RpcFactoryHelper.rpcFactory().getClass()
.getName())) {
RpcFactoryHelper.rpcFactory().registerProtobufSerializer(CounterOutter.GetValueRequest.class.getName(),
CounterOutter.GetValueRequest.getDefaultInstance());
RpcFactoryHelper.rpcFactory().registerProtobufSerializer(
CounterOutter.IncrementAndGetRequest.class.getName(),
CounterOutter.IncrementAndGetRequest.getDefaultInstance());
RpcFactoryHelper.rpcFactory().registerProtobufSerializer(CounterOutter.ValueResponse.class.getName(),
CounterOutter.ValueResponse.getDefaultInstance());
try {
Class<?> clazz = Class.forName("com.alipay.sofa.jraft.rpc.impl.MarshallerHelper");
Method registerRespInstance = clazz.getMethod("registerRespInstance", String.class, Message.class);
registerRespInstance.invoke(null, CounterOutter.GetValueRequest.class.getName(),
CounterOutter.ValueResponse.getDefaultInstance());
registerRespInstance.invoke(null, CounterOutter.IncrementAndGetRequest.class.getName(),
CounterOutter.ValueResponse.getDefaultInstance());
} catch (Exception e) {
LOG.error("Failed to init grpc server", e);
}
}
}
接着我们再看setRpcServer方法:
CounterGrpcHelper里面还维护了一个RpcServer实例,CounterGrpcHelper.setRpcServer(rpcServer)实际上会将构造的RpcServer装配到CounterGrpcHelper里面。
public static void setRpcServer(RpcServer rpcServer) {
CounterGrpcHelper.rpcServer = rpcServer;
}
在CountServer构造器中,经过上述一系列操作步骤,走到了RaftGroupService构造器中,在构造RaftGroupService实体后,调用了它的start方法,这一步在于初始化 raft group 服务框架
public synchronized Node start(final boolean startRpcServer) {
//如果已经启动了,那么就返回
if (this.started) {
return this.node;
}
//校验serverId和groupId
if (this.serverId == null || this.serverId.getEndpoint() == null
|| this.serverId.getEndpoint().equals(new Endpoint(Utils.IP_ANY, 0))) {
throw new IllegalArgumentException("Blank serverId:" + this.serverId);
}
if (StringUtils.isBlank(this.groupId)) {
throw new IllegalArgumentException("Blank group id:" + this.groupId);
}
//设置当前node的ip和端口
NodeManager.getInstance().addAddress(this.serverId.getEndpoint());
//创建node
this.node = RaftServiceFactory.createAndInitRaftNode(this.groupId, this.serverId, this.nodeOptions);
if (startRpcServer) {
//启动远程服务
this.rpcServer.init(null);
} else {
LOG.warn("RPC server is not started in RaftGroupService.");
}
this.started = true;
LOG.info("Start the RaftGroupService successfully.");
return this.node;
}
这个方法会在一开始的时候对RaftGroupService在构造器实例化的参数进行校验,然后把当前节点的Endpoint添加到NodeManager的addrSet变量中,接着调用RaftServiceFactory#createAndInitRaftNode实例化Node节点。
每个节点都会启动一个rpc的服务,因为每个节点既可以被选举也可以投票给其他节点,节点之间需要互相通信,所以需要启动一个rpc服务。
以下就是Node节点的一系列创建过程,由于嵌套的层数比较多,所以就全部列举出来了,整个过程简而言之就是,createAndInitRaftNode方法首先调用createRaftNode实例化一个Node的实例NodeImpl,然后调用其init方法进行初始化,主要的配置都是在init方法中完成的。代码如下:
this.node = RaftServiceFactory.createAndInitRaftNode(this.groupId, this.serverId, this.nodeOptions);
public static Node createAndInitRaftNode(final String groupId, final PeerId serverId, final NodeOptions opts) {
final Node ret = createRaftNode(groupId, serverId);
if (!ret.init(opts)) {
throw new IllegalStateException("Fail to init node, please see the logs to find the reason.");
}
return ret;
}
public static Node createRaftNode(final String groupId, final PeerId serverId) {
return new NodeImpl(groupId, serverId);
}
public NodeImpl(final String groupId, final PeerId serverId) {
super();
if (groupId != null) {
Utils.verifyGroupId(groupId);
}
this.groupId = groupId;
this.serverId = serverId != null ? serverId.copy() : null;
this.state = State.STATE_UNINITIALIZED;
this.currTerm = 0;
updateLastLeaderTimestamp(Utils.monotonicMs());
this.confCtx = new ConfigurationCtx(this);
this.wakingCandidate = null;
final int num = GLOBAL_NUM_NODES.incrementAndGet();
LOG.info("The number of active nodes increment to {}.", num);
}
老实说,NodeImpl#init方法确实挺长的,所以我打算分成几个部分来展示,方便分析
这段代码主要是给各个变量赋值,然后进行校验判断一下serverId不能为0.0.0.0,当前的Endpoint必须要在NodeManager里面设置过等等(NodeManager的设置是在RaftGroupService的start方法里)。
然后会初始化一个全局的的定时调度管理器TimerManager:
//一系列判空操作
Requires.requireNonNull(opts, "Null node options");
Requires.requireNonNull(opts.getRaftOptions(), "Null raft options");
Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory");
//JRaftServiceFactory目前有3个实现类
// 1.BDBLogStorageJRaftServiceFactory
// 2.DefaultJRaftServiceFactory
// 3.HybridLogJRaftServiceFactory
this.serviceFactory = opts.getServiceFactory();
this.options = opts;
this.raftOptions = opts.getRaftOptions();
//基于 Metrics 类库的性能指标统计,具有丰富的性能统计指标,默认为false,不开启度量工具
this.metrics = new NodeMetrics(opts.isEnableMetrics());
this.serverId.setPriority(opts.getElectionPriority());
this.electionTimeoutCounter = 0;
//Utils.IP_ANY = "0.0.0.0"
if (this.serverId.getIp().equals(Utils.IP_ANY)) {
LOG.error("Node can't started from IP_ANY.");
return false;
}
if (!NodeManager.getInstance().serverExists(this.serverId.getEndpoint())) {
LOG.error("No RPC server attached to, did you forget to call addService?");
return false;
}
if (this.options.getAppendEntriesExecutors() == null) {
this.options.setAppendEntriesExecutors(Utils.getDefaultAppendEntriesExecutor());
}
//定时任务管理器
//此处TIMER_FACTORY获取到的是DefaultRaftTimerFactory
//this.options.isSharedTimerPool()默认为false
//this.options.getTimerPoolSize()取值为Utils.cpus() * 3 > 20 ? 20 : Utils.cpus() * 3
this.timerManager = TIMER_FACTORY.getRaftScheduler(this.options.isSharedTimerPool(),
this.options.getTimerPoolSize(), "JRaft-Node-ScheduleThreadPool");
此处浅析一下__TimerManager:
初始化一个线程池,根据传入的参数this.options.getTimerPoolSize()==Utils.cpus() * 3 > 20 ? 20 : Utils.cpus() * 3可以分析得知如果当前的服务器的cpu线程数_3 大于20 ,那么这个线程池的coreSize就是20,否则就是cpu线程数的_3倍。
public TimerManager(int workerNum, String name) {
this.executor = ThreadPoolUtil.newScheduledBuilder() //
.poolName(name) //
.coreThreads(workerNum) //
.enableMetric(true) //
.threadFactory(new NamedThreadFactory(name, true)) //
.build();
}
由于这些计时器的实现比较繁杂,所以具体功能等到后面对应章节再一并梳理。
这些计时器有一个共同的特点就是会根据不同的计时器返回一个在一定范围内随机的时间。返回一个随机的时间可以防止多个节点在同一时间内同时发起投票选举从而降低选举失败的概率。
//设置投票计时器
final String suffix = getNodeId().toString();
String name = "JRaft-VoteTimer-" + suffix;
this.voteTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(), TIMER_FACTORY.getVoteTimer(
this.options.isSharedVoteTimer(), name)) {
//处理投票超时
@Override
protected void onTrigger() {
handleVoteTimeout();
}
//在一定范围内返回一个随机的时间戳
@Override
protected int adjustTimeout(final int timeoutMs) {
return randomTimeout(timeoutMs);
}
};
//设置预投票计时器
//当leader在规定的一段时间内没有与 Follower 舰船进行通信时,
// Follower 就可以认为leader已经不能正常担任旗舰的职责,则 Follower 可以去尝试接替leader的角色。
// 这段通信超时被称为 Election Timeout
//候选者在发起投票之前,先发起预投票
name = "JRaft-ElectionTimer-" + suffix;
this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {
@Override
protected void onTrigger() {
handleElectionTimeout();
}
//在一定范围内返回一个随机的时间戳
//为了避免同时发起选举而导致失败
@Override
protected int adjustTimeout(final int timeoutMs) {
return randomTimeout(timeoutMs);
}
};
//leader下台的计时器
//定时检查是否需要重新选举leader
name = "JRaft-StepDownTimer-" + suffix;
this.stepDownTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs() >> 1,
TIMER_FACTORY.getStepDownTimer(this.options.isSharedStepDownTimer(), name)) {
@Override
protected void onTrigger() {
handleStepDownTimeout();
}
};
//快照计时器
name = "JRaft-SnapshotTimer-" + suffix;
this.snapshotTimer = new RepeatedTimer(name, this.options.getSnapshotIntervalSecs() * 1000,
TIMER_FACTORY.getSnapshotTimer(this.options.isSharedSnapshotTimer(), name)) {
private volatile boolean firstSchedule = true;
@Override
protected void onTrigger() {
handleSnapshotTimeout();
}
@Override
protected int adjustTimeout(final int timeoutMs) {
if (!this.firstSchedule) {
return timeoutMs;
}
// Randomize the first snapshot trigger timeout
this.firstSchedule = false;
if (timeoutMs > 0) {
int half = timeoutMs / 2;
return half + ThreadLocalRandom.current().nextInt(half);
} else {
return timeoutMs;
}
}
};
关于Disruptor的内容,后面有时间会写一篇相关的文章进行分享
这里初始化了一个Disruptor作为消费队列,然后校验了metrics是否开启,默认是不开启的
this.configManager = new ConfigurationManager();
//初始化一个disruptor,采用多生产者模式
this.applyDisruptor = DisruptorBuilder.<LogEntryAndClosure>newInstance() //
//设置disruptor大小,默认16384
.setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
.setEventFactory(new LogEntryAndClosureFactory()) //
.setThreadFactory(new NamedThreadFactory("JRaft-NodeImpl-Disruptor-", true)) //
.setProducerType(ProducerType.MULTI) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.build();
//设置事件处理器
this.applyDisruptor.handleEventsWith(new LogEntryAndClosureHandler());
//设置异常处理器
this.applyDisruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
// 启动disruptor的线程
this.applyQueue = this.applyDisruptor.start();
//如果开启了metrics统计
if (this.metrics.getMetricRegistry() != null) {
this.metrics.getMetricRegistry().register("jraft-node-impl-disruptor",
new DisruptorMetricSet(this.applyQueue));
}
对快照、日志、元数据等功能进行初始化
//fsmCaller封装对业务 StateMachine 的状态转换的调用以及日志的写入等
this.fsmCaller = new FSMCallerImpl();
//初始化日志存储功能
if (!initLogStorage()) {
LOG.error("Node {} initLogStorage failed.", getNodeId());
return false;
}
//初始化元数据存储功能
if (!initMetaStorage()) {
LOG.error("Node {} initMetaStorage failed.", getNodeId());
return false;
}
//对FSMCaller初始化
if (!initFSMCaller(new LogId(0, 0))) {
LOG.error("Node {} initFSMCaller failed.", getNodeId());
return false;
}
//实例化投票箱
this.ballotBox = new BallotBox();
final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
ballotBoxOpts.setWaiter(this.fsmCaller);
ballotBoxOpts.setClosureQueue(this.closureQueue);
//初始化ballotBox的属性
if (!this.ballotBox.init(ballotBoxOpts)) {
LOG.error("Node {} init ballotBox failed.", getNodeId());
return false;
}
//初始化快照存储功能
if (!initSnapshotStorage()) {
LOG.error("Node {} initSnapshotStorage failed.", getNodeId());
return false;
}
//校验日志文件索引的一致性
final Status st = this.logManager.checkConsistency();
if (!st.isOk()) {
LOG.error("Node {} is initialized with inconsistent log, status={}.", getNodeId(), st);
return false;
}
//配置管理raft group中的信息
this.conf = new ConfigurationEntry();
this.conf.setId(new LogId());
// if have log using conf in log, else using conf in options
if (this.logManager.getLastLogIndex() > 0) {
checkAndSetConfiguration(false);
} else {
this.conf.setConf(this.options.getInitialConf());
// initially set to max(priority of all nodes)
this.targetPriority = getMaxPriorityOfNodes(this.conf.getConf().getPeers());
}
if (!this.conf.isEmpty()) {
Requires.requireTrue(this.conf.isValid(), "Invalid conf: %s", this.conf);
} else {
LOG.info("Init node {} with empty conf.", this.serverId);
}
初始化replicatorGroup、rpcService以及readOnlyService:
// TODO RPC service and ReplicatorGroup is in cycle dependent, refactor it
this.replicatorGroup = new ReplicatorGroupImpl();
//收其他节点或者客户端发过来的请求,转交给对应服务处理
this.rpcService = new DefaultRaftClientService(this.replicatorGroup, this.options.getAppendEntriesExecutors());
final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions();
rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs()));
rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs());
rgOpts.setLogManager(this.logManager);
rgOpts.setBallotBox(this.ballotBox);
rgOpts.setNode(this);
rgOpts.setRaftRpcClientService(this.rpcService);
rgOpts.setSnapshotStorage(this.snapshotExecutor != null ? this.snapshotExecutor.getSnapshotStorage() : null);
rgOpts.setRaftOptions(this.raftOptions);
rgOpts.setTimerManager(this.timerManager);
// Adds metric registry to RPC service.
this.options.setMetricRegistry(this.metrics.getMetricRegistry());
//初始化rpc服务
if (!this.rpcService.init(this.options)) {
LOG.error("Fail to init rpc service.");
return false;
}
this.replicatorGroup.init(new NodeId(this.groupId, this.serverId), rgOpts);
this.readOnlyService = new ReadOnlyServiceImpl();
final ReadOnlyServiceOptions rosOpts = new ReadOnlyServiceOptions();
rosOpts.setFsmCaller(this.fsmCaller);
rosOpts.setNode(this);
rosOpts.setRaftOptions(this.raftOptions);
//只读服务初始化
if (!this.readOnlyService.init(rosOpts)) {
LOG.error("Fail to init readOnlyService.");
return false;
}
这段代码里会将当前的状态设置为Follower,然后启动快照定时器定时生成快照。
如果当前的集群不是单节点集群需要做一下stepDown,表示新生成的Node节点需要重新进行选举。
最下面有一个if分支,如果当前的jraft集群里只有一个节点,那么个节点必定是leader直接进行选举就好了,所以会直接调用electSelf进行选举。
// 将当前的状态设置为Follower
this.state = State.STATE_FOLLOWER;
if (LOG.isInfoEnabled()) {
LOG.info("Node {} init, term={}, lastLogId={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
this.logManager.getLastLogId(false), this.conf.getConf(), this.conf.getOldConf());
}
//如果快照执行器不为空,并且生成快照的时间间隔大于0,那么就定时生成快照
if (this.snapshotExecutor != null && this.options.getSnapshotIntervalSecs() > 0) {
LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(), this.currTerm);
this.snapshotTimer.start();
}
//新启动的node需要重新选举
if (!this.conf.isEmpty()) {
stepDown(this.currTerm, false, new Status());
}
if (!NodeManager.getInstance().add(this)) {
LOG.error("NodeManager add {} failed.", getNodeId());
return false;
}
// Now the raft node is started , have to acquire the writeLock to avoid race
// conditions
this.writeLock.lock();
//这个分支表示当前的jraft集群里只有一个节点,那么个节点必定是leader直接进行选举就好了
if (this.conf.isStable() && this.conf.getConf().size() == 1 && this.conf.getConf().contains(this.serverId)) {
// The group contains only this server which must be the LEADER, trigger
// the timer immediately.
electSelf();
} else {
this.writeLock.unlock();
}
return true;
SOFAJRaft 是一个基于 RAFT 一致性算法的生产级高性能 Java 实现。
第一次阅读这种复杂的开源代码,老实说确实非常吃力,但其实步步深入,反复推敲,逐渐会从恐惧陌生甚至抵触,转变为惊喜与赞叹。你会慢慢痴迷于里面很多优雅且优秀的实现。
在这里,感谢SOFAJRaft的每一位代码贡献者。源码的阅读过程中,的的确确学到了很多东西。我也会继续学习下去,希望能够巩固、深入我对RAFT一致性算法的理解与体悟。
大家好,又见面了,我是你们的朋友全栈君。1.说明:教程属于官方E聊SDK-简介(1) 进入官网 进入管理台 SDK版本:v1.012.简介:E聊SDK是一套适用于PC端,移动端的即时通讯解决方案,源代码开放。E聊整合了即时通讯的基础能力,使用E聊,您可以让您的应用快速接入即时聊天的功能。E聊现已适配PCWeb,移动Web,Android,iOS等平台。3.系统架构:3.1各模块介绍E聊服务器:提供了基础的消息转发功能,用户管理、群组管理等功能; E聊管理台:向E聊服务器申请接入SDK所需要的AppKey,ClientSecret,AppSecret等; 应用客户端:使用E聊账号登入E聊服务器,实现单聊、群聊消息收发等功能; 应用后台:业务应用后台,维护原有的业务用户列表,此外,需维护一张与E聊用户的关系映射表。3.2E聊功能介绍3.2.1单聊 点对点的基础聊天,支持文字,图片,文件等方式;3.2.2群聊 一对多的群聊/讨论组,支持文字,图片,文件等方式;3.2.3用户资料管理 用户账号(auid),密码(token),昵称,性别,头像,签名,手机,邮箱等;3.2.4用户关系管理 用户的
对于一个新手站长而言,在做SEO的过程中,相关的SEO培训机构,都是让对方建立一个自己的独立博客,通过优化一个博客,开始SEO之路。而在这个过程中,基于成本的核算,对方通常建议大家在学习SEO之初选择虚拟主机即可。那么,虚拟主机会影响到SEO吗?根据以往自媒体博客运营的经验,我们经通过如下内容,进一步说明:1、优点①价格非常一个明显的特征:虚拟主机的价格是非常廉价的,但这并不代表价廉低质,特别是新站初期,根本用不上服务器。②配置虚拟主机的服务器配置,一般都是主机商提前配置好,对于一些经验不足的网站推广员而言,根本不需要自己花费时间与精力配置服务器。比如:如何开启SSL,配置伪静态等,实在不行,你还可以反馈工单。③安全由于虚拟主机在服务器集群上,一般而言,网站被黑的情况,相对比较少见。2、缺点①同IP网站如果你的同IP网站,有的自己做了违规网站,而被搜索引擎所识别,那么,它可能连带整个IP段被搜索引擎所识别,导致在这个IP上的网站,出现百度K站的情况。②域名绑定我们知道如果是一个独立的服务器,在这个IP上,我们可以配置大量的网站,而如果是虚拟主机的话,那么,一般而言,一个虚拟主机所能够绑
这点我以前确实没想过刚看到这个观点的时候,我是很不以为然的,谁让它标题不吸引人>>>《构建测试体系》 就这标题,谁不知道要测试啊。还好我没有“以貌取文”,我认真的看了下去。如果观察一个程序员把时间都花在哪里了,可能下意识会说是编码,其实不然,仔细想想,除了编码,我们还要立项、设计、分工、规划、测试、调试、调试、调试,花大把时间的事情说三遍。为什么调试会花掉大把时间,因为工程大啊,你说它突然来个段错误,你知道是哪里段错误?段错误会死机,那不死机的呢?比方说数据传输错误,你手动设置防火墙它这个错误就不叫错误,就不会死机,就算你设置了防火墙,你去哪里找这个数据缺失?那么多地方传输数据。当然,找得到,绝对找得到,不过找一下也得几个小时甚至一天就下来了。每个类都配备测试代码,烦不烦啊你?烦。但是项目run的时候爆了烦不烦?那会儿可就不是一个人烦了,那是一个团队一起烦。 就比方说上次我们服务器客户端对接测试一个项目,中间数据传输出问题了,好了,现在是谁的问题都不知道。。。 然后测试端也只能跟后边等着了。这种问题其实完全可以避免,甚至可以不发生,只要给每个类配备一个测试代码。 写一
微信新功能:支持设置群公告和小程序卡片为【群待办】。设置为群待办的信息,其他群友会收到【群待办】的通知消息,并且可以看到其他群友的完成状态。一、支持群公告设置为【群待办】 微信的群公告大家都熟悉,会@所有人,群内的所有成员都会收到提醒。设置为【群待办】后,可以看到每个人的完成情况,包括时间信息,群主可以收到正向反馈。这个功能不一定适合所有群,但是某些场景可能比较有用,比如公司内部的公告,方便知道哪些员工已阅。 二、支持小程序卡片设置为【群待办】小程序在分享给好友时,会有个“同时设为群待办”的选项,如果打开此开关,小程序将被设置为【群待办】,同样会通知群内所有成员。也可以先发送小程序卡片到群内,然后再去群内把小程序设置为【群待办】。 小程序卡片的群待办功能,确实很有用,现在社区网格员,每天早上准时发小程序卡片到网格群,然后设置为【群待办】。之前的做法是,先分享小程序到群内,再发个群公告,让大家去填写信息。如果网格员管理的群比较多的话,这真是个体力活儿。比较好的一点儿,可以设置多个【群待办】,以及每个群待办的完成情况。如果群消息比较多的话,方便群员查看之前的公告。 各位小程序开发者,可以考
namespace设计解读namespace是Kubernetes进行多租户资源隔离的主要手段,那么它在系统中的表现形式是什么样的?实现原理和使用方法又是怎样的呢?什么是namespacenamespace是一个将Kubernetes的资源对象进行细分的类似于DNS子域名的概念。namespace能够帮助不同的租户共享一个Kubernetes集群。Kubernetes引人namespace的目的包括以下几点:建立一种简单易用,能够在逻辑上对Kubernetes资源对象进行隔离的机制。将资源对象与实际的物理节点解耦,用户只需关注namespace而非工作节点上的资源情况。随着Kubernetes访问控制代码开发的深人,与Kubernetes认证和授权机制相结合。通过namespace对Kubernete,资源进行归类,使得APIserver能够建立一套有效的过滤Kubernetes资源请求的机制。Kubernetes用户认证机制Kubernetes用以认证用户请求的方式主要有5种,下面将逐一进行简单说明。基于客户端证书的认证机制在APIServer启动过程中,通过传入--client-c
一、Spark数据分析导论1.Spark是一个用来实现快速而通用的集群计算的平台,扩展了MapReduce计算模型,支持更多计算模式,包括交互式查询和流处理2.包括SparkCore、SparkSQL、SparkStreaming(内存流式计算)、MLlib(机器学习)、GraphX(图计算)3.适用于数据科学应用和数据处理应用二、Spark下载与入门1.Spark应用都由一个驱动器程序(driverprogram)来发起集群上的各种并行操作,驱动程序通过一个SparkContext对象来访问Spark,这个对象代表对计算集群的一个连接三、RDD编程1.RDD(ResilientDistributedDataset,弹性分布式数据集),就是分布式的元素集合,在Spark中,对数据的所有操作就是创建RDD、转化RDD以及调用RDD操作进行求值2.工作方式:从外部数据创建出输入RDD使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD告诉Spark对需要被征用的中间结果RDD执行persist()操作使用行动操作(例如count()和first()等)来触发一次并行计算
序本文主要研究一下scalecube-cluster的MembershipProtocolMembershipProtocolscalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java/** *ClusterMembershipProtocolcomponentresponsibleformanaginginformationaboutexistingmembers *ofthecluster. */ publicinterfaceMembershipProtocol{ /** *Startsrunningclustermembershipprotocol.Afterstarteditbeginstoreceiveandsendcluster *membershipmessages */ Mono<Void>start(); /**Stopsrunningclustermembershipprotocolandrelease
错误处理错误处理机制:try...except...finally...try: print('try...') r=10/5 print('result:',int(r)) exceptZeroDivisionErrorase: print('except:',e) finally: print('finally...') print('END')复制由于没有错误发生,所以except语句块不会被执行,但是finally如果有,则一定会被执行(可以没有finally语句)。try... result:2 finally... END复制当我们认为某些代码可能会出错时,就可以用try来运行这段代码,如果执行出错,则后续代码不会继续执行,而是直接跳转至错误处理代码,即except语句块,执行完except后,如果有finally语句块,则执行finally语句块,至此,执行完毕。try: print('try...') r=10/0 print('result
今日洞见文章作者来自ThoughtWorks:吴雪峰,配图来自网络。本文所有内容,包括文字、图片和音视频资料,版权均属ThoughtWorks公司所有,任何媒体、网站或个人未经本网协议授权不得转载、链接、转贴或以其他方式复制发布/发表。已经本网协议授权的媒体、网站,在使用时必须注明"内容来源:ThoughtWorks洞见",并指定原文链接,违者本网将依法追究责任。2016年3月,笔者有幸和诸多对Scala感兴趣的人一起,跟Scala的创始人MartinOdersky做了一次面对面的交流。下面是这次交流中的一些问题的整理,采用问答的形式,根据问答内容做了简单的摘要。提问者:我看到SparkCore里面有很多OOP风格的代码,这是为什么?MartinOdersky:Spark的API设计是和Scala集合类设计是一致的函数式风格,里面具体的实现为了追求性能用了命令式,你可以看到Scala集合里面的实现函数为了性能也用了很多var。提问者:很高兴看到你将在Coursera上再发布两门Scala的课程。但我想问一下,怎么培训那些大学没有毕业的人用Scala?MartinOd
人工神经网络如何理解我们大脑的神经网络?在3月24日至26日的周末,ycombinator支持的创业公司DeepGram举办了一场深度学习黑客马拉松。参加这个周末活动的人包括谷歌大脑的发言人和法官。我选择了由DeepGram提出的EEGreadings数据集,它来自斯坦福的一个研究项目,在该项目使用线性判别分析来预测测试对象看到的图像类别。WinningKaggle竞赛小组已经成功地将人工神经网络应用于EEG数据。人工神经网络模型能在斯坦福的数据集上做得更好吗?测试对象的六个主要类别是:人的身体,人的脸,动物的身体,动物的脸,自然的物体和人造的物体。数据集描述和表达斯坦福的研究论文含有下载他们的数据集的链接。可以在GitHub.上找到。根据数据集附带的README文本文件,他们在测试对象上使用的EEG传感器就是这个装置:该装置有124个电极,每人每次显示一个图像,每个电极可以收集32个读数,每次读数为62.5Hz。以下是第一次试验时EEGreadings中电极1的图像,该图表示在第一个测试对象(十分之一)上进行试验时,测试对象显示图像的时间大约为半秒。现在,想象一下如何安排EEGrea
MSP430F5529共有两类共4个定时器,分别是Timer_A定时器3个和Timer_B定时器1个,按照每个寄存器配备的捕获/比较器的个数分别命名为Timer0_A(内有5个捕获比较器)、Timer1_A(3个)、Timer2_A(3个)、Timer0_B(7个)。 这一章,我们讲定时器Timer0_A.(A类的都一样) 注意:下面所提到的所有寄存器,在TA后面插入0或1或2就分别表示Timer0_A、Timer1_A、Timer2_A(我这里省略了数字) 6.1 简介一下 定时器A是一个复合了捕获/比较寄存器的十六位的定时(加减)计数器。定时器A支持多重捕获/比较,PWM输出和内部定时,具有扩展中断功能,中断可以由定时器溢出产生或由捕获/比较寄存器产生。 特征简介: ○四种运行模式的异步16位定时/计
https://mp.weixin.qq.com/s?__biz=MzI4MDYzNzg4Mw==&mid=2247537696&idx=4&sn=4db4f54f831277c05e63b9c1df4ca75a&chksm=ebb76cf4dcc0e5e254f0b76fddcab79008837b254e4279ed6c04b2b8487871207b4167f18298&mpshare=1&scene=23&srcid=0708YNcJ7WabHhm0RtjcK93h&sharer_sharetime=1625713182219&sharer_shareid=9ed15fc26b568c844598f8638f4c17a4#rd
HolidayHotel TimeLimit: 2000MS MemoryLimit: 65536K TotalSubmissions: 8302 Accepted: 3249 Description Mr.andMrs.Smitharegoingtotheseasidefortheirholiday.Beforetheystartoff,theyneedtochooseahotel.TheygotalistofhotelsfromtheInternet,andwanttochoosesomecandidatehotelswhicharecheapandclosetotheseashore.AcandidatehotelMmeetstworequirements: AnyhotelwhichisclosertotheseashorethanMwillbemoreexpensiveth
题号标题已通过代码题解/讨论通过率团队的状态 A ThepowerofFibonacci 点击查看 进入讨论 69/227 未通过 B Quadraticequation 点击查看 高次剩余 391/888 未通过 C Inversionsofallpermutations 点击查看 进入讨论 28/61 未通过 D KnapsackCryptosystem 点击查看 进入讨论 606/2251 通过 E Allmenarebrothers 点击查看 进入讨论 425/1117 通过 F BirthdayReminders 点击查看 进入讨论 5/11 未通过 G Checkers 点击查看 进入讨论 0/15 未通过 H CuttingBamboos 点击查看 二分,主席树 187/834 通过 I KMandM 点击查看 进入讨论 19/296 未通过 J SymmetricalPainting 点击查看 进入讨论 227/930 通过 H CuttingBa
造成原因: 之前docker容器关闭时没有 解决方案: 删除原有运行时产生的文件,再重新运行所有容器 rm-rf/run/runc/* #启动所有容器 dockerstart$(dockerps-a|awk'{print$1}'|tail-n+2)复制 以下是补充的docker管理命令#关闭所有容器 dockerstop$(dockerps-a|awk'{print$1}'|tail-n+2)复制 #重启所有容器 dockerrestart$(dockerps-a-q)复制 #删除所有容器 dockerrm$(dockerps-a|awk'{print$1}'|tail-n+2)复制 #删除所有镜像 dockerrmi$(dockerimages|awk'{print$3}'|tail-n+2)复制 2、造成protain管理的web界面无法连接上容器管理者 编辑 vi/usr/lib/systemd/system/docker.service复制 ExecStart=/usr/bin/dockerd-Htcp://0.0.0.0:2375-Hunix:
没事就提醒我安全更新,GUI界面里只能禁一部分,烦死了。 网上找了一个老版本的方法,不知道对20.04是不是有用 sudosystemctldisableapt-daily.serviceapt-daily-upgrade.service sudosystemctldisableapt-daily.timerapt-daily-upgrade.timer sudosystemctlstopapt-daily.serviceapt-daily-upgrade.service sudosystemctlstopapt-daily.timerapt-daily-upgrade.timer复制
QuartusII18.xx创建新工程 本节以QuartusII18.0为例介绍如何在QuatusII下创建一个新的工程,其它版本如QuartusII18.XX,QuartusII19.XX,QuartusII20.XX,基本一致,可以参照本节步骤执行。 启动QuartusII18.0liteEdition,启动后的界面如图1所示: 图1 在新建、打开项目向导区,点击NewProjectWizard 按钮,如图2红框标识: 图2 注:图2左下方绿框设置只有在QuartusII新打开的时候出现,工程打开后就会关闭,如果选择”Don’tshowthisscreenagain”,在以后新建,打开工程将不在出现。 也可以如图3,主菜单点击 File->NewProjectWizard,将会跳出图3界面。 图3 图3界面中介绍了,后续将会选择或设置的各项内容。如: 工程名称和工程目录 顶层设计实体的名称 工程文件及库 要开发的目标器件家族及具体器件 EDA(electronicdesignautomation)工具的各项设定。 后面我们会一步一步介绍上
sql注入报错注入原理详解 转载sql报错注入原理解析:https://blog.csdn.net/he_and/article/details/80455884 前言 我相信很多小伙伴在玩sql注入报错注入时都会有一个疑问,为什么这么写就会报错?曾经我去查询的时候,也没有找到满意的答案,时隔几个月终于找到搞清楚原理,特此记录,也希望后来的小伙伴能够少走弯路 0x01 我们先来看一看现象,我这里有一个users表,里面有五条数据: 然后用我们的报错语句查询一下: selectcount(*),(concat(floor(rand()*2),(selectversion())))xfromusersgroupbyx 复制 成功爆出了数据库的版本号。 要理解这个错误产生的原因,我们首先要知道groupby语句都做了什么。我们用一个studetn表来看一下: 现在我们通过年龄对这个表中的数据进行下分组: 形成了一个新的表是吧?你其实应该能够想到groupby语句的执行流程了吧?最开始我们看到的这张sage-count()表应该时空的,但是在group
//需求是描述内容过长时将超出部分隐藏并显示省略号,鼠标放上去可查看全部信息 columns=[ {title:'描述',dataIndex:'desc',key:"desc", onCell:()=>{ return{ style:{ maxWidth:150, overflow:'hidden', whiteSpace:'nowrap', textOverflow:'ellipsis', cursor:'pointer' } } }, render:(text)=><Tooltipplacement="topLeft"title={text}>{text}</Tooltip> }, ] render(){ return( <Table dataSource={list} columns={this.columns} loading={loading} /> ) } 复制
效果图如下: 右侧导航栏英文字母旋转 如有需要参考: <!DOCTYPEhtml><html><head><title>右侧导航栏英文字母旋转</title> </head><body><style>/*css初始化*/a,body,dd,div,dl,dt,h1,h2,h3,h4,h5,h6,input,li,p,ul{margin:0;padding:0}li,ul{list-style:none}img{vertical-align:top;border:0none}a{text-decoration:none}a,a:visited{color:#424242;text-decoration:none}i{font-style:normal}em{font-style:normal}.fl{float:left}.fr{float:right}.pr{position:relative}.pa{position:absolute}