|
public synchronized List chooseUnderReplicatedBlocks(
int blocksToProcess) {
// initialize data structure for the return value
List blocksToReplicate = new ArrayList(LEVEL);
for (int i = 0; i < LEVEL; i++) {
blocksToReplicate.add(new ArrayList());
}
if (size() == 0) { // There are no blocks to collect.
return blocksToReplicate;
}
int blockCount = 0;
for (int priority = 0; priority < LEVEL; priority++) {
// Go through all blocks that need replications with current priority.
BlockIterator neededReplicationsIterator = iterator(priority);
Integer replIndex = priorityToReplIdx.get(priority);
// skip to the first unprocessed block, which is at replIndex
for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
neededReplicationsIterator.next();
}
blocksToProcess = Math.min(blocksToProcess, size());
if (blockCount == blocksToProcess) {
break; // break if already expected blocks are obtained
}
// Loop through all remaining blocks in the list.
while (blockCount < blocksToProcess
&& neededReplicationsIterator.hasNext()) {
Block block = neededReplicationsIterator.next();
blocksToReplicate.get(priority).add(block);
replIndex++;
blockCount++;
}
if (!neededReplicationsIterator.hasNext()
&& neededReplicationsIterator.getPriority() == LEVEL - 1) {
// reset all priorities replication index to 0 because there is no
// recently added blocks in any list.
for (int i = 0; i < LEVEL; i++) {
priorityToReplIdx.put(i, 0);
}
break;
}
priorityToReplIdx.put(priority, replIndex);
}
return blocksToReplicate;
}
|
|
|