Changeset 472:f7bf19c86baa
- Timestamp:
- 08/28/08 18:37:04
(4 months ago)
- Author:
- Brian Warner <warner@allmydata.com>
- branch:
- default
- Message:
incident-gatherer: only fetch one incident at a time. Closes #85.
-
Files:
-
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
| r471 |
r472 |
|
| 1 | 1 | 2008-08-28 Brian Warner <warner@lothar.com> |
|---|
| | 2 | |
|---|
| | 3 | * foolscap/logging/gatherer.py (IncidentObserver): only fetch one |
|---|
| | 4 | incident at a time, to limit the size of the sender's outbound |
|---|
| | 5 | queue. This should help close #85. |
|---|
| | 6 | (IncidentGathererService.new_incident): include the classification |
|---|
| | 7 | results in the per-incident log messages |
|---|
| | 8 | * foolscap/test/test_logging.py (IncidentGatherer.test_emit): tests |
|---|
| 2 | 9 | |
|---|
| 3 | 10 | * foolscap/logging/log.py (FoolscapLogger.declare_incident): |
|---|
| r461 |
r472 |
|
| 8 | 8 | from zope.interface import implements |
|---|
| 9 | 9 | from twisted.internet import reactor, utils, defer |
|---|
| 10 | | from twisted.python import usage, procutils, filepath |
|---|
| | 10 | from twisted.python import usage, procutils, filepath, log as tw_log |
|---|
| 11 | 11 | from twisted.application import service, internet |
|---|
| 12 | 12 | import foolscap |
|---|
| … | … | |
| 310 | 310 | self.stdout = stdout |
|---|
| 311 | 311 | self.caught_up_d = defer.Deferred() |
|---|
| | 312 | self.incidents_wanted = [] |
|---|
| | 313 | self.incident_fetch_outstanding = False |
|---|
| 312 | 314 | |
|---|
| 313 | 315 | def connect(self): |
|---|
| … | … | |
| 332 | 334 | |
|---|
| 333 | 335 | def remote_new_incident(self, name, trigger): |
|---|
| 334 | | print >>self.stdout, "got incident", name |
|---|
| | 336 | print >>self.stdout, "new incident", name |
|---|
| 335 | 337 | # name= should look like "incident-2008-07-29-204211-aspkxoi". We |
|---|
| 336 | 338 | # prevent name= from containing path metacharacters like / or : by |
|---|
| 337 | 339 | # using FilePath later on. |
|---|
| | 340 | self.incidents_wanted.append( (name, trigger) ) |
|---|
| | 341 | self.maybe_fetch_incident() |
|---|
| | 342 | |
|---|
| | 343 | def maybe_fetch_incident(self): |
|---|
| | 344 | # only fetch one incident at a time, to keep the sender's outbound |
|---|
| | 345 | # memory usage to a reasonable level |
|---|
| | 346 | if self.incident_fetch_outstanding: |
|---|
| | 347 | return |
|---|
| | 348 | if not self.incidents_wanted: |
|---|
| | 349 | return |
|---|
| | 350 | self.incident_fetch_outstanding = True |
|---|
| | 351 | (name, trigger) = self.incidents_wanted.pop(0) |
|---|
| | 352 | print >>self.stdout, "fetching incident", name |
|---|
| 338 | 353 | d = self.publisher.callRemote("get_incident", name) |
|---|
| 339 | 354 | d.addCallback(self._got_incident, name, trigger) |
|---|
| 340 | | d.addCallback(lambda res: None) |
|---|
| 341 | | return d |
|---|
| | 355 | d.addErrback(tw_log.err, |
|---|
| | 356 | "IncidentObserver.get_incident or _got_incident") |
|---|
| | 357 | |
|---|
| 342 | 358 | def _got_incident(self, incident, name, trigger): |
|---|
| | 359 | self.incident_fetch_outstanding = False |
|---|
| 343 | 360 | # We always save the incident to a .bz2 file. |
|---|
| 344 | 361 | abs_fn = self.basedir.child(name).path # this prevents evil |
|---|
| … | … | |
| 350 | 367 | self.update_latest(name) |
|---|
| 351 | 368 | self.gatherer.new_incident(abs_fn, rel_fn, self.tubid_s, incident) |
|---|
| | 369 | self.maybe_fetch_incident() |
|---|
| 352 | 370 | |
|---|
| 353 | 371 | def save_incident(self, filename, incident): |
|---|
| … | … | |
| 405 | 423 | self.classifiers.extend(classifiers) |
|---|
| 406 | 424 | self.stdout = stdout |
|---|
| | 425 | self.incidents_received = 0 # for tests |
|---|
| 407 | 426 | |
|---|
| 408 | 427 | def add_classifier(self, f): |
|---|
| … | … | |
| 461 | 480 | def new_incident(self, abs_fn, rel_fn, tubid_s, incident): |
|---|
| 462 | 481 | stdout = self.stdout or sys.stdout |
|---|
| 463 | | print >>stdout, "NEW INCIDENT", rel_fn |
|---|
| 464 | | self.classify_incident(rel_fn, tubid_s, incident) |
|---|
| | 482 | categories = self.classify_incident(rel_fn, tubid_s, incident) |
|---|
| | 483 | print >>stdout, "GOT INCIDENT %s [%s]" % (rel_fn, ",".join(categories)) |
|---|
| | 484 | self.incidents_received += 1 |
|---|
| 465 | 485 | |
|---|
| 466 | 486 | def classify_incident(self, rel_fn, tubid_s, incident): |
|---|
| … | … | |
| 479 | 499 | f.write(rel_fn + "\n") |
|---|
| 480 | 500 | f.close() |
|---|
| | 501 | return categories |
|---|
| 481 | 502 | |
|---|
| 482 | 503 | |
|---|
| r471 |
r472 |
|
| 1060 | 1060 | ig3 = self.create_incident_gatherer(basedir) |
|---|
| 1061 | 1061 | ig3.setServiceParent(self.parent) |
|---|
| | 1062 | self.ig3 = ig3 |
|---|
| 1062 | 1063 | |
|---|
| 1063 | 1064 | unknowns_fn = os.path.join(ig.basedir, "classified", "unknown") |
|---|
| … | … | |
| 1074 | 1075 | # give the call to remote_logport a chance to retire |
|---|
| 1075 | 1076 | d.addCallback(self.stall, 0.5) |
|---|
| | 1077 | d.addCallback(lambda res: self.ig3.disownServiceParent()) |
|---|
| | 1078 | |
|---|
| | 1079 | # and if we remove all the stored incidents (and the 'latest' |
|---|
| | 1080 | # record), the gatherer will grab everything. This exercises the |
|---|
| | 1081 | # only-grab-one-at-a-time code. I verified this manually, by adding a |
|---|
| | 1082 | # print to the avoid-duplicate clause of |
|---|
| | 1083 | # IncidentObserver.maybe_fetch_incident . |
|---|
| | 1084 | |
|---|
| | 1085 | def _create_ig4(res): |
|---|
| | 1086 | ig4 = self.create_incident_gatherer(basedir) |
|---|
| | 1087 | for nodeid in os.listdir(os.path.join(ig4.basedir, "incidents")): |
|---|
| | 1088 | nodedir = os.path.join(ig4.basedir, "incidents", nodeid) |
|---|
| | 1089 | for fn in os.listdir(nodedir): |
|---|
| | 1090 | os.unlink(os.path.join(nodedir, fn)) |
|---|
| | 1091 | os.rmdir(nodedir) |
|---|
| | 1092 | ig4.setServiceParent(self.parent) |
|---|
| | 1093 | self.ig4 = ig4 |
|---|
| | 1094 | d.addCallback(_create_ig4) |
|---|
| | 1095 | d.addCallback(lambda res: |
|---|
| | 1096 | self.poll(lambda : self.ig4.incidents_received == 2)) |
|---|
| | 1097 | |
|---|
| | 1098 | d.addCallback(self.stall, 0.5) |
|---|
| | 1099 | |
|---|
| 1076 | 1100 | return d |
|---|
| 1077 | 1101 | |
|---|