#java #hadoop
#java #hadoop
Вопрос:
Я написал этот код внутри метода run() класса Reducer в Hadoop
@Override
public void run(Context context) throws IOException, InterruptedException {
setup(context);
ConcurrentHashMap<String, HashSet<Text>> map = new ConcurrentHashMap<String, HashSet<Text>>();
while (context.nextKey()) {
String line = context.getCurrentKey().toString();
HashSet<Text> values = new HashSet<Text>();
for (Text t : context.getValues()) {
values.add(new Text(t));
}
map.put(line, new HashSet<Text>());
for (Text t : values) {
map.get(line).add(new Text(t));
}
}
ConcurrentHashMap<String, HashSet<Text>> newMap = new ConcurrentHashMap<String, HashSet<Text>>();
for (String keyToMerge : map.keySet()) {
String[] keyToMergeTokens = keyToMerge.split(",");
for (String key : map.keySet()) {
String[] keyTokens = key.split(",");
if (keyToMergeTokens[keyToMergeTokens.length - 1].equals(keyTokens[0])) {
String newKey = keyToMerge;
for (int i = 1; i < keyTokens.length; i ) {
newKey = "," keyTokens[i];
}
if (!newMap.contains(newKey)) {
newMap.put(newKey, new HashSet<Text>());
for (Text t : map.get(keyToMerge)) {
newMap.get(newKey).add(new Text(t));
}
}
for (Text t : map.get(key)) {
newMap.get(newKey).add(new Text(t));
}
}
}
//call the reducers
for (String key : newMap.keySet()) {
reduce(new Text(key), newMap.get(key), context);
}
cleanup(context);
}
моя проблема в том, что даже если мои входные данные слишком малы, для их последовательного запуска требуется 30 минут из-за вызова newMap.put() . Если я помещаю эту команду в комментарии, она выполняется быстро без каких-либо проблем.
Как вы можете видеть, я использую ConcurrentHashMap. Я не хотел его использовать, потому что я думаю, что run() вызывается только один раз на каждой машине (он не запускается одновременно), поэтому у меня не было бы никаких проблем с простой хэш-картой, но если я заменю ConcurrentHashMap простой хэш-картой, я получаю ошибку (concurrentModificationError).
У кого-нибудь есть идея о том, как заставить его работать без каких-либо задержек?
заранее спасибо!
* java6 * hadoop 1.2.1
Комментарии:
1. Может ли это быть кандидатом для CodeReview?
2. Я этого не понял. Можете ли вы объяснить?
3. @developer Если у вас есть рабочий код, который вы хотели бы улучшить, Code Review SE — хорошее место для его размещения.
4. где конкретно возникает исключение одновременной модификации? Это почти наверняка вызвано изменением карты во время итерации по набору ключей, набору записей или значениям.
5. Поскольку метод java6 String.split() является убийцей производительности, вместо этого используйте org.apache.commons.lang3.StringUtils.splitPreserveAllTokens(String str,разделитель символов)
Ответ №1:
Я не знаю, решит ли это ваши проблемы с производительностью, но я вижу одну неэффективную вещь, которую вы делаете :
newMap.put(newKey, new HashSet<Text>());
for (Text t : map.get(keyToMerge)) {
newMap.get(newKey).add(new Text(t));
}
Было бы эффективнее сохранить HashSet в переменной, а не искать его в newMap :
HashSet<Text> newSet = new HashSet<Text>();
newMap.put(newKey, newSet);
for (Text t : map.get(keyToMerge)) {
newSet.add(new Text(t));
}
Еще одна неэффективная вещь, которую вы делаете, — это создание хэш-набора значений, а затем создание другого идентичного хэш-набора для размещения на карте. Поскольку исходный HashSet ( values
) больше никогда не используется, вы создаете все эти текстовые объекты без всякой причины.
Вместо:
while (context.nextKey()) {
String line = context.getCurrentKey().toString();
HashSet<Text> values = new HashSet<Text>();
for (Text t : context.getValues()) {
values.add(new Text(t));
}
map.put(line, new HashSet<Text>());
for (Text t : values) {
map.get(line).add(new Text(t));
}
}
Вы можете просто написать :
while (context.nextKey()) {
String line = context.getCurrentKey().toString();
HashSet<Text> values = new HashSet<Text>();
for (Text t : context.getValues()) {
values.add(new Text(t));
}
map.put(line, values);
}
Редактировать :
Я только что увидел дополнительный код, который вы опубликовали в качестве ответа (из вашего cleanup()
метода) :
//clear map
for (String s : map.keySet()) {
map.remove(s);
}
map = null;
//clear newMap
for (String s : newMap.keySet()) {
newMap.remove(s);
}
newMap = null;
Причина, по которой этот код дает вам ConcurrentModificationError
, заключается в том, что циклы foreach не поддерживают модификацию коллекции, которую вы перебираете.
Чтобы преодолеть это, вы можете использовать итератор :
//clear map
Iterator<Map.Entry<String, HashSet<Text>>> iter1 = map.entrySet ().iterator ();
while (iter1.hasNext()) {
Map.Entry<String, HashSet<Text>> entry = iter1.next();
iter1.remove();
}
map = null;
//clear newMap
Iterator<Map.Entry<String, HashSet<Text>>> iter2 = newMap.entrySet ().iterator ();
while (iter2.hasNext()) {
Map.Entry<String, HashSet<Text>> entry = iter2.next();
iter2.remove();
}
newMap = null;
Тем не менее, вам действительно не нужно удалять каждый элемент отдельно.
Вы можете просто написать
map = null;
newMap = null;
Когда вы удаляете ссылку на карты, сборщик мусора может собирать их мусор. Удаление элементов с карт не имеет значения.
Комментарии:
1. Я думал, что у меня это было в O (1) раз. в любом случае спасибо, но это ничто перед основной проблемой: (