Changeset 480:c2a9b31e1fd5
- Timestamp:
- 09/02/08 20:48:55
(4 months ago)
- Author:
- Brian Warner <warner@allmydata.com>
- branch:
- default
- Message:
gatterer: reclassify everything that isn't already classified. Closes #94.
-
Files:
-
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
| r479 |
r480 |
|
| 1 | 1 | 2008-09-02 Brian Warner <warner@lothar.com> |
|---|
| | 2 | |
|---|
| | 3 | * foolscap/pb.py (Tub._log_gatherer_connected): use callRemoteOnly |
|---|
| | 4 | to pass the logport to the gatherer: we don't need to hear about |
|---|
| | 5 | any problems it has. |
|---|
| | 6 | |
|---|
| | 7 | * foolscap/logging/gatherer.py |
|---|
| | 8 | (IncidentGathererService.classify_stored_incidents): reclassify |
|---|
| | 9 | everything that isn't already present in one of the classified/* |
|---|
| | 10 | files. This makes it a lot easier to iterate over the [start |
|---|
| | 11 | gatherer; see what is unknown; update classifiers; remove |
|---|
| | 12 | classified/unknown; repeat] loop. Also log classification events |
|---|
| | 13 | better. Closes #94. |
|---|
| | 14 | * foolscap/test/test_logging.py (IncidentGatherer.test_emit): test it |
|---|
| 2 | 15 | |
|---|
| 3 | 16 | * foolscap/test/common.py (PollMixin): replace the use of chained |
|---|
| r472 |
r480 |
|
| 436 | 436 | if not os.path.isdir(outputdir): |
|---|
| 437 | 437 | os.makedirs(outputdir) |
|---|
| 438 | | self.classify_stored_incidents(indir) |
|---|
| | 438 | self.classify_stored_incidents(indir) |
|---|
| 439 | 439 | GatheringBase.startService(self) |
|---|
| 440 | 440 | |
|---|
| 441 | 441 | def classify_stored_incidents(self, indir): |
|---|
| 442 | 442 | stdout = self.stdout or sys.stdout |
|---|
| 443 | | print >>stdout, "No classified/ directory: reclassifying stored incidents" |
|---|
| 444 | | # now classify all stored incidents |
|---|
| | 443 | print >>stdout, "classifying stored incidents" |
|---|
| | 444 | # now classify all stored incidents that aren't already classified |
|---|
| | 445 | already = set() |
|---|
| | 446 | outputdir = os.path.join(self.basedir, "classified") |
|---|
| | 447 | for category in os.listdir(outputdir): |
|---|
| | 448 | for line in open(os.path.join(outputdir, category), "r"): |
|---|
| | 449 | fn = line.strip() |
|---|
| | 450 | abs_fn = os.path.join(self.basedir, fn) |
|---|
| | 451 | already.add(abs_fn) |
|---|
| | 452 | print >>stdout, "%d incidents already classified" % len(already) |
|---|
| | 453 | count = 0 |
|---|
| 445 | 454 | for tubid_s in os.listdir(indir): |
|---|
| 446 | 455 | nodedir = os.path.join(indir, tubid_s) |
|---|
| … | … | |
| 448 | 457 | if fn.startswith("incident-"): |
|---|
| 449 | 458 | abs_fn = os.path.join(nodedir, fn) |
|---|
| | 459 | if abs_fn in already: |
|---|
| | 460 | continue |
|---|
| 450 | 461 | incident = self.load_incident(abs_fn) |
|---|
| 451 | 462 | rel_fn = os.path.join("incidents", tubid_s, fn) |
|---|
| 452 | 463 | self.classify_incident(rel_fn, tubid_s, incident) |
|---|
| | 464 | count += 1 |
|---|
| | 465 | print >>stdout, "done classifying %d stored incidents" % count |
|---|
| 453 | 466 | |
|---|
| 454 | 467 | def load_incident(self, abs_fn): |
|---|
| … | … | |
| 480 | 493 | def new_incident(self, abs_fn, rel_fn, tubid_s, incident): |
|---|
| 481 | 494 | stdout = self.stdout or sys.stdout |
|---|
| 482 | | categories = self.classify_incident(rel_fn, tubid_s, incident) |
|---|
| 483 | | print >>stdout, "GOT INCIDENT %s [%s]" % (rel_fn, ",".join(categories)) |
|---|
| | 495 | self.classify_incident(rel_fn, tubid_s, incident) |
|---|
| 484 | 496 | self.incidents_received += 1 |
|---|
| 485 | 497 | |
|---|
| 486 | 498 | def classify_incident(self, rel_fn, tubid_s, incident): |
|---|
| | 499 | stdout = self.stdout or sys.stdout |
|---|
| 487 | 500 | categories = set() |
|---|
| 488 | 501 | for f in self.classifiers: |
|---|
| … | … | |
| 499 | 512 | f.write(rel_fn + "\n") |
|---|
| 500 | 513 | f.close() |
|---|
| | 514 | print >>stdout, "classified %s as [%s]" % (rel_fn, ",".join(categories)) |
|---|
| 501 | 515 | return categories |
|---|
| 502 | 516 | |
|---|
| r470 |
r480 |
|
| 391 | 391 | # RILogGatherer.logport requires a string for nodeid= |
|---|
| 392 | 392 | tubID = '<unauth>' |
|---|
| 393 | | rref.callRemote('logport', tubID, self.getLogPort()) |
|---|
| | 393 | rref.callRemoteOnly('logport', tubID, self.getLogPort()) |
|---|
| 394 | 394 | |
|---|
| 395 | 395 | |
|---|
| r477 |
r480 |
|
| 1010 | 1010 | # at startup. |
|---|
| 1011 | 1011 | |
|---|
| 1012 | | # give the call to remote_logport a chance to retire |
|---|
| 1013 | | d.addCallback(self.stall, 0.5) |
|---|
| 1014 | 1012 | d.addCallback(lambda res: ig.disownServiceParent()) |
|---|
| | 1013 | |
|---|
| | 1014 | def classify_boom(nodeid_s, (header,events)): |
|---|
| | 1015 | if "boom" in header["trigger"].get("message",""): |
|---|
| | 1016 | return "boom" |
|---|
| | 1017 | def classify_foom(nodeid_s, (header,events)): |
|---|
| | 1018 | if "foom" in header["trigger"].get("message",""): |
|---|
| | 1019 | return "foom" |
|---|
| 1015 | 1020 | |
|---|
| 1016 | 1021 | incident_d2 = defer.Deferred() |
|---|
| 1017 | 1022 | def _update_classifiers(res): |
|---|
| 1018 | 1023 | self.remove_classified_incidents(ig) |
|---|
| 1019 | | def classify_boom(nodeid_s, (header,events)): |
|---|
| 1020 | | if "boom" in header["trigger"].get("message",""): |
|---|
| 1021 | | return "boom" |
|---|
| 1022 | | def classify_foom(nodeid_s, (header,events)): |
|---|
| 1023 | | if "foom" in header["trigger"].get("message",""): |
|---|
| 1024 | | return "foom" |
|---|
| 1025 | 1024 | ig2 = self.create_incident_gatherer(basedir, [classify_boom]) |
|---|
| 1026 | 1025 | ig2.add_classifier(classify_foom) |
|---|
| … | … | |
| 1054 | 1053 | self.failIf(os.path.exists(unknowns_fn)) |
|---|
| 1055 | 1054 | d.addCallback(_new_incident2) |
|---|
| | 1055 | d.addCallback(lambda res: self.ig2.disownServiceParent()) |
|---|
| | 1056 | |
|---|
| | 1057 | # if we remove just classified/boom, then those incidents should be |
|---|
| | 1058 | # reclassified |
|---|
| | 1059 | |
|---|
| | 1060 | def _remove_boom_incidents(res): |
|---|
| | 1061 | booms_fn = os.path.join(ig.basedir, "classified", "boom") |
|---|
| | 1062 | os.remove(booms_fn) |
|---|
| | 1063 | |
|---|
| | 1064 | ig2a = self.create_incident_gatherer(basedir, [classify_boom, |
|---|
| | 1065 | classify_foom]) |
|---|
| | 1066 | ig2a.setServiceParent(self.parent) |
|---|
| | 1067 | self.ig2a = ig2a |
|---|
| | 1068 | |
|---|
| | 1069 | # now classified/boom should be back, and the other files should |
|---|
| | 1070 | # have been left untouched |
|---|
| | 1071 | booms = [fn.strip() for fn in open(booms_fn,"r").readlines()] |
|---|
| | 1072 | self.failUnlessEqual(len(booms), 1) |
|---|
| | 1073 | d.addCallback(_remove_boom_incidents) |
|---|
| | 1074 | d.addCallback(lambda res: self.ig2a.disownServiceParent()) |
|---|
| 1056 | 1075 | |
|---|
| 1057 | 1076 | # and if we remove the classification functions (but do *not* remove |
|---|
| … | … | |
| 1059 | 1078 | # anything |
|---|
| 1060 | 1079 | |
|---|
| 1061 | | d.addCallback(self.stall, 0.5) |
|---|
| 1062 | | d.addCallback(lambda res: self.ig2.disownServiceParent()) |
|---|
| 1063 | 1080 | def _update_classifiers_again(res): |
|---|
| 1064 | 1081 | ig3 = self.create_incident_gatherer(basedir) |
|---|
| … | … | |
| 1077 | 1094 | d.addCallback(_update_classifiers_again) |
|---|
| 1078 | 1095 | |
|---|
| 1079 | | # give the call to remote_logport a chance to retire |
|---|
| 1080 | | d.addCallback(self.stall, 0.5) |
|---|
| 1081 | 1096 | d.addCallback(lambda res: self.ig3.disownServiceParent()) |
|---|
| 1082 | 1097 | |
|---|
| … | … | |
| 1099 | 1114 | d.addCallback(lambda res: |
|---|
| 1100 | 1115 | self.poll(lambda : self.ig4.incidents_received == 2)) |
|---|
| 1101 | | |
|---|
| 1102 | | d.addBoth(self.stall, 0.5) # allow shutdown even on error |
|---|
| 1103 | 1116 | |
|---|
| 1104 | 1117 | return d |
|---|