Nutch1.3集成Solr3.4网页快照功能实现(三)
public void reduce(Text key, Iterator values,OutputCollector output, Reporter reporter)
throws IOException {
Inlinks inlinks = null;
CrawlDatum dbDatum = null;
CrawlDatum fetchDatum = null;
ParseData parseData = null;
ParseText parseText = null;
byte[] cache_content = null;
while (values.hasNext()) {
final Writable value = values.next().get(); // unwrap
if (value instanceof Inlinks) {
inlinks = (Inlinks) value;
} else if (value instanceof CrawlDatum) {
final CrawlDatum datum = (CrawlDatum) value;
if (CrawlDatum.hasDbStatus(datum))
dbDatum = datum;
else if (CrawlDatum.hasFetchStatus(datum)) {
// don't index unmodified (empty) pages
if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED)
fetchDatum = datum;
} else if (CrawlDatum.STATUS_LINKED == datum.getStatus()
|| CrawlDatum.STATUS_SIGNATURE == datum.getStatus()
|| CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
continue;
} else {
throw new RuntimeException("Unexpected status: "
+ datum.getStatus());
}
} else if (value instanceof ParseData) {
parseData = (ParseData) value;
} else if (value instanceof ParseText) {
parseText = (ParseText) value;
}
else if (value instanceof Content) {
cache_content = ((Content) value).getContent();
}
else if (LOG.isWarnEnabled()) {
LOG.warn("Unrecognized type: " + value.getClass());
}
}
if (fetchDatum == null || dbDatum == null || parseText == null
|| parseData == null) {
return; // only have inlinks
}
if (!parseData.getStatus().isSuccess()
|| fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
return;
}
NutchDocument doc = new NutchDocument();
final Metadata metadata = parseData.getContentMeta();
// add segment, used to map from merged index back to segment files
doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY));
// add digest, used by dedup
doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));
doc.add("cache_content", cache_content);
final Parse parse = new ParseImpl(parseText, parseData);
try {
// extract information from dbDatum and pass it to
// fetchDatum so that indexing filters can use it
final Text url = (Text) dbDatum.getMetaData().get(
Nutch.WRITABLE_REPR_URL_KEY);
if (url != null) {
fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);
}
// run indexing filters
doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);
} catch (final IndexingException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Error indexing " + key + ": " + e);
}
return;
}
// skip documents discarded by indexing filters
if (doc == null)
return;
float boost = 1.0f;
// run scoring filters
try {
boost = this.scfilters.indexerScore(key, doc, dbDatum, fetchDatum,
parse, inlinks, boost);
} catch (final ScoringFilterException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Error calculating score " + key + ": " + e);
}
return;
}
// apply boost to all indexed fields.
doc.setWeight(boost);
// store boost for use by explain and dedup
doc.add("boost", Float.toString(boost));
output.collect(key, doc);
}
页:
[1]