// Script body: for every cluster stored in the "part-*" sequence files under
// seqFileDir, look up the points assigned to that cluster id (loaded from
// pointsDir by readPoints) and print the name of each point, followed by a
// separator line. Clusters with four or fewer points are skipped.
String seqFileDir = "/home/hduser/tmp/kmeans/clusters-2/"
String pointsDir = "/home/hduser/tmp/kmeans/clusteredPoints/"
def conf = new Configuration()
FileSystem fs = new Path(seqFileDir).getFileSystem(conf)
Map<Integer, List<WeightedVectorWritable>> clusterIdToPoints = readPoints(new Path(pointsDir), conf)
for (FileStatus seqFile : fs.globStatus(new Path(seqFileDir, "part-*"))) {
    Path path = seqFile.getPath()
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)
    try {
        // Instantiate holders of whatever key/value classes the file declares;
        // reader.next() fills them in place on each call.
        org.apache.hadoop.io.Writable key = reader.getKeyClass().asSubclass(org.apache.hadoop.io.Writable.class).newInstance()
        org.apache.hadoop.io.Writable value = reader.getValueClass().asSubclass(org.apache.hadoop.io.Writable.class).newInstance()
        while (reader.next(key, value)) {
            Cluster cluster = (Cluster) value
            List<WeightedVectorWritable> points = clusterIdToPoints.get(cluster.getId())
            // Only report clusters that have more than four points.
            if (points != null && points.size() > 4) {
                for (WeightedVectorWritable point : points) {
                    // The cast fails with a ClassCastException if a vector is not a NamedVector.
                    println(((NamedVector) point.getVector()).getName())
                }
                println "======================================"
            }
        }
    } finally {
        // Release the underlying file handle even if reading or casting fails.
        reader.close()
    }
}
/**
 * Reads every sequence file directly under {@code pointsPathDir} and groups
 * the values by the integer value of their key.
 *
 * Files whose name ends with ".crc" or starts with "_" are ignored.
 *
 * @param pointsPathDir directory containing the sequence files to read
 * @param conf          Hadoop configuration used to resolve the file system
 *                      and to open each sequence file
 * @return a sorted map from key value to the list of values read for that
 *         key, in the order they were encountered (the caller looks entries
 *         up by cluster id)
 * @throws IOException if listing the directory or reading a file fails
 */
private static Map<Integer, List<WeightedVectorWritable>> readPoints(Path pointsPathDir, Configuration conf)
        throws IOException {
    Map<Integer, List<WeightedVectorWritable>> result = new TreeMap<Integer, List<WeightedVectorWritable>>();
    FileSystem fs = pointsPathDir.getFileSystem(conf);
    FileStatus[] children = fs.listStatus(pointsPathDir, new PathFilter() {
        @Override
        public boolean accept(Path path) {
            String name = path.getName();
            return !(name.endsWith(".crc") || name.startsWith("_"));
        }
    });
    for (FileStatus file : children) {
        Path path = file.getPath();
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            IntWritable key = reader.getKeyClass().asSubclass(IntWritable.class).newInstance();
            WeightedVectorWritable value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance();
            while (reader.next(key, value)) {
                // The key's int value selects the bucket; the value is the
                // weighted vector that gets stored in that bucket.
                List<WeightedVectorWritable> pointList = result.get(key.get());
                if (pointList == null) {
                    pointList = new ArrayList<WeightedVectorWritable>();
                    result.put(key.get(), pointList);
                }
                pointList.add(value);
                // The stored instance must not be handed to reader.next() again,
                // so allocate a fresh holder for the next record.
                value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance();
            }
        } finally {
            // Always release the file handle, including when a read fails.
            reader.close();
        }
    }
    return result;
}