1
2
3 import os.path, weakref, binascii
4 from zope.interface import implements
5 from twisted.internet import defer, protocol, error
6 from twisted.application import service, strports
7 from twisted.python.failure import Failure
8
9 from foolscap import ipb, base32, negotiate, broker, observer, eventual, storage
10 from foolscap import util
11 from foolscap.referenceable import SturdyRef
12 from foolscap.tokens import PBError, BananaError, WrongTubIdError, \
13 WrongNameError, NoLocationError
14 from foolscap.reconnector import Reconnector
15 from foolscap.logging import log as flog
16 from foolscap.logging import log
17 from foolscap.logging import publish as flog_publish
18 from foolscap.logging.log import UNUSUAL
19
20 crypto_available = False
21 try:
22 from foolscap import crypto
23 crypto_available = crypto.available
24 except ImportError:
25 pass
26
27
28 Listeners = []
30 """I am responsible for a single listening port, which may connect to
31 multiple Tubs. I have a strports-based Service, which I will attach as a
32 child of one of my Tubs. If that Tub disconnects, I will reparent the
33 Service to a remaining one.
34
35 Unauthenticated Tubs use a TubID of 'None'. There may be at most one such
36 Tub attached to any given Listener."""
37
38 noisy = False
39
40
41
44 """
45 @type port: string
46 @param port: a L{twisted.application.strports} -style description.
47 """
48 name, args, kw = strports.parse(port, None)
49 assert name in ("TCP", "UNIX")
50 self.port = port
51 self.options = options
52 self.negotiationClass = negotiationClass
53 self.parentTub = None
54 self.tubs = {}
55 self.redirects = {}
56 self.s = strports.service(port, self)
57 Listeners.append(self)
58
60 """When this Listener was created with a strport string of '0' or
61 'tcp:0' (meaning 'please allocate me something'), and if the Listener
62 is active (it is attached to a Tub which is in the 'running' state),
63 this method will return the port number that was allocated. This is
64 useful for the following pattern::
65
66 t = Tub()
67 l = t.listenOn('tcp:0')
68 t.setLocation('localhost:%d' % l.getPortnum())
69 """
70
71 assert self.s.running
72 name, args, kw = strports.parse(self.port, None)
73 assert name in ("TCP",)
74 return self.s._port.getHost().port
75
77 if self.tubs:
78 return "<Listener at 0x%x on %s with tubs %s>" % (
79 abs(id(self)),
80 self.port,
81 ",".join([str(k) for k in self.tubs.keys()]))
82 return "<Listener at 0x%x on %s with no tubs>" % (abs(id(self)),
83 self.port)
84
86 if tub.tubID in self.tubs:
87 if tub.tubID is None:
88 raise RuntimeError("This Listener (on %s) already has an "
89 "unauthenticated Tub, you cannot add a "
90 "second one" % self.port)
91 raise RuntimeError("This Listener (on %s) is already connected "
92 "to TubID '%s'" % (self.port, tub.tubID))
93 self.tubs[tub.tubID] = tub
94 if self.parentTub is None:
95 self.parentTub = tub
96 self.s.setServiceParent(self.parentTub)
97
99
100
101 del self.tubs[tub.tubID]
102 if self.parentTub is tub:
103
104 tubs = self.tubs.values()
105 if tubs:
106 self.parentTub = tubs[0]
107
108
109
110
111 self.s.setServiceParent(self.parentTub)
112 else:
113
114 d = self.s.disownServiceParent()
115 Listeners.remove(self)
116 return d
117 return None
118
121
123 assert tubID is not None
124 self.redirects[tubID] = location
126 del self.redirects[tubID]
127
129 log.msg("Starting factory %r" % self, facility="foolscap.listener")
130 return protocol.ServerFactory.startFactory(self)
132 log.msg("Stopping factory %r" % self, facility="foolscap.listener")
133 return protocol.ServerFactory.stopFactory(self)
134
135
137 """Return a Broker attached to me (as the service provider).
138 """
139 lp = log.msg("%s accepting connection from %s" % (self, addr),
140 addr=(addr.host, addr.port),
141 facility="foolscap.listener")
142 proto = self.negotiationClass(logparent=lp)
143 proto.initServer(self)
144 proto.factory = self
145 return proto
146
148 return self.tubs.get(tubID), self.redirects.get(tubID)
149
151 bytes = os.urandom(bits/8)
152 return base32.encode(bytes)
153
154 -class Tub(service.MultiService):
155 """I am a presence in the PB universe, also known as a Tub.
156
157 I am a Service (in the twisted.application.service.Service sense),
158 so you either need to call my startService() method before using me,
159 or setServiceParent() me to a running service.
160
161 This is the primary entry point for all PB-using applications, both
162 clients and servers.
163
164 I am known to the outside world by a base URL, which may include
165 authentication information (a yURL). This is my 'TubID'.
166
167 I contain Referenceables, and manage RemoteReferences to Referenceables
168 that live in other Tubs.
169
170
171 @param certData: if provided, use it as a certificate rather than
172 generating a new one. This is a PEM-encoded
173 private/public keypair, as returned by Tub.getCertData()
174
175 @param certFile: if provided, the Tub will store its certificate in
176 this file. If the file does not exist when the Tub is
177 created, the Tub will generate a new certificate and
178 store it here. If the file does exist, the certificate
179 will be loaded from this file.
180
181 The simplest way to use the Tub is to choose a long-term
182 location for the certificate, use certFile= to tell the
183 Tub about it, and then let the Tub manage its own
184 certificate.
185
186 You may provide certData, or certFile, (or neither), but
187 not both.
188
189 @param options: a dictionary of options that can influence connection
190 negotiation. Currently defined keys are:
191 - debug_slow: if True, wait half a second between
192 each negotiation response
193
194 @ivar brokers: maps TubIDs to L{Broker} instances
195
196 @ivar listeners: maps strport to TCPServer service
197
198 @ivar referenceToName: maps Referenceable to a name
199 @ivar nameToReference: maps name to Referenceable
200
201 @type tubID: string
202 @ivar tubID: a global identifier for this Tub, possibly including
203 authentication information, hash of SSL certificate
204
205 """
206 implements(ipb.ITub)
207
208 unsafeTracebacks = True
209 logLocalFailures = False
210 logRemoteFailures = False
211 debugBanana = False
212 NAMEBITS = 160
213 TUBIDBITS = 16
214 encrypted = True
215 negotiationClass = negotiate.Negotiation
216 brokerClass = broker.Broker
217 keepaliveTimeout = 4*60
218 disconnectTimeout = None
219 tubID = None
220
221 - def __init__(self, certData=None, certFile=None, options={}):
228
230 return "<Tub id=%s>" % self.tubID
231
233 try:
234 certData = open(certFile, "rb").read()
235 except EnvironmentError:
236 certData = None
237 self.setupEncryption(certData)
238
239 if certData is None:
240 f = open(certFile, "wb")
241 f.write(self.getCertData())
242 f.close()
243
255
257 unique = os.urandom(8)
258
259
260 sequential = None
261 self.incarnation = (unique, sequential)
262 self.incarnation_string = binascii.b2a_hex(unique)
263
265 return self.incarnation_string
266
def setup(self, options):
    """Create all of the per-Tub bookkeeping state.

    Stores C{options} unchanged, attaches the shared foolscap logger, asks
    for an incarnation record, and creates the (initially empty) tables
    used for listeners, registered references, connections and the
    logport / log-gatherer machinery.

    @param options: stored as-is on C{self.options}
    """
    self.options = options
    self.logger = flog.theLogger

    # how the outside world reaches us
    self.listeners = []
    self.locationHints = []

    # presumably records self.incarnation / self.incarnation_string --
    # confirm against make_incarnation()
    self.make_incarnation()

    self.master_table = {}
    self.slave_table = {}

    # name <-> Referenceable maps are weak; strongReferences holds the
    # objects that must be kept alive explicitly
    self.nameToReference = weakref.WeakValueDictionary()
    self.referenceToName = weakref.WeakKeyDictionary()
    self.strongReferences = []
    self.nameLookupHandlers = []

    # connection bookkeeping
    self.tubConnectors = {}
    self.waitingForBrokers = {}
    self.brokers = {}
    self.unauthenticatedBrokers = []
    self.reconnectors = []

    # shutdown coordination
    self._allBrokersAreDisconnected = observer.OneShotObserverList()
    self._activeConnectors = []
    self._allConnectorsAreFinished = observer.OneShotObserverList()

    self._pending_getReferences = []

    # logport and log-gatherer settings all start out unset
    for unset_attr in ("_logport",
                       "_logport_furl",
                       "_logport_furlfile",
                       "_log_gatherer_furl",
                       "_log_gatherer_furlfile"):
        setattr(self, unset_attr, None)
    self._log_gatherer_connectors = {}

    # default values for two behaviour flags
    self._handle_old_duplicate_connections = False
    self._expose_remote_exception_types = True
