Ticket #28: implementation.diff

File implementation.diff, 13.2 KB (added by Brian Warner, 16 years ago)

first pass at implementing this approach

  • .hgignore

    diff -r 31139ebb5a96 .hgignore
    a b  
    66^doc/.*\.html$
    77^doc/specifications/.*\.html$
    88^doc/api
     9
     10^conn-test
  • foolscap/broker.py

    diff -r 31139ebb5a96 foolscap/broker.py
    a b class Broker(banana.Banana, referenceabl 
    157157            table = vocab.INITIAL_VOCAB_TABLES[vocab_table_index]
    158158            self.populateVocabTable(table)
    159159        self.initBroker()
     160        self.current_slave_IR = params.get('current-slave-IR')
     161        self.current_seqnum = params.get('current-seqnum')
    160162
    161163    def initBroker(self):
    162164
  • foolscap/ipb.py

    diff -r 31139ebb5a96 foolscap/ipb.py
    a b _ignored = [ISlicer, IRootSlicer, IUnsli 
    99
    1010class DeadReferenceError(Exception):
    1111    """The RemoteReference is dead, Jim."""
     12
     13class DuplicateConnectionError(Exception):
     14    """The old connection was replaced by a newer one."""
    1215
    1316
    1417class IReferenceable(Interface):
  • foolscap/negotiate.py

    diff -r 31139ebb5a96 foolscap/negotiate.py
    a b from twisted.python import log 
    33from twisted.python import log
    44from twisted.python.failure import Failure
    55from twisted.internet import protocol, reactor
     6from twisted.internet.error import ConnectionDone
    67
    78from foolscap import broker, referenceable, vocab
    89from foolscap.eventual import eventually
    class Negotiation(protocol.Protocol): 
    212213        self.wantEncryption = bool(self.target.encrypted
    213214                                   or self.tub.myCertificate)
    214215        self.options = self.tub.options.copy()
     216        IR = self.tub.getIncarnationString()
     217        self.negotiationOffer['my-incarnation'] = IR
     218        tubID = self.target.getTubID()
     219        # note that UnauthenticatedTubs will never have a record here
     220        slave_record = self.tub.slave_table.get(tubID, ("none",0))
     221        self.negotiationOffer['last-connection'] = "%s %s" % slave_record
    215222
    216223    def initServer(self, listener):
    217224        # servers do listenTCP and respond to the GET
    class Negotiation(protocol.Protocol): 
    743750            # a decision block.
    744751            self.send_phase = DECIDING
    745752            decision = {}
     753            params = {}
    746754            # combine their 'offer' and our own self.negotiationOffer to come
    747755            # up with a 'decision' to be sent back to the other end, and the
    748756            # 'params' to be used on our connection
    749757
    750             # first, do we continue with this connection? we might
    751             # have an existing connection for this particular tub
     758            # first, do we continue with this connection? we might have an
     759            # existing connection for this particular tub
    752760            if theirTubRef and theirTubRef in self.tub.brokers:
    753                 # there is an existing connection, so drop this one
    754                 if self.debugNegotiation:
    755                     log.msg(" abandoning the connection: we already have one")
    756                 raise NegotiationError("Duplicate connection")
     761                # there is an existing connection.. we might want to prefer
     762                # this new offer, because the old connection might be stale
     763                # (NAT boxes and laptops that disconnect abruptly are two
     764                # ways for a single process to disappear silently and then
     765                # reappear with a different IP address).
     766                log.msg("UNUSUAL: got offer for an existing connection")
     767                existing = self.tub.brokers[theirTubRef]
     768                acceptOffer = self.compareOfferAndExisting(offer, existing)
     769                if acceptOffer:
     770                    # drop the old one
     771                    log.msg(" accepting new offer, dropping existing connection")
     772                    why = ConnectionDone("replaced by a new connection")
     773                    existing.shutdown(why)
     774                else:
     775                    # reject the new one
     776                    log.msg(" rejecting the offer: we already have one")
     777                    raise NegotiationError("Duplicate connection")
     778
     779            if theirTubRef:
     780                # generate a new seqnum, one higher than the last one we've
     781                # used. Note that UnauthenticatedTubs all share the same
     782                # index, so we leak certain information about how many
     783                # connections we've established.
     784                old_seqnum = self.tub.master_table.get(theirTubRef.getTubID(),
     785                                                       0)
     786                new_seqnum = old_seqnum + 1
     787                new_slave_IR = offer.get('my-incarnation', None)
     788                self.tub.master_table[theirTubRef.getTubID()] = new_seqnum
     789                my_IR = self.tub.getIncarnationString()
     790                decision['current-connection'] = "%s %s" % (my_IR, new_seqnum)
     791                # these params will be copied into the Broker where we can
     792                # retrieve them later, when we need to compare it against a new
     793                # offer. TODO
     794                params['current-slave-IR'] = new_slave_IR
     795                params['current-seqnum'] = new_seqnum
    757796
    758797            # what initial vocab set should we use?
    759798            theirVocabRange_s = offer.get("initial-vocab-table-range", "0 0")
    class Negotiation(protocol.Protocol): 
    771810            decision['banana-decision-version'] = str(self.decision_version)
    772811
    773812            # v1: handle vocab table index
    774             params = { 'banana-decision-version': self.decision_version,
    775                        'initial-vocab-table-index': vocab_index,
    776                        }
     813            params['banana-decision-version'] = self.decision_version
     814            params['initial-vocab-table-index'] = vocab_index
    777815
    778816        else:
    779817            # otherwise, the other side gets to decide. The next thing they
    class Negotiation(protocol.Protocol): 
    804842        # idle-disconnect. No other protocol changes were made, and no
    805843        # changes were made to the offer or decision blocks.
    806844        return self.evaluateNegotiationVersion1(offer)
     845
     846    def compareOfferAndExisting(self, offer, existing):
     847        """Compare the new offer against the existing connection, and
     848        decide which to keep.
     849
     850        @return: True to accept the new offer, False to stick with the
     851                 existing connection.
     852        """
     853        # step one: does the inbound offer have a my-incarnation
     854        # header? If not, this is an older peer (<foolscap-0.1.7),
     855        # and it
     856        if "my-incarnation" not in offer or "last-connection" not in offer:
     857            # this is an old peer (foolscap 0.1.7 or earlier), which won't
     858            # give us enough information to make some of the decisions below.
     859            # We reject the offer to avoid connection flap, and the
     860            # situtation won't be worse than it was in 0.1.7 .
     861            if self.tub._handle_old_duplicate_connections:
     862                # but if we've been configured to do better (with the
     863                # 60-second age heuristic), do that.
     864                return self.handle_old(offer, existing)
     865            return False # reject the offer
     866
     867        existing_slave_IR = existing.current_slave_IR
     868        existing_seqnum = existing.current_seqnum
     869
     870        if offer["my-incarnation"] != existing_slave_IR:
     871            # this offer is from a different invocation of the peer than we
     872            # think we're currently talking to. That means the slave has
     873            # restarted since we made our connection, so clearly our
     874            # connection is stale. Accept the offer.
     875            return True
     876        pieces = offer['last-connection'].split()
     877        offer_master_IR = pieces[0]
     878        offer_master_seqnum = int(pieces[1])
     879        if offer_master_IR != self.tub.getIncarnationString():
     880            # the peer doesn't remember talking to us at all: they remember
     881            # talking to a past life. That means our last decision message
     882            # didn't make it to them, and the last connection they *did* hear
     883            # about was from one of our previous runs. Therefore our existing
     884            # connection isn't viable, and we should accept their offer.
     885            return True
     886        # at this point, the offer's IR matches our own, so the seqnum is
     887        # worth comparing
     888        if offer_master_seqnum == existing_seqnum:
     889            # the offer demonstrates that the client knows about the same
     890            # connection that we do, and they made a new connection anyways.
     891            # From this we can conclude that our connection is stale, so we
     892            # should accept the offer.
     893            return True
     894        if offer_master_seqnum < existing_seqnum:
     895            # the offer was delayed, or (more likely) it was part of a set of
     896            # offers sent to multiple connection hints at the same time, one
     897            # of which has already completed negotiation. To avoid connection
     898            # flap, we reject the offer.
     899            return False
     900        # offer_master_seqnum > existing_seqnum indicates something really
     901        # weird has taken place.
     902        log.msg("WEIRD: offer_master_seqnum %d > existing_seqnum %d" %
     903                (offer_master_seqnum, existing_seqnum))
     904        return False # reject weirdness
     905
     906    def handle_old(self, offer, existing):
     907        # TODO: determine the age of the existing broker
     908        age = 0
     909        if age < 60:
     910            return False # reject the offer
     911        return True # accept the offer
    807912
    808913    def sendDecision(self, decision, params):
    809914        if self.debug_doTimer("sendDecision", 1,
    class Negotiation(protocol.Protocol): 
    872977            msg = ("Our hash for vocab-table-index %d (%s) does not match "
    873978                   "your hash (%s)" % (vocab_index, our_hash, vocab_hash))
    874979            raise NegotiationError(msg)
     980
     981        if self.theirTubRef in self.tub.brokers:
     982            # we're the slave, so we need to drop our existing connection and
     983            # use the one picked by the master
     984            log.msg("UNUSUAL: master told us to use a new connection, "
     985                    "so we must drop the existing one")
     986            why = ConnectionDone("replaced by a new connection")
     987            self.tub.brokers[self.theirTubRef].shutdown(why)
     988
     989        current_connection = decision.get('current-connection')
     990        if current_connection:
     991            tubID = self.theirTubRef.getTubID()
     992            if tubID != "<unauth>":
     993                self.tub.slave_table[tubID] = current_connection
     994        else:
     995            log.msg("UNUSUAL: no current-connection in decision from %s" %
     996                    self.theirTubRef)
     997
    875998        params = { 'banana-decision-version': ver,
    876999                   'initial-vocab-table-index': vocab_index,
    8771000                   }
  • foolscap/pb.py

    diff -r 31139ebb5a96 foolscap/pb.py
    a b  
    11# -*- test-case-name: foolscap.test.test_pb -*-
    22
    3 import os.path, weakref
     3import os.path, weakref, binascii
    44from zope.interface import implements
    55from twisted.internet import defer, protocol, error
    66from twisted.application import service, strports
    class Tub(service.MultiService): 
    233233        self.myCertificate = cert
    234234        self.tubID = crypto.digest32(cert.digest("sha1"))
    235235
     236    def make_incarnation(self):
     237        unique = os.urandom(8)
     238        # TODO: it'd be nice to have a sequential component, so incarnations
     239        # could be ordered, but it requires disk space
     240        sequential = None
     241        return (unique, sequential)
     242
     243    def getIncarnationString(self):
     244        return binascii.b2a_hex(self.incarnation[0])
     245
    236246    def setup(self, options):
    237247        self.options = options
    238248        self.listeners = []
    239249        self.locationHints = []
     250
     251        # duplicate-connection management
     252        self.incarnation = self.make_incarnation()
     253        # the master_table records the master-seqnum we used for the last
     254        # established connection with the given tubid. It only contains
     255        # entries for which we were the master.
     256        self.master_table = {} # k:tubid, v:seqnum
     257        # the slave_table records the (master-IR,master-seqnum) pair for the
     258        # last established conenction with the given tubid. It only contains
     259        # entries for which we were the slave.
     260        self.slave_table = {} # k:tubid, v:(master-IR,seqnum)
    240261
    241262        # local Referenceables
    242263        self.nameToReference = weakref.WeakValueDictionary()
    class Tub(service.MultiService): 
    264285
    265286        self._log_gatherer_furl = None
    266287        self._log_gatherer_furlfile = None
     288
     289        self._handle_old_duplicate_connections = False
    267290
    268291    def setOption(self, name, value):
    269292        if name == "logLocalFailures":
    class Tub(service.MultiService): 
    302325                flog.setTwistedLogBridge(tlb)
    303326            else:
    304327                flog.setTwistedLogBridge(None)
     328        elif name == "handle-old-duplicate-connections":
     329            self._handle_old_duplicate_connections = True
    305330        else:
    306331            raise KeyError("unknown option name '%s'" % name)
    307332