Неожиданно в разделе Spring при использовании synchronized

#multithreading #hibernate #spring-boot #spring-data-jpa #spring-batch

#весна #переход в спящий режим #весенняя загрузка #spring-data-jpa #spring-batch #spring

Вопрос:

Я использую Spring Batch и Partition для выполнения параллельной обработки. Спящий режим и Jpa Spring Data для базы данных. Для шага разделения считыватель, процессор и записывающее устройство имеют stepscope, и поэтому я могу ввести в них ключ раздела и диапазон (от-до). Теперь в процессоре у меня есть один синхронизированный метод, и я ожидал, что этот метод будет запускаться один раз за раз, но это не так.

Я настроил его на 10 разделов, все 10 элементов чтения считывают правильный разделенный диапазон. Проблема связана с обработчиком элементов. Код Blow имеет ту же логику, что и я.

 public class accountProcessor implementes ItemProcessor{
    @override
    public Custom process(item) {
        createAccount(item);
        return item;
    }

    //account has unique constraints username, gender, and email
    /*
        When 1 thread execute that method, it will create 1 account 
        and save it. If next thread comes in and  try to save the  same  account, 
        it  should find the account created by first thread and do one update. 
        But now it doesn't happen, instead findIfExist return null 
        and it  try to do another insert of duplicate data
    */
    private synchronized void createAccount(item) {
        Account account = accountRepo.findIfExist(item.getUsername(),  item.getGender(),  item.getEmail());
        if(account  == null) {
            //account  doesn't  exist
            account = new Account();
            account.setUsername(item.getUsername());
            account.setGender(item.getGender());
            account.setEmail(item.getEmail());
            account.setMoney(10000);
        } else {
            account.setMoney(account.getMoney()-10);
        }
        accountRepo.save(account);
    }
}
  

Ожидаемый результат заключается в том, что только 1 поток будет запускать этот метод в любой момент времени, так что не будет повторяющейся вставки в db, а также избежит исключения DataIntegrityViolationException.

Фактически результатом является то, что второй поток не может найти первую учетную запись и пытается создать дубликат учетной записи и сохранить в db, что вызовет исключение DataIntegrityViolationException, ошибку уникальных ограничений.

Поскольку я синхронизировал метод, поток должен выполнять его по порядку, второй поток должен дождаться завершения первого потока, а затем запуститься, что означает, что он должен быть в состоянии найти первую учетную запись.

Я пробовал использовать множество подходов, например, изменяемый набор, содержащий все уникальные учетные записи, выполняйте saveAndFlush, чтобы совершать коммиты как можно скорее, используя threadlocal вообще, ни один из них не работает.

Нужна некоторая помощь.

Комментарии:

1. Если ваш процессор находится в области step, то будет 10 экземпляров процессора

2. @PrabhakarD Вы имеете в виду, что для этих 10 процессоров у каждого из них будет свой собственный метод синхронизации?

Ответ №1:

Поскольку вы определили область действия процессора элемента, вам на самом деле не нужна синхронизация, поскольку на каждом шаге будет свой собственный экземпляр процессора.

Но, похоже, у вас проблема с дизайном, а не с реализацией. Вы пытаетесь синхронизировать потоки, чтобы они действовали в определенном порядке в параллельной настройке. Когда вы решаете пойти параллельно и разделить данные на разделы и предоставить каждому рабочему (локальному или удаленному) раздел для работы, вы должны признать, что эти разделы будут обрабатываться в неопределенном порядке и что не должно быть никакой связи между записями каждого раздела или между работой, выполняемой каждым рабочим.

Когда 1 поток выполнит этот метод, он создаст 1 учетную запись и сохранит ее. Если появится следующий поток и попытается сохранить ту же учетную запись, он должен найти учетную запись, созданную первым потоком, и выполнить одно обновление. Но теперь этого не происходит, вместо этого findIfExist возвращает null и пытается выполнить другую вставку дублирующихся данных

Это потому, что транзакция thread1, возможно, еще не зафиксирована, следовательно, thread2 не найдет запись, которая, по вашему мнению, была вставлена thread1.

Похоже, вы пытаетесь создать или обновить некоторые учетные записи с разделенной настройкой. Я не уверен, подходит ли эта настройка для рассматриваемой проблемы.

В качестве дополнительного примечания, я бы не вызывал accountRepo.save(account); в обработчике элементов, а скорее делал это в программе записи элементов.

Надеюсь, это поможет.

Комментарии:

1. Даже на этапе записи любой синхронизированный метод будет вести себя одинаково! Транзакция thread1 может не быть зафиксирована, а thread2 выполняет чтение! Любая другая предлагаемая реализация дизайна для достижения этой реализации?