国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語(yǔ)言|JavaScript|易語(yǔ)言|vb.net|

服務(wù)器之家 - 編程語(yǔ)言 - Java教程 - Java API方式調(diào)用Kafka各種協(xié)議的方法

Java API方式調(diào)用Kafka各種協(xié)議的方法

2020-12-28 09:51huxihx Java教程

本篇文章主要介紹了Java API方式調(diào)用Kafka各種協(xié)議的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧

眾所周知,Kafka自己實(shí)現(xiàn)了一套二進(jìn)制協(xié)議(binary protocol)用于各種功能的實(shí)現(xiàn),比如發(fā)送消息,獲取消息,提交位移以及創(chuàng)建topic等。具體協(xié)議規(guī)范參見:Kafka協(xié)議  這套協(xié)議的具體使用流程為:

1.客戶端創(chuàng)建對(duì)應(yīng)協(xié)議的請(qǐng)求

2.客戶端發(fā)送請(qǐng)求給對(duì)應(yīng)的broker

3.broker處理請(qǐng)求,并發(fā)送response給客戶端

雖然Kafka提供的大量的腳本工具用于各種功能的實(shí)現(xiàn),但很多時(shí)候我們還是希望可以把某些功能以編程的方式嵌入到另一個(gè)系統(tǒng)中。這時(shí)使用Java API的方式就顯得異常地靈活了。本文我將嘗試給出Java API底層框架的一個(gè)范例,同時(shí)也會(huì)針對(duì)“創(chuàng)建topic”和“查看位移”這兩個(gè)主要功能給出對(duì)應(yīng)的例子。 需要提前說(shuō)明的是,本文給出的范例并沒有考慮Kafka集群開啟安全的情況。另外Kafka的KIP4應(yīng)該一直在優(yōu)化命令行工具以及各種管理操作,有興趣的讀者可以關(guān)注這個(gè)KIP。

本文中用到的API依賴于kafka-clients,所以如果你使用Maven構(gòu)建的話,請(qǐng)加上:

?
1
2
3
4
5
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.0</version>
</dependency>

如果是gradle,請(qǐng)加上:

?
1
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'

底層框架

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
   * 發(fā)送請(qǐng)求主方法
   * @param host     目標(biāo)broker的主機(jī)名
   * @param port     目標(biāo)broker的端口
   * @param request    請(qǐng)求對(duì)象
   * @param apiKey    請(qǐng)求類型
   * @return       序列化后的response
   * @throws IOException
   */
  public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException {
    Socket socket = connect(host, port);
    try {
      return send(request, apiKey, socket);
    } finally {
      socket.close();
    }
  }
 
  /**
   * 發(fā)送序列化請(qǐng)求并等待response返回
   * @param socket      連向目標(biāo)broker的socket
   * @param request      序列化后的請(qǐng)求
   * @return         序列化后的response
   * @throws IOException
   */
  private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException {
    sendRequest(socket, request);
    return getResponse(socket);
  }
 
  /**
   * 發(fā)送序列化請(qǐng)求給socket
   * @param socket      連向目標(biāo)broker的socket
   * @param request      序列化后的請(qǐng)求
   * @throws IOException
   */
  private void sendRequest(Socket socket, byte[] request) throws IOException {
    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
    dos.writeInt(request.length);
    dos.write(request);
    dos.flush();
  }
 
  /**
   * 從給定socket處獲取response
   * @param socket      連向目標(biāo)broker的socket
   * @return         獲取到的序列化后的response
   * @throws IOException
   */
  private byte[] getResponse(Socket socket) throws IOException {
    DataInputStream dis = null;
    try {
      dis = new DataInputStream(socket.getInputStream());
      byte[] response = new byte[dis.readInt()];
      dis.readFully(response);
      return response;
    } finally {
      if (dis != null) {
        dis.close();
      }
    }
  }
 
  /**
   * 創(chuàng)建Socket連接
   * @param hostName     目標(biāo)broker主機(jī)名
   * @param port       目標(biāo)broker服務(wù)端口, 比如9092
   * @return         創(chuàng)建的Socket連接
   * @throws IOException
   */
  private Socket connect(String hostName, int port) throws IOException {
    return new Socket(hostName, port);
  }
 
  /**
   * 向給定socket發(fā)送請(qǐng)求
   * @param request    請(qǐng)求對(duì)象
   * @param apiKey    請(qǐng)求類型, 即屬于哪種請(qǐng)求
   * @param socket    連向目標(biāo)broker的socket
   * @return       序列化后的response
   * @throws IOException
   */
  private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException {
    RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id", 0);
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
    header.writeTo(buffer);
    request.writeTo(buffer);
    byte[] serializedRequest = buffer.array();
    byte[] response = issueRequestAndWaitForResponse(socket, serializedRequest);
    ByteBuffer responseBuffer = ByteBuffer.wrap(response);
    ResponseHeader.parse(responseBuffer);
    return responseBuffer;
  }

