上篇介绍了NetworkReceive。
当接收到NetworkReceive后,Processor会构造Request实例,并发送给RequestChannel。
/**
 * Converts every completed receive reported by the selector into a
 * RequestChannel.Request and hands it to the request channel.
 *
 * For each receive the session is built from the principal name and socket
 * address of the channel that produced it; the source is then muted on the
 * selector.
 */
private def processCompletedReceives(): Unit = {
  for (receive <- selector.completedReceives.asScala) {
    val source = receive.source

    // Use the open channel when there is one, otherwise the closing channel.
    val channel = Option(selector.channel(source)).getOrElse(selector.closingChannel(source))

    val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName)
    val session = RequestChannel.Session(principal, channel.socketAddress)

    // Build the Request instance for this receive.
    val request = RequestChannel.Request(
      processor = id,
      connectionId = source,
      session = session,
      buffer = receive.payload,
      startTimeNanos = time.nanoseconds,
      listenerName = listenerName,
      securityProtocol = securityProtocol)

    requestChannel.sendRequest(request)
    // NOTE(review): presumably muted so no further request is read from this
    // connection until the current one has been handled — confirm.
    selector.mute(source)
  }
}
Request表示请求,它有两个主要的属性:
header是通用请求的头部数据结构
bodyAndSize是请求的数据部分,它根据不同类型的请求,返回不同的实例。
/**
 * A request read from a client connection. While the instance is being
 * constructed the raw buffer is parsed into `header` and `bodyAndSize`
 * (or into `requestObj` for the legacy controlled-shutdown request), and
 * the buffer reference is cleared afterwards.
 */
case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
                   startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
  // Reads a short from the buffer's current position; used as the request id.
  val requestId = buffer.getShort()

  // This path only exists to support the v0 shutdown request.
  val requestObj: RequestOrResponse =
    if (requestId == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id) ControlledShutdownRequest.readFrom(buffer) else null

  // Populated only when requestObj is null, i.e. for every request other than the one handled above.
  val header: RequestHeader =
    if (requestObj == null) {
      // Go back to the start of the buffer: the short read for requestId is part of the header.
      buffer.rewind
      // Parse with the static RequestHeader.parse method.
      try RequestHeader.parse(buffer)
      catch {
        case ex: Throwable =>
          throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
      }
    } else null

  // Request body together with its size; null when requestObj was used instead.
  val bodyAndSize: RequestAndSize =
    if (requestObj == null)
      try {
        // An ApiVersions request with an unsupported version is replaced by a
        // freshly built ApiVersionsRequest with size 0 instead of being parsed.
        if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) {
          new RequestAndSize(new ApiVersionsRequest.Builder().build(), 0)
        }
        else
          // Instantiate the RequestAndSize from apiKey, apiVersion and the buffer.
          AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
      } catch {
        case ex: Throwable =>
          throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
      }
    else null

  // Drop the reference to the raw buffer now that parsing is finished.
  buffer = null
  // (excerpt — the rest of the class is not shown here)
Type类是基本的数据格式,它内置定义了常用的数据结构,方便数据的读取。
/**
 * Base class for a data format that can be written to and read from a
 * ByteBuffer.
 */
public abstract class Type {
    // Writes the given object into the ByteBuffer.
    public abstract void write(ByteBuffer buffer, Object o);

    // Reads an object from the ByteBuffer.
    public abstract Object read(ByteBuffer buffer);

    // Checks that the object is acceptable for this type and returns it.
    public abstract Object validate(Object o);

    // Returns the size of the given object.
    public abstract int sizeOf(Object o);

    // Whether the value may be null; false unless a subclass overrides it.
    public boolean isNullable() {
        return false;
    }
    // (excerpt — the rest of the class is not shown here)
自定义的Type有INT8、INT16、……、STRING、BYTES、VARINT等。
下面以INT16为例。由于INT16占2个字节,刚好对应Short类型。
public static final Type INT16 = new Type() { @Override public void write(ByteBuffer buffer, Object o) { // 调用ByteBuffer的putShort方法 buffer.putShort((Short) o); } @Override public Object read(ByteBuffer buffer) { // 调用ByteBuffer的getShort方法 return buffer.getShort(); } @Override public int sizeOf(Object o) { // 占两个字节 return 2; } @Override public String toString() { return "INT16"; } @Override public Short validate(Object item) { // 检验Object是否为Short类型 if (item instanceof Short) return (Short) item; else throw new SchemaException(item + " is not a Short."); } };
Field只是一些属性的集合类。
/**
 * A plain holder for the attributes of one field: its index, name, type,
 * default value, documentation string and owning schema.
 */
public class Field {
    // Sentinel used as the default value when the caller supplies none.
    public static final Object NO_DEFAULT = new Object();

    // Position of this field within its Schema.
    final int index;
    // Field name.
    public final String name;
    // Field type.
    public final Type type;
    // Default value.
    public final Object defaultValue;
    // Documentation string.
    public final String doc;
    final Schema schema;

    // Delegates to the six-argument constructor (not shown in this excerpt) with a null schema.
    public Field(int index, String name, Type type, String doc, Object defaultValue) {
        this(index, name, type, doc, defaultValue, null);
    }

    // Creates a field with index -1.
    public Field(String name, Type type, String doc, Object defaultValue) {
        this(-1, name, type, doc, defaultValue);
    }

    // Creates a field with index -1 and NO_DEFAULT as its default value.
    public Field(String name, Type type, String doc) {
        this(name, type, doc, NO_DEFAULT);
    }
    // (excerpt — the rest of the class is not shown here)
Schema是Field的集合,Field在里面是有顺序的。它支持数据读取,返回Struct类型。
/**
 * An ordered collection of fields. Reading a schema from a ByteBuffer
 * yields a Struct; writing takes a Struct and emits its field values in
 * field order.
 */
public class Schema extends Type {
    // Fields in declaration order.
    private final Field[] fields;
    // Lookup table from field name to field.
    private final Map<String, Field> fieldsByName;

    /**
     * Builds a schema from the given fields, re-creating each one with its
     * position and this schema attached.
     *
     * @throws SchemaException if two fields share the same name
     */
    public Schema(Field... fs) {
        // One slot per supplied field.
        this.fields = new Field[fs.length];
        this.fieldsByName = new HashMap<>();
        for (int i = 0; i < this.fields.length; i++) {
            Field field = fs[i];
            if (fieldsByName.containsKey(field.name))
                throw new SchemaException("Schema contains a duplicate field: " + field.name);
            // Re-create the field; the first argument i is its position in this schema.
            this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this);
            this.fieldsByName.put(fs[i].name, this.fields[i]);
        }
    }

    /**
     * Reads one value per field, in field order, and wraps them in a Struct.
     *
     * @throws SchemaException if reading any field fails; the message names the field
     */
    public Struct read(ByteBuffer buffer) {
        Object[] objects = new Object[fields.length];
        // Read the values in field order and store them at the matching index.
        for (int i = 0; i < fields.length; i++) {
            try {
                objects[i] = fields[i].type.read(buffer);
            } catch (Exception e) {
                throw new SchemaException("Error reading field '" + fields[i].name + "': " +
                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
            }
        }
        // The Struct pairs this schema with the values that were read.
        return new Struct(this, objects);
    }

    /**
     * Writes the given Struct to the buffer, field by field.
     *
     * @throws SchemaException if validating or writing any field fails; the message names the field
     */
    public void write(ByteBuffer buffer, Object o) {
        Struct r = (Struct) o;
        // Walk the fields in order.
        for (Field field : fields) {
            try {
                // Fetch the value from the Struct and validate it against the field's type.
                Object value = field.type().validate(r.get(field));
                // Write the validated value to the buffer.
                field.type.write(buffer, value);
            } catch (Exception e) {
                throw new SchemaException("Error writing field '" + field.name + "': " +
                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
            }
        }
    }
    // (excerpt — the rest of the class is not shown here)
内置的Schema都在Protocol类里定义
/**
 * Holds the built-in schemas. Only the request-header schema is shown in
 * this excerpt.
 */
public class Protocol {
    // Schema of the request header: api_key, api_version, correlation_id, client_id (in that order).
    // The trailing "" passed for client_id is that field's default value.
    public static final Schema REQUEST_HEADER = new Schema(
        new Field("api_key", INT16, "The id of the request type."),
        new Field("api_version", INT16, "The version of the API."),
        new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
        new Field("client_id", NULLABLE_STRING, "A user specified identifier for the client making the request.", ""));
    ......
RequestHeader代表通用的请求头部
RequestHeader数据结构
|api_key | api_version | correlation_id | client_id |
/**
 * The header common to all requests: api key, api version, client id and
 * correlation id.
 */
public class RequestHeader extends AbstractRequestResponse {
    private final short apiKey;
    private final short apiVersion;
    private final String clientId;
    private final int correlationId;

    // Copies the four header values out of an already-parsed Struct.
    // The *_FIELD constants are defined outside this excerpt.
    public RequestHeader(Struct struct) {
        apiKey = struct.getShort(API_KEY_FIELD);
        apiVersion = struct.getShort(API_VERSION_FIELD);
        clientId = struct.getString(CLIENT_ID_FIELD);
        correlationId = struct.getInt(CORRELATION_ID_FIELD);
    }

    // Parses a header from the buffer.
    public static RequestHeader parse(ByteBuffer buffer) {
        // Read with the built-in REQUEST_HEADER schema and wrap the resulting Struct.
        return new RequestHeader(Protocol.REQUEST_HEADER.read(buffer));
    }
    // (excerpt — the rest of the class is not shown here)
在Request类中,调用AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)来初始化AbstractRequest,最后用request和struct的大小实例化RequestAndSize。
public abstract class AbstractRequest extends AbstractRequestResponse {
    /**
     * Parses the request body from the buffer and wraps the resulting
     * request together with the size of the parsed struct.
     *
     * @param requestId the api key taken from the request header
     * @param version   the api version taken from the request header
     * @param buffer    buffer to read the request body from
     */
    public static RequestAndSize getRequest(int requestId, short version, ByteBuffer buffer) {
        // Look up the ApiKeys constant for the header's api key.
        ApiKeys apiKey = ApiKeys.forId(requestId);
        // Parse the body into a Struct using the schema for this key and version.
        Struct struct = apiKey.parseRequest(version, buffer);
        AbstractRequest request;
        // Instantiate the AbstractRequest subclass that matches the api key.
        switch (apiKey) {
            case PRODUCE:
                request = new ProduceRequest(struct, version);
                break;
            case FETCH:
                request = new FetchRequest(struct, version);
                break;
            case LIST_OFFSETS:
                request = new ListOffsetRequest(struct, version);
                break;
            case METADATA:
                request = new MetadataRequest(struct, version);
                break;
            case OFFSET_COMMIT:
                request = new OffsetCommitRequest(struct, version);
                break;
            case OFFSET_FETCH:
                request = new OffsetFetchRequest(struct, version);
                break;
            ....
        }
        // Pair the request with the size reported by the parsed struct.
        return new RequestAndSize(request, struct.sizeOf());
ApiKeys是一个枚举类型,它定义了header的apikey。通过这个apikey,可以解析出这个请求是什么类型的。
public enum ApiKeys { PRODUCE(0, "Produce"), FETCH(1, "Fetch"), LIST_OFFSETS(2, "Offsets"), METADATA(3, "Metadata"), LEADER_AND_ISR(4, "LeaderAndIsr", true), .... // id 到ApiKeys的value的列表 private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; public static final int MAX_API_KEY; static { int maxKey = -1; // 更新maxKey for (ApiKeys key : ApiKeys.values()) maxKey = Math.max(maxKey, key.id); ApiKeys[] idToType = new ApiKeys[maxKey + 1]; // 更新idToType for (ApiKeys key : ApiKeys.values()) idToType[key.id] = key; ID_TO_TYPE = idToType; MAX_API_KEY = maxKey; } ....... // 根据获取id从ID_TO_TYPE获取相应的type public static ApiKeys forId(int id) { if (!hasId(id)) throw new IllegalArgumentException(String.format("Unexpected ApiKeys id `%s`, it should be between `%s` " + "and `%s` (inclusive)", id, MIN_API_KEY, MAX_API_KEY)); return ID_TO_TYPE[id]; } // 解析请求 public Struct parseRequest(short version, ByteBuffer buffer) { return requestSchema(version).read(buffer); } // 返回对应version的Request Schema public Schema requestSchema(short version) { return schemaFor(Protocol.REQUESTS, version); } // 针对schemas列表,返回对应version的Schema private Schema schemaFor(Schema[][] schemas, short version) { // 检查version的值 if (id > schemas.length) throw new IllegalArgumentException("No schema available for API key " + this); if (version < 0 || version > latestVersion()) throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version); // 返回id对应的Schema列表,里面包含了不一样的version Schema[] versions = schemas[id]; if (versions[version] == null) throw new IllegalArgumentException("Unsupported version for API key " + this + ": " + version); // 返回version对应的 return versions[version]; }
Protocol类里定义了很多apikey和schema的对应关系。
// REQUESTS是Schema的二维数据。一维坐标是key_id,二维坐标是version public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][]; // produce request不一样版本的集合 public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3}; static { REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST; REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST; REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST; REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST; REQUESTS[ApiKeys.LEADER_AND_ISR.id] = LEADER_AND_ISR_REQUEST; ....... }
Schema由多个Field组成,Field主要包含了Type,name等属性。Schema负责从buffer解析数据,返回Struct结果。
Protocol包含了许多内置的Schema。
RequestHeader代表请求头部,就是使用了内置的Protocol.REQUEST_HEADER这个Schema解析。请求头部包含apiKey,version等属性。
ApiKeys包含了apikey的集合。它可以根据apikey和version找到对应的Schema。
AbstractRequest提供了根据apikey和version解析并返回对应Request实例的方法。