315
363
365 assert not self._log_gatherer_furl
366 self._log_gatherer_furl = gatherer_furl
367 self._maybeConnectToGatherer()
368
370 assert not self._log_gatherer_furlfile
371 self._log_gatherer_furlfile = gatherer_furlfile
372 self._maybeConnectToGatherer()
373
375 if not self.locationHints:
376 return
377 furls = []
378 if self._log_gatherer_furl:
379 furls.append(self._log_gatherer_furl)
380 if self._log_gatherer_furlfile:
381 try:
382
383 for line in open(self._log_gatherer_furlfile, "r").readlines():
384 furl = line.strip()
385 if furl:
386 furls.append(furl)
387 except EnvironmentError:
388 pass
389 for f in furls:
390 if f in self._log_gatherer_connectors:
391 continue
392 connector = self.connectTo(f, self._log_gatherer_connected)
393 self._log_gatherer_connectors[f] = connector
394
406
407
412
414 if not self._logport:
415 self._logport = flog_publish.LogPublisher(self.logger)
416 return self._logport
417
421
423 if not self._logport_furlfile:
424 return
425 if not self.locationHints:
426 return
427
428 ignored = self.getLogPortFURL()
429 del ignored
430
432 if not self.locationHints:
433 raise NoLocationError
434 if self._logport_furl:
435 return self._logport_furl
436 furlfile = self._logport_furlfile
437
438 self._logport_furl = self.registerReference(self.getLogPort(),
439 furlFile=furlfile)
440 return self._logport_furl
441
442
def log(self, *args, **kwargs):
    """Log a message tagged with this Tub's ID.

    All positional and keyword arguments are forwarded to C{log.msg}; a
    C{tubID} keyword (overriding any supplied by the caller) is set to
    C{self.tubID} first.

    @return: whatever C{log.msg} returns
    """
    kwargs.update(tubID=self.tubID)
    return log.msg(*args, **kwargs)
