Preparation, Basic Concepts and Mechanisms

Version

1.3.7

Related scripts

# Script that starts ElectionBootstrap: boot_example1.sh, placed in the jraft-example directory
#!/bin/bash
DATA_PATH=$1
ID=$2
ENDPOINT=$3
ENDPOINT_LIST=$4
CLASSPATH=$(echo target/jraft-bin/lib/*.jar|tr ' ' ':' )
CLASSPATH=target/jraft-example-1.3.7.jar:${CLASSPATH}
java -cp ${CLASSPATH} com.alipay.sofa.jraft.example.election.ElectionBootstrap ${DATA_PATH} ${ID} ${ENDPOINT} ${ENDPOINT_LIST} 
#EOF

# Start three nodes for leader election
nohup ./boot_example1.sh /tmp/server1 election_test 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 >>node1.log 2>&1 &
nohup ./boot_example1.sh /tmp/server2 election_test 127.0.0.1:8082 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 >>node2.log 2>&1 &
nohup ./boot_example1.sh /tmp/server3 election_test 127.0.0.1:8083 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 >>node3.log 2>&1 &

# Start the counter servers
# boot_counter_server1.sh
#!/bin/bash
DATA_PATH=$1
ID=$2
ENDPOINT=$3
ENDPOINT_LIST=$4
JAVA_DEBUG_PORT=$5
CLASSPATH=$(echo target/jraft-bin/lib/*.jar|tr ' ' ':' )
CLASSPATH=target/jraft-example-1.3.7.jar:${CLASSPATH}
java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=${JAVA_DEBUG_PORT} -cp ${CLASSPATH} com.alipay.sofa.jraft.example.counter.CounterServer ${DATA_PATH} ${ID} ${ENDPOINT} ${ENDPOINT_LIST} 

nohup ./boot_counter_server1.sh /tmp/server1 election_test 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 8051 >>node1.log 2>&1 &
nohup ./boot_counter_server1.sh /tmp/server2 election_test 127.0.0.1:8082 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 8052 >>node2.log 2>&1 &
nohup ./boot_counter_server1.sh /tmp/server3 election_test 127.0.0.1:8083 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 8053 >>node3.log 2>&1 &


# counter_client.sh
#!/bin/bash
ID=$1
ENDPOINT_LIST=$2
CLASSPATH=$(echo target/jraft-bin/lib/*.jar|tr ' ' ':' )
CLASSPATH=target/jraft-example-1.3.7.jar:${CLASSPATH}
java -cp ${CLASSPATH} com.alipay.sofa.jraft.example.counter.CounterClient ${ID} ${ENDPOINT_LIST}

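./counter_client.sh election_test 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

From the jraft example I learned that the Maven assembly plugin (the make-assembly execution) can build an "application release package", with start scripts under bin and dependencies under lib. The layout is configured in an assembly.xml. Previously I had only used make-assembly to build fat jars.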

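Basic Concepts and Mechanisms

What is the difference between LogId and LogIndex?

A LogId is a (term, index) pair. A LogIndex is simply the position of a single entry within the whole log.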
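Why the pair matters: after a leader change, the entry at a given index may be overwritten by an entry from a newer term, so the index alone does not identify an entry. The sketch below uses a hypothetical LogIdSketch class (not jraft's com.alipay.sofa.jraft.entity.LogId) and orders ids by term first, then index, which is how Raft decides which log is more up to date.

```java
import java.util.Objects;

// Hypothetical, simplified (term, index) pair; NOT jraft's LogId class.
final class LogIdSketch implements Comparable<LogIdSketch> {
    final long term;
    final long index;

    LogIdSketch(long term, long index) {
        this.term = term;
        this.index = index;
    }

    // Compare by term first, then by index (Raft's "more up-to-date" order).
    @Override
    public int compareTo(LogIdSketch o) {
        int c = Long.compare(term, o.term);
        return c != 0 ? c : Long.compare(index, o.index);
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof LogIdSketch)) {
            return false;
        }
        LogIdSketch o = (LogIdSketch) obj;
        return term == o.term && index == o.index;
    }

    @Override
    public int hashCode() {
        return Objects.hash(term, index);
    }

    @Override
    public String toString() {
        return "LogId[term=" + term + ", index=" + index + "]";
    }
}

class LogIdDemo {
    public static void main(String[] args) {
        // Two entries at index 5 written by leaders of different terms:
        // the index cannot tell them apart, the (term, index) pair can.
        LogIdSketch oldEntry = new LogIdSketch(1, 5);
        LogIdSketch newEntry = new LogIdSketch(2, 5);
        System.out.println(oldEntry.index == newEntry.index); // true
        System.out.println(oldEntry.equals(newEntry));        // false
        System.out.println(oldEntry.compareTo(newEntry) < 0); // true
    }
}
```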

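Events and closures

Asynchronous events with Disruptor

sofa-jraft uses a combination of events and closures to process Log operations, and similar mechanisms appear elsewhere, for example in FSMCallerImpl. Before looking at how LogId is handled, let us first look at this event-and-closure mechanism.

The event types are: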

  • OTHER, // other event type.
  • RESET, // reset
  • TRUNCATE_PREFIX, // truncate log from prefix
  • TRUNCATE_SUFFIX, // truncate log from suffix
  • SHUTDOWN, // shut down
  • LAST_LOG_ID // get last log id

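With these event types defined, three components cooperate to execute an event and then invoke its closure: the Disruptor (field disruptor), its RingBuffer (field diskQueue, the disruptor's queue) and StableClosure (done, the callback invoked once the event has been handled).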

com.alipay.sofa.jraft.storage.impl.LogManagerImpl.disruptor

com.alipay.sofa.jraft.storage.impl.LogManagerImpl.StableClosureEventHandler

com.alipay.sofa.jraft.storage.impl.LogManagerImpl.diskQueue

The key code:

this.disruptor.handleEventsWith(new StableClosureEventHandler()); // register the disruptor's event handler
this.diskQueue = this.disruptor.start(); // start the disruptor; returns the RingBuffer used as the event queue


private void offerEvent(final StableClosure done, final EventType type) {
  if (this.stopped) {
    Utils.runClosureInThread(done, new Status(RaftError.ESTOP, "Log manager is stopped."));
    return;
  }
  if (!this.diskQueue.tryPublishEvent((event, sequence) -> {
    event.reset();
    event.type = type;
    event.done = done;
  })) {
    reportError(RaftError.EBUSY.getNumber(), "Log manager is overload.");
    Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Log manager is overload."));
  }
}

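StableClosureEventHandler is the Disruptor event handler: it implements the onEvent method of Disruptor's EventHandler interface. The objects flowing through the handler are StableClosureEvent instances, each carrying two things: the event type and the callback (closure) to run. Note also from offerEvent that publishing uses tryPublishEvent, which does not block: if the ring buffer is full, the error is reported and the closure is completed with EBUSY; if the log manager has already stopped, the closure is completed with ESTOP without publishing anything.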
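The producer/consumer pattern above can be sketched without the Disruptor dependency. In this sketch an ArrayBlockingQueue stands in for the RingBuffer, its non-blocking offer() plays the role of tryPublishEvent, and a single consumer thread plays the role of StableClosureEventHandler. All names (EventQueueSketch, ClosureEvent, the String-based StableClosure) are hypothetical, and callbacks run inline rather than through Utils.runClosureInThread; it illustrates the idea, not jraft's implementation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, simplified types: NOT the real jraft or LMAX Disruptor classes.
enum EventType { OTHER, RESET, TRUNCATE_PREFIX, TRUNCATE_SUFFIX, SHUTDOWN, LAST_LOG_ID }

interface StableClosure {
    void run(String status); // "OK", "EBUSY", "ESTOP" (plain strings instead of jraft's Status)
}

final class ClosureEvent {
    final EventType type;
    final StableClosure done;

    ClosureEvent(EventType type, StableClosure done) {
        this.type = type;
        this.done = done;
    }
}

final class EventQueueSketch {
    // A bounded queue stands in for the Disruptor RingBuffer (diskQueue).
    private final BlockingQueue<ClosureEvent> queue;
    private final Thread consumer;
    private volatile boolean stopped;

    EventQueueSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        // A single consumer thread plays the role of StableClosureEventHandler.onEvent.
        this.consumer = new Thread(this::consumeLoop, "event-handler");
        this.consumer.setDaemon(true);
        this.consumer.start();
    }

    private void consumeLoop() {
        try {
            while (true) {
                ClosureEvent ev = queue.take();
                // The real handler would act on ev.type here (truncate, reset, ...).
                ev.done.run("OK");
                if (ev.type == EventType.SHUTDOWN) {
                    break;
                }
            }
            // Events that slipped in behind SHUTDOWN are failed rather than dropped.
            ClosureEvent rest;
            while ((rest = queue.poll()) != null) {
                rest.done.run("ESTOP");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Same shape as offerEvent: never block the producer; fail the closure instead.
    void offerEvent(StableClosure done, EventType type) {
        if (stopped) {
            done.run("ESTOP");
            return;
        }
        // offer() returns false when the queue is full, like RingBuffer.tryPublishEvent.
        if (!queue.offer(new ClosureEvent(type, done))) {
            done.run("EBUSY");
        }
    }

    // Refuse new events, let queued ones finish, then wait for the consumer to exit.
    void shutdown(StableClosure done) throws InterruptedException {
        stopped = true;
        queue.put(new ClosureEvent(EventType.SHUTDOWN, done)); // blocking: shutdown must not be lost
        consumer.join();
    }
}
```

The fail-fast publish is the interesting design choice: when disk work falls behind, the caller gets an EBUSY callback immediately instead of stalling on a full queue.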

Asynchronous closure callbacks

Many done callbacks in sofa-jraft are executed asynchronously. The code that submits a callback for asynchronous execution is:

// com/alipay/sofa/jraft/util/Utils.java
private static ThreadPoolExecutor CLOSURE_EXECUTOR                    = ThreadPoolUtil
// ...
public static Future<?> runInThread(final Runnable runnable) {
  return CLOSURE_EXECUTOR.submit(runnable);
}
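offerEvent above completes failed closures through Utils.runClosureInThread, which hands the callback to this shared pool. The sketch below reproduces that idea with hypothetical names (ClosureRunnerSketch, a String-based Closure). It assumes a cached daemon thread pool because the real CLOSURE_EXECUTOR configuration is elided in the snippet, and it catches exceptions thrown by a callback so they are logged instead of disappearing into the Future.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a completion callback; NOT jraft's Closure/Status.
interface Closure {
    void run(String status);
}

final class ClosureRunnerSketch {
    // Shared pool for callbacks, playing the role of CLOSURE_EXECUTOR (assumed cached pool).
    private static final ExecutorService CLOSURE_EXECUTOR = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "closure-runner");
        t.setDaemon(true); // do not keep the JVM alive just for callbacks
        return t;
    });

    static Future<?> runInThread(Runnable runnable) {
        return CLOSURE_EXECUTOR.submit(runnable);
    }

    // Runs the closure on the pool; a throwing callback is logged, not silently swallowed.
    static Future<?> runClosureInThread(Closure done, String status) {
        if (done == null) {
            return null;
        }
        return runInThread(() -> {
            try {
                done.run(status);
            } catch (Throwable t) {
                System.err.println("Fail to run done closure: " + t);
            }
        });
    }
}

class ClosureRunnerDemo {
    public static void main(String[] args) throws Exception {
        Future<?> f = ClosureRunnerSketch.runClosureInThread(
                s -> System.out.println(Thread.currentThread().getName() + " got " + s), "EBUSY");
        f.get(1, TimeUnit.SECONDS); // wait only so the demo prints before the JVM exits
    }
}
```

Running callbacks off the calling thread keeps slow or blocking user code from stalling the thread that publishes log events.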

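Storing data with protobuf

Example data definitions

Take the snapshot metadata as an example: the message name recorded in __raft_snapshot_meta is jraft.LocalSnapshotPbMeta, defined as follows: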

// local_storage.proto
message LocalSnapshotPbMeta {
    message File {
        required string name = 1;
        optional LocalFileMeta meta = 2;
    };
    optional SnapshotMeta meta = 1;
    repeated File files = 2;
}

// raft.proto
message SnapshotMeta {
    required int64 last_included_index = 1;
    required int64 last_included_term = 2;
    repeated string peers = 3;
    repeated string old_peers = 4;
    repeated string learners = 5;
    repeated string old_learners = 6;
}

// local_file_meta.proto
enum FileSource {
    FILE_SOURCE_LOCAL = 0;
    FILE_SOURCE_REFERENCE = 1;
}

message LocalFileMeta {
    optional bytes user_meta   = 1;
    optional FileSource source = 2;
    optional string checksum   = 3;
}

After toString(), the contents of __raft_snapshot_meta look like this:

meta {
  last_included_index: 1
  last_included_term: 1
  peers: "127.0.0.1:8081"
  peers: "127.0.0.1:8082"
  peers: "127.0.0.1:8083"
}
files {
  name: "data"
  meta {
  }
}

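Reading protobuf data

Where protobuf data is read:

com.alipay.sofa.jraft.storage.io.ProtoBufFile.load()

The overall format is as follows, where name is the message's full name (for example jraft.LocalSnapshotPbMeta):

name length (4 bytes)|name|msg length (4 bytes)|msg

The | characters only separate fields in this illustration; they do not appear in the actual file.

This layout is not part of protobuf: sofa-jraft itself prepends the name length (4 bytes), the name and the msg length (4 bytes) to the serialized message. For details, see com.alipay.sofa.jraft.storage.io.ProtoBufFile.save(Message, boolean).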
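The framing can be illustrated on raw bytes, without protobuf. The NamedMessageFrame class below is hypothetical and is not claimed to match ProtoBufFile.save byte for byte; it assumes big-endian 4-byte lengths (which is how the getInt helper in the reading tool decodes them) and UTF-8 names. DataOutputStream.writeInt and DataInputStream.readInt are big-endian, and readFully fails with an EOFException on a truncated file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for the layout:
//   name length (4 bytes, big-endian) | name | msg length (4 bytes, big-endian) | msg
final class NamedMessageFrame {
    final String name;  // message full name, e.g. "jraft.LocalSnapshotPbMeta"
    final byte[] body;  // the serialized protobuf message

    NamedMessageFrame(String name, byte[] body) {
        this.name = name;
        this.body = body;
    }

    byte[] encode() throws IOException {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(nameBytes.length); // big-endian length prefix
            out.write(nameBytes);
            out.writeInt(body.length);
            out.write(body);
        }
        return bos.toByteArray();
    }

    static NamedMessageFrame decode(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int nameLen = in.readInt();
            if (nameLen <= 0) {
                throw new IOException("Invalid message name length: " + nameLen);
            }
            byte[] nameBytes = new byte[nameLen];
            in.readFully(nameBytes); // EOFException if the data is truncated
            int msgLen = in.readInt();
            if (msgLen < 0) {
                throw new IOException("Invalid message length: " + msgLen);
            }
            byte[] body = new byte[msgLen];
            in.readFully(body);
            return new NamedMessageFrame(new String(nameBytes, StandardCharsets.UTF_8), body);
        }
    }
}
```

For a 25-byte name such as jraft.LocalSnapshotPbMeta and a 3-byte body, the encoded frame is 4 + 25 + 4 + 3 = 36 bytes, and it starts with the bytes 0, 0, 0, 25.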

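A utility for reading the data

A utility class for parsing local protobuf files during analysis. It expects protobuf-java, the jraft-generated message classes and the raft.desc descriptor set on the classpath: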

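import static java.lang.invoke.MethodType.methodType;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.Message;

public class Test {
	// Maps a message full name from the .proto files (e.g. "jraft.LocalSnapshotPbMeta")
	// to the static parseFrom(byte[]) method of its generated Java class.
	private static final Map<String, MethodHandle> PARSE_METHODS_4PROTO = new HashMap<>();

	public static void main(String[] args) throws Throwable {
		// raft.desc is a protobuf descriptor set (FileDescriptorSet) loaded from the classpath.
		final FileDescriptorSet descriptorSet = FileDescriptorSet
				.parseFrom(Test.class.getResourceAsStream("/raft.desc"));
		final List<FileDescriptor> resolveFDs = new ArrayList<>();
		for (final FileDescriptorProto fdp : descriptorSet.getFileList()) {
			// Files resolved so far are passed as the possible dependencies of the current one.
			final FileDescriptor[] dependencies = resolveFDs.toArray(new FileDescriptor[0]);
			final FileDescriptor fd = FileDescriptor.buildFrom(fdp, dependencies);
			resolveFDs.add(fd);
			for (final Descriptor descriptor : fd.getMessageTypes()) {
				// Generated class name: <java_package>.<java_outer_classname>$<MessageName>
				final String className = fdp.getOptions().getJavaPackage() + "."
						+ fdp.getOptions().getJavaOuterClassname() + "$" + descriptor.getName();
				final Class<?> clazz = Class.forName(className);
				final MethodHandle parseFromHandler = MethodHandles.lookup().findStatic(clazz, "parseFrom",
						methodType(clazz, byte[].class));
				System.out.println(className + " <- " + descriptor.getFullName());
				PARSE_METHODS_4PROTO.put(descriptor.getFullName(), parseFromHandler);
			}
		}

		final String path = args.length > 0 ? args[0]
				: "/tmp/server1/snapshot/snapshot_4/__raft_snapshot_meta";
		// Layout: name length (4 bytes) | name | msg length (4 bytes) | msg
		try (InputStream input = new FileInputStream(new File(path))) {
			final byte[] lenBytes = new byte[4];
			readBytes(lenBytes, input);
			final int len = getInt(lenBytes, 0);
			if (len <= 0) {
				throw new IOException("Invalid message fullName.");
			}
			final byte[] nameBytes = new byte[len];
			readBytes(nameBytes, input);
			final String name = new String(nameBytes);
			System.out.println("name=" + name);
			readBytes(lenBytes, input);
			final int msgLen = getInt(lenBytes, 0);
			if (msgLen < 0) {
				throw new IOException("Invalid message length: " + msgLen);
			}
			final byte[] msgBytes = new byte[msgLen];
			readBytes(msgBytes, input);
			final MethodHandle parser = PARSE_METHODS_4PROTO.get(name);
			if (parser == null) {
				throw new IOException("No parser registered for message " + name);
			}
			// For __raft_snapshot_meta the parsed message is a LocalSnapshotPbMeta.
			final Message msg = (Message) parser.invoke(msgBytes);
			System.out.println(msg);
		}
	}

	// InputStream.read may return fewer bytes than requested, so loop until the buffer is full.
	private static void readBytes(final byte[] bs, final InputStream input) throws IOException {
		int off = 0;
		while (off < bs.length) {
			final int read = input.read(bs, off, bs.length - off);
			if (read < 0) {
				throw new IOException("Read error, expects " + bs.length + " bytes, but read " + off);
			}
			off += read;
		}
	}

	// Decodes a big-endian 32-bit int starting at memory[index].
	static int getInt(byte[] memory, int index) {
		return (memory[index] & 0xff) << 24 | (memory[index + 1] & 0xff) << 16 | (memory[index + 2] & 0xff) << 8
				| memory[index + 3] & 0xff;
	}

}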