Issue65

Title: Buffer data before sending onto network
Type: wish
Status: chatting
Importance: 50.0
Superseder:
Nosy List: mg, mk, tpj
Assigned To:
Keywords: design

Created on 2008-09-18.22:13:11 by mg, last changed 2008-09-23.18:03:15 by mg.

Messages
msg222 Author: mg Date: 2008-09-23.18:03:15
Maybe this should get higher priority -- we will only know when we
have tried implementing it :-)
msg198 Author: mg Date: 2008-09-18.22:16:20
Nagle's algorithm: http://en.wikipedia.org/wiki/Nagle's_algorithm
msg197 Author: mg Date: 2008-09-18.22:14:26
Yep, the patch is based on this revision:

% hg parents
changeset:   106:b2ca4d2d360f
tag:         tip
user:        Martin Geisler <mg@daimi.au.dk>
date:        Mon Jul 23 21:57:54 2007 +0200
summary:     Formatted docstrings as per PEP 257.

So it is only 800+ changesets out of date! :-)
msg196 Author: mg Date: 2008-09-18.22:13:10
We have talked about buffering the data before sending it out on the
network. The idea is that this should result in fewer and larger TCP
packets being generated.
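
To make the idea concrete, here is a minimal standalone sketch of the batching, without Twisted. The names ShareBuffer, max_items and unpack are made up for illustration and are not part of VIFF; the send callable stands in for whatever writes one framed string to the peer. It flushes once max_items shares are queued, which is an arbitrary choice for this sketch.

```python
import marshal


class ShareBuffer:
    """Queue (program_counter, share_type, value) tuples and send them in batches.

    Hypothetical helper for illustration only; `send` is any callable that
    takes one bytes payload (e.g. a wrapper around sendString).
    """

    def __init__(self, send, max_items=300):
        self._send = send
        self._max_items = max_items
        self._items = []

    def add(self, program_counter, share_type, value):
        # Queue the share instead of sending it right away.
        self._items.append((program_counter, share_type, value))
        if len(self._items) >= self._max_items:
            self.flush()

    def flush(self):
        # Send everything queued so far as one payload; no-op when empty.
        if self._items:
            self._send(marshal.dumps(self._items))
            self._items = []


def unpack(payload):
    """Receiver side: turn one payload back into a list of share tuples."""
    return marshal.loads(payload)
```

A periodic call to flush() is still needed so that a partially filled buffer does not sit around forever; otherwise a player waiting for a share could stall.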

I originally did not do this since Nagle's algorithm is supposed to do
it for us. But I did experiment with it anyway -- as far as I
remember I didn't see any improvement, but that might have been
because I was only testing with all three players on the same machine.
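
When repeating the experiment it may help to know whether Nagle's algorithm is actually active on the connection. The sketch below uses only the standard socket module; the helper names nagle_enabled and set_nagle are made up for illustration. Nagle is on as long as TCP_NODELAY is unset, which is the usual default. With Twisted, transport.setTcpNoDelay() should do the same on a TCP transport.

```python
import socket


def nagle_enabled(sock):
    """Return True if Nagle's algorithm is active (TCP_NODELAY is unset)."""
    return sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) == 0


def set_nagle(sock, enabled):
    """Turn Nagle's algorithm on or off by toggling TCP_NODELAY."""
    flag = 0 if enabled else 1
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, flag)
```

Comparing runs with Nagle on and off, with and without the application-level buffer, and between separate machines rather than on one host, would show which layer is doing the coalescing.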

This patch is from way back... back when VIFF was called PySMPC, so it
must be more than a year old! I put it here for reference in case
someone wants to play with this:

diff --git a/pysmpc/runtime.py b/pysmpc/runtime.py
--- a/pysmpc/runtime.py
+++ b/pysmpc/runtime.py
@@ -31,7 +31,7 @@
 from pysmpc.field import FieldElement, IntegerFieldElement, GF256Element, GMPIntegerFieldElement
 from pysmpc.util import rand
 
-from twisted.internet import defer, reactor
+from twisted.internet import task, defer, reactor
 from twisted.internet.defer import Deferred, DeferredList, gatherResults
 from twisted.internet.protocol import ClientFactory, ServerFactory, Protocol
 from twisted.protocols.basic import Int16StringReceiver
@@ -138,6 +138,9 @@
     #@trace
     def __init__(self, id):
         self.id = id
+        self._buffer = []
+        self.buffer_size = 300
+
         vals = [IntegerFieldElement, GMPIntegerFieldElement, GF256Element]
         keys = range(len(vals))
         self.class_to_type = dict(zip(vals, keys))
@@ -145,15 +148,19 @@
         
     #@trace
     def stringReceived(self, string):
-        program_counter, share_type, value = marshal.loads(string)
-        share = self.type_to_class[share_type].unmarshal(value)
-        key = (program_counter, self.id)
+        
+        buffer = marshal.loads(string)
+        #println("Received buffer: %s" % str(buffer))
 
-        shares = self.factory.incoming_shares
-        try:
-            reactor.callLater(0, shares.pop(key).callback, share)
-        except KeyError:
-            shares[key] = defer.succeed(share)
+        for program_counter, share_type, value in buffer:
+            share = self.type_to_class[share_type].unmarshal(value)
+            key = (program_counter, self.id)
+
+            shares = self.factory.incoming_shares
+            try:
+                reactor.callLater(0, shares.pop(key).callback, share)
+            except KeyError:
+                shares[key] = defer.succeed(share)
 
         # TODO: marshal.loads can raise EOFError, ValueError, and
         # TypeError. They should be handled somehow.
@@ -168,8 +175,22 @@
         data = (program_counter,
                 self.class_to_type[type(share)],
                 share.marshal())
-        self.sendString(marshal.dumps(data))
+        self._buffer.append(data)
+        if len(self._buffer) > self.buffer_size:
+            #println("Early sending of %d shares" % len(self._buffer))
+            self._send_data()
+
         return self
+
+    def _send_data(self):
+        if self._buffer:
+            #println("Sending %d shares" % len(self._buffer))
+            self.sendString(marshal.dumps(self._buffer))
+            self._buffer = []
+
+    def connectionMade(self):
+        self.send_task = task.LoopingCall(self._send_data)
+        self.send_task.start(0.01)
 
 
     def loseConnection(self):
History
Date                 User  Action  Args
2008-09-23 18:03:15  mg    set     importance: 50.0
                                   nosy: + mk, tpj
                                   messages: + msg222
2008-09-23 16:28:57  tpj   set     type: wish
2008-09-18 22:16:20  mg    set     messages: + msg198
2008-09-18 22:14:26  mg    set     status: unread -> chatting
                                   messages: + msg197
2008-09-18 22:13:11  mg    create