Package foolscap :: Module pb
[hide private]
[frames | no frames]

Source Code for Module foolscap.pb

   1  # -*- test-case-name: foolscap.test.test_pb -*- 
   2   
   3  import os.path, weakref, binascii 
   4  from zope.interface import implements 
   5  from twisted.internet import defer, protocol, error 
   6  from twisted.application import service, strports 
   7  from twisted.python.failure import Failure 
   8   
   9  from foolscap import ipb, base32, negotiate, broker, observer, eventual, storage 
  10  from foolscap import util 
  11  from foolscap.referenceable import SturdyRef 
  12  from foolscap.tokens import PBError, BananaError, WrongTubIdError, \ 
  13       WrongNameError, NoLocationError 
  14  from foolscap.reconnector import Reconnector 
  15  from foolscap.logging import log as flog 
  16  from foolscap.logging import log 
  17  from foolscap.logging import publish as flog_publish 
  18  from foolscap.logging.log import WEIRD, UNUSUAL 
  19   
  20  crypto_available = False 
  21  try: 
  22      from foolscap import crypto 
  23      crypto_available = crypto.available 
  24  except ImportError: 
  25      pass 
  26   
  27   
  28  Listeners = [] 
class Listener(protocol.ServerFactory):
    """I am responsible for a single listening port, which may connect to
    multiple Tubs. I have a strports-based Service, which I will attach as a
    child of one of my Tubs. If that Tub disconnects, I will reparent the
    Service to a remaining one.

    Unauthenticated Tubs use a TubID of 'None'. There may be at most one such
    Tub attached to any given Listener.

    @ivar port: the strports description this Listener was created with
    @ivar options: the options dictionary handed to the constructor
    @ivar parentTub: the Tub that currently parents my Service, or None
    @ivar tubs: maps TubID to the attached Tub
    @ivar redirects: maps TubID to a redirect location
    @ivar s: the strports Service that does the actual listening
    """

    noisy = False

    # this also serves as the ServerFactory

    def __init__(self, port, options=None,
                 negotiationClass=negotiate.Negotiation):
        """
        @type port: string
        @param port: a L{twisted.application.strports} -style description.
        @param options: a dictionary stored as self.options. When omitted,
                        each Listener gets its own fresh empty dictionary
                        (a shared mutable default would leak changes from
                        one Listener to every other one).
        @param negotiationClass: factory called (with logparent=) by
                                 buildProtocol for each new connection.
        """
        # Only the endpoint type is needed here; the parsed args are unused.
        name = strports.parse(port, None)[0]
        assert name in ("TCP", "UNIX") # TODO: IPv6
        if options is None:
            options = {}
        self.port = port
        self.options = options
        self.negotiationClass = negotiationClass
        self.parentTub = None
        self.tubs = {}
        self.redirects = {}
        self.s = strports.service(port, self)
        Listeners.append(self)

    def getPortnum(self):
        """When this Listener was created with a strport string of '0' or
        'tcp:0' (meaning 'please allocate me something'), and if the Listener
        is active (it is attached to a Tub which is in the 'running' state),
        this method will return the port number that was allocated. This is
        useful for the following pattern::

            t = Tub()
            l = t.listenOn('tcp:0')
            t.setLocation('localhost:%d' % l.getPortnum())

        The Service must be running and the port must be a TCP one; both
        conditions are checked with assertions.
        """

        assert self.s.running
        name = strports.parse(self.port, None)[0]
        assert name in ("TCP",)
        return self.s._port.getHost().port

    def __repr__(self):
        """Describe this Listener by address, port and attached TubIDs."""
        if self.tubs:
            return "<Listener at 0x%x on %s with tubs %s>" % (
                abs(id(self)),
                self.port,
                ",".join([str(k) for k in self.tubs]))
        return "<Listener at 0x%x on %s with no tubs>" % (abs(id(self)),
                                                           self.port)

    def addTub(self, tub):
        """Attach a Tub to this Listener, keyed by its tubID.

        The first Tub attached becomes the parent of my Service.

        @raise RuntimeError: if a Tub with the same tubID (including the
                             unauthenticated tubID of None) is already
                             attached.
        """
        if tub.tubID in self.tubs:
            if tub.tubID is None:
                raise RuntimeError("This Listener (on %s) already has an "
                                   "unauthenticated Tub, you cannot add a "
                                   "second one" % self.port)
            raise RuntimeError("This Listener (on %s) is already connected "
                               "to TubID '%s'" % (self.port, tub.tubID))
        self.tubs[tub.tubID] = tub
        if self.parentTub is None:
            self.parentTub = tub
            self.s.setServiceParent(self.parentTub)

    def removeTub(self, tub):
        """Detach a Tub from this Listener.

        @return: this might return a Deferred, since the removal might cause
                 the Listener to shut down. It might also return None.
        """
        del self.tubs[tub.tubID]
        if self.parentTub is tub:
            # we need to switch to a new one. list() keeps the indexing
            # below valid even where dict.values() is not a real list.
            remaining = list(self.tubs.values())
            if remaining:
                self.parentTub = remaining[0]
                # TODO: I want to do this without first doing
                # disownServiceParent, so the port remains listening. Can we
                # do this? It looks like setServiceParent does
                # disownServiceParent first, so it may glitch.
                self.s.setServiceParent(self.parentTub)
            else:
                # no more tubs, this Listener will go away now
                d = self.s.disownServiceParent()
                Listeners.remove(self)
                return d
        return None

    def getService(self):
        """Return the strports Service that performs the listening."""
        return self.s

    def addRedirect(self, tubID, location):
        """Record a redirect location for the given (authenticated) tubID."""
        assert tubID is not None # unauthenticated Tubs don't get redirects
        self.redirects[tubID] = location

    def removeRedirect(self, tubID):
        """Forget the redirect for tubID (KeyError if none was recorded)."""
        del self.redirects[tubID]

    def startFactory(self):
        """Log the start-up, then defer to ServerFactory.startFactory."""
        log.msg("Starting factory %r" % self, facility="foolscap.listener")
        return protocol.ServerFactory.startFactory(self)

    def stopFactory(self):
        """Log the shutdown, then defer to ServerFactory.stopFactory."""
        log.msg("Stopping factory %r" % self, facility="foolscap.listener")
        return protocol.ServerFactory.stopFactory(self)

    def buildProtocol(self, addr):
        """Return a negotiation protocol for a new inbound connection.

        The protocol is an instance of self.negotiationClass, initialised
        as a server for this Listener and with this Listener as its factory.
        """
        lp = log.msg("%s accepting connection from %s" % (self, addr),
                     addr=(addr.host, addr.port),
                     facility="foolscap.listener")
        proto = self.negotiationClass(logparent=lp)
        proto.initServer(self)
        proto.factory = self
        return proto

    def lookupTubID(self, tubID):
        """Return a (tub, redirect) pair for tubID; either may be None."""
        return self.tubs.get(tubID), self.redirects.get(tubID)
