Ticket #149: 149.WIP.diff
File 149.WIP.diff, 40.8 KB (added by , 15 years ago) |
---|
-
foolscap/banana.py
diff -r 9adfe64ba138 foolscap/banana.py
a b 88 88 return acc 89 89 90 90 HIGH_BIT_SET = chr(0x80) 91 WANT_HEADER = "want_header" 92 WANT_BODY = "want_body" 91 93 92 94 93 95 … … 567 569 # calls self.receivedObject() 568 570 569 571 unslicerClass = RootUnslicer 570 debugReceive = False572 debugReceive = True #False 571 573 logViolations = False 572 574 logReceiveErrors = True 573 575 useKeepalives = False … … 578 580 579 581 def initReceive(self): 580 582 self.inOpen = False # set during the Index Phase of an OPEN sequence 583 self.inClose = False 581 584 self.opentype = [] # accumulates Index Tokens 582 585 583 586 # to pre-negotiate, set the negotiation parameters and set … … 585 588 # self.buffer with the inbound negotiation block. 586 589 self.negotiated = False 587 590 self.connectionAbandoned = False 588 self.buffer = '' 591 self.bufferChunks = [] 592 self.bufferSize = 0 593 self.bufferOffset = 0 594 self.receiveMode = WANT_HEADER 589 595 590 596 self.incomingVocabulary = {} 591 597 self.skipBytes = 0 # used to discard a single long token … … 698 704 699 705 700 706 def handleData(self, chunk): 701 # buffer, assemble into tokens 702 # call self.receiveToken(token) with each 703 if self.skipBytes: 704 if len(chunk) < self.skipBytes: 705 # skip the whole chunk 706 self.skipBytes -= len(chunk) 707 return 708 # skip part of the chunk, and stop skipping 709 chunk = chunk[self.skipBytes:] 710 self.skipBytes = 0 711 buffer = self.buffer + chunk 707 if not chunk: 708 return 709 self.bufferChunks.append(chunk) 710 self.bufferSize += len(chunk) 711 #if self.receiveMode is WANT_LENGTH: 712 # self.bufferChunks = ["".join(self.bufferChunks)] 713 done = False 714 while not done: 715 if self.receiveMode is WANT_HEADER: 716 done = self.receiveHeader() 717 if self.receiveMode is WANT_BODY: 718 # TODO: if self.discardCount: self.discardBody() 719 done = self.receiveBody() 720 # when we get here, any completely consumed chunk is removed 712 721 713 # Loop through the available input data, extracting one token per714 # pass.715 722 716 while buffer: 717 assert self.buffer != buffer, \ 718 ("Banana.handleData: no progress made: %s %s" % 719 (repr(buffer),)) 720 self.buffer = buffer 721 pos = 0 723 def receiveHeader(self): 724 def non_consuming_lookahead(): 725 for chunknum,chunk in enumerate(self.bufferChunks): 726 start = 0 727 if chunknum == 0: 728 start = self.bufferOffset 729 for i in range(start, len(chunk)): 730 yield (chunknum, i, chunk[i]) 731 headerbytes = [] 732 typebyte = None 733 iterator = non_consuming_lookahead() 734 for (chunknum, offset, ch) in iterator: 735 if ch >= HIGH_BIT_SET: 736 header = b1282int(headerbytes) 737 typebyte = ch 738 break 739 headerbytes.append(ch) 740 if len(headerbytes) > 64: 741 # drop the connection 742 got = "".join(headerbytes) 743 raise BananaError("token prefix is limited to 64 bytes:" 744 " but got %r" % got) 745 # still looking 746 if typebyte is None: 747 return True # still hungry, must wait for next message 722 748 723 for ch in buffer: 724 if ch >= HIGH_BIT_SET: 725 break 726 pos = pos + 1 727 if pos > 64: 728 # drop the connection. We log more of the buffer, but not 729 # all of it, to make it harder for someone to spam our 730 # logs. 731 raise BananaError("token prefix is limited to 64 bytes: " 732 "but got %r" % (buffer[:200],)) 733 else: 734 # we've run out of buffer without seeing the high bit, which 735 # means we're still waiting for header to finish 736 return 737 assert pos <= 64 749 # found it. We need to trim the chunklist and update the offset. The 750 # next character that our iterator returns (if any) is where the next 751 # consumer should look. 752 self.bufferSize -= len(headerbytes)+1 753 try: 754 (chunknum, offset, ign) = iterator.next() 755 self.bufferOffset = offset 756 if chunknum > 0: 757 self.bufferChunks = self.bufferChunks[chunknum:] 758 except StopIteration: 759 # we've consumed everything 760 self.bufferChunks = [] 761 self.bufferOffset = 0 738 762 739 # At this point, the header and type byte have been received. 740 # The body may or may not be complete. 763 return self.processTypeByte(header, typebyte) 741 764 742 typebyte = buffer[pos] 743 if pos: 744 header = b1282int(buffer[:pos]) 745 else: 746 header = 0 747 748 # rejected is set as soon as a violation is detected. It 749 # indicates that this single token will be rejected. 750 751 rejected = False 752 if self.discardCount: 753 rejected = True 754 755 wasInOpen = self.inOpen 756 if typebyte == OPEN: 757 self.inboundObjectCount = self.objectCounter 758 self.objectCounter += 1 759 if self.inOpen: 760 raise BananaError("OPEN token followed by OPEN") 761 self.inOpen = True 762 # the inOpen flag is set as soon as the OPEN token is 763 # witnessed (even it it gets rejected later), because it 764 # means that there is a new sequence starting that must be 765 # handled somehow (either discarded or given to a new 766 # Unslicer). 767 768 # The inOpen flag is cleared when the Index Phase ends. There 769 # are two possibilities: 1) a new Unslicer is pushed, and 770 # tokens are delivered to it normally. 2) a Violation was 771 # raised, and the tokens must be discarded 772 # (self.discardCount++). *any* rejection-caused True->False 773 # transition of self.inOpen must be accompanied by exactly 774 # one increment of self.discardCount 775 776 # determine if this token will be accepted, and if so, how large 777 # it is allowed to be (for STRING and LONGINT/LONGNEG) 778 779 if ((not rejected) and 780 (typebyte not in (PING, PONG, ABORT, CLOSE, ERROR))): 781 # PING, PONG, ABORT, CLOSE, and ERROR are always legal. All 782 # others (including OPEN) can be rejected by the schema: for 783 # example, a list of integers would reject STRING, VOCAB, and 784 # OPEN because none of those will produce integers. If the 785 # unslicer's .checkToken rejects the tokentype, its 786 # .receiveChild will immediately get an Failure 787 try: 788 # the purpose here is to limit the memory consumed by 789 # the body of a STRING, OPEN, LONGINT, or LONGNEG token 790 # (i.e., the size of a primitive type). If the sender 791 # wants to feed us more data than we want to accept, the 792 # checkToken() method should raise a Violation. This 793 # will never be called with ABORT or CLOSE types. 794 top = self.receiveStack[-1] 795 if wasInOpen: 796 top.openerCheckToken(typebyte, header, self.opentype) 797 else: 798 top.checkToken(typebyte, header) 799 except Violation, v: 800 rejected = True 801 f = BananaFailure() 802 if wasInOpen: 803 methname = "openerCheckToken" 804 else: 805 methname = "checkToken" 806 self.handleViolation(f, methname, inOpen=self.inOpen) 807 self.inOpen = False 808 765 def processTypeByte(self, header, typebyte): 766 # I decide how much data we want for the body. when we exit this, 767 # we'll be in WANT_BODY and have a length set. 768 self.receivedHeader = header 769 self.receivedTypeByte = typebyte 770 self.receiveMode = WANT_BODY 771 if typebyte in (OPEN, CLOSE, ABORT, INT, NEG, VOCAB, PING, PONG, LIST): 772 self.wantLength = 0 773 elif typebyte in (ERROR, STRING, LONGINT, LONGNEG): 774 self.wantLength = header 809 775 if typebyte == ERROR and header > SIZE_LIMIT: 810 776 # someone is trying to spam us with an ERROR token. Drop 811 777 # them with extreme prejudice. 812 778 raise BananaError("oversized ERROR token") 779 elif typebyte in (FLOAT,): 780 self.wantLength = 8 813 781 814 rest = buffer[pos+1:] 782 def _consume_and_return_first_n_bytes(self, wanted): 783 offset = self.bufferOffset 784 #print " _consume" 785 for (chunknum, chunk) in enumerate(self.bufferChunks): 786 available = len(chunk)-offset 787 #print " a:", chunknum, chunk, available, wanted 788 if available > wanted: 789 #print " want middle" 790 # we want the middle of this segment, but no more 791 s = chunk[offset:offset+wanted] 792 offset = offset+wanted 793 nextChunknum = chunknum 794 else: 795 #print " want tail" 796 # we want the whole tail (chunk[offset:]) 797 if offset == 0: 798 s = chunk # avoid a copy, not sure what python does 799 else: 800 # drat, "".join() doesn't take buffers, only strings 801 s = chunk[offset:] 802 offset = 0 # and we begin at the start of the next chunk 803 nextChunknum = chunknum+1 804 wanted -= len(s) 805 #print " yield", s 806 yield s 807 self.bufferSize -= len(s) 808 if not wanted: 809 # now trim 810 self.bufferOffset = offset 811 if nextChunknum > 0: 812 self.bufferChunks = self.bufferChunks[nextChunknum:] 813 break 814 #print " _consume done" 815 815 816 # determine what kind of token it is. Each clause finishes in 817 # one of four ways: 818 # 819 # raise BananaError: the protocol was violated so badly there is 820 # nothing to do for it but hang up abruptly 821 # 822 # return: if the token is not yet complete (need more data) 823 # 824 # continue: if the token is complete but no object (for 825 # handleToken) was produced, e.g. OPEN, CLOSE, ABORT 826 # 827 # obj=foo: the token is complete and an object was produced 828 # 829 # note that if rejected==True, the object is dropped instead of 830 # being passed up to the current Unslicer 816 def receiveBody(self): 817 # compare the amount of data available in self.bufferChunks against 818 # the amount of data that we want to receive (which might be zero). 819 # If we have enough, stringify the body (if any) and deliver the 820 # token. 821 #print "receiveBody", self.bufferSize, self.wantLength 822 if self.bufferSize < self.wantLength: 823 #print " still hungry" 824 return True # still hungry, need more data 825 #print " ready" 831 826 827 # figure out which pieces we want. We've already checked the length, 828 # we know there's enough, so it's safe to consume these as we go. 829 pieces = self._consume_and_return_first_n_bytes(self.wantLength) 830 body = "".join(list(pieces)) 831 assert len(body) == self.wantLength, (len(body), self.wantLength) 832 self.wantLength = 0 833 self.receiveMode = WANT_HEADER 834 self.processBody(self.receivedTypeByte, self.receivedHeader, body) 835 836 def processBody(self, typebyte, header, body): 837 #print "processBody (%s)(%s)(%s)" % (tokens.tokenNames[typebyte], header, body) 838 839 if typebyte == PING: 840 # repond with a PONG, but otherwise ignore it 841 self.sendPONG(header) 842 return 843 844 if typebyte == PONG: 845 return # ignored completely 846 847 if typebyte == OPEN: 848 self.objectCounter += 1 # keep this in sync 849 print "self.objectCounter now", self.objectCounter 850 851 if self.discardCount: 852 # we track OPEN and CLOSE even while we're in discard mode, so we 853 # know when to stop discarding 832 854 if typebyte == OPEN: 833 buffer = rest 834 self.inboundOpenCount = header 835 if rejected: 836 if self.debugReceive: 837 print "DROP (OPEN)" 838 if self.inOpen: 839 # we are discarding everything at the old level, so 840 # discard everything in the new level too 841 self.discardCount += 1 842 if self.debugReceive: 843 print "++discardCount (OPEN), now %d" \ 844 % self.discardCount 845 self.inOpen = False 846 else: 847 # the checkToken handleViolation has already started 848 # discarding this new sequence, we don't have to 849 pass 850 else: 851 self.inOpen = True 852 self.opentype = [] 853 continue 855 self.discardCount += 1 856 elif typebyte == CLOSE: 857 self.discardCount -= 1 858 # when discarding, we completely ignore the rest 859 return 854 860 855 elif typebyte == CLOSE: 856 buffer = rest 857 count = header 858 if self.discardCount: 859 self.discardCount -= 1 860 if self.debugReceive: 861 print "--discardCount (CLOSE), now %d" \ 862 % self.discardCount 863 else: 864 self.handleClose(count) 865 continue 861 try: 862 self.processBody2(typebyte, header, body) 863 except Violation: 864 f = BananaFailure() 865 self.handleViolation(f) 866 866 867 elif typebyte == ABORT: 868 buffer = rest 869 count = header 870 # TODO: this isn't really a Violation, but we need something 871 # to describe it. It does behave identically to what happens 872 # when receiveChild raises a Violation. The .handleViolation 873 # will pop the now-useless Unslicer and start discarding 874 # tokens just as if the Unslicer had made the decision. 875 if rejected: 876 if self.debugReceive: 877 print "DROP (ABORT)" 878 # I'm ignoring you, LALALALALA. 879 # 880 # In particular, do not deliver a second Violation 881 # because of the ABORT that we're supposed to be 882 # ignoring because of a first Violation that happened 883 # earlier. 884 continue 885 try: 886 # slightly silly way to do it, but nice and uniform 887 raise Violation("ABORT received") 888 except Violation: 889 f = BananaFailure() 890 self.handleViolation(f, "receive-abort") 891 continue 867 def processBody2(self, typebyte, header, body): 868 top = self.receiveStack[-1] 869 self._rx_methname = "unknown" 892 870 893 elif typebyte == ERROR: 894 strlen = header 895 if len(rest) >= strlen: 896 # the whole string is available 897 buffer = rest[strlen:] 898 obj = rest[:strlen] 899 # handleError must drop the connection 900 self.handleError(obj) 901 return 902 else: 903 return # there is more to come 871 if typebyte == OPEN: 872 # stash for later. We already incremented it, so -1 873 self.inboundObjectCount = self.objectCounter-1 874 if self.inOpen: 875 raise BananaError("OPEN token followed by OPEN") 876 self.inOpen = True 877 self._rx_methname = "checkToken" 878 top.checkToken(OPEN, header) # might raise Violation 904 879 905 elif typebyte == LIST:906 raise BananaError("oldbanana peer detected, " +907 "compatibility code not yet written")908 #listStack.append((header, []))909 #buffer = rest880 # the inOpen flag is set as soon as the OPEN token is 881 # witnessed (even it it gets rejected later), because it 882 # means that there is a new sequence starting that must be 883 # handled somehow (either discarded or given to a new 884 # Unslicer). 910 885 911 elif typebyte == STRING: 912 strlen = header 913 if len(rest) >= strlen: 914 # the whole string is available 915 buffer = rest[strlen:] 916 obj = rest[:strlen] 917 # although it might be rejected 918 else: 919 # there is more to come 920 if rejected: 921 # drop all we have and note how much more should be 922 # dropped 923 if self.debugReceive: 924 print "DROPPED some string bits" 925 self.skipBytes = strlen - len(rest) 926 self.buffer = "" 927 return 886 # The inOpen flag is cleared when the Index Phase ends. There 887 # are two possibilities: 1) a new Unslicer is pushed, and 888 # tokens are delivered to it normally. 2) a Violation was 889 # raised, and the tokens must be discarded 890 # (self.discardCount++). *any* rejection-caused True->False 891 # transition of self.inOpen must be accompanied by exactly 892 # one increment of self.discardCount 928 893 929 elif typebyte == INT: 930 buffer = rest 931 obj = int(header) 932 elif typebyte == NEG: 933 buffer = rest 934 # -2**31 is too large for a positive int, so go through 935 # LongType first 936 obj = int(-long(header)) 937 elif typebyte == LONGINT or typebyte == LONGNEG: 938 strlen = header 939 if len(rest) >= strlen: 940 # the whole number is available 941 buffer = rest[strlen:] 942 obj = bytes_to_long(rest[:strlen]) 943 if typebyte == LONGNEG: 944 obj = -obj 945 # although it might be rejected 946 else: 947 # there is more to come 948 if rejected: 949 # drop all we have and note how much more should be 950 # dropped 951 self.skipBytes = strlen - len(rest) 952 self.buffer = "" 953 return 894 self.inboundOpenCount = header # for debugging/error-checking 895 self.opentype = [] 896 return 954 897 955 elif typebyte == VOCAB: 956 buffer = rest 957 obj = self.incomingVocabulary[header] 958 # TODO: bail if expanded string is too big 959 # this actually means doing self.checkToken(VOCAB, len(obj)) 960 # but we have to make sure we handle the rejection properly 898 # PING, PONG, ABORT, CLOSE, and ERROR are always legal (i.e. we do 899 # not submit them to checkToken/openerCheckToken). 961 900 962 elif typebyte == FLOAT: 963 if len(rest) >= 8: 964 buffer = rest[8:] 965 obj = struct.unpack("!d", rest[:8])[0] 966 else: 967 # this case is easier than STRING, because it is only 8 968 # bytes. We don't bother skipping anything. 969 return 901 if typebyte == ABORT: 902 # This isn't really a Violation, but we need something to 903 # describe it. It does behave identically to what happens when 904 # receiveChild raises a Violation. The .handleViolation will pop 905 # the now-useless Unslicer and start discarding tokens just as if 906 # the Unslicer itself had made the decision. 970 907 971 elif typebyte == PING: 972 buffer = rest 973 self.sendPONG(header) 974 continue # otherwise ignored 908 # slightly silly way to do it, but nice and uniform 909 self._rx_methname = "receive-abort" 910 raise Violation("ABORT received") 975 911 976 elif typebyte == PONG: 977 buffer = rest 978 continue # otherwise ignored 912 if typebyte == ERROR: 913 # handleError must drop the connection 914 self.handleError(body) 915 return 979 916 980 else: 981 raise BananaError("Invalid Type Byte 0x%x" % ord(typebyte)) 917 if typebyte == CLOSE: 918 self.handleClose(header) 919 return 982 920 983 if not rejected: 984 if self.inOpen: 985 self.handleOpen(self.inboundOpenCount, 986 self.inboundObjectCount, 987 obj) 988 # handleOpen might push a new unslicer and clear 989 # .inOpen, or leave .inOpen true and append the object 990 # to .indexOpen 991 else: 992 self.handleToken(obj) 993 else: 994 if self.debugReceive: 995 print "DROP", type(obj), obj 996 pass # drop the object 921 # the other tokens (LIST, STRING, INT/NEG, LONGINT/LONGNEG, FLOAT, 922 # VOCAB) *are* subject to openerCheckToken/checkToken validation. 997 923 998 # while loop ends here 924 # For example, a list of integers would reject STRING, VOCAB, and 925 # OPEN because none of those will produce integers. If the unslicer's 926 # .checkToken rejects the tokentype, its .receiveChild will 927 # immediately get an Failure 999 928 1000 self.buffer = '' 929 # The validation includes a body length. We've already buffered the 930 # whole token by now, but still do the validation, because many of 931 # the unslicers perform their only checks in checkToken() instead of 932 # receiveChild(). TODO: once unslicers have moved all code out of 933 # checkToken(), remove this check. 934 935 # the openerCheckToken/checkToken calls might raise Violation 936 if self.inOpen: 937 self._rx_methname = "openerCheckToken" 938 top.openerCheckToken(typebyte, header, self.opentype) 939 else: 940 self._rx_methname = "checkToken" 941 print "calling checkToken", repr(typebyte), header 942 top.checkToken(typebyte, header) 943 print " checkToken happy" 944 945 # ok, we're accepting the token. These will all cause objects to be 946 # submitted to handleOpen or handleToken 947 948 if typebyte == STRING: 949 obj = body 950 elif typebyte == INT: 951 obj = int(header) 952 elif typebyte == NEG: 953 # -2**31 is too large for a positive int, so go through 954 # LongType first 955 obj = int(-long(header)) 956 elif typebyte in (LONGINT, LONGNEG): 957 obj = bytes_to_long(body) 958 if typebyte == LONGNEG: 959 obj = -obj 960 elif typebyte == VOCAB: 961 obj = self.incomingVocabulary[header] 962 # TODO: bail if expanded string is too big 963 # this actually means doing self.checkToken(VOCAB, len(obj)) 964 # but we have to make sure we handle the rejection properly 965 elif typebyte == FLOAT: 966 obj = struct.unpack("!d", body)[0] 967 else: 968 raise BananaError("Invalid Type Byte 0x%x" % ord(typebyte)) 969 970 #print "about to handleToken(%s) (%s)" % (obj, body) 971 if self.inOpen: 972 self.handleOpen(self.inboundOpenCount, 973 self.inboundObjectCount, 974 obj) 975 # handleOpen might push a new unslicer and clear 976 # .inOpen, or leave .inOpen true and append the object 977 # to .indexOpen 978 else: 979 self.handleToken(obj) 1001 980 1002 981 1003 982 def handleOpen(self, openCount, objectCount, indexToken): 983 # I am called for each index token of an OPEN sequence. For any given 984 # OPEN sequence (introducing a new node of the object graph), if the 985 # opentype contains N tokens, I will be called N times, with the same 986 # openCount/objectCount values but different (sequential) indexToken 987 # values. 988 # 989 # openCount is the (optional) value provided by the sender, telling 990 # us how many OPEN tokens they've sent us before this one. 991 # objectCount is a local counter, which tells us how many tokens (of 992 # any kind) we've received before the OPEN that started this 993 # sequence. 1004 994 self.opentype.append(indexToken) 1005 995 opentype = tuple(self.opentype) 1006 996 if self.debugReceive: 1007 print "handleOpen(%d,%d,%s)" % (openCount, objectCount, indexToken) 997 print "handleOpen(opencount=%d,objcount=%d,%s)" % \ 998 (openCount, objectCount, indexToken) 1008 999 top = self.receiveStack[-1] 1009 try: 1010 # obtain a new Unslicer to handle the object 1011 child = top.doOpen(opentype) 1012 if not child: 1013 if self.debugReceive: 1014 print " doOpen wants more index tokens" 1015 return # they want more index tokens, leave .inOpen=True 1000 1001 # obtain a new Unslicer to handle the object 1002 self._rx_methname = "doOpen" 1003 child = top.doOpen(opentype) # might raise Violation 1004 if not child: 1016 1005 if self.debugReceive: 1017 print " opened[%d] with %s" % (openCount, child) 1018 except Violation, v: 1019 # must discard the rest of the child object. There is no new 1020 # unslicer pushed yet, so we don't use abandonUnslicer 1021 self.inOpen = False 1022 f = BananaFailure() 1023 self.handleViolation(f, "doOpen", inOpen=True) 1024 return 1006 print " doOpen wants more index tokens" 1007 return # they want more index tokens, leave .inOpen=True 1025 1008 1009 if self.debugReceive: 1010 print " opened[%d] with %s" % (openCount, child) 1026 1011 assert tokens.IUnslicer.providedBy(child), "child is %s" % child 1027 self.inOpen = False1028 1012 child.protocol = self 1029 1013 child.openCount = openCount 1030 1014 child.parent = top 1015 self.inOpen = False 1031 1016 self.receiveStack.append(child) 1032 try: 1033 child.start(objectCount) 1034 except Violation, v: 1035 # the child is now on top, so use abandonUnslicer to discard the 1036 # rest of the child 1037 f = BananaFailure() 1038 # notifies the new child 1039 self.handleViolation(f, "start") 1017 self._rx_methname = "start" 1018 child.start(objectCount) # might raise Violation 1040 1019 1041 1020 def handleToken(self, token, ready_deferred=None): 1042 1021 top = self.receiveStack[-1] 1043 1022 if self.debugReceive: print "handleToken(%s)" % (token,) 1044 1023 if ready_deferred: 1045 1024 assert isinstance(ready_deferred, defer.Deferred) 1046 try: 1047 top.receiveChild(token, ready_deferred) 1048 except Violation, v: 1049 # this is how the child says "I've been contaminated". We don't 1050 # pop them automatically: if they want that, they should return 1051 # back the failure in their reportViolation method. 1052 f = BananaFailure() 1053 self.handleViolation(f, "receiveChild") 1025 self._rx_methname = "receiveChild" 1026 top.receiveChild(token, ready_deferred) # might raise Violation 1054 1027 1055 1028 def handleClose(self, closeCount): 1056 1029 if self.debugReceive: … … 1059 1032 raise BananaError("lost sync, got CLOSE(%d) but expecting %s" \ 1060 1033 % (closeCount, self.receiveStack[-1].openCount)) 1061 1034 1035 # If a Violation is raised by receiveClose() or finish(), we'll have 1036 # an extra Unslicer (we've received the CLOSE, the sequence is over, 1037 # but we haven't yet popped the unslicer off the stack). self.inClose 1038 # is a signal to handleViolation that we can discard one fewer 1039 # levels. 1040 self.inClose = True 1041 if self.inOpen: 1042 # huh, we were still waiting to see all the opentype tokens when 1043 # the caller closed the sequence. We must disagree about the 1044 # opentype sequences. This is a Violation. Treat it as if it 1045 # occurred in handleOpen(). 1046 self._rx_methname = "close-during-open" 1047 raise Violation("received CLOSE during open sequence") 1048 1062 1049 child = self.receiveStack[-1] # don't pop yet: describe() needs it 1063 1050 1064 try: 1065 obj, ready_deferred = child.receiveClose() 1066 except Violation, v: 1067 # the child is contaminated. However, they're finished, so we 1068 # don't have to discard anything. Just give an Failure to the 1069 # parent instead of the object they would have returned. 1070 f = BananaFailure() 1071 self.handleViolation(f, "receiveClose", inClose=True) 1072 return 1051 self._rx_methname = "receiveClose" 1052 obj, ready_deferred = child.receiveClose() # might raise Violation 1073 1053 if self.debugReceive: print "receiveClose returned", obj 1074 1054 1075 try: 1076 child.finish() 1077 except Violation, v: 1078 # .finish could raise a Violation if an object that references 1079 # the child is just now deciding that they don't like it 1080 # (perhaps their TupleConstraint couldn't be asserted until the 1081 # tuple was complete and referenceable). In this case, the child 1082 # has produced a valid object, but an earlier (incomplete) 1083 # object is not valid. So we treat this as if this child itself 1084 # raised the Violation. The .where attribute will point to this 1085 # child, which is the node that caused somebody problems, but 1086 # will be marked <FINISH>, which indicates that it wasn't the 1087 # child itself which raised the Violation. TODO: not true 1088 # 1089 # TODO: it would be more useful if the UF could also point to 1090 # the completing object (the one which raised Violation). 1055 self._rx_methname = "finish" 1056 child.finish() # might raise Violation 1091 1057 1092 f = BananaFailure() 1093 self.handleViolation(f, "finish", inClose=True) 1094 return 1058 # .finish could raise a Violation if an object that references 1059 # the child is just now deciding that they don't like it 1060 # (perhaps their TupleConstraint couldn't be asserted until the 1061 # tuple was complete and referenceable). In this case, the child 1062 # has produced a valid object, but an earlier (incomplete) 1063 # object is not valid. So we treat this as if this child itself 1064 # raised the Violation. The .where attribute will point to this 1065 # child, which is the node that caused somebody problems, but 1066 # will be marked <FINISH>, which indicates that it wasn't the 1067 # child itself which raised the Violation. TODO: not true 1068 # 1069 # TODO: it would be more useful if the UF could also point to 1070 # the completing object (the one which raised Violation). 1095 1071 1096 1072 self.receiveStack.pop() 1073 self.inClose = False 1097 1074 1098 1075 # now deliver the object to the parent 1099 1076 self.handleToken(obj, ready_deferred) 1100 1077 1101 def handleViolation(self, f , methname, inOpen=False, inClose=False):1078 def handleViolation(self, f): 1102 1079 """An Unslicer has decided to give up, or we have given up on it 1103 1080 (because we received an ABORT token). 1081 1082 inOpen= and inClose= are used to manage the gap between how many 1083 OPEN/CLOSE tokens we've received and how many Unslicers are on the 1084 stack. The inbound token sequence is OPEN ot1 ot2 ch1 ch2 CLOSE, but 1085 we don't push a new Unslicer until the last opentype token (between 1086 'ot2' and 'ch1'). inOpen=True when we receive the OPEN, and is set to 1087 False when we push the Unslicer during processing of ot2. 1088 inClose=True when we receive the CLOSE and is set to False a few 1089 statements later after we pop the Unslicer. 1104 1090 """ 1105 1091 1092 methname = self._rx_methname 1106 1093 where = self.describeReceive() 1107 1094 f.value.setLocation(where) 1108 1095 1109 1096 if self.debugReceive: 1110 1097 print " handleViolation-%s (inOpen=%s, inClose=%s): %s" \ 1111 % (methname, inOpen,inClose, f)1098 % (methname, self.inOpen, self.inClose, f) 1112 1099 1113 1100 assert isinstance(f, BananaFailure) 1114 1101 … … 1116 1103 log.msg("Violation in %s at %s" % (methname, where)) 1117 1104 log.err(f) 1118 1105 1119 if inOpen: 1106 if self.inOpen: 1107 self.inOpen = False 1120 1108 self.discardCount += 1 1121 1109 if self.debugReceive: 1122 1110 print " ++discardCount (inOpen), now %d" % self.discardCount 1123 1111 1112 inClose = self.inClose 1113 self.inClose = False 1114 1124 1115 while True: 1125 1116 # tell the parent that their child is dead. This is useful for 1126 1117 # things like PB, which may want to errback the current request. -
foolscap/test/test_banana.py
diff -r 9adfe64ba138 foolscap/test/test_banana.py
a b 7 7 8 8 from foolscap.tokens import ISlicer, Violation, BananaError 9 9 from foolscap.tokens import BananaFailure, tokenNames, \ 10 OPEN, CLOSE, ABORT, INT, LONGINT, NEG, LONGNEG, FLOAT, STRING10 OPEN, CLOSE, ERROR, ABORT, INT, LONGINT, NEG, LONGNEG, FLOAT, STRING 11 11 from foolscap import slicer, schema, storage, banana, vocab 12 12 from foolscap.eventual import fireEventually, flushEventualQueue 13 13 from foolscap.slicers.allslicers import RootSlicer, DictUnslicer, TupleUnslicer … … 15 15 from foolscap.banana import int2b128, long_to_bytes 16 16 17 17 import StringIO 18 import sets, struct 18 import sets, struct, itertools 19 19 from decimal import Decimal 20 from hashlib import md5 20 21 21 22 #log.startLogging(sys.stderr) 22 23 … … 1430 1431 schema.StringConstraint(10)) 1431 1432 self.conform2("\x0a\x82" + "a"*10 + "extra", "a"*10, 1432 1433 schema.StringConstraint(10)) 1434 print "doing violate2" 1433 1435 self.violate2("\x0b\x82" + "a"*11 + "extra", 1434 1436 "<RootUnslicer>", 1435 1437 schema.StringConstraint(10)) 1438 print "did violate2" 1436 1439 1437 1440 def NOTtestFoo(self): 1438 1441 if 0: … … 1996 1999 tCLOSE(0)]) 1997 2000 return d 1998 2001 2002 class TokenParsingBanana(banana.Banana): 2003 def processBody(self, typebyte, header, body): 2004 self.tokens.append( (typebyte, header, body) ) 2005 2006 class TokenParser(unittest.TestCase): 2007 def setUp(self): 2008 self.b = b = TokenParsingBanana() 2009 b.initReceive() 2010 self.tokens = b.tokens = [] 2011 def get_tokens(self): 2012 t = list(self.tokens) 2013 self.tokens[:] = [] 2014 return t 2015 def put(self, *chunks): 2016 for c in chunks: 2017 self.b.handleData(c) 2018 def expect(self, tokens): 2019 self.failUnlessEqual(self.get_tokens(), tokens) 2020 self.failUnlessEqual(self.b.bufferSize, 0) 2021 self.failUnlessEqual(self.b.bufferChunks, []) 2022 2023 def test_tokenizer(self): 2024 put = self.put 2025 expect = self.expect 2026 2027 put("") 2028 expect([]) 2029 2030 put(bINT(4)) 2031 expect([(INT, 4, '')]) 2032 2033 put(bSTR("abcd")) 2034 expect([(STRING, 4, "abcd")]) 2035 2036 put(bSTR("abcd")+bSTR("efg")) 2037 expect([(STRING, 4, "abcd"), (STRING, 3, "efg")]) 2038 2039 s = bSTR("a"*100) + bSTR("b"*100) 2040 put(s[:50]) 2041 put(s[50:]) 2042 expect([(STRING, 100, "a"*100), (STRING, 100, "b"*100)]) 2043 2044 s = bSTR("a"*5) + bSTR("b"*5) + bSTR("c"*100) + bSTR("d") + bSTR("e") 2045 put(s[:50]) 2046 put(s[50:]) 2047 expect([(STRING, 5, "a"*5), (STRING, 5, "b"*5), 2048 (STRING, 100, "c"*100), (STRING, 1, "d"), 2049 (STRING, 1, "e"), 2050 ]) 2051 2052 2053 def write_int(self, num): 2054 token = [] 2055 typebyte = "\x81" 2056 if num < 0: 2057 num = -num 2058 typebyte = "\x83" 2059 banana.int2b128(num, token.append) 2060 token.append(typebyte) 2061 return typebyte, "".join(token) 2062 2063 def write_long(self, num): 2064 token = [] 2065 typebyte = "\x85" 2066 if num < 0: 2067 num = -num 2068 typebyte = "\x86" 2069 s = banana.long_to_bytes(num) 2070 banana.int2b128(len(s), token.append) 2071 token.append(typebyte) 2072 token.append(s) 2073 return typebyte, len(s), s, "".join(token) 2074 2075 def int2b128(self, n): 2076 header = [] 2077 banana.int2b128(n, header.append) 2078 return "".join(header) 2079 2080 def test_random(self): 2081 put = self.put 2082 expect = self.expect 2083 for bodyseed in range(20): 2084 pieces = [] 2085 expected = [] 2086 for i in range(40): 2087 seed = md5("%d-%d" % (bodyseed, i)).hexdigest() 2088 if seed[0] in "01": 2089 num = int(seed[1], 16) # small 2090 if seed[0] == "0" and num > 0: 2091 pieces.append(bINT(-num)) 2092 expected.append( (NEG, num, "") ) 2093 else: 2094 pieces.append(bINT(num)) 2095 expected.append( (INT, num, "") ) 2096 if seed[0] in "23": 2097 num = int(seed[1:], 16) # large, still fits in INT 2098 if seed[0] == "2": 2099 num = -num 2100 typebyte, token = self.write_int(num) 2101 pieces.append(token) 2102 expected.append( (typebyte, abs(num), "") ) 2103 if seed[0] in "45": 2104 num = int(seed[1:], 16) # large, put in LONGINT 2105 if seed[0] == "4": 2106 num = -num 2107 typebyte, header, body, token = self.write_long(num) 2108 pieces.append(token) 2109 expected.append( (typebyte, header, body) ) 2110 if seed[0] in "67": 2111 l = int(seed[1:3], 16) 2112 s = seed[4] * l 2113 pieces.append(self.int2b128(l) + STRING + s) 2114 expected.append( (STRING, l, s) ) 2115 if seed[0] in "78": 2116 float_s = seed[2:2+8] 2117 pieces.append(FLOAT+float_s) 2118 expected.append( (FLOAT, 0, float_s) ) 2119 if seed[0] == "9": 2120 count = int(seed[1:3], 16) 2121 pieces.append(self.int2b128(count) + OPEN) 2122 expected.append( (OPEN, count, "") ) 2123 if seed[0] == "a": 2124 count = int(seed[1:3], 16) 2125 pieces.append(self.int2b128(count) + CLOSE) 2126 expected.append( (CLOSE, count, "") ) 2127 if seed[0] == "b": 2128 l = int(seed[1:3], 16) 2129 s = seed[4] * l 2130 pieces.append(self.int2b128(l) + ERROR + s) 2131 expected.append( (ERROR, l, s) ) 2132 2133 s = "".join(pieces) 2134 for chunkseed in range(20): 2135 #print "bodylen:", len(s) # generally 1000-2000 bytes 2136 remaining = s 2137 chunkstep = itertools.count(0) 2138 while remaining: 2139 i = chunkstep.next() 2140 seed = md5("%d-%d" % (chunkseed, i)).hexdigest() 2141 size = int(seed[:2], 16) # TODO: prefer exponential 2142 #print " ", size, len(remaining) 2143 chunk, remaining = remaining[:size], remaining[size:] 2144 self.put(chunk) 2145 self.expect(expected) 1999 2146 2000 2147 2001 2148 # TODO: vocab test: