隨著互聯網時代的發展,運營商作為內容傳送的管道服務商,在數據領域具有巨大的優勢,如何將這些數據轉化為價值,越來越被運營商所重視。
運營商的大數據具有體量大,種類多的特點,如各類話單、信令等,通常一種話單每天的數據量就有上百億條。隨著業務分析需求對數據處理實時性的要求越來越高,也給我們的大數據處理架構帶來了巨大的挑戰,參照網絡上可查的例子,運用到實際處理架構上,經常會因為實時數據流量大,造成系統運行不穩定及各種異常。從大數據實時處理架構開發到上線,耗時近2個月時間,經過大量優化,我們的系統才趨于穩定。終我們使用10臺服務器的集群,實時處理每天上百億條的數據,這里每條數據的字段數量有100個,長的字段內容超過1000字節。
下面就來分享一下我們在實時大數據處理大體量數據的過程中,總結出來的酸甜苦辣。
在有限服務器集群數量的基礎上,實現對每天超過百億條、體量超過20T的某話單進行實時處理。具體需求是FTP收集多臺話單服務器上的詳單,進行實時處理后將數據存儲到Hbase數據庫供用戶即時詳單查詢,同時將話單存儲到Hdfs供離線分析使用。
10臺x86服務器,單機配置16盒CPU,128G內存,2T硬盤*10,300G硬盤*2(系統盤)。
10臺服務器組成hadoop集群,其中NameNode節點同時作為采集機安裝FTP和Flume,選取其他5臺服務器安裝Kafka,Zookeeper和Storm實現大數據實時流處理架構,為了充分利用集群計算資源,這5臺服務器也配置了少量的Yarn計算資源,參與日常的離線數據分析需求。剩下的4臺服務器我們安裝了Hbase滿足大數據下的秒級查詢需求,系統拓撲圖如下:

1、使用的相關技術
我們先來回顧一下相關的大數據架構和開源技術,大數據處理分離線分析架構和實時處理架構。離線分析架構(如Hive,Map/Reduce,Spark Sql等)可以滿足數據后分析,數據挖掘的應用需求。對于實時性要求高的應用,如用戶即時詳單查詢,業務量監控等,需要應用實時處理架構。目前大數據開源實時處理架構常見的是Storm和Spark Streaming,相比Spark Streaming準實時批處理系統,Strom是更純粹的實時處理系統,即來一條事件就處理一條,具有更高的實時性。
Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統。Flume支持單機也支持集群,支持多種數據源,如不斷寫入的文件、Socket、不斷生成新文件的文件夾等,支持多種輸出,如Hdfs、Kafka、Mysql數據庫等。Flume使用時僅需實現簡單配置,無需開發程序。
Kafka是一種高吞吐量的分布式發布訂閱消息系統,類似一個大數據量的緩存池,支持一份數據多用戶消費。ZooKeeper是一個分布式的,開源的分布式應用程序協調服務,負責存儲集群間部分組件的狀態同步信息。Storm分布式實時計算系統,包含Nimbus主節點和Supervisor從節點(從storm1.0以后,增加了Nimbus備份節點),節點之間需要依靠Zookeeper做狀態同步。Storm集群組件:

