業(yè)務(wù)模型 讀、寫(xiě)、刪的比例大致是7:3:1,至少要支持500w條緩存,平均每條緩存6k,要求設(shè)計(jì)一套性能比較好的緩存算法。 不考慮MemCached,Velocity等現(xiàn)成的key-value緩存方案,也不考慮脫離.net gc自己管理內(nèi)存,不考慮隨機(jī)讀取數(shù)據(jù)及順序讀取數(shù)據(jù)的場(chǎng)景,目前想到的有如下幾種LRU方案
目前幾種方案在多線程下應(yīng)該都需要加鎖,不太好設(shè)計(jì)無(wú)鎖的方案,下面這個(gè)鏈接是一個(gè)支持多線程的方案,但原理至今沒(méi)搞特明白 用普通鏈表簡(jiǎn)單實(shí)現(xiàn)LRU緩存
public class LRUCacheHelper<K, V> {
readonly Dictionary<K, V> _dict; readonly LinkedList<K> _queue = new LinkedList<K>(); readonly object _syncRoot = new object(); private readonly int _max; public LRUCacheHelper(int capacity, int max) { _dict = new Dictionary<K, V>(capacity); _max = max; } public void Add(K key, V value) { lock (_syncRoot) { checkAndTruncate(); _queue.AddFirst(key); //O(1) _dict[key] = value; //O(1) } } private void checkAndTruncate() { lock (_syncRoot) { int count = _dict.Count; //O(1) if (count >= _max) { int needRemoveCount = count / 10; for (int i = 0; i < needRemoveCount; i++) { _dict.Remove(_queue.Last.Value); //O(1) _queue.RemoveLast(); //O(1) } } } } public void Delete(K key) { lock (_syncRoot) { _dict.Remove(key); //(1) _queue.Remove(key); // O(n) } } public V Get(K key) { lock (_syncRoot) { V ret; _dict.TryGetValue(key, out ret); //O(1) _queue.Remove(key); //O(n) _queue.AddFirst(key); //(1) return ret; } } }
_dict.TryGetValue(key, out ret); //O(1) 我改進(jìn)后的鏈表就差不多滿足需求了,
其中截?cái)嗟臅r(shí)候可以指定當(dāng)緩存滿的時(shí)候截?cái)喟俜种嗌俚淖钌偈褂玫木彺骓?xiàng)。 其它的就是多線程的時(shí)候鎖再看看怎么優(yōu)化,字典有線程安全的版本,就把.net 3.0的讀寫(xiě)鎖扣出來(lái)再把普通的泛型字典保證成ThreadSafelyDictionay就行了,性能應(yīng)該挺好的。 鏈表的話不太好用讀寫(xiě)鎖來(lái)做線程同步,大不了用互斥鎖,但得考慮下鎖的粒度,Move,AddFirst,RemoveLast的時(shí)候只操作一兩個(gè)節(jié)點(diǎn),是不是想辦法只lock這幾個(gè)節(jié)點(diǎn)就行了,Truncate的時(shí)候因?yàn)橐坎僮骱芏喙?jié)點(diǎn),所以要上個(gè)大多鏈表鎖,但這時(shí)候怎么讓其它操作停止得考慮考慮,類(lèi)似數(shù)據(jù)庫(kù)的表鎖和行鎖。
實(shí)現(xiàn)代碼
public class DoubleLinkedListNode<T> {
public T Value { get; set; } public DoubleLinkedListNode<T> Next { get; set; } public DoubleLinkedListNode<T> Prior { get; set; } public DoubleLinkedListNode(T t) { Value = t; } public DoubleLinkedListNode() { } public void RemoveSelf() { Prior.Next = Next; Next.Prior = Prior; } } public class DoubleLinkedList<T> { protected DoubleLinkedListNode<T> m_Head; private DoubleLinkedListNode<T> m_Tail; public DoubleLinkedList() { m_Head = new DoubleLinkedListNode<T>(); m_Tail = m_Head; } public DoubleLinkedList(T t) : this() { m_Head.Next = new DoubleLinkedListNode<T>(t); m_Tail = m_Head.Next; m_Tail.Prior = m_Head; } public DoubleLinkedListNode<T> Tail { get { return m_Tail; } } public DoubleLinkedListNode<T> AddHead(T t) { DoubleLinkedListNode<T> insertNode = new DoubleLinkedListNode<T>(t); DoubleLinkedListNode<T> currentNode = m_Head; insertNode.Prior = null; insertNode.Next = currentNode; currentNode.Prior = insertNode; m_Head = insertNode; return insertNode; } public void RemoveTail() { m_Tail = m_Tail.Prior; m_Tail.Next = null; return; } } public class LRUCacheHelper<K, V> { class DictItem { public DoubleLinkedListNode<K> Node { get; set; } public V Value { get; set; } } readonly Dictionary<K, DictItem> _dict; readonly DoubleLinkedList<K> _queue = new DoubleLinkedList<K>(); readonly object _syncRoot = new object(); private readonly int _max; public LRUCacheHelper(int capacity, int max) { _dict = new Dictionary<K, DictItem>(capacity); _max = max; } public void Add(K key, V value) { lock (this) { checkAndTruncate(); DoubleLinkedListNode<K> v = _queue.AddHead(key); //O(1) _dict[key] = new DictItem() { Node = v, Value = value }; //O(1) } } private void checkAndTruncate() { int count = _dict.Count; //O(1) if (count >= _max) { int needRemoveCount = count / 10; for (int i = 0; i < needRemoveCount; i++) { _dict.Remove(_queue.Tail.Value); //O(1) _queue.RemoveTail(); //O(1) } } } public void Delete(K key) { lock (this) { _dict[key].Node.RemoveSelf(); _dict.Remove(key); //(1) } } public V Get(K key) { lock (this) { DictItem ret; if (_dict.TryGetValue(key, out ret)) { ret.Node.RemoveSelf(); _queue.AddHead(key); return ret.Value; } return default(V); } } }
程序初始化200w條緩存,然后不斷的加,每加到500w,截?cái)嗟?/span>10分之一,然后繼續(xù)加。 測(cè)試模型中每秒鐘的讀和寫(xiě)的比例是7:3,以下是依次在3個(gè)時(shí)間點(diǎn)截取的性能計(jì)數(shù)器圖。
后續(xù)改進(jìn) 1、 用游標(biāo)鏈表來(lái)代替普通的雙頭鏈表,程序起來(lái)就收工分配固定大小的數(shù)組,然后用數(shù)組的索引來(lái)做鏈表,省得每次添加和刪除節(jié)點(diǎn)都要GC的參與,這相當(dāng)于手工管理內(nèi)存了,但目前我還沒(méi)找到c#合適的實(shí)現(xiàn)。 2、 有人說(shuō)鏈表不適合用在多線程環(huán)境中,因?yàn)閷?duì)鏈表的每個(gè)操作都要加互斥鎖,連讀寫(xiě)鎖都用不上,我目前的實(shí)現(xiàn)是直接用互斥鎖做的線程同步,每秒的吞吐量七八十萬(wàn),感覺(jué)lock也不是瓶頸,如果要改進(jìn)的話可以把Dictionary用ThreadSafelyDictionary來(lái)代替,然后鏈表還用互斥鎖(剛開(kāi)始設(shè)想的鏈表操作只鎖要操作的幾個(gè)節(jié)點(diǎn)以降低并發(fā)沖突的想法應(yīng)該不可取,不嚴(yán)謹(jǐn))。 3、 還有一個(gè)地方就是把鎖細(xì)分以下,鏈表還用鏈表,但每個(gè)鏈表的節(jié)點(diǎn)是個(gè)HashSet,對(duì)HashSet的操作如果只有讀,寫(xiě),刪,沒(méi)有遍歷的話應(yīng)該不需要做線程同步(我感覺(jué)不用,因?yàn)?/span>Set就是一個(gè)集合,一個(gè)線程往里插入,一個(gè)線程往里刪除,一個(gè)線程讀取應(yīng)該沒(méi)問(wèn)題,頂多讀進(jìn)來(lái)的數(shù)據(jù)可能馬上就刪除了,而整個(gè)Set的結(jié)構(gòu)不會(huì)破壞)。然后新增數(shù)據(jù)的時(shí)候往鏈表頭頂Set里插入,讀取某個(gè)數(shù)據(jù)的時(shí)候把它所在的節(jié)點(diǎn)的Set里刪除該數(shù)據(jù),然后再鏈表頭的Set里插入一個(gè)數(shù)據(jù),這樣反復(fù)操作后,鏈表的最后一個(gè)節(jié)點(diǎn)的Set里的數(shù)據(jù)都是舊數(shù)據(jù)了,可以安全的刪除了,當(dāng)然這個(gè)刪除的時(shí)候應(yīng)該是要鎖整個(gè)鏈表的。每個(gè)Set應(yīng)該有個(gè)大小上限,比如20w,但set不能安全的遍歷,就不能得到當(dāng)前大小,所以添加、刪除Set的數(shù)據(jù)的時(shí)候應(yīng)該用Interlocked.Decrement()和 Interlocked.Increment()維護(hù)一個(gè)Count,一遍一個(gè)Set滿的時(shí)候,再到鏈表的頭新增一個(gè)Set節(jié)點(diǎn)。 性能測(cè)試腳本
class Program {
private static PerformanceCounter _addCounter; private static PerformanceCounter _getCounter; static void Main(string[] args) { SetupCategory(); _addCounter = new PerformanceCounter("wawasoft.lrucache", "add/sec", false); _getCounter = new PerformanceCounter("wawasoft.lrucache", "get/sec", false); _addCounter.RawValue = 0; _getCounter.RawValue = 0; Random rnd = new Random(); const int max = 500 * 10000; LRUCacheHelper<int, int> cache = new LRUCacheHelper<int, int>(200 * 10000, max); for (int i = 10000*100000 - 1; i >= 0; i--) { if(i % 10 > 7) { ThreadPool.QueueUserWorkItem( delegate { cache.Add(rnd.Next(0, 10000), 0); _addCounter.Increment(); }); } else { ThreadPool.QueueUserWorkItem( delegate { int pop = cache.Get(i); _getCounter.Increment(); }); } } Console.ReadKey(); } private static void SetupCategory() { if (!PerformanceCounterCategory.Exists("wawasoft.lrucache")) { CounterCreationDataCollection CCDC = new CounterCreationDataCollection(); CounterCreationData addcounter = new CounterCreationData(); addcounter.CounterType = PerformanceCounterType.RateOfCountsPerSecond32; addcounter.CounterName = "add/sec"; CCDC.Add(addcounter); CounterCreationData getcounter = new CounterCreationData(); getcounter.CounterType = PerformanceCounterType.RateOfCountsPerSecond32; getcounter.CounterName = "get/sec"; CCDC.Add(getcounter); PerformanceCounterCategory.Create("wawasoft.lrucache","lrucache",CCDC); } } }
|
|