有了這些方法的鋪墊,我們就可以創(chuàng)建具體的請(qǐng)求了。

創(chuàng)建topic

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
   * 創(chuàng)建topic
   * 由于只是樣例代碼,有些東西就硬編碼寫到程序里面了(比如主機(jī)名和端口),各位看官自行修改即可
   * @param topicName       topic名
   * @param partitions      分區(qū)數(shù)
   * @param replicationFactor   副本數(shù)
   * @throws IOException
   */
  public void createTopics(String topicName, int partitions, short replicationFactor) throws IOException {
    Map<String, CreateTopicsRequest.TopicDetails> topics = new HashMap<>();
    // 插入多個(gè)元素便可同時(shí)創(chuàng)建多個(gè)topic
    topics.put(topicName, new CreateTopicsRequest.TopicDetails(partitions, replicationFactor));
    int creationTimeoutMs = 60000;
    CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, creationTimeoutMs).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.CREATE_TOPICS);
    CreateTopicsResponse.parse(response, request.version());
  }

查看位移

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
   * 獲取某個(gè)consumer group下的某個(gè)topic分區(qū)的位移
   * @param groupID      group id
   * @param topic       topic名
   * @param parititon     分區(qū)號(hào)
   * @throws IOException
   */
  public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException {
    TopicPartition tp = new TopicPartition(topic, parititon);
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp))
        .setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp);
    System.out.println(partitionData.offset);
  }
?
1
2
3
4
5
6
7
8
9
10
11
12
/**
   * 獲取某個(gè)consumer group下所有topic分區(qū)的位移信息
   * @param groupID      group id
   * @return         (topic分區(qū) --> 分區(qū)信息)的map
   * @throws IOException
   */
  public Map<TopicPartition, OffsetFetchResponse.PartitionData> getAllOffsetsForGroup(String groupID) throws IOException {
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null).setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    return resp.responseData();
  }

okay, 上面就是“創(chuàng)建topic”和“查看位移”的樣例代碼,各位看官可以參考著這兩個(gè)例子構(gòu)建其他類型的請(qǐng)求。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。

原文鏈接:http://www.cnblogs.com/huxi2b/p/6508274.html

延伸 · 閱讀

精彩推薦
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25 Weibo Article 26 Weibo Article 27 Weibo Article 28 Weibo Article 29 Weibo Article 30 Weibo Article 31 Weibo Article 32 Weibo Article 33 Weibo Article 34 Weibo Article 35 Weibo Article 36 Weibo Article 37 Weibo Article 38 Weibo Article 39 Weibo Article 40
主站蜘蛛池模板: 一区二区三区四区在线 | 亚洲日韩中文字幕一区 | 亚洲视频1区 | 免费观看av电影 | av电影免费在线观看 | 成人亚洲视频 | 精品久久久久久亚洲精品 | 免费一区二区三区 | 久久99精品一区二区三区三区 | 美女视频一区二区三区 | 欧美日本在线观看 | 欧洲精品久久久久69精品 | 日韩黄色片免费看 | 男人的天堂久久 | 综合五月 | 日本欧美一区二区 | 亚洲精品a在线观看 | 久久伊人中文字幕 | 成人在线观看av | 日韩精品成人 | 91久久精品一区二区二区 | 久久精品a一级国产免视看成人 | 亚洲第十页 | 九九99 | 国产成人久久 | 日韩视频一区 | 国产精品原创av片国产免费 | 国产精品免费一区二区三区四区 | 国产日韩精品入口 | 国产在线拍揄自揄拍视频 | 日韩精品中文字幕在线观看 | 久久久国产精品一区 | 亚洲成av人片在线观看香蕉 | 中文字幕av亚洲精品一部二部 | 一区中文字幕 | 美女在线视频一区二区 | 在线视频一区二区 | 国产一区成人 | 亚洲第一成年人视频 | 久久99综合久久爱伊人 | 中文字幕亚洲视频 |