1
2
3 import os.path, weakref, binascii
4 from zope.interface import implements
5 from twisted.internet import defer, protocol, error
6 from twisted.application import service, strports
7 from twisted.python.failure import Failure
8
9 from foolscap import ipb, base32, negotiate, broker, observer, eventual, storage
10 from foolscap import util
11 from foolscap.referenceable import SturdyRef
12 from foolscap.tokens import PBError, BananaError, WrongTubIdError, \
13 WrongNameError, NoLocationError
14 from foolscap.reconnector import Reconnector
15 from foolscap.logging import log as flog
16 from foolscap.logging import log
17 from foolscap.logging import publish as flog_publish
18 from foolscap.logging.log import WEIRD, UNUSUAL
19
# Probe for the optional foolscap.crypto module. crypto_available ends up
# False when the module cannot be imported, and otherwise mirrors the
# module's own 'available' flag.
try:
    from foolscap import crypto
except ImportError:
    crypto_available = False
else:
    crypto_available = crypto.available
26
27
28 Listeners = []
30 """I am responsible for a single listening port, which may connect to
31 multiple Tubs. I have a strports-based Service, which I will attach as a
32 child of one of my Tubs. If that Tub disconnects, I will reparent the
33 Service to a remaining one.
34
35 Unauthenticated Tubs use a TubID of 'None'. There may be at most one such
36 Tub attached to any given Listener."""
37
38 noisy = False
39
40
41
44 """
45 @type port: string
46 @param port: a L{twisted.application.strports} -style description.
47 """
48 name, args, kw = strports.parse(port, None)
49 assert name in ("TCP", "UNIX")
50 self.port = port
51 self.options = options
52 self.negotiationClass = negotiationClass
53 self.parentTub = None
54 self.tubs = {}
55 self.redirects = {}
56 self.s = strports.service(port, self)
57 Listeners.append(self)
58
60 """When this Listener was created with a strport string of '0' or
61 'tcp:0' (meaning 'please allocate me something'), and if the Listener
62 is active (it is attached to a Tub which is in the 'running' state),
63 this method will return the port number that was allocated. This is
64 useful for the following pattern::
65
66 t = Tub()
67 l = t.listenOn('tcp:0')
68 t.setLocation('localhost:%d' % l.getPortnum())
69 """
70
71 assert self.s.running
72 name, args, kw = strports.parse(self.port, None)
73 assert name in ("TCP",)
74 return self.s._port.getHost().port
75
77 if self.tubs:
78 return "<Listener at 0x%x on %s with tubs %s>" % (
79 abs(id(self)),
80 self.port,
81 ",".join([str(k) for k in self.tubs.keys()]))
82 return "<Listener at 0x%x on %s with no tubs>" % (abs(id(self)),
83 self.port)
84
86 if tub.tubID in self.tubs:
87 if tub.tubID is None:
88 raise RuntimeError("This Listener (on %s) already has an "
89 "unauthenticated Tub, you cannot add a "
90 "second one" % self.port)
91 raise RuntimeError("This Listener (on %s) is already connected "
92 "to TubID '%s'" % (self.port, tub.tubID))
93 self.tubs[tub.tubID] = tub
94 if self.parentTub is None:
95 self.parentTub = tub
96 self.s.setServiceParent(self.parentTub)
97
99
100
101 del self.tubs[tub.tubID]
102 if self.parentTub is tub:
103
104 tubs = self.tubs.values()
105 if tubs:
106 self.parentTub = tubs[0]
107
108
109
110
111 self.s.setServiceParent(self.parentTub)
112 else:
113
114 d = self.s.disownServiceParent()
115 Listeners.remove(self)
116 return d
117 return None
118
121
123 assert tubID is not None
124 self.redirects[tubID] = location
126 del self.redirects[tubID]
127
129 log.msg("Starting factory %r" % self, facility="foolscap.listener")
130 return protocol.ServerFactory.startFactory(self)
132 log.msg("Stopping factory %r" % self, facility="foolscap.listener")
133 return protocol.ServerFactory.stopFactory(self)
134
135
137 """Return a Broker attached to me (as the service provider).
138 """
139 lp = log.msg("%s accepting connection from %s" % (self, addr),
140 addr=(addr.host, addr.port),
141 facility="foolscap.listener")
142 proto = self.negotiationClass(logparent=lp)
143 proto.initServer(self)
144 proto.factory = self
145 return proto
146
148 return self.tubs.get(tubID), self.redirects.get(tubID)
149
150
151 -class Tub(service.MultiService):
152 """I am a presence in the PB universe, also known as a Tub.
153
154 I am a Service (in the twisted.application.service.Service sense),
155 so you either need to call my startService() method before using me,
156 or setServiceParent() me to a running service.
157
158 This is the primary entry point for all PB-using applications, both
159 clients and servers.
160
161 I am known to the outside world by a base URL, which may include
162 authentication information (a yURL). This is my 'TubID'.
163
164 I contain Referenceables, and manage RemoteReferences to Referenceables
165 that live in other Tubs.
166
167
168 @param certData: if provided, use it as a certificate rather than
169 generating a new one. This is a PEM-encoded
170 private/public keypair, as returned by Tub.getCertData()
171
172 @param certFile: if provided, the Tub will store its certificate in
173 this file. If the file does not exist when the Tub is
174 created, the Tub will generate a new certificate and
175 store it here. If the file does exist, the certificate
176 will be loaded from this file.
177
178 The simplest way to use the Tub is to choose a long-term
179 location for the certificate, use certFile= to tell the
180 Tub about it, and then let the Tub manage its own
181 certificate.
182
183 You may provide certData, or certFile, (or neither), but
184 not both.
185
186 @param options: a dictionary of options that can influence
187 connection negotiation. Currently defined keys are:
188 - debug_slow: if True, wait half a second between
189 each negotiation response
190
191 @ivar brokers: maps TubIDs to L{Broker} instances
192
193 @ivar listeners: maps strport to TCPServer service
194
195 @ivar referenceToName: maps Referenceable to a name
196 @ivar nameToReference: maps name to Referenceable
197
198 @type tubID: string
199 @ivar tubID: a global identifier for this Tub, possibly including
200 authentication information, hash of SSL certificate
201
202 """
203 implements(ipb.ITub)
204
205 unsafeTracebacks = True
206 logLocalFailures = False
207 logRemoteFailures = False
208 debugBanana = False
209 NAMEBITS = 160
210 TUBIDBITS = 16
211 encrypted = True
212 negotiationClass = negotiate.Negotiation
213 brokerClass = broker.Broker
214 keepaliveTimeout = 4*60
215 disconnectTimeout = None
216 tubID = None
217
218 - def __init__(self, certData=None, certFile=None, options={}):
225
227 return "<Tub id=%s>" % self.tubID
228
230 try:
231 certData = open(certFile, "rb").read()
232 except EnvironmentError:
233 certData = None
234 self.setupEncryption(certData)
235
236 if certData is None:
237 f = open(certFile, "wb")
238 f.write(self.getCertData())
239 f.close()
240
252
254 unique = os.urandom(8)
255
256
257 sequential = None
258 self.incarnation = (unique, sequential)
259 self.incarnation_string = binascii.b2a_hex(unique)
260
262 return self.incarnation_string
263
def setup(self, options):
    """Put all of this Tub's bookkeeping into its initial, empty state.

    @param options: the options dictionary. It is stored on
                    C{self.options} as-is (the same object, not a copy).
    """
    self.options = options
    self.logger = flog.theLogger
    self.listeners, self.locationHints = [], []

    # Assign the incarnation record for this Tub.
    self.make_incarnation()

    # Both tables start out empty.
    # NOTE(review): their key/value layout is not visible from this
    # method -- confirm against the code that fills them.
    self.master_table, self.slave_table = {}, {}

    # The two name<->object lookup tables are weak dictionaries, so an
    # entry alone does not keep its object alive; strongReferences is a
    # plain list kept alongside them.
    self.nameToReference = weakref.WeakValueDictionary()
    self.referenceToName = weakref.WeakKeyDictionary()
    self.strongReferences, self.nameLookupHandlers = [], []

    # Connection bookkeeping: three dictionaries and two lists, all empty.
    self.tubConnectors, self.waitingForBrokers, self.brokers = {}, {}, {}
    self.unauthenticatedBrokers, self.reconnectors = [], []

    # One-shot observer lists, plus the list of currently active
    # connectors that sits between them.
    self._allBrokersAreDisconnected = observer.OneShotObserverList()
    self._activeConnectors = []
    self._allConnectorsAreFinished = observer.OneShotObserverList()

    self._pending_getReferences = []

    # Nothing is configured for the logport or the log gatherer yet.
    self._logport = self._logport_furl = self._logport_furlfile = None
    self._log_gatherer_furl = self._log_gatherer_furlfile = None

    self._handle_old_duplicate_connections = False
310
312 if name == "logLocalFailures":
313
314
315
316
317
318
31