Storm應用涉及到Java程序的開發,編程模型中涉及的概念:
2、開源組件安裝及配置
a)Flume安裝及配置
從http://flume.apache.org/下載flume的安裝包,解壓縮;如果使用Cloudera Manager或者Ambari安裝,僅需通過相應的管理頁面安裝配置。我們僅安裝了單機的Flume,未安裝Flume集群,單機Flume處理效率非常高,完全能夠滿足我們每天處理上百億條數據的需求,但需要說明一點的是Flume魯棒性非常差,經常出現進程在、但數據不處理的進程卡死狀態,使用Flume時要注意以下幾點:
flume監控目錄中不能含有目錄;
flume正在處理的文件,其他進程不能更改(如FTP正在傳送中的文件,需要設置過濾條件,避免flume處理)。建議flume監控目錄與FTP實時傳送目錄分開,避免flume處理FTP傳送中的文件,導致異常,也可以設置正則表達式忽略正在傳送的文件:
a1.sources.r1.ignorePattern = ^(.)*\\.tmp$
a1.sources.r1.decodeErrorPolicy = IGNORE
export JAVA_OPTS="-Xms1024m -Xmx2048m -Dcom.sun.management.jmxremote"
/hadoop/apache-flume-1.6.0-bin/bin/flume-ng agent -c /hadoop/apache-flume-1.6.0-bin/conf/
a1.channels.c1.transactionCapacity = 2000 a1.sinks.k1.batchSize = 2000
增加batchSize可以提升flume處理速度,原理是flume處理的event都保存在transaction隊列中,直到滿足了batchSize的數量條件,才一次性批量向sink發送。但是要注意實際數據量的大小,如果實際數據量很小,batchSize就不能配置過大,否則數據達不到batchSize的數量條件,會長時間積壓在transaction隊列中,后面的實時處理程序反而得不到數據,導致實時性變差;
flume中讀取的一條記錄長度超過2048字符,也就是4096字節就會被截斷,可以在配置文件中增加如下配置項解決:
producer.sources.s.deserializer.maxLineLength=65535
a1.sources.r1.inputCharset = ISO8859-1
producer.sources.s.decodeErrorPolicy=IGNORE
a1.sources.r1.deletePolicy = immediate
Flume配置:
a1.sources = r1 a1.sinks = k1
a1.channels = c1 # Describe/configure the source a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /ftpdata/xdr/HTTP_tmp
a1.sources.r1.ignorePattern = ^(.)*\\.tmp$
a1.sources.r1.fileHeader = false
a1.sources.r1.deletePolicy = immediate
a1.sources.r1.inputCharset = ISO8859-1 a1.sources.r1.deserializer.maxLineLength = 8192 a1.sources.r1.decodeErrorPolicy = IGNORE # Describe the sink a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.batchSize = 10000 a1.sinks.k1.brokerList = stormmaster:9092,storm01:9092,storm02:9092,storm03:9092,storm04:9092 a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder a1.sinks.k1.requiredAcks = 0 a1.sinks.k1.producer.type = async
a1.sinks.k1.topic = sighttpnew # Use a channel which buffers events in memory a1.channels.c1.type = memory
a1.channels.c1.capacity = 80000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.keep-alive = 30
Flume-env.sh配置:
# Enviroment variables can be set here. export JAVA_HOME=/usr/java/jdk1.7.0_80 export FLUME_HOME=/hadoop/apache-flume-1.6.0-bin # Give Flume more memory and pre-allocate, enable remote monitoring via JMX export JAVA_OPTS="-Xms1024m -Xmx2048m -Dcom.sun.management.jmxremote" # Note that the Flume conf directory is always included in the classpath. export FLUME_CLASSPATH="/hadoop/apache-flume-1.6.0-bin/lib"
Flume啟動命令:
/hadoop/apache-flume-1.6.0-bin/bin/flume-ng agent -c /hadoop/apache-flume-1.6.0-bin/conf/ -f /hadoop/apache-flume-1.6.0-bin/conf/viewdata.conf -n producer –Dflume.root.logger=ERROR &
注意一定要給全Flume配置文件的路徑,否則啟動Flume不能正確加載Flume-env.sh的配置。
b)Kafka集群安裝及配置
從http://kafka.apache.org/下載kafka安裝包:kafka_*.tgz,解壓后,配置server.properties文件。
server.properties配置:
#本機在kafka集群中的id broker.id=48 #服務端口 port=9092 #主機名 host.name=storm01 # The number of threads handling network requests num.network.threads=3 # The number of threads doing disk I/O num.io.threads=8 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 #kafka數據存儲位置(數據量大時,需要存儲的目錄大小也要充分) log.dirs=/data1/kafka-logs #默認topic創建partition的數量 num.partitions=1 # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 #kafka事件只有flash到硬盤才能被后續消費者消費,因此要配置flash時間參數,避免小數據量情況下數據刷新時間過久 log.flush.interval.messages=10000 log.flush.interval.ms=1000 # 數據在kafka中保存的時間,單位小時,超時的數據kafka會自動刪除 log.retention.hours=48 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000 # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. log.cleaner.enable=false # zookeeper集群配置 zookeeper.connect=master:2181,storm01:2181,storm02:2181,storm03:2181,storm04:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 #是否能夠刪除topic的配置,默認false不能刪除topic delete.topic.enable=true
Kafka服務啟動:jps命令可以看到kafka的進程名,說明kafka已經成功啟動。
nohup kafka-server-start.sh /home/hadoop/kafka_2.9.1-0.8.2.1/config/server.properties &
創建topic:創建復制因子2,有24個partition的topic,創建多個partition的目的是增加并行性,復制因子的目的是數據安全冗余。
kafka-topics.sh --create --zookeeper master:2181,storm01:2181,storm02:2181,storm03:2181,storm04:2181 --replication-factor 2 --partitions 24 --topic sighttp
kafka數據存儲方式:在kafka數據存儲目錄下,可以看到以每個-方式命名的文件夾,例如sighttp-19表示topic:sighttp,partition:19,如下圖所示:

