Changeset 480:c2a9b31e1fd5

Show
Ignore:
Timestamp:
09/02/08 20:48:55 (4 months ago)
Author:
Brian Warner <warner@allmydata.com>
branch:
default
Message:

gatterer: reclassify everything that isn't already classified. Closes #94.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • ChangeLog

    r479 r480  
    112008-09-02  Brian Warner  <warner@lothar.com> 
     2 
     3        * foolscap/pb.py (Tub._log_gatherer_connected): use callRemoteOnly 
     4        to pass the logport to the gatherer: we don't need to hear about 
     5        any problems it has. 
     6 
     7        * foolscap/logging/gatherer.py 
     8        (IncidentGathererService.classify_stored_incidents): reclassify 
     9        everything that isn't already present in one of the classified/* 
     10        files. This makes it a lot easier to iterate over the [start 
     11        gatherer; see what is unknown; update classifiers; remove 
     12        classified/unknown; repeat] loop. Also log classification events 
     13        better. Closes #94. 
     14        * foolscap/test/test_logging.py (IncidentGatherer.test_emit): test it 
    215 
    316        * foolscap/test/common.py (PollMixin): replace the use of chained 
  • foolscap/logging/gatherer.py

    r472 r480  
    436436        if not os.path.isdir(outputdir): 
    437437            os.makedirs(outputdir) 
    438             self.classify_stored_incidents(indir) 
     438        self.classify_stored_incidents(indir) 
    439439        GatheringBase.startService(self) 
    440440 
    441441    def classify_stored_incidents(self, indir): 
    442442        stdout = self.stdout or sys.stdout 
    443         print >>stdout, "No classified/ directory: reclassifying stored incidents" 
    444         # now classify all stored incidents 
     443        print >>stdout, "classifying stored incidents" 
     444        # now classify all stored incidents that aren't already classified 
     445        already = set() 
     446        outputdir = os.path.join(self.basedir, "classified") 
     447        for category in os.listdir(outputdir): 
     448            for line in open(os.path.join(outputdir, category), "r"): 
     449                fn = line.strip() 
     450                abs_fn = os.path.join(self.basedir, fn) 
     451                already.add(abs_fn) 
     452        print >>stdout, "%d incidents already classified" % len(already) 
     453        count = 0 
    445454        for tubid_s in os.listdir(indir): 
    446455            nodedir = os.path.join(indir, tubid_s) 
     
    448457                if fn.startswith("incident-"): 
    449458                    abs_fn = os.path.join(nodedir, fn) 
     459                    if abs_fn in already: 
     460                        continue 
    450461                    incident = self.load_incident(abs_fn) 
    451462                    rel_fn = os.path.join("incidents", tubid_s, fn) 
    452463                    self.classify_incident(rel_fn, tubid_s, incident) 
     464                    count += 1 
     465        print >>stdout, "done classifying %d stored incidents" % count 
    453466 
    454467    def load_incident(self, abs_fn): 
     
    480493    def new_incident(self, abs_fn, rel_fn, tubid_s, incident): 
    481494        stdout = self.stdout or sys.stdout 
    482         categories = self.classify_incident(rel_fn, tubid_s, incident) 
    483         print >>stdout, "GOT INCIDENT %s [%s]" % (rel_fn, ",".join(categories)) 
     495        self.classify_incident(rel_fn, tubid_s, incident) 
    484496        self.incidents_received += 1 
    485497 
    486498    def classify_incident(self, rel_fn, tubid_s, incident): 
     499        stdout = self.stdout or sys.stdout 
    487500        categories = set() 
    488501        for f in self.classifiers: 
     
    499512            f.write(rel_fn + "\n") 
    500513            f.close() 
     514        print >>stdout, "classified %s as [%s]" % (rel_fn, ",".join(categories)) 
    501515        return categories 
    502516 
  • foolscap/pb.py

    r470 r480  
    391391            # RILogGatherer.logport requires a string for nodeid= 
    392392            tubID = '<unauth>' 
    393         rref.callRemote('logport', tubID, self.getLogPort()) 
     393        rref.callRemoteOnly('logport', tubID, self.getLogPort()) 
    394394 
    395395 
  • foolscap/test/test_logging.py

    r477 r480  
    10101010        # at startup. 
    10111011 
    1012         # give the call to remote_logport a chance to retire 
    1013         d.addCallback(self.stall, 0.5) 
    10141012        d.addCallback(lambda res: ig.disownServiceParent()) 
     1013 
     1014        def classify_boom(nodeid_s, (header,events)): 
     1015            if "boom" in header["trigger"].get("message",""): 
     1016                return "boom" 
     1017        def classify_foom(nodeid_s, (header,events)): 
     1018            if "foom" in header["trigger"].get("message",""): 
     1019                return "foom" 
    10151020 
    10161021        incident_d2 = defer.Deferred() 
    10171022        def _update_classifiers(res): 
    10181023            self.remove_classified_incidents(ig) 
    1019             def classify_boom(nodeid_s, (header,events)): 
    1020                 if "boom" in header["trigger"].get("message",""): 
    1021                     return "boom" 
    1022             def classify_foom(nodeid_s, (header,events)): 
    1023                 if "foom" in header["trigger"].get("message",""): 
    1024                     return "foom" 
    10251024            ig2 = self.create_incident_gatherer(basedir, [classify_boom]) 
    10261025            ig2.add_classifier(classify_foom) 
     
    10541053            self.failIf(os.path.exists(unknowns_fn)) 
    10551054        d.addCallback(_new_incident2) 
     1055        d.addCallback(lambda res: self.ig2.disownServiceParent()) 
     1056 
     1057        # if we remove just classified/boom, then those incidents should be 
     1058        # reclassified 
     1059 
     1060        def _remove_boom_incidents(res): 
     1061            booms_fn = os.path.join(ig.basedir, "classified", "boom") 
     1062            os.remove(booms_fn) 
     1063 
     1064            ig2a = self.create_incident_gatherer(basedir, [classify_boom, 
     1065                                                           classify_foom]) 
     1066            ig2a.setServiceParent(self.parent) 
     1067            self.ig2a = ig2a 
     1068 
     1069            # now classified/boom should be back, and the other files should 
     1070            # have been left untouched 
     1071            booms = [fn.strip() for fn in open(booms_fn,"r").readlines()] 
     1072            self.failUnlessEqual(len(booms), 1) 
     1073        d.addCallback(_remove_boom_incidents) 
     1074        d.addCallback(lambda res: self.ig2a.disownServiceParent()) 
    10561075 
    10571076        # and if we remove the classification functions (but do *not* remove 
     
    10591078        # anything 
    10601079 
    1061         d.addCallback(self.stall, 0.5) 
    1062         d.addCallback(lambda res: self.ig2.disownServiceParent()) 
    10631080        def _update_classifiers_again(res): 
    10641081            ig3 = self.create_incident_gatherer(basedir) 
     
    10771094        d.addCallback(_update_classifiers_again) 
    10781095 
    1079         # give the call to remote_logport a chance to retire 
    1080         d.addCallback(self.stall, 0.5) 
    10811096        d.addCallback(lambda res: self.ig3.disownServiceParent()) 
    10821097 
     
    10991114        d.addCallback(lambda res: 
    11001115                      self.poll(lambda : self.ig4.incidents_received == 2)) 
    1101  
    1102         d.addBoth(self.stall, 0.5) # allow shutdown even on error 
    11031116 
    11041117        return d