Ticket #149: 149.WIP.diff

File 149.WIP.diff, 40.8 KB (added by Brian Warner, 14 years ago)

Here's my current WIP patch. Tests are still failing, but I like the overall design.

  • foolscap/banana.py

    diff -r 9adfe64ba138 foolscap/banana.py
    a b  
    8888    return acc
    8989
    9090HIGH_BIT_SET = chr(0x80)
     91WANT_HEADER = "want_header"
     92WANT_BODY = "want_body"
    9193
    9294
    9395
     
    567569    # calls self.receivedObject()
    568570
    569571    unslicerClass = RootUnslicer
    570     debugReceive = False
     572    debugReceive = True #False
    571573    logViolations = False
    572574    logReceiveErrors = True
    573575    useKeepalives = False
     
    578580
    579581    def initReceive(self):
    580582        self.inOpen = False # set during the Index Phase of an OPEN sequence
     583        self.inClose = False
    581584        self.opentype = [] # accumulates Index Tokens
    582585
    583586        # to pre-negotiate, set the negotiation parameters and set
     
    585588        # self.buffer with the inbound negotiation block.
    586589        self.negotiated = False
    587590        self.connectionAbandoned = False
    588         self.buffer = ''
     591        self.bufferChunks = []
     592        self.bufferSize = 0
     593        self.bufferOffset = 0
     594        self.receiveMode = WANT_HEADER
    589595
    590596        self.incomingVocabulary = {}
    591597        self.skipBytes = 0 # used to discard a single long token
     
    698704
    699705
    700706    def handleData(self, chunk):
    701         # buffer, assemble into tokens
    702         # call self.receiveToken(token) with each
    703         if self.skipBytes:
    704             if len(chunk) < self.skipBytes:
    705                 # skip the whole chunk
    706                 self.skipBytes -= len(chunk)
    707                 return
    708             # skip part of the chunk, and stop skipping
    709             chunk = chunk[self.skipBytes:]
    710             self.skipBytes = 0
    711         buffer = self.buffer + chunk
     707        if not chunk:
     708            return
     709        self.bufferChunks.append(chunk)
     710        self.bufferSize += len(chunk)
     711        #if self.receiveMode is WANT_LENGTH:
     712        #    self.bufferChunks = ["".join(self.bufferChunks)]
     713        done = False
     714        while not done:
     715            if self.receiveMode is WANT_HEADER:
     716                done = self.receiveHeader()
     717            if self.receiveMode is WANT_BODY:
     718                # TODO: if self.discardCount: self.discardBody()
     719                done = self.receiveBody()
     720            # when we get here, any completely consumed chunk is removed
    712721
    713         # Loop through the available input data, extracting one token per
    714         # pass.
    715722
    716         while buffer:
    717             assert self.buffer != buffer, \
    718                    ("Banana.handleData: no progress made: %s %s" %
    719                     (repr(buffer),))
    720             self.buffer = buffer
    721             pos = 0
     723    def receiveHeader(self):
     724        def non_consuming_lookahead():
     725            for chunknum,chunk in enumerate(self.bufferChunks):
     726                start = 0
     727                if chunknum == 0:
     728                    start = self.bufferOffset
     729                for i in range(start, len(chunk)):
     730                    yield (chunknum, i, chunk[i])
     731        headerbytes = []
     732        typebyte = None
     733        iterator = non_consuming_lookahead()
     734        for (chunknum, offset, ch) in iterator:
     735            if ch >= HIGH_BIT_SET:
     736                header = b1282int(headerbytes)
     737                typebyte = ch
     738                break
     739            headerbytes.append(ch)
     740            if len(headerbytes) > 64:
     741                # drop the connection
     742                got = "".join(headerbytes)
     743                raise BananaError("token prefix is limited to 64 bytes:"
     744                                  " but got %r" % got)
     745            # still looking
     746        if typebyte is None:
     747            return True # still hungry, must wait for next message
    722748
    723             for ch in buffer:
    724                 if ch >= HIGH_BIT_SET:
    725                     break
    726                 pos = pos + 1
    727                 if pos > 64:
    728                     # drop the connection. We log more of the buffer, but not
    729                     # all of it, to make it harder for someone to spam our
    730                     # logs.
    731                     raise BananaError("token prefix is limited to 64 bytes: "
    732                                       "but got %r" % (buffer[:200],))
    733             else:
    734                 # we've run out of buffer without seeing the high bit, which
    735                 # means we're still waiting for header to finish
    736                 return
    737             assert pos <= 64
     749        # found it. We need to trim the chunklist and update the offset. The
     750        # next character that our iterator returns (if any) is where the next
     751        # consumer should look.
     752        self.bufferSize -= len(headerbytes)+1
     753        try:
     754            (chunknum, offset, ign) = iterator.next()
     755            self.bufferOffset = offset
     756            if chunknum > 0:
     757                self.bufferChunks = self.bufferChunks[chunknum:]
     758        except StopIteration:
     759            # we've consumed everything
     760            self.bufferChunks = []
     761            self.bufferOffset = 0
    738762
    739             # At this point, the header and type byte have been received.
    740             # The body may or may not be complete.
     763        return self.processTypeByte(header, typebyte)
    741764
    742             typebyte = buffer[pos]
    743             if pos:
    744                 header = b1282int(buffer[:pos])
    745             else:
    746                 header = 0
    747 
    748             # rejected is set as soon as a violation is detected. It
    749             # indicates that this single token will be rejected.
    750 
    751             rejected = False
    752             if self.discardCount:
    753                 rejected = True
    754 
    755             wasInOpen = self.inOpen
    756             if typebyte == OPEN:
    757                 self.inboundObjectCount = self.objectCounter
    758                 self.objectCounter += 1
    759                 if self.inOpen:
    760                     raise BananaError("OPEN token followed by OPEN")
    761                 self.inOpen = True
    762                 # the inOpen flag is set as soon as the OPEN token is
    763                 # witnessed (even it it gets rejected later), because it
    764                 # means that there is a new sequence starting that must be
    765                 # handled somehow (either discarded or given to a new
    766                 # Unslicer).
    767 
    768                 # The inOpen flag is cleared when the Index Phase ends. There
    769                 # are two possibilities: 1) a new Unslicer is pushed, and
    770                 # tokens are delivered to it normally. 2) a Violation was
    771                 # raised, and the tokens must be discarded
    772                 # (self.discardCount++). *any* rejection-caused True->False
    773                 # transition of self.inOpen must be accompanied by exactly
    774                 # one increment of self.discardCount
    775 
    776             # determine if this token will be accepted, and if so, how large
    777             # it is allowed to be (for STRING and LONGINT/LONGNEG)
    778 
    779             if ((not rejected) and
    780                 (typebyte not in (PING, PONG, ABORT, CLOSE, ERROR))):
    781                 # PING, PONG, ABORT, CLOSE, and ERROR are always legal. All
    782                 # others (including OPEN) can be rejected by the schema: for
    783                 # example, a list of integers would reject STRING, VOCAB, and
    784                 # OPEN because none of those will produce integers. If the
    785                 # unslicer's .checkToken rejects the tokentype, its
    786                 # .receiveChild will immediately get an Failure
    787                 try:
    788                     # the purpose here is to limit the memory consumed by
    789                     # the body of a STRING, OPEN, LONGINT, or LONGNEG token
    790                     # (i.e., the size of a primitive type). If the sender
    791                     # wants to feed us more data than we want to accept, the
    792                     # checkToken() method should raise a Violation. This
    793                     # will never be called with ABORT or CLOSE types.
    794                     top = self.receiveStack[-1]
    795                     if wasInOpen:
    796                         top.openerCheckToken(typebyte, header, self.opentype)
    797                     else:
    798                         top.checkToken(typebyte, header)
    799                 except Violation, v:
    800                     rejected = True
    801                     f = BananaFailure()
    802                     if wasInOpen:
    803                         methname = "openerCheckToken"
    804                     else:
    805                         methname = "checkToken"
    806                     self.handleViolation(f, methname, inOpen=self.inOpen)
    807                     self.inOpen = False
    808 
     765    def processTypeByte(self, header, typebyte):
     766        # I decide how much data we want for the body. when we exit this,
     767        # we'll be in WANT_BODY and have a length set.
     768        self.receivedHeader = header
     769        self.receivedTypeByte = typebyte
     770        self.receiveMode = WANT_BODY
     771        if typebyte in (OPEN, CLOSE, ABORT, INT, NEG, VOCAB, PING, PONG, LIST):
     772            self.wantLength = 0
     773        elif typebyte in (ERROR, STRING, LONGINT, LONGNEG):
     774            self.wantLength = header
    809775            if typebyte == ERROR and header > SIZE_LIMIT:
    810776                # someone is trying to spam us with an ERROR token. Drop
    811777                # them with extreme prejudice.
    812778                raise BananaError("oversized ERROR token")
     779        elif typebyte in (FLOAT,):
     780            self.wantLength = 8
    813781
    814             rest = buffer[pos+1:]
     782    def _consume_and_return_first_n_bytes(self, wanted):
     783        offset = self.bufferOffset
     784        #print " _consume"
     785        for (chunknum, chunk) in enumerate(self.bufferChunks):
     786            available = len(chunk)-offset
     787            #print " a:", chunknum, chunk, available, wanted
     788            if available > wanted:
     789                #print " want middle"
     790                # we want the middle of this segment, but no more
     791                s = chunk[offset:offset+wanted]
     792                offset = offset+wanted
     793                nextChunknum = chunknum
     794            else:
     795                #print " want tail"
     796                # we want the whole tail (chunk[offset:])
     797                if offset == 0:
     798                    s = chunk # avoid a copy, not sure what python does
     799                else:
     800                    # drat, "".join() doesn't take buffers, only strings
     801                    s = chunk[offset:]
     802                offset = 0 # and we begin at the start of the next chunk
     803                nextChunknum = chunknum+1
     804            wanted -= len(s)
     805            #print " yield", s
     806            yield s
     807            self.bufferSize -= len(s)
     808            if not wanted:
     809                # now trim
     810                self.bufferOffset = offset
     811                if nextChunknum > 0:
     812                    self.bufferChunks = self.bufferChunks[nextChunknum:]
     813                break
     814        #print " _consume done"
    815815
    816             # determine what kind of token it is. Each clause finishes in
    817             # one of four ways:
    818             #
    819             #  raise BananaError: the protocol was violated so badly there is
    820             #                     nothing to do for it but hang up abruptly
    821             #
    822             #  return: if the token is not yet complete (need more data)
    823             #
    824             #  continue: if the token is complete but no object (for
    825             #            handleToken) was produced, e.g. OPEN, CLOSE, ABORT
    826             #
    827             #  obj=foo: the token is complete and an object was produced
    828             #
    829             # note that if rejected==True, the object is dropped instead of
    830             # being passed up to the current Unslicer
     816    def receiveBody(self):
     817        # compare the amount of data available in self.bufferChunks against
     818        # the amount of data that we want to receive (which might be zero).
     819        # If we have enough, stringify the body (if any) and deliver the
     820        # token.
     821        #print "receiveBody", self.bufferSize, self.wantLength
     822        if self.bufferSize < self.wantLength:
     823            #print " still hungry"
     824            return True # still hungry, need more data
     825        #print " ready"
    831826
     827        # figure out which pieces we want. We've already checked the length,
     828        # we know there's enough, so it's safe to consume these as we go.
     829        pieces = self._consume_and_return_first_n_bytes(self.wantLength)
     830        body = "".join(list(pieces))
     831        assert len(body) == self.wantLength, (len(body), self.wantLength)
     832        self.wantLength = 0
     833        self.receiveMode = WANT_HEADER
     834        self.processBody(self.receivedTypeByte, self.receivedHeader, body)
     835
     836    def processBody(self, typebyte, header, body):
     837        #print "processBody (%s)(%s)(%s)" % (tokens.tokenNames[typebyte], header, body)
     838
     839        if typebyte == PING:
     840            # respond with a PONG, but otherwise ignore it
     841            self.sendPONG(header)
     842            return
     843
     844        if typebyte == PONG:
     845            return # ignored completely
     846
     847        if typebyte == OPEN:
     848            self.objectCounter += 1 # keep this in sync
     849            print "self.objectCounter now", self.objectCounter
     850
     851        if self.discardCount:
     852            # we track OPEN and CLOSE even while we're in discard mode, so we
     853            # know when to stop discarding
    832854            if typebyte == OPEN:
    833                 buffer = rest
    834                 self.inboundOpenCount = header
    835                 if rejected:
    836                     if self.debugReceive:
    837                         print "DROP (OPEN)"
    838                     if self.inOpen:
    839                         # we are discarding everything at the old level, so
    840                         # discard everything in the new level too
    841                         self.discardCount += 1
    842                         if self.debugReceive:
    843                             print "++discardCount (OPEN), now %d" \
    844                                   % self.discardCount
    845                         self.inOpen = False
    846                     else:
    847                         # the checkToken handleViolation has already started
    848                         # discarding this new sequence, we don't have to
    849                         pass
    850                 else:
    851                     self.inOpen = True
    852                     self.opentype = []
    853                 continue
     855                self.discardCount += 1
     856            elif typebyte == CLOSE:
     857                self.discardCount -= 1
     858            # when discarding, we completely ignore the rest
     859            return
    854860
    855             elif typebyte == CLOSE:
    856                 buffer = rest
    857                 count = header
    858                 if self.discardCount:
    859                     self.discardCount -= 1
    860                     if self.debugReceive:
    861                         print "--discardCount (CLOSE), now %d" \
    862                               % self.discardCount
    863                 else:
    864                     self.handleClose(count)
    865                 continue
     861        try:
     862            self.processBody2(typebyte, header, body)
     863        except Violation:
     864            f = BananaFailure()
     865            self.handleViolation(f)
    866866
    867             elif typebyte == ABORT:
    868                 buffer = rest
    869                 count = header
    870                 # TODO: this isn't really a Violation, but we need something
    871                 # to describe it. It does behave identically to what happens
    872                 # when receiveChild raises a Violation. The .handleViolation
    873                 # will pop the now-useless Unslicer and start discarding
    874                 # tokens just as if the Unslicer had made the decision.
    875                 if rejected:
    876                     if self.debugReceive:
    877                         print "DROP (ABORT)"
    878                     # I'm ignoring you, LALALALALA.
    879                     #
    880                     # In particular, do not deliver a second Violation
    881                     # because of the ABORT that we're supposed to be
    882                     # ignoring because of a first Violation that happened
    883                     # earlier.
    884                     continue
    885                 try:
    886                     # slightly silly way to do it, but nice and uniform
    887                     raise Violation("ABORT received")
    888                 except Violation:
    889                     f = BananaFailure()
    890                     self.handleViolation(f, "receive-abort")
    891                 continue
     867    def processBody2(self, typebyte, header, body):
     868        top = self.receiveStack[-1]
     869        self._rx_methname = "unknown"
    892870
    893             elif typebyte == ERROR:
    894                 strlen = header
    895                 if len(rest) >= strlen:
    896                     # the whole string is available
    897                     buffer = rest[strlen:]
    898                     obj = rest[:strlen]
    899                     # handleError must drop the connection
    900                     self.handleError(obj)
    901                     return
    902                 else:
    903                     return # there is more to come
     871        if typebyte == OPEN:
     872            # stash for later. We already incremented it, so -1
     873            self.inboundObjectCount = self.objectCounter-1
     874            if self.inOpen:
     875                raise BananaError("OPEN token followed by OPEN")
     876            self.inOpen = True
     877            self._rx_methname = "checkToken"
     878            top.checkToken(OPEN, header) # might raise Violation
    904879
    905             elif typebyte == LIST:
    906                 raise BananaError("oldbanana peer detected, " +
    907                                   "compatibility code not yet written")
    908                 #listStack.append((header, []))
    909                 #buffer = rest
     880            # the inOpen flag is set as soon as the OPEN token is
     881            # witnessed (even if it gets rejected later), because it
     882            # means that there is a new sequence starting that must be
     883            # handled somehow (either discarded or given to a new
     884            # Unslicer).
    910885
    911             elif typebyte == STRING:
    912                 strlen = header
    913                 if len(rest) >= strlen:
    914                     # the whole string is available
    915                     buffer = rest[strlen:]
    916                     obj = rest[:strlen]
    917                     # although it might be rejected
    918                 else:
    919                     # there is more to come
    920                     if rejected:
    921                         # drop all we have and note how much more should be
    922                         # dropped
    923                         if self.debugReceive:
    924                             print "DROPPED some string bits"
    925                         self.skipBytes = strlen - len(rest)
    926                         self.buffer = ""
    927                     return
     886            # The inOpen flag is cleared when the Index Phase ends. There
     887            # are two possibilities: 1) a new Unslicer is pushed, and
     888            # tokens are delivered to it normally. 2) a Violation was
     889            # raised, and the tokens must be discarded
     890            # (self.discardCount++). *any* rejection-caused True->False
     891            # transition of self.inOpen must be accompanied by exactly
     892            # one increment of self.discardCount
    928893
    929             elif typebyte == INT:
    930                 buffer = rest
    931                 obj = int(header)
    932             elif typebyte == NEG:
    933                 buffer = rest
    934                 # -2**31 is too large for a positive int, so go through
    935                 # LongType first
    936                 obj = int(-long(header))
    937             elif typebyte == LONGINT or typebyte == LONGNEG:
    938                 strlen = header
    939                 if len(rest) >= strlen:
    940                     # the whole number is available
    941                     buffer = rest[strlen:]
    942                     obj = bytes_to_long(rest[:strlen])
    943                     if typebyte == LONGNEG:
    944                         obj = -obj
    945                     # although it might be rejected
    946                 else:
    947                     # there is more to come
    948                     if rejected:
    949                         # drop all we have and note how much more should be
    950                         # dropped
    951                         self.skipBytes = strlen - len(rest)
    952                         self.buffer = ""
    953                     return
     894            self.inboundOpenCount = header # for debugging/error-checking
     895            self.opentype = []
     896            return
    954897
    955             elif typebyte == VOCAB:
    956                 buffer = rest
    957                 obj = self.incomingVocabulary[header]
    958                 # TODO: bail if expanded string is too big
    959                 # this actually means doing self.checkToken(VOCAB, len(obj))
    960                 # but we have to make sure we handle the rejection properly
     898        # PING, PONG, ABORT, CLOSE, and ERROR are always legal (i.e. we do
     899        # not submit them to checkToken/openerCheckToken).
    961900
    962             elif typebyte == FLOAT:
    963                 if len(rest) >= 8:
    964                     buffer = rest[8:]
    965                     obj = struct.unpack("!d", rest[:8])[0]
    966                 else:
    967                     # this case is easier than STRING, because it is only 8
    968                     # bytes. We don't bother skipping anything.
    969                     return
     901        if typebyte == ABORT:
     902            # This isn't really a Violation, but we need something to
     903            # describe it. It does behave identically to what happens when
     904            # receiveChild raises a Violation. The .handleViolation will pop
     905            # the now-useless Unslicer and start discarding tokens just as if
     906            # the Unslicer itself had made the decision.
    970907
    971             elif typebyte == PING:
    972                 buffer = rest
    973                 self.sendPONG(header)
    974                 continue # otherwise ignored
     908            # slightly silly way to do it, but nice and uniform
     909            self._rx_methname = "receive-abort"
     910            raise Violation("ABORT received")
    975911
    976             elif typebyte == PONG:
    977                 buffer = rest
    978                 continue # otherwise ignored
     912        if typebyte == ERROR:
     913            # handleError must drop the connection
     914            self.handleError(body)
     915            return
    979916
    980             else:
    981                 raise BananaError("Invalid Type Byte 0x%x" % ord(typebyte))
     917        if typebyte == CLOSE:
     918            self.handleClose(header)
     919            return
    982920
    983             if not rejected:
    984                 if self.inOpen:
    985                     self.handleOpen(self.inboundOpenCount,
    986                                     self.inboundObjectCount,
    987                                     obj)
    988                     # handleOpen might push a new unslicer and clear
    989                     # .inOpen, or leave .inOpen true and append the object
    990                     # to .indexOpen
    991                 else:
    992                     self.handleToken(obj)
    993             else:
    994                 if self.debugReceive:
    995                     print "DROP", type(obj), obj
    996                 pass # drop the object
     921        # the other tokens (LIST, STRING, INT/NEG, LONGINT/LONGNEG, FLOAT,
     922        # VOCAB) *are* subject to openerCheckToken/checkToken validation.
    997923
    998             # while loop ends here
     924        # For example, a list of integers would reject STRING, VOCAB, and
     925        # OPEN because none of those will produce integers. If the unslicer's
     926        # .checkToken rejects the tokentype, its .receiveChild will
     927        # immediately get a Failure
    999928
    1000         self.buffer = ''
     929        # The validation includes a body length. We've already buffered the
     930        # whole token by now, but still do the validation, because many of
     931        # the unslicers perform their only checks in checkToken() instead of
     932        # receiveChild(). TODO: once unslicers have moved all code out of
     933        # checkToken(), remove this check.
     934
     935        # the openerCheckToken/checkToken calls might raise Violation
     936        if self.inOpen:
     937            self._rx_methname = "openerCheckToken"
     938            top.openerCheckToken(typebyte, header, self.opentype)
     939        else:
     940            self._rx_methname = "checkToken"
     941            print "calling checkToken", repr(typebyte), header
     942            top.checkToken(typebyte, header)
     943            print " checkToken happy"
     944
     945        # ok, we're accepting the token. These will all cause objects to be
     946        # submitted to handleOpen or handleToken
     947
     948        if typebyte == STRING:
     949            obj = body
     950        elif typebyte == INT:
     951            obj = int(header)
     952        elif typebyte == NEG:
     953            # -2**31 is too large for a positive int, so go through
     954            # LongType first
     955            obj = int(-long(header))
     956        elif typebyte in (LONGINT, LONGNEG):
     957            obj = bytes_to_long(body)
     958            if typebyte == LONGNEG:
     959                obj = -obj
     960        elif typebyte == VOCAB:
     961            obj = self.incomingVocabulary[header]
     962            # TODO: bail if expanded string is too big
     963            # this actually means doing self.checkToken(VOCAB, len(obj))
     964            # but we have to make sure we handle the rejection properly
     965        elif typebyte == FLOAT:
     966            obj = struct.unpack("!d", body)[0]
     967        else:
     968            raise BananaError("Invalid Type Byte 0x%x" % ord(typebyte))
     969
     970        #print "about to handleToken(%s) (%s)" % (obj, body)
     971        if self.inOpen:
     972            self.handleOpen(self.inboundOpenCount,
     973                            self.inboundObjectCount,
     974                            obj)
     975            # handleOpen might push a new unslicer and clear
     976            # .inOpen, or leave .inOpen true and append the object
     977            # to .opentype
     978        else:
     979            self.handleToken(obj)
    1001980
    1002981
    1003982    def handleOpen(self, openCount, objectCount, indexToken):
     983        # I am called for each index token of an OPEN sequence. For any given
     984        # OPEN sequence (introducing a new node of the object graph), if the
     985        # opentype contains N tokens, I will be called N times, with the same
     986        # openCount/objectCount values but different (sequential) indexToken
     987        # values.
     988        #
     989        # openCount is the (optional) value provided by the sender, telling
     990        # us how many OPEN tokens they've sent us before this one.
     991        # objectCount is a local counter. The visible code only bumps it on
     992        # OPEN tokens, so it seems to count OPENs (not tokens of any kind)
     993        # received before the OPEN that started this sequence.
    1004994        self.opentype.append(indexToken)
    1005995        opentype = tuple(self.opentype)
    1006996        if self.debugReceive:
    1007             print "handleOpen(%d,%d,%s)" % (openCount, objectCount, indexToken)
     997            print "handleOpen(opencount=%d,objcount=%d,%s)" % \
     998                  (openCount, objectCount, indexToken)
    1008999        top = self.receiveStack[-1]
    1009         try:
    1010             # obtain a new Unslicer to handle the object
    1011             child = top.doOpen(opentype)
    1012             if not child:
    1013                 if self.debugReceive:
    1014                     print " doOpen wants more index tokens"
    1015                 return # they want more index tokens, leave .inOpen=True
     1000
     1001        # obtain a new Unslicer to handle the object
     1002        self._rx_methname = "doOpen"
     1003        child = top.doOpen(opentype) # might raise Violation
     1004        if not child:
    10161005            if self.debugReceive:
    1017                 print " opened[%d] with %s" % (openCount, child)
    1018         except Violation, v:
    1019             # must discard the rest of the child object. There is no new
    1020             # unslicer pushed yet, so we don't use abandonUnslicer
    1021             self.inOpen = False
    1022             f = BananaFailure()
    1023             self.handleViolation(f, "doOpen", inOpen=True)
    1024             return
     1006                print " doOpen wants more index tokens"
     1007            return # they want more index tokens, leave .inOpen=True
    10251008
     1009        if self.debugReceive:
     1010            print " opened[%d] with %s" % (openCount, child)
    10261011        assert tokens.IUnslicer.providedBy(child), "child is %s" % child
    1027         self.inOpen = False
    10281012        child.protocol = self
    10291013        child.openCount = openCount
    10301014        child.parent = top
     1015        self.inOpen = False
    10311016        self.receiveStack.append(child)
    1032         try:
    1033             child.start(objectCount)
    1034         except Violation, v:
    1035             # the child is now on top, so use abandonUnslicer to discard the
    1036             # rest of the child
    1037             f = BananaFailure()
    1038             # notifies the new child
    1039             self.handleViolation(f, "start")
     1017        self._rx_methname = "start"
     1018        child.start(objectCount) # might raise Violation
    10401019
    10411020    def handleToken(self, token, ready_deferred=None):
    10421021        top = self.receiveStack[-1]
    10431022        if self.debugReceive: print "handleToken(%s)" % (token,)
    10441023        if ready_deferred:
    10451024            assert isinstance(ready_deferred, defer.Deferred)
    1046         try:
    1047             top.receiveChild(token, ready_deferred)
    1048         except Violation, v:
    1049             # this is how the child says "I've been contaminated". We don't
    1050             # pop them automatically: if they want that, they should return
    1051             # back the failure in their reportViolation method.
    1052             f = BananaFailure()
    1053             self.handleViolation(f, "receiveChild")
     1025        self._rx_methname = "receiveChild"
     1026        top.receiveChild(token, ready_deferred) # might raise Violation
    10541027
    10551028    def handleClose(self, closeCount):
    10561029        if self.debugReceive:
     
    10591032            raise BananaError("lost sync, got CLOSE(%d) but expecting %s" \
    10601033                              % (closeCount, self.receiveStack[-1].openCount))
    10611034
     1035        # If a Violation is raised by receiveClose() or finish(), we'll have
     1036        # an extra Unslicer (we've received the CLOSE, the sequence is over,
     1037        # but we haven't yet popped the unslicer off the stack). self.inClose
     1038        # is a signal to handleViolation that we can discard one fewer
     1039        # level.
     1040        self.inClose = True
     1041        if self.inOpen:
     1042            # huh, we were still waiting to see all the opentype tokens when
     1043            # the caller closed the sequence. We must disagree about the
     1044            # opentype sequences. This is a Violation. Treat it as if it
     1045            # occurred in handleOpen().
     1046            self._rx_methname = "close-during-open"
     1047            raise Violation("received CLOSE during open sequence")
     1048
    10621049        child = self.receiveStack[-1] # don't pop yet: describe() needs it
    10631050
    1064         try:
    1065             obj, ready_deferred = child.receiveClose()
    1066         except Violation, v:
    1067             # the child is contaminated. However, they're finished, so we
    1068             # don't have to discard anything. Just give an Failure to the
    1069             # parent instead of the object they would have returned.
    1070             f = BananaFailure()
    1071             self.handleViolation(f, "receiveClose", inClose=True)
    1072             return
     1051        self._rx_methname = "receiveClose"
     1052        obj, ready_deferred = child.receiveClose() # might raise Violation
    10731053        if self.debugReceive: print "receiveClose returned", obj
    10741054
    1075         try:
    1076             child.finish()
    1077         except Violation, v:
    1078             # .finish could raise a Violation if an object that references
    1079             # the child is just now deciding that they don't like it
    1080             # (perhaps their TupleConstraint couldn't be asserted until the
    1081             # tuple was complete and referenceable). In this case, the child
    1082             # has produced a valid object, but an earlier (incomplete)
    1083             # object is not valid. So we treat this as if this child itself
    1084             # raised the Violation. The .where attribute will point to this
    1085             # child, which is the node that caused somebody problems, but
    1086             # will be marked <FINISH>, which indicates that it wasn't the
    1087             # child itself which raised the Violation. TODO: not true
    1088             #
    1089             # TODO: it would be more useful if the UF could also point to
    1090             # the completing object (the one which raised Violation).
     1055        self._rx_methname = "finish"
     1056        child.finish() # might raise Violation
    10911057
    1092             f = BananaFailure()
    1093             self.handleViolation(f, "finish", inClose=True)
    1094             return
     1058        # .finish could raise a Violation if an object that references
     1059        # the child is just now deciding that they don't like it
     1060        # (perhaps their TupleConstraint couldn't be asserted until the
     1061        # tuple was complete and referenceable). In this case, the child
     1062        # has produced a valid object, but an earlier (incomplete)
     1063        # object is not valid. So we treat this as if this child itself
     1064        # raised the Violation. The .where attribute will point to this
     1065        # child, which is the node that caused somebody problems, but
     1066        # will be marked <FINISH>, which indicates that it wasn't the
     1067        # child itself which raised the Violation. TODO: not true
     1068        #
     1069        # TODO: it would be more useful if the UF could also point to
     1070        # the completing object (the one which raised Violation).
    10951071
    10961072        self.receiveStack.pop()
     1073        self.inClose = False
    10971074
    10981075        # now deliver the object to the parent
    10991076        self.handleToken(obj, ready_deferred)
    11001077
    1101     def handleViolation(self, f, methname, inOpen=False, inClose=False):
     1078    def handleViolation(self, f):
    11021079        """An Unslicer has decided to give up, or we have given up on it
    11031080        (because we received an ABORT token).
     1081
     1082        self.inOpen and self.inClose are used to manage the gap between how
     1083        many OPEN/CLOSE tokens we've received and how many Unslicers are on
     1084        the stack. The inbound token sequence is OPEN ot1 ot2 ch1 ch2 CLOSE,
     1085        but we don't push a new Unslicer until the last opentype token
     1086        (between 'ot2' and 'ch1'). self.inOpen is True from when we receive
     1087        the OPEN until we push the Unslicer during processing of ot2.
     1088        self.inClose is True from when we receive the CLOSE until a few
     1089        statements later, after we pop the Unslicer.
    11041090        """
    11051091
     1092        methname = self._rx_methname
    11061093        where = self.describeReceive()
    11071094        f.value.setLocation(where)
    11081095
    11091096        if self.debugReceive:
    11101097            print " handleViolation-%s (inOpen=%s, inClose=%s): %s" \
    1111                   % (methname, inOpen, inClose, f)
     1098                  % (methname, self.inOpen, self.inClose, f)
    11121099
    11131100        assert isinstance(f, BananaFailure)
    11141101
     
    11161103            log.msg("Violation in %s at %s" % (methname, where))
    11171104            log.err(f)
    11181105
    1119         if inOpen:
     1106        if self.inOpen:
     1107            self.inOpen = False
    11201108            self.discardCount += 1
    11211109            if self.debugReceive:
    11221110                print "  ++discardCount (inOpen), now %d" % self.discardCount
    11231111
     1112        inClose = self.inClose
     1113        self.inClose = False
     1114
    11241115        while True:
    11251116            # tell the parent that their child is dead. This is useful for
    11261117            # things like PB, which may want to errback the current request.
  • foolscap/test/test_banana.py

    diff -r 9adfe64ba138 foolscap/test/test_banana.py
    a b  
    77
    88from foolscap.tokens import ISlicer, Violation, BananaError
    99from foolscap.tokens import BananaFailure, tokenNames, \
    10      OPEN, CLOSE, ABORT, INT, LONGINT, NEG, LONGNEG, FLOAT, STRING
     10     OPEN, CLOSE, ERROR, ABORT, INT, LONGINT, NEG, LONGNEG, FLOAT, STRING
    1111from foolscap import slicer, schema, storage, banana, vocab
    1212from foolscap.eventual import fireEventually, flushEventualQueue
    1313from foolscap.slicers.allslicers import RootSlicer, DictUnslicer, TupleUnslicer
     
    1515from foolscap.banana import int2b128, long_to_bytes
    1616
    1717import StringIO
    18 import sets, struct
     18import sets, struct, itertools
    1919from decimal import Decimal
     20from hashlib import md5
    2021
    2122#log.startLogging(sys.stderr)
    2223
     
    14301431                    schema.StringConstraint(10))
    14311432        self.conform2("\x0a\x82" + "a"*10 + "extra", "a"*10,
    14321433                    schema.StringConstraint(10))
     1434        print "doing violate2"
    14331435        self.violate2("\x0b\x82" + "a"*11 + "extra",
    14341436                      "<RootUnslicer>",
    14351437                      schema.StringConstraint(10))
     1438        print "did violate2"
    14361439
    14371440    def NOTtestFoo(self):
    14381441        if 0:
     
    19961999                       tCLOSE(0)])
    19972000        return d
    19982001
     2002class TokenParsingBanana(banana.Banana):
     2003    def processBody(self, typebyte, header, body):
     2004        self.tokens.append( (typebyte, header, body) )
     2005
     2006class TokenParser(unittest.TestCase):
     2007    def setUp(self):
     2008        self.b = b = TokenParsingBanana()
     2009        b.initReceive()
     2010        self.tokens = b.tokens = []
     2011    def get_tokens(self):
     2012        t = list(self.tokens)
     2013        self.tokens[:] = []
     2014        return t
     2015    def put(self, *chunks):
     2016        for c in chunks:
     2017            self.b.handleData(c)
     2018    def expect(self, tokens):
     2019        self.failUnlessEqual(self.get_tokens(), tokens)
     2020        self.failUnlessEqual(self.b.bufferSize, 0)
     2021        self.failUnlessEqual(self.b.bufferChunks, [])
     2022
     2023    def test_tokenizer(self):
     2024        put = self.put
     2025        expect = self.expect
     2026
     2027        put("")
     2028        expect([])
     2029
     2030        put(bINT(4))
     2031        expect([(INT, 4, '')])
     2032
     2033        put(bSTR("abcd"))
     2034        expect([(STRING, 4, "abcd")])
     2035
     2036        put(bSTR("abcd")+bSTR("efg"))
     2037        expect([(STRING, 4, "abcd"), (STRING, 3, "efg")])
     2038
     2039        s = bSTR("a"*100) + bSTR("b"*100)
     2040        put(s[:50])
     2041        put(s[50:])
     2042        expect([(STRING, 100, "a"*100), (STRING, 100, "b"*100)])
     2043
     2044        s = bSTR("a"*5) + bSTR("b"*5) + bSTR("c"*100) + bSTR("d") + bSTR("e")
     2045        put(s[:50])
     2046        put(s[50:])
     2047        expect([(STRING, 5, "a"*5), (STRING, 5, "b"*5),
     2048                (STRING, 100, "c"*100), (STRING, 1, "d"),
     2049                (STRING, 1, "e"),
     2050                ])
     2051
     2052
     2053    def write_int(self, num):
     2054        token = []
     2055        typebyte = "\x81"
     2056        if num < 0:
     2057            num = -num
     2058            typebyte = "\x83"
     2059        banana.int2b128(num, token.append)
     2060        token.append(typebyte)
     2061        return typebyte, "".join(token)
     2062
     2063    def write_long(self, num):
     2064        token = []
     2065        typebyte = "\x85"
     2066        if num < 0:
     2067            num = -num
     2068            typebyte = "\x86"
     2069        s = banana.long_to_bytes(num)
     2070        banana.int2b128(len(s), token.append)
     2071        token.append(typebyte)
     2072        token.append(s)
     2073        return typebyte, len(s), s, "".join(token)
     2074
     2075    def int2b128(self, n):
     2076        header = []
     2077        banana.int2b128(n, header.append)
     2078        return "".join(header)
     2079
     2080    def test_random(self):
     2081        put = self.put
     2082        expect = self.expect
     2083        for bodyseed in range(20):
     2084            pieces = []
     2085            expected = []
     2086            for i in range(40):
     2087                seed = md5("%d-%d" % (bodyseed, i)).hexdigest()
     2088                if seed[0] in "01":
     2089                    num = int(seed[1], 16) # small
     2090                    if seed[0] == "0" and num > 0:
     2091                        pieces.append(bINT(-num))
     2092                        expected.append( (NEG, num, "") )
     2093                    else:
     2094                        pieces.append(bINT(num))
     2095                        expected.append( (INT, num, "") )
     2096                if seed[0] in "23":
     2097                    num = int(seed[1:], 16) # large, still fits in INT
     2098                    if seed[0] == "2":
     2099                        num = -num
     2100                    typebyte, token = self.write_int(num)
     2101                    pieces.append(token)
     2102                    expected.append( (typebyte, abs(num), "") )
     2103                if seed[0] in "45":
     2104                    num = int(seed[1:], 16) # large, put in LONGINT
     2105                    if seed[0] == "4":
     2106                        num = -num
     2107                    typebyte, header, body, token = self.write_long(num)
     2108                    pieces.append(token)
     2109                    expected.append( (typebyte, header, body) )
     2110                if seed[0] in "67":
     2111                    l = int(seed[1:3], 16)
     2112                    s = seed[4] * l
     2113                    pieces.append(self.int2b128(l) + STRING + s)
     2114                    expected.append( (STRING, l, s) )
     2115                if seed[0] in "78":
     2116                    float_s = seed[2:2+8]
     2117                    pieces.append(FLOAT+float_s)
     2118                    expected.append( (FLOAT, 0, float_s) )
     2119                if seed[0] == "9":
     2120                    count = int(seed[1:3], 16)
     2121                    pieces.append(self.int2b128(count) + OPEN)
     2122                    expected.append( (OPEN, count, "") )
     2123                if seed[0] == "a":
     2124                    count = int(seed[1:3], 16)
     2125                    pieces.append(self.int2b128(count) + CLOSE)
     2126                    expected.append( (CLOSE, count, "") )
     2127                if seed[0] == "b":
     2128                    l = int(seed[1:3], 16)
     2129                    s = seed[4] * l
     2130                    pieces.append(self.int2b128(l) + ERROR + s)
     2131                    expected.append( (ERROR, l, s) )
     2132
     2133            s = "".join(pieces)
     2134            for chunkseed in range(20):
     2135                #print "bodylen:", len(s)  # generally 1000-2000 bytes
     2136                remaining = s
     2137                chunkstep = itertools.count(0)
     2138                while remaining:
     2139                    i = chunkstep.next()
     2140                    seed = md5("%d-%d" % (chunkseed, i)).hexdigest()
     2141                    size = int(seed[:2], 16) # TODO: prefer exponential
     2142                    #print " ", size, len(remaining)
     2143                    chunk, remaining = remaining[:size], remaining[size:]
     2144                    self.put(chunk)
     2145                self.expect(expected)
    19992146
    20002147
    20012148# TODO: vocab test: