Changeset 472:f7bf19c86baa

Show
Ignore:
Timestamp:
08/28/08 18:37:04 (4 months ago)
Author:
Brian Warner <warner@allmydata.com>
branch:
default
Message:

incident-gatherer: only fetch one incident at a time. Closes #85.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • ChangeLog

    r471 r472  
    1 1 2008-08-28  Brian Warner  <warner@lothar.com> 
     2 
     3        * foolscap/logging/gatherer.py (IncidentObserver): only fetch one 
     4        incident at a time, to limit the size of the sender's outbound 
     5        queue. This should help close #85. 
     6        (IncidentGathererService.new_incident): include the classification 
     7        results in the per-incident log messages 
     8        * foolscap/test/test_logging.py (IncidentGatherer.test_emit): tests 
    2 9 
    3 10        * foolscap/logging/log.py (FoolscapLogger.declare_incident): 
  • foolscap/logging/gatherer.py

    r461 r472  
    88from zope.interface import implements 
    99from twisted.internet import reactor, utils, defer 
    10 from twisted.python import usage, procutils, filepath 
     10from twisted.python import usage, procutils, filepath, log as tw_log 
    1111from twisted.application import service, internet 
    1212import foolscap 
     
    310310        self.stdout = stdout 
    311311        self.caught_up_d = defer.Deferred() 
     312        self.incidents_wanted = [] 
     313        self.incident_fetch_outstanding = False 
    312314 
    313315    def connect(self): 
     
    332334 
    333335    def remote_new_incident(self, name, trigger): 
    334         print >>self.stdout, "got incident", name 
     336        print >>self.stdout, "new incident", name 
    335337        # name= should look like "incident-2008-07-29-204211-aspkxoi". We 
    336338        # prevent name= from containing path metacharacters like / or : by 
    337339        # using FilePath later on. 
     340        self.incidents_wanted.append( (name, trigger) ) 
     341        self.maybe_fetch_incident() 
     342 
     343    def maybe_fetch_incident(self): 
     344        # only fetch one incident at a time, to keep the sender's outbound 
     345        # memory usage to a reasonable level 
     346        if self.incident_fetch_outstanding: 
     347            return 
     348        if not self.incidents_wanted: 
     349            return 
     350        self.incident_fetch_outstanding = True 
     351        (name, trigger) = self.incidents_wanted.pop(0) 
     352        print >>self.stdout, "fetching incident", name 
    338353        d = self.publisher.callRemote("get_incident", name) 
    339354        d.addCallback(self._got_incident, name, trigger) 
    340         d.addCallback(lambda res: None) 
    341         return d 
     355        d.addErrback(tw_log.err, 
     356                     "IncidentObserver.get_incident or _got_incident") 
     357 
    342358    def _got_incident(self, incident, name, trigger): 
     359        self.incident_fetch_outstanding = False 
    343360        # We always save the incident to a .bz2 file. 
    344361        abs_fn = self.basedir.child(name).path # this prevents evil 
     
    350367        self.update_latest(name) 
    351368        self.gatherer.new_incident(abs_fn, rel_fn, self.tubid_s, incident) 
     369        self.maybe_fetch_incident() 
    352370 
    353371    def save_incident(self, filename, incident): 
     
    405423        self.classifiers.extend(classifiers) 
    406424        self.stdout = stdout 
     425        self.incidents_received = 0 # for tests 
    407426 
    408427    def add_classifier(self, f): 
     
    461480    def new_incident(self, abs_fn, rel_fn, tubid_s, incident): 
    462481        stdout = self.stdout or sys.stdout 
    463         print >>stdout, "NEW INCIDENT", rel_fn 
    464         self.classify_incident(rel_fn, tubid_s, incident) 
     482        categories = self.classify_incident(rel_fn, tubid_s, incident) 
     483        print >>stdout, "GOT INCIDENT %s [%s]" % (rel_fn, ",".join(categories)) 
     484        self.incidents_received += 1 
    465485 
    466486    def classify_incident(self, rel_fn, tubid_s, incident): 
     
    479499            f.write(rel_fn + "\n") 
    480500            f.close() 
     501        return categories 
    481502 
    482503 
  • foolscap/test/test_logging.py

    r471 r472  
    10601060            ig3 = self.create_incident_gatherer(basedir) 
    10611061            ig3.setServiceParent(self.parent) 
     1062            self.ig3 = ig3 
    10621063 
    10631064            unknowns_fn = os.path.join(ig.basedir, "classified", "unknown") 
     
    10741075        # give the call to remote_logport a chance to retire 
    10751076        d.addCallback(self.stall, 0.5) 
     1077        d.addCallback(lambda res: self.ig3.disownServiceParent()) 
     1078 
     1079        # and if we remove all the stored incidents (and the 'latest' 
     1080        # record), the gatherer will grab everything. This exercises the 
     1081        # only-grab-one-at-a-time code. I verified this manually, by adding a 
     1082        # print to the avoid-duplicate clause of 
     1083        # IncidentObserver.maybe_fetch_incident . 
     1084 
     1085        def _create_ig4(res): 
     1086            ig4 = self.create_incident_gatherer(basedir) 
     1087            for nodeid in os.listdir(os.path.join(ig4.basedir, "incidents")): 
     1088                nodedir = os.path.join(ig4.basedir, "incidents", nodeid) 
     1089                for fn in os.listdir(nodedir): 
     1090                    os.unlink(os.path.join(nodedir, fn)) 
     1091                os.rmdir(nodedir) 
     1092            ig4.setServiceParent(self.parent) 
     1093            self.ig4 = ig4 
     1094        d.addCallback(_create_ig4) 
     1095        d.addCallback(lambda res: 
     1096                      self.poll(lambda : self.ig4.incidents_received == 2)) 
     1097 
     1098        d.addCallback(self.stall, 0.5) 
     1099 
    10761100        return d 
    10771101