root/foolscap/call.py

Revision 327:f38e397365d2, 34.6 kB (checked in by warner@lothar.com, 1 year ago)

CopiedFailure: don't use reflect.qual on a string

Line 
1 from twisted.python import failure, reflect
2 from twisted.internet import defer
3
4 from foolscap import copyable, slicer, tokens
5 from foolscap.copyable import AttributeDictConstraint
6 from foolscap.constraint import ByteStringConstraint
7 from foolscap.slicers.list import ListConstraint
8 from tokens import BananaError, Violation
9 from foolscap.util import AsyncAND
10 from foolscap.logging import log
11
12
class FailureConstraint(AttributeDictConstraint):
    """Constraint that only admits twisted.python.failure.Failure objects.

    The serialized form is described as an attribute dictionary with four
    entries ('type', 'value', 'traceback', 'parents'), each governed by a
    ByteStringConstraint (presumably a maximum length -- confirm against
    foolscap.constraint).
    """
    opentypes = [("copyable", "twisted.python.failure.Failure")]
    name = "FailureConstraint"
    klass = failure.Failure

    def __init__(self):
        # (attribute name, constraint) pairs, handed to the base class as
        # positional arguments
        AttributeDictConstraint.__init__(
            self,
            ('type', ByteStringConstraint(200)),
            ('value', ByteStringConstraint(1000)),
            ('traceback', ByteStringConstraint(2000)),
            ('parents', ListConstraint(ByteStringConstraint(200))),
        )

    def checkObject(self, obj, inbound):
        """Raise Violation unless obj is an instance of self.klass.

        The 'inbound' flag is accepted for interface compatibility but is
        not consulted here.
        """
        if isinstance(obj, self.klass):
            return
        raise Violation("is not an instance of %s" % self.klass)
29
30
class PendingRequest(object):
    """Local representation of a message we have sent to someone else.

    The message will be executed on the far end. Its outcome is delivered
    through self.deferred: complete() fires the callback and fail() fires
    the errback. Only the first such call touches the Deferred; later calls
    are merely logged.
    """
    active = True
    # These two names are only read when fail() logs a remote failure. They
    # are not assigned by __init__ (callers may set them after construction),
    # so provide class-level defaults to keep fail() from raising
    # AttributeError while it is building its log message.
    interfaceName = None
    methodName = None

    def __init__(self, reqID, rref=None):
        self.reqID = reqID
        self.rref = rref # keep it alive
        self.broker = None # if set, the broker knows about us
        self.deferred = defer.Deferred()
        self.constraint = None # this constrains the results
        self.failure = None

    def setConstraint(self, constraint):
        """Remember the constraint that will be applied to the results."""
        self.constraint = constraint

    def complete(self, res):
        """Deliver a successful result to self.deferred (first call only)."""
        if self.broker:
            self.broker.removeRequest(self)
        if self.active:
            self.active = False
            self.deferred.callback(res)
        else:
            log.msg("PendingRequest.complete called on an inactive request")

    def fail(self, why):
        """Deliver a failure to self.deferred (first call only).

        'why' is passed unchanged to self.deferred.errback(). If the broker's
        tub has logRemoteFailures set, details are logged first.
        """
        if not self.active:
            log.msg("WEIRD: fail() on an inactive request", traceback=True)
            if self.failure:
                log.msg("multiple failures")
                # pass the failures by keyword, like the other failure logs
                # in this class, instead of as stray positional arguments
                log.msg("first one was:", failure=self.failure)
                log.msg("this one was:", failure=why)
                log.err("multiple failures indicate a problem")
            return

        if self.broker:
            self.broker.removeRequest(self)
        self.active = False
        self.failure = why
        if (self.broker and
            self.broker.tub and
            self.broker.tub.logRemoteFailures):
            self._logRemoteFailure(why)
        self.deferred.errback(why)

    def _logRemoteFailure(self, why):
        """Log an outbound call that failed on the far end.

        Only called when self.broker and self.broker.tub are both set.
        """
        my_short_tubid = self.broker.tub.getShortTubID()
        # remote_tubref may be unset (InboundDelivery.logFailure guards
        # against the same thing); never let logging prevent the errback.
        their_short_tubid = "<unauth>"
        if self.broker.remote_tubref:
            their_short_tubid = self.broker.remote_tubref.getShortTubID()

        lp = log.msg("an outbound callRemote (that we [%s] sent to "
                     "someone else [%s]) failed on the far end"
                     % (my_short_tubid, their_short_tubid),
                     level=log.UNUSUAL)
        methname = ".".join([self.interfaceName or "?",
                             self.methodName or "?"])
        log.msg(" reqID=%d, rref=%s, methname=%s"
                % (self.reqID, self.rref, methname),
                level=log.NOISY, parent=lp)
        # TODO: include the first few letters of the remote tubID in a
        # REMOTE tag on each line of the remote traceback
        log.msg(" the REMOTE failure was:", failure=why,
                level=log.NOISY, parent=lp)
