Preparation, Basic Concepts and Mechanisms
Version
1.3.7
Related scripts
# Script that starts ElectionBootstrap: boot_example1.sh, placed in the jraft-example directory
#!/bin/bash
DATA_PATH=$1
ID=$2
ENDPOINT=$3
ENDPOINT_LIST=$4
CLASSPATH=$(echo target/jraft-bin/lib/*.jar|tr ' ' ':' )
CLASSPATH=target/jraft-example-1.3.7.jar:${CLASSPATH}
java -cp ${CLASSPATH} com.alipay.sofa.jraft.example.election.ElectionBootstrap ${DATA_PATH} ${ID} ${ENDPOINT} ${ENDPOINT_LIST}
#EOF
# Start three nodes for leader election
nohup ./boot_example1.sh /tmp/server1 election_test 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 >>node1.log 2>&1 &
nohup ./boot_example1.sh /tmp/server2 election_test 127.0.0.1:8082 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 >>node2.log 2>&1 &
nohup ./boot_example1.sh /tmp/server3 election_test 127.0.0.1:8083 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 >>node3.log 2>&1 &
# Start the counter servers
# boot_counter_server1.sh
#!/bin/bash
DATA_PATH=$1
ID=$2
ENDPOINT=$3
ENDPOINT_LIST=$4
JAVA_DEBUG_PORT=$5
CLASSPATH=$(echo target/jraft-bin/lib/*.jar|tr ' ' ':' )
CLASSPATH=target/jraft-example-1.3.7.jar:${CLASSPATH}
java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=${JAVA_DEBUG_PORT} -cp ${CLASSPATH} com.alipay.sofa.jraft.example.counter.CounterServer ${DATA_PATH} ${ID} ${ENDPOINT} ${ENDPOINT_LIST}
nohup ./boot_counter_server1.sh /tmp/server1 election_test 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 8051 >>node1.log 2>&1 &
nohup ./boot_counter_server1.sh /tmp/server2 election_test 127.0.0.1:8082 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 8052 >>node2.log 2>&1 &
nohup ./boot_counter_server1.sh /tmp/server3 election_test 127.0.0.1:8083 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 8053 >>node3.log 2>&1 &
# counter_client.sh
#!/bin/bash
ID=$1
ENDPOINT_LIST=$2
CLASSPATH=$(echo target/jraft-bin/lib/*.jar|tr ' ' ':' )
CLASSPATH=target/jraft-example-1.3.7.jar:${CLASSPATH}
java -cp ${CLASSPATH} com.alipay.sofa.jraft.example.counter.CounterClient ${ID} ${ENDPOINT_LIST}
./counter_client.sh election_test 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
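From its example I learned that the make-assembly execution of the maven-assembly-plugin can build an "application release package" with a bin directory and the dependencies placed under lib. This configuration can be written in an assembly.xml. Before this I had only used make-assembly to build fat jars.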
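Basic Concepts and Mechanisms
What is the difference between LogId and LogIndex?
A LogId is a (term, index) pair. A LogIndex is simply the position of a single entry within the whole log.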
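To make the distinction concrete, here is a minimal sketch of a (term, index) identifier. SimpleLogId is a hypothetical name that only mirrors the idea of jraft's LogId; it assumes ordering by term first and index second, which is the order Raft uses to decide which log is more up-to-date.

```java
import java.util.Objects;

// Hypothetical (term, index) identifier; not the jraft LogId class itself.
public final class SimpleLogId implements Comparable<SimpleLogId> {
    private final long term;  // term of the leader that created the entry
    private final long index; // position of the entry in the whole log (the "LogIndex")

    public SimpleLogId(long term, long index) {
        this.term = term;
        this.index = index;
    }

    public long getTerm() {
        return term;
    }

    public long getIndex() {
        return index;
    }

    // Compare by term first, then by index.
    @Override
    public int compareTo(SimpleLogId o) {
        int c = Long.compare(term, o.term);
        return c != 0 ? c : Long.compare(index, o.index);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof SimpleLogId)) {
            return false;
        }
        SimpleLogId other = (SimpleLogId) obj;
        return term == other.term && index == other.index;
    }

    @Override
    public int hashCode() {
        return Objects.hash(term, index);
    }

    @Override
    public String toString() {
        return "(term=" + term + ", index=" + index + ")";
    }

    public static void main(String[] args) {
        SimpleLogId a = new SimpleLogId(1, 5);
        SimpleLogId b = new SimpleLogId(2, 5);
        // Same LogIndex, different term: these identify different entries.
        System.out.println(a.getIndex() == b.getIndex());
        System.out.println(a.equals(b));
        System.out.println(a.compareTo(b) < 0);
    }
}
```

Two entries can share a LogIndex yet differ in term (for example, an uncommitted entry that is overwritten after a leader change), which is why the index alone is not enough to identify an entry.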
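The event and closure mechanism
Asynchronous events with Disruptor
sofa-jraft uses an event-and-closure mechanism to carry out the processing of logs, and quite a few other places use a similar mechanism, for example FSMCallerImpl. Before looking at how LogId is handled, let's first go over this mechanism.
The event types are: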
- OTHER, // other event type.
- RESET, // reset
- TRUNCATE_PREFIX, // truncate log from prefix
- TRUNCATE_SUFFIX, // truncate log from suffix
- SHUTDOWN, //
- LAST_LOG_ID // get last log id
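With these event types defined, three components carry out the events and then call back the closures:
- Disruptor (field disruptor)
- RingBuffer (field diskQueue, the Disruptor's queue)
- StableClosure (done, the callback invoked after the event has been handled)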
com.alipay.sofa.jraft.storage.impl.LogManagerImpl.disruptor
com.alipay.sofa.jraft.storage.impl.LogManagerImpl.StableClosureEventHandler
com.alipay.sofa.jraft.storage.impl.LogManagerImpl.diskQueue
The key code is as follows:
this.disruptor.handleEventsWith(new StableClosureEventHandler()); // event handler of the disruptor
this.diskQueue = this.disruptor.start(); // start the disruptor; the returned RingBuffer is the event queue
private void offerEvent(final StableClosure done, final EventType type) {
if (this.stopped) {
Utils.runClosureInThread(done, new Status(RaftError.ESTOP, "Log manager is stopped."));
return;
}
if (!this.diskQueue.tryPublishEvent((event, sequence) -> {
event.reset();
event.type = type;
event.done = done;
})) {
reportError(RaftError.EBUSY.getNumber(), "Log manager is overload.");
Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Log manager is overload."));
}
}
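StableClosureEventHandler is the event handler used together with the disruptor; it implements the onEvent method of Disruptor's EventHandler interface. The objects flowing through the handler are StableClosureEvent instances, each carrying two elements: the event type and the encapsulated callback logic.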
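To illustrate the pattern without pulling in the LMAX Disruptor dependency, here is a minimal sketch that swaps the RingBuffer for a bounded java.util.concurrent.ArrayBlockingQueue and the EventHandler for a single consumer thread. Everything in it (EventClosureDemo, Closure, Event) is hypothetical; only the event types and the reject-or-fail-fast shape of offerEvent follow the code above. Unlike LogManagerImpl, the error callbacks here run synchronously on the caller's thread instead of through Utils.runClosureInThread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for LogManagerImpl's Disruptor pipeline:
// a bounded queue replaces the RingBuffer, one thread replaces the EventHandler.
public class EventClosureDemo {
    public enum EventType { OTHER, RESET, TRUNCATE_PREFIX, TRUNCATE_SUFFIX, SHUTDOWN, LAST_LOG_ID }

    // Hypothetical callback playing the role of StableClosure.
    public interface Closure {
        void run(boolean ok, String msg);
    }

    // The event carries two elements: what to do (type) and whom to call back (done).
    static final class Event {
        final EventType type;
        final Closure done;

        Event(EventType type, Closure done) {
            this.type = type;
            this.done = done;
        }
    }

    private final BlockingQueue<Event> queue;
    private volatile boolean stopped;

    public EventClosureDemo(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        Thread consumer = new Thread(this::consumeLoop, "event-consumer");
        consumer.setDaemon(true);
        consumer.start();
    }

    // Same shape as offerEvent: reject when stopped, fail fast (no blocking) when full.
    public boolean offerEvent(Closure done, EventType type) {
        if (stopped) {
            done.run(false, "stopped");
            return false;
        }
        if (!queue.offer(new Event(type, done))) {
            done.run(false, "overload");
            return false;
        }
        return true;
    }

    // Stop accepting new events, then let the consumer drain up to SHUTDOWN.
    public void shutdown() {
        stopped = true;
        try {
            queue.put(new Event(EventType.SHUTDOWN, (ok, msg) -> { }));
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    // Single consumer thread: the analogue of EventHandler.onEvent.
    private void consumeLoop() {
        try {
            while (true) {
                Event e = queue.take();
                // Type-specific work (append, truncate, reset, ...) would go here.
                e.done.run(true, "handled " + e.type);
                if (e.type == EventType.SHUTDOWN) {
                    return;
                }
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        EventClosureDemo demo = new EventClosureDemo(1024);
        CountDownLatch latch = new CountDownLatch(1);
        demo.offerEvent((ok, msg) -> {
            System.out.println(ok + " " + msg);
            latch.countDown();
        }, EventType.LAST_LOG_ID);
        latch.await(1, TimeUnit.SECONDS);
        demo.shutdown();
    }
}
```

Using offer instead of a blocking put plays the role of tryPublishEvent: under overload the caller learns about it immediately through its closure instead of stalling.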
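Asynchronous closure callbacks
Many of the done callbacks in sofa-jraft are executed asynchronously. The code that submits a callback for asynchronous execution is: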
// com/alipay/sofa/jraft/util/Utils.java
private static ThreadPoolExecutor CLOSURE_EXECUTOR = ThreadPoolUtil
// ...
public static Future<?> runInThread(final Runnable runnable) {
return CLOSURE_EXECUTOR.submit(runnable);
}
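The same idea can be reproduced with a plain ExecutorService. This is a hypothetical sketch: ClosureRunner and its Status and Closure types are stand-ins (jraft has its own Status and Closure), and a cached daemon pool replaces the ThreadPoolUtil-built CLOSURE_EXECUTOR, whose configuration is elided above. Running callbacks on a separate pool presumably keeps user callback code from blocking the thread that produced the result, such as the Disruptor consumer thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "submit the callback to a shared pool"; not jraft's Utils.
public final class ClosureRunner {
    // Minimal stand-in for a status object: code 0 means OK.
    public static final class Status {
        public final int code;
        public final String msg;

        public Status(int code, String msg) {
            this.code = code;
            this.msg = msg;
        }

        public static Status ok() {
            return new Status(0, "");
        }

        public boolean isOk() {
            return code == 0;
        }
    }

    // Stand-in for a closure: a callback that receives the outcome.
    public interface Closure {
        void run(Status status);
    }

    // Daemon threads so the pool never keeps the JVM alive on its own.
    private static final ExecutorService CLOSURE_EXECUTOR = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "closure-executor");
        t.setDaemon(true);
        return t;
    });

    public static Future<?> runInThread(Runnable runnable) {
        return CLOSURE_EXECUTOR.submit(runnable);
    }

    // Runs done.run(status) on the pool; a throwing callback is reported, not propagated.
    public static Future<?> runClosureInThread(Closure done, Status status) {
        if (done == null) {
            return null;
        }
        return runInThread(() -> {
            try {
                done.run(status);
            } catch (Throwable t) {
                System.err.println("Fail to run done closure: " + t);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        Future<?> f = runClosureInThread(
            s -> System.out.println("ok=" + s.isOk() + " on " + Thread.currentThread().getName()),
            Status.ok());
        f.get(1, TimeUnit.SECONDS); // wait so main does not exit before the callback runs
    }
}
```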
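Storing data with protobuf
Example data definitions
Take the snapshot metadata as an example: the message name stored in __raft_snapshot_meta is jraft.LocalSnapshotPbMeta, and it is defined as follows: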
// local_storage.proto
message LocalSnapshotPbMeta {
message File {
required string name = 1;
optional LocalFileMeta meta = 2;
};
optional SnapshotMeta meta = 1;
repeated File files = 2;
}
// raft.proto
message SnapshotMeta {
required int64 last_included_index = 1;
required int64 last_included_term = 2;
repeated string peers = 3;
repeated string old_peers = 4;
repeated string learners = 5;
repeated string old_learners = 6;
}
// local_file_meta.proto
enum FileSource {
FILE_SOURCE_LOCAL = 0;
FILE_SOURCE_REFERENCE = 1;
}
message LocalFileMeta {
optional bytes user_meta = 1;
optional FileSource source = 2;
optional string checksum = 3;
}
The toString() output of __raft_snapshot_meta looks like this:
meta {
last_included_index: 1
last_included_term: 1
peers: "127.0.0.1:8081"
peers: "127.0.0.1:8082"
peers: "127.0.0.1:8083"
}
files {
name: "data"
meta {
}
}
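Reading protobuf data
Where protobuf data is read:
com.alipay.sofa.jraft.storage.io.ProtoBufFile.load()
The overall layout is:
name length (4 bytes)|name|msg length (4 bytes)|msg
The | characters are only separators for illustration; they do not appear in the actual file.
This layout is not something protobuf requires: sofa-jraft itself prepends the "name length (4 bytes)|name|msg length (4 bytes)|" part. For details see com.alipay.sofa.jraft.storage.io.ProtoBufFile.save(Message, boolean).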
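The framing can be written and read back with plain DataOutputStream/DataInputStream. This is a sketch, not jraft's ProtoBufFile: FramedRecord is a hypothetical name, the payload is placeholder bytes rather than a real serialized message, and the lengths are assumed to be big-endian, which is what the getInt helper in the utility class below decodes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the layout: [nameLen:4][name][msgLen:4][msg], big-endian lengths.
public final class FramedRecord {
    public final String name;
    public final byte[] payload;

    public FramedRecord(String name, byte[] payload) {
        this.name = name;
        this.payload = payload;
    }

    public static byte[] encode(String name, byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
            out.writeInt(nameBytes.length); // DataOutputStream writes ints big-endian
            out.write(nameBytes);
            out.writeInt(payload.length);
            out.write(payload);
        }
        return bos.toByteArray();
    }

    public static FramedRecord decode(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int nameLen = in.readInt();
            if (nameLen <= 0) {
                throw new IOException("Invalid message fullName.");
            }
            byte[] nameBytes = new byte[nameLen];
            in.readFully(nameBytes); // readFully loops until all bytes arrive
            int msgLen = in.readInt();
            if (msgLen < 0) { // 0 is valid: a message with only default fields serializes to nothing
                throw new IOException("Invalid message length.");
            }
            byte[] msg = new byte[msgLen];
            in.readFully(msg);
            return new FramedRecord(new String(nameBytes, StandardCharsets.UTF_8), msg);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] placeholder = {1, 2, 3}; // not a real serialized message
        byte[] framed = encode("jraft.LocalSnapshotPbMeta", placeholder);
        System.out.println(framed.length); // 4 + 25 + 4 + 3 = 36
        FramedRecord r = decode(framed);
        System.out.println(r.name + " " + r.payload.length);
    }
}
```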
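A utility for reading the data
The following utility class parses a local protobuf file during analysis. It expects the raft.desc descriptor set and the generated jraft message classes (shipped in jraft-core), plus protobuf-java, on the classpath: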
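import static java.lang.invoke.MethodType.methodType;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.Message;

public class Test {
    // full message name in the proto file (e.g. jraft.LocalSnapshotPbMeta) -> static parseFrom(byte[])
    private static final Map<String, MethodHandle> PARSE_METHODS_4PROTO = new HashMap<>();

    public static void main(String[] args) throws Throwable {
        // 1. Build descriptors from raft.desc and register each message's parseFrom(byte[]).
        final FileDescriptorSet descriptorSet = FileDescriptorSet
            .parseFrom(Test.class.getResourceAsStream("/raft.desc"));
        final List<FileDescriptor> resolveFDs = new ArrayList<>();
        for (final FileDescriptorProto fdp : descriptorSet.getFileList()) {
            // Files built so far are passed in as the candidate dependencies.
            final FileDescriptor[] dependencies = resolveFDs.toArray(new FileDescriptor[0]);
            final FileDescriptor fd = FileDescriptor.buildFrom(fdp, dependencies);
            resolveFDs.add(fd);
            for (final Descriptor descriptor : fd.getMessageTypes()) {
                // Generated class name: <java_package>.<java_outer_classname>$<MessageName>
                final String className = fdp.getOptions().getJavaPackage() + "."
                    + fdp.getOptions().getJavaOuterClassname() + "$" + descriptor.getName();
                final Class<?> clazz = Class.forName(className);
                final MethodHandle parseFromHandler = MethodHandles.lookup().findStatic(clazz, "parseFrom",
                    methodType(clazz, byte[].class));
                PARSE_METHODS_4PROTO.put(descriptor.getFullName(), parseFromHandler);
            }
        }
        // 2. Read name length | name | msg length | msg from the file.
        final String path = args.length > 0 ? args[0] : "/tmp/server1/snapshot/snapshot_4/__raft_snapshot_meta";
        try (InputStream input = new FileInputStream(new File(path))) {
            final byte[] lenBytes = new byte[4];
            readBytes(lenBytes, input);
            final int len = getInt(lenBytes, 0);
            if (len <= 0) {
                throw new IOException("Invalid message fullName.");
            }
            final byte[] nameBytes = new byte[len];
            readBytes(nameBytes, input);
            final String name = new String(nameBytes, StandardCharsets.UTF_8);
            System.out.println("name=" + name);
            readBytes(lenBytes, input);
            final int msgLen = getInt(lenBytes, 0);
            if (msgLen < 0) {
                throw new IOException("Invalid message length.");
            }
            final byte[] msgBytes = new byte[msgLen];
            readBytes(msgBytes, input);
            // 3. Dispatch to the matching parseFrom and print the message.
            final MethodHandle parser = PARSE_METHODS_4PROTO.get(name);
            if (parser == null) {
                throw new IOException("Unknown message type: " + name);
            }
            final Message msg = (Message) parser.invoke(msgBytes);
            System.out.println(msg);
        }
    }

    // Reads exactly bs.length bytes; InputStream.read may return fewer, so loop.
    private static void readBytes(final byte[] bs, final InputStream input) throws IOException {
        int off = 0;
        while (off < bs.length) {
            final int read = input.read(bs, off, bs.length - off);
            if (read < 0) {
                throw new IOException("Read error, expects " + bs.length + " bytes, but read " + off);
            }
            off += read;
        }
    }

    // Decodes a big-endian 4-byte int.
    static int getInt(byte[] memory, int index) {
        return (memory[index] & 0xff) << 24 | (memory[index + 1] & 0xff) << 16 | (memory[index + 2] & 0xff) << 8
            | memory[index + 3] & 0xff;
    }
}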
Unless otherwise stated, all posts on this blog are licensed under CC BY-SA 3.0. Please credit the source when reposting!