using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ProducerConsumerSample
{
class Program
{
static void Main(string[] args)
{
ConsumerQueue queue = new ConsumerQueue();
queue.Start();
new Thread(new ThreadStart(delegate()
{
for (int i = 0; i < 5; i++)
{
int tmp = i;
queue.Enqueue(new Action(() =>
{
Console.WriteLine("Hello #{0} from thread 1", tmp);
}));
}
})).Start();
new Thread(new ThreadStart(delegate()
{
for (int i = 0; i < 5; i++)
{
int tmp = i;
queue.Enqueue(new Action(() =>
{
Console.WriteLine("Hello #{0} from thread 2", tmp);
}));
}
})).Start();
Console.Read();
}
class ConsumerQueue
{
private readonly object _locker = new object();
private Queue<Action> _queue = new Queue<Action>();
public void Start()
{
new Thread(new ThreadStart(this.ConsumerThread)).Start();
}
public void Enqueue(Action action)
{
lock (_locker)
{
_queue.Enqueue(action);
Monitor.Pulse(_locker);
}
}
private void ConsumerThread()
{
while (true)
{
Action action;
lock (_locker)
{
while (_queue.Count == 0)
Monitor.Wait(_locker);
action = _queue.Dequeue();
}
action();
}
}
}
}
}
Hello #0 from thread 1
Hello #1 from thread 1
Hello #2 from thread 1
Hello #3 from thread 1
Hello #4 from thread 1
Hello #0 from thread 2
Hello #1 from thread 2
Hello #2 from thread 2
Hello #3 from thread 2
Hello #4 from thread 2
public void Stop()
{
Enqueue(null);
}
private void ConsumerThread()
{
while (true)
{
Action action;
lock (_locker)
{
while (_queue.Count == 0)
Monitor.Wait(_locker);
action = _queue.Dequeue();
}
if (action == null)
{
//clean exit
return;
}
action();
}
}
while(_queue.Count == 0)
Thread.Sleep(50);
while(_queue.Count == 0)
Monitor.Wait();
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ProducerConsumerSample
{
class Program
{
private static Action _action = new Action(() =>
{
Console.WriteLine();
Console.WriteLine(">>>>>> An action is executing <<<<<<");
Thread.Sleep(1000);
});
static void Main(string[] args)
{
ConsumerQueue queue = new ConsumerQueue();
queue.Start();
queue.WriteLine("Starting to enqueue the first actions");
queue.Enqueue(_action);
queue.Enqueue(_action);
queue.Enqueue(_action);
queue.Enqueue(_action);
queue.WriteLine("Sleeping 14 seconds");
Thread.Sleep(TimeSpan.FromSeconds(14));
Console.WriteLine();
queue.WriteLine("Starting to enqueue the second actions and stopping the queue right after");
queue.Enqueue(_action);
queue.Enqueue(_action);
queue.Enqueue(_action);
queue.Enqueue(_action);
queue.Stop();
queue.WriteLine("Main() stopped.");
Console.Read();
}
class ConsumerQueue
{
private readonly object _locker = new object();
private Queue<Action> _queue = new Queue<Action>();
public void Start()
{
new Thread(new ThreadStart(this.ConsumerThread)).Start();
}
public void Stop()
{
Enqueue(null);
}
public void Enqueue(Action action)
{
lock (_locker)
{
WriteLine("ConsumerThread is enqueuing an action");
_queue.Enqueue(action);
Monitor.Pulse(_locker);
}
}
private void ConsumerThread()
{
while (true)
{
Action action;
lock (_locker)
{
WriteLine("ConsumerThread is waiting for job!");
while (_queue.Count == 0)
Monitor.Wait(_locker);
WriteLine("ConsumerThread woke up, let's do some work!");
action = _queue.Dequeue();
}
if (action == null)
{
WriteLine("ConsumerThread action is null. A clean exit is requested");
//clean exit
return;
}
WriteLine("ConsumerThread is executing an action");
action();
}
}
public void WriteLine(string text)
{
Console.WriteLine("{0}: {1}", DateTime.Now, text);
}
}
}
}
10/09/2011 4:22:36 PM: Starting to enqueue the first actions
10/09/2011 4:22:36 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:36 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:36 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:36 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:36 PM: ConsumerThread is executing an action
10/09/2011 4:22:36 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:36 PM: ConsumerThread is enqueuing an action
>>>>>> An action is executing <<<<<<
10/09/2011 4:22:36 PM: Sleeping 14 seconds
10/09/2011 4:22:37 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:37 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:37 PM: ConsumerThread is executing an action
>>>>>> An action is executing <<<<<<
10/09/2011 4:22:38 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:38 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:38 PM: ConsumerThread is executing an action
>>>>>> An action is executing <<<<<<
10/09/2011 4:22:39 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:39 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:39 PM: ConsumerThread is executing an action
>>>>>> An action is executing <<<<<<
10/09/2011 4:22:40 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:50 PM: Starting to enqueue the second actions and stopping the q
ueue right after
10/09/2011 4:22:50 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:50 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:50 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:50 PM: ConsumerThread is executing an action
>>>>>> An action is executing <<<<<<
10/09/2011 4:22:50 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:50 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:50 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:50 PM: Main() stopped.
10/09/2011 4:22:51 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:51 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:51 PM: ConsumerThread is executing an action
>>>>>> An action is executing <<<<<<
10/09/2011 4:22:52 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:52 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:52 PM: ConsumerThread is executing an action
>>>>>> An action is executing <<<<<<
10/09/2011 4:22:53 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:53 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:53 PM: ConsumerThread is executing an action
>>>>>> An action is executing <<<<<<
10/09/2011 4:22:54 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:54 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:54 PM: ConsumerThread action is null. A clean exit is requested
while (_queue.Count == 0)
{
Console.WriteLine("Pas de données à traiter.");
Monitor.Wait(_locker);
}
public static class Monitor
{
private static locked = false;
public static void Wait()
{
locked = true;
while(locked)
Thread.Sleep(1);
}
public static void Pulse()
{
locked = false;
}
}
private void Listening()
{
while (this.Socket.Connected)
{
lock (this)
{
if (this.Socket.Available > 0)
{
byte[] buffer = new byte[this.Socket.Available];
this.Socket.Receive(buffer);
this.m_Queue.Enqueue(buffer);
Monitor.Pulse(this);
}
}
}
}
private void QueueTreatment()
{
while (true)
{
lock (this)
{
while (this.m_Queue.Count == 0)
Monitor.Wait(this);
this.ParseData(this.m_Queue.Dequeue());
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace DeadLockExample
{
class Program
{
static void Main(string[] args)
{
ThisLockedObject thisLockedObject1 = new ThisLockedObject();
ThisLockedObject thisLockedObject2 = new ThisLockedObject();
new Thread(new ThreadStart(delegate()
{
Console.WriteLine("Locking thisLockedObject2");
lock (thisLockedObject2)
{
Console.WriteLine("Performing action on thisLockedObject1");
thisLockedObject1.DoSomethingThatLockThis();
}
})).Start();
new Thread(new ThreadStart(delegate()
{
Console.WriteLine("Locking thisLockedObject1");
lock (thisLockedObject1)
{
Console.WriteLine("Performing action on thisLockedObject2");
thisLockedObject2.DoSomethingThatLockThis();
}
})).Start();
Console.Read();
}
class ThisLockedObject
{
public void DoSomethingThatLockThis()
{
Console.WriteLine("ThisLockedObject acquiring lock on self");
lock (this)
{
Thread.Sleep(1000);
}
Console.WriteLine("ThisLockedObject done");
}
}
}
}
Locking thisLockedObject2
Performing action on thisLockedObject1
Locking thisLockedObject1
Performing action on thisLockedObject2
ThisLockedObject acquiring lock on self
ThisLockedObject acquiring lock on self
class ThisLockedObject
{
private readonly object _locker = new object();
public void DoSomethingThatLockThis()
{
Console.WriteLine("ThisLockedObject acquiring lock on self");
lock (_locker)
{
Thread.Sleep(1000);
}
Console.WriteLine("ThisLockedObject done");
}
}
this.ParseData(this.m_Queue.Dequeue());
private void QueueTreatment()
{
while (true)
{
object itemInQueue;
lock (_locker)
{
while (this.m_Queue.Count == 0)
Monitor.Wait(_locker);
itemInQueue = this.m_Queue.Dequeue();
}
this.ParseData(itemInQueue);
}
}
private void Listening()
{
while (this.Socket.Connected)
{
lock (this.m_Locker)
{
if (this.Socket.Available > 0)
{
byte[] buffer = new byte[this.Socket.Available];
this.Socket.Receive(buffer);
this.m_Queue.Enqueue(buffer);
Monitor.Pulse(this.m_Locker);
}
}
Thread.Sleep(this.ThreadsSleep);
}
}
private void QueueTreatment()
{
byte[] buffer;
while (true)
{
lock (this.m_Locker)
{
while (this.m_Queue.Count == 0)
Monitor.Wait(this.m_Locker);
buffer = this.m_Queue.Dequeue();
}
this.ParseData(buffer);
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using NLog;
using Stump.Core.Attributes;
using Stump.Core.Collections;
using Stump.Core.Pool;
using Stump.Core.Reflection;
namespace Stump.Server.BaseServer.Network
{
public class ClientManager : Singleton<ClientManager>
{
private readonly Logger logger = LogManager.GetCurrentClassLogger();
#region Config Variables
/// <summary>
/// Max number of clients connected
/// </summary>
[Variable]
public static int MaxConcurrentConnections = 2000;
/// <summary>
/// Max number of clients waiting for a connection
/// </summary>
[Variable]
public static int MaxPendingConnections = 100;
/// <summary>
/// Max number of clients connected on the same IP or NULL for unlimited
/// </summary>
[Variable]
public static int? MaxIPConnexions = 10;
/// <summary>
/// Min interval between two received message or NULL for unlimited ( in ms )
/// </summary>
[Variable]
public static int? MinMessageInterval = 1;
/// <summary>
/// Buffer size /!\ Advanced users only /!\
/// </summary>
[Variable]
public static int BufferSize = 8192;
#endregion
#region Events
public event Action<BaseClient> ClientConnected;
private void NotifyClientConnected(BaseClient client)
{
Action<BaseClient> handler = ClientConnected;
if (handler != null) handler(client);
}
public event Action<BaseClient> ClientDisconnected;
private void NotifyClientDisconnected(BaseClient client)
{
Action<BaseClient> handler = ClientDisconnected;
if (handler != null) handler(client);
}
#endregion
public delegate BaseClient CreateClientHandler(Socket clientSocket);
private CreateClientHandler m_createClientDelegate;
private readonly Socket m_listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream,
ProtocolType.Tcp);
private readonly ConcurrentList<BaseClient> m_clients = new ConcurrentList<BaseClient>();
private BufferManager m_bufferManager; // allocate memory dedicated to a client to avoid memory alloc on each send/recv
private SocketAsyncEventArgsPool m_readAsyncEventArgsPool; // pool of SocketAsyncEventArgs
private SocketAsyncEventArgsPool m_writeAsyncEventArgsPool;
private SocketAsyncEventArgs m_acceptArgs = new SocketAsyncEventArgs(); // async arg used on client connection
private SemaphoreSlim m_semaphore; // limit the number of threads accessing to a ressource
/// <summary>
/// List of connected Clients
/// </summary>
public IEnumerable<BaseClient> Clients
{
get { return m_clients; }
}
public bool Paused
{
get;
private set;
}
public bool IsInitialized
{
get;
private set;
}
public bool Started
{
get;
private set;
}
public string Host
{
get;
private set;
}
public int Port
{
get;
private set;
}
/// <summary>
/// Represents a queue containing received messages to treat
/// </summary>
internal MessageQueue MessageQueue
{
get;
private set;
}
internal void Initialize(CreateClientHandler createClientHandler)
{
if (IsInitialized)
throw new Exception("ClientManager already initialized");
m_createClientDelegate = createClientHandler;
MessageQueue = new MessageQueue(Settings.EnableBenchmarking);
// init semaphore
m_semaphore = new SemaphoreSlim(MaxConcurrentConnections, MaxConcurrentConnections);
// init buffer manager
m_bufferManager = new BufferManager(MaxConcurrentConnections * BufferSize, BufferSize);
m_bufferManager.InitializeBuffer();
// initialize read pool
m_readAsyncEventArgsPool = new SocketAsyncEventArgsPool(MaxConcurrentConnections);
for (var i = 0; i < MaxConcurrentConnections; i++)
{
var args = new SocketAsyncEventArgs();
m_bufferManager.SetBuffer(args);
args.Completed += OnReceiveCompleted;
m_readAsyncEventArgsPool.Push(args);
}
// initialize write pool
m_writeAsyncEventArgsPool = new SocketAsyncEventArgsPool(MaxConcurrentConnections * 2);
for (var i = 0; i < MaxConcurrentConnections * 2; i++)
{
var args = new SocketAsyncEventArgs();
args.Completed += OnSendCompleted;
m_writeAsyncEventArgsPool.Push(args);
}
m_acceptArgs.Completed += (sender, e) => ProcessAccept(e);
IsInitialized = true;
}
/// <summary>
/// Start to listen client connections
/// </summary>
public void Start(string address, int port)
{
if (!IsInitialized)
throw new Exception("Attempt to start ClientManager before initializing it. Call Initialize()");
if (Started)
throw new Exception("ClientManager already started");
Host = address;
Port = port;
var ipEndPoint = new IPEndPoint(Dns.GetHostAddresses(Host).First(ip => ip.AddressFamily == AddressFamily.InterNetwork), Port);
m_listenSocket.Bind(ipEndPoint);
m_listenSocket.Listen(MaxPendingConnections);
Started = true;
StartAccept();
}
/// <summary>
/// Pause the listener and reject all new connections
/// </summary>
public void Pause()
{
Paused = true;
}
/// <summary>
/// Resume the actual pause
/// </summary>
public void Resume()
{
Paused = false;
}
/// <summary>
/// Close the listener and dispose ressources
/// </summary>
public void Close()
{
m_listenSocket.Close();
m_listenSocket.Dispose();
m_bufferManager.Dispose();
m_writeAsyncEventArgsPool.Dispose();
m_readAsyncEventArgsPool.Dispose();
}
private void StartAccept()
{
m_acceptArgs.AcceptSocket = null;
m_semaphore.Wait();
// raise or not the event depending on AcceptAsync return
if (!m_listenSocket.AcceptAsync(m_acceptArgs))
{
ProcessAccept(m_acceptArgs);
}
}
/// <summary>
/// Called when a new client is connecting
/// </summary>
/// <param name="e"></param>
private void ProcessAccept(SocketAsyncEventArgs e)
{
// do not accept connections while pausing
if (Paused)
return;
if (MaxIPConnexions.HasValue &&
CountClientWithSameIp(( (IPEndPoint) e.AcceptSocket.RemoteEndPoint ).Address) > MaxIPConnexions.Value)
{
logger.Error("Client {0} try to connect more {1} time", e.AcceptSocket.RemoteEndPoint.ToString(), MaxIPConnexions.Value);
m_semaphore.Release();
StartAccept();
return;
}
// use a async arg from the pool avoid to re-allocate memory on each connection
SocketAsyncEventArgs readAsyncEventArgs = PopReadSocketAsyncArgs();
// create the client instance
var client = m_createClientDelegate(e.AcceptSocket);
readAsyncEventArgs.UserToken = client;
m_clients.Add(client);
NotifyClientConnected(client);
// if the event is not raised we first check new connections before parsing message that can blocks the connection queue
if (!client.Socket.ReceiveAsync(readAsyncEventArgs))
{
StartAccept();
ProcessReceive(readAsyncEventArgs);
}
else
{
StartAccept();
}
}
private void OnReceiveCompleted(object sender, SocketAsyncEventArgs e)
{
try
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
ProcessReceive(e);
break;
case SocketAsyncOperation.Disconnect:
CloseClientSocket(e);
break;
default:
break;
}
}
catch (Exception exception)
{
// theoretically it shouldn't go up to there.
logger.Error("Last chance exception on receiving ! : " + exception);
}
}
private void ProcessReceive(SocketAsyncEventArgs e)
{
if (e.BytesTransferred <= 0 || e.SocketError != SocketError.Success)
{
CloseClientSocket(e);
}
else
{
var client = e.UserToken as BaseClient;
if (client == null)
{
CloseClientSocket(e);
}
else
{
client.ProcessReceive(e.Buffer, e.Offset, e.BytesTransferred);
if (client.Socket == null)
{
CloseClientSocket(e);
}
else
{
// just continue to receive
bool willRaiseEvent = client.Socket.ReceiveAsync(e);
if (!willRaiseEvent)
{
ProcessReceive(e);
}
}
}
}
}
private void CloseClientSocket(SocketAsyncEventArgs e)
{
var client = e.UserToken as BaseClient;
if (client != null)
{
client.Disconnect();
m_clients.Remove(client);
NotifyClientDisconnected(client);
}
m_semaphore.Release();
// free the SocketAsyncEventArg so it can be reused by another client
PushReadSocketAsyncArgs(e);
}
private void OnSendCompleted(object sender, SocketAsyncEventArgs e)
{
// free an async arg on the pool when a send is completed
PushWriteSocketAsyncArgs(e);
}
internal SocketAsyncEventArgs PopWriteSocketAsyncArgs()
{
if (m_writeAsyncEventArgsPool.Count <= 0)
throw new Exception("The writer async argument pool is empty");
return m_writeAsyncEventArgsPool.Pop();
}
internal void PushWriteSocketAsyncArgs(SocketAsyncEventArgs args)
{
m_writeAsyncEventArgsPool.Push(args);
}
internal SocketAsyncEventArgs PopReadSocketAsyncArgs()
{
if (m_readAsyncEventArgsPool.Count <= 0)
throw new Exception("The reader async argument pool is empty");
return m_readAsyncEventArgsPool.Pop();
}
internal void PushReadSocketAsyncArgs(SocketAsyncEventArgs args)
{
m_readAsyncEventArgsPool.Push(args);
}
public IEnumerable<BaseClient> FindAll(Predicate<BaseClient> predicate)
{
return m_clients.Where(entry => predicate(entry));
}
public IEnumerable<T> FindAll<T>(Predicate<T> predicate)
{
return m_clients.OfType<T>().Where(entry => predicate(entry));
}
public int CountClientWithSameIp(IPAddress ipAddress)
{
return m_clients.Count(t => t.Socket != null && ( (IPEndPoint)t.Socket.RemoteEndPoint ).Address == ipAddress);
}
}
}