3 Mart 2014 Pazartesi

Concurrent Collections

Not 1:  Java'da Concurrent Kuyruk konusu geniş olduğu için ayrı bir yazı olarak buraya taşıdım.
Not 2: Actor Model'de shared state olmaz.

Kuyruk Teorisi
Kuyruk teorisi veri yapısı ile ilgili değil. Ancak dolaylı olarak kuyruğun ne kadar büyük olması gerektiği ve bekleme süresini matematiksel olarak gösterdiği için bilinmesi faydalı. M/M/1 kuyruğu bu teoride kullanılan basit bir kuyruk.

Amdalh Kanunu
Bu kanunda concurrent programlama modeline geçilse bile hızın paralelleştirilemeyen kod parçasına bağımlı olduğu belirtilmiş.Örneğin sadece %50'si paralelleştirilebilen bir kod parçasına 16 işlemci ekleyerek 2 kat hızlanma elde edilir. Bundan sonra eklenen işlemci sayısının bir etkisi olmaz.

Concurrent Collections
Aşağıda çeşitli dillerde gördüğüm concurrent veri yapıları ile ilgili aldığım notlar var. Öncelikle ACE, Java ve C# arasındaki temel felsefe farkına dikkat etmek lazım.

ACE kütüphanesi, strategy örüntüsünü izleyerek veri yapısına dışarıdan kilitleme mekanizmasının geçilmesi felsefesini benimsemiş. Böylece aynı veri yapısı, kullanıcının isteğine göre thread-safe olarak veya olmayarak kullanılabiliyor.

Java ve C# ise veri yapılarının thread-safe olan ve olmayan 2 farklı sınıfını sunuyor.

ACE ACE_Map_Manager
Örneği buradan aldım.

//Eğer istenirse
//ACE_Null_Mutex
//ACE_Thread_Mutex
//ACE_Recursive_Thread_Mutex
//ACE_RW_Mutex kullanılabilir
typedef ACE_Map_Manager<unsigned long, std::string, ACE_Thread_Mutex> StringMap; 
StringMap m_MapInstance;

typedef StringMap::lock_type ACE_MAP_LOCK_TYPE;
//Create a guard
ACE_Guard<ACE_MAP_LOCK_TYPE> l(m_MapInstance.mutex());
for(StringMap::iterator it = m_MapInstance.begin(), end = m_MapInstance.end(); it!=end; ++it)
{
    const std::string & strContent = (*it).int_id_;
    const unsigned long & ulID = (*it).ext_id_;
    //do something with the element here
}


C# BlockingCollection
BlockingCollection klasik bir thread-safe collection. Yaratılması çok kolay
BlockingCollection<double> _collection = new     BlockingCollection<double>();

CompleteAdding
Bu metod niçin var ?
BlockingCollection sınıfı genellikle producer-consumer işlerinde kullanılıyor. Consumer Parallel.ForEach (...) ise, collection.GetConsumingEnumerable() metodunu kullanarak, hızlandırma (optimizasyon) amaçlı belli büyüklükteki bir tomar (chunk) okumaya çalışıyor. Ancak producer tomardan daha az sayıda eleman üretirse, consumer sonsuza kadar bekler. İşte bu durumun olmaması için, üreticiye artık daha fazla eleman gelmeyeceği, elindekileri işlemesi gerektiğini bildirmek için bu metod var. Burada da benzer bir açıklama var.

Parallel::ForEach will use a default Partitioner<T> implementation which, for an IEnumerable<T> which has no known length, will use a chunk partitioning strategy. What this means is each worker thread which Parallel::ForEach is going to use to work on the data set will read some number of elements from the IEnumerable<T> which will then only be processed by that thread (ignoring work stealing for now). It does this to save the expense of constantly having to go back to the source and allocate some new work and schedule it for another worker thread. So, usually, this is a good thing.

Bu metod çağırılınca artık kuyruğa yeni bir eleman eklenemiyor. Örnek burada.
Bir başka örnek ise buradan geldi.

void ReadInFile(string filePath, BlockingCollection<string> collection)
{
    foreach(var line in File.ReadLines(filePath))
    {
        collection.Add(line);
    }

    //This lets the consumer know that we will not be adding any more items to the collection.
    collection.CompleteAdding();
}
 

 
GetConsumingEnumerable
Bu sınıfın GetConsumingEnumerable() metodu ile kuyruktaki elemanların bir görüntüsü (snapshot) alınıyor ve bir başka thread tarafından kullanılabiliyor. Örnek:

C# Partitioner
Range Partitioning, Chunk Partitioning algoritmaları var.

EnumerablePartitionerOptions sınıfı

