We have talked about buffering the data before sending it out on the
network. The idea is that this should result in fewer and larger TCP
packets being generated.
I originally did not do this since Nagle algorithm is supposed to do
it for us. But I did experiment it with it anyway -- as far as I
remember I didn't see any improvement, but that might have been
because I was only testing with all three players on the same machine.
This patch is from way back... back when VIFF was called PySMPC, so it
must be more than a year old! I put it here for reference in case
someone wants to play with this:
diff --git a/pysmpc/runtime.py b/pysmpc/runtime.py
--- a/pysmpc/runtime.py
+++ b/pysmpc/runtime.py
@@ -31,7 +31,7 @@
from pysmpc.field import FieldElement, IntegerFieldElement,
GF256Element, GMPIntegerFieldElement
from pysmpc.util import rand
-from twisted.internet import defer, reactor
+from twisted.internet import task, defer, reactor
from twisted.internet.defer import Deferred, DeferredList, gatherResults
from twisted.internet.protocol import ClientFactory, ServerFactory,
Protocol
from twisted.protocols.basic import Int16StringReceiver
@@ -138,6 +138,9 @@
#@trace
def __init__(self, id):
self.id = id
+ self._buffer = []
+ self.buffer_size = 300
+
vals = [IntegerFieldElement, GMPIntegerFieldElement,
GF256Element]
keys = range(len(vals))
self.class_to_type = dict(zip(vals, keys))
@@ -145,15 +148,19 @@
#@trace
def stringReceived(self, string):
- program_counter, share_type, value = marshal.loads(string)
- share = self.type_to_class[share_type].unmarshal(value)
- key = (program_counter, self.id)
+
+ buffer = marshal.loads(string)
+ #println("Received buffer: %s" % str(buffer))
- shares = self.factory.incoming_shares
- try:
- reactor.callLater(0, shares.pop(key).callback, share)
- except KeyError:
- shares[key] = defer.succeed(share)
+ for program_counter, share_type, value in buffer:
+ share = self.type_to_class[share_type].unmarshal(value)
+ key = (program_counter, self.id)
+
+ shares = self.factory.incoming_shares
+ try:
+ reactor.callLater(0, shares.pop(key).callback, share)
+ except KeyError:
+ shares[key] = defer.succeed(share)
# TODO: marshal.loads can raise EOFError, ValueError, and
# TypeError. They should be handled somehow.
@@ -168,8 +175,22 @@
data = (program_counter,
self.class_to_type[type(share)],
share.marshal())
- self.sendString(marshal.dumps(data))
+ self._buffer.append(data)
+ if len(self._buffer) > self.buffer_size:
+ #println("Early sending of %d shares" % len(self._buffer))
+ self._send_data()
+
return self
+
+ def _send_data(self):
+ if self._buffer:
+ #println("Sending %d shares" % len(self._buffer))
+ self.sendString(marshal.dumps(self._buffer))
+ self._buffer = []
+
+ def connectionMade(self):
+ self.send_task = task.LoopingCall(self._send_data)
+ self.send_task.start(0.01)
def loseConnection(self):
|