乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      實戰(zhàn)!基于canal同步mysql數(shù)據(jù)到elasticsearch

       昵稱10087950 2022-06-16 發(fā)布于江蘇

      首發(fā)公眾號:MarkerHub

      原創(chuàng)作者:呂一明

      視頻講解:https://www.bilibili.com/video/BV1Jq4y1w7Bc/

      hello,大家好呀,好久沒寫過原創(chuàng)了,今天帶大家做個實驗吧,基于canal同步mysql的數(shù)據(jù)到es中!

      原理啥的,都給我百度去吧,這里直接搞實驗!

      本文使用docker環(huán)境安裝mysql、canal、elasticsearch,基于binlog利用canal實現(xiàn)mysql的數(shù)據(jù)同步到elasticsearch中。

      實驗中間件版本說明:

      • centos 8

      • mysql 5.7.36

      • es 7.16.2

      • cannal.server: 1.1.5

      • canal.adapter: 1.1.5

      • postman

      0、安裝docker

      基本命令:

      #centos 7 安裝 docker
      yum install docker

      #centos 8 安裝docker
      yum erase podman buildah
      yum install -y yum-utils
      yum-config-manager --add-repo https://download./linux/centos/docker-ce.repo
      yum install docker-ce docker-ce-cli containerd.io

      #檢驗安裝是否成功
      [root@localhost opt]# docker --version
      Docker version 20.10.12, build e91ed57

      #啟動
      systemctl start docker

      #換鏡像源
      sudo vim /etc/docker/daemon.json
      內容如下:
      {
      "registry-mirrors": ["https://m9r2r2uj.mirror."]
      }
      保存退出,重啟docker

      #重啟
      sudo service docker restart

      #列出鏡像
      docker images

      #查看運行進程
      docker ps

      1、安裝mysql

      docker pull mysql:5.7.36
      docker run --name mysql5736 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=admin -d mysql:5.7.36

      docker exec -it mysql5736 /bin/bash
      apt-get update
      apt-get install vim
      cd /etc/mysql/mysql.conf.d
      vim mysqld.cnf  // 修改mysql配置

      配置:

      [mysqld]
      #binlog setting
      log-bin=mysql-bin  // 開啟logbin
      binlog-format=ROW  // binlog日志格式
      server-id=1  // mysql主從備份serverId,canal中不能與此相同

      圖片

      保存退出,重啟mysql:service mysql restart

      可能會退出docker鏡像,注意重啟啟動docker的mysql。

      mysql -uroot -p
      show master status // binlog日志文件
      reset master; // 重啟日志

      圖片

      查看是否配置成功:

      圖片

      查看日志文件:

      cd /var/lib/mysql  // 進入日志文件目錄
      mysqlbinlog -vv mysql-bin.000001 // row格式查看日志

      圖片

      使用數(shù)據(jù)庫工具連接上docker中的mysql,然后創(chuàng)建dailyhub數(shù)據(jù)庫,然后再查看日志(mysqlbinlog -vv mysql-bin.000001)可以看到截圖如下:

      圖片

      到這里,mysql已經(jīng)安裝成功了。

      圖片

      2、安裝es

      docker pull elasticsearch:7.16.2
      docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" --name='es7162' -d elasticsearch:7.16.2

      注意如果拉取不出對應的版本,可以上https://registry.hub./_/elasticsearch?tab=tags&page=1&ordering=last_updated,查看對應的版本再拉取。我之前是拉取7.15.2的實驗的,后來過來幾天發(fā)現(xiàn)這版本已經(jīng)拉取不了了,就改成了7.16.2?;蛘邠Q低一點的版本也可以。 圖片

      查看https:///artifact/org.springframework.boot/spring-boot-starter-data-elasticsearch/2.6.2,得到版本依賴關系,在springboot2.6.2版本下,7.15.2和7.16.2都可以用。

      圖片

      docker啟動es:

      圖片

      圖片

      然后我們需要配置一下es的信息:

      docker exec -ites es7162 /bin/bash
      cd config
      vi elasticsearch.yml

      配置文件:

      cluster.name: dailyhub-es
      network.host: 0.0.0.0

      node.name: node-1
      http.port: 9200
      http.cors.enabled: true
      http.cors.allow-origin: "*"
      node.master: true
      node.data: true

      docker restart es7162 重啟es,注意千萬別寫錯配置的信息,否則啟動會失敗,啟動失敗是后可以通過docker logs -f es7162查看原因,但也只能重新來了。然后服務器訪問:

      // 查詢es所有mapping
      http://119.45.25.164:9200/_mapping?pretty=true

      注意如果是云服務器的話,要在安全組中配置對應的端口開放、還有防火墻啥的,然后安全些的話,還需要給es配合賬號密碼啥的。我這里為了實驗就簡單來了。

      安裝中文分詞器

      可以有兩種方式安裝中文分詞器,如果在線安裝的時候分詞器插件下載不下來那就只能離線安裝了。

      1、在線安裝中文分詞器:

      docker exec -ites es7162 /bin/bash

      ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.16.2/elasticsearch-analysis-ik-7.16.2.zip

      圖片

      2、離線安裝中文分詞器:

      首先打開這個鏈接:https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.16.2,把分詞器插件下載下來,

      # 把插件復制到容器內
      docker cp elasticsearch-analysis-ik-7.16.2.zip es7162:/usr/share/elasticsearch/plugins

      docker exec -it es7162 /bin/bash
      cd /usr/share/elasticsearch/plugins/
      mkdir ik
      unzip elasticsearch-analysis-ik-7.16.2.zip -d ik
      rm -rf elasticsearch-analysis-ik-7.16.2.zip

      圖片

      重啟es,查看日志是否加載ik分詞器成功!

      docker restart es7162
      docker logs es7162

      圖片

      當你看到日志中有輸出analysis-ik,說明已經(jīng)安裝成功。

      3、安裝canal-server

      拉取鏡像并啟動:

      docker pull canal/canal-server:v1.1.5

      docker run --name canal115 -p 11111:11111  --link mysql5736:mysql5736 -id canal/canal-server:v1.1.5

      修改對應的配置:

      docker exec -it canal115 /bin/bash
      cd canal-server/conf/example/
      vi instance.properties // 修改配置

      # 把0改成10,只要不和mysql的id相同就行
      canal.instance.mysql.slaveId=10
      # 修改成mysql對應的賬號密碼,mysql5736就是mysql鏡像的鏈接別名
      canal.instance.master.address=mysql5736:3306
      canal.instance.dbUsername=root
      canal.instance.dbPassword=admin

      圖片

      驗證配置是否成功:

      #首先重啟一下canal
      docker restart  canal115

      docker exec -it canal115 /bin/bash
      cd canal-server/logs/example/
      tail -100f example.log // 查看日志

      截圖如下,說明已經(jīng)鏈接上了mysql主機,此時mysql中的數(shù)據(jù)變化,都會在canal中有同步。 圖片

      可以通過Java程序測試有沒連接上mysql:

      導入canal-client包

      <!-- 為了測試canal-server是否連接mysql成功,1.1.5版本少包,所以用1.1.4版本 -->
      <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.1.4</version>
      </dependency>
      • com.markerhub.SimpleCanalClientExample

      /**
      * 公眾號:MarkerHub
      *
      * 說明:用于測試canal是否已經(jīng)連接上了mysql
      */
      public class SimpleCanalClientExample {
      public static void main(String args[]) {
      // 創(chuàng)建鏈接
      CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("119.45.25.164",
      11111), "example", "", "");
      int batchSize = 1000;
      int emptyCount = 0;
      try {
      connector.connect();
      connector.subscribe(".*\\..*");
      connector.rollback();
      int totalEmptyCount = 120;
      while (emptyCount < totalEmptyCount) {
      Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù)
      long batchId = message.getId();
      int size = message.getEntries().size();
      if (batchId == -1 || size == 0) {
      emptyCount++;
      System.out.println("empty count : " + emptyCount);
      try {
      Thread.sleep(1000);
      } catch (InterruptedException e) {
      }
      } else {
      emptyCount = 0;
      // System.out.printf("
      message[batchId=%s,size=%s] \n", batchId, size);
      printEntry(message.getEntries());
      }
      connector.ack(batchId); // 提交確認
      // connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)
      }
      System.out.println("empty too many times, exit");
      } finally {
      connector.disconnect();
      }
      }
      private static void printEntry(List<Entry> entrys) {
      for (Entry entry : entrys) {
      if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
      continue;
      }
      RowChange rowChage = null;
      try {
      rowChage = RowChange.parseFrom(entry.getStoreValue());
      } catch (Exception e) {
      throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
      e);
      }
      EventType eventType = rowChage.getEventType();
      System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
      entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
      entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
      eventType));
      for (RowData rowData : rowChage.getRowDatasList()) {
      if (eventType == EventType.DELETE) {
      printColumn(rowData.getBeforeColumnsList());
      } else if (eventType == EventType.INSERT) {
      printColumn(rowData.getAfterColumnsList());
      } else {
      System.out.println("-------> before");
      printColumn(rowData.getBeforeColumnsList());
      System.out.println("-------> after");
      printColumn(rowData.getAfterColumnsList());
      }
      }
      }
      }
      private static void printColumn(List<Column> columns) {
      for (Column column : columns) {
      System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
      }
      }
      }

      當mysql的數(shù)據(jù)更新時候效果如下: 圖片

      注意當后面canal-adapter也連接上canal-server后,程序就監(jiān)聽不到數(shù)據(jù)變化了。

      4、安裝canal-adapter

      由于目前canal-adapter沒有官方docker鏡像,所以拉去一個非官方的

      docker pull slpcat/canal-adapter:v1.1.5

      docker run --name adapter115 -p 8081:8081 --link mysql5736:mysql5736 --link canal115:canal115 --link es7162:es7162 -d slpcat/canal-adapter:v1.1.5

      圖片

      修改配置:

      docker exec -it adapter115 /bin/bash
      cd conf/
      vi application.yml

      配置修改如下,一些不需要的配置或者注釋掉的配置可以刪除掉:

      server:
        port: 8081
      spring:
        jackson:
          date-format: yyyy-MM-dd HH:mm:ss
          time-zone: GMT+8
          default-property-inclusion: non_null

      canal.conf:
        mode: tcp #tcp kafka rocketMQ rabbitMQ
        flatMessage: true
        zookeeperHosts:
        syncBatchSize: 1000
        retries: 0
        timeout:
        accessKey:
        secretKey:
        consumerProperties:
          # canal tcp consumer
          canal.tcp.server.host: canal115:11111
          canal.tcp.zookeeper.hosts:
          canal.tcp.batch.size: 500
          canal.tcp.username:
          canal.tcp.password:
        srcDataSources:
          defaultDS:
            url: jdbc:mysql://mysql5736:3306/dailyhub?useUnicode=true
            username: root
            password: admin
        canalAdapters:
        - instance: example # canal instance Name or mq topic name
          groups:
          - groupId: g1
            outerAdapters:
            - name: logger
            - name: es7
              hosts: es7162:9200 # 127.0.0.1:9200 for rest mode
              properties:
                mode: rest
                # security.auth: test:123456 #  only used for rest mode
                cluster.name: dailyhub-es

      圖片

      接下來是修改表映射索引文件:

      docker exec -it adapter115 /bin/bash
      cd conf/es7

      cp -v mytest_user.yml dailyhub_collect.yml
      # 刪除其他多余的
      rm -rf biz_order.yml customer.yml mytest_user.yml
      vi dailyhub_collect.yml

      配置文件:

      dataSourceKey: defaultDS
      destination: example
      groupId: g1
      esMapping:
        _index: dailyhub_collect
        _id: _id
        _type: _doc
        upsert: true
      #  pk: id
        sql: "
      SELECT
              c.id AS _id,
              c.user_id AS userId,
              c.title AS title,
              c.url AS url,
              c.note AS note,
              c.collected AS collected,
              c.created AS created,
              c.personal AS personal,
              u.username AS username,
              u.avatar AS userAvatar
      FROM
              m_collect c
      LEFT JOIN m_user u ON c.user_id = u.id

      "

      #  objFields:
      #    _labels: array:;
      #   etlCondition: "where c.c_time>={}"
        commitBatch: 3000

      注意對于時間類型,在后端一定要使用LocalDateTime或者LocalDate類型,如果是Date類型,需要自己手動設置格式。

      5、聯(lián)合測試

      然后就可以直接測試了,準備測試條件:

      • 在數(shù)據(jù)庫中生成表和字段,

      • 然后elasticsearch中生成索引。先新建數(shù)據(jù)庫dailyhub。然后數(shù)據(jù)表結構:

      SET FOREIGN_KEY_CHECKS=0;

      -- ----------------------------
      -- Table structure for m_collect
      -- ----------------------------
      DROP TABLE IF EXISTS `m_collect`;
      CREATE TABLE `m_collect` (
        `id` bigint(20) NOT NULL AUTO_INCREMENT,
        `collected` date DEFAULT NULL,
        `created` datetime(6) DEFAULT NULL,
        `note` varchar(255) DEFAULT NULL,
        `personal` int(11) DEFAULT NULL,
        `title` varchar(255) DEFAULT NULL,
        `url` varchar(255) DEFAULT NULL,
        `user_id` bigint(20) DEFAULT NULL,
        PRIMARY KEY (`id`),
        KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`),
        CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h` FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`)
      ) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;

      -- ----------------------------
      -- Records of m_collect
      -- ----------------------------

      -- ----------------------------
      -- Table structure for m_user
      -- ----------------------------
      DROP TABLE IF EXISTS `m_user`;
      CREATE TABLE `m_user` (
        `id` bigint(20) NOT NULL AUTO_INCREMENT,
        `avatar` varchar(255) DEFAULT NULL,
        `created` datetime(6) DEFAULT NULL,
        `lasted` datetime(6) DEFAULT NULL,
        `open_id` varchar(255) DEFAULT NULL,
        `statu` int(11) DEFAULT NULL,
        `username` varchar(255) DEFAULT NULL,
        PRIMARY KEY (`id`)
      ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

      -- ----------------------------
      -- Records of m_user
      -- ----------------------------
      INSERT INTO `m_user` VALUES ('1', 'https://image-1300566513.cos.ap-guangzhou./upload/images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05 16:08:40.042000', '2022-01-06 13:07:45.153000', 'ozWZ-uAOY2iecT-byynO382u01zg', '0', '公眾號:MarkerHub');

      接下來借postman來新建elasticsearch的索引:

      # 創(chuàng)建索引并添加映射字段
      PUT http://119.45.25.164:9200/dailyhub_collect

      {
          "mappings": {
              "properties": {
                  "collected": {
                      "type": "date",
                      "format": "date_optional_time||epoch_millis"
                  },
                  "created": {
                      "type": "date",
                      "format": "date_optional_time||epoch_millis"
                  },
                  "note": {
                      "type": "text",
                      "analyzer": "ik_max_word",
                      "search_analyzer": "ik_smart"
                  },
                  "personal": {
                      "type": "integer"
                  },
                  "title": {
                      "type": "text",
                      "analyzer": "ik_max_word",
                      "search_analyzer": "ik_smart"
                  },
                  "url": {
                      "type": "text"
                  },
                  "userAvatar": {
                      "type": "text"
                  },
                  "userId": {
                      "type": "long"
                  },
                  "username": {
                      "type": "keyword"
                  }
              }
          }
      }

      圖片

      其他常用操作:

      # 刪除索引
      PUT http://119.45.25.164:9200/dailyhub_collect

      # 查看素有索引映射
      GET http://119.45.25.164:9200/_mapping?pretty=true

      # 搜索文檔
      GET http://119.45.25.164:9200/dailyhub_collect/_search

      # 刪除ID為1的文檔
      DELETE http://119.45.25.164:9200/dailyhub_collect/_doc/1

      然后我們打開canal-adapter的輸入日志:

      docker logs --tail 100  -f adapter115

      然后我們在mysql的m_collect中新添加一條記錄,可以看到日志輸出如下: 圖片

      然后搜索全部文檔,發(fā)現(xiàn)es中有數(shù)據(jù)啦。

      圖片

      如果看到adaptar115一直出現(xiàn)這種異常,說明啟動順序不對,啟動順序應該是:mysql、es、canal、adapar

      2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!

      到這里,實驗成功,over,關注公眾號:MarkerHub,帶你做更多Java實驗!視頻講解:https://www.bilibili.com/video/BV1Jq4y1w7Bc/

        本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內容均由用戶發(fā)布,不代表本站觀點。請注意甄別內容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權內容,請點擊一鍵舉報。
        轉藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多