NoBuffering seçeneği
Eğer listedeki elemanları teker teker thread'lere dağıtmak istersek aşağıdaki gibi yapabiliriz.
List<string> _files = new List<string>();
Parallel.ForEach(Partitioner.Create(_files, EnumerablePartitionerOptions.NoBuffering),
                new ParallelOptions
                {
                    MaxDegreeOfParallelism = 5 //limit number of parallel threads
                },
                (file, loopstate, index) =>
                {
                    if (token.IsCancellationRequested)
                        return;
                    //do work...
                });


C# ConcurrentBag
Bu sınıf unordered bir veri yapısı. Neden sırasız onu bilmiyorum. BlockingCollection'a göre ne avantaj sağlıyor bilmiyorum.

Örnek:
var stringResult = new ConcurrentBag<string>();
stringResult.Add ("line");
string item;
stringResult.TryTake (out item);//Bag'den bir eleman çek
C# ConcurrentDictionary
Örnek:
var dict = new ConcurrentDictionary<string, MyClass>();
MyClass match;
dict.TryGetValue("mykey", out match);

Java ConcurrentHashMap
ConcurrentHashMap tek bir lock ile korunan Collections.synchronizedMap yerine daha fazla lock kullanarak daha fazla ölçeklenebilirlik sağlamayı hedefliyor.  Bu konu hakkında yazılmış Java theory and practice: Building a better HashMap başlıklı yazı bilgilendirici.
Bu sınıfın en işe güzel metodu putIfAbsent() metodu. Aşağıdaki örneği Thread safety of cached ConcurrentHashMaps containing Lists yazısından aldım. Hem putIfAbsent() metodunu gösteriyor hem de kullanım örneği veriyor.

ConcurrentMap<String, List<Integer>> cachedMap = //...

//You don't need a synchronized block here
List<Integer> mappedList = cachedMap.get("key");
if (mappedList == null) {
        List<Integer> newMappedList = Lists.newArrayList();
        mappedList = cachedMap.putIfAbsent("key", newMappedList);
}
// ArrayList is not synchronized, so that's the only part you actually need to
// guard against concurrent modification.

synchronized (mappedList) {
    mappedList.add(value);
}
Java NavigableSet
NavigableSet aynı zamanda bir SortedSet ve elemanlar sıralanıyorlar.
< listesi için
headSet() ile verilen elemandan küçük olanların listesi alınır

> listesi için
tailSet() ile de verilen elemandan büyük olanlar alınabilir.

<= için
floor(E e) metodu ile verilen elemandan küçük eşit olan ilk eleman alınır. Örneği buradan aldım. Bir diğer
NavigableSet<Quote> topNQuotes = new TreeSet<Quote>();
//get the greatest Quote that is less than this one
Quote lowerQuote = topNQuotes.lower(quote);
if (lowerQuote == null) {
    //Do something...
}
Bir diğer örnek ise aşağıda.

>= için
ceiling (E e) metodu ile verilen elemandan büyük eşit olan ilk eleman alınır.

> için
higher(E e)  metodu ile verilen elemandan büyük olan ilk eleman alınır.

< için
lower(E e) metodu ile verilen elemandan küçük olan ilk eleman alınır.

Java NavigableMap
NavigableMap aynı zamanda bir SortedMap yani elemanları sıralı tutuyor. Aslında SortedMap tek başına artık kullanılmıyor. SortedMap olan herşey aynı zamanda NavigableMap. Kullanılan metodlar NavigableSet ile aynı anlama geliyor. Aralarındaki tek fark NavigableMap metodları Entry kelimesi ile biterler, yani xxxEntry şeklinde olurlar.

pollLastEntry
Bu metod ile en büyük elemanı almak mümkün.

floorEntry

Buradaki örnekte bir aralık içinde değer arama örneği var.
<K, V> V mappedValue(TreeMap<K, V> map, K key) {
    Entry<K, V> e = map.floorEntry(key);//<= koşulunu sağlayan en büyük elemanı bul
    if (e != null && e.getValue() == null) {//Eğer eleman varsa ve değeri null ise
        e = map.lowerEntry(key);//< koşulunu sağlayan en büyük elemanı bul
    }
    return e == null ? null : e.getValue();
}
Java ConcurrentSkipListMap
ConcurrentSkipListMap aynı zamanda bir ConcurrentNavigableMap ve NavigableMap. Kendi içinde red-black tree yerine skip list kullanıyor. QT dokümantasyonunda QMap'in de skip list kullandığı yazılı. why does qmap uses skiplist instead ob rb-tree? sorusunda da konuyla ilgili bilgi var. Sebep olarak ise
skip-lists result in less code in the executable and require less memory per node.
yazılmış. When should I use ConcurrentSkipListMap? sorusunda ise ConcurrentSkipListMap'in ConcurrentHashMap'e göre ekle/çıkar/al işlemlerinde biraz daha yavaş olabileceği ancak elemanları sıralı tutması ve ceilingEntry(), ceilingKey() metodlar sunduğu için sıralı erişimin gerektiği yerlerde daha hızlı olduğu söylenmiş.


Hiç yorum yok:

Yorum Gönder