進入topic-partition目錄,可以看到很多.index和.log結尾的文件。其中.log是數據文件,其中存儲的是kafka緩存池中的數據,.index是索引文件,數據文件和索引文件成對出現,文件名為一串數字,標識了該文件中存儲數據的起始序列號,如下:

kafka數據消費狀態查詢:消費者從kafka消費數據狀態是記錄在zookeeper中的,使用zkCli.sh命令可以查看,如下圖查詢了消費topic:sighttp,partition:0的狀態,offset表明已經處理到49259227840行,如下圖所示:

經驗:通過消費到的行數與存儲到的行數,可以判斷數據處理程序的速度是否滿足數據生成速度的需求。
kafka消費典型異常:
[2016-10-27 16:15:42,536] ERROR [Replica Manager on Broker 51]: Error when processing fetch request for partition [sighttp,3] offset 6535061966 from consumer with correlation id 0. Possible cause: Request for offset 6535061966 but we only have log segments in the range 6580106664 to 6797636149. (kafka.server.ReplicaManager)
異常原因:kafka中由于消息過期已經把序號是6535061966的消息刪除了,目前kafka中只有范圍是6580106664到6797636149的日志,但是消費者還要處理過期刪除的消息,那就會出現此異常消息(通常是由于數據處理速度慢,無法滿足數據生成速度的要求,導致消息積壓,積壓的消息到達kafka配置的過期時間,被kafka刪除)。
c)Storm集群安裝及配置
在http://storm.apache.org/下載Storm安裝包,建議使用Storm 0.10.0 released以上版本,因為新版本修正了很多bug,特別是STORM-935的問題(拓撲啟動后會占用大量系統資源,導致Topology運行不穩定)。
storm.yaml文件配置:
#zookeeper集群服務器配置 storm.zookeeper.servers: - "master" - "storm01" - "storm02" - "storm03" - "storm04" #storm主節點 nimbus.host: "master" #strom管理頁面服務端口 ui.port: 8081 #storm從節點服務端口配置,默認6700-6703共4個端口,意味著每臺服務器可以提供4個worker插槽,這里增加了6704和6705端口,即為單臺服務器增加了2個worker插槽,worker數增加意味著storm集群可以提供更多的計算資源。 supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 - 6704 - 6705 #狀態信息存儲位置,避免使用/tmp storm.local.dir: "/home/hadoop/apache-storm-0.10.0/workdir" #主節點的內存 nimbus.childopts: "-Xmx3072m" #從節點的內存 supervisor.childopts: "-Xmx3072m" #worker的內存,增加內存可以減少GC overload的問題 worker.childopts: "-Xmx3072m" #默認為30,增加netty超時時長等參數,降低因Netty通信問題,造成worker不穩定 storm.messaging.netty.max_retries:60 #增加storm.messaging.netty.max_wait_ms設置,默認為1000 storm.messaging.netty.max_wait_ms:2000
啟動服務:
Storm管理頁面:
瀏覽器輸入Storm UI所在服務器地址+8081端口號,打開Strom管理頁面如下圖:

