大數(shù)據(jù)我們也許聽得很多了,一般都知道hadoop,但并不都是hadoop。那么我們該如何構(gòu)建自己的大數(shù)據(jù)項目呢?對于離線處理,hadoop還是比較適合的,但對于實時性比較強的,數(shù)據(jù)量比較大的,可以采用spark,那spark又跟什么技術(shù)搭配才能做一個適合自己的項目呢?各技術(shù)之間又是如何整合的呢?這篇文章將給大家介紹下大數(shù)據(jù)項目中用到的各技術(shù)框架知識,并通過一個實際項目的分布式集群部署和實際業(yè)務(wù)應(yīng)用來詳細講述大數(shù)據(jù)架構(gòu)是如何構(gòu)建的,供大家參考。 1大數(shù)據(jù)架構(gòu)介紹 1.1 數(shù)據(jù)采集 負責(zé)從各節(jié)點上實時采集數(shù)據(jù),選用flume來實現(xiàn) Flume介紹 Flume是Cloudera(Hadoop數(shù)據(jù)管理軟件與服務(wù)提供商)提供的一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸?shù)娜罩臼占到y(tǒng),支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);同時,F(xiàn)lume提供對數(shù)據(jù)進行簡單處理,并寫到各種數(shù)據(jù)接受方(可定制)的能力。 Flume的一些核心概念
Flume以agent為最小的獨立運行單位。一個agent就是一個JVM(JavaVirtual Machine)。單agent由Source、Sink和Channel三大組件構(gòu)成,如下圖: Flume的數(shù)據(jù)流由事件(Event)貫穿始終。事件是Flume的基本數(shù)據(jù)單位,它攜帶日志數(shù)據(jù)(字節(jié)數(shù)組形式)并且攜帶有頭信息,這些Event由Agent外部的Client,比如圖中的WebServer生成。當(dāng)Source捕獲事件后會進行特定的格式化,然后Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區(qū),它將保存事件直到Sink處理完該事件。Sink負責(zé)持久化日志或者把事件推向另一個Source。 Flume Sources
Flume Sinks 我們系統(tǒng)中使用的是HDFS Sink和Kafka Sink
1.2、數(shù)據(jù)接入 由于采集數(shù)據(jù)的速度和數(shù)據(jù)處理的速度不一定同步,因此添加一個消息中間件來作為緩沖,選用apache的kafka,對于離線數(shù)據(jù),選用hdfs HDFS介紹 HDFS(Hadoop Distribute File System Hadoop分布式文件系統(tǒng))是一個運行在通用硬件上的分布式文件系統(tǒng),是ApacheHadoop Core項目的一部分。HDFS有著高容錯性(fault-tolerant)的特點,并且設(shè)計用來部署在低廉的(low-cost)硬件上。而且它提供高吞吐量(high throughput)來訪問應(yīng)用程序的數(shù)據(jù),適合那些有著超大數(shù)據(jù)集(large data set)的應(yīng)用程序。HDFS放寬了(relax)POSIX的要求(requirements)這樣可以實現(xiàn)流的形式訪問(streaming access)文件系統(tǒng)中的數(shù)據(jù)。 HDFS的一些核心概念 HDFS的高可用性 在一個典型的HA(High Availability)集群中,每個NameNode是一臺獨立的服務(wù)器。在任一時刻,只有一個NameNode處于active狀態(tài),另一個處于standby狀態(tài)。其中,active狀態(tài)的NameNode負責(zé)所有的客戶端操作,standby狀態(tài)的NameNode處于從屬地位,維護著數(shù)據(jù)狀態(tài),隨時準備切換。 兩個NameNode為了數(shù)據(jù)同步,會通過一組稱作JournalNodes的獨立進程進行相互通信。當(dāng)active狀態(tài)的NameNode的命名空間有任何修改時,會告知大部分的JournalNodes進程。standby狀態(tài)的NameNode有能力讀取JNs中的變更信息,并且一直監(jiān)控edit log的變化,把變化應(yīng)用于自己的命名空間。standby可以確保在集群出錯時,命名空間狀態(tài)已經(jīng)完全同步了,如圖所示 HDFS的體系結(jié)構(gòu) HDFS是一個主/從(Master/Slave)體系結(jié)構(gòu),從最終用戶的角度來看,它就像傳統(tǒng)的文件系統(tǒng)一樣,可以通過目錄路徑對文件執(zhí)行CRUD(Create、Read、Update和Delete)操作。但由于分布式存儲的性質(zhì),HDFS集群擁有一個NameNode和一些DataNode。NameNode管理文件系統(tǒng)的元數(shù)據(jù),DataNode存儲實際的數(shù)據(jù)??蛻舳送ㄟ^同NameNode和DataNodes的交互訪問文件系統(tǒng)。客戶端聯(lián)系NameNode以獲取文件的元數(shù)據(jù),而真正的文件I/O操作是直接和DataNode進行交互的。如圖所示。 Kafka介紹 Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費者規(guī)模的網(wǎng)站中的所有動作流數(shù)據(jù)。這種動作(網(wǎng)頁瀏覽,搜索和其他用戶的行動)是在現(xiàn)代網(wǎng)絡(luò)上的許多社會功能的一個關(guān)鍵因素。 這些數(shù)據(jù)通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。 對于像Hadoop的一樣的日志數(shù)據(jù)和離線分析系統(tǒng),但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的并行加載機制來統(tǒng)一線上和離線的消息處理,也是為了通過集群機來提供實時的消費。 Kafka的一些核心概念
一個典型的Kafka集群中包含若干Producer(可以是web前端產(chǎn)生的PageView,或者是服務(wù)器日志,系統(tǒng)CPU、Memory等),若干broker(Kafka支持水平擴展,一般broker數(shù)量越多,集群吞吐率越高),若干Consumer Group,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發(fā)生變化時進行rebalance。Producer使用push模式將消息發(fā)布到broker,Consumer使用pull模式從broker訂閱并消費消息。如圖所示 Zookeeper介紹 ZooKeeper是一個為分布式應(yīng)用所設(shè)計的分布的、開源的協(xié)調(diào)服務(wù)。它提供了一些簡單的操作,使得分布式應(yīng)用可以基于這些接口實現(xiàn)諸如配置維護、域名服務(wù)、分布式同步、組服務(wù)等。 Zookeeper很容易編程接入,它使用了一個和文件樹結(jié)構(gòu)相似的數(shù)據(jù)模型??梢允褂肑ava或者C來進行編程接入。 Zookeeper的一些基本概念
(1)最終一致性 client不論連接到哪個Server,展示給它都是同一個視圖,這是zookeeper最重要的性能。 (2)可靠性 具有簡單、健壯、良好的性能,如果消息m被一臺服務(wù)器接受,那么它將被所有的服務(wù)器接受。 (3)實時性 Zookeeper保證客戶端將在一個時間間隔范圍內(nèi)獲得服務(wù)器的更新信息,或者服務(wù)器失效的信息。但由于網(wǎng)絡(luò)延時等原因,Zookeeper不能保證兩個客戶端能同時得到剛更新的數(shù)據(jù),如果需要最新數(shù)據(jù),應(yīng)該在讀數(shù)據(jù)之前調(diào)用sync()接口。 (4)等待無關(guān)(wait-free) 慢的或者失效的client不得干預(yù)快速的client的請求,使得每個client都能有效的等待 (5)原子性 更新只能成功或者失敗,沒有中間狀態(tài)。 (6)順序性 包括全局有序和偏序兩種:全局有序是指如果在一臺服務(wù)器上消息a在消息b前發(fā)布,則在所有Server上消息a都將在消息b前被發(fā)布;偏序是指如果一個消息b在消息a后被同一個發(fā)送者發(fā)布,a必將排在b前面。
1.3、流式計算 對采集到的數(shù)據(jù)進行實時分析,選用apache的Spark Spark介紹 Spark是UCBerkeley AMP lab (加州大學(xué)伯克利分校的AMP實驗室)所開源的類Hadoop MapReduce的通用并行框架,而后又加入到Apache孵化器項目。Spark,擁有HadoopMapReduce所具有的優(yōu)點;但不同于MapReduce的是Job中間輸出結(jié)果可以保存在內(nèi)存中,從而不再需要讀寫磁盤,因此Spark能更好地適用于數(shù)據(jù)挖掘與機器學(xué)習(xí)等需要迭代的MapReduce的算法。 Spark不僅實現(xiàn)了MapReduce的算子map 函數(shù)和reduce函數(shù)及計算模型,還提供更為豐富的算子,如filter、join、groupByKey等。是一個用來實現(xiàn)快速而通用的集群計算的平臺。 Spark支持多處理模式以及支持庫,Spark Streaming、Spark SQL、Spark MLlib、Spark GraphX等極大程度方便開發(fā)者在大數(shù)據(jù)處理方面的不同需求 Spark的生態(tài)體系 MapReduce屬于Hadoop生態(tài)體系之一,Spark則屬于BDAS(BerkeleyData Analytics Stack,中文:伯克利數(shù)據(jù)分析棧)生態(tài)體系之一 Hadoop包含了MapReduce、HDFS、HBase、Hive、Zookeeper、Pig、Sqoop等 BDAS包含了Spark、Shark(相當(dāng)于Hive)、BlinkDB、Spark Streaming(消息實時處理框架,類似Storm)等等 Spark的運行模式 l Local (用于測試、開發(fā)) l Standlone (獨立集群模式) l Spark on Yarn (Spark在Yarn上) l Spark on Mesos (Spark在Mesos) 我們系統(tǒng)中使用的是Spark on Yarn 1.4、數(shù)據(jù)輸出 對分析后的結(jié)果持久化,可以使用HDFS、Hbase、MongoDB Hbase介紹 HBase是ApacheHadoop的數(shù)據(jù)庫,能夠?qū)Υ笮蛿?shù)據(jù)提供隨機、實時的讀寫訪問。HBase的目標是存儲并處理大型的數(shù)據(jù)。HBase是一個開源的,分布式的,多版本的,面向列的存儲模型。它存儲的是松散型數(shù)據(jù)。 HBase是一個構(gòu)建在HDFS上的分布式列存儲系統(tǒng); HBase是基于GoogleBigTable模型開發(fā)的,典型的key/value系統(tǒng); HBase是ApacheHadoop生態(tài)系統(tǒng)中的重要一員,主要用于海量結(jié)構(gòu)化數(shù)據(jù)存儲; 從邏輯上講,HBase將數(shù)據(jù)按照表、行和列進行存儲。 與hadoop一樣,Hbase目標主要依靠橫向擴展,通過不斷增加廉價的商用服務(wù)器,來增加計算和存儲能力。 Hbase的系統(tǒng)架構(gòu) 關(guān)于HBase與ZooKeeper,可以分三點來描述: A、Zookeeper集群的職責(zé) A.1、負責(zé)監(jiān)控整個hbase集群中節(jié)點的狀態(tài)和通信。 A.2、管理hbase 集群的-ROOT-表,即所有Region Server的地址和HTable信息。 A.3、避免HMsater的單點故障問題(重啟故障的HMaster;如果zkLeader掛掉,重新選舉出leader)。 B、HMaster Server的職責(zé) B.1、為HRegionserver分配HRegion,并持續(xù)均衡負載; B.2、當(dāng)有HRegionserver失效時,由HMaster負責(zé)重新分配其上的HRegion。 C、HRegion Server的職責(zé) C.1、維護HMaster分配的HRegin,響應(yīng)客戶端的請求(增刪改查)。 C.2、管理.META.表數(shù)據(jù),該表中包含當(dāng)前HRegion Server上HRegion的相關(guān)信息。 C.3、負責(zé)region的切分,并將相關(guān)region切分信息更新到.META.表中。 MongoDB介紹 MongoDB是一個基于分布式文件存儲的數(shù)據(jù)庫。由C++語言編寫。旨在為WEB應(yīng)用提供可擴展的高性能數(shù)據(jù)存儲解決方案。 MongoDB是一個介于關(guān)系數(shù)據(jù)庫和非關(guān)系數(shù)據(jù)庫之間的產(chǎn)品,是非關(guān)系數(shù)據(jù)庫當(dāng)中功能最豐富,最像關(guān)系數(shù)據(jù)庫的。他支持的數(shù)據(jù)結(jié)構(gòu)非常松散,是類似json的bson格式,因此可以存儲比較復(fù)雜的數(shù)據(jù)類型。MongoDB最大的特點是他支持的查詢語言非常強大,其語法有點類似于面向?qū)ο蟮牟樵冋Z言,幾乎可以實現(xiàn)類似關(guān)系數(shù)據(jù)庫單表查詢的絕大部分功能,而且還支持對數(shù)據(jù)建立索引。 MongoDB的一些基本概念 MongoDB的數(shù)據(jù)模型 一個MongoDB 實例可以包含一組數(shù)據(jù)庫,一個DataBase 可以包含一組Collection(集合),一個集合可以包含一組Document(文檔)。一個Document包含一組field(字段),每一個字段都是一個key/value pair。 key: 必須為字符串類型。 value:可以包含如下類型: 1、基本類型,例如,string,int,float,timestamp,binary 等類型。 2、一個document。 3、數(shù)組類型。 文檔結(jié)構(gòu)如下: 2 HA模式下的分布式集群環(huán)境搭建 2.1、機器規(guī)劃 2.2、Hadoop+Hbase+Zookeeper集群部署 (1)修改各機器上的/etc/hosts文件,刪除文件中本機ip與 機器名對應(yīng)的一行 作用:有時候/etc/hosts文件里對應(yīng)的主機名包含一些特殊字符或其他原因?qū)е虏荒苷_解析主機名,最好是去掉這一行 (2)ssh免密碼登錄 作用:Hadoop中主節(jié)點管理從節(jié)點是通過SSH協(xié)議登錄到從節(jié)點實現(xiàn)的,而一般的SSH登錄,都是需要輸入密碼驗證的,為了Hadoop主節(jié)點方便管理成千上百的從節(jié)點,這里將主節(jié)點公鑰拷貝到從節(jié)點,實現(xiàn)SSH協(xié)議免秘鑰登錄,我這里做的是所有主從節(jié)點之間機器免秘鑰登錄 在三臺機器上分別執(zhí)行: ssh-keygen -t rsa chmod 755 用戶目錄 chmod 700 用戶目錄的~/.ssh 在機器上執(zhí)行了ssh-keygen -t rsa命令后會在用戶目錄的~/.ssh文件夾中生成這兩個文件:id_rsa和id_rsa.pub,其中id_rsa中存的算法rsa生成的私鑰,id_rsa.pub存放算法rsa生成的公鑰。 下一步,將所有機器上的id_rsa.pub中的內(nèi)容匯總到某一臺機器上,并寫入文件:用戶目錄的~/.ssh/authorized_keys中。 注意:每臺機器的id_rsa.pub中的內(nèi)容在文件authorized_keys中是一行 下一步:將用戶目錄的~/.ssh/authorized_keys文件復(fù)制到其他兩臺機器上 在每臺機器上執(zhí)行: chmod 600 用戶目錄的~/.ssh/authorized_keys (3)Zookeeper部署 A、下載zookeeper的安裝包, 解壓到合適目錄 B、修改配置文件:zoo.cfg 集群中部署了幾臺zookeeper機器,文件中就添加幾行,并且集群中的zookeeper機器上的zoo.cfg文件必須一致 (4)Hadoop部署 A、下載hadoop的安裝包, 解壓到合適目錄 B、配置hadoop 修改hadoopconf目錄下的文件,特別是core-site.xml,hadoop-env.sh,hdfs-site.xml,mapred-site.xml,yarn-site.xml,注意根據(jù)實際部署機器,修改ip和端口 復(fù)制hadoopconf目錄到集群中其他兩臺機器 (5)Hbase部署 A、下載hbase的安裝包, 解壓到合適目錄 B、配置Hbase 修改hbaseconf目錄下的文件,特別是backup-masters,hbase-env.sh,hbase-site.xml,regionservers,注意根據(jù)實際部署機器,修改ip和端口 復(fù)制hbaseconf目錄到集群中其他兩臺機器 (6)啟動集群 A、首次啟動集群
(7)關(guān)閉集群 注意:一定要按順序停止,如果先停 ZooKeeper 再停 HBase 的話,基本停不下來 (8)HDFS實用工具 HDFS狀態(tài)查詢:bin/hdfsdfsadmin -report 離開安全模式:bin/hdfs dfsadmin -safemodeleave 文件目錄查詢:bin/hadoop fs -ls 文件內(nèi)容查看:bin/hadoop fs -cat 文件上傳:bin/hadoop fs -put 文件下載:bin/hadoop fs -get 文件(夾)刪除:bin/hadoop fs -rm (-rmr) (9)Hbase實用工具 連接hbase客戶端:bin/hbaseshell shell命令: 表清單:list 表創(chuàng)建: create 'UserProfile', {NAME => 'realtime',VERSIONS => 1, TTL => 2147483647},{NAME => 'profile', VERSIONS =>1, TTL => 2147483647} 表查詢: scan 'UserProfile', LIMIT => 10 表數(shù)據(jù)構(gòu)造:(put '表名','openid','列族:列','值') put 'UserProfile','1222234','realtime:sex','1' put 'UserProfile','4555554','realtime:age','25' 表刪除,分兩步: disable 'UserProfile' drop 'UserProfile' 查看表結(jié)構(gòu): describe 'UserProfile' (10)Kafka部署 A、下載kafka的安裝包,解壓到合適目錄 B、配置Kafka 修改kafkaconf目錄下的文件,特別是server.properties、zookeeper.properties、producer.properties、consumer.properties ,注意根據(jù)實際部署機器,修改ip和端口 C、啟動Kafka kafka/bin/kafka-server-start.sh../config/server.properties 啟動成功后可通過jps命令查看進程
kafka/bin/kafka-server-stop.sh E、Kafka實用命令 數(shù)據(jù)查詢:bin/kafka-console-consumer.sh 數(shù)據(jù)構(gòu)造:bin/kafka-console-producer.sh Topic查詢修改:bin/kafka-topics.sh (11)Flume部署 A、下載flume的安裝包,解壓到合適目錄 B、配置Flume 修改flume/conf/目錄下的文件,特別是core-site-lf.xml、hdfs-site-lf.xml、flume-wq.conf,注意根據(jù)實際部署機器,修改ip和端口 C、啟動Flume flume/sbin/flume-ng agent --conf conf --conf-fileconf/flume-wq.conf --name a1 啟動成功后可通過jps命令查看進程
A、下載mongodb的安裝包,解壓到合適目錄 B、配置Mongodb 修改/export/shadows/mongodb/conf目錄下的文件:configsvr.conf,mongos.conf,shard1.conf C、啟動Mongodb 啟動配置服務(wù): mongodb/bin/mongod -f conf/configsvr.conf 啟動路由服務(wù): mongos -f conf/mongos.conf 啟動物理存儲: mongod -f conf/shard1.conf 啟動成功后查看進程:ps -ef |grep mongo D、Mongodb實用工具 連接客戶端:mongo --port 30000 客戶端命令: 數(shù)據(jù)庫清單:show databases; 數(shù)據(jù)庫切換:use tablename1; 集合(表)清單:show collections; 查詢表所有記錄:db.tablename1.find(); //相當(dāng)于select * from tablename1; 條件查詢:db.tablename1.find({'groupId':'8037'}); //相當(dāng)于select * from tablename1where groupId = 8037
3、實際業(yè)務(wù)應(yīng)用 3.1、業(yè)務(wù)曝光 (1)配置中心修改如下手工服務(wù)為flume機器的ip和端口 mcoss_ds_mmart_show-------全量數(shù)據(jù)曝光 mcoss_ds_simpshow-----------簡要數(shù)據(jù)曝光 mcoss_ds_keystat--------------關(guān)鍵數(shù)據(jù)曝光 (2)訪問曝光tws服務(wù) http://wq.jd.com/mcoss/mmart/show?actid=xxxx&pc=xxxx (3)數(shù)據(jù)驗證 A hdfs數(shù)據(jù)驗證 hadoop fs -ls -h /data/flume 類似于ls命令 hadoop fs -cat /data/flume/xxx/xxx/xxx/xxx 類似于cat查看文件內(nèi)容 下載文件到本地: hadoop fs-get /data/flume/xxx/xxx/xxx/xxx /tmp/xxx B Kafka數(shù)據(jù)驗證 查看有哪些topic kafka/bin/kafka-topics.sh --list --zookeeper zookeeper機器ip:端口,zookeeper機器ip:端口,zookeeper機器ip:端口 數(shù)據(jù)查詢: ./kafka-console-consumer.sh --zookeeper zookeeper機器ip:端口,zookeeper機器ip:端口,zookeeper機器ip:端口 --from-beginning --topic mmart_show|more |
|
來自: openlabzeng > 《待分類》