446
460
462
463
464
465
466 return self.myCertificate.dumpPEM()
467
469 """Tell this service what its location is: a host:port description of
470 how to reach it from the outside world. You need to use this because
471 the Tub can't do it without help. If you do a
472 C{s.listenOn('tcp:1234')}, and the host is known as
473 C{foo.example.com}, then it would be appropriate to do::
474
475 s.setLocation('foo.example.com:1234')
476
477 You must set the location before you can register any references.
478
479 Encrypted Tubs can have multiple location hints, just provide
480 multiple arguments. Unauthenticated Tubs can only have one location."""
481
482 if not self.encrypted and len(hints) > 1:
483 raise PBError("Unauthenticated tubs may only have one "
484 "location hint")
485 if self.locationHints:
486 raise PBError("Tub.setLocation() can only be called once")
487 self.locationHints = hints
488 self._maybeCreateLogPortFURLFile()
489 self._maybeConnectToGatherer()
490
492 """Determine one of this host's publicly-visible IP addresses and
493 use it to set our location. This uses whatever source address would
494 be used to get to a well-known public host (A.ROOT-SERVERS.NET),
495 which is effectively the interface on which a default route lives.
496 This is neither very pretty (IP address instead of hostname) nor
497 guaranteed to work (it may very well be a 192.168 'private' address),
498 but for publicly-visible hosts this will probably produce a usable
499 FURL.
500
501 This method returns a Deferred that will fire once the location is
502 actually established. Calls to registerReference() must be put off
503 until the location has been set. And of course, you must call
504 listenOn() before calling setLocationAutomatically()."""
505
506
507
508 d = eventual.fireEventually()
509
510 def _reactor_running(res):
511 assert self.running
512
513 return util.get_local_ip_for()
514 d.addCallback(_reactor_running)
515
516 def _got_local_ip(local_address):
517 local_addresses = set(extra_addresses)
518 if local_address:
519 local_addresses.add(local_address)
520 local_addresses.add("127.0.0.1")
521 locations = set()
522 for l in self.getListeners():
523 portnum = l.getPortnum()
524 for addr in local_addresses:
525 locations.add("%s:%d" % (addr, portnum))
526 locations = list(locations)
527 locations.sort()
528 assert len(locations) >= 1
529 location = ",".join(locations)
530 self.setLocation(location)
531 d.addCallback(_got_local_ip)
532 return d
533
535 """Start listening for connections.
536
537 @type what: string or Listener instance
538 @param what: a L{twisted.application.strports} -style description,
539 or a L{Listener} instance returned by a previous call to
540 listenOn.
541 @param options: a dictionary of options that can influence connection
542 negotiation before the target Tub has been determined
543
544 @return: The Listener object that was created. This can be used to
545 stop listening later on, to have another Tub listen on the same port,
546 and to figure out which port was allocated when you used a strports
547 specification of 'tcp:0'. """
548
549 if type(what) is str:
550 l = Listener(what, options, self.negotiationClass)
551 else:
552 assert not options
553 l = what
554 assert l not in self.listeners
555 l.addTub(self)
556 self.listeners.append(l)
557 return l
558
560
561 self.listeners.remove(l)
562 d = defer.maybeDeferred(l.removeTub, self)
563 return d
564
566 """Return the set of Listener objects that allow the outside world to
567 connect to this Tub."""
568 return self.listeners[:]
569
571 """Return a new Tub (with a different ID), listening on the same
572 ports as this one."""
573 if self.encrypted:
574 t = Tub()
575 else:
576 t = UnauthenticatedTub()
577 for l in self.listeners:
578 t.listenOn(l)
579 return t
580
584 return self.tubID[:4]
585
587 assert self.running
588 self._activeConnectors.append(c)
590 if c in self._activeConnectors:
591 self._activeConnectors.remove(c)
592 if not self.running and not self._activeConnectors:
593 self._allConnectorsAreFinished.fire(self)
594
604
606 raise RuntimeError("Sorry, but Tubs cannot be restarted.")
608 raise RuntimeError("Sorry, but this Tub has been shut down.")
609
611
612
613
614 assert self.running
615 self.startService = self._tubsAreNotRestartable
616 self.getReference = self._tubHasBeenShutDown
617 self.connectTo = self._tubHasBeenShutDown
618 dl = []
619 for rc in self.reconnectors:
620 rc.stopConnecting()
621 del self.reconnectors
622 for l in self.listeners:
623
624
625
626 d = l.removeTub(self)
627 if isinstance(d, defer.Deferred):
628 dl.append(d)
629 dl.append(service.MultiService.stopService(self))
630
631 if self._activeConnectors:
632 dl.append(self._allConnectorsAreFinished.whenFired())
633 for c in self._activeConnectors:
634 c.shutdown()
635
636 if self.brokers or self.unauthenticatedBrokers:
637 dl.append(self._allBrokersAreDisconnected.whenFired())
638 why = Failure(error.ConnectionDone("Tub.stopService was called"))
639 for b in self.brokers.values():
640 b.shutdown(why, fireDisconnectWatchers=False)
641 for b in self.unauthenticatedBrokers:
642 b.shutdown(why, fireDisconnectWatchers=False)
643
644 return defer.DeferredList(dl)
645
648
650 if self.encrypted:
651
652
653 hints = ",".join(self.locationHints)
654 return "pb://" + self.tubID + "@" + hints + "/" + name
655 return "pbu://" + self.locationHints[0] + "/" + name
656
658 """Make a Referenceable available to the outside world. A URL is
659 returned which can be used to access this object. This registration
660 will remain in effect (and the Tub will retain a reference to the
661 object to keep it meaningful) until explicitly unregistered, or the
662 Tub is shut down.
663
664 @type name: string (optional)
665 @param name: if provided, the object will be registered with this
666 name. If not, a random (unguessable) string will be
667 used.
668
669 @param furlFile: if provided, get the name from this file (if
670 it exists), and write the new FURL to this file.
671 If 'name=' is also provided, it is used for the
672 name, but the FURL is still written to this file.
673
674 @rtype: string
675 @return: the URL which points to this object. This URL can be passed
676 to Tub.getReference() in any Tub on any host which can reach this
677 one.
678 """
679
680 if not self.locationHints:
681 raise RuntimeError("you must setLocation() before "
682 "you can registerReference()")
683 oldfurl = None
684 if furlFile:
685 try:
686 oldfurl = open(furlFile, "r").read().strip()
687 except EnvironmentError:
688 pass
689 if oldfurl:
690 sr = SturdyRef(oldfurl)
691 if name is None:
692 name = sr.name
693 if self.tubID != sr.tubID:
694 raise WrongTubIdError("I cannot keep using the old FURL from %s"
695 " because it does not have the same"
696 " TubID as I do (%s)" %
697 (furlFile, self.tubID))
698 if name != sr.name:
699 raise WrongNameError("I cannot keep using the old FURL from %s"
700 " because you called registerReference"
701 " with a new name (%s)" %
702 (furlFile, name))
703 name = self._assignName(ref, name)
704 assert name
705 if ref not in self.strongReferences:
706 self.strongReferences.append(ref)
707 furl = self.buildURL(name)
708 if furlFile:
709 need_to_chmod = not os.path.exists(furlFile)
710 open(furlFile, "w").write(furl + "\n")
711 if need_to_chmod:
712 os.chmod(furlFile, 0600)
713 return furl
714
715
716
718 """Make a Referenceable available to the outside world, but do not
719 retain a strong reference to it. If we must create a new name, use
720 preferred_name. If that is None, use a random unguessable name.
721 """
722 if not self.locationHints:
723
724 return None
725 if self.referenceToName.has_key(ref):
726 return self.referenceToName[ref]
727 name = preferred_name
728 if not name:
729 name = self.generateSwissnumber(self.NAMEBITS)
730 self.referenceToName[ref] = name
731 self.nameToReference[name] = ref
732 return name
733
735 if name in self.nameToReference:
736 return self.nameToReference[name]
737 for lookup in self.nameLookupHandlers:
738 ref = lookup(name)
739 if ref:
740 if ref not in self.referenceToName:
741 self.referenceToName[ref] = name
742 return ref
743
744 hint = name[:2]
745 raise KeyError("unable to find reference for name starting with '%s'"
746 % hint)
747
753
755 """Return the global URL for the reference, if there is one, or None
756 if there is not."""
757 name = self._assignName(ref)
758 if name:
759 return self.buildURL(name)
760 return None
761
765
773
775 name = self.referenceToName[ref]
776 url = self.buildURL(name)
777 sturdy = SturdyRef(url)
778 name = sturdy.name
779 del self.nameToReference[name]
780 del self.referenceToName[ref]
781 if ref in self.strongReferences:
782 self.strongReferences.remove(ref)
783 self.revokeReference(ref)
784
786 """Add a function to help convert names to Referenceables.
787
788 When remote systems pass a FURL to their Tub.getReference(), our Tub
789 will be asked to locate a Referenceable for the name inside that
790 furl. The normal mechanism for this is to look at the table
791 maintained by registerReference() and unregisterReference(). If the
792 name does not exist in that table, other 'lookup handler' functions
793 are given a chance. Each lookup handler is asked in turn, and the
794 first which returns a non-None value wins.
795
796 This may be useful for cases where the furl represents an object that
797 lives on disk, or is generated on demand: rather than creating all
798 possible Referenceables at startup, the lookup handler can create or
799 retrieve the objects only when someone asks for them.
800
801 Note that constructing the FURLs of these objects may be non-trivial.
802 It is safe to create an object, use tub.registerReference in one
803 invocation of a program to obtain (and publish) the furl, parse the
804 furl to extract the name, save the contents of the object on disk,
805 then in a later invocation of the program use a lookup handler to
806 retrieve the object from disk. This approach means the objects that
807 are created in a given invocation stick around (inside
808 tub.strongReferences) for the rest of that invocation. An alternative
809 approach is to create the object but *not* use tub.registerReference,
810 but in that case you have to construct the FURL yourself, and the Tub
811 does not currently provide any support for doing this robustly.
812
813 @param lookup: a callable which accepts a name (as a string) and
814 returns either a Referenceable or None. Note that
815 these strings should not contain a slash, a question
816 mark, or an ampersand, as these are reserved in the
817 FURL for later expansion (to add parameters beyond the
818 object name)
819 """
820 self.nameLookupHandlers.append(lookup)
821
823 self.nameLookupHandlers.remove(lookup)
824
826 """Acquire a RemoteReference for the given SturdyRef/URL.
827
828 The Tub must be running (i.e. Tub.startService()) when this is
829 invoked. Future releases may relax this requirement.
830
831 @return: a Deferred that fires with the RemoteReference. Any failures
832 are returned asynchronously.
833 """
834
835 return defer.maybeDeferred(self._getReference, sturdyOrURL)
836
838 if isinstance(sturdyOrURL, SturdyRef):
839 sturdy = sturdyOrURL
840 else:
841 sturdy = SturdyRef(sturdyOrURL)
842
843
844
845
846 if sturdy.encrypted and not crypto_available:
847 e = BananaError("crypto for PB is not available, "
848 "we cannot handle encrypted PB-URLs like %s"
849 % sturdy.getURL())
850 return defer.fail(e)
851
852 if not self.running:
853
854 log.msg("Tub.getReference(%s) queued until Tub.startService called"
855 % sturdy, facility="foolscap.tub")
856 d = defer.Deferred()
857 self._pending_getReferences.append((d, sturdy))
858 return d
859
860 name = sturdy.name
861 d = self.getBrokerForTubRef(sturdy.getTubRef())
862 d.addCallback(lambda b: b.getYourReferenceByName(name))
863 return d
864
def connectTo(self, _sturdyOrURL, _cb, *args, **kwargs):
    """Establish (and maintain) a connection to a given PBURL.

    A L{Reconnector} is created for the target and remembered in
    C{self.reconnectors}. Each time a connection attempt succeeds, the
    callback is run with the new RemoteReference followed by the extra
    args/kwargs given here. The callback should use
    rref.notifyOnDisconnect() to learn when the connection goes away;
    some time after that the Reconnector will try again, backing off to
    longer and longer intervals while attempts keep failing.

    If the Tub is running, the Reconnector is started immediately.
    Otherwise it is only recorded and a 'queued until Tub.startService
    called' message is logged (presumably it is started when the Tub's
    service starts -- confirm against startService).

    To stop maintaining the connection, call stopConnecting() on the
    returned Reconnector. The callback will not be invoked after
    stopConnecting(), even if an attempt was already in progress. An
    existing connection is left alone until it breaks on its own; only
    the creation of new connections stops. All Reconnectors are shut
    down when the Tub is stopped.

    Usage::

     def _got_ref(rref, arg1, arg2):
         rref.callRemote('hello again')
         # etc
     rc = tub.connectTo(furl, _got_ref, 'arg1', 'arg2')
     ...
     rc.stopConnecting() # later

    @param _sturdyOrURL: the target, passed through to the Reconnector
    @param _cb: callable run as _cb(rref, *args, **kwargs) on each
                successful connection
    @return: the Reconnector managing this connection
    """
    reconnector = Reconnector(_sturdyOrURL, _cb, args, kwargs)
    if not self.running:
        # not started here; just note that the request is pending
        self.log("Tub.connectTo(%s) queued until Tub.startService called"
                 % _sturdyOrURL, level=UNUSUAL)
    else:
        reconnector.startConnecting(self)
    self.reconnectors.append(reconnector)
    return reconnector
