Search

Categories

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Send mail to the author(s) E-mail

# Friday, 31 October 2014
( Async | Task | Threading )

As part of the 70-483 exam there is a section on threads/async, which I’m using the study book to go through.

An application runs in its own process.
Each process runs in its own thread

Windows context switches after a period.  On a single core machine giving the illusion of being able to multi task.

Thread

Only use under special circumstances.

public static void ThreadMethod() { for (int i = 0; i < 10; i++) { Console.WriteLine("ThreadProc: {0}", i); } } public static void ThreadMethod2() { for (int i = 0; i < 10; i++) { Console.WriteLine("ThreadProc2: {0}", i); } } public static void Main() { var regularThread = new Thread(ThreadMethod); regularThread.Start(); var regularThread2 = new Thread(ThreadMethod2); regularThread2.Start(); // Wait for spawned thread to end regularThread.Join(); Console.WriteLine("regularThread returned."); regularThread2.Join(); Console.WriteLine("regularThread2 returned.");

Main thread getting returns in the correct order.
public static void ThreadMethod() { for (int i = 0; i < 5; i++) { Console.WriteLine("ThreadProc: {0}", i); // Thread is finished, jump to next time slice Thread.Sleep(0); } } public static void Main() { var t = new Thread(new ThreadStart(ThreadMethod)); t.Start(); for (int i = 0; i < 10; i++) { Console.WriteLine("Main thread: Do some work."); Thread.Sleep(0); } // Wait until the other thread finishes t.Join(); Console.WriteLine("main app ended"); }

Background

public static void ThreadMethod() { for (int i = 0; i < 10; i++) { Console.WriteLine("ThreadProc: {0}", i); Thread.Sleep(1000); } } public static void Main() { var t = new Thread(new ThreadStart(ThreadMethod)); // App will exit immediately and not print anything t.IsBackground = true; t.Start(); }

Parameters

public static void ThreadMethod(object o) { for (int i = 0; i < (int)o; i++) { Console.WriteLine("ThreadProc: {0}", i); Thread.Sleep(0); } } public static void Main() { // Passing a parameter to the Thread var t = new Thread(new ParameterizedThreadStart(ThreadMethod)); t.Start(5); t.Join(); }

Stopping a Thread

public static void Main() { // Better way to stop a thread is with a shared variable bool stopped = false; // Thread initialised with a lambda (shorthand version of a delegate) var t = new Thread(new ThreadStart(() => { while (!stopped) { Console.WriteLine("Running..."); Thread.Sleep(10); Console.WriteLine("Running end"); } })); t.Start(); Console.WriteLine("Press any key to exit"); Console.ReadKey(); stopped = true; // App will always wait for the t thread to end before continuing t.Join(); Console.WriteLine("app stopped"); }

ThreadStatic


// Each thread gets its own copy of a field [ThreadStatic] public static int _field; public static void Main() { new Thread(() => { for (int x = 0; x < 10; x++) { _field++; Console.WriteLine("Thread A: {0}", _field); } }).Start(); new Thread(() => { for (int x = 0; x < 10; x++) { _field++; Console.WriteLine("Thread B: {0}", _field); } }).Start(); Console.ReadKey(); }

ThreadLocal<T>

public static ThreadLocal<int> _field = new ThreadLocal<int>(() =>{ return Thread.CurrentThread.ManagedThreadId; }); public static void Main(){ new Thread(() =>{ // 4 Console.WriteLine("Thread A ManagedThreadId: {0}", _field.Value); }).Start(); new Thread(() =>{ // 3 Console.WriteLine("Thread B ManagedThreadId: {0}", _field.Value); }).Start(); Console.ReadKey(); }

Thread pools

Creating new threads takes time and resources.  Can reuse a thread with a Thread pool.

public static void Main() { ThreadPool.QueueUserWorkItem((s) => { Console.WriteLine("Working on a thread from threadpool"); }); Console.ReadLine(); }

Thread pool limits the number of available threads, so parallelism is lesser.  However this has advantages eg if a webserver has many requests coming it, then can limit the number of threads being spawned by using a pool.

Because threads are being reused they resuse their local state.  May not rely on this state that can potentially be shared.

Tasks

Queuing a work item to a thread pool can be useful, however don’t know when its done, nor the return value.

A task scheduler is responsible for starting the task and managing it.  The task scheduler uses threads from the thread pool.

Executing a task on another thread only makes sense if you want to

  • keep the ‘UI’ free for other work
  • parallelise work to other processors
public static void Main() { Task t = Task.Run(() => { for (int x = 0; x < 100; x++) { Console.Write("*"); } }); // Wait is equivalent to .Join in threads - waits for task to finish t.Wait(); }

then:

public static void Main(){ Task<int> t = Task.Run(() =>{ return 42; }); // By reading the result, forces current thread to wait until the task has finished Console.WriteLine(t.Result); // Displays 42 }

Continuation Task

public static void Main() { Task<int> t = Task.Run(() => { return 42; // Continuation }).ContinueWith(i => i.Result * 2); Console.WriteLine(t.Result); // Displays 84 t = Task.Run(() => { return 42; }); t.ContinueWith((i) => Console.WriteLine("Canceled"), TaskContinuationOptions.OnlyOnCanceled); t.ContinueWith((i) => Console.WriteLine("Faulted"), TaskContinuationOptions.OnlyOnFaulted); // However how to get the return value out of Task? var completedTask = t.ContinueWith((i) => Console.WriteLine("Completed"), TaskContinuationOptions.OnlyOnRanToCompletion); completedTask.Wait(); }

Child Tasks

public static void Main() { // Parent task finished only when 3 children have finished Task<Int32[]> parent = Task.Run(() => { var results = new Int32[3]; new Task(() => results[0] = 0,TaskCreationOptions.AttachedToParent).Start(); new Task(() => results[1] = 1,TaskCreationOptions.AttachedToParent).Start(); new Task(() => results[2] = 2,TaskCreationOptions.AttachedToParent).Start(); return results; }); // Finaltask only runs after after parent tasks have finished var finalTask = parent.ContinueWith( parentTask => { foreach (int i in parentTask.Result) Console.WriteLine("Result of child task: {0}",i); }); Console.ReadLine(); }

 

Task Factory

To make it easier to create tasks all with the same options, can use a TaskFactory.

Task<Int32[]> parent = Task.Run(() => { var results = new Int32[3]; TaskFactory tf = new TaskFactory(TaskCreationOptions.AttachedToParent,TaskContinuationOptions.ExecuteSynchronously); tf.StartNew(() => results[0] = 0); tf.StartNew(() => results[1] = 1); tf.StartNew(() => results[2] = 2); return results; }); var finalTask = parent.ContinueWith( parentTask => { foreach (int i in parentTask.Result) Console.WriteLine("Result {0}", i); }); finalTask.Wait();

asdf
Task[] tasks = new Task[3]; tasks[0] = Task.Run(() => { Thread.Sleep(1000); Console.WriteLine("1"); }); tasks[1] = Task.Run(() => { Thread.Sleep(1000); Console.WriteLine("2"); }); tasks[2] = Task.Run(() => { Thread.Sleep(1000); Console.WriteLine("3"); } ); // 3 Threads run simultaneously, so program takes approx 1000ms to run Task.WaitAll(tasks);

WaitAny

public static void Main() { // Create an array of tasks that return ints Task<int>[] tasks = new Task<int>[3]; // Run 3 tasks on different threads tasks[0] = Task.Run(() => { Thread.Sleep(2000); return 1; }); tasks[1] = Task.Run(() => { Thread.Sleep(1000); return 2; }); tasks[2] = Task.Run(() => { Thread.Sleep(3000); return 3; }); while (tasks.Length > 0) { // Wait for any of the tasks to finsih int i = Task.WaitAny(tasks); Task<int> completedTask = tasks[i]; Console.WriteLine(completedTask.Result); // Convert array to a list var temp = tasks.ToList(); // Remove completed task from list temp.RemoveAt(i); tasks = temp.ToArray(); } }

Parallel Class

public static void Main() { // Takes a few secs to run on 8 core machine Parallel.For(0, 10, i => { Console.WriteLine("start on {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(1000); //only prints hello after thread finsihes Console.WriteLine("finish on {0}", Thread.CurrentThread.ManagedThreadId); }); var numbers = Enumerable.Range(0, 10); Parallel.ForEach(numbers, i => { Thread.Sleep(1000); Console.WriteLine("hello"); }); // 4billion var watch = new Stopwatch(); watch.Start(); ParallelLoopResult result = Parallel.For(0, 4000001000, (long i, ParallelLoopState loopState) => { if (i == 4000000000) { Console.WriteLine("Breaking loop"); loopState.Break(); } }); // 12secs on 8 core laptop Console.WriteLine(watch.Elapsed); }

Fun with parallel.

Async Await

Long running CPU bound tasks can be handed to another thread.  However when waiting for I/O bound work things are different.  We don’t want the thread paused by windows whilst waiting for the UI, as it uses memory, and can’t be used for other tasks.

Async solved this problem (keyword added in C#5).  Compiler makes a state machine.

static void Main() { // Create task and start it. // ... Wait for it to complete. Task task = new Task(ProcessDataAsync); task.Start(); task.Wait(); Console.ReadLine(); } static async void ProcessDataAsync() { // Start the HandleFile method. Task<int> task = HandleFileAsync("C:\\temp\\enable1.txt"); // Control returns here before HandleFileAsync returns. // ... Prompt the user. Console.WriteLine("Please wait patiently while I do something important."); Thread.Sleep(1200); Console.WriteLine("after sleep"); // Wait for the HandleFile task to complete. // ... Display its results. int x = await task; Console.WriteLine("Count: " + x); } static async Task<int> HandleFileAsync(string file) { Console.WriteLine("HandleFile enter"); int count = 0; // Read in the specified file. // ... Use async StreamReader method. using (var reader = new StreamReader(file)) { string v = await reader.ReadToEndAsync(); // ... Process the file data somehow. count += v.Length; Thread.Sleep(500); // ... A slow-running computation. // Dummy code. for (int i = 0; i < 10000; i++) { int x = v.GetHashCode(); if (x == 0) { count--; } } } Console.WriteLine("HandleFile exit"); return count; }

Some interesting async code.  So code coming back in a strange order, but fine.

 

Parallel Language Integrated Query (PLINQ)

public static void Main() { var numbers = Enumerable.Range(0, 10); var parallelResult = numbers.AsParallel() // Results are buffered and sorted so come back in correct order .AsOrdered() .Where(i => i % 2 == 0) .ToArray(); foreach (int i in parallelResult) Console.WriteLine(i); }

As ordered – hurts performance.

public static void Main() { var numbers = Enumerable.Range(0, 20); var parallelResult = numbers .AsParallel() //.AsOrdered() .Where(i => i%2 == 0); //.AsSequential(); //Stops query being done in parallel eg to make sure take(5) works //foreach (int i in parallelResult.Take(5)) foreach (int i in parallelResult) Console.WriteLine(i); }

Not a great example..AsSequential operator to make sure Take doesn’t mess up the order

public static void Main() { var numbers = Enumerable.Range(0, 2000000000); var parallelResult = numbers.AsParallel() .Where(i => i % 20000012 == 0); //parallelResult.ForAll(e => Console.WriteLine(e)); // Same as above - cw writes the current parallelResult.ForAll(Console.WriteLine); Action<string> thing = Console.WriteLine; thing("hello"); }

Interesting use of cw!  Also, on point, iterating in parallel.  It does not need to results before starting to iterate (in contracts to foreach. 
Parallel.Foreach is generally favoured.

Exceptions

public static void Main() { var numbers = Enumerable.Range(0, 20); try { var parallelResult = numbers.AsParallel() //.Where(i => IsEven(i)); .Where(IsEven); parallelResult.ForAll(Console.WriteLine); } catch (AggregateException e) { Console.WriteLine("There where {0} exceptions",e.InnerExceptions.Count); } } public static bool IsEven(int i) { if (i % 10 == 0) throw new ArgumentException("i"); return i % 2 == 0; }

 


Concurrent Collections

BlockCollection<T>..

Adding items is fast, and you can set an upper limit.  Will block the calling thread until there is room.  Actually uses a ConcurrentQueue by default.

public static void Main() { var col = new BlockingCollection<string>(); Task read = Task.Run(() => { while (true) { Console.WriteLine(col.Take()); } }); Task write = Task.Run(() => { while (true) { string s = Console.ReadLine(); if (string.IsNullOrWhiteSpace(s)) break; col.Add(s); } }); write.Wait(); }

ConcurrentBag<T>

A bag of items.  Enables duplicates and no order.

public static void Main() { var bag = new ConcurrentBag<int>(); bag.Add(42); bag.Add(21); int result; if (bag.TryTake(out result)) Console.WriteLine(result); // TryPeek not very useful as anohter thread may have already taken it if (bag.TryPeek(out result)) Console.WriteLine("There is a next item: {0}", result); }

then

public static void Main() { var bag = new ConcurrentBag<int>(); Task.Run(() => { bag.Add(42); Thread.Sleep(1000); bag.Add(21); }); // Only displays 42 as Bag is snapshotted before iterating Task.Run(() => { foreach (int i in bag) Console.WriteLine(i); }).Wait(); }

ConcurrentStack and ConcurrentQueue

Stack is LastIn FirstOut (LIFO) – stack of pancakes

Queue if FirstIn FirstOut (FIFO) – a British queue  :-)

public static void Main() { var stack = new ConcurrentStack<int>(); stack.Push(42); int result; if (stack.TryPop(out result)) Console.WriteLine("Popped: {0}", result); // pushes in that order stack.PushRange(new int[] { 1, 2, 3 }); // only 2 spaces in the array int[] values = new int[2]; stack.TryPopRange(values); // will show 3,2 foreach (int i in values) Console.WriteLine(i); }

asdf

public static void Main() { var queue = new ConcurrentQueue<int>(); queue.Enqueue(42); int result; if (queue.TryDequeue(out result)) Console.WriteLine("Dequeued: {0}", result); }

ConcurrentDictionary

public static void Main() { var dict = new ConcurrentDictionary<string, int>(); if (dict.TryAdd("k1", 42)) { Console.WriteLine("Added"); } if (dict.TryUpdate("k1", 21, 42)) { Console.WriteLine("42 updated to 21"); } dict["k1"] = 42; // Overwrite unconditionally int r1 = dict.AddOrUpdate("k1", 3, (s, i) => i * 2); int r2 = dict.GetOrAdd("k2", 3); }

Lots of functionality!

| | #