Autre [MUDOWN][DotNet] D2Com, facilitez la communication D2 :)

  • Auteur de la discussion Anonymous
  • Date de début
A

Anonymous

Invité
#41
Re: [DotNet] D2Com, facilitez la communication D2 :)

Super ;) il utiliser 90% ce mon CPU voire plus ! Avec 8 theard en route je comprend pas .. ça vener d’où sinon ?!
 
A

Anonymous

Invité
#42
Re: [DotNet] D2Com, facilitez la communication D2 :)

Une connexion utilise deux threads : un thread qui écoute en permanence la socket et dès que des données sont reçues, on la met en file d'attente. un deuxième thread qui traite donc en permanence la file d'attente. Ces deux threads sont en boucle presque infinie. Ce qui les faisaient s’exécuter le plus de fois possible lorsqu'ils ont le jeton. Et donc ça alourdit énormément le processeur.
J'ai juste placé une temporisation de 50ms après chaque tour de boucle histoire de temporiser le thread ce qui permet de passer le jeton à un autre processus afin de ne pas faire calculer le processeur pour rien.

Et voilà comment gagner près de 75% de ressources consommées :)
Vous allez dire, pourquoi pas augmenter encore plus cette temporisation, en effet, on aurait pu le faire, mais j'ai remarqué qu'à partir de 125ms de temporisation, le gain n'était plus trop important pour que ça soit rentable sur le ralentissement du traitement des données. 50ms me parait un bon compromis.
J'instaurerais surement un paramètre à définir dans une propriété pour cette temporisation dans une version future si le besoin l'est ;)
 
A

Anonymous

Invité
#43
Re: [DotNet] D2Com, facilitez la communication D2 :)

Je voit !
Un theard.sleep que tu as fait ou autre ?
 
A

Anonymous

Invité
#44
Re: [DotNet] D2Com, facilitez la communication D2 :)

Thread.Sleep oui :)

EDIT : OMG ! Tu as 666 messages, le nombre du diable !!!
 
A

Anonymous

Invité
#45
Re: [DotNet] D2Com, facilitez la communication D2 :)

Ah ouai lol bien vu !
 
A

Anonymous

Invité
#46
Re: [DotNet] D2Com, facilitez la communication D2 :)

As-tu besoin d'un coup de main sur ton multithread raphy? (pas besoin de sleep)
 
A

Anonymous

Invité
#47
Re: [DotNet] D2Com, facilitez la communication D2 :)

Pas besoin de sleep c'est à dire ?
J'ai essayé aussi les priorités des threads, ça fonctionnait pas...
Je suis ouvert à toutes suggestions ;)
 
A

Anonymous

Invité
#48
Re: [DotNet] D2Com, facilitez la communication D2 :)

Pour ton thread d'écoute socket, si tu utilises un thread pour faire ça alors tu n'as pas le choix de faire un sleep dans ta boucle while(!Socket.Available). Une autre option (plus performante) est d'utilisé BeginReceive et EndReceive (ou ReceiveAsync qui est plus compliqué mais plus performant encore). De cette manière, aucun sleep n'est nécessaire durant l'écoute du socket (tout est gérer par le framework). J'ai fait un tuto la dessus il y a un moment dans la section c#.

Pour ce qui est du thread qui traite les données reçues, il est possible d'utilisé un système de producer/consumer qui ne nécessite pas de sleep. Voici je t'ai préparer un exemple. Dépendemment de tes besoins, tu devra peut-être modifier un peu le tout mais le principe de base reste semblable.

Code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ProducerConsumerSample
{
    class Program
    {
        static void Main(string[] args)
        {
            ConsumerQueue queue = new ConsumerQueue();

            queue.Start();

            new Thread(new ThreadStart(delegate()
                {
                    for (int i = 0; i < 5; i++)
                    {
                        int tmp = i;
                        queue.Enqueue(new Action(() =>
                        {
                            Console.WriteLine("Hello #{0} from thread 1", tmp);
                        }));
                    }
                })).Start();

            new Thread(new ThreadStart(delegate()
                {
                    for (int i = 0; i < 5; i++)
                    {
                        int tmp = i;
                        queue.Enqueue(new Action(() =>
                        {
                            Console.WriteLine("Hello #{0} from thread 2", tmp);
                        }));
                    }
                })).Start();

            Console.Read();
        }

        class ConsumerQueue
        {

            private readonly object _locker = new object();
            private Queue<Action> _queue = new Queue<Action>();

            public void Start()
            {
                new Thread(new ThreadStart(this.ConsumerThread)).Start();
            }

            public void Enqueue(Action action)
            {
                lock (_locker)
                {
                    _queue.Enqueue(action);
                    Monitor.Pulse(_locker);
                }
            }

            private void ConsumerThread()
            {
                while (true)
                {
                    Action action;

                    lock (_locker)
                    {
                        while (_queue.Count == 0)
                            Monitor.Wait(_locker);
                        action = _queue.Dequeue();
                    }

                    action();
                }
            }

        }
    }
}

output:

Code:
Hello #0 from thread 1
Hello #1 from thread 1
Hello #2 from thread 1
Hello #3 from thread 1
Hello #4 from thread 1
Hello #0 from thread 2
Hello #1 from thread 2
Hello #2 from thread 2
Hello #3 from thread 2
Hello #4 from thread 2

Tu peux alors mettre les données reçues du socket dans ta queue et les gérer à la place du "action()". Même si çelà nécessite du temps de traitement, les données vont continuer à s'accumuler dans la queue et seront traités en ordre FIFO (First in first out). Quand il n'y a plus de données à traiter, le ConsumerThread va bloquer sur Monitor.Wait jusqu'à ce qu'il recoive un signale avec Monitor.Pulse.
 
A

Anonymous

Invité
#49
Re: [DotNet] D2Com, facilitez la communication D2 :)

Pourquoi : int tmp = i; ?
la variable i peut être utilisé dans la boucle for :p

Ensuite je ne comprends pas d'où viendrait la performance en jsute créant une classe qui gère le thread de traitement de la file d'attente. Ok tu as ajouté un lock pour ne pas interrompre les instructions, mais je ne vois pas où le processeur va moins calculer puisque tu es en boucle infinie.
De plus, bon okay c'est un exemple :p mais là tu n'as aucun contrôle sur tes threads afin de les arrêter etc. Et il est évidement pour avoir un code maintenable, il est mieux de créer une méthode dédié au lieux de passer par une delegate :p

Peut être que j'ai mal compris, je t'explique ce que j'ai compris :
Dans la méthode Main, tu commences par lancer la classe de traitement de la file d'attente puis tu simules des données en file d'attente dans des threads différents.
Dans la classe ConsumerThread, tu as une méthode Start pour lancer le thread de traitement de la file d'attente déléguant la méthode ConsumerThread qui en boucle infinie traite toutes les données en fuile d'attentes jusqu'à ce qu'il en ai plus mais avec le lock permettant ainsi d'assurer l’exécution complète de la boucle avant de passer le jeton.

Mais actuellement je fonctionne dans la théorie identiquement sauf que j'ai pas mis de lock car ça me sert pas plus que ça.
Pourrais tu m'expliquer d'avantage l'apport en performance de cette méthode ?
 
A

Anonymous

Invité
#50
Re: [DotNet] D2Com, facilitez la communication D2 :)

Aucun prob Raphy je vais tout t'expliquer ca. Premièrement, je voudrais préciser que ceci est seulement un exemple créer en 5 minutes pour démontrer une fonctionnalité. Tu auras inévitablement besoin d'adapter tout ça.

Le tmp = i est utilisé pour garder la valeure de i et la passer au thread. si tu essaie cet exemple en l'enlevant, tu verras que tout les Console.WriteLine() diront: Hello #5 from thread 1. La raison de ce comportement est que la boucle finie avant que le consumer appeles action(), donc la valeure de i est de 5 à ce moment.

Pour l'arrêt d'un consumer comme celui-ci, il est généralement conseiller d'envoyer une valeure null pour terminer l'execution en beauté (Thread.Abort étant dangereux). Par exemple:

Code:
            public void Stop()
            {
                Enqueue(null);
            }

            private void ConsumerThread()
            {
                while (true)
                {
                    Action action;

                    lock (_locker)
                    {
                        while (_queue.Count == 0)
                            Monitor.Wait(_locker);
                        action = _queue.Dequeue();
                    }

                    if (action == null)
                    {
                        //clean exit
                        return;
                    }

                    action();
                }
            }

"il est mieux de créer une méthode dédié au lieux de passer par une delegate": Entièrement daccord, j'ai fait ça pour des raisons de simplicité de l'exemple seulement.

Pour expliquer le gain de performances, c'est un peu plus complexe. Je vais essayer de faire de mon mieux. Premièrement, l'approche que tu utilises est nommé "variable polling", choses déconseiller pour des raisons de performances. Par exemple:

Code:
while(_queue.Count == 0)
  Thread.Sleep(50);

Ici, le code va vérifier la valeure de Count à chaque 50ms, d'où une perte de performance (si tu enleves le sleep c'est encore pire). Avec mon l'exemple:

Code:
while(_queue.Count == 0)
  Monitor.Wait();

Ici, nous avons un scénario totalement différent. Si l'évaluation de de Count est égale à 0, le thread va entrer en sleep pour un laps de temps indéterminer (relàchant temporairement le lock grâce à Monitor.Wait()). La prochaine fois que _queue.Count sera évaluer, il sera obligatoirement ­plus grand que 0. Le thread va être en sleep jusqu'à ce que Monitor.Pulse de la méthode Enqueue le réveil. Le principe est semblable à çelui d'un AutoResetEvent.

Donc en gros, la différence est que l'exemple que je t'ai donné ne réévalue pas la condition sans arrête mais seulement quand çelà est nécessaire. J'ai modifié l'exemple légèrement pour essayer d'illustrer ce que tu ne comprends pas (sans les threads dans le main pour simplifier le tout):

Code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ProducerConsumerSample
{
    class Program
    {

        private static Action _action = new Action(() =>
            {
                Console.WriteLine();
                Console.WriteLine(">>>>>> An action is executing <<<<<<");
                Thread.Sleep(1000);
            });

        static void Main(string[] args)
        {
            ConsumerQueue queue = new ConsumerQueue();

            queue.Start();

            queue.WriteLine("Starting to enqueue the first actions");

            queue.Enqueue(_action);
            queue.Enqueue(_action);
            queue.Enqueue(_action);
            queue.Enqueue(_action);

            queue.WriteLine("Sleeping 14 seconds");
            Thread.Sleep(TimeSpan.FromSeconds(14));

            Console.WriteLine();
            queue.WriteLine("Starting to enqueue the second actions and stopping the queue right after");

            queue.Enqueue(_action);
            queue.Enqueue(_action);
            queue.Enqueue(_action);
            queue.Enqueue(_action);

            queue.Stop();

            queue.WriteLine("Main() stopped.");

            Console.Read();
        }



        class ConsumerQueue
        {

            private readonly object _locker = new object();
            private Queue<Action> _queue = new Queue<Action>();

            public void Start()
            {
                new Thread(new ThreadStart(this.ConsumerThread)).Start();
            }

            public void Stop()
            {
                Enqueue(null);
            }

            public void Enqueue(Action action)
            {
                lock (_locker)
                {
                    WriteLine("ConsumerThread is enqueuing an action");
                    _queue.Enqueue(action);
                    Monitor.Pulse(_locker);
                }
            }

            private void ConsumerThread()
            {
                while (true)
                {
                    Action action;

                    lock (_locker)
                    {
                        WriteLine("ConsumerThread is waiting for job!");   
                        while (_queue.Count == 0)
                            Monitor.Wait(_locker);
                        WriteLine("ConsumerThread woke up, let's do some work!");
                        action = _queue.Dequeue();
                    }

                    if (action == null)
                    {
                        WriteLine("ConsumerThread action is null. A clean exit is requested");
                        //clean exit
                        return;
                    }

                    WriteLine("ConsumerThread is executing an action");
                    action();
                }
            }

            public void WriteLine(string text)
            {
                Console.WriteLine("{0}: {1}", DateTime.Now, text);
            }

        }
    }
}

et le nouveau output:

Code:
10/09/2011 4:22:36 PM: Starting to enqueue the first actions
10/09/2011 4:22:36 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:36 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:36 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:36 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:36 PM: ConsumerThread is executing an action
10/09/2011 4:22:36 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:36 PM: ConsumerThread is enqueuing an action

>>>>>> An action is executing <<<<<<
10/09/2011 4:22:36 PM: Sleeping 14 seconds
10/09/2011 4:22:37 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:37 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:37 PM: ConsumerThread is executing an action

>>>>>> An action is executing <<<<<<
10/09/2011 4:22:38 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:38 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:38 PM: ConsumerThread is executing an action

>>>>>> An action is executing <<<<<<
10/09/2011 4:22:39 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:39 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:39 PM: ConsumerThread is executing an action

>>>>>> An action is executing <<<<<<
10/09/2011 4:22:40 PM: ConsumerThread is waiting for job!

10/09/2011 4:22:50 PM: Starting to enqueue the second actions and stopping the q
ueue right after
10/09/2011 4:22:50 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:50 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:50 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:50 PM: ConsumerThread is executing an action

>>>>>> An action is executing <<<<<<
10/09/2011 4:22:50 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:50 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:50 PM: ConsumerThread is enqueuing an action
10/09/2011 4:22:50 PM: Main() stopped.
10/09/2011 4:22:51 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:51 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:51 PM: ConsumerThread is executing an action

>>>>>> An action is executing <<<<<<
10/09/2011 4:22:52 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:52 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:52 PM: ConsumerThread is executing an action

>>>>>> An action is executing <<<<<<
10/09/2011 4:22:53 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:53 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:53 PM: ConsumerThread is executing an action

>>>>>> An action is executing <<<<<<
10/09/2011 4:22:54 PM: ConsumerThread is waiting for job!
10/09/2011 4:22:54 PM: ConsumerThread woke up, let's do some work!
10/09/2011 4:22:54 PM: ConsumerThread action is null. A clean exit is requested
 
A

Anonymous

Invité
#51
Re: [DotNet] D2Com, facilitez la communication D2 :)

Ah oui j'ai tout compris :)
Donc en gros, on fait un sorte de "Thread.Sleep" dynamique pour alléger le CPU.

Pourquoi toujours utiliser "_locker" au lieu d'un "this" ? Ça revient au même non ? Le but est de sécuriser l'instance actuelle de ConsumerThread.

Je n'ai pas testé le code, mais si tu mets :
Code:
while (_queue.Count == 0)
{
   Console.WriteLine("Pas de données à traiter.");
   Monitor.Wait(_locker);
}
Cela va bien afficher qu'une seule fois "Pas de données à traiter." ?

Donc on pourrait assimiler la classe Monitor de cette façon :
Code:
public static class Monitor
{
	private static locked = false;
	
	public static void Wait()
	{
		locked = true;
		while(locked)
			Thread.Sleep(1);
	}
	
	public static void Pulse()
	{
		locked = false;
	}
}
Ce n'est évidement qu'un schéma pour savoir si j'ai bien compris le système.

Si comme je peux le penser j'ai compris, alors oui, ça peut vraiment être intéressant implémenter ça dans D2Com :)
En tout cas je te remercie infiniment pour cette explication, je me coucherais moins bête ce soir :)

EDIT : J'ai essayé d’intégrer donc ce système, peux tu me dire ci c'est tout de même optimisé de cette façon :
Je n'ai pas ajouté les autres méthodes qui permettent entre autres de lancer les threads, initialiser la socket etc.
A noter aussi que je ne parle que de la file d'attente, j'optimiserais l'écoute de la socket plus tard :p
Code:
private void Listening()
{
	while (this.Socket.Connected)
	{
		lock (this)
		{
			if (this.Socket.Available > 0)
			{
				byte[] buffer = new byte[this.Socket.Available];
				this.Socket.Receive(buffer);
				this.m_Queue.Enqueue(buffer);
				Monitor.Pulse(this);
			}
		}
	}
}

private void QueueTreatment()
{
	while (true)
	{
		lock (this)
		{
			while (this.m_Queue.Count == 0)
				Monitor.Wait(this);
			this.ParseData(this.m_Queue.Dequeue());
		}
	}
}
 
A

Anonymous

Invité
#52
Re: [DotNet] D2Com, facilitez la communication D2 :)

Il ne faut jamais utiliser lock(this), cette pratique semble répandue mais est vraiment mauvaise. Le fait d'obtenir un lock sur this, pourrait résulter sur un deadlock ou un race condition car une classe externe pourrait obtenir un lock sur ton propre objet. Encore du code (je suis en feu!!):

Code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace DeadLockExample
{
    class Program
    {
        static void Main(string[] args)
        {
            ThisLockedObject thisLockedObject1 = new ThisLockedObject();
            ThisLockedObject thisLockedObject2 = new ThisLockedObject();

            new Thread(new ThreadStart(delegate()
                {
                    Console.WriteLine("Locking thisLockedObject2");
                    lock (thisLockedObject2)
                    {
                        Console.WriteLine("Performing action on thisLockedObject1");
                        thisLockedObject1.DoSomethingThatLockThis();
                    }
                })).Start();
            new Thread(new ThreadStart(delegate()
                {
                    Console.WriteLine("Locking thisLockedObject1");
                    lock (thisLockedObject1)
                    {
                        Console.WriteLine("Performing action on thisLockedObject2");
                        thisLockedObject2.DoSomethingThatLockThis();
                    }
                })).Start();
            Console.Read();
        }

        class ThisLockedObject
        {

            public void DoSomethingThatLockThis()
            {
                Console.WriteLine("ThisLockedObject acquiring lock on self");
                lock (this)
                {
                    Thread.Sleep(1000);
                }
                Console.WriteLine("ThisLockedObject done");
            }

        }
    }
}

Voici le output:

Code:
Locking thisLockedObject2
Performing action on thisLockedObject1
Locking thisLockedObject1
Performing action on thisLockedObject2
ThisLockedObject acquiring lock on self
ThisLockedObject acquiring lock on self

Et l'application hang. La raison:

Le premier thread détient un lock sur thisLockedObject2 et le deuxième sur thisLockedObject1. Dans le premier thread, DoSomethingThatLockThis attends un lock sur thisLockedObject 1 qui est détenu par le thread #2. Le même scénario ce produit du bord du deuxième thread. Alors on a deux threads, qui attendent tous deux un lock qu'ils n'auront jamais...

Cet exemple peut paraitre extrème, mais de nombreuses situations du genre peuvent survenir en multi thread, il est donc important de ne jamais utilisé lock(this). Si je remplaces lock(this) par lock(_locker), alors nous n'avons pas de problèmes.

Code:
        class ThisLockedObject
        {

            private readonly object _locker = new object();

            public void DoSomethingThatLockThis()
            {
                Console.WriteLine("ThisLockedObject acquiring lock on self");
                lock (_locker)
                {
                    Thread.Sleep(1000);
                }
                Console.WriteLine("ThisLockedObject done");
            }
        }

Et puis il est important d'utiliser un lock dans l'exemple que je t'ai fourni. Monitor.Wait() "release" un lock, il doit donc avoir un lock acquis en premier!

Cela va bien afficher qu'une seule fois "Pas de données à traiter." ?
oui

Donc on pourrait assimiler la classe Monitor de cette façon
Schématiquement ouais.. on pourrait dire que ca ressemble à ca (sans le overhead du while()sleep(1))
 
A

Anonymous

Invité
#53
Re: [DotNet] D2Com, facilitez la communication D2 :)

J'ai compris donc le choix d'utiliser un object propre pour le lock, ceci dit, dans mon exemple, j'ai un seul thread par méthode, je peux dans ce cas particulier utiliser lock(this) sans problème donc ?
Mais je sais que je dois utiliser un objet propre dans le cas de multi thread plus avancé ;)

Et bien merci pour ce cours sur le multi threading performant :)
Tu devrais en rédiger un tutoriel, sur le Site du Zéro même ;)
 
A

Anonymous

Invité
#54
Re: [DotNet] D2Com, facilitez la communication D2 :)

Je viens de voir ton edit.

premièrement, enlève le lock(this) et utilises le même objet pour ton lock dans Listening() et QueueTreatment().

Ensuite, le reste semble bon, sauf un détail qui est très important:

Code:
this.ParseData(this.m_Queue.Dequeue());
Cette ligne va te causer des problèmes, ton lock sera acquis tout le long du traitement de ParseData. Il est donc important de s'assurer que ton Dequeue() est à l'intérieur du lock et que ton ParseData est à l'extérieur (sinon tu va bloquer ton socket sur l'appel de Enqueue à chaque fois pendant que ParseData s'exécute). Voici donc ce que je te proposes:

Code:
private void QueueTreatment()
{
   while (true)
   {
      object itemInQueue;
      lock (_locker)
      {
         while (this.m_Queue.Count == 0)
            Monitor.Wait(_locker);
         itemInQueue = this.m_Queue.Dequeue();
      }
      this.ParseData(itemInQueue);
   }
}
Peut-être auras-tu besoin d'un lock sur un objet différent à l'intérieur de ParseData.. mais je ne te conseilles pas de garder le lock de ton producer/consumer pendant ton ParseData.

Je ne connais pas le reste de ton code, mais je ne crois pas qu'il soit justifier de locker aussi longtemps, si tu as besoin de syncronisation à l'intérieur de ParseData alors le mieux est de locker un objet différent seulement aux endroits ou tu accède des variables partagées.

edit: réponse à ton dernier post. Tu "pourrais" faire ca. Par contre lock(this) est considérer comme une très mauvaise pratique. Alors tu es mieux de commencer à utiliser des locks de la bonne façon même si dans ton scénario lock(this) pourrait probablement causer aucun problème.

edit2: En fait il pourrait être important, dans ce cas, de garder le ParseData à l'intérieur du lock si tu veux conserver l'ordre d'entrée de ce que tu as dans ton queue.. Je ne connais pas ton code donc à toi de voir. Si tu fais ca, assures toi seulement que ton ParseData retourne le plus rapidement possible et qu'il n'appelles pas d'évènements directement dans son propre thread (surtout pour un API, quelqu'un pourrait bloquer dans un event handler pendant des secondes voir des minutes).
 
A

Anonymous

Invité
#55
Re: [DotNet] D2Com, facilitez la communication D2 :)

ParseData va juste découper les données en paramètre de type byte[] afin de sortir les messages reçus. Donc non il a pas besoin de bloquer.
Donc je vais devoir créer une variable buffer, car si je crée une variable à l'intérieur du lock, celle ci va automatiquement être détruite due à la portée, non ?
Mon attribut "m_Queue" est de type Queue<byte[]>, je n'ai pas crée une classe dédié au traitement des données, cela me semble pas totalement utile.

Donc dans mon cas précis, je ne suis pas dans l'obligation d'utiliser un attribut de type object pour le lock, je peux me contenter de "this", non ?
Même si pour la standardisation et pour les bonnes habitudes ça serait préférable de créer un attribut dédié au lock.
 
A

Anonymous

Invité
#56
Re: [DotNet] D2Com, facilitez la communication D2 :)

Je ne te dirai jamais d'utiliser lock(this): thou shall not lock this!!

Et puis tu va locker ta classe au complet, si tu as besoin de syncronisation plus tard quand ton bot va évoluer tu risque d'avoir des problèmes, je me demandes pourquoi tu tiens tant à utiliser ca (c'est seulement une ligne de code supplémentaire après tout).

Pour le ParseData, je crois que tu as raison. Regardes seulement mon edit2 du post en haut à propos des events.
 
A

Anonymous

Invité
#57
Re: [DotNet] D2Com, facilitez la communication D2 :)

Donc appeler ParseData à l’extérieur du lock est optimisé dans mon cas, puisqu’en effet, cette méthode va appeler des évènements ;)

Tant qu'on y est, pourrais-tu rapidement faire un briefing sur la socket asynchrone ? Dans ton tutoriel tu fais une classe dérivé et ce que je ne veux pas dans mon projet puisque ça me paraît inutile :p

EDIT :
Ma méthode d'écoute actuelle :
Code:
        private void Listening()
        {
            while (this.Socket.Connected)
            {
                lock (this.m_Locker)
                {
                    if (this.Socket.Available > 0)
                    {
                        byte[] buffer = new byte[this.Socket.Available];
                        this.Socket.Receive(buffer);
                        this.m_Queue.Enqueue(buffer);
                        Monitor.Pulse(this.m_Locker);
                    }
                }
                Thread.Sleep(this.ThreadsSleep);
            }
        }
Ma méthode de traitement de file d'attente des données :
Code:
        private void QueueTreatment()
        {
            byte[] buffer;
            while (true)
            {
                lock (this.m_Locker)
                {
                    while (this.m_Queue.Count == 0)
                        Monitor.Wait(this.m_Locker);

                    buffer = this.m_Queue.Dequeue();
                }
                this.ParseData(buffer);
            }
        }
 
A

Anonymous

Invité
#58
Re: [DotNet] D2Com, facilitez la communication D2 :)

Si tu ne veux pas traiter les données recues de ton socket de facon évènementielle, tu peux oublier la classe et mettre directement dans ta queue les données recues.

edit: Je viens de voir m_locker. Bravo!
 
A

Anonymous

Invité
#59
Re: [DotNet] D2Com, facilitez la communication D2 :)

Okay mais ça m'oblige encore à faire une boucle infinie...
Je passe par un autre lock alors ? :p
Dasn ce cas je ne vois pas comment je pourrais faire pour le Pulse oO

EDIT : Avec cette technique j'ai gagné encore 10% de performance :o
Dans les mêmes conditions avec 10 lancements identiques de 1 connexion au serveur complète :
D2Com sans opti : 90% CPU de moyenne
D2Com avec sleep : 15% CPU de moyenne
D2Com avec lock et Monitor : 5% CPU de moyenne
Énorme :) GRAND merci à toi Mike ;)
 

bouh2

Membre Actif
Inscrit
12 Septembre 2008
Messages
184
Reactions
21
#60
Re: [DotNet] D2Com, facilitez la communication D2 :)

J'ai l'impression que ça parle Sockets.

Ne te casse pas la tête à faire un thread listener, utilise les fonctions AsyncRecv et AsyncAccept.

Au niveau des lock et Monitor.Wait, l'utilisation est différente. J'utilise le lock pour restreindre l'accès à une ressource (ex : une List<>) à un seul thread ou dans certains cas au contenu d'une méthode entière. Le Monitor.Wait je l'utilise lorsque j'ai un thread et que je dois le réveiller. Exemple j'ai une BlockedQueue, avec un thread en arrêt sur un Monitor.Wait, et lorsque j'appel Enqueue, j'appelle Monitor.Pulse() ce qui débloque le thread lecteur.

Par contre l'utilisation d'un lock + Monitor.Wait je ne vois pas trop à quoi ça sert.


EDIT : Thread.Sleep(1) c'est pas bon, utilise Thread.Yield pour donner l'execution à un autre thread.

EDIT 2 : Petit exemple des méthodes asynchrones

Code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using NLog;
using Stump.Core.Attributes;
using Stump.Core.Collections;
using Stump.Core.Pool;
using Stump.Core.Reflection;

namespace Stump.Server.BaseServer.Network
{
    public class ClientManager : Singleton<ClientManager>
    {
        private readonly Logger logger = LogManager.GetCurrentClassLogger();

        #region Config Variables
        /// <summary>
        /// Max number of clients connected
        /// </summary>
        [Variable]
        public static int MaxConcurrentConnections = 2000;

        /// <summary>
        /// Max number of clients waiting for a connection
        /// </summary>
        [Variable]
        public static int MaxPendingConnections = 100;

        /// <summary>
        /// Max number of clients connected on the same IP or NULL for unlimited
        /// </summary>
        [Variable]
        public static int? MaxIPConnexions = 10;

        /// <summary>
        /// Min interval between two received message or NULL for unlimited ( in ms )
        /// </summary>
        [Variable]
        public static int? MinMessageInterval = 1;

        /// <summary>
        /// Buffer size /!\ Advanced users only /!\
        /// </summary>
        [Variable]
        public static int BufferSize = 8192; 
        #endregion

        #region Events
        public event Action<BaseClient> ClientConnected;

        private void NotifyClientConnected(BaseClient client)
        {
            Action<BaseClient> handler = ClientConnected;
            if (handler != null) handler(client);
        }

        public event Action<BaseClient> ClientDisconnected;

        private void NotifyClientDisconnected(BaseClient client)
        {
            Action<BaseClient> handler = ClientDisconnected;
            if (handler != null) handler(client);
        } 
        #endregion

        public delegate BaseClient CreateClientHandler(Socket clientSocket);

        private CreateClientHandler m_createClientDelegate;

        private readonly Socket m_listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream,
                                                    ProtocolType.Tcp);

        private readonly ConcurrentList<BaseClient> m_clients = new ConcurrentList<BaseClient>();

        private BufferManager m_bufferManager; // allocate memory dedicated to a client to avoid memory alloc on each send/recv
        private SocketAsyncEventArgsPool m_readAsyncEventArgsPool; // pool of SocketAsyncEventArgs
        private SocketAsyncEventArgsPool m_writeAsyncEventArgsPool;
        private SocketAsyncEventArgs m_acceptArgs = new SocketAsyncEventArgs(); // async arg used on client connection
        private SemaphoreSlim m_semaphore; // limit the number of threads accessing to a ressource

        /// <summary>
        /// List of connected Clients
        /// </summary>
        public IEnumerable<BaseClient> Clients
        {
            get { return m_clients; }
        }

        public bool Paused
        {
            get;
            private set;
        }

        public bool IsInitialized
        {
            get;
            private set;
        }

        public bool Started
        {
            get;
            private set;
        }

        public string Host
        {
            get;
            private set;
        }

        public int Port
        {
            get;
            private set;
        }

        /// <summary>
        /// Represents a queue containing received messages to treat
        /// </summary>
        internal MessageQueue MessageQueue
        {
            get;
            private set;
        }

        internal void Initialize(CreateClientHandler createClientHandler)
        {
            if (IsInitialized)
                throw new Exception("ClientManager already initialized");

            m_createClientDelegate = createClientHandler;
            MessageQueue = new MessageQueue(Settings.EnableBenchmarking);

            // init semaphore
            m_semaphore = new SemaphoreSlim(MaxConcurrentConnections, MaxConcurrentConnections);

            // init buffer manager
            m_bufferManager = new BufferManager(MaxConcurrentConnections * BufferSize, BufferSize);
            m_bufferManager.InitializeBuffer();

            // initialize read pool
            m_readAsyncEventArgsPool = new SocketAsyncEventArgsPool(MaxConcurrentConnections);
            for (var i = 0; i < MaxConcurrentConnections; i++)
            {
                var args = new SocketAsyncEventArgs();

                m_bufferManager.SetBuffer(args);
                args.Completed += OnReceiveCompleted;
                m_readAsyncEventArgsPool.Push(args);
            }

            // initialize write pool
            m_writeAsyncEventArgsPool = new SocketAsyncEventArgsPool(MaxConcurrentConnections * 2);
            for (var i = 0; i < MaxConcurrentConnections * 2; i++)
            {
                var args = new SocketAsyncEventArgs();

                args.Completed += OnSendCompleted;
                m_writeAsyncEventArgsPool.Push(args);
            }

            m_acceptArgs.Completed += (sender, e) => ProcessAccept(e);

            IsInitialized = true;
        }

        /// <summary>
        /// Start to listen client connections
        /// </summary>
        public void Start(string address, int port)
        {
            if (!IsInitialized)
                throw new Exception("Attempt to start ClientManager before initializing it. Call Initialize()");

            if (Started)
                throw new Exception("ClientManager already started");

            Host = address;
            Port = port;

            var ipEndPoint = new IPEndPoint(Dns.GetHostAddresses(Host).First(ip => ip.AddressFamily == AddressFamily.InterNetwork), Port);
            m_listenSocket.Bind(ipEndPoint);
            m_listenSocket.Listen(MaxPendingConnections);

            Started = true;

            StartAccept();
        }

        /// <summary>
        /// Pause the listener and reject all new connections
        /// </summary>
        public void Pause()
        {
            Paused = true;
        }

        /// <summary>
        /// Resume the actual pause
        /// </summary>
        public void Resume()
        {
            Paused = false;
        }

        /// <summary>
        /// Close the listener and dispose ressources
        /// </summary>
        public void Close()
        {
            m_listenSocket.Close();
            m_listenSocket.Dispose();

            m_bufferManager.Dispose();

            m_writeAsyncEventArgsPool.Dispose();
            m_readAsyncEventArgsPool.Dispose();
        }

        private void StartAccept()
        {
            m_acceptArgs.AcceptSocket = null;
            m_semaphore.Wait();

            // raise or not the event depending on AcceptAsync return
            if (!m_listenSocket.AcceptAsync(m_acceptArgs))
            {
                ProcessAccept(m_acceptArgs);
            }
        }

        /// <summary>
        /// Called when a new client is connecting
        /// </summary>
        /// <param name="e"></param>
        private void ProcessAccept(SocketAsyncEventArgs e)
        {
            // do not accept connections while pausing
            if (Paused)
                return;

            if (MaxIPConnexions.HasValue &&
                CountClientWithSameIp(( (IPEndPoint) e.AcceptSocket.RemoteEndPoint ).Address) > MaxIPConnexions.Value)
            {
                logger.Error("Client {0} try to connect more {1} time", e.AcceptSocket.RemoteEndPoint.ToString(), MaxIPConnexions.Value);
                m_semaphore.Release();

                StartAccept();
                return;
            }

            // use a async arg from the pool avoid to re-allocate memory on each connection
            SocketAsyncEventArgs readAsyncEventArgs = PopReadSocketAsyncArgs();

            // create the client instance
            var client = m_createClientDelegate(e.AcceptSocket);
            readAsyncEventArgs.UserToken = client;

            m_clients.Add(client);

            NotifyClientConnected(client);

            // if the event is not raised we first check new connections before parsing message that can blocks the connection queue
            if (!client.Socket.ReceiveAsync(readAsyncEventArgs))
            {
                StartAccept();
                ProcessReceive(readAsyncEventArgs);
            }
            else
            {
                StartAccept();
            }
        }

        private void OnReceiveCompleted(object sender, SocketAsyncEventArgs e)
        {
            try
            {
                switch (e.LastOperation)
                {
                    case SocketAsyncOperation.Receive:
                        ProcessReceive(e);
                        break;
                    case SocketAsyncOperation.Disconnect:
                        CloseClientSocket(e);
                        break;
                    default:
                        break;
                }
            }
            catch (Exception exception)
            {
                // theoretically it shouldn't go up to there.
                logger.Error("Last chance exception on receiving ! : " + exception);
            }
        }

        private void ProcessReceive(SocketAsyncEventArgs e)
        {
            if (e.BytesTransferred <= 0 || e.SocketError != SocketError.Success)
            {
                CloseClientSocket(e);
            }
            else
            {
                var client = e.UserToken as BaseClient;

                if (client == null)
                {
                    CloseClientSocket(e);
                }
                else
                {
                    client.ProcessReceive(e.Buffer, e.Offset, e.BytesTransferred);

                    if (client.Socket == null)
                    {
                        CloseClientSocket(e);
                    }
                    else
                    {
                        // just continue to receive
                        bool willRaiseEvent = client.Socket.ReceiveAsync(e);

                        if (!willRaiseEvent)
                        {
                            ProcessReceive(e);
                        }
                    }
                }
            }
        }

        private void CloseClientSocket(SocketAsyncEventArgs e)
        {
            var client = e.UserToken as BaseClient;

            if (client != null)
            {
                client.Disconnect();
                m_clients.Remove(client);

                NotifyClientDisconnected(client);
            }

            m_semaphore.Release();

            // free the SocketAsyncEventArg so it can be reused by another client
            PushReadSocketAsyncArgs(e);
        }

        private void OnSendCompleted(object sender, SocketAsyncEventArgs e)
        {
            // free an async arg on the pool when a send is completed
            PushWriteSocketAsyncArgs(e);
        }

        internal SocketAsyncEventArgs PopWriteSocketAsyncArgs()
        {
            if (m_writeAsyncEventArgsPool.Count <= 0)
                throw new Exception("The writer async argument pool is empty");

            return m_writeAsyncEventArgsPool.Pop();
        }

        internal void PushWriteSocketAsyncArgs(SocketAsyncEventArgs args)
        {
            m_writeAsyncEventArgsPool.Push(args);
        }

        internal SocketAsyncEventArgs PopReadSocketAsyncArgs()
        {
            if (m_readAsyncEventArgsPool.Count <= 0)
                throw new Exception("The reader async argument pool is empty");

            return m_readAsyncEventArgsPool.Pop();
        }

        internal void PushReadSocketAsyncArgs(SocketAsyncEventArgs args)
        {
            m_readAsyncEventArgsPool.Push(args);
        }

        public IEnumerable<BaseClient> FindAll(Predicate<BaseClient> predicate)
        {
            return m_clients.Where(entry => predicate(entry));
        }

        public IEnumerable<T> FindAll<T>(Predicate<T> predicate)
        {
            return m_clients.OfType<T>().Where(entry => predicate(entry));
        }

        public int CountClientWithSameIp(IPAddress ipAddress)
        {
            return m_clients.Count(t => t.Socket != null && ( (IPEndPoint)t.Socket.RemoteEndPoint ).Address == ipAddress);
        }
    }
}

=> Il s'agit en faite de 2 'cycles'. Les fonctions StartAccept et ProcessReceive sont appelées en boucle, non pas dans une boucle infini, mais par appels récursifs.
StartAccept est appelée une première fois, et qui appel à son tour "m_listenSocket.AcceptAsync(m_acceptArgs)". Lorsque la valeur retournée est false alors l'event ne sera pas invoqué -> nous avons déjà un client en attente de connexion, on doit le process directement. Sinon l'event sera invoqué lorsque un client demandera la connexion (event qui appel ProcessAccept également)
ProcessAccept rappelle StartAccept, et tourne en boucle

Même schéma avec ProcessReceive, qui s'appelle en boucle dès qu'un message est reçu.

Avec ce système, aucune attente pour traiter le message et pas de perte de performance.

A noter que l'ajout d'un try/finally serait préférable pour ne pas briser l'appel en boucle
 
Haut Bas