912
918
925
926
927
928
930 self.reconnectors.remove(rc)
931
933 if tubref in self.brokers:
934 return defer.succeed(self.brokers[tubref])
935 if tubref.getTubID() == self.tubID:
936 b = self._createLoopbackBroker(tubref)
937
938
939
940 return defer.succeed(b)
941
942 d = defer.Deferred()
943 if tubref not in self.waitingForBrokers:
944 self.waitingForBrokers[tubref] = []
945 self.waitingForBrokers[tubref].append(d)
946
947 if tubref not in self.tubConnectors:
948
949
950 c = negotiate.TubConnector(self, tubref)
951 self.tubConnectors[tubref] = c
952 c.connect()
953
954 return d
955
972
974
975
976
977
978
979 if tubref in self.tubConnectors:
980 del self.tubConnectors[tubref]
981 if tubref in self.brokers:
982
983
984 return
985
986
987 if tubref in self.waitingForBrokers:
988 waiting = self.waitingForBrokers[tubref]
989 del self.waitingForBrokers[tubref]
990 for d in waiting:
991 d.errback(why)
992
994 assert self.running
995 if not tubref:
996
997 assert not isClient
998
999 self.unauthenticatedBrokers.append(broker)
1000 return
1001
1002 if tubref in self.tubConnectors:
1003
1004 if not isClient:
1005
1006
1007
1008 self.tubConnectors[tubref].shutdown()
1009
1010
1011 del self.tubConnectors[tubref]
1012
1013 if tubref in self.brokers:
1014
1015
1016 self.log("ERROR: unexpected duplicate connection from %s" % tubref)
1017 raise BananaError("unexpected duplicate connection")
1018 self.brokers[tubref] = broker
1019
1020
1021 if tubref in self.waitingForBrokers:
1022 for d in self.waitingForBrokers[tubref]:
1023 eventual.eventually(d.callback, broker)
1024 del self.waitingForBrokers[tubref]
1025
1027
1028
1029
1030 had_connections = (bool(self.brokers) or
1031 bool(self.unauthenticatedBrokers))
1032
1033 for tubref in self.brokers.keys():
1034 if self.brokers[tubref] is broker:
1035 del self.brokers[tubref]
1036 if broker in self.unauthenticatedBrokers:
1037 self.unauthenticatedBrokers.remove(broker)
1038
1039
1040
1041
1042
1043 if (not self.running
1044 and had_connections
1045 and not self.brokers
1046 and not self.unauthenticatedBrokers):
1047 self._allBrokersAreDisconnected.fire(self)
1048
1050
1051
1052
1053
1054
1055 output = []
1056 all_brokers = (self.brokers.items()
1057 + [("unauth",broker)
1058 for broker in self.unauthenticatedBrokers])
1059 for tubref,broker in all_brokers:
1060 inbound = broker.inboundDeliveryQueue[:]
1061 outbound = [pr
1062 for (reqID, pr) in
1063 sorted(broker.waitingForAnswers.items()) ]
1064 output.append( (str(tubref), inbound, outbound) )
1065 output.sort(lambda x,y: cmp( (len(x[1]), len(x[2])),
1066 (len(y[1]), len(y[2])) ))
1067 return output
1068
1069
1084