IOCP Thread Pooling in C# - Part I
This is the first part of William's two-part series on thread pooling in C#. It adds IOCP thread support by importing Win32 API calls from a DLL.
When building server
based applications in C#, it is important to have the ability to create
thread pools. Thread pools allow our server to queue and perform work
in the most efficient and scalable way possible. Without thread
pooling we are left with two options.
The first option is to
perform all of the work on a single thread. The second option is to
spawn a thread every time some piece of work needs to be done. For
this article, work is defined as an event that requires the processing
of code. Work may or may not be associated with data, and it is our
job to process all of the work our server receives in the most
efficient and fastest way possible.
As a general rule, if you can accomplish all of the work required with a single thread, then only use a single thread. Having multiple threads performing work at the same time does not necessarily mean our application is getting more work done, or getting work done faster. This is true for many reasons.
For
example, if you spawn multiple threads which attempt to access the same
resource bound to a synchronization object, like a monitor object,
these threads will serialize and fall in line waiting for the resource
to become available. As each thread tries to access the resource, it
has the potential to block, and wait for the thread that owns the
resource to release the resource.
At that point, these
waiting threads are put to sleep, and not getting any work done. In
fact, these waiting threads have caused more work for the operating
system to perform. Now the operating system must task another thread
to perform work, and then determine which thread, waiting for the
resource, may access the resource next, once it becomes available.
If
the threads that need to perform work are sleeping, because they are
waiting for the resource to become available, we have actually created
a performance problem. In this case it would be more efficient to
queue up this work and have a single thread process the queue.
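The idea of queuing the work and letting one thread process it can be sketched as follows. This is only an illustration under stated assumptions: the class name SingleWorkerQueue and its methods are hypothetical and are not part of the article's code, and the "work" is simply summing integers. Producers hold the monitor only long enough to enqueue, and the single consumer does its processing outside the lock.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper for illustration; not part of the article's IOCP class.
public sealed class SingleWorkerQueue
{
    private readonly Queue<int> _work = new Queue<int>();
    private readonly object _lock = new object();
    private bool _completed;

    // Called by any producer thread; the lock is held only while enqueuing.
    public void Post(int item)
    {
        lock (_lock)
        {
            _work.Enqueue(item);
            Monitor.Pulse(_lock); // wake the consumer if it is waiting
        }
    }

    // Signals that no more work will be posted.
    public void Complete()
    {
        lock (_lock)
        {
            _completed = true;
            Monitor.PulseAll(_lock);
        }
    }

    // Run by the single consumer thread; returns the sum of all items processed.
    public int Drain()
    {
        int sum = 0;
        while (true)
        {
            int item;
            lock (_lock)
            {
                while (_work.Count == 0 && !_completed)
                    Monitor.Wait(_lock); // releases the lock while sleeping
                if (_work.Count == 0)
                    return sum; // completed and nothing left to process
                item = _work.Dequeue();
            }
            sum += item; // the "work" happens outside the lock
        }
    }
}
```

In a server, Drain would typically run on its own thread while other threads call Post.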
Threads
that start waiting for a resource before other threads are not
guaranteed to be given the resource first. In diagram A, thread 1
requests access to the resource before thread 2, and thread 2 requests
access to the resource before thread 3. The operating system however
decides to give the resource to thread 1 first, then thread 3, and then
thread 2. This scenario causes work to be performed in an undetermined
order. The possible issues are endless when dealing with
multi-threaded applications.
If
the pieces of work received can be performed independently of each other, we could
always spawn a thread to process each piece of work. The problem
here is that an operating system like Windows suffers a significant performance
penalty when a large number of threads are created or running at the
same time, all waiting for access to the CPU.
The Windows
operating system needs to manage all of these threads, and according to
the case study referenced below, it does not hold up as well as the UNIX
operating system. If large amounts
of work are issued to the server, this model will most likely cause the
Windows operating system to become overloaded. System performance will
degrade drastically.
This article is a case study comparing thread performance between Windows NT and Solaris.
http://www.usenix.org/publications/libr ··· tta.html
In
the .NET framework, the “System.Threading” namespace has a ThreadPool
class. Unfortunately, it is a static class and therefore our server
can only have a single thread pool. This isn’t the only issue. The
ThreadPool class does not allow us to set the concurrency level of the
thread pool.
The concurrency level is the most important
setting when configuring a thread pool. The concurrency level defines
how many threads in the pool may be in an “active state” at the same
time. If we set this parameter correctly, we will have the most
efficient, performance enhanced thread pool for the work being
processed.
Imagine
we have a thread pool with 4 threads and a concurrency level of 1.
Then, three pieces of work are queued up for processing in the pool.
Since the concurrency level for the thread pool is 1, only a single
thread from the pool is activated and given work from the queue. Even
though there are two pieces of work queued up, no other threads are
activated. This is because the concurrency level is set to 1. If the
concurrency level was set to 2, then another thread would have been
activated immediately and given work from the queue. In diagram B we
have thread 1 running and all of the other threads sleeping with two
pieces of work queued.
So
the question exists, why have more than 1 thread in the pool if the
concurrency level is set to 1? If thread 1 in diagram B ever goes to
sleep before it completes its work, another thread from the pool will
be activated. When thread 1 goes to sleep, there are 0 threads
“active” in the pool and it is ok to activate a new thread based on the
concurrency level. In diagram C, we now have thread 1 sleeping and
thread 4 running with one piece of work queued.
Eventually, thread 1 will wake up, and it is possible for thread 4 to still be active. We have 2 threads active in the pool, even though the concurrency level is set to 1. In diagram D, we now have thread 1 and thread 4 running and one piece of work still queued.
The last piece of work in the queue will need to wait until both threads return to a sleeping state. This is because the concurrency level is set to 1. As we can see, even though the concurrency level restricts the number of active threads in the pool at any given time, we could have more active threads than the concurrency level allows. It all depends on the state of the threads in the pool and how fast the threads can complete the work they are processing.
A good rule of thumb is to set the concurrency level to match the number of CPUs in the system. If the machine our server is running on only has one CPU, then only one thread can be executing at any given time. It will require a task swap to have another thread get CPU time. We want to reduce the number of active threads at any given time to maximize performance. This also leads to scalability. As the number of CPUs increases, we can increase the concurrency level because there is a CPU to execute that thread. This is a general rule and is always a good starting point for configuring our thread pools.
The bottom line is, if the CPU is available, and there is work to perform, activate a thread. If the CPU is not available, do not activate a thread. One other thing, we need to be careful that we don’t cause a situation where the threads in the pool are constantly being put to sleep for long periods of time during the processing of work. This may cause all of the threads in the pool to constantly be in an active state, defeating the efficiency of the pool and the performance of the server.
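As a small illustration of the rule of thumb, a starting concurrency level can be taken from the CPU count reported by the runtime. This is a sketch only: the helper name ConcurrencyDefaults.Resolve is hypothetical, and treating a requested value of 0 or less as "match the CPU count" is an assumption chosen for the example.

```csharp
using System;

// Hypothetical helper for illustration; not part of the article's code.
public static class ConcurrencyDefaults
{
    // Returns the requested concurrency level, or the number of logical
    // CPUs when the caller passes 0 or a negative value.
    public static int Resolve(int requested)
    {
        return requested > 0 ? requested : Environment.ProcessorCount;
    }
}
```

A server could call ConcurrencyDefaults.Resolve(0) at startup and then tune the value after measuring real workloads.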
The remaining scope of this article will show you how to add IOCP thread pools to your C# server based applications. How to configure the thread pools for your specific application will not be covered. It is suggested to use the general rules as discussed.
System Requirements

A basic understanding of C# is required to follow through the examples and the classes. Basic concepts of types, properties, threading, synchronization, and delegates are required.

Defining the Problem

IOCP thread support has not been made available to C# developers through the “System.Threading” namespace. We need to access the Win32 API calls from the Kernel32.dll. This requires us to write unsafe code. This is really not a problem, but something that needs to be discussed. Let’s take a look at the Win32 API calls we need to implement an IOCP thread pool.

[DllImport("Kernel32", CharSet=CharSet.Auto)]
private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);

This Win32 API call is used to create an IOCP thread pool. The first argument will always be set to INVALID_HANDLE_VALUE, which is 0xFFFFFFFF. This tells the operating system this IOCP thread pool is not linked to a device. The second argument will always be set to 0. There is no existing IOCP thread pool because we are creating this for the first time. The third argument will always be null.
We do not require a key because we have not associated this IOCP thread pool with a device. The last argument is the important argument. Here we define the concurrency level of the thread pool. If we pass a 0 for this argument the operating system will set the concurrency level to match the number of CPUs in the machine. This option gives us our best chance to be scalable and take advantage of the number of CPUs present in the machine. This API call will return a handle to the newly created IOCP thread pool. If the API call fails, it will return null.

[DllImport("Kernel32", CharSet=CharSet.Auto)]
private unsafe static extern Boolean CloseHandle(UInt32 hObject);

This Win32 API call is used to close our thread pool. The only argument is the handle to the IOCP thread pool. This API call will return TRUE if the handle was closed, or FALSE if the handle cannot be closed.

[DllImport("Kernel32", CharSet=CharSet.Auto)]
private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, OVERLAPPED* pOverlapped);

This Win32 API call is used to post work in the IOCP thread pool queue. Other threads in our application will make this Win32 API call. The first argument is the handle to the IOCP thread pool. The second argument is the size of the data we are posting to the queue. The third argument is a value or a reference to an object or data structure we are posting to the queue. The last argument will always be null.
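The create, post, and close calls can be exercised together in a few lines. The sketch below is not the article's class. It assumes pointer-sized IntPtr and UIntPtr parameters in place of the UInt32 handles declared above, so that the declarations also fit a 64-bit process, and it dequeues the posted value with GetQueuedCompletionStatus, which is covered later in this article. The class name IocpRoundTrip and the one-second timeout are choices made for the example, and the code runs only on Windows.

```csharp
using System;
using System.Runtime.InteropServices;

// Illustrative sketch (Windows only); names are hypothetical.
public static class IocpRoundTrip
{
    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern IntPtr CreateIoCompletionPort(
        IntPtr fileHandle, IntPtr existingPort, UIntPtr completionKey, uint concurrency);

    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern bool PostQueuedCompletionStatus(
        IntPtr port, uint byteCount, UIntPtr completionKey, IntPtr overlapped);

    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern bool GetQueuedCompletionStatus(
        IntPtr port, out uint byteCount, out UIntPtr completionKey,
        out IntPtr overlapped, uint milliseconds);

    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern bool CloseHandle(IntPtr handle);

    private static readonly IntPtr InvalidHandleValue = new IntPtr(-1);

    // Creates a port, posts one value as the completion key, and reads it back.
    public static uint PostAndFetch(uint value)
    {
        // No device, no existing port, no key, concurrency 0 = number of CPUs.
        IntPtr port = CreateIoCompletionPort(InvalidHandleValue, IntPtr.Zero, UIntPtr.Zero, 0);
        if (port == IntPtr.Zero)
            throw new InvalidOperationException("Create failed: " + Marshal.GetLastWin32Error());
        try
        {
            // The value travels in the third argument; the size is 4 bytes.
            if (!PostQueuedCompletionStatus(port, 4, new UIntPtr(value), IntPtr.Zero))
                throw new InvalidOperationException("Post failed: " + Marshal.GetLastWin32Error());

            uint byteCount;
            UIntPtr key;
            IntPtr overlapped;
            // Wait up to one second for the posted work to be handed back.
            if (!GetQueuedCompletionStatus(port, out byteCount, out key, out overlapped, 1000))
                throw new InvalidOperationException("Get failed: " + Marshal.GetLastWin32Error());
            return key.ToUInt32();
        }
        finally
        {
            CloseHandle(port);
        }
    }
}
```

Because only an integer value crosses into the unmanaged queue, nothing here depends on where the garbage collector places managed objects.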
The following diagram shows how the data is associated with the posted work.

In diagram E, we have two threads actively processing posted work and one piece of work on the queue waiting for its data to be processed. The thing to note here is that each piece of work was given a reference to its specific data. I am calling this variable pData to help describe what is happening in the IOCP thread pool. The actual name or structure of this variable is undocumented.

When we make this API call in a C++ application, we can pass the address of any object in memory we wish, as in diagram E. In C#, we don’t have the same luxury because of the managed heap. The managed heap is a contiguous region of address space that contains all of the memory allocated for reference variables. The heap maintains a pointer that indicates where the next object is to be allocated, and all allocations are contiguous from that point. This is much different from the C-runtime heap.

The C-runtime heap uses a linked list of data structures to reference available memory blocks. For the C-runtime heap to allocate memory, it must walk through the linked list until a large enough block of free memory is found. Then the free block of memory must be resized, and the linked list adjusted.

If objects are allocated consecutively in a C++ application, those objects could be allocated anywhere on the heap. This can never happen with the managed heap. Objects that are allocated consecutively in a C# application will always be allocated consecutively on the managed heap. The catch is that the managed heap must be compacted to guarantee the heap does not run out of memory. That is the job of garbage collection.

For more information on garbage collection, try these links:
http://msdn.microsoft.com/msdnmag/issues/1100/GCI/default.aspx
http://msdn.microsoft.com/msdnmag/issues/1200/GCI2/default.aspx

Suppose four MyClass objects, referenced by pClass1 through pClass4, have been allocated one after another, and the reference to the second object is then cleared:

pClass2 = null;

Diagram G shows what happens to the managed heap after garbage collection takes place and the managed heap is compacted. The Class 2 object has been removed from the managed heap and the Class 3 and Class 4 objects have been moved. Now the value of pClass3 is FDD0 and the value of pClass4 is FDC0. The value that the pointer points to has changed. The garbage collection process changes the values of all reference variables to make sure they are pointing to the correct objects after the managed heap is compacted.

So what does this mean for our IOCP thread pool implementation? If we pass the reference of a managed object as the data for the work, there is a chance the reference is no longer valid when a thread in the pool is chosen to work on the data.

In diagram H, we have passed a reference to the Class 3 object as the data for the work posted to the IOCP thread pool. This object is at address FDC0. Before the work is given to thread 1, the Class 2 object is marked for deletion. Then the garbage collection process runs, and the managed heap is compacted. Now in diagram I, the work has been given to thread 1 for processing. The value of pData is still FDC0, but Class 3 is no longer at address FDC0; it is at address FDD0. The thread will perform the work, but using Class 4 instead of Class 3.

The garbage collection process cannot change the value of pData, as it does with other variables, because this variable is not a managed variable. It is a variable owned by the IOCP thread pool and exists outside the scope of the CLR. The garbage collector has no knowledge of this variable or access to this variable. The variable is set during the unsafe call to PostQueuedCompletionStatus.

Int32[] iArray = new Int32[5] {12, 34, 56, 78, 90};

The safest thing we can do is pass a value to the IOCP thread pool. This value could be the index from a managed array, containing a reference to an object on the managed heap. If the garbage collection process does compact the heap, the index values of the array will not change.
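One way to picture passing an index instead of a reference is a small table that hands out integer tickets for managed objects. This is a sketch under stated assumptions, not the article's implementation: the class name WorkItemTable is hypothetical, and a Dictionary keyed by integer is used here in place of the managed array described above, which makes removing finished items simpler. Only the integer ticket would be posted to the IOCP thread pool; the worker thread exchanges it for the object afterwards.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of the article's code.
public sealed class WorkItemTable
{
    private readonly object _lock = new object();
    private readonly Dictionary<int, object> _items = new Dictionary<int, object>();
    private int _nextTicket;

    // Stores the object and returns an integer ticket that is safe to hand
    // to unmanaged code, because the ticket never changes when the heap is compacted.
    public int Add(object item)
    {
        lock (_lock)
        {
            int ticket = ++_nextTicket;
            _items[ticket] = item;
            return ticket;
        }
    }

    // Exchanges a ticket for its object and forgets the entry.
    // Returns null when the ticket is unknown or was already taken.
    public object Take(int ticket)
    {
        lock (_lock)
        {
            object item;
            if (!_items.TryGetValue(ticket, out item))
                return null;
            _items.Remove(ticket);
            return item;
        }
    }
}
```

The table keeps a managed reference to each object, so the garbage collector can move the object freely and still keep the table entry correct.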
In diagrams J and K, we can see one way to properly pass data for the work posted to the IOCP thread pool. After the garbage collection process compacts the heap, the values of pData change, but the index positions to the pData variables do not change.

[DllImport("Kernel32", CharSet=CharSet.Auto)]
private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, OVERLAPPED** ppOverlapped, UInt32 uiMilliseconds);

The final Win32 API call is used to add threads to the IOCP thread pool. Any thread that makes this Win32 API call will become part of the IOCP thread pool. This is a blocking call and the method will return when the IOCP thread pool chooses the thread to perform work.

The first argument is the handle to the IOCP thread pool. The second argument is the size of the data associated with the work. This value was provided when the work was posted. The third argument is the data value or data reference associated with the work. This value was provided when the work was posted. The fourth argument is the address of a pointer of type OVERLAPPED. This address is returned after the call. The last argument is the time in milliseconds the thread should wait to be activated to perform work.
We will always pass INFINITE or 0xFFFFFFFF.

These are the Win32 API calls we need to add IOCP thread pool support to our C# server based applications. We need to encapsulate these Win32 API calls using .NET threads and minimize the sections of unsafe code. We need to prevent the application developer from passing a reference variable into the IOCP thread pool, by restricting them to passing only integer values.

In part II of this article, we will build a class that encapsulates a single IOCP thread pool. The application developer will be able to instantiate as many thread pools as he wishes. During construction, the application developer will be able to set the concurrency level of the thread pool, set the minimum and maximum number of threads in the pool, and provide a method to be called when work posted to the thread pool needs to be processed. The application developer will also be able to post work with data into the IOCP thread pool.

IOCP Thread Pooling in C# - Part II

Defining the Solution

We will build a class that encapsulates a single IOCP thread pool. The application developer will be able to instantiate as many thread pools as he wishes. During construction, the application developer will be able to set the concurrency level of the thread pool, set the minimum and maximum number of threads in the pool, and provide a method to be called when work posted to the thread pool needs to be processed. The application developer will also be able to post work with data into the IOCP thread pool.

Component Design and Coding

Start by adding a new class to your C# project. Remove all of the code provided by the Visual Studio .NET wizard. Then add the following namespaces. The System.Runtime.InteropServices namespace is required to access the Win32 API methods from the Kernel32 DLL.

using System;

Don’t forget to change the project properties to allow unsafe code blocks.
This can be done by opening the project properties and selecting the Configuration Properties. Under the Build / Code Generation section you will see “Allow unsafe code blocks”. Set this to true.

Next add the namespace. You will notice that I have defined a two level namespace. This is great when you are building a class library with many different classes.

namespace Continuum.Threading

The PostQueuedCompletionStatus and GetQueuedCompletionStatus Win32 API methods both require a pointer to the Win32 OVERLAPPED structure. Because this structure will be used by the unsafe Win32 API call, we need to make sure the structure is aligned exactly the same way it would be in our C++ applications. This can be accomplished by using the StructLayout attribute. By setting the attribute to “LayoutKind.Sequential”, the structure will be aligned based on the same rules as the C++ compiler.

The structure requires members that are pointers. The only way to add pointers to this structure is to use the unsafe keyword. We can still use the FCL types when defining the structure. This is very important because we can make sure the structure is identical to the C++ version.

// Structures

Now it is time to define the IOCP thread pool class. I am using the keyword sealed in the definition of this class. The sealed keyword tells the compiler that this class cannot be inherited. If you know there is no reason for a class to be inherited, use the sealed keyword. Certain run-time optimizations are enabled for the class when the sealed keyword is used.

// Classes

The first section of the IOCP thread pool class is the Win32 function prototypes. These are the same ones described earlier.

// Win32 Function Prototypes
/// <summary> Win32Func: Closes an IO Completion Port Thread Pool </summary>
/// <summary> Win32Func: Posts a context based event into an IO Completion Port Thread Pool </summary>
/// <summary> Win32Func: Waits on a context based event from an IO Completion Port Thread Pool.

The next section is the constants section. Here we need to define the Win32 constants required for the Win32 API calls we are going to make later.

// Constants
/// <summary> SimTypeConst: This represents the Win32 INFINITE Macro </summary>
/// <summary> SimTypeConst: This tells the IOCP Function to shutdown </summary>

The delegate function type section is where we define any delegate functions. We need one delegate function type to define the signature of the function we will call when work needs to be processed.

// Delegate Function Types

These private properties are required to maintain the application developer’s settings. The most interesting property is the GetUserFunction property. This property contains a reference to a method supplied by the application developer. We will use this property to call the application developer’s method.

// Private Properties
private Int32 m_uiMaxConcurrency;
private Int32 m_iMinThreadsInPool;
private Int32 m_iMaxThreadsInPool;
private Object m_pCriticalSection;
private USER_FUNCTION m_pfnUserFunction;

// Public Properties

The constructor method does several things. The class state is initialized and then the IOCP thread pool is created with a call to the CreateIoCompletionPort method. Notice the method call is within the scope of the unsafe keyword. This is required because we are passing pointers into the Win32 API call.

// Constructor, Finalize, and Dispose

The finalize method is only required to guarantee the IOCP thread pool handle is closed. As a general rule, if a class allocates a resource outside the scope of the .NET framework, a finalize method is required; otherwise, do not add a finalize method. A finalize method will cause the garbage collection process to spend more time trying to release the memory for the object.

//***********************************************

The dispose method will not return until all of the threads in the pool have been terminated. We can’t use the Abort method to kill the threads in the pool because any thread blocked, via the call to the GetQueuedCompletionStatus Win32 API method, will not respond to the Abort message.

//**********************************************

The only private method is the IOCPFunction method. This method is spawned as a thread and is made part of the IOCP thread pool by calling the GetQueuedCompletionStatus Win32 API method. When the GetQueuedCompletionStatus Win32 API method returns, we check to make sure we are not being asked to shut down the thread. The third argument is the data associated with the posted work. If the data is not SHUTDOWN_IOCPTHREAD, then real work has been posted into the IOCP thread pool and this thread has been chosen to process the work.

// Private Methods

The last two public methods are the PostEvent methods. The first method takes an integer as an argument and the second version takes no argument at all. The integer is the data the application developer wishes to pass with the work posted into the IOCP thread pool. In the PostQueuedCompletionStatus Win32 API call, we can see that the third argument is where we pass the data value. Since this value is always an integer we set the size of the data to four, as seen in the second argument. Like in the IOCPFunction, we check to see if we need to add a new thread to the pool.

// Public Methods

We have now completed the implementation of the IOCP thread pool class. Now it is time to test it.

IOCP Thread Pooling in C# - Part II - The Sample Application

Start by adding a new class to your C# project. Remove all of the code provided by the Visual Studio .NET wizard. Then add all of the following code. In Main, an IOCP thread pool is created, and a single piece of work is posted to the IOCP thread pool. We pass the data value of 10 along with the posted work.

The main thread is then put to sleep. This gives the IOCP thread function time to wake up to process the work posted. The last thing in main is to dispose the IOCP thread pool. The IOCP thread function displays the value of the data passed into the IOCP thread pool.

using System;

This is what you should see when you run the sample application. On your own, change the main function to call the PostEvent method several times and see how the IOCP thread pool performs.
MyClass pClass1 = new MyClass();
MyClass pClass2 = new MyClass();
MyClass pClass3 = new MyClass();
MyClass pClass4 = new MyClass();
Now we write the following code.
Unfortunately, pinning with the fixed keyword is not a workable way to protect the objects we want to pass as the data for the work posted to the IOCP thread pool. Pinning prevents an object from being moved on the managed heap during the garbage collection process. The fixed keyword can only be used in the context of a single method, so there is no way to use it to pin an object in one thread and unpin the object in a different thread. (The GCHandle type with GCHandleType.Pinned can hold a pin across method and thread boundaries, but long-lived pins get in the way of heap compaction, and this article does not use it.) Here is a quick example of pinning with fixed.
unsafe
{
    // iArray is the Int32 array declared earlier in the article
    fixed (Int32* piArray = iArray)
    {
        // Do Something
    }
}
In part 2, William will continue to explain how to create a class that will handle threads using an IOCP thread pool.
using System.Threading;
using System.Runtime.InteropServices;
{
//==========================================
/// <summary> This is the WIN32 OVERLAPPED structure </summary>
[StructLayout(LayoutKind.Sequential, CharSet=CharSet.Auto)]
public unsafe struct OVERLAPPED
{
UInt32* ulpInternal;
UInt32* ulpInternalHigh;
Int32 lOffset;
Int32 lOffsetHigh;
UInt32 hEvent;
}
//============================================
/// <summary> This class provides the ability to create a thread pool to manage work. The
/// class abstracts the Win32 IOCompletionPort API so it requires the use of
/// unmanaged code. Unfortunately the .NET framework does not provide this functionality </summary>
public sealed class IOCPThreadPool
{
/// <summary> Win32Func: Create an IO Completion Port Thread Pool </summary>
[DllImport("Kernel32", CharSet=CharSet.Auto)]
private unsafe static extern UInt32 CreateIoCompletionPort(UInt32
hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32
uiNumberOfConcurrentThreads);
[DllImport("Kernel32", CharSet=CharSet.Auto)]
private unsafe static extern Boolean CloseHandle(UInt32 hObject);
[DllImport("Kernel32", CharSet=CharSet.Auto)]
private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32
hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg,
OVERLAPPED* pOverlapped);
/// All threads in the pool wait in this Win32 Function </summary>
[DllImport("Kernel32", CharSet=CharSet.Auto)]
private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32
hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg,
OVERLAPPED** ppOverlapped, UInt32 uiMilliseconds);
/// <summary> SimTypeConst: This represents the Win32 Invalid Handle Value Macro </summary>
private const UInt32 INVALID_HANDLE_VALUE = 0xffffffff;
private const UInt32 INIFINITE = 0xffffffff;
private const Int32 SHUTDOWN_IOCPTHREAD = 0x7fffffff;
/// <summary> DelType: This is the type of user function to be supplied for the thread pool </summary>
public delegate void USER_FUNCTION(Int32 iValue);
private UInt32 m_hHandle;
/// <summary> SimType: Contains the IO Completion Port Thread Pool handle for this instance </summary>
private UInt32 GetHandle { get { return m_hHandle; } set { m_hHandle = value; } }
/// <summary> SimType: The maximum number of threads that may be running at the same time </summary>
private Int32 GetMaxConcurrency { get { return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } }
/// <summary> SimType: The minimal number of threads the thread pool maintains </summary>
private Int32 GetMinThreadsInPool { get { return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } }
/// <summary> SimType: The maximum number of threads the thread pool maintains </summary>
private Int32 GetMaxThreadsInPool { get { return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } }
/// <summary> RefType: A serialization object to protect the class state </summary>
private Object GetCriticalSection { get { return m_pCriticalSection; } set { m_pCriticalSection = value; } }
/// <summary> DelType: A reference to a user specified function to be call by the thread pool </summary>
private USER_FUNCTION GetUserFunction { get { return m_pfnUserFunction; } set { m_pfnUserFunction = value; } }
private Boolean m_bDisposeFlag;
/// <summary> SimType: Flag to indicate if the class is disposing </summary>
private Boolean IsDisposed { get { return m_bDisposeFlag; } set { m_bDisposeFlag = value; } }
These
public properties are used to determine if new threads need to be added
to the thread pool. These properties also provide statistical data
about the thread pool. Here we use the Interlocked class to provide
serialization when we increment or decrement these properties. This is
the least expensive way to perform serialization.
private Int32 m_iCurThreadsInPool;
/// <summary> SimType: The current number of threads in the thread pool </summary>
public Int32 GetCurThreadsInPool { get { return m_iCurThreadsInPool; } set { m_iCurThreadsInPool = value; } }
/// <summary> SimType: Increment current number of threads in the thread pool </summary>
private Int32 IncCurThreadsInPool() { return Interlocked.Increment(ref m_iCurThreadsInPool); }
/// <summary> SimType: Decrement current number of threads in the thread pool </summary>
private Int32 DecCurThreadsInPool() { return Interlocked.Decrement(ref m_iCurThreadsInPool); }
private Int32 m_iActThreadsInPool;
/// <summary> SimType: The current number of active threads in the thread pool </summary>
public Int32 GetActThreadsInPool { get { return m_iActThreadsInPool; } set { m_iActThreadsInPool = value; } }
/// <summary> SimType: Increment current number of active threads in the thread pool </summary>
private Int32 IncActThreadsInPool() { return Interlocked.Increment(ref m_iActThreadsInPool); }
/// <summary> SimType: Decrement current number of active threads in the thread pool </summary>
private Int32 DecActThreadsInPool() { return Interlocked.Decrement(ref m_iActThreadsInPool); }
private Int32 m_iCurWorkInPool;
/// <summary> SimType: The current number of Work posted in the thread pool </summary>
public Int32 GetCurWorkInPool { get { return m_iCurWorkInPool; } set { m_iCurWorkInPool = value; } }
/// <summary> SimType: Increment current number of Work posted in the thread pool </summary>
private Int32 IncCurWorkInPool() { return Interlocked.Increment(ref m_iCurWorkInPool); }
/// <summary> SimType: Decrement current number of Work posted in the thread pool </summary>
private Int32 DecCurWorkInPool() { return Interlocked.Decrement(ref m_iCurWorkInPool); }
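The effect of the Interlocked calls used by these counters can be seen in isolation. The following is a minimal sketch, not part of the article's class: the name InterlockedDemo and the thread and iteration counts are hypothetical. Several threads increment one shared counter without a lock, and the total is still exact.

```csharp
using System;
using System.Threading;

// Hypothetical demo for illustration; not part of the article's code.
public static class InterlockedDemo
{
    // Starts the given number of threads, each incrementing a shared
    // counter perThread times, and returns the final count.
    public static int CountWithThreads(int threads, int perThread)
    {
        int counter = 0;
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; ++i)
        {
            workers[i] = new Thread(() =>
            {
                for (int n = 0; n < perThread; ++n)
                    Interlocked.Increment(ref counter); // atomic read-modify-write
            });
            workers[i].Start();
        }
        // Wait for every worker before reading the counter.
        foreach (Thread worker in workers)
            worker.Join();
        return counter;
    }
}
```

Replacing Interlocked.Increment with a plain ++counter in this sketch can lose updates when threads interleave, which is the problem the Inc and Dec methods above avoid.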
The last thing we do is
create the minimal number of threads specified by the application
developer. Notice we use the .NET threading classes to create the
threads. We do not need to use the unsafe CreateThread method. One
might think we need to because these threads will be calling the
GetQueuedCompletionStatus Win32 API method.
//***********************************************
/// <summary> Constructor </summary>
/// <param name = "iMaxConcurrency"> SimType: Max number of running threads allowed </param>
/// <param name = "iMinThreadsInPool"> SimType: Min number of threads in the pool </param>
/// <param name = "iMaxThreadsInPool"> SimType: Max number of threads in the pool </param>
/// <param name = "pfnUserFunction"> DelType: Reference to a function to call to perform work </param>
/// <exception cref = "Exception"> Unhandled Exception </exception>
public IOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool,
Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction)
{
try
{
// Set initial class state
GetMaxConcurrency = iMaxConcurrency;
GetMinThreadsInPool = iMinThreadsInPool;
GetMaxThreadsInPool = iMaxThreadsInPool;
GetUserFunction = pfnUserFunction;
// Init the thread counters
GetCurThreadsInPool = 0;
GetActThreadsInPool = 0;
GetCurWorkInPool = 0;
// Initialize the Monitor Object
GetCriticalSection = new Object();
// Set the disposing flag to false
IsDisposed = false;
unsafe
{
// Create an IO Completion Port for Thread Pool use
GetHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, null, (UInt32) GetMaxConcurrency);
}
// Test to make sure the IO Completion Port was created
if (GetHandle == 0)
throw new Exception("Unable To Create IO Completion Port");
// Allocate and start the Minimum number of threads specified
Int32 iStartingCount = GetCurThreadsInPool;
ThreadStart tsThread = new ThreadStart(IOCPFunction);
for (Int32 iThread = 0; iThread < GetMinThreadsInPool; ++iThread)
{
// Create a thread and start it
Thread thThread = new Thread(tsThread);
thThread.Name = "IOCP " + thThread.GetHashCode();
thThread.Start();
// Increment the thread pool count
IncCurThreadsInPool();
}
}
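catch (Exception ex)
{
// Keep the original failure (for example, the IO Completion Port error) as the inner exception
throw new Exception("Unhandled Exception", ex);
}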
}
/// <summary> Finalize called by the GC </summary>
~IOCPThreadPool()
{
if (!IsDisposed)
Dispose();
}
A thread blocked in the GetQueuedCompletionStatus Win32 API method is
running outside the scope of the CLR, so the .NET Framework loses
access to it while it waits. To shut the pool down, we therefore post
one piece of work per thread into the IOCP thread pool, passing
SHUTDOWN_IOCPTHREAD as the data. This tells the thread that receives it
to terminate. We then poll, sleeping 100 milliseconds between checks,
until all of the threads have terminated. The last step is to close the
IOCP handle.
        /// <summary> Called when the object is shut down. This
        /// function waits for all of the queued work to complete
        /// before returning </summary>
        public void Dispose()
        {
            try
            {
                // Flag that we are disposing this object
                IsDisposed = true;
                // Get the current number of threads in the pool
                Int32 iCurThreadsInPool = GetCurThreadsInPool;
                // Shut down all threads in the pool, one shutdown packet per thread
                for (Int32 iThread = 0; iThread < iCurThreadsInPool; ++iThread)
                {
                    unsafe
                    {
                        PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) SHUTDOWN_IOCPTHREAD, null);
                    }
                }
                // Wait here until all the threads are gone
                while (GetCurThreadsInPool != 0) Thread.Sleep(100);
                unsafe
                {
                    // Close the IOCP Handle
                    CloseHandle(GetHandle);
                }
            }
            catch
            {
                // Dispose must not throw, so errors are deliberately ignored
            }
            // Cleanup is done, so the finalizer no longer needs to run
            GC.SuppressFinalize(this);
        }
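The shutdown sequence described above (one sentinel per thread, then
wait for every thread to exit) can be sketched without any Win32 calls.
This is only an illustration under stated assumptions: the class name
SentinelPoolSketch, the Shutdown constant and the use of a
BlockingCollection as a stand-in for the completion port are all
hypothetical and not part of the article's class. It also swaps the
100 millisecond polling loop for Thread.Join, which blocks until each
worker has actually finished.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: BlockingCollection stands in for the completion port.
public sealed class SentinelPoolSketch : IDisposable
{
    // Assumed sentinel; SHUTDOWN_IOCPTHREAD plays this role in the article.
    private const int Shutdown = int.MaxValue;

    private readonly BlockingCollection<int> _queue = new BlockingCollection<int>();
    private readonly List<Thread> _threads = new List<Thread>();
    private readonly Action<int> _userFunction;
    private int _disposed; // 0 = live, 1 = disposed

    public SentinelPoolSketch(int threadCount, Action<int> userFunction)
    {
        _userFunction = userFunction;
        for (int i = 0; i < threadCount; ++i)
        {
            Thread t = new Thread(Worker);
            t.Name = "Sketch " + i;
            t.Start();
            _threads.Add(t);
        }
    }

    public void PostEvent(int value)
    {
        _queue.Add(value);
    }

    private void Worker()
    {
        while (true)
        {
            int value = _queue.Take();      // blocks until an item arrives
            if (value == Shutdown) break;   // sentinel: leave the loop
            try { _userFunction(value); }
            catch { /* a failing user function must not kill the worker */ }
        }
    }

    public void Dispose()
    {
        // Make repeated Dispose calls harmless
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

        // One sentinel per thread; each worker consumes exactly one
        for (int i = 0; i < _threads.Count; ++i) _queue.Add(Shutdown);

        // Block until every worker has exited instead of polling a counter
        foreach (Thread t in _threads) t.Join();

        _queue.Dispose();
    }
}
```

Because the sentinels are queued behind any pending items, work posted
before Dispose is still processed before the workers exit.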
The worker function waits for an event and then calls the user function
supplied by the application developer, since the application developer
is the only one who knows what needs to be done. Once that call
returns, the method checks whether a new thread should be added to the
pool: if every thread currently in the pool is active and the pool is
below its maximum size, one more thread is started.
        //*******************************************
        /// <summary> IOCP Worker Function that calls the specified user function </summary>
        private void IOCPFunction()
        {
            UInt32 uiNumberOfBytes;
            Int32 iValue;
            try
            {
                while (true)
                {
                    unsafe
                    {
                        OVERLAPPED* pOv;
                        // Wait for an event
                        GetQueuedCompletionStatus(GetHandle, &uiNumberOfBytes, (UInt32*) &iValue, &pOv, INIFINITE);
                    }
                    // Was this thread told to shut down? Dispose posts shutdown
                    // packets without counting them as work, so test this first
                    if (iValue == SHUTDOWN_IOCPTHREAD)
                        break;
                    // Decrement the number of events in queue
                    DecCurWorkInPool();
                    // Increment the number of active threads
                    IncActThreadsInPool();
                    try
                    {
                        // Call the user function
                        GetUserFunction(iValue);
                    }
                    catch
                    {
                        // A failing user function must not kill the worker thread
                    }
                    // Get a lock
                    Monitor.Enter(GetCriticalSection);
                    try
                    {
                        // If we have less than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            // Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                if (IsDisposed == false)
                                {
                                    // Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread = new Thread(tsThread);
                                    thThread.Name = "IOCP " + thThread.GetHashCode();
                                    thThread.Start();
                                    // Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    catch
                    {
                    }
                    finally
                    {
                        // Release the lock
                        Monitor.Exit(GetCriticalSection);
                    }
                    // Decrement the number of active threads
                    DecActThreadsInPool();
                }
            }
            catch
            {
            }
            // Decrement the thread pool count
            DecCurThreadsInPool();
        }
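The growth check inside the lock is repeated verbatim in the PostEvent
overloads that follow, so it may help to see the decision on its own.
The sketch below isolates it as a pure function; the class and method
names (PoolGrowthSketch, ShouldAddThread) are hypothetical and not part
of the article's class. It only restates the three conditions the
worker function tests.

```csharp
using System;

// Hypothetical helper that restates the pool's growth rule as a pure function.
public static class PoolGrowthSketch
{
    // Returns true when one more worker thread should be started:
    // the pool is not shutting down, it is below its maximum size,
    // and every thread currently in the pool is busy.
    public static bool ShouldAddThread(int curThreads, int activeThreads,
                                       int maxThreads, bool isDisposed)
    {
        if (isDisposed) return false;                 // never grow during shutdown
        if (curThreads >= maxThreads) return false;   // already at the ceiling
        return activeThreads == curThreads;           // grow only when all are busy
    }
}
```

A caller would still need to hold the pool's lock while reading the
counters and starting the thread, as the article's code does with
Monitor.Enter and Monitor.Exit.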
        //******************************************
        /// <summary> Posts an event carrying a value into the IOCP thread pool </summary>
        /// <param name="iValue"> SimType: A value to be passed with the event </param>
        /// <exception cref = "Exception"> Unhandled Exception </exception>
        public void PostEvent(Int32 iValue)
        {
            try
            {
                // Only add work if we are not disposing
                if (IsDisposed == false)
                {
                    // Count the work item before posting it, so a fast worker
                    // cannot decrement the counter before it is incremented
                    IncCurWorkInPool();
                    unsafe
                    {
                        // Post an event into the IOCP Thread Pool
                        PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) iValue, null);
                    }
                    // Get a lock
                    Monitor.Enter(GetCriticalSection);
                    try
                    {
                        // If we have less than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            // Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                if (IsDisposed == false)
                                {
                                    // Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread = new Thread(tsThread);
                                    thThread.Name = "IOCP " + thThread.GetHashCode();
                                    thThread.Start();
                                    // Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    catch
                    {
                    }
                    finally
                    {
                        // Release the lock
                        Monitor.Exit(GetCriticalSection);
                    }
                }
            }
            catch (Exception)
            {
                // Rethrow without resetting the stack trace
                throw;
            }
            catch
            {
                throw new Exception("Unhandled Exception");
            }
        }
        //*****************************************
        /// <summary> Posts an event with no data into the IOCP thread pool </summary>
        /// <exception cref = "Exception"> Unhandled Exception </exception>
        public void PostEvent()
        {
            try
            {
                // Only add work if we are not disposing
                if (IsDisposed == false)
                {
                    // Count the work item before posting it, so a fast worker
                    // cannot decrement the counter before it is incremented
                    IncCurWorkInPool();
                    unsafe
                    {
                        // Post an event into the IOCP Thread Pool
                        PostQueuedCompletionStatus(GetHandle, 0, null, null);
                    }
                    // Get a lock
                    Monitor.Enter(GetCriticalSection);
                    try
                    {
                        // If we have less than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            // Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                if (IsDisposed == false)
                                {
                                    // Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread = new Thread(tsThread);
                                    thThread.Name = "IOCP " + thThread.GetHashCode();
                                    thThread.Start();
                                    // Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    catch
                    {
                    }
                    finally
                    {
                        // Release the lock
                        Monitor.Exit(GetCriticalSection);
                    }
                }
            }
            catch (Exception)
            {
                // Rethrow without resetting the stack trace
                throw;
            }
            catch
            {
                throw new Exception("Unhandled Exception");
            }
        }
    }
}
using System;           // Included for Console, Int32 and Exception
using System.Threading; // Included for the Thread.Sleep call
using Continuum.Threading;
namespace Sample
{
    //============================================
    /// <summary> Sample class for the threading class </summary>
    public class UtilThreadingSample
    {
        //*******************************************
        /// <summary> Test Method </summary>
        static void Main()
        {
            // Create the IOCP Thread Pool: default concurrency, 5 to 10 threads
            IOCPThreadPool pThreadPool = new IOCPThreadPool(0, 5, 10, new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction));
            // Post one event carrying the value 10
            pThreadPool.PostEvent(10);
            Thread.Sleep(100);
            pThreadPool.Dispose();
        }
        //*****************************************
        /// <summary> Function to be called by the IOCP thread pool. Called
        /// once for each event posted to the pool </summary>
        /// <param name="iValue"> The value provided by the thread posting the event </param>
        public static void IOCPThreadFunction(Int32 iValue)
        {
            try
            {
                Console.WriteLine("Value: {0}", iValue);
            }
            catch (Exception pException)
            {
                Console.WriteLine(pException.Message);
            }
        }
    }
}
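The sample pauses for a fixed 100 milliseconds before disposing the
pool. When the caller needs to know that every posted event has been
handled, for example before reading results, one alternative is to have
the user function signal a CountdownEvent. The sketch below is an
assumption-laden illustration, not the article's code: WaitForWorkSketch
and RunAndWait are hypothetical names, and ThreadPool.QueueUserWorkItem
stands in for pThreadPool.PostEvent so the example runs on its own.

```csharp
using System;
using System.Threading;

// Hypothetical sketch: wait for posted work with a CountdownEvent, not a fixed sleep.
public static class WaitForWorkSketch
{
    // Posts eventCount work items and returns how many were processed.
    public static int RunAndWait(int eventCount)
    {
        int processed = 0;
        using (CountdownEvent done = new CountdownEvent(eventCount))
        {
            // Plays the role of IOCPThreadFunction in the sample
            Action<int> userFunction = delegate(int value)
            {
                Interlocked.Increment(ref processed);
                done.Signal();              // one signal per completed event
            };

            for (int i = 0; i < eventCount; ++i)
            {
                int value = i;              // copy for the closure
                // Stand-in for pThreadPool.PostEvent(value)
                ThreadPool.QueueUserWorkItem(delegate { userFunction(value); });
            }

            // Returns only after every event has signalled
            done.Wait();
        }
        return processed;
    }
}
```

With the article's pool, the same idea would put the Signal call inside
the user function passed to the constructor and the Wait call just
before Dispose.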