95
class ArgumentSlicer(slicer.ScopedSlicer):
    """Serializes the arguments of an outbound call.

    The body is: the number of positional arguments, each positional
    argument in order, then (name, value) pairs for the keyword arguments in
    sorted-name order.
    """
    opentype = ('arguments',)

    def __init__(self, args, kwargs):
        slicer.ScopedSlicer.__init__(self, None)
        self.args = args
        self.kwargs = kwargs
        # label of the argument currently being serialized, for describe()
        self.which = ""

    def sliceBody(self, streamable, banana):
        """Yield the positional count, the positional args, then the kwargs."""
        yield len(self.args)
        for i, arg in enumerate(self.args):
            self.which = "arg[%d]" % i
            yield arg
        # sorted() instead of keys() followed by an in-place sort(): same
        # ordering, but it does not depend on keys() returning a list.
        for argname in sorted(self.kwargs):
            self.which = "arg[%s]" % argname
            yield argname
            yield self.kwargs[argname]

    def describe(self):
        """Return a short tag naming the argument being serialized."""
        return "<%s>" % self.which
119
120
class CallSlicer(slicer.ScopedSlicer):
    """Serializes one outbound 'call' sequence.

    The body consists of the request ID, the CLID of the target, the method
    name, and an ArgumentSlicer wrapping the call's arguments.
    """
    opentype = ('call',)

    def __init__(self, reqID, clid, methodname, args, kwargs):
        slicer.ScopedSlicer.__init__(self, None)
        # what to call ...
        self.reqID, self.clid, self.methodname = reqID, clid, methodname
        # ... and what to call it with
        self.args, self.kwargs = args, kwargs

    def sliceBody(self, streamable, banana):
        """Yield reqID, clid, methodname, then the nested argument slicer."""
        yield self.reqID
        yield self.clid
        yield self.methodname
        arguments = ArgumentSlicer(self.args, self.kwargs)
        yield arguments

    def describe(self):
        """Return a tag identifying this call by reqID, clid and method."""
        identity = (self.reqID, self.clid, self.methodname)
        return "<call-%s-%s-%s>" % identity
140
class InboundDelivery:
    """A fully-received inbound message that has not been delivered yet.

    One of these is created once a complete 'call' sequence has arrived. The
    Broker places it on a queue, and the delivery at the head of that queue
    is serviced as soon as all of its arguments have been resolved.

    Arguments can only be unavailable when one of the Unslicers that
    produced them handed back a 'ready_deferred' together with the
    prospective object. Among the standard Unslicers only
    TheirReferenceUnslicer (used for introductions) does so. Custom
    Unslicers could as well: e.g. a URL slicer/unslicer pair whose receiving
    side fetches the URL's target as the value, or a UnixFD pair that must
    wait for a side-channel unix-domain socket to finish handing the FD over
    to the recipient.

    Most Unslicers will not accept unready children (their receiveChild()
    typically does 'assert ready_deferred is None'); the CallUnslicer is
    unusual in tolerating them.

    All arguments must nevertheless be at least referenceable. That is
    rarely an issue: receiveChild() only sees a non-referenceable object
    (represented by a Deferred) when the unslicer takes part in a reference
    cycle that is still open, and CallUnslicers live at the top level, above
    any cycle.
    """

    def __init__(self, broker, reqID, obj,
                 interface, methodname, methodSchema,
                 allargs):
        # where the call came from and how to answer it
        self.broker = broker
        self.reqID = reqID
        # what is being invoked
        self.obj = obj
        self.interface = interface
        self.methodname = methodname
        self.methodSchema = methodSchema
        # object exposing .args and .kwargs (see logFailure)
        self.allargs = allargs

    def logFailure(self, f):
        """Log a failure 'f' raised while executing this inbound call.

        Called if tub.logLocalFailures is True.
        """
        broker = self.broker
        if broker.tub: # for tests
            my_short_tubid = broker.tub.getShortTubID()
        else:
            my_short_tubid = "??"
        if broker.remote_tubref:
            their_short_tubid = broker.remote_tubref.getShortTubID()
        else:
            their_short_tubid = "<unauth>"
        lp = log.msg("an inbound callRemote that we [%s] executed (on behalf "
                     "of someone else, TubID %s) failed"
                     % (my_short_tubid, their_short_tubid),
                     level=log.UNUSUAL)

        methname = self.methodname
        if self.interface:
            methname = self.interface.getName() + "." + self.methodname

        # the details are all children of the UNUSUAL event above
        details = [
            " reqID=%d, rref=%s, methname=%s" %
            (self.reqID, self.obj, methname),
            " args=%s" % (self.allargs.args,),
            " kwargs=%s" % (self.allargs.kwargs,),
        ]
        for line in details:
            log.msg(line, level=log.NOISY, parent=lp)
        # TODO: trim stack to everything below Broker._doCall
        log.msg(" the LOCAL failure was:", failure=f,
                level=log.NOISY, parent=lp)
