
7月12日一款叫做TDengine的時序數(shù)據(jù)庫項目在GitHub上開源了,這個項目一經(jīng)發(fā)布就穩(wěn)穩(wěn)占據(jù)了GitHub排行榜的C位,目前TdEngine已經(jīng)累積了5000多個star,并且連續(xù)一周排在上升榜首位。而且你要知道TdEngine的開發(fā)語言并不是火熱的Python或JAVA,而是C語言。C語言無巧可取,雖見功夫,但是代碼比較難讀,能引發(fā)如此的關注絕對堪稱奇跡,在我印象中即使是Mysql也沒有達到如此的熱度。
相信很多人也和筆者一樣,是通過《比hadoop快至少10 倍的物聯(lián)網(wǎng)大數(shù)據(jù)平臺,我把它開源》的刷屏文才了解到陶老師與TdEngine的,當看到這位50歲的IT老兵老兵,依舊奮斗在編程一線,為TDengine開發(fā)貢獻3萬行代碼時候,我就立刻四處向朋友打聽,并最終要了陶老師的微信,做為一名80后程序員,我近不急待的想和陶老師直接溝通,想從他身上找到保持編程水平的秘決。 
大神面對面:這才是10倍程序員該有的樣子 2008年的時候筆者還是CSDN論壇WINDOWS MOBILE版的版主,從事手機導航軟件的開發(fā)工作,而在彼時陶老師也創(chuàng)辦了和信公司,并親自開發(fā)了WindowsMobile版和信客戶端,相同的開發(fā)平臺經(jīng)歷也讓我們迅速的拉進了彼此的距離。 在使用TdEngine的過程中我發(fā)現(xiàn)了兩個小問題,一是數(shù)據(jù)庫用戶密碼明文存放,二是數(shù)據(jù)文件權限設置不合理。讓我十分震驚的是,這兩個問題是我下午在和陶老師聊天時提出的,當晚發(fā)布版本就把問題全部解決了。后來溝通得知這些BUG都是陶老師自己動手修改的。我意識到TdEngine的效率應該來自于創(chuàng)始人對于代碼的執(zhí)著與熱愛,而不是對員工996式的工作要求。 陶老師是真的愛編程,尤其對于代碼運行效率有著近乎狂熱的追求,我查閱了陶老師近年來的作品,其和信客戶端只有18K大小,胎心算法的實現(xiàn)只用了600行代碼,而TDengine這樣一個數(shù)據(jù)庫項目竟然只需要1.5M安裝包就能搞定,在手機APP都動轍上百M的今天,TDengine體量甚至顯得有些異類。如果沒有深厚的功底和堅定的信念是絕對無法達到如此高度的。我想陶老師應該就是傳說中10倍程序員的典范吧。 10倍程序員對于他周圍親友的影響也是非常巨大的,當我打開TdEngine的官網(wǎng)(https://www./cn/),其簡潔明快的風格,一目了然的配圖,實在讓我無法把這一切和一位年近半百的老派IT士人聯(lián)系到一起,當然后來我和陶老師聊到這件事的時候才知道,整個網(wǎng)站從設計、前端、后臺、瀏覽器適配、數(shù)據(jù)分析到搜索引擎優(yōu)化,都是由陶老師的兒子,一位剛剛高中畢業(yè)的00后操刀主持的,而且整個網(wǎng)站從無到有只用了三周時間,除了感嘆一句后生可畏,由此也可以看出來和10倍程序員并肩作戰(zhàn)的也都是10倍程序員,所以it團隊的負責人在感嘆自己沒有18程序員相助時也要反思一下,自己是不是一位10程序員。 TdEngine為什么會火? 傳統(tǒng)數(shù)據(jù)庫廠商的問題在于傲慢、自大,他們認為數(shù)據(jù)是零件,數(shù)據(jù)庫則是各類零件的加中心,很多工序都是為數(shù)據(jù)的修改準備的,無論修改是否發(fā)生加工車間為了保證一致性,都會對流水線上的數(shù)據(jù)加上各種各樣的鎖。這些操作浪費了很多時間,而且?guī)缀鯖]有任何輕量級的框架,可供用戶選擇省略掉這些冗余操作。而且傳統(tǒng)廠商為了解決數(shù)據(jù)庫的性能問題不是從底層架構邏輯下手,而是不休止的在應用與數(shù)據(jù)庫之間加入各種像REDIS,NGIX等等代理或者緩存層,這種方式其實是加大了各層級間的性能開銷。傳統(tǒng)廠商認為自己非常了解數(shù)據(jù),但卻忘了用戶比廠商更加了解自己的數(shù)據(jù),天下可謂苦秦久已。 而TdEngine是認為數(shù)據(jù)是信息流,它要做的非常簡單,只是數(shù)據(jù)的錄像機而已,信息調(diào)閱只要找到對應的錄像帶即可,這樣的設計思路從底層邏輯上決定了td會是一款性能極高的產(chǎn)品。它更加貼合物聯(lián)網(wǎng)時代的數(shù)據(jù)模型,而且代碼只有10萬行的量級,非常適合從從頭開始學習。 所以TdEngine精確的找到了數(shù)據(jù)庫市場的細分戰(zhàn)場。他可以在相同的硬件條件下達到其它產(chǎn)品10倍的速度,完美解決了很多物聯(lián)網(wǎng),量化交易等場景的痛點。 TdEngine代碼導讀
當筆者打TdEngine的代碼時不由眼前一亮,其代碼風格及規(guī)范性絕對堪稱一流,于是我打開了久違的souce insight,,再一次開始了閱讀C語言代碼的美妙旅程,在這里強烈推薦各位讀者也來讀一下,絕對堪稱享受。 這里將給我啟示最大的一段代碼其鏈接在 https://github.com/taosdata/TDengine/blob/master/src/util/src/tsched.c
向大家分享一下。鑒于本文肯定會分享給陶老師,所以估計會有作者親答的環(huán)節(jié):-),以下代碼是一個典型的consumer-producer消息傳遞功能的實現(xiàn),也就是有多個生產(chǎn)者(producer)生成并不斷向隊列中傳遞消息,也有多個消費者(consumer)不斷從隊列中取消息,而在java等高級語言中類似的功能已經(jīng)被封裝好了,這其實也讓程序員無法了解線程間的同步和互斥機制。在正式進入到代碼之前我想請大家思考這樣的一個,互斥體( mutex)和信號量(semaphore)的使用是如何做到多線程安全的。 先來看結(jié)構體設計,具體我已經(jīng)注釋好了: typedef struct { char label[16];#消息內(nèi)容 sem_t emptySem;#此信號量代表隊列的可寫狀態(tài) sem_t fullSem;#此信號量代表隊列的可讀狀態(tài) pthread_mutex_t queueMutex;#此互斥體為保證消息不會被誤修改,保證線程程安全 int fullSlot;#隊尾位置 int emptySlot;#隊頭位置 int queueSize;#隊列長度 int numOfThreads;#同時操作的線程數(shù)量 pthread_t * qthread;#線程指針 SSchedMsg * queue;#隊列指針 } SSchedQueue;
再來看初始化函數(shù),這里需要特別說明的是,兩個信號量的創(chuàng)建,其中emptySem是隊列的可寫狀態(tài),初始化時其值為queueSize,即初始時隊列可寫,可接受消息長度為隊列長度,fullSem是隊列的可讀狀態(tài),初始化時其值為0,即初始時隊列不可讀。具體代碼及我的注釋如下: void *taosInitScheduler(int queueSize, int numOfThreads, char *label) { pthread_attr_t attr; SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));
memset(pSched, 0, sizeof(SSchedQueue)); pSched->queueSize = queueSize; pSched->numOfThreads = numOfThreads; strcpy(pSched->label, label);
if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) { pError('init %s:queueMutex failed, reason:%s', pSched->label, strerror(errno)); goto _error; } #emptySem是隊列的可寫狀態(tài),初始化時其值為queueSize,即初始時隊列可寫,可接受消息長度為隊列長度。 if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) { pError('init %s:empty semaphore failed, reason:%s', pSched->label, strerror(errno)); goto _error; } #fullSem是隊列的可讀狀態(tài),初始化時其值為0,即初始時隊列不可讀 if (sem_init(&pSched->fullSem, 0, 0) != 0) { pError('init %s:full semaphore failed, reason:%s', pSched->label, strerror(errno)); goto _error; }
if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) { pError('%s: no enough memory for queue, reason:%s', pSched->label, strerror(errno)); goto _error; }
memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg)); pSched->fullSlot = 0;#實始化時隊列為空,故隊頭和隊尾的位置都是0 pSched->emptySlot = 0;#實始化時隊列為空,故隊頭和隊尾的位置都是0
pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads);
pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for (int i = 0; i < pSched->numOfThreads; ++i) { if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) { pError('%s: failed to create rpc thread, reason:%s', pSched->label, strerror(errno)); goto _error; } }
pTrace('%s scheduler is initialized, numOfThreads:%d', pSched->label, pSched->numOfThreads);
return (void *)pSched;
_error: taosCleanUpScheduler(pSched); return NULL; }
再來看讀消息的taosProcessSchedQueue函數(shù),這個主要邏輯是 1.使用無限循環(huán),只要隊列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續(xù)向下處理 2.在操作msg前,加入互斥體防止msg被誤用。 3.讀操作完畢后修改fullSlot的值,注意這為避免fullSlot溢出,需要對于queueSize取余。同時退出互斥體。 4.對emptySem進行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個消息后,emptySem的值為6,即可寫狀態(tài),且能接受的消息數(shù)量為6。 具體代碼及注釋如下: void *taosProcessSchedQueue(void *param) { SSchedMsg msg; SSchedQueue *pSched = (SSchedQueue *)param; #注意這里是個無限循環(huán),只要隊列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續(xù)處理 while (1) { if (sem_wait(&pSched->fullSem) != 0) { pError('wait %s fullSem failed, errno:%d, reason:%s', pSched->label, errno, strerror(errno)); if (errno == EINTR) { /* sem_wait is interrupted by interrupt, ignore and continue */ continue; } } #加入互斥體防止msg被誤用。 if (pthread_mutex_lock(&pSched->queueMutex) != 0) pError('lock %s queueMutex failed, reason:%s', pSched->label, strerror(errno));
msg = pSched->queue[pSched->fullSlot]; memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg)); #讀取完畢修改fullSlot的值,注意這為避免fullSlot溢出,需要對于queueSize取余。 pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize; #讀取完畢修改退出互斥體 if (pthread_mutex_unlock(&pSched->queueMutex) != 0) pError('unlock %s queueMutex failed, reason:%s\n', pSched->label, strerror(errno)); #讀取完畢對emptySem進行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個消息后,emptySem的值為6,即可寫狀態(tài),且能接受的消息數(shù)量為6 if (sem_post(&pSched->emptySem) != 0) pError('post %s emptySem failed, reason:%s\n', pSched->label, strerror(errno));
if (msg.fp) (*(msg.fp))(&msg); else if (msg.tfp) (*(msg.tfp))(msg.ahandle, msg.thandle); } }
最后來看寫消息的taosScheduleTask函數(shù),其基本邏輯是 1.寫隊列前先對emptySem進行減1操作,如emptySem原值為1,那么減1后為0,也就是隊列已滿,必須在讀取消息后,即emptySem進行post操作后,隊列才能進行可寫狀態(tài)。 2.加入互斥體防止msg被誤操作,寫入完成后退出互斥體。 3.寫隊列完成后對fullSem進行加1操作,如fullSem原值為0,那么加1后為1,也就是隊列可讀,咱們上面介紹的讀取taosProcessSchedQueue中 sem_wait(&pSched->fullSem)不再阻塞就繼續(xù)向下。 int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) { SSchedQueue *pSched = (SSchedQueue *)qhandle; if (pSched == NULL) { pError('sched is not ready, msg:%p is dropped', pMsg); return 0; } #在寫隊列前先對emptySem進行減1操作,如emptySem原值為1,那么減1后為0,也就是隊列已滿,必須在讀取消息后,即emptySem進行post操作后,隊列才能進行可寫狀態(tài)。 if (sem_wait(&pSched->emptySem) != 0) pError('wait %s emptySem failed, reason:%s', pSched->label, strerror(errno)); #加入互斥體防止msg被誤操作 if (pthread_mutex_lock(&pSched->queueMutex) != 0) pError('lock %s queueMutex failed, reason:%s', pSched->label, strerror(errno));
pSched->queue[pSched->emptySlot] = *pMsg; pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
if (pthread_mutex_unlock(&pSched->queueMutex) != 0) pError('unlock %s queueMutex failed, reason:%s', pSched->label, strerror(errno)); #在寫隊列前先對fullSem進行加1操作,如fullSem原值為0,那么加1后為1,也就是隊列可讀,咱們上面介紹的讀取函數(shù)可以進行處理。 if (sem_post(&pSched->fullSem) != 0) pError('post %s fullSem failed, reason:%s', pSched->label, strerror(errno));
return 0; }
當然以上只是TdEngine優(yōu)美代碼的一小部分,而且筆者解讀的功力也十分有限,這里再次強烈建議大家下載全部源碼仔細學習,定能受益匪淺。 原文: https://blog.csdn.net/BEYONDMA/article/details/96578186 【End】
|