Kafka - Request Header Parsing

The previous post covered NetworkReceive.

Once a NetworkReceive has been completely read, the Processor builds a Request instance from it and sends it to the RequestChannel:

private def processCompletedReceives() {
  selector.completedReceives.asScala.foreach { receive =>
    val openChannel = selector.channel(receive.source)
    // if the channel is already closing, fall back to the closing channel so its principal is still available
    val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
    val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
    // build the Request instance
    val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
      buffer = receive.payload, startTimeNanos = time.nanoseconds,
      listenerName = listenerName, securityProtocol = securityProtocol)
    // hand the request over to the request handler threads
    requestChannel.sendRequest(req)
    // stop reading from this connection until the response has been sent
    selector.mute(receive.source)
  }
}
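The hand-off above works like a producer/consumer queue. Processor threads put requests into the RequestChannel, and request-handler threads take them out. Muting the connection stops the broker from reading further requests on it until the response has gone out.

The sketch below models only that hand-off, using `java.util.concurrent.ArrayBlockingQueue`. `SimpleRequest` and `SimpleRequestChannel` are hypothetical names, not Kafka classes. In Kafka, the mute is done by the Processor on its Selector; this sketch folds it into the channel for brevity.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for RequestChannel.Request: connection id plus raw payload.
final class SimpleRequest {
    private final String connectionId;
    private final byte[] payload;

    SimpleRequest(String connectionId, byte[] payload) {
        this.connectionId = connectionId;
        this.payload = payload;
    }

    String connectionId() { return connectionId; }
    byte[] payload() { return payload; }
}

// Hypothetical model of the network-thread -> handler-thread hand-off (not Kafka's RequestChannel).
final class SimpleRequestChannel {
    private final BlockingQueue<SimpleRequest> requestQueue;
    // Connections we stopped reading from until their response has been sent.
    private final Set<String> muted = ConcurrentHashMap.newKeySet();

    SimpleRequestChannel(int capacity) {
        this.requestQueue = new ArrayBlockingQueue<>(capacity);
    }

    // Network thread: mute the connection first, then enqueue (blocks while the queue is full).
    void sendRequest(SimpleRequest request) {
        muted.add(request.connectionId());
        try {
            requestQueue.put(request);
        } catch (InterruptedException e) {
            muted.remove(request.connectionId());   // the request never made it into the queue
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while enqueuing a request", e);
        }
    }

    // Handler thread: block until a request is available.
    SimpleRequest receiveRequest() {
        try {
            return requestQueue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a request", e);
        }
    }

    // Called once the response for this connection has been written: reading may resume.
    void responseSent(String connectionId) {
        muted.remove(connectionId);
    }

    boolean isMuted(String connectionId) {
        return muted.contains(connectionId);
    }
}
```

A bounded queue gives back-pressure: when handlers fall behind, `put` blocks the network thread instead of letting requests pile up without limit.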

Request

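Request represents a request. Its two main properties are:

header: the header structure common to all requests.

bodyAndSize: the body of the request. A different instance is built depending on the request type.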

case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
                   startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
    // peek at the first two bytes of the payload: the api key
    val requestId = buffer.getShort()
    // special case kept only to support version 0 of the ControlledShutdown request
    val requestObj: RequestOrResponse = if (requestId == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)
      ControlledShutdownRequest.readFrom(buffer)
    else
      null

    val header: RequestHeader =
      if (requestObj == null) {
        // go back to the start of the buffer so the header is parsed from its first byte
        buffer.rewind
        // parse with the static method RequestHeader.parse
        try RequestHeader.parse(buffer)
        catch {
          case ex: Throwable =>
            throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
        }
      } else
        null
    val bodyAndSize: RequestAndSize =
      if (requestObj == null)
        try {
          // an ApiVersions request with an unsupported version is replaced by a default ApiVersionsRequest,
          // so the broker can still answer it instead of failing to parse the body
          if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) {
            new RequestAndSize(new ApiVersionsRequest.Builder().build(), 0)
          }
          else
            // build the RequestAndSize from apiKey, apiVersion and the rest of the buffer (the request body)
            AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
        } catch {
          case ex: Throwable =>
            throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
        }
      else
        null

    // drop the reference to the raw payload once it has been parsed
    buffer = null

    // ... (remaining members omitted)
}
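`Request` first peeks at the api key with `buffer.getShort()`, which advances the buffer's position by 2. It then calls `rewind`, so that `RequestHeader.parse` sees the header from its first byte.

The sketch below isolates that step with plain `java.nio.ByteBuffer`. The class and method names are hypothetical. It also shows an alternative: an absolute read such as `buffer.getShort(0)` peeks without moving the position at all.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating "peek the api key, then rewind".
final class ApiKeyPeek {
    // Relative read: getShort() advances the position by 2, so rewind afterwards.
    // Assumes the header starts at position 0, as it does for the request payload.
    static short peekWithRewind(ByteBuffer buffer) {
        short apiKey = buffer.getShort();
        buffer.rewind();              // position back to 0, limit unchanged
        return apiKey;
    }

    // Absolute read: getShort(index) leaves the position untouched.
    static short peekAbsolute(ByteBuffer buffer) {
        return buffer.getShort(0);
    }

    public static void main(String[] args) {
        // api_key = 3, api_version = 1, correlation_id = 42
        ByteBuffer buffer = ByteBuffer.allocate(8).putShort((short) 3).putShort((short) 1).putInt(42);
        buffer.flip();
        System.out.println(peekWithRewind(buffer) + " " + buffer.position()); // 3 0
        System.out.println(peekAbsolute(buffer) + " " + buffer.position());   // 3 0
    }
}
```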

Type

Type is the base class for data formats. It predefines the commonly used types, which makes reading and writing data straightforward.

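public abstract class Type {
    // write Object o into the ByteBuffer
    public abstract void write(ByteBuffer buffer, Object o);
    // read an Object from the ByteBuffer
    public abstract Object read(ByteBuffer buffer);
    // check that Object o is a valid value for this type and return it
    public abstract Object validate(Object o);
    // size of Object o in bytes once serialized
    public abstract int sizeOf(Object o);
    // whether this type accepts null values
    public boolean isNullable() {
        return false;
    }

    // ... (predefined types such as INT16 are declared here as static fields)
}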

The predefined Types include INT8, INT16, ..., STRING, BYTES, VARINT, ...

Take INT16 as an example. INT16 occupies 2 bytes, which corresponds exactly to Java's short type.

public static final Type INT16 = new Type() {
    @Override
    public void write(ByteBuffer buffer, Object o) {
        // delegate to ByteBuffer.putShort
        buffer.putShort((Short) o);
    }

    @Override
    public Object read(ByteBuffer buffer) {
        // delegate to ByteBuffer.getShort
        return buffer.getShort();
    }

    @Override
    public int sizeOf(Object o) {
        // always two bytes
        return 2;
    }

    @Override
    public String toString() {
        return "INT16";
    }

    @Override
    public Short validate(Object item) {
        // the value must be a Short
        if (item instanceof Short)
            return (Short) item;
        else
            throw new SchemaException(item + " is not a Short.");
    }
};
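To see what INT16 does on the wire, the sketch below re-implements its four methods. It uses a hypothetical `MiniType` interface that mirrors the abstract methods of `Type`; this is not Kafka's class, and `IllegalArgumentException` stands in for Kafka's `SchemaException`.

`ByteBuffer` is big-endian by default, so the short value 1 is written as the bytes `00 01`. Note that `validate` rejects a boxed `Integer`, so callers must pass a `Short`.

```java
import java.nio.ByteBuffer;

// Hypothetical mirror of the abstract methods of Kafka's Type class.
interface MiniType {
    void write(ByteBuffer buffer, Object o);
    Object read(ByteBuffer buffer);
    Object validate(Object o);
    int sizeOf(Object o);
}

final class MiniTypes {
    static final MiniType INT16 = new MiniType() {
        @Override public void write(ByteBuffer buffer, Object o) { buffer.putShort((Short) o); }
        @Override public Object read(ByteBuffer buffer) { return buffer.getShort(); }
        @Override public int sizeOf(Object o) { return 2; }          // fixed width: two bytes
        @Override public Short validate(Object item) {
            if (item instanceof Short) return (Short) item;
            throw new IllegalArgumentException(item + " is not a Short.");
        }
    };

    // Validate, size, and write a single INT16 value; return the raw bytes.
    static byte[] encodeInt16(short value) {
        Object checked = INT16.validate(value);                       // autoboxed to Short, so it passes
        ByteBuffer buffer = ByteBuffer.allocate(INT16.sizeOf(checked));
        INT16.write(buffer, checked);
        return buffer.array();
    }

    public static void main(String[] args) {
        byte[] bytes = encodeInt16((short) 1);
        System.out.println(bytes[0] + " " + bytes[1]);                // 0 1 (big-endian)
        System.out.println(INT16.read(ByteBuffer.wrap(bytes)));       // 1
        try {
            INT16.validate(1);                                        // an Integer, not a Short
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());                       // 1 is not a Short.
        }
    }
}
```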

Field

Field is simply a holder for a handful of attributes.

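public class Field {

    // sentinel meaning "this field has no default value"
    public static final Object NO_DEFAULT = new Object();
    // position of the field within its Schema
    final int index;
    // field name
    public final String name;
    // field type
    public final Type type;
    // default value (NO_DEFAULT if there is none)
    public final Object defaultValue;
    // documentation string
    public final String doc;
    // the Schema this field belongs to
    final Schema schema;

    public Field(int index, String name, Type type, String doc, Object defaultValue) {
        this(index, name, type, doc, defaultValue, null);
    }

    // index -1: position not assigned yet (Schema's constructor assigns it)
    public Field(String name, Type type, String doc, Object defaultValue) {
        this(-1, name, type, doc, defaultValue);
    }

    public Field(String name, Type type, String doc) {
        this(name, type, doc, NO_DEFAULT);
    }

    // ... (the six-argument constructor and other members omitted)
}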
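`Field` uses the `NO_DEFAULT` sentinel object, rather than `null`, to mean "no default". That keeps `null` usable as a real default value. The `client_id` header field, for instance, is declared with the default `""`.

A minimal sketch of the pattern follows. `MiniField` and its `resolve` method are hypothetical, not Kafka's API.

```java
// Hypothetical sketch of the sentinel-default pattern used by Field.
final class MiniField {
    // Unique marker compared by identity; it can never equal a real default value.
    static final Object NO_DEFAULT = new Object();

    final String name;
    final Object defaultValue;

    MiniField(String name, Object defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }

    MiniField(String name) {
        this(name, NO_DEFAULT);
    }

    boolean hasDefault() {
        return defaultValue != NO_DEFAULT;
    }

    // Resolve a possibly missing value: fall back to the default if there is one.
    Object resolve(Object value) {
        if (value != null) return value;
        if (hasDefault()) return defaultValue;
        throw new IllegalStateException("Missing value for field '" + name + "' with no default");
    }
}
```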

Schema

A Schema is an ordered collection of Fields. It reads data from a buffer and returns it as a Struct.

public class Schema extends Type {
    // the fields, in declaration order
    private final Field[] fields;
    // map for looking up a field by name
    private final Map<String, Field> fieldsByName;

    public Schema(Field... fs) {
        this.fields = new Field[fs.length];
        this.fieldsByName = new HashMap<>();
        for (int i = 0; i < this.fields.length; i++) {
            Field field = fs[i];
            if (fieldsByName.containsKey(field.name))
                throw new SchemaException("Schema contains a duplicate field: " + field.name);
            // create a new Field; note the first argument i, the field's position in this Schema
            this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this);
            this.fieldsByName.put(fs[i].name, this.fields[i]);
        }
    }

    public Struct read(ByteBuffer buffer) {
        Object[] objects = new Object[fields.length];
        // read each value in field order and store it in objects
        for (int i = 0; i < fields.length; i++) {
            try {
                objects[i] = fields[i].type.read(buffer);
            } catch (Exception e) {
                throw new SchemaException("Error reading field '" + fields[i].name + "': " +
                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
            }
        }
        // Struct provides convenient accessors for the values
        return new Struct(this, objects);
    }

    public void write(ByteBuffer buffer, Object o) {
        Struct r = (Struct) o;
        // iterate over the fields in order
        for (Field field : fields) {
            try {
                // get the value from the Struct and validate it against the field's type
                Object value = field.type().validate(r.get(field));
                // write it into the buffer
                field.type.write(buffer, value);
            } catch (Exception e) {
                throw new SchemaException("Error writing field '" + field.name + "': " +
                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
            }
        }
    }

    // ... (other members omitted)
}
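Stripped of Kafka's `Type` hierarchy, `Schema.read` and `Schema.write` boil down to walking an ordered field list. The sketch below uses a hypothetical `MiniSchema`. Its fields are name/codec pairs, it returns a `LinkedHashMap` instead of a `Struct`, and only INT16 and INT32 codecs are included.

It keeps the two properties that matter here:
- the declaration order of the fields is their order on the wire;
- duplicate names are rejected when the schema is built.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical ordered schema; not Kafka's Schema class.
final class MiniSchema {
    enum Codec { INT16, INT32 }

    static final class FieldDef {
        final String name;
        final Codec codec;
        FieldDef(String name, Codec codec) { this.name = name; this.codec = codec; }
    }

    // Declaration order of the fields is their order on the wire.
    private final FieldDef[] fields;
    // Lookup by name; also used to reject duplicate names.
    private final Map<String, FieldDef> fieldsByName = new HashMap<>();

    MiniSchema(FieldDef... fs) {
        this.fields = fs.clone();
        for (FieldDef f : fs) {
            if (fieldsByName.putIfAbsent(f.name, f) != null)
                throw new IllegalArgumentException("Schema contains a duplicate field: " + f.name);
        }
    }

    // Read every field in order; the LinkedHashMap preserves that order for the caller.
    Map<String, Object> read(ByteBuffer buffer) {
        Map<String, Object> values = new LinkedHashMap<>();
        for (FieldDef f : fields) {
            Object v;
            if (f.codec == Codec.INT16) v = buffer.getShort();   // boxed as Short
            else v = buffer.getInt();                             // boxed as Integer
            values.put(f.name, v);
        }
        return values;
    }

    // Write every field in order, looking each value up by name.
    void write(ByteBuffer buffer, Map<String, Object> values) {
        for (FieldDef f : fields) {
            Object v = values.get(f.name);
            if (f.codec == Codec.INT16) buffer.putShort((Short) v);
            else buffer.putInt((Integer) v);
        }
    }

    // Total encoded size in bytes.
    int sizeOf() {
        int size = 0;
        for (FieldDef f : fields) size += (f.codec == Codec.INT16) ? 2 : 4;
        return size;
    }
}
```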

The built-in Schemas are all defined in the Protocol class:

public class Protocol {

    public static final Schema REQUEST_HEADER = new Schema(
            new Field("api_key", INT16, "The id of the request type."),
            new Field("api_version", INT16, "The version of the API."),
            new Field("correlation_id", INT32,
                    "A user-supplied integer value that will be passed back with the response"),
            new Field("client_id", NULLABLE_STRING,
                    "A user specified identifier for the client making the request.", ""));
    // ......
}
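The `client_id` field above uses `NULLABLE_STRING`. In the Kafka protocol this is an INT16 length followed by that many UTF-8 bytes; a length of -1 means null.

A minimal standalone encoder/decoder for that format is sketched below. The class name `NullableString` is hypothetical; Kafka implements this as a `Type` constant.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical standalone codec for the NULLABLE_STRING wire format.
final class NullableString {
    // Bytes needed on the wire: 2 for the length prefix plus the UTF-8 payload.
    static int sizeOf(String s) {
        return 2 + (s == null ? 0 : s.getBytes(StandardCharsets.UTF_8).length);
    }

    static void write(ByteBuffer buffer, String s) {
        if (s == null) {
            buffer.putShort((short) -1);            // null: length -1 and no payload
            return;
        }
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > Short.MAX_VALUE)
            throw new IllegalArgumentException("String is longer than the maximum INT16 length");
        buffer.putShort((short) bytes.length);
        buffer.put(bytes);
    }

    static String read(ByteBuffer buffer) {
        short length = buffer.getShort();
        if (length < 0) return null;                 // negative length marks a null string
        byte[] bytes = new byte[length];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```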

RequestHeader

RequestHeader represents the common request header.

RequestHeader layout, in wire order:

| api_key (INT16) | api_version (INT16) | correlation_id (INT32) | client_id (NULLABLE_STRING) |

public class RequestHeader extends AbstractRequestResponse {

    private final short apiKey;
    private final short apiVersion;
    private final String clientId;
    private final int correlationId;

    public RequestHeader(Struct struct) {
        apiKey = struct.getShort(API_KEY_FIELD);
        apiVersion = struct.getShort(API_VERSION_FIELD);
        clientId = struct.getString(CLIENT_ID_FIELD);
        correlationId = struct.getInt(CORRELATION_ID_FIELD);
    }

    public static RequestHeader parse(ByteBuffer buffer) {
        // read the buffer with the built-in REQUEST_HEADER Schema
        return new RequestHeader(Protocol.REQUEST_HEADER.read(buffer));
    }

    // ... (other members omitted)
}
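The header schema is just four fields in a fixed order, so `RequestHeader.parse` amounts to the sequence of plain `ByteBuffer` reads below. This is a hand-written sketch for illustration, and `ParsedHeader` is a hypothetical name; Kafka itself goes through `Protocol.REQUEST_HEADER.read`.

After parsing, the buffer's position is at the first byte of the request body. That is the part `AbstractRequest.getRequest` consumes next.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical hand-written equivalent of reading the REQUEST_HEADER schema.
final class ParsedHeader {
    final short apiKey;
    final short apiVersion;
    final int correlationId;
    final String clientId;   // may be null (NULLABLE_STRING)

    private ParsedHeader(short apiKey, short apiVersion, int correlationId, String clientId) {
        this.apiKey = apiKey;
        this.apiVersion = apiVersion;
        this.correlationId = correlationId;
        this.clientId = clientId;
    }

    // Reads api_key, api_version, correlation_id, client_id in schema order and
    // leaves the buffer positioned at the start of the request body.
    static ParsedHeader parse(ByteBuffer buffer) {
        short apiKey = buffer.getShort();
        short apiVersion = buffer.getShort();
        int correlationId = buffer.getInt();
        short length = buffer.getShort();
        String clientId = null;
        if (length >= 0) {
            byte[] bytes = new byte[length];
            buffer.get(bytes);
            clientId = new String(bytes, StandardCharsets.UTF_8);
        }
        return new ParsedHeader(apiKey, apiVersion, correlationId, clientId);
    }
}
```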

AbstractRequest

The Request class calls AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer) to create the AbstractRequest. Finally, a RequestAndSize is built from the request and the size of its struct.

public abstract class AbstractRequest extends AbstractRequestResponse {
    public static RequestAndSize getRequest(int requestId, short version, ByteBuffer buffer) {
        // look up the ApiKeys constant for header.apiKey
        ApiKeys apiKey = ApiKeys.forId(requestId);
        // parse the body with the Schema for this api key and version
        Struct struct = apiKey.parseRequest(version, buffer);
        AbstractRequest request;
        // instantiate the AbstractRequest subclass that matches apiKey
        switch (apiKey) {
            case PRODUCE:
                request = new ProduceRequest(struct, version);
                break;
            case FETCH:
                request = new FetchRequest(struct, version);
                break;
            case LIST_OFFSETS:
                request = new ListOffsetRequest(struct, version);
                break;
            case METADATA:
                request = new MetadataRequest(struct, version);
                break;
            case OFFSET_COMMIT:
                request = new OffsetCommitRequest(struct, version);
                break;
            case OFFSET_FETCH:
                request = new OffsetFetchRequest(struct, version);
                break;
            // ....
        }
        // pair the request with the serialized size of its struct
        return new RequestAndSize(request, struct.sizeOf());
    }

    // ... (other members omitted)
}
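The switch maps each api key to the constructor of the matching `AbstractRequest` subclass. The same dispatch can also be written as a lookup table from api key to factory function; that is a swapped-in technique, not what Kafka does here.

The sketch uses hypothetical `Req`, `ProduceReq` and `FetchReq` types, and passes the parsed struct as a plain `Map`. It only illustrates the shape of the dispatch. One practical difference: with a table, an unregistered key fails in a single place.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical request types standing in for ProduceRequest, FetchRequest, ...
interface Req {
    String name();
    short version();
}

final class ProduceReq implements Req {
    private final short version;
    ProduceReq(Map<String, Object> struct, short version) { this.version = version; }
    public String name() { return "Produce"; }
    public short version() { return version; }
}

final class FetchReq implements Req {
    private final short version;
    FetchReq(Map<String, Object> struct, short version) { this.version = version; }
    public String name() { return "Fetch"; }
    public short version() { return version; }
}

// Hypothetical table-driven alternative to the switch in getRequest.
final class RequestFactory {
    // api key id -> constructor taking (parsed struct, version)
    private static final Map<Integer, BiFunction<Map<String, Object>, Short, Req>> FACTORIES = new HashMap<>();
    static {
        FACTORIES.put(0, ProduceReq::new);   // PRODUCE
        FACTORIES.put(1, FetchReq::new);     // FETCH
    }

    static Req getRequest(int apiKey, short version, Map<String, Object> struct) {
        BiFunction<Map<String, Object>, Short, Req> factory = FACTORIES.get(apiKey);
        if (factory == null)
            throw new IllegalArgumentException("No request class registered for api key " + apiKey);
        return factory.apply(struct, version);
    }
}
```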

ApiKeys

ApiKeys is an enum that defines the api_key values carried in the header. From the api_key, the broker can tell what type of request it is dealing with.

public enum ApiKeys {
    PRODUCE(0, "Produce"),
    FETCH(1, "Fetch"),
    LIST_OFFSETS(2, "Offsets"),
    METADATA(3, "Metadata"),
    LEADER_AND_ISR(4, "LeaderAndIsr", true),
    // .... (remaining API keys omitted)
    ;

    // lookup table: index = api key id, value = the matching ApiKeys constant
    private static final ApiKeys[] ID_TO_TYPE;
    private static final int MIN_API_KEY = 0;
    public static final int MAX_API_KEY;

    static {
        int maxKey = -1;
        // find the largest id
        for (ApiKeys key : ApiKeys.values())
            maxKey = Math.max(maxKey, key.id);
        ApiKeys[] idToType = new ApiKeys[maxKey + 1];
        // fill the table so that idToType[id] is the constant with that id
        for (ApiKeys key : ApiKeys.values())
            idToType[key.id] = key;
        ID_TO_TYPE = idToType;
        MAX_API_KEY = maxKey;
    }
    // ....... (fields, constructors and other members omitted)

    // return the ApiKeys constant for the given id, taken from ID_TO_TYPE
    public static ApiKeys forId(int id) {
        if (!hasId(id))
            throw new IllegalArgumentException(String.format("Unexpected ApiKeys id `%s`, it should be between `%s` " +
                    "and `%s` (inclusive)", id, MIN_API_KEY, MAX_API_KEY));
        return ID_TO_TYPE[id];
    }
    // parse a request body with the Schema for this api key and the given version
    public Struct parseRequest(short version, ByteBuffer buffer) {
        return requestSchema(version).read(buffer);
    }
    // return the request Schema for the given version
    public Schema requestSchema(short version) {
        return schemaFor(Protocol.REQUESTS, version);
    }
    // look up the Schema for this api key and the given version in a table such as Protocol.REQUESTS
    private Schema schemaFor(Schema[][] schemas, short version) {
        // the api key must have a row in the table (valid indexes are 0 .. schemas.length - 1)
        if (id >= schemas.length)
            throw new IllegalArgumentException("No schema available for API key " + this);
        // the version must lie within the range supported by this api key
        if (version < 0 || version > latestVersion())
            throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version);
        // the row for this id holds one Schema per version
        Schema[] versions = schemas[id];
        if (versions[version] == null)
            throw new IllegalArgumentException("Unsupported version for API key " + this + ": " + version);
        // return the Schema for this version
        return versions[version];
    }

    // ... (other members omitted)
}
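The static block turns the enum into a dense array indexed by id, so `forId` is just a range check plus an array access. Here is a reduced sketch with only the five keys listed above.

The enum name `MiniApiKeys` and its `displayName` field are hypothetical, and the third constructor argument of LEADER_AND_ISR is dropped. Because the table has `maxKey + 1` slots, an id with no constant would leave a `null` slot; this sketch rejects such ids too.

```java
// Hypothetical reduced copy of the ApiKeys lookup table (only the keys shown in the text).
enum MiniApiKeys {
    PRODUCE(0, "Produce"),
    FETCH(1, "Fetch"),
    LIST_OFFSETS(2, "Offsets"),
    METADATA(3, "Metadata"),
    LEADER_AND_ISR(4, "LeaderAndIsr");

    private static final MiniApiKeys[] ID_TO_TYPE;
    static final int MIN_API_KEY = 0;
    static final int MAX_API_KEY;

    // Runs after all constants are constructed, so values() is complete here.
    static {
        int maxKey = -1;
        for (MiniApiKeys key : values())
            maxKey = Math.max(maxKey, key.id);
        MiniApiKeys[] idToType = new MiniApiKeys[maxKey + 1];
        for (MiniApiKeys key : values())
            idToType[key.id] = key;
        ID_TO_TYPE = idToType;
        MAX_API_KEY = maxKey;
    }

    final short id;
    final String displayName;

    MiniApiKeys(int id, String displayName) {
        this.id = (short) id;
        this.displayName = displayName;
    }

    // Range check plus array access; empty slots are rejected as well.
    static MiniApiKeys forId(int id) {
        if (id < MIN_API_KEY || id > MAX_API_KEY || ID_TO_TYPE[id] == null)
            throw new IllegalArgumentException("Unexpected api key id " + id + ", it should be between "
                    + MIN_API_KEY + " and " + MAX_API_KEY + " (inclusive)");
        return ID_TO_TYPE[id];
    }
}
```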

Protocol

The Protocol class defines the mapping from api keys to their Schemas.

// REQUESTS is a two-dimensional array of Schema: the first index is the api key id, the second is the version
public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];

// all versions of the produce request schema, indexed by version
public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3};

static {
    REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
    REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST;
    REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST;
    REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
    REQUESTS[ApiKeys.LEADER_AND_ISR.id] = LEADER_AND_ISR_REQUEST;
    // .......
}
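`REQUESTS` is a jagged two-dimensional array, so finding a schema means reading `REQUESTS[apiKey][version]`, guarded by the checks in `ApiKeys.schemaFor`.

The sketch below models that lookup with strings standing in for `Schema` objects. The class name `SchemaTable` is hypothetical. The produce row follows the four versions listed above; the metadata row, including its gap at version 1, is a placeholder to show how an unsupported version is detected.

```java
// Hypothetical model of Protocol.REQUESTS: strings stand in for Schema objects.
final class SchemaTable {
    static final int PRODUCE = 0;
    static final int METADATA = 3;
    static final int MAX_API_KEY = 4;

    // First index: api key id; second index: version. Rows for unregistered keys stay null.
    static final String[][] REQUESTS = new String[MAX_API_KEY + 1][];

    static {
        REQUESTS[PRODUCE] = new String[] {
            "PRODUCE_REQUEST_V0", "PRODUCE_REQUEST_V1", "PRODUCE_REQUEST_V2", "PRODUCE_REQUEST_V3"};
        // Placeholder row with a null slot, to show an unsupported version.
        REQUESTS[METADATA] = new String[] {"METADATA_REQUEST_V0", null, "METADATA_REQUEST_V2"};
    }

    static String schemaFor(int apiKey, int version) {
        if (apiKey < 0 || apiKey >= REQUESTS.length || REQUESTS[apiKey] == null)
            throw new IllegalArgumentException("No schema available for API key " + apiKey);
        String[] versions = REQUESTS[apiKey];
        if (version < 0 || version >= versions.length)
            throw new IllegalArgumentException("Invalid version for API key " + apiKey + ": " + version);
        if (versions[version] == null)
            throw new IllegalArgumentException("Unsupported version for API key " + apiKey + ": " + version);
        return versions[version];
    }
}
```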

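Summary

A Schema is made up of several Fields; a Field mainly holds a Type, a name and a few other attributes. A Schema parses data from a buffer and returns the result as a Struct.

Protocol contains many built-in Schemas.

RequestHeader represents the request header and is parsed with the built-in Protocol.REQUEST_HEADER Schema. The header carries properties such as the apiKey and the version.

ApiKeys holds the set of api keys. Given an api key and a version, it can find the corresponding Schema.

AbstractRequest parses the request body according to the api key and version and returns an instance of the matching Request class.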