149 150
151 -class Tub(service.MultiService):
152 """I am a presence in the PB universe, also known as a Tub. 153 154 I am a Service (in the twisted.application.service.Service sense), 155 so you either need to call my startService() method before using me, 156 or setServiceParent() me to a running service. 157 158 This is the primary entry point for all PB-using applications, both 159 clients and servers. 160 161 I am known to the outside world by a base URL, which may include 162 authentication information (a yURL). This is my 'TubID'. 163 164 I contain Referenceables, and manage RemoteReferences to Referenceables 165 that live in other Tubs. 166 167 168 @param certData: if provided, use it as a certificate rather than 169 generating a new one. This is a PEM-encoded 170 private/public keypair, as returned by Tub.getCertData() 171 172 @param certFile: if provided, the Tub will store its certificate in 173 this file. If the file does not exist when the Tub is 174 created, the Tub will generate a new certificate and 175 store it here. If the file does exist, the certificate 176 will be loaded from this file. 177 178 The simplest way to use the Tub is to choose a long-term 179 location for the certificate, use certFile= to tell the 180 Tub about it, and then let the Tub manage its own 181 certificate. 182 183 You may provide certData, or certFile, (or neither), but 184 not both. 185 186 @param options: a dictionary of options that can influence connection 187 connection negotiation. 
Currently defined keys are: 188 - debug_slow: if True, wait half a second between 189 each negotiation response 190 191 @ivar brokers: maps TubIDs to L{Broker} instances 192 193 @ivar listeners: maps strport to TCPServer service 194 195 @ivar referenceToName: maps Referenceable to a name 196 @ivar nameToReference: maps name to Referenceable 197 198 @type tubID: string 199 @ivar tubID: a global identifier for this Tub, possibly including 200 authentication information, hash of SSL certificate 201 202 """ 203 implements(ipb.ITub) 204 205 unsafeTracebacks = True # TODO: better way to enable this 206 logLocalFailures = False 207 logRemoteFailures = False 208 debugBanana = False 209 NAMEBITS = 160 # length of swissnumber for each reference 210 TUBIDBITS = 16 # length of non-crypto tubID 211 encrypted = True 212 negotiationClass = negotiate.Negotiation 213 brokerClass = broker.Broker 214 keepaliveTimeout = 4*60 # ping when connection has been idle this long 215 disconnectTimeout = None # disconnect after this much idle time 216 tubID = None 217
def __init__(self, certData=None, certFile=None, options=None):
    """Create a Tub and set up its certificate.

    @param certData: if provided (and certFile is not), use it as the
                     certificate rather than generating a new one.
    @param certFile: if provided, the certificate is loaded from / stored
                     in this file; it takes precedence over certData.
    @param options: a dictionary of options, stored as self.options. When
                    omitted, each Tub gets its own fresh empty dictionary
                    (a shared mutable default would leak changes from one
                    Tub to every other one).
    """
    service.MultiService.__init__(self)
    if options is None:
        options = {}
    self.setup(options)
    if certFile:
        self.setupEncryptionFile(certFile)
    else:
        self.setupEncryption(certData)
225
def __repr__(self):
    """Render this Tub as '<Tub id=...>' using its current tubID."""
    identifier = self.tubID
    return "<Tub id=%s>" % identifier
228
def setupEncryptionFile(self, certFile):
    """Set up encryption from a certificate file, creating it if needed.

    The contents of certFile are handed to setupEncryption(). If the file
    cannot be opened or read, setupEncryption(None) is called instead and
    the resulting certificate (from getCertData()) is written to certFile.

    @param certFile: path of the file holding the certificate data.
    """
    try:
        f = open(certFile, "rb")
        try:
            certData = f.read()
        finally:
            # always release the handle, even if read() fails
            f.close()
    except EnvironmentError:
        certData = None
    self.setupEncryption(certData)

    # NOTE(review): an existing-but-empty file is not rewritten here, only
    # a missing/unreadable one -- confirm that this is intended.
    if certData is None:
        f = open(certFile, "wb")
        try:
            f.write(self.getCertData())
        finally:
            f.close()
240
def setupEncryption(self, certData):
    """Install this Tub's certificate and derive its tubID from it.

    @param certData: PEM data to load the certificate from; when empty or
                     None, a certificate is obtained from
                     self.createCertificate() instead.
    @raise RuntimeError: if crypto support is not available.
    """
    if not crypto_available:
        raise RuntimeError("crypto for PB is not available, "
                           "try importing foolscap.crypto and see "
                           "what happens")
    if not certData:
        certificate = self.createCertificate()
    else:
        certificate = crypto.PrivateCertificate.loadPEM(certData)
    self.myCertificate = certificate
    # the tubID is the digest32 form of the certificate's SHA-1 digest
    fingerprint = certificate.digest("sha1")
    self.tubID = crypto.digest32(fingerprint)
252
def make_incarnation(self):
    """Pick a fresh incarnation record for this Tub.

    Sets self.incarnation to a (unique, sequential) pair, where unique is
    8 random bytes and sequential is currently always None, and sets
    self.incarnation_string to the hex encoding of the unique part.
    """
    nonce = os.urandom(8)
    # TODO: it'd be nice to have a sequential component, so incarnations
    # could be ordered, but it requires disk space
    self.incarnation = (nonce, None)
    self.incarnation_string = binascii.b2a_hex(nonce)
260
def getIncarnationString(self):
    """Return the incarnation string computed by make_incarnation()."""
    incarnation_string = self.incarnation_string
    return incarnation_string
263
def setup(self, options):
    """Initialise this Tub's bookkeeping state.

    @param options: the options dictionary, stored as self.options.
    """
    self.options = options
    self.logger = flog.theLogger
    self.listeners, self.locationHints = [], []

    # duplicate-connection management starts with a fresh incarnation
    self.make_incarnation()

    # master_table: the master-seqnum used for the last established
    # connection with a given tubid; only holds entries for which we were
    # the master.
    self.master_table = {} # k:tubid, v:seqnum
    # slave_table: the (master-IR,master-seqnum) pair for the last
    # established connection with a given tubid; only holds entries for
    # which we were the slave.
    self.slave_table = {} # k:tubid, v:(master-IR,seqnum)

    # local Referenceables, tracked weakly in both directions
    self.nameToReference = weakref.WeakValueDictionary()
    self.referenceToName = weakref.WeakKeyDictionary()
    self.strongReferences, self.nameLookupHandlers = [], []

    # remote stuff. Most of these use a TubRef (or NoAuthTubRef) as a
    # dictionary key
    self.tubConnectors = {} # TubRef -> TubConnector
    self.waitingForBrokers = {} # TubRef -> list of Deferreds
    self.brokers = {} # TubRef -> Broker that connects to them
    self.unauthenticatedBrokers = [] # inbound Brokers without TubRefs
    self.reconnectors = []

    self._allBrokersAreDisconnected = observer.OneShotObserverList()
    self._activeConnectors = []
    self._allConnectorsAreFinished = observer.OneShotObserverList()

    self._pending_getReferences = [] # list of (d, furl) pairs

    # logport and log-gatherer settings all start out unset
    self._logport = self._logport_furl = self._logport_furlfile = None
    self._log_gatherer_furl = self._log_gatherer_furlfile = None

    self._handle_old_duplicate_connections = False
310
311 - def setOption(self, name, value):
312 if name == "logLocalFailures": 313 # log (with log.err) any exceptions that occur during the 314 # execution of a local Referenceable's method, which is invoked 315 # on behalf of a remote caller. These exceptions are reported to 316 # the remote caller through their callRemote's Deferred as usual: 317 # this option enables logging on the callee's side (i.e. our 318 # side) as well. 31