Package foolscap :: Module pb
[hide private]
[frames] | no frames]

Source Code for Module foolscap.pb

   1  # -*- test-case-name: foolscap.test.test_pb -*- 
   2   
   3  import os.path, weakref, binascii 
   4  from zope.interface import implements 
   5  from twisted.internet import defer, protocol, error 
   6  from twisted.application import service, strports 
   7  from twisted.python.failure import Failure 
   8   
   9  from foolscap import ipb, base32, negotiate, broker, observer, eventual, storage 
  10  from foolscap import util 
  11  from foolscap.referenceable import SturdyRef 
  12  from foolscap.tokens import PBError, BananaError, WrongTubIdError, \ 
  13       WrongNameError, NoLocationError 
  14  from foolscap.reconnector import Reconnector 
  15  from foolscap.logging import log as flog 
  16  from foolscap.logging import log 
  17  from foolscap.logging import publish as flog_publish 
  18  from foolscap.logging.log import UNUSUAL 
  19   
  20  crypto_available = False 
  21  try: 
  22      from foolscap import crypto 
  23      crypto_available = crypto.available 
  24  except ImportError: 
  25      pass 
  26   
  27   
  28  Listeners = [] 
29 -class Listener(protocol.ServerFactory):
30 """I am responsible for a single listening port, which may connect to 31 multiple Tubs. I have a strports-based Service, which I will attach as a 32 child of one of my Tubs. If that Tub disconnects, I will reparent the 33 Service to a remaining one. 34 35 Unauthenticated Tubs use a TubID of 'None'. There may be at most one such 36 Tub attached to any given Listener.""" 37 38 noisy = False 39 40 # this also serves as the ServerFactory 41
42 - def __init__(self, port, options={}, 43 negotiationClass=negotiate.Negotiation):
44 """ 45 @type port: string 46 @param port: a L{twisted.application.strports} -style description. 47 """ 48 name, args, kw = strports.parse(port, None) 49 assert name in ("TCP", "UNIX") # TODO: IPv6 50 self.port = port 51 self.options = options 52 self.negotiationClass = negotiationClass 53 self.parentTub = None 54 self.tubs = {} 55 self.redirects = {} 56 self.s = strports.service(port, self) 57 Listeners.append(self)
58
59 - def getPortnum(self):
60 """When this Listener was created with a strport string of '0' or 61 'tcp:0' (meaning 'please allocate me something'), and if the Listener 62 is active (it is attached to a Tub which is in the 'running' state), 63 this method will return the port number that was allocated. This is 64 useful for the following pattern:: 65 66 t = Tub() 67 l = t.listenOn('tcp:0') 68 t.setLocation('localhost:%d' % l.getPortnum()) 69 """ 70 71 assert self.s.running 72 name, args, kw = strports.parse(self.port, None) 73 assert name in ("TCP",) 74 return self.s._port.getHost().port
75
76 - def __repr__(self):
77 if self.tubs: 78 return "<Listener at 0x%x on %s with tubs %s>" % ( 79 abs(id(self)), 80 self.port, 81 ",".join([str(k) for k in self.tubs.keys()])) 82 return "<Listener at 0x%x on %s with no tubs>" % (abs(id(self)), 83 self.port)
84
85 - def addTub(self, tub):
86 if tub.tubID in self.tubs: 87 if tub.tubID is None: 88 raise RuntimeError("This Listener (on %s) already has an " 89 "unauthenticated Tub, you cannot add a " 90 "second one" % self.port) 91 raise RuntimeError("This Listener (on %s) is already connected " 92 "to TubID '%s'" % (self.port, tub.tubID)) 93 self.tubs[tub.tubID] = tub 94 if self.parentTub is None: 95 self.parentTub = tub 96 self.s.setServiceParent(self.parentTub)
97
98 - def removeTub(self, tub):
99 # this might return a Deferred, since the removal might cause the 100 # Listener to shut down. It might also return None. 101 del self.tubs[tub.tubID] 102 if self.parentTub is tub: 103 # we need to switch to a new one 104 tubs = self.tubs.values() 105 if tubs: 106 self.parentTub = tubs[0] 107 # TODO: I want to do this without first doing 108 # disownServiceParent, so the port remains listening. Can we 109 # do this? It looks like setServiceParent does 110 # disownServiceParent first, so it may glitch. 111 self.s.setServiceParent(self.parentTub) 112 else: 113 # no more tubs, this Listener will go away now 114 d = self.s.disownServiceParent() 115 Listeners.remove(self) 116 return d 117 return None
118
119 - def getService(self):
120 return self.s
121
122 - def addRedirect(self, tubID, location):
123 assert tubID is not None # unauthenticated Tubs don't get redirects 124 self.redirects[tubID] = location
125 - def removeRedirect(self, tubID):
126 del self.redirects[tubID]
127
128 - def startFactory(self):
129 log.msg("Starting factory %r" % self, facility="foolscap.listener") 130 return protocol.ServerFactory.startFactory(self)
131 - def stopFactory(self):
132 log.msg("Stopping factory %r" % self, facility="foolscap.listener") 133 return protocol.ServerFactory.stopFactory(self)
134 135
136 - def buildProtocol(self, addr):
137 """Return a Broker attached to me (as the service provider). 138 """ 139 lp = log.msg("%s accepting connection from %s" % (self, addr), 140 addr=(addr.host, addr.port), 141 facility="foolscap.listener") 142 proto = self.negotiationClass(logparent=lp) 143 proto.initServer(self) 144 proto.factory = self 145 return proto
146
147 - def lookupTubID(self, tubID):
148 return self.tubs.get(tubID), self.redirects.get(tubID)
149
150 -def generateSwissnumber(bits):
151 bytes = os.urandom(bits/8) 152 return base32.encode(bytes)
153
154 -class Tub(service.MultiService):
155 """I am a presence in the PB universe, also known as a Tub. 156 157 I am a Service (in the twisted.application.service.Service sense), 158 so you either need to call my startService() method before using me, 159 or setServiceParent() me to a running service. 160 161 This is the primary entry point for all PB-using applications, both 162 clients and servers. 163 164 I am known to the outside world by a base URL, which may include 165 authentication information (a yURL). This is my 'TubID'. 166 167 I contain Referenceables, and manage RemoteReferences to Referenceables 168 that live in other Tubs. 169 170 171 @param certData: if provided, use it as a certificate rather than 172 generating a new one. This is a PEM-encoded 173 private/public keypair, as returned by Tub.getCertData() 174 175 @param certFile: if provided, the Tub will store its certificate in 176 this file. If the file does not exist when the Tub is 177 created, the Tub will generate a new certificate and 178 store it here. If the file does exist, the certificate 179 will be loaded from this file. 180 181 The simplest way to use the Tub is to choose a long-term 182 location for the certificate, use certFile= to tell the 183 Tub about it, and then let the Tub manage its own 184 certificate. 185 186 You may provide certData, or certFile, (or neither), but 187 not both. 188 189 @param options: a dictionary of options that can influence connection 190 connection negotiation. Currently defined keys are: 191 - debug_slow: if True, wait half a second between 192 each negotiation response 193 194 @ivar brokers: maps TubIDs to L{Broker} instances 195 196 @ivar listeners: maps strport to TCPServer service 197 198 @ivar referenceToName: maps Referenceable to a name 199 @ivar nameToReference: maps name to Referenceable 200 201 @type tubID: string 202 @ivar tubID: a global identifier for this Tub, possibly including 203 authentication information, hash of SSL certificate 204 205 """ 206 implements(ipb.ITub) 207 208 unsafeTracebacks = True # TODO: better way to enable this 209 logLocalFailures = False 210 logRemoteFailures = False 211 debugBanana = False 212 NAMEBITS = 160 # length of swissnumber for each reference 213 TUBIDBITS = 16 # length of non-crypto tubID 214 encrypted = True 215 negotiationClass = negotiate.Negotiation 216 brokerClass = broker.Broker 217 keepaliveTimeout = 4*60 # ping when connection has been idle this long 218 disconnectTimeout = None # disconnect after this much idle time 219 tubID = None 220
221 - def __init__(self, certData=None, certFile=None, options={}):
222 service.MultiService.__init__(self) 223 self.setup(options) 224 if certFile: 225 self.setupEncryptionFile(certFile) 226 else: 227 self.setupEncryption(certData)
228
229 - def __repr__(self):
230 return "<Tub id=%s>" % self.tubID
231
232 - def setupEncryptionFile(self, certFile):
233 try: 234 certData = open(certFile, "rb").read() 235 except EnvironmentError: 236 certData = None 237 self.setupEncryption(certData) 238 239 if certData is None: 240 f = open(certFile, "wb") 241 f.write(self.getCertData()) 242 f.close()
243
244 - def setupEncryption(self, certData):
245 if not crypto_available: 246 raise RuntimeError("crypto for PB is not available, " 247 "try importing foolscap.crypto and see " 248 "what happens") 249 if certData: 250 cert = crypto.PrivateCertificate.loadPEM(certData) 251 else: 252 cert = self.createCertificate() 253 self.myCertificate = cert 254 self.tubID = crypto.digest32(cert.digest("sha1"))
255
256 - def make_incarnation(self):
257 unique = os.urandom(8) 258 # TODO: it'd be nice to have a sequential component, so incarnations 259 # could be ordered, but it requires disk space 260 sequential = None 261 self.incarnation = (unique, sequential) 262 self.incarnation_string = binascii.b2a_hex(unique)
263
264 - def getIncarnationString(self):
265 return self.incarnation_string
266
267 - def setup(self, options):
268 self.options = options 269 self.logger = flog.theLogger 270 self.listeners = [] 271 self.locationHints = [] 272 273 # duplicate-connection management 274 self.make_incarnation() 275 276 # the master_table records the master-seqnum we used for the last 277 # established connection with the given tubid. It only contains 278 # entries for which we were the master. 279 self.master_table = {} # k:tubid, v:seqnum 280 # the slave_table records the (master-IR,master-seqnum) pair for the 281 # last established connection with the given tubid. It only contains 282 # entries for which we were the slave. 283 self.slave_table = {} # k:tubid, v:(master-IR,seqnum) 284 285 # local Referenceables 286 self.nameToReference = weakref.WeakValueDictionary() 287 self.referenceToName = weakref.WeakKeyDictionary() 288 self.strongReferences = [] 289 self.nameLookupHandlers = [] 290 291 # remote stuff. Most of these use a TubRef (or NoAuthTubRef) as a 292 # dictionary key 293 self.tubConnectors = {} # maps TubRef to a TubConnector 294 self.waitingForBrokers = {} # maps TubRef to list of Deferreds 295 self.brokers = {} # maps TubRef to a Broker that connects to them 296 self.unauthenticatedBrokers = [] # inbound Brokers without TubRefs 297 self.reconnectors = [] 298 299 self._allBrokersAreDisconnected = observer.OneShotObserverList() 300 self._activeConnectors = [] 301 self._allConnectorsAreFinished = observer.OneShotObserverList() 302 303 self._pending_getReferences = [] # list of (d, furl) pairs 304 305 self._logport = None 306 self._logport_furl = None 307 self._logport_furlfile = None 308 309 self._log_gatherer_furl = None 310 self._log_gatherer_furlfile = None 311 self._log_gatherer_connectors = {} # maps furl to reconnector 312 313 self._handle_old_duplicate_connections = False 314 self._expose_remote_exception_types = True
315
316 - def setOption(self, name, value):
317 if name == "logLocalFailures": 318 # log (with log.err) any exceptions that occur during the 319 # execution of a local Referenceable's method, which is invoked 320 # on behalf of a remote caller. These exceptions are reported to 321 # the remote caller through their callRemote's Deferred as usual: 322 # this option enables logging on the callee's side (i.e. our 323 # side) as well. 324 # 325 # TODO: This does not yet include Violations which were raised 326 # because the inbound callRemote had arguments that didn't meet 327 # our specifications. But it should. 328 self.logLocalFailures = value 329 elif name == "logRemoteFailures": 330 # log (with log.err) any exceptions that occur during the 331 # execution of a remote Referenceabe's method, invoked on behalf 332 # of a local RemoteReference.callRemote(). These exceptions are 333 # reported to our local caller through the usual Deferred.errback 334 # mechanism: this enables logging on the caller's side (i.e. our 335 # side) as well. 336 self.logRemoteFailures = value 337 elif name == "keepaliveTimeout": 338 self.keepaliveTimeout = value 339 elif name == "disconnectTimeout": 340 self.disconnectTimeout = value 341 elif name == "logport-furlfile": 342 self.setLogPortFURLFile(value) 343 elif name == "log-gatherer-furl": 344 self.setLogGathererFURL(value) 345 elif name == "log-gatherer-furlfile": 346 self.setLogGathererFURLFile(value) 347 elif name == "bridge-twisted-logs": 348 assert value is not False, "cannot unbridge twisted logs" 349 if value is True: 350 return flog.bridgeLogsFromTwisted(self.tubID) 351 else: 352 # for tests, bridge logs from a specific twisted LogPublisher 353 return flog.bridgeLogsFromTwisted(self.tubID, 354 twisted_logger=value) 355 elif name == "handle-old-duplicate-connections": 356 if value is True: 357 value = 60 358 self._handle_old_duplicate_connections = int(value) 359 elif name == "expose-remote-exception-types": 360 self._expose_remote_exception_types = bool(value) 361 else: 362 raise KeyError("unknown option name '%s'" % name)
363
364 - def setLogGathererFURL(self, gatherer_furl):
365 assert not self._log_gatherer_furl 366 self._log_gatherer_furl = gatherer_furl 367 self._maybeConnectToGatherer()
368
369 - def setLogGathererFURLFile(self, gatherer_furlfile):
370 assert not self._log_gatherer_furlfile 371 self._log_gatherer_furlfile = gatherer_furlfile 372 self._maybeConnectToGatherer()
373
374 - def _maybeConnectToGatherer(self):
375 if not self.locationHints: 376 return 377 furls = [] 378 if self._log_gatherer_furl: 379 furls.append(self._log_gatherer_furl) 380 if self._log_gatherer_furlfile: 381 try: 382 # allow multiple lines 383 for line in open(self._log_gatherer_furlfile, "r").readlines(): 384 furl = line.strip() 385 if furl: 386 furls.append(furl) 387 except EnvironmentError: 388 pass 389 for f in furls: 390 if f in self._log_gatherer_connectors: 391 continue 392 connector = self.connectTo(f, self._log_gatherer_connected) 393 self._log_gatherer_connectors[f] = connector
394
395 - def _log_gatherer_connected(self, rref):
396 # we want the logport's furl to be nailed down now, so we'll use the 397 # right (persistent) name even if the user never calls 398 # tub.getLogPortFURL() directly. 399 ignored = self.getLogPortFURL() 400 del ignored 401 tubID = self.tubID 402 if tubID is None: 403 # RILogGatherer.logport requires a string for nodeid= 404 tubID = '<unauth>' 405 rref.callRemoteOnly('logport', tubID, self.getLogPort())
406 407
408 - def getLogPort(self):
409 if not self.locationHints: 410 raise NoLocationError 411 return self._maybeCreateLogPort()
412
413 - def _maybeCreateLogPort(self):
414 if not self._logport: 415 self._logport = flog_publish.LogPublisher(self.logger) 416 return self._logport
417
418 - def setLogPortFURLFile(self, furlfile):
419 self._logport_furlfile = furlfile 420 self._maybeCreateLogPortFURLFile()
421
423 if not self._logport_furlfile: 424 return 425 if not self.locationHints: 426 return 427 # getLogPortFURL() creates the logport-furlfile as a side-effect 428 ignored = self.getLogPortFURL() 429 del ignored
430
431 - def getLogPortFURL(self):
432 if not self.locationHints: 433 raise NoLocationError 434 if self._logport_furl: 435 return self._logport_furl 436 furlfile = self._logport_furlfile 437 # the Tub must be running and configured (setLocation) by now 438 self._logport_furl = self.registerReference(self.getLogPort(), 439 furlFile=furlfile) 440 return self._logport_furl
441 442
443 - def log(self, *args, **kwargs):
444 kwargs['tubID'] = self.tubID 445 return log.msg(*args, **kwargs)
446
447 - def createCertificate(self):
448 # this is copied from test_sslverify.py 449 dn = crypto.DistinguishedName(commonName="newpb_thingy") 450 keypair = crypto.KeyPair.generate() 451 req = keypair.certificateRequest(dn) 452 certData = keypair.signCertificateRequest(dn, req, 453 lambda dn: True, 454 132) 455 cert = keypair.newCertificate(certData) 456 #opts = cert.options() 457 # 'opts' can be given to reactor.listenSSL, or to transport.startTLS 458 459 return cert
460
461 - def getCertData(self):
462 # the string returned by this method can be used as the certData= 463 # argument to create a new Tub with the same identity. TODO: actually 464 # test this, I don't know if dump/keypair.newCertificate is the right 465 # pair of methods. 466 return self.myCertificate.dumpPEM()
467
468 - def setLocation(self, *hints):
469 """Tell this service what its location is: a host:port description of 470 how to reach it from the outside world. You need to use this because 471 the Tub can't do it without help. If you do a 472 C{s.listenOn('tcp:1234')}, and the host is known as 473 C{foo.example.com}, then it would be appropriate to do:: 474 475 s.setLocation('foo.example.com:1234') 476 477 You must set the location before you can register any references. 478 479 Encrypted Tubs can have multiple location hints, just provide 480 multiple arguments. Unauthenticated Tubs can only have one location.""" 481 482 if not self.encrypted and len(hints) > 1: 483 raise PBError("Unauthenticated tubs may only have one " 484 "location hint") 485 if self.locationHints: 486 raise PBError("Tub.setLocation() can only be called once") 487 self.locationHints = hints 488 self._maybeCreateLogPortFURLFile() 489 self._maybeConnectToGatherer()
490
491 - def setLocationAutomatically(self, *extra_addresses):
492 """Determine one of this host's publically-visible IP addresses and 493 use it to set our location. This uses whatever source address would 494 be used to get to a well-known public host (A.ROOT-SERVERS.NET), 495 which is effectively the interface on which a default route lives. 496 This is neither very pretty (IP address instead of hostname) nor 497 guaranteed to work (it may very well be a 192.168 'private' address), 498 but for publically-visible hosts this will probably produce a useable 499 FURL. 500 501 This method returns a Deferred that will fire once the location is 502 actually established. Calls to registerReference() must be put off 503 until the location has been set. And of course, you must call 504 listenOn() before calling setLocationAutomatically().""" 505 506 # first, make sure the reactor is actually running, by using the 507 # eventual-send queue 508 d = eventual.fireEventually() 509 510 def _reactor_running(res): 511 assert self.running 512 # we can't use get_local_ip_for until the reactor is running 513 return util.get_local_ip_for()
514 d.addCallback(_reactor_running) 515 516 def _got_local_ip(local_address): 517 local_addresses = set(extra_addresses) 518 if local_address: 519 local_addresses.add(local_address) 520 local_addresses.add("127.0.0.1") 521 locations = set() 522 for l in self.getListeners(): 523 portnum = l.getPortnum() 524 for addr in local_addresses: 525 locations.add("%s:%d" % (addr, portnum)) 526 locations = list(locations) 527 locations.sort() 528 assert len(locations) >= 1 529 location = ",".join(locations) 530 self.setLocation(location)
531 d.addCallback(_got_local_ip) 532 return d 533
534 - def listenOn(self, what, options={}):
535 """Start listening for connections. 536 537 @type what: string or Listener instance 538 @param what: a L{twisted.application.strports} -style description, 539 or a L{Listener} instance returned by a previous call to 540 listenOn. 541 @param options: a dictionary of options that can influence connection 542 negotiation before the target Tub has been determined 543 544 @return: The Listener object that was created. This can be used to 545 stop listening later on, to have another Tub listen on the same port, 546 and to figure out which port was allocated when you used a strports 547 specification of 'tcp:0'. """ 548 549 if type(what) is str: 550 l = Listener(what, options, self.negotiationClass) 551 else: 552 assert not options 553 l = what 554 assert l not in self.listeners 555 l.addTub(self) 556 self.listeners.append(l) 557 return l
558
559 - def stopListeningOn(self, l):
560 # this returns a Deferred when the port is shut down 561 self.listeners.remove(l) 562 d = defer.maybeDeferred(l.removeTub, self) 563 return d
564
565 - def getListeners(self):
566 """Return the set of Listener objects that allow the outside world to 567 connect to this Tub.""" 568 return self.listeners[:]
569
570 - def clone(self):
571 """Return a new Tub (with a different ID), listening on the same 572 ports as this one.""" 573 if self.encrypted: 574 t = Tub() 575 else: 576 t = UnauthenticatedTub() 577 for l in self.listeners: 578 t.listenOn(l) 579 return t
580
581 - def getTubID(self):
582 return self.tubID
583 - def getShortTubID(self):
584 return self.tubID[:4]
585
586 - def connectorStarted(self, c):
587 assert self.running 588 self._activeConnectors.append(c)
589 - def connectorFinished(self, c):
590 if c in self._activeConnectors: 591 self._activeConnectors.remove(c) 592 if not self.running and not self._activeConnectors: 593 self._allConnectorsAreFinished.fire(self)
594
595 - def startService(self):
596 service.MultiService.startService(self) 597 for d,sturdy in self._pending_getReferences: 598 d1 = eventual.fireEventually(sturdy) 599 d1.addCallback(self.getReference) 600 d1.addBoth(lambda res, d=d: d.callback(res)) 601 del self._pending_getReferences 602 for rc in self.reconnectors: 603 eventual.eventually(rc.startConnecting, self)
604
605 - def _tubsAreNotRestartable(self, *args, **kwargs):
606 raise RuntimeError("Sorry, but Tubs cannot be restarted.")
607 - def _tubHasBeenShutDown(self, *args, **kwargs):
608 raise RuntimeError("Sorry, but this Tub has been shut down.")
609
610 - def stopService(self):
611 # note that once you stopService a Tub, I cannot be restarted. (at 612 # least this code is not designed to make that possible.. it might be 613 # doable in the future). 614 assert self.running 615 self.startService = self._tubsAreNotRestartable 616 self.getReference = self._tubHasBeenShutDown 617 self.connectTo = self._tubHasBeenShutDown 618 dl = [] 619 for rc in self.reconnectors: 620 rc.stopConnecting() 621 del self.reconnectors 622 for l in self.listeners: 623 # TODO: rethink this, what I want is for stopService to cause all 624 # Listeners to shut down, but I'm not sure this is the right way 625 # to do it. 626 d = l.removeTub(self) 627 if isinstance(d, defer.Deferred): 628 dl.append(d) 629 dl.append(service.MultiService.stopService(self)) 630 631 if self._activeConnectors: 632 dl.append(self._allConnectorsAreFinished.whenFired()) 633 for c in self._activeConnectors: 634 c.shutdown() 635 636 if self.brokers or self.unauthenticatedBrokers: 637 dl.append(self._allBrokersAreDisconnected.whenFired()) 638 why = Failure(error.ConnectionDone("Tub.stopService was called")) 639 for b in self.brokers.values(): 640 b.shutdown(why, fireDisconnectWatchers=False) 641 for b in self.unauthenticatedBrokers: 642 b.shutdown(why, fireDisconnectWatchers=False) 643 644 return defer.DeferredList(dl)
645
646 - def generateSwissnumber(self, bits):
647 return generateSwissnumber(bits)
648
649 - def buildURL(self, name):
650 if self.encrypted: 651 # TODO: IPv6 dotted-quad addresses have colons, but need to have 652 # host:port 653 hints = ",".join(self.locationHints) 654 return "pb://" + self.tubID + "@" + hints + "/" + name 655 return "pbu://" + self.locationHints[0] + "/" + name
656
657 - def registerReference(self, ref, name=None, furlFile=None):
658 """Make a Referenceable available to the outside world. A URL is 659 returned which can be used to access this object. This registration 660 will remain in effect (and the Tub will retain a reference to the 661 object to keep it meaningful) until explicitly unregistered, or the 662 Tub is shut down. 663 664 @type name: string (optional) 665 @param name: if provided, the object will be registered with this 666 name. If not, a random (unguessable) string will be 667 used. 668 669 @param furlFile: if provided, get the name from this file (if 670 it exists), and write the new FURL to this file. 671 If 'name=' is also provided, it is used for the 672 name, but the FURL is still written to this file. 673 674 @rtype: string 675 @return: the URL which points to this object. This URL can be passed 676 to Tub.getReference() in any Tub on any host which can reach this 677 one. 678 """ 679 680 if not self.locationHints: 681 raise RuntimeError("you must setLocation() before " 682 "you can registerReference()") 683 oldfurl = None 684 if furlFile: 685 try: 686 oldfurl = open(furlFile, "r").read().strip() 687 except EnvironmentError: 688 pass 689 if oldfurl: 690 sr = SturdyRef(oldfurl) 691 if name is None: 692 name = sr.name 693 if self.tubID != sr.tubID: 694 raise WrongTubIdError("I cannot keep using the old FURL from %s" 695 " because it does not have the same" 696 " TubID as I do (%s)" % 697 (furlFile, self.tubID)) 698 if name != sr.name: 699 raise WrongNameError("I cannot keep using the old FURL from %s" 700 " because you called registerReference" 701 " with a new name (%s)" % 702 (furlFile, name)) 703 name = self._assignName(ref, name) 704 assert name 705 if ref not in self.strongReferences: 706 self.strongReferences.append(ref) 707 furl = self.buildURL(name) 708 if furlFile: 709 need_to_chmod = not os.path.exists(furlFile) 710 open(furlFile, "w").write(furl + "\n") 711 if need_to_chmod: 712 os.chmod(furlFile, 0600) 713 return furl
714 715 # this is called by either registerReference or by 716 # getOrCreateURLForReference
717 - def _assignName(self, ref, preferred_name=None):
718 """Make a Referenceable available to the outside world, but do not 719 retain a strong reference to it. If we must create a new name, use 720 preferred_name. If that is None, use a random unguessable name. 721 """ 722 if not self.locationHints: 723 # without a location, there is no point in giving it a name 724 return None 725 if self.referenceToName.has_key(ref): 726 return self.referenceToName[ref] 727 name = preferred_name 728 if not name: 729 name = self.generateSwissnumber(self.NAMEBITS) 730 self.referenceToName[ref] = name 731 self.nameToReference[name] = ref 732 return name
733
734 - def getReferenceForName(self, name):
735 if name in self.nameToReference: 736 return self.nameToReference[name] 737 for lookup in self.nameLookupHandlers: 738 ref = lookup(name) 739 if ref: 740 if ref not in self.referenceToName: 741 self.referenceToName[ref] = name 742 return ref 743 # don't reveal the full swissnum 744 hint = name[:2] 745 raise KeyError("unable to find reference for name starting with '%s'" 746 % hint)
747
748 - def getReferenceForURL(self, url):
749 # TODO: who should this be used by? 750 sturdy = SturdyRef(url) 751 assert sturdy.tubID == self.tubID 752 return self.getReferenceForName(sturdy.name)
753
754 - def getOrCreateURLForReference(self, ref):
755 """Return the global URL for the reference, if there is one, or None 756 if there is not.""" 757 name = self._assignName(ref) 758 if name: 759 return self.buildURL(name) 760 return None
761
762 - def revokeReference(self, ref):
763 # TODO 764 pass
765
766 - def unregisterURL(self, url):
767 sturdy = SturdyRef(url) 768 name = sturdy.name 769 ref = self.nameToReference[name] 770 del self.nameToReference[name] 771 del self.referenceToName[ref] 772 self.revokeReference(ref)
773
774 - def unregisterReference(self, ref):
775 name = self.referenceToName[ref] 776 url = self.buildURL(name) 777 sturdy = SturdyRef(url) 778 name = sturdy.name 779 del self.nameToReference[name] 780 del self.referenceToName[ref] 781 if ref in self.strongReferences: 782 self.strongReferences.remove(ref) 783 self.revokeReference(ref)
784
785 - def registerNameLookupHandler(self, lookup):
786 """Add a function to help convert names to Referenceables. 787 788 When remote systems pass a FURL to their Tub.getReference(), our Tub 789 will be asked to locate a Referenceable for the name inside that 790 furl. The normal mechanism for this is to look at the table 791 maintained by registerReference() and unregisterReference(). If the 792 name does not exist in that table, other 'lookup handler' functions 793 are given a chance. Each lookup handler is asked in turn, and the 794 first which returns a non-None value wins. 795 796 This may be useful for cases where the furl represents an object that 797 lives on disk, or is generated on demand: rather than creating all 798 possible Referenceables at startup, the lookup handler can create or 799 retrieve the objects only when someone asks for them. 800 801 Note that constructing the FURLs of these objects may be non-trivial. 802 It is safe to create an object, use tub.registerReference in one 803 invocation of a program to obtain (and publish) the furl, parse the 804 furl to extract the name, save the contents of the object on disk, 805 then in a later invocation of the program use a lookup handler to 806 retrieve the object from disk. This approach means the objects that 807 are created in a given invocation stick around (inside 808 tub.strongReferences) for the rest of that invocation. An alternatve 809 approach is to create the object but *not* use tub.registerReference, 810 but in that case you have to construct the FURL yourself, and the Tub 811 does not currently provide any support for doing this robustly. 812 813 @param lookup: a callable which accepts a name (as a string) and 814 returns either a Referenceable or None. Note that 815 these strings should not contain a slash, a question 816 mark, or an ampersand, as these are reserved in the 817 FURL for later expansion (to add parameters beyond the 818 object name) 819 """ 820 self.nameLookupHandlers.append(lookup)
821
822 - def unregisterNameLookupHandler(self, lookup):
823 self.nameLookupHandlers.remove(lookup)
824
825 - def getReference(self, sturdyOrURL):
826 """Acquire a RemoteReference for the given SturdyRef/URL. 827 828 The Tub must be running (i.e. Tub.startService()) when this is 829 invoked. Future releases may relax this requirement. 830 831 @return: a Deferred that fires with the RemoteReference. Any failures 832 are returned asynchronously. 833 """ 834 835 return defer.maybeDeferred(self._getReference, sturdyOrURL)
836
837 - def _getReference(self, sturdyOrURL):
838 if isinstance(sturdyOrURL, SturdyRef): 839 sturdy = sturdyOrURL 840 else: 841 sturdy = SturdyRef(sturdyOrURL) 842 # pb->pb: ok, requires crypto 843 # pbu->pb: ok, requires crypto 844 # pbu->pbu: ok 845 # pb->pbu: ok, requires crypto 846 if sturdy.encrypted and not crypto_available: 847 e = BananaError("crypto for PB is not available, " 848 "we cannot handle encrypted PB-URLs like %s" 849 % sturdy.getURL()) 850 return defer.fail(e) 851 852 if not self.running: 853 # queue their request for service once the Tub actually starts 854 log.msg("Tub.getReference(%s) queued until Tub.startService called" 855 % sturdy, facility="foolscap.tub") 856 d = defer.Deferred() 857 self._pending_getReferences.append((d, sturdy)) 858 return d 859 860 name = sturdy.name 861 d = self.getBrokerForTubRef(sturdy.getTubRef()) 862 d.addCallback(lambda b: b.getYourReferenceByName(name)) 863 return d
864
865 - def connectTo(self, _sturdyOrURL, _cb, *args, **kwargs):
866 """Establish (and maintain) a connection to a given PBURL. 867 868 I establish a connection to the PBURL and run a callback to inform 869 the caller about the newly-available RemoteReference. If the 870 connection is lost, I schedule a reconnection attempt for the near 871 future. If that one fails, I keep trying at longer and longer 872 intervals (exponential backoff). 873 874 I accept a callback which will be fired each time a connection 875 attempt succeeds. This callback is run with the new RemoteReference 876 and any additional args/kwargs provided to me. The callback should 877 then use rref.notifyOnDisconnect() to get a message when the 878 connection goes away. At some point after it goes away, the 879 Reconnector will reconnect. 880 881 The Tub must be running (i.e. Tub.startService()) when this is 882 invoked. Future releases may relax this requirement. 883 884 I return a Reconnector object. When you no longer want to maintain 885 this connection, call the stopConnecting() method on the Reconnector. 886 I promise to not invoke your callback after you've called 887 stopConnecting(), even if there was already a connection attempt in 888 progress. If you had an active connection before calling 889 stopConnecting(), you will still have access to it, until it breaks 890 on its own. (I will not attempt to break existing connections, I will 891 merely stop trying to create new ones). All my Reconnector objects 892 will be shut down when the Tub is stopped. 893 894 Usage:: 895 896 def _got_ref(rref, arg1, arg2): 897 rref.callRemote('hello again') 898 # etc 899 rc = tub.connectTo(_got_ref, 'arg1', 'arg2') 900 ... 901 rc.stopConnecting() # later 902 """ 903 904 rc = Reconnector(_sturdyOrURL, _cb, args, kwargs) 905 if self.running: 906 rc.startConnecting(self) 907 else: 908 self.log("Tub.connectTo(%s) queued until Tub.startService called" 909 % _sturdyOrURL, level=UNUSUAL) 910 self.reconnectors.append(rc) 911 return rc
912
913 - def serialize(self, obj):
914 b = broker.StorageBroker(None) 915 b.setTub(self) 916 d = storage.serialize(obj, banana=b) 917 return d
918
919 - def unserialize(self, data):
920 b = broker.StorageBroker(None) 921 b.setTub(self) 922 d = storage.unserialize(data, banana=b) 923 assert isinstance(d, defer.Deferred) 924 return d
925 926 # beyond here are internal methods, not for use by application code 927 928 # _removeReconnector is called by the Reconnector
929 - def _removeReconnector(self, rc):
930 self.reconnectors.remove(rc)
931
932 - def getBrokerForTubRef(self, tubref):
933 if tubref in self.brokers: 934 return defer.succeed(self.brokers[tubref]) 935 if tubref.getTubID() == self.tubID: 936 b = self._createLoopbackBroker(tubref) 937 # _createLoopbackBroker will call brokerAttached, which will add 938 # it to self.brokers 939 # TODO: stash this in self.brokers, so we don't create multiples 940 return defer.succeed(b) 941 942 d = defer.Deferred() 943 if tubref not in self.waitingForBrokers: 944 self.waitingForBrokers[tubref] = [] 945 self.waitingForBrokers[tubref].append(d) 946 947 if tubref not in self.tubConnectors: 948 # the TubConnector will call our brokerAttached when it finishes 949 # negotiation, which will fire waitingForBrokers[tubref]. 950 c = negotiate.TubConnector(self, tubref) 951 self.tubConnectors[tubref] = c 952 c.connect() 953 954 return d
955
956 - def _createLoopbackBroker(self, tubref):
957 t1,t2 = broker.LoopbackTransport(), broker.LoopbackTransport() 958 t1.setPeer(t2); t2.setPeer(t1) 959 n = negotiate.Negotiation() 960 params = n.loopbackDecision() 961 b1,b2 = (self.brokerClass(tubref, params), 962 self.brokerClass(tubref, params)) 963 # we treat b1 as "our" broker, and b2 as "theirs", and we pretend 964 # that b2 has just connected to us. We keep track of b1, and b2 keeps 965 # track of us. 966 b1.setTub(self) 967 b2.setTub(self) 968 t1.protocol = b1; t2.protocol = b2 969 b1.makeConnection(t1); b2.makeConnection(t2) 970 self.brokerAttached(tubref, b1, False) 971 return b1
972
973 - def connectionFailed(self, tubref, why):
974 # we previously initiated an outbound TubConnector to this tubref, but 975 # it was unable to establish a connection. 'why' is the most useful 976 # Failure that occurred (i.e. it is a NegotiationError if we made it 977 # that far, otherwise it's a ConnectionFailed). 978 979 if tubref in self.tubConnectors: 980 del self.tubConnectors[tubref] 981 if tubref in self.brokers: 982 # oh, but fortunately an inbound connection must have succeeded. 983 # Nevermind. 984 return 985 986 # inform hopeful Broker-waiters that they aren't getting one 987 if tubref in self.waitingForBrokers: 988 waiting = self.waitingForBrokers[tubref] 989 del self.waitingForBrokers[tubref] 990 for d in waiting: 991 d.errback(why)
992
993 - def brokerAttached(self, tubref, broker, isClient):
994 assert self.running 995 if not tubref: 996 # this is an inbound connection from an unauthenticated Tub 997 assert not isClient 998 # we just track it so we can disconnect it later 999 self.unauthenticatedBrokers.append(broker) 1000 return 1001 1002 if tubref in self.tubConnectors: 1003 # we initiated an outbound connection to this tubref 1004 if not isClient: 1005 # however, the connection we got was from an inbound 1006 # connection. The completed (inbound) connection wins, so 1007 # abandon the outbound TubConnector 1008 self.tubConnectors[tubref].shutdown() 1009 1010 # we don't need the TubConnector any more 1011 del self.tubConnectors[tubref] 1012 1013 if tubref in self.brokers: 1014 # this shouldn't happen: acceptDecision is supposed to drop any 1015 # existing old connection first. 1016 self.log("ERROR: unexpected duplicate connection from %s" % tubref) 1017 raise BananaError("unexpected duplicate connection") 1018 self.brokers[tubref] = broker 1019 1020 # now inform everyone who's been waiting on it 1021 if tubref in self.waitingForBrokers: 1022 for d in self.waitingForBrokers[tubref]: 1023 eventual.eventually(d.callback, broker) 1024 del self.waitingForBrokers[tubref]
1025
1026 - def brokerDetached(self, broker, why):
1027 # a loopback connection will produce two Brokers that both use the 1028 # same tubref. Both will shut down about the same time. Make sure 1029 # this doesn't confuse us. 1030 had_connections = (bool(self.brokers) or 1031 bool(self.unauthenticatedBrokers)) 1032 # the Broker will have already severed all active references 1033 for tubref in self.brokers.keys(): 1034 if self.brokers[tubref] is broker: 1035 del self.brokers[tubref] 1036 if broker in self.unauthenticatedBrokers: 1037 self.unauthenticatedBrokers.remove(broker) 1038 # if the Tub has already shut down, we may need to notify observers 1039 # who are waiting for all of our connections to finish shutting down. 1040 # Only do this if we actually transitioned from having some 1041 # connections to not having any connections. 1042 1043 if (not self.running 1044 and had_connections 1045 and not self.brokers 1046 and not self.unauthenticatedBrokers): 1047 self._allBrokersAreDisconnected.fire(self)
1048
1049 - def debug_listBrokers(self):
1050 # return a list of (tubref, inbound, outbound) tuples. The tubref 1051 # tells you which broker this is, 'inbound' is a list of 1052 # InboundDelivery objects (one per outstanding inbound message), and 1053 # 'outbound' is a list of PendingRequest objects (one per message 1054 # that's waiting on a remote broker to complete). 1055 output = [] 1056 all_brokers = (self.brokers.items() 1057 + [("unauth",broker) 1058 for broker in self.unauthenticatedBrokers]) 1059 for tubref,broker in all_brokers: 1060 inbound = broker.inboundDeliveryQueue[:] 1061 outbound = [pr 1062 for (reqID, pr) in 1063 sorted(broker.waitingForAnswers.items()) ] 1064 output.append( (str(tubref), inbound, outbound) ) 1065 output.sort(lambda x,y: cmp( (len(x[1]), len(x[2])), 1066 (len(y[1]), len(y[2])) )) 1067 return output
1068 1069
1070 -class UnauthenticatedTub(Tub):
1071 encrypted = False 1072
1073 - def __init__(self, tubID=None, options={}):
1074 service.MultiService.__init__(self) 1075 self.setup(options) 1076 self.myCertificate = None 1077 assert not tubID # not yet 1078 self.tubID = tubID
1079
1080 - def getTubID(self):
1081 return "<unauth>"
1082 - def getShortTubID(self):
1083 return "<unauth>"
1084