212
class ArgumentUnslicer(slicer.ScopedUnslicer):
    """Receives the body of an 'arguments' sequence.

    Tokens are consumed in this order: the count of positional arguments,
    that many positional values, then alternating keyword-argument names and
    values. Values that arrive as Deferreds are counted as not yet
    referenceable and are patched in by updateChild() when they fire.
    """
    methodSchema = None
    debug = False

    def setConstraint(self, methodSchema):
        self.methodSchema = methodSchema

    def start(self, count):
        if self.debug:
            log.msg("%s.start: %s" % (self, count))
        # parser state
        self.numargs = None
        self.argname = None
        self.argConstraint = None
        self.closed = False
        # collected values
        self.args = []
        self.kwargs = {}
        # readiness bookkeeping
        self.num_unreferenceable_children = 0
        self._all_children_are_referenceable_d = None
        self._ready_deferreds = []

    def checkToken(self, typebyte, size):
        if self.numargs is None:
            # waiting for positional-arg count
            if typebyte != tokens.INT:
                raise BananaError("posarg count must be an INT")
            return
        expecting_posarg = len(self.args) < self.numargs
        if not expecting_posarg and self.argname is None:
            # waiting for the name of a keyword arg
            if typebyte not in (tokens.STRING, tokens.VOCAB):
                raise BananaError("kwarg name must be a STRING")
            # TODO: limit to longest argument name of the method?
            return
        # waiting for a value, either positional or keyword
        if self.argConstraint:
            self.argConstraint.checkToken(typebyte, size)

    def doOpen(self, opentype):
        if self.argConstraint:
            self.argConstraint.checkOpentype(opentype)
        child = self.open(opentype)
        if child and self.argConstraint:
            child.setConstraint(self.argConstraint)
        return child

    def receiveChild(self, token, ready_deferred=None):
        if self.debug:
            log.msg("%s.receiveChild: %s %s %s %s %s args=%s kwargs=%s" %
                    (self, self.closed, self.num_unreferenceable_children,
                     len(self._ready_deferreds), token, ready_deferred,
                     self.args, self.kwargs))

        if self.numargs is None:
            # the first token is the number of positional arguments
            assert isinstance(token, int)
            assert ready_deferred is None
            self.numargs = token
            if self.numargs and self.methodSchema:
                accept, self.argConstraint = \
                        self.methodSchema.getPositionalArgConstraint(0)
                assert accept
            return

        if len(self.args) < self.numargs:
            # a positional argument
            position = len(self.args)
            self.args.append(token)
            if isinstance(token, defer.Deferred):
                # this may occur if the child is a gift which has not
                # resolved yet.
                self.num_unreferenceable_children += 1
                token.addCallback(self.updateChild, position)
            if ready_deferred:
                if self.debug:
                    log.msg("%s.receiveChild got an unready posarg" % self)
                self._ready_deferreds.append(ready_deferred)
            upcoming = len(self.args)
            if upcoming < self.numargs and self.methodSchema:
                # more to come: fetch the constraint for the next one
                accept, self.argConstraint = \
                        self.methodSchema.getPositionalArgConstraint(upcoming)
                assert accept
            return

        if self.argname is None:
            # the name of a keyword argument
            assert ready_deferred is None
            self.argname = token
            # if the argname is invalid, this may raise Violation
            if self.methodSchema:
                accept, self.argConstraint = \
                        self.methodSchema.getKeywordArgConstraint(
                            self.argname, self.numargs, self.kwargs.keys())
                assert accept
            return

        # the value of a keyword argument
        name = self.argname
        self.kwargs[name] = token
        if isinstance(token, defer.Deferred):
            self.num_unreferenceable_children += 1
            token.addCallback(self.updateChild, name)
        if ready_deferred:
            if self.debug:
                log.msg("%s.receiveChild got an unready kwarg" % self)
            self._ready_deferreds.append(ready_deferred)
        self.argname = None
        return

    def updateChild(self, obj, which):
        # Deferred callback: one of our arguments has just become
        # referenceable. Ordinary types cannot cause this (the arguments of
        # a method form a top-level serialization domain), but special
        # Unslicers can. The Gift unslicer, for instance, eventually gives
        # us a RemoteReference but initially supplies only a Deferred as a
        # placeholder.

        if self.debug:
            log.msg("%s.updateChild, [%s] became referenceable: %s" %
                    (self, which, obj))
        # integer keys index positional args, anything else names a kwarg
        if not isinstance(which, int):
            self.kwargs[which] = obj
        else:
            self.args[which] = obj
        self.num_unreferenceable_children -= 1
        waiting = self._all_children_are_referenceable_d
        if self.num_unreferenceable_children == 0 and waiting:
            waiting.callback(None)
        return obj


    def receiveClose(self):
        if self.debug:
            log.msg("%s.receiveClose: %s %s %s" %
                    (self, self.closed, self.num_unreferenceable_children,
                     len(self._ready_deferreds)))
        incomplete = (self.numargs is None
                      or len(self.args) < self.numargs
                      or self.argname is not None)
        if incomplete:
            raise BananaError("'arguments' sequence ended too early")
        self.closed = True
        pending = []
        if self.num_unreferenceable_children:
            # fired by updateChild() once the last placeholder resolves
            self._all_children_are_referenceable_d = defer.Deferred()
            pending.append(self._all_children_are_referenceable_d)
        pending.extend(self._ready_deferreds)
        if pending:
            return self, AsyncAND(pending)
        return self, None

    def describe(self):
        pieces = ["<arguments"]
        if self.numargs is not None:
            if len(self.args) < self.numargs:
                pieces.append(" arg[%d]" % len(self.args))
            elif self.argname is not None:
                pieces.append(" arg[%s]" % self.argname)
            else:
                pieces.append(" arg[?]")
        if self.closed:
            pieces.append(" closed")
            # TODO: it would be nice to indicate if we still have unready
            # children
        pieces.append(">")
        return "".join(pieces)
