
# Friday, 31 October 2014
( Async | Task | Threading )

As part of the 70-483 exam there is a section on threads/async, which I’m going through using the study book.

An application runs in its own process.
Each process has at least one thread.

Windows switches context between threads after a period of time. On a single-core machine this gives the illusion of multitasking.

Thread

Only use the Thread class directly under special circumstances.

    public static void ThreadMethod()
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine("ThreadProc: {0}", i);
        }
    }

    public static void ThreadMethod2()
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine("ThreadProc2: {0}", i);
        }
    }

    public static void Main()
    {
        var regularThread = new Thread(ThreadMethod);
        regularThread.Start();

        var regularThread2 = new Thread(ThreadMethod2);
        regularThread2.Start();

        // Wait for spawned thread to end
        regularThread.Join();
        Console.WriteLine("regularThread returned.");

        regularThread2.Join();
        Console.WriteLine("regularThread2 returned.");
    }

The main thread reports the returns in the correct order, because it joins regularThread before regularThread2.

    public static void ThreadMethod()
    {
        for (int i = 0; i < 5; i++)
        {
            Console.WriteLine("ThreadProc: {0}", i);
            // Give up the rest of this time slice so another ready thread can run
            Thread.Sleep(0);
        }
    }

    public static void Main()
    {
        var t = new Thread(new ThreadStart(ThreadMethod));
        t.Start();

        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine("Main thread: Do some work.");
            Thread.Sleep(0);
        }

        // Wait until the other thread finishes
        t.Join();
        Console.WriteLine("main app ended");
    }

Background

    public static void ThreadMethod()
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine("ThreadProc: {0}", i);
            Thread.Sleep(1000);
        }
    }

    public static void Main()
    {
        var t = new Thread(new ThreadStart(ThreadMethod));
        // App will exit immediately and not print anything
        t.IsBackground = true;
        t.Start();
    }

Parameters

    public static void ThreadMethod(object o)
    {
        for (int i = 0; i < (int)o; i++)
        {
            Console.WriteLine("ThreadProc: {0}", i);
            Thread.Sleep(0);
        }
    }

    public static void Main()
    {
        // Passing a parameter to the Thread
        var t = new Thread(new ParameterizedThreadStart(ThreadMethod));
        t.Start(5);
        t.Join();
    }

Stopping a Thread

    public static void Main()
    {
        // Better way to stop a thread is with a shared variable
        bool stopped = false;

        // Thread initialised with a lambda (shorthand version of a delegate)
        var t = new Thread(new ThreadStart(() =>
        {
            while (!stopped)
            {
                Console.WriteLine("Running...");
                Thread.Sleep(10);
                Console.WriteLine("Running end");
            }
        }));
        t.Start();

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
        stopped = true;

        // App will always wait for the t thread to end before continuing
        t.Join();
        Console.WriteLine("app stopped");
    }

ThreadStatic


    // Each thread gets its own copy of a field
    [ThreadStatic]
    public static int _field;

    public static void Main()
    {
        new Thread(() =>
        {
            for (int x = 0; x < 10; x++)
            {
                _field++;
                Console.WriteLine("Thread A: {0}", _field);
            }
        }).Start();

        new Thread(() =>
        {
            for (int x = 0; x < 10; x++)
            {
                _field++;
                Console.WriteLine("Thread B: {0}", _field);
            }
        }).Start();

        Console.ReadKey();
    }

ThreadLocal<T>

    public static ThreadLocal<int> _field = new ThreadLocal<int>(() =>
    {
        return Thread.CurrentThread.ManagedThreadId;
    });

    public static void Main()
    {
        new Thread(() =>
        {
            // e.g. 4
            Console.WriteLine("Thread A ManagedThreadId: {0}", _field.Value);
        }).Start();

        new Thread(() =>
        {
            // e.g. 3
            Console.WriteLine("Thread B ManagedThreadId: {0}", _field.Value);
        }).Start();

        Console.ReadKey();
    }

Thread pools

Creating new threads takes time and resources. A thread pool lets you reuse threads instead.

    public static void Main()
    {
        ThreadPool.QueueUserWorkItem((s) =>
        {
            Console.WriteLine("Working on a thread from threadpool");
        });
        Console.ReadLine();
    }

The thread pool limits the number of available threads, so you get less parallelism. However this has advantages, eg if a web server has many requests coming in, the pool limits the number of threads being spawned.

Because threads are reused, they also reuse their local state. Don’t rely on state that can potentially be shared between work items.

Tasks

Queuing a work item to the thread pool can be useful, however you don’t know when it’s done, nor do you get a return value.

A task scheduler is responsible for starting the task and managing it.  The task scheduler uses threads from the thread pool.
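A minimal sketch (not from the study book) to check which kind of thread a task runs on. It assumes the default scheduler and a plain console app; the class name Program is just for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    public static void Main()
    {
        // The main thread of a console app is not a thread pool thread
        Console.WriteLine("Main on pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread);

        // Task.Run queues the work to the default scheduler, which uses the thread pool
        Task<bool> t = Task.Run(() => Thread.CurrentThread.IsThreadPoolThread);

        // Reading Result waits for the task to finish
        Console.WriteLine("Task on pool thread: {0}", t.Result);
    }
}
```

The second line should normally report that the task ran on a pool thread.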

Executing a task on another thread only makes sense if you want to

  • keep the ‘UI’ free for other work
  • parallelise work to other processors

    public static void Main()
    {
        Task t = Task.Run(() =>
        {
            for (int x = 0; x < 100; x++)
            {
                Console.Write("*");
            }
        });

        // Wait is equivalent to .Join in threads - waits for task to finish
        t.Wait();
    }

then:

    public static void Main()
    {
        Task<int> t = Task.Run(() =>
        {
            return 42;
        });

        // By reading the result, forces current thread to wait until the task has finished
        Console.WriteLine(t.Result); // Displays 42
    }

Continuation Task

    public static void Main()
    {
        Task<int> t = Task.Run(() =>
        {
            return 42;
        // Continuation
        }).ContinueWith(i => i.Result * 2);

        Console.WriteLine(t.Result); // Displays 84

        t = Task.Run(() =>
        {
            return 42;
        });

        t.ContinueWith((i) => Console.WriteLine("Canceled"), TaskContinuationOptions.OnlyOnCanceled);
        t.ContinueWith((i) => Console.WriteLine("Faulted"), TaskContinuationOptions.OnlyOnFaulted);

        // To get the return value out of the task, read i.Result inside the continuation
        var completedTask = t.ContinueWith((i) => Console.WriteLine("Completed"), TaskContinuationOptions.OnlyOnRanToCompletion);
        completedTask.Wait();
    }

Child Tasks

    public static void Main()
    {
        // Parent task finishes only when its 3 children have finished.
        // Task.Factory.StartNew is used here because Task.Run creates the parent with
        // DenyChildAttach, which makes AttachedToParent have no effect.
        Task<Int32[]> parent = Task.Factory.StartNew(() =>
        {
            var results = new Int32[3];
            new Task(() => results[0] = 0, TaskCreationOptions.AttachedToParent).Start();
            new Task(() => results[1] = 1, TaskCreationOptions.AttachedToParent).Start();
            new Task(() => results[2] = 2, TaskCreationOptions.AttachedToParent).Start();
            return results;
        });

        // finalTask only runs after the parent task (and its children) have finished
        var finalTask = parent.ContinueWith(
            parentTask =>
            {
                foreach (int i in parentTask.Result)
                    Console.WriteLine("Result of child task: {0}", i);
            });

        Console.ReadLine();
    }

 

Task Factory

To make it easier to create tasks that all use the same options, you can use a TaskFactory.

    // Task.Factory.StartNew rather than Task.Run, so the children can attach to the parent
    Task<Int32[]> parent = Task.Factory.StartNew(() =>
    {
        var results = new Int32[3];

        TaskFactory tf = new TaskFactory(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously);
        tf.StartNew(() => results[0] = 0);
        tf.StartNew(() => results[1] = 1);
        tf.StartNew(() => results[2] = 2);

        return results;
    });

    var finalTask = parent.ContinueWith(
        parentTask =>
        {
            foreach (int i in parentTask.Result)
                Console.WriteLine("Result {0}", i);
        });

    finalTask.Wait();

WaitAll

    Task[] tasks = new Task[3];

    tasks[0] = Task.Run(() =>
    {
        Thread.Sleep(1000);
        Console.WriteLine("1");
    });
    tasks[1] = Task.Run(() =>
    {
        Thread.Sleep(1000);
        Console.WriteLine("2");
    });
    tasks[2] = Task.Run(() =>
    {
        Thread.Sleep(1000);
        Console.WriteLine("3");
    });

    // The 3 tasks run simultaneously, so the program takes approx 1000ms to run
    Task.WaitAll(tasks);

WaitAny

    public static void Main()
    {
        // Create an array of tasks that return ints
        Task<int>[] tasks = new Task<int>[3];

        // Run 3 tasks on different threads
        tasks[0] = Task.Run(() => { Thread.Sleep(2000); return 1; });
        tasks[1] = Task.Run(() => { Thread.Sleep(1000); return 2; });
        tasks[2] = Task.Run(() => { Thread.Sleep(3000); return 3; });

        while (tasks.Length > 0)
        {
            // Wait for any of the tasks to finish
            int i = Task.WaitAny(tasks);
            Task<int> completedTask = tasks[i];
            Console.WriteLine(completedTask.Result);

            // Convert array to a list
            var temp = tasks.ToList();
            // Remove completed task from list
            temp.RemoveAt(i);
            tasks = temp.ToArray();
        }
    }

Parallel Class

    public static void Main()
    {
        // Takes a few secs to run on 8 core machine
        Parallel.For(0, 10, i =>
        {
            Console.WriteLine("start on {0}", Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(1000);
            // Only printed after the sleep finishes
            Console.WriteLine("finish on {0}", Thread.CurrentThread.ManagedThreadId);
        });

        var numbers = Enumerable.Range(0, 10);
        Parallel.ForEach(numbers, i =>
        {
            Thread.Sleep(1000);
            Console.WriteLine("hello");
        });

        // Loop up to 4 billion, then break
        var watch = new Stopwatch();
        watch.Start();
        ParallelLoopResult result = Parallel.For(0, 4000001000, (long i, ParallelLoopState loopState) =>
        {
            if (i == 4000000000)
            {
                Console.WriteLine("Breaking loop");
                loopState.Break();
            }
        });

        // 12secs on 8 core laptop
        Console.WriteLine(watch.Elapsed);
    }

Fun with parallel.

Async Await

Long-running CPU-bound work can be handed to another thread. However when waiting for I/O-bound work things are different. We don’t want a thread paused by Windows whilst it waits for the I/O, as it still uses memory and can’t be used for other tasks.

Async solves this problem (the async and await keywords were added in C# 5). The compiler turns an async method into a state machine.

    static void Main()
    {
        // Start the async method.
        // ... Wait for it to complete.
        Task task = ProcessDataAsync();
        task.Wait();
        Console.ReadLine();
    }

    // Returns Task rather than void, so that Main can wait for the whole method to finish
    static async Task ProcessDataAsync()
    {
        // Start the HandleFile method.
        Task<int> task = HandleFileAsync("C:\\temp\\enable1.txt");

        // Control returns here before HandleFileAsync returns.
        // ... Prompt the user.
        Console.WriteLine("Please wait patiently while I do something important.");
        Thread.Sleep(1200);
        Console.WriteLine("after sleep");

        // Wait for the HandleFile task to complete.
        // ... Display its results.
        int x = await task;
        Console.WriteLine("Count: " + x);
    }

    static async Task<int> HandleFileAsync(string file)
    {
        Console.WriteLine("HandleFile enter");
        int count = 0;

        // Read in the specified file.
        // ... Use async StreamReader method.
        using (var reader = new StreamReader(file))
        {
            string v = await reader.ReadToEndAsync();

            // ... Process the file data somehow.
            count += v.Length;
            Thread.Sleep(500);

            // ... A slow-running computation.
            //     Dummy code.
            for (int i = 0; i < 10000; i++)
            {
                int x = v.GetHashCode();
                if (x == 0)
                {
                    count--;
                }
            }
        }
        Console.WriteLine("HandleFile exit");
        return count;
    }

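Some interesting async code. The output comes back in a strange order, but that is fine: control returns to the caller when HandleFileAsync reaches an await that has not completed yet.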

 

Parallel Language Integrated Query (PLINQ)

    public static void Main()
    {
        var numbers = Enumerable.Range(0, 10);
        var parallelResult = numbers.AsParallel()
            // Results are buffered and sorted so come back in correct order
            .AsOrdered()
            .Where(i => i % 2 == 0)
            .ToArray();

        foreach (int i in parallelResult)
            Console.WriteLine(i);
    }

AsOrdered preserves the order of the results, but it hurts performance.

    public static void Main()
    {
        var numbers = Enumerable.Range(0, 20);
        var parallelResult = numbers
            .AsParallel()
            //.AsOrdered()
            .Where(i => i%2 == 0);
            //.AsSequential(); // Stops query being done in parallel eg to make sure Take(5) works

        //foreach (int i in parallelResult.Take(5))
        foreach (int i in parallelResult)
            Console.WriteLine(i);
    }

Not a great example. The AsSequential operator is there to make sure Take doesn’t mess up the order.

    public static void Main()
    {
        var numbers = Enumerable.Range(0, 2000000000);
        var parallelResult = numbers.AsParallel()
            .Where(i => i % 20000012 == 0);

        //parallelResult.ForAll(e => Console.WriteLine(e));
        // Same as above - cw writes the current element
        parallelResult.ForAll(Console.WriteLine);

        Action<string> thing = Console.WriteLine;
        thing("hello");
    }

Interesting use of cw (Console.WriteLine) as a method group! Also, on point, ForAll iterates in parallel. It does not need all the results before starting to iterate (in contrast to foreach).
Parallel.ForEach is generally favoured.

Exceptions

    public static void Main()
    {
        var numbers = Enumerable.Range(0, 20);

        try
        {
            var parallelResult = numbers.AsParallel()
                //.Where(i => IsEven(i));
                .Where(IsEven);

            parallelResult.ForAll(Console.WriteLine);
        }
        catch (AggregateException e)
        {
            Console.WriteLine("There were {0} exceptions", e.InnerExceptions.Count);
        }
    }

    public static bool IsEven(int i)
    {
        if (i % 10 == 0) throw new ArgumentException("i");
        return i % 2 == 0;
    }

 


Concurrent Collections

BlockingCollection<T>

Adding items is fast, and you can set an upper limit. Once the limit is reached, Add blocks the calling thread until there is room. Take blocks while the collection is empty. It actually uses a ConcurrentQueue by default.

    public static void Main()
    {
        var col = new BlockingCollection<string>();

        Task read = Task.Run(() =>
        {
            while (true)
            {
                Console.WriteLine(col.Take());
            }
        });

        Task write = Task.Run(() =>
        {
            while (true)
            {
                string s = Console.ReadLine();
                if (string.IsNullOrWhiteSpace(s)) break;
                col.Add(s);
            }
        });

        write.Wait();
    }
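The upper limit mentioned above is not shown in that example, so here is a small sketch of it. The capacity of 2 and the item count of 5 are arbitrary values picked for illustration; CompleteAdding and GetConsumingEnumerable are used so that the consumer loop can end.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class Program
{
    public static void Main()
    {
        // Upper limit of 2 items (value chosen for illustration)
        var col = new BlockingCollection<int>(boundedCapacity: 2);

        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < 5; i++)
            {
                // Add blocks while the collection already holds 2 items
                col.Add(i);
            }
            // Tell consumers that no more items will arrive
            col.CompleteAdding();
        });

        // GetConsumingEnumerable removes items as it goes, and ends once
        // adding is complete and the collection is empty
        foreach (int item in col.GetConsumingEnumerable())
        {
            Console.WriteLine("Took {0}", item);
        }

        producer.Wait();
    }
}
```

With one producer, one consumer and the default queue underneath, the items come out in the order they were added.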

ConcurrentBag<T>

A bag of items. Allows duplicates and has no particular order.

    public static void Main()
    {
        var bag = new ConcurrentBag<int>();
        bag.Add(42);
        bag.Add(21);

        int result;
        if (bag.TryTake(out result))
            Console.WriteLine(result);

        // TryPeek not very useful as another thread may have already taken it
        if (bag.TryPeek(out result))
            Console.WriteLine("There is a next item: {0}", result);
    }

then

    public static void Main()
    {
        var bag = new ConcurrentBag<int>();

        Task.Run(() =>
        {
            bag.Add(42);
            Thread.Sleep(1000);
            bag.Add(21);
        });

        // Typically displays only 42, as the bag is snapshotted before iterating
        Task.Run(() =>
        {
            foreach (int i in bag)
                Console.WriteLine(i);
        }).Wait();
    }

ConcurrentStack and ConcurrentQueue

Stack is Last In First Out (LIFO) – a stack of pancakes

Queue is First In First Out (FIFO) – a British queue  :-)

    public static void Main()
    {
        var stack = new ConcurrentStack<int>();
        stack.Push(42);

        int result;
        if (stack.TryPop(out result))
            Console.WriteLine("Popped: {0}", result);

        // pushes in that order
        stack.PushRange(new int[] { 1, 2, 3 });

        // only 2 spaces in the array
        int[] values = new int[2];
        stack.TryPopRange(values);

        // will show 3,2
        foreach (int i in values)
            Console.WriteLine(i);
    }

then the queue:

    public static void Main()
    {
        var queue = new ConcurrentQueue<int>();
        queue.Enqueue(42);

        int result;
        if (queue.TryDequeue(out result))
            Console.WriteLine("Dequeued: {0}", result);
    }
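To see the LIFO / FIFO difference side by side, here is a small single-threaded sketch that puts the same three numbers into both collections and drains them. It is only an illustration, not from the study book.

```csharp
using System;
using System.Collections.Concurrent;

public static class Program
{
    public static void Main()
    {
        var stack = new ConcurrentStack<int>();
        var queue = new ConcurrentQueue<int>();

        // Add 1, 2, 3 to both collections in the same order
        for (int i = 1; i <= 3; i++)
        {
            stack.Push(i);
            queue.Enqueue(i);
        }

        int item;

        // Stack: last in, first out, so this writes 3 2 1
        while (stack.TryPop(out item)) Console.Write("{0} ", item);
        Console.WriteLine();

        // Queue: first in, first out, so this writes 1 2 3
        while (queue.TryDequeue(out item)) Console.Write("{0} ", item);
        Console.WriteLine();
    }
}
```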

ConcurrentDictionary

    public static void Main()
    {
        var dict = new ConcurrentDictionary<string, int>();
        if (dict.TryAdd("k1", 42))
        {
            Console.WriteLine("Added");
        }

        if (dict.TryUpdate("k1", 21, 42))
        {
            Console.WriteLine("42 updated to 21");
        }

        dict["k1"] = 42; // Overwrite unconditionally

        int r1 = dict.AddOrUpdate("k1", 3, (s, i) => i * 2); // k1 exists, so 42 * 2 = 84
        int r2 = dict.GetOrAdd("k2", 3); // k2 is missing, so 3 is added and returned
    }

Lots of functionality!

# Tuesday, 07 August 2012

http://www.yoda.arachsys.com/csharp/threads/ Jon Skeet

Threading: “..trying to do more than one thing at a time within a process”

Process:  eg a browser downloading on one process, and a word processor allowing you to type on another process

Joseph Albahari

http://www.albahari.com/threading/ – Joseph Albahari (24th July 2011 updated).  C#5 book now available (.NET4.5 still in RC)

class Program
    {
        static void Main(string[] args)
        {
            Thread t = new Thread(WriteY);          // Kick off a new thread
            t.Start();                               // running WriteY()

            // Simultaneously, do something on the main thread.
            for (int i = 0; i < 1000; i++) Console.Write("x");
        }
        
        static void WriteY()
        {
            for (int i = 0; i < 1000; i++) Console.Write("y");
        }
    }

(Image: 2 threads writing to the console)

Local Variables Separate

static void Main(string[] args)
        {
            new Thread(Go).Start();
            Go();
        }
        
        static void Go()
        {
            //CLR assigns thread its own memory stack so that local variables are kept separate
            for (int i = 0; i < 5; i++) Console.Write("?");
        }

10 ?’s are printed.

Same Object Instance share Data

class ThreadTest
    {
        bool done;

        static void Main()
        {
            ThreadTest tt = new ThreadTest();   // Create a common instance
            new Thread(tt.Go).Start();
            tt.Go();
        }

        // Note that Go is now an instance method
        void Go()
        {
            if (!done) { done = true; Console.WriteLine("Done"); }
        }
    }

Threads share data if they have a common reference to the same object instance.

Done is only printed once.

Static Fields Shared

class ThreadTest
    {
        static bool done;    // Static fields are shared between all threads

        static void Main()
        {
            new Thread(Go).Start();
            Go();
        }

        static void Go()
        {
            if (!done) { done = true; Console.WriteLine("Done"); }
        }
    }

Thread non Safety

class ThreadTest
    {
        static bool done;    // Static fields are shared between all threads

        static void Main()
        {
            new Thread(Go).Start();
            Go();
        }

        static void Go()
        {
            //Swapped the order of statements - usually get Done twice now
            //ie This is not thread safe
            if (!done) { Console.WriteLine("Done"); done = true; }
        }
    }

Usually displays Done twice

Locking

class ThreadTest
    {
        static bool done;    // Static fields are shared between all threads
        static readonly object locker = new object();

        static void Main()
        {
            new Thread(Go).Start();
            Go();
        }

        static void Go()
        {
            //When two threads simultaneously contend a lock (in this case, locker), one thread waits, or blocks, until the lock becomes available
            lock (locker)
            {
                if (!done)
                {
                    Console.WriteLine("Done");
                    done = true;
                }
            }
        }
    }

Displays Done once!
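The same idea applies to any shared value, not just a flag. A minimal sketch, assuming two threads that each increment a shared counter 100,000 times (the class name Counter and the loop count are made up for illustration):

```csharp
using System;
using System.Threading;

class Counter
{
    static int count;    // Shared between both threads
    static readonly object locker = new object();

    static void Main()
    {
        var a = new Thread(Work);
        var b = new Thread(Work);
        a.Start();
        b.Start();

        // Wait for both threads before reading the total
        a.Join();
        b.Join();

        Console.WriteLine(count);    // 200000
    }

    static void Work()
    {
        for (int i = 0; i < 100000; i++)
        {
            // Only one thread at a time can run the increment
            lock (locker)
            {
                count++;
            }
        }
    }
}
```

Without the lock, count++ is a read followed by a write, so updates can be lost and the total can come out lower.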

Parallel Programming

eg PLINQ makes it easy.


static void Main()
        {
            // Calculate prime numbers using a simple (unoptimized) algorithm.
            //
            IEnumerable<int> numbers = Enumerable.Range(3, 100000000 - 3);

            var parallelQuery =
              from n in numbers.AsParallel()
              where Enumerable.Range(2, (int)Math.Sqrt(n)).All(i => n % i > 0)
              select n;

            int[] primes = parallelQuery.ToArray();
            //foreach (var prime in primes)
            //{
            //    Console.WriteLine(prime);
            //}
        }

Spell Checker in Parallel with PLINQ

struct IndexedWord { public string Word; public int Index; }

        static void Main()
        {
            if (!File.Exists("WordLookup.txt"))    // Contains about 150,000 words
                new WebClient().DownloadFile("http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");

            var wordLookup = new HashSet<string>(File.ReadAllLines("WordLookup.txt"),StringComparer.InvariantCultureIgnoreCase);

            var random = new Random();
            string[] wordList = wordLookup.ToArray();

            //Test document of a 100 million different words
            string[] wordsToTest = Enumerable.Range(0, 100000000)
              .Select(i => wordList[random.Next(0, wordList.Length)])
              .ToArray();

            wordsToTest[12345] = "woozsh";     // Introduce a couple
            wordsToTest[23456] = "wubsie";     // of spelling mistakes.

            Stopwatch s = new Stopwatch();
            s.Start();
            var query = wordsToTest
                      //.AsParallel()
                      .Select((word, index) => new IndexedWord { Word = word, Index = index })
                      .Where(iword => !wordLookup.Contains(iword.Word))
                      .OrderBy(iword => iword.Index);
           
            foreach (var indexedWord in query)
            {
                Console.WriteLine(indexedWord.Word);
            }
            s.Stop();
            TimeSpan t = s.Elapsed;
            Console.WriteLine(t.Seconds);

        }

11s for Parallel (8 cores)
52s non Parallel (1 core)

SpellChecker with Parallel.ForEach

struct IndexedWord { public string Word; public int Index; }

        static void Main()
        {
            if (!File.Exists("WordLookup.txt"))    // Contains about 150,000 words
                new WebClient().DownloadFile("http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");

            var wordLookup = new HashSet<string>(File.ReadAllLines("WordLookup.txt"),StringComparer.InvariantCultureIgnoreCase);

            var random = new Random();
            string[] wordList = wordLookup.ToArray();

            //Test document of a 100 million different words..
            string[] wordsToTest = Enumerable.Range(0, 100000000)
              .Select(i => wordList[random.Next(0, wordList.Length)])
              .ToArray();

            wordsToTest[12345] = "woozsh";     // Introduce a couple
            wordsToTest[23456] = "wubsie";     // of spelling mistakes.

            Stopwatch s = new Stopwatch();
            s.Start();

            var misspellings = new ConcurrentBag<Tuple<int, string>>();

            Parallel.ForEach(wordsToTest, (word, state, i) =>
            {
                if (!wordLookup.Contains(word))
                    misspellings.Add(Tuple.Create((int)i, word));
            });

            foreach (var tu in misspellings)
            {
                Console.WriteLine("{0}, {1}",tu.Item1,tu.Item2);
            }
            s.Stop();
            TimeSpan t = s.Elapsed;
            Console.WriteLine(t.Seconds);
        }

10s for Parallel. Indexed ForEach is more efficient than Select.
