国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|數據庫技術|

服務器之家 - 數據庫 - Mysql - MySQL特定表全量、增量數據同步到消息隊列-解決方案

MySQL特定表全量、增量數據同步到消息隊列-解決方案

2022-01-12 18:14李雷 Mysql

mysql要同步原始全量數據,也要實時同步MySQL特定庫的特定表增量數據,同時對應的修改、刪除也要對應,下面就為大家分享一下

1、原始需求

既要同步原始全量數據,也要實時同步mysql特定庫的特定表增量數據,同時對應的修改、刪除也要對應。

數據同步不能有侵入性:不能更改業務程序,并且不能對業務側有太大性能壓力。

應用場景:數據etl同步、降低業務服務器壓力。

2、解決方案

MySQL特定表全量、增量數據同步到消息隊列-解決方案

3、canal介紹、安裝

canal是阿里巴巴旗下的一款開源項目,純java開發。基于數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了mysql(也支持mariadb)。

工作原理:mysql主備復制實現

MySQL特定表全量、增量數據同步到消息隊列-解決方案

從上層來看,復制分成三步:

  1. master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);
  2. slave將master的binary log events拷貝到它的中繼日志(relay log);
  3. slave重做中繼日志中的事件,將改變反映它自己的數據。

canal的工作原理

MySQL特定表全量、增量數據同步到消息隊列-解決方案

原理相對比較簡單:

  1. canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議
  2. mysql master收到dump請求,開始推送binary log給slave(也就是canal)
  3. canal解析binary log對象(原始為byte流)

架構

MySQL特定表全量、增量數據同步到消息隊列-解決方案

說明:

  • server代表一個canal運行實例,對應于一個jvm
  • instance對應于一個數據隊列 (1個server對應1..n個instance)

instance模塊:

  • eventparser (數據源接入,模擬slave協議和master進行交互,協議解析)
  • eventsink (parser和store鏈接器,進行數據過濾,加工,分發的工作)
  • eventstore (數據存儲)
  • metamanager (增量訂閱&消費信息管理器)

安裝

1、mysql、kafka環境準備

2、canal下載:wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3、解壓:tar -zxvf canal.deployer-1.1.3.tar.gz

4、對目錄conf里文件參數配置

對canal.properties配置:

MySQL特定表全量、增量數據同步到消息隊列-解決方案

MySQL特定表全量、增量數據同步到消息隊列-解決方案

進入conf/example里,對instance.properties配置:

MySQL特定表全量、增量數據同步到消息隊列-解決方案

MySQL特定表全量、增量數據同步到消息隊列-解決方案

5、啟動:bin/startup.sh

6、日志查看:

MySQL特定表全量、增量數據同步到消息隊列-解決方案

4、驗證

1、開發對應的kafka消費者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package org.kafka;
 
import java.util.arrays;
import java.util.properties;
import org.apache.kafka.clients.consumer.consumerrecord;
import org.apache.kafka.clients.consumer.consumerrecords;
import org.apache.kafka.clients.consumer.kafkaconsumer;
import org.apache.kafka.common.serialization.stringdeserializer;
 
 
/**
 *
 * title: kafkaconsumertest
 * description:
 *  kafka消費者 demo
 * version:1.0.0
 * @author pancm
 * @date 2018年1月26日
 */
public class kafkaconsumertest implements runnable {
 
    private final kafkaconsumer<string, string> consumer;
    private consumerrecords<string, string> msglist;
    private final string topic;
    private static final string groupid = "groupa";
 
    public kafkaconsumertest(string topicname) {
        properties props = new properties();
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", groupid);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", stringdeserializer.class.getname());
        props.put("value.deserializer", stringdeserializer.class.getname());
        this.consumer = new kafkaconsumer<string, string>(props);
        this.topic = topicname;
        this.consumer.subscribe(arrays.aslist(topic));
    }
 
    @override
    public void run() {
        int messageno = 1;
        system.out.println("---------開始消費---------");
        try {
            for (; ; ) {
                msglist = consumer.poll(1000);
                if (null != msglist && msglist.count() > 0) {
                    for (consumerrecord<string, string> record : msglist) {
                        //消費100條就打印 ,但打印的數據不一定是這個規律的
 
                            system.out.println(messageno + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
 
 
//                            string v = decodeunicode(record.value());
 
//                            system.out.println(v);
 
                        //當消費了1000條就退出
                        if (messageno % 1000 == 0) {
                            break;
                        }
                        messageno++;
                    }
                } else {
                    thread.sleep(11);
                }
            }
        } catch (interruptedexception e) {
            e.printstacktrace();
        } finally {
            consumer.close();
        }
    }
 
    public static void main(string args[]) {
        kafkaconsumertest test1 = new kafkaconsumertest("sample-data");
        thread thread1 = new thread(test1);
        thread1.start();
    }
 
 
    /*
     * 中文轉unicode編碼
     */
    public static string gbencoding(final string gbstring) {
        char[] utfbytes = gbstring.tochararray();
        string unicodebytes = "";
        for (int i = 0; i < utfbytes.length; i++) {
            string hexb = integer.tohexstring(utfbytes[i]);
            if (hexb.length() <= 2) {
                hexb = "00" + hexb;
            }
            unicodebytes = unicodebytes + "\\u" + hexb;
        }
        return unicodebytes;
    }
 
    /*
     * unicode編碼轉中文
     */
    public static string decodeunicode(final string datastr) {
        int start = 0;
        int end = 0;
        final stringbuffer buffer = new stringbuffer();
        while (start > -1) {
            end = datastr.indexof("\\u", start + 2);
            string charstr = "";
            if (end == -1) {
                charstr = datastr.substring(start + 2, datastr.length());
            } else {
                charstr = datastr.substring(start + 2, end);
            }
            char letter = (char) integer.parseint(charstr, 16); // 16進制parse整形字符串。
            buffer.append(new character(letter).tostring());
            start = end;
        }
        return buffer.tostring();
 
    }
}

2、對表bak1進行增加數據

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
create table `bak1` (
  `vin` varchar(20) not null,
  `p1` double default null,
  `p2` double default null,
  `p3` double default null,
  `p4` double default null,
  `p5` double default null,
  `p6` double default null,
  `p7` double default null,
  `p8` double default null,
  `p9` double default null,
  `p0` double default null
) engine=innodb default charset=utf8mb4
 
show create table bak1;
 
insert into bak1 select '李雷abcv',
  `p1` ,
  `p2` ,
  `p3` ,
  `p4` ,
  `p5` ,
  `p6` ,
  `p7` ,
  `p8` ,
  `p9` ,
  `p0`  from moci limit 10

3、查看輸出結果:

MySQL特定表全量、增量數據同步到消息隊列-解決方案

到此這篇關于mysql特定表全量、增量數據同步到消息隊列-解決方案的文章就介紹到這了,更多相關mysql特定表數據同步內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!

原文鏈接:https://www.cnblogs.com/lilei2blog/p/15608206.html

延伸 · 閱讀

精彩推薦
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25 Weibo Article 26 Weibo Article 27 Weibo Article 28 Weibo Article 29 Weibo Article 30 Weibo Article 31 Weibo Article 32 Weibo Article 33 Weibo Article 34 Weibo Article 35 Weibo Article 36 Weibo Article 37 Weibo Article 38 Weibo Article 39 Weibo Article 40
主站蜘蛛池模板: 国产精品久久精品 | 久久久国产精品视频 | 偷拍自拍网 | 中文字幕一级毛片 | 久久99国产精品 | 日韩一区二区三区在线视频 | 成人av小说| 精品国偷自产在线 | 久久露脸国产精品 | 国产欧美日韩在线 | 大白屁股一区二区视频 | 午夜视频 | 日韩精品观看 | 日本伊人久久 | 亚洲欧美视频 | 精品久久久久久亚洲精品 | 日韩精品一区二区在线观看 | 精品国产黄a∨片高清在线 成人欧美 | 在线免费色视频 | 国产传媒一区 | 91国自产精品中文字幕亚洲 | 91精品一区二区三区久久久久久 | 国产精品久久久久久久福利院 | 亚洲精品一区在线观看 | 亚洲视频在线播放 | 免费的黄色一级片 | 91精品综合久久久久久五月天 | 中文字幕综合在线 | 亚洲日本视频 | 国产一区二区三区视频在线观看 | 日韩城人网站 | 欧美久久久久 | 超级碰在线视频 | 免费观看一级一片 | 久久av网站 | 国产精品久久久久久久久免费桃花 | 欧美一级c片 | 这里只有久久精品 | 精品免费国产 | 日韩精品极品视频在线观看免费 | 成人在线看片 |