391
392
393 class CallUnslicer(slicer.ScopedUnslicer):
394
395     debug = False
396
397     def start(self, count):
398         # start=0:reqID, 1:objID, 2:methodname, 3: arguments
399         self.stage = 0
400         self.reqID = None
401         self.obj = None
402         self.interface = None
403         self.methodname = None
404         self.methodSchema = None # will be a MethodArgumentsConstraint
405         self._ready_deferreds = []
406
407     def checkToken(self, typebyte, size):
408         # TODO: limit strings by returning a number instead of None
409         if self.stage == 0:
410             if typebyte != tokens.INT:
411                 raise BananaError("request ID must be an INT")
412         elif self.stage == 1:
413             if typebyte not in (tokens.INT, tokens.NEG):
414                 raise BananaError("object ID must be an INT/NEG")
415         elif self.stage == 2:
416             if typebyte not in (tokens.STRING, tokens.VOCAB):
417                 raise BananaError("method name must be a STRING")
418             # TODO: limit to longest method name of self.obj in the interface
419         elif self.stage == 3:
420             if typebyte != tokens.OPEN:
421                 raise BananaError("arguments must be an 'arguments' sequence")
422         else:
423             raise BananaError("too many objects given to CallUnslicer")
424
425     def doOpen(self, opentype):
426         # checkToken insures that this can only happen when we're receiving
427         # an arguments object, so we don't have to bother checking self.stage
428         assert self.stage == 3
429         unslicer = self.open(opentype)
430         if self.methodSchema:
431             unslicer.setConstraint(self.methodSchema)
432         return unslicer
433
434     def reportViolation(self, f):
435         # if the Violation is because we received an ABORT, then we know
436         # that the sender knows there was a problem, so don't respond.
437         if f.value.args[0] == "ABORT received":
438             return f
439
440         # if the Violation was raised after we know the reqID, we can send
441         # back an Error.
442         if self.stage > 0:
443             self.broker.callFailed(f, self.reqID)
444         return f # give up our sequence
445
446     def receiveChild(self, token, ready_deferred=None):
447         assert not isinstance(token, defer.Deferred)
448         if self.debug:
449             log.msg("%s.receiveChild [s%d]: %s" %
450                     (self, self.stage, repr(token)))
451
452         if self.stage == 0: # reqID
453             # we don't yet know which reqID to send any failure to
454             assert ready_deferred is None
455             self.reqID = token
456             self.stage = 1
457             if self.reqID != 0:
458                 assert self.reqID not in self.broker.activeLocalCalls
459                 self.broker.activeLocalCalls[self.reqID] = self
460             return
461
462         if self.stage == 1: # objID
463             # this might raise an exception if objID is invalid
464             assert ready_deferred is None
465             self.objID = token
466             self.obj = self.broker.getMyReferenceByCLID(token)
467             #iface = self.broker.getRemoteInterfaceByName(token)
468             if self.objID < 0:
469                 self.interface = None
470             else:
471                 self.interface = self.obj.getInterface()
472             self.stage = 2
473             return
474
475         if self.stage == 2: # methodname
476             # validate the methodname, get the schema. This may raise an
477             # exception for unknown methods
478
479             # must find the schema, using the interfaces
480
481             # TODO: getSchema should probably be in an adapter instead of in
482             # a pb.Referenceable base class. Old-style (unconstrained)
483             # flavors.Referenceable should be adapted to something which
484             # always returns None
485
486             # TODO: make this faster. A likely optimization is to take a
487             # tuple of components.getInterfaces(obj) and use it as a cache
488             # key. It would be even faster to use obj.__class__, but that
489             # would probably violate the expectation that instances can
490             # define their own __implements__ (independently from their
491             # class). If this expectation were to go away, a quick
492             # obj.__class__ -> RemoteReferenceSchema cache could be built.
493
494             assert ready_deferred is None
495             self.stage = 3
496
497             if self.objID < 0:
498                 # the target is a bound method, ignore the methodname
499                 self.methodSchema = getattr(self.obj, "methodSchema", None)
500                 self.methodname = None # TODO: give it something useful
501                 if self.broker.requireSchema and not self.methodSchema:
502                     why = "This broker does not accept unconstrained " + \
503                           "method calls"
504                     raise Violation(why)
505                 return
506
507             self.methodname = token
508
509             if self.interface:
510                 # they are calling an interface+method pair
511                 ms = self.interface.get(self.methodname)
512                 if not ms:
513                     why = "method '%s' not defined in %s" % \
514                           (self.methodname, self.interface.__remote_name__)
515                     raise Violation(why)
516                 self.methodSchema = ms
517
518             return
519
520         if self.stage == 3: # arguments
521             assert isinstance(token, ArgumentUnslicer)
522             self.allargs = token
523             # queue the message. It will not be executed until all the
524             # arguments are ready. The .args list and .kwargs dict may change
525             # before then.
526             if ready_deferred:
527                 self._ready_deferreds.append(ready_deferred)
528             self.stage = 4
529             return
530
531     def receiveClose(self):
532         if self.stage != 4:
533             raise BananaError("'call' sequence ended too early")
534         # time to create the InboundDelivery object so we can queue it