從圖六Cluster Summary中可以看出Storm集群共有4個Supervisor節點,因每臺Supervisor提供6個slot(如果在storm.yaml配置文件中不配置supervisor.slots.ports屬性,則每個Supervisor默認提供4個slot),因此共有4*6=24個slot,已使用22個,還有2個空閑。需要注意的是每個拓撲一旦發布,將長久占用slot,如果沒有足夠的slot,新發布的拓撲只會占用空閑的slot,不會搶占其他已經被占用的slot資源;如果沒有slot,將無法發布新的拓撲,此時需要挖潛Storm集群服務器,通過配置文件增加slot資源或增加新的服務器。
從圖六Topology Summary中可以看出,集群上已經發布了7個Topology,每個Topology占用的worker資源,啟動的executor線程數,具體資源占用多少是在Storm Topology開發程序中指定的。
d)Kafka+Storm+Hdfs+Hbase拓撲開發
我們使用Eclipse創建MAVEN工程,在pom.xml配置文件中添加Storm及Hdfs的相關依賴,本例是Storm從Kafka中消費數據,經過ETL處理后存儲到Hdfs和Hbase中,因此需要添加Storm-Kafka、Storm-Hdfs、Storm-Hbase等依賴,注意依賴包版本要與集群一致。
抽取過程繼承BaseRichBolt類:
public class splitBolt extends BaseRichBolt { private static final String TAB = ","; private OutputCollector collector; public void prepare(Map config,TopologyContext context,OutputCollector collector){ this.collector=collector;
} public void execute(Tuple input){
String line=input.getString(0);
String[] words=line.split(TAB); if (words.length>74)
{
String Account; if (words[0].length()>0) Account=words[0]; else Account="NULL";
String LocalIPv4; if (words[1].length()>0) LocalIPv4=words[1]; else LocalIPv4="NULL";
String RemoteIPv4; if (words[2].length()>0) RemoteIPv4=words[2]; else RemoteIPv4="NULL";
String newline=Account+"|"+LocalIPv4+"|"+RemoteIPv4;
collector.emit(input,new Values(newline));
}
collector.ack(input);
} public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("newline"));
}
}
寫Hbase需要實現HBaseMapper類:
public class myHbaseMapper implements HBaseMapper { public ColumnList columns(Tuple tuple) {
String line=tuple.getString(0);
String[] words=line.split("\\|");
ColumnList cols = new ColumnList(); //參數依次是列族名,列名,值 if (words[1].length()>0) cols.addColumn("content".getBytes(), "LocalIPv4".getBytes(), words[1].getBytes()); if (words[2].length()>0) cols.addColumn("content".getBytes(), "RemoteIPv4".getBytes(), words[2].getBytes()); return cols;
} public byte[] rowKey(Tuple tuple) {
String line=tuple.getString(0);
String[] words=line.split("\\|");
String key; //rowkey設置成Account的反字符串,便于hbase表內分區的數據均衡 key=new StringBuilder(words[0]).reverse().toString(); return key.getBytes();
}
}
main函數:
public static void main(String[] args)
{ String zks = "master:2181,storm01:2181,storm02:2181 "; //zookeeper集群 String topic = "topicname"; //kafka中topic名稱 String zkRoot = "/storm";//zookeeper中存儲狀態信息的根目錄 String id = "kafkatopicname";//zookeeper中存儲本拓撲狀態信息的子目錄 FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm/tmp/").withPrefix("tmp_").withExtension(".dat");
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("|"); //寫到hdfs的目錄文件名以’tmp_’開頭,’.dat’結尾 //每10分鐘重寫一個hdfs的新文件 FileRotationPolicy rotationPolicy = new TimedRotationPolicy(10.0f, TimeUnit.MINUTES);
BrokerHosts brokerHosts = new ZkHosts(zks); //配置storm拓撲的spout SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new MessageScheme());
spoutConf.zkServers = Arrays.asList(new String[] {"master", "storm01","storm02"});
spoutConf.zkPort = 2181;
spoutConf.ignoreZkOffsets = false;//重啟拓撲時,需要從zookeeper中讀取偏移量 //如果偏移量中的數據已經從kafka中刪除,則從kafka中保存的早數據開始處理。 spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
spoutConf.useStartOffsetTimeIfOffsetOutOfRange = true; //配置hdfs bolt HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://hdfsmaster:9000")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy) //hdfs數據文件寫完后,move到新目錄 .addRotationAction(new MoveFileAction().toDestination("/storm/http/")); //實例化HBaseMapper HBaseMapper mapper = new myHbaseMapper(); //實例化HBaseBolt,指定hbase中的表名 HBaseBolt hBolt = new HBaseBolt("hbasetable", mapper).withConfigKey("hbase.conf");
TopologyBuilder builder = new TopologyBuilder(); //配置spout線程數為24,此數要與kafka中topic的partition數一致,partition數越多,則spout讀取數據的并行性越高,處理速度越快 builder.setSpout("kafka-reader", new KafkaSpout(spoutConf),24); //配置bolt,此bolt開發處理邏輯,bolt可以串接多個 builder.setBolt("etl", new splitBolt(), 24).shuffleGrouping("kafka-reader");
builder.setBolt("hdfs-bolt", hdfsBolt, 24).shuffleGrouping("etl");
builder.setBolt("hbase-bolt", hBolt, 24).shuffleGrouping("etl");
Config conf = new Config(); //增加hbase配置,指定hbase在hdfs集群上的目錄,zookeeper服務器集群 Map<String, Object> hbConf = new HashMap<String, Object>();
hbConf.put("hbase.rootdir", "hdfs://hdfsmaster:9000/hbase");
hbConf.put("hbase.zookeeper.quorum","master,storm01,storm02");
conf.put("hbase.conf", hbConf); String name = sighttphdfs.class.getSimpleName(); if (args != null && args.length > 0) {
conf.put(Config.NIMBUS_HOST, args[0]);
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0); //設置拓撲占用worker數為4,根據實時處理數據量大小按需配置 conf.setNumWorkers(4);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
}
}
上面程序實現了Storm讀Kafka寫Hdfs和Hbase的例子,抽取類中可以根據不同的業務需求,通過Java代碼實現不同的邏輯。編譯后的jar包上傳到集群,使用storm命令行提交Topology:
storm jar ./kafkastream.jar sighdfs.sighttphdfs stormmaster
經過幾個月的實際運行,我們的大數據實時處理架構能夠始終保持穩定,話單處理速度高于話單生成速度,有效的支撐了運營商大數據的各種分析查詢需求。開發和優化過程充滿挑戰,經過各種研究和嘗試,問題逐漸解決,在此我們也積累了大量的開發和優化經驗。
后再分享2個我們實際遇到的問題:
因Storm集群需要Zookeeper集群作狀態同步,因此所有是Storm服務器worker進程都會不停連接Zookeeper節點,Zookeeper節點的默認連接數是60,當Storm計算拓撲數量較多時,需要修改Zookeeper配置maxClientCnxns=1000,增加Zookeeper連接數。
由于Storm是實時計算,每個環節的擁塞都將引起Storm拓撲的不穩定,在開發中我們遇到Hdfs某個節點磁盤I/O高,導致Storm寫Hdfs超時,終引發Supervisor殺掉worker,造成拓撲不穩定的問題。究其原因是在某個Hdfs節點上,Yarn任務正在進行Reduce操作,用iostat -x 1 10命令查看,Yarn的中間盤I/O長時間被100%占用,同時Yarn的中間盤也是Hdfs的數據盤,導致寫入請求無法響應,終導致Storm寫Hdfs的worker超時,引發拓撲運行不穩定。此處建議配置Yarn的中間盤時,不要使用操作系統根盤,不要使用Hdfs的數據盤,可以有效避免Storm寫Hdfs超時的問題。
本站文章版權歸原作者及原出處所有 。內容為作者個人觀點, 并不代表本站贊同其觀點和對其真實性負責,本站只提供參考并不構成任何投資及應用建議。本站是一個個人學習交流的平臺,網站上部分文章為轉載,并不用于任何商業目的,我們已經盡可能的對作者和來源進行了通告,但是能力有限或疏忽,造成漏登,請及時聯系我們,我們將根據著作權人的要求,立即更正或者刪除有關內容。本站擁有對此聲明的最終解釋權。