Может ли «гонка данных» происходить с триггерами в MariaDB с InnoDB?

#triggers #innodb #data-race #mariadb-10.5

#триггеры #innodb #гонка данных #mariadb-10.5

Вопрос:

Мое приложение обрабатывает очень большой объем данных в режиме реального времени (> 200 миллионов в день), и мне нужно агрегировать их в режиме реального времени, чтобы поддерживать производительность отчетов. Данные передаются и, следовательно, обрабатываются сервером случайным образом несколькими потоками.

Я использую MariaDB 10.5.6-MariaDB с InnoDB 10.5.6

  • Знаете ли вы, является ли триггер потокобезопасным, т.Е. Может ли вообще произойти гонка данных. Другими словами, когда 1000 обновлений — только приращение — происходит с одними и теми же столбцами в одной строке в течение секунды при 10 соединениях, тогда данные не будут перепутаны, и результат будет таким, как будто значения были суммированы одним соединением последовательно.
  • Знаете ли вы, как работает блокировка на уровне строк, и является ли она автоматической или может быть применена вручную.

Также было бы полезно поделиться некоторыми из ваших соответствующих закладок, потому что я не нашел ничего краткого и полезного в Google.

Обновить

Я добавил триггер после вставки, который создавал новую запись в таблице отчетов, если запись не существовала, а затем обновлял столбцы с помощью инструкции update update table set field=value delta where condition . Базе данных это не понравилось, и приложение — java, hibernate -, которое отправляло данные, тоже не выдержало и начало выдавать исключения:

  • Это совершенно не имеет отношения к строке, которую hibernate пытался вставить, потому что она не пыталась обновить. Очевидно, что это происходит от триггера MariaDB: Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)
  • Я не уверен, почему это произошло, но и получил кое-что из этого: Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction

Ответ №1:

Я обнаружил, что триггер не был потокобезопасным в том смысле, что база данных начала выдавать разные ошибки при одновременном обновлении одной и той же строки:

  • Строка была обновлена или удалена другой транзакцией (или отображение несохраненных значений было неправильным)
  • При попытке получить блокировку обнаружена взаимоблокировка; попробуйте перезапустить транзакцию

Я пытался ввести блокировку на уровне строк, но это вообще не сработало. Я полагаю, что блокировка была проигнорирована или строки вообще не были заблокированы

 $ grep "ExecStart=" /usr/lib/systemd/system/mariadb.service 
ExecStart=/usr/sbin/mariadbd --autocommit=0 --transaction-isolation=read-committed $MYSQLD_OPTS $_WSREP_NEW_CLUSTER $_WSREP_START_POSITION
 
  • Автоматическая фиксация была отключена
  • Изоляция транзакций была изменена на чтение с фиксацией
  • Пробовал блокировку на уровне строк с SELECT what FROM tables WHERE conditions FOR UPDATE использованием первичных ключей

Пробовал также эквивалентное решение для блокировки на уровне таблицы с сохранением данных в одном потоке, но оно не справилось с объемом данных, который у меня был.

Решение, которое я выбрал, — это разделение обработки потока на уровне потока от сохранения таким образом, что несколько потоков обрабатывают входящий поток данных и создают объекты сущностей для другого набора потоков, чтобы сохранить их в базе данных. Это позволяет мне экспериментировать и находить оптимальное количество потоков на область для моей платформы, например, в настоящее время я тестирую с 8 потоками, обрабатывающими входящий поток и создающими объекты сущностей для еще 4 потоков, которые отвечают за их сохранение в базе данных. Для постоянных потоков я ввел некоторую интеллектуальную сегрегацию и пользовательскую блокировку набора объектов на прикладном уровне, чтобы гарантировать, что никакие два потока не пытаются обновить одну и ту же строку одновременно. Кажется, это работает, теперь мне просто нужно найти нужное количество потоков для обеих областей.

Это потребительский класс, который создает отставание для авторов БД

     protected abstract Map<String, Set<ENTITY>> breakDownToBatchesForPersistance(Collection<ENTITY> localBacklog);

    private void saveEntitiesInBatches(IDefaultEntityDAO<ENTITY> dao, Collection<ENTITY> localBacklog) {
            for (Map.Entry<String, Set<ENTITY>> entry : breakDownToBatchesForPersistance(localBacklog).entrySet()) {
                persister.saveAll(dao, entry.getKey(), entry.getValue());
            }
    }
 

Это отставание для авторов БД

     private LinkedBlockingQueue<Key> keys;
    private Map<Key, Set> backlog;

    public <ENTITY> void saveAll(IDefaultEntityDAO<ENTITY> dao, String bucket, Set<ENTITY> entitySet) {
        Key<ENTITY> key = Key.get(dao, bucket);
        synchronized (key) {
            synchronized (backlog) {
                if (backlog.containsKey(key)) {
                    backlog.get(key).addAll(entitySet);
                } else {
                    backlog.put(key, entitySet);
                    try {
                        keys.put(key);
                    } catch (InterruptedException ex) {
                    }
                }
            }
        }
    }
 

Это ядро записи БД

     private void processDBBatchUpdate(Key key) {
        synchronized (key) {
            Set set;
            synchronized (backlog) {
                set = backlog.remove(key);
            }

            key.getDao().saveAll(set);
        }
    }
 

Это ключевой класс для блокировки

     private IDefaultEntityDAO<ENTITY> dao;
    private String bucket;
    private static Map<IDefaultEntityDAO, Map<Object, Key>> keys = new HashMap<>();

    private Key(IDefaultEntityDAO dao, String bucket) {
        this.dao = dao;
        this.bucket = bucket;
    }

    public static synchronized <ENTITY> Key<ENTITY> get(IDefaultEntityDAO<ENTITY> dao, String bucket) {
        if (!keys.containsKey(dao)) {
            keys.put(dao, new HashMap<>());
        }

        if (!keys.get(dao).containsKey(bucket)) {
            keys.get(dao).put(bucket, new Key(dao, bucket));
        }

        return keys.get(dao).get(bucket);
    }