Ticket #28: implementation.diff
File implementation.diff, 13.2 KB (added by , 17 years ago) |
---|
-
.hgignore
diff -r 31139ebb5a96 .hgignore
a b 6 6 ^doc/.*\.html$ 7 7 ^doc/specifications/.*\.html$ 8 8 ^doc/api 9 10 ^conn-test -
foolscap/broker.py
diff -r 31139ebb5a96 foolscap/broker.py
a b class Broker(banana.Banana, referenceabl 157 157 table = vocab.INITIAL_VOCAB_TABLES[vocab_table_index] 158 158 self.populateVocabTable(table) 159 159 self.initBroker() 160 self.current_slave_IR = params.get('current-slave-IR') 161 self.current_seqnum = params.get('current-seqnum') 160 162 161 163 def initBroker(self): 162 164 -
foolscap/ipb.py
diff -r 31139ebb5a96 foolscap/ipb.py
a b _ignored = [ISlicer, IRootSlicer, IUnsli 9 9 10 10 class DeadReferenceError(Exception): 11 11 """The RemoteReference is dead, Jim.""" 12 13 class DuplicateConnectionError(Exception): 14 """The old connection was replaced by a newer one.""" 12 15 13 16 14 17 class IReferenceable(Interface): -
foolscap/negotiate.py
diff -r 31139ebb5a96 foolscap/negotiate.py
a b from twisted.python import log 3 3 from twisted.python import log 4 4 from twisted.python.failure import Failure 5 5 from twisted.internet import protocol, reactor 6 from twisted.internet.error import ConnectionDone 6 7 7 8 from foolscap import broker, referenceable, vocab 8 9 from foolscap.eventual import eventually … … class Negotiation(protocol.Protocol): 212 213 self.wantEncryption = bool(self.target.encrypted 213 214 or self.tub.myCertificate) 214 215 self.options = self.tub.options.copy() 216 IR = self.tub.getIncarnationString() 217 self.negotiationOffer['my-incarnation'] = IR 218 tubID = self.target.getTubID() 219 # note that UnauthenticatedTubs will never have a record here 220 slave_record = self.tub.slave_table.get(tubID, ("none",0)) 221 self.negotiationOffer['last-connection'] = "%s %s" % slave_record 215 222 216 223 def initServer(self, listener): 217 224 # servers do listenTCP and respond to the GET … … class Negotiation(protocol.Protocol): 743 750 # a decision block. 744 751 self.send_phase = DECIDING 745 752 decision = {} 753 params = {} 746 754 # combine their 'offer' and our own self.negotiationOffer to come 747 755 # up with a 'decision' to be sent back to the other end, and the 748 756 # 'params' to be used on our connection 749 757 750 # first, do we continue with this connection? we might 751 # have anexisting connection for this particular tub758 # first, do we continue with this connection? we might have an 759 # existing connection for this particular tub 752 760 if theirTubRef and theirTubRef in self.tub.brokers: 753 # there is an existing connection, so drop this one 754 if self.debugNegotiation: 755 log.msg(" abandoning the connection: we already have one") 756 raise NegotiationError("Duplicate connection") 761 # there is an existing connection.. we might want to prefer 762 # this new offer, because the old connection might be stale 763 # (NAT boxes and laptops that disconnect abruptly are two 764 # ways for a single process to disappear silently and then 765 # reappear with a different IP address). 766 log.msg("UNUSUAL: got offer for an existing connection") 767 existing = self.tub.brokers[theirTubRef] 768 acceptOffer = self.compareOfferAndExisting(offer, existing) 769 if acceptOffer: 770 # drop the old one 771 log.msg(" accepting new offer, dropping existing connection") 772 why = ConnectionDone("replaced by a new connection") 773 existing.shutdown(why) 774 else: 775 # reject the new one 776 log.msg(" rejecting the offer: we already have one") 777 raise NegotiationError("Duplicate connection") 778 779 if theirTubRef: 780 # generate a new seqnum, one higher than the last one we've 781 # used. Note that UnauthenticatedTubs all share the same 782 # index, so we leak certain information about how many 783 # connections we've established. 784 old_seqnum = self.tub.master_table.get(theirTubRef.getTubID(), 785 0) 786 new_seqnum = old_seqnum + 1 787 new_slave_IR = offer.get('my-incarnation', None) 788 self.tub.master_table[theirTubRef.getTubID()] = new_seqnum 789 my_IR = self.tub.getIncarnationString() 790 decision['current-connection'] = "%s %s" % (my_IR, new_seqnum) 791 # these params will be copied into the Broker where we can 792 # retrieve them later, when we need to compare it against a new 793 # offer. TODO 794 params['current-slave-IR'] = new_slave_IR 795 params['current-seqnum'] = new_seqnum 757 796 758 797 # what initial vocab set should we use? 759 798 theirVocabRange_s = offer.get("initial-vocab-table-range", "0 0") … … class Negotiation(protocol.Protocol): 771 810 decision['banana-decision-version'] = str(self.decision_version) 772 811 773 812 # v1: handle vocab table index 774 params = { 'banana-decision-version': self.decision_version, 775 'initial-vocab-table-index': vocab_index, 776 } 813 params['banana-decision-version'] = self.decision_version 814 params['initial-vocab-table-index'] = vocab_index 777 815 778 816 else: 779 817 # otherwise, the other side gets to decide. The next thing they … … class Negotiation(protocol.Protocol): 804 842 # idle-disconnect. No other protocol changes were made, and no 805 843 # changes were made to the offer or decision blocks. 806 844 return self.evaluateNegotiationVersion1(offer) 845 846 def compareOfferAndExisting(self, offer, existing): 847 """Compare the new offer against the existing connection, and 848 decide which to keep. 849 850 @return: True to accept the new offer, False to stick with the 851 existing connection. 852 """ 853 # step one: does the inbound offer have a my-incarnation 854 # header? If not, this is an older peer (<foolscap-0.1.7), 855 # and it 856 if "my-incarnation" not in offer or "last-connection" not in offer: 857 # this is an old peer (foolscap 0.1.7 or earlier), which won't 858 # give us enough information to make some of the decisions below. 859 # We reject the offer to avoid connection flap, and the 860 # situtation won't be worse than it was in 0.1.7 . 861 if self.tub._handle_old_duplicate_connections: 862 # but if we've been configured to do better (with the 863 # 60-second age heuristic), do that. 864 return self.handle_old(offer, existing) 865 return False # reject the offer 866 867 existing_slave_IR = existing.current_slave_IR 868 existing_seqnum = existing.current_seqnum 869 870 if offer["my-incarnation"] != existing_slave_IR: 871 # this offer is from a different invocation of the peer than we 872 # think we're currently talking to. That means the slave has 873 # restarted since we made our connection, so clearly our 874 # connection is stale. Accept the offer. 875 return True 876 pieces = offer['last-connection'].split() 877 offer_master_IR = pieces[0] 878 offer_master_seqnum = int(pieces[1]) 879 if offer_master_IR != self.tub.getIncarnationString(): 880 # the peer doesn't remember talking to us at all: they remember 881 # talking to a past life. That means our last decision message 882 # didn't make it to them, and the last connection they *did* hear 883 # about was from one of our previous runs. Therefore our existing 884 # connection isn't viable, and we should accept their offer. 885 return True 886 # at this point, the offer's IR matches our own, so the seqnum is 887 # worth comparing 888 if offer_master_seqnum == existing_seqnum: 889 # the offer demonstrates that the client knows about the same 890 # connection that we do, and they made a new connection anyways. 891 # From this we can conclude that our connection is stale, so we 892 # should accept the offer. 893 return True 894 if offer_master_seqnum < existing_seqnum: 895 # the offer was delayed, or (more likely) it was part of a set of 896 # offers sent to multiple connection hints at the same time, one 897 # of which has already completed negotiation. To avoid connection 898 # flap, we reject the offer. 899 return False 900 # offer_master_seqnum > existing_seqnum indicates something really 901 # weird has taken place. 902 log.msg("WEIRD: offer_master_seqnum %d > existing_seqnum %d" % 903 (offer_master_seqnum, existing_seqnum)) 904 return False # reject weirdness 905 906 def handle_old(self, offer, existing): 907 # TODO: determine the age of the existing broker 908 age = 0 909 if age < 60: 910 return False # reject the offer 911 return True # accept the offer 807 912 808 913 def sendDecision(self, decision, params): 809 914 if self.debug_doTimer("sendDecision", 1, … … class Negotiation(protocol.Protocol): 872 977 msg = ("Our hash for vocab-table-index %d (%s) does not match " 873 978 "your hash (%s)" % (vocab_index, our_hash, vocab_hash)) 874 979 raise NegotiationError(msg) 980 981 if self.theirTubRef in self.tub.brokers: 982 # we're the slave, so we need to drop our existing connection and 983 # use the one picked by the master 984 log.msg("UNUSUAL: master told us to use a new connection, " 985 "so we must drop the existing one") 986 why = ConnectionDone("replaced by a new connection") 987 self.tub.brokers[self.theirTubRef].shutdown(why) 988 989 current_connection = decision.get('current-connection') 990 if current_connection: 991 tubID = self.theirTubRef.getTubID() 992 if tubID != "<unauth>": 993 self.tub.slave_table[tubID] = current_connection 994 else: 995 log.msg("UNUSUAL: no current-connection in decision from %s" % 996 self.theirTubRef) 997 875 998 params = { 'banana-decision-version': ver, 876 999 'initial-vocab-table-index': vocab_index, 877 1000 } -
foolscap/pb.py
diff -r 31139ebb5a96 foolscap/pb.py
a b 1 1 # -*- test-case-name: foolscap.test.test_pb -*- 2 2 3 import os.path, weakref 3 import os.path, weakref, binascii 4 4 from zope.interface import implements 5 5 from twisted.internet import defer, protocol, error 6 6 from twisted.application import service, strports … … class Tub(service.MultiService): 233 233 self.myCertificate = cert 234 234 self.tubID = crypto.digest32(cert.digest("sha1")) 235 235 236 def make_incarnation(self): 237 unique = os.urandom(8) 238 # TODO: it'd be nice to have a sequential component, so incarnations 239 # could be ordered, but it requires disk space 240 sequential = None 241 return (unique, sequential) 242 243 def getIncarnationString(self): 244 return binascii.b2a_hex(self.incarnation[0]) 245 236 246 def setup(self, options): 237 247 self.options = options 238 248 self.listeners = [] 239 249 self.locationHints = [] 250 251 # duplicate-connection management 252 self.incarnation = self.make_incarnation() 253 # the master_table records the master-seqnum we used for the last 254 # established connection with the given tubid. It only contains 255 # entries for which we were the master. 256 self.master_table = {} # k:tubid, v:seqnum 257 # the slave_table records the (master-IR,master-seqnum) pair for the 258 # last established conenction with the given tubid. It only contains 259 # entries for which we were the slave. 260 self.slave_table = {} # k:tubid, v:(master-IR,seqnum) 240 261 241 262 # local Referenceables 242 263 self.nameToReference = weakref.WeakValueDictionary() … … class Tub(service.MultiService): 264 285 265 286 self._log_gatherer_furl = None 266 287 self._log_gatherer_furlfile = None 288 289 self._handle_old_duplicate_connections = False 267 290 268 291 def setOption(self, name, value): 269 292 if name == "logLocalFailures": … … class Tub(service.MultiService): 302 325 flog.setTwistedLogBridge(tlb) 303 326 else: 304 327 flog.setTwistedLogBridge(None) 328 elif name == "handle-old-duplicate-connections": 329 self._handle_old_duplicate_connections = True 305 330 else: 306 331 raise KeyError("unknown option name '%s'" % name) 307 332