как убедиться, что блокировки освобождаются с помощью ef core и postgres?

#postgresql #entity-framework-core

#postgresql #entity-framework-core

Вопрос:

У меня есть консольная программа, которая перемещает данные между двумя разными серверами (DatabaseA и DatabaseB). База данных B является Postgres-сервером. Он вызывает множество хранимых процедур и других необработанных запросов. Я использую ExecuteSqlRaw много. Я также использую NpsqlBulk.EFCore. Программа использует один и тот же экземпляр контекста для DatabaseB в течение всего выполнения, необходимого для завершения. Каким-то образом я получаю блокировки для некоторых моих таблиц в DatabaseB, которые никогда не освобождаются. Это всегда происходит на моем столе mytable_fromdatabase_import . Код, выполняемый на этом, следующий:

     protected override void AddIdsNew()
    {
        var toAdd = IdsNotInDatabaseB();
        var newObjectsToAdd = GetByIds(toAdd).Select(Converter.ConvertAToB);
        DatabaseBContext.Database.ExecuteSqlRaw("truncate mytable_fromdatabase_import; ");
      
        var uploader = new NpgsqlBulkUploader(DatabaseBContext);
        uploader.Insert(newObjectsToAdd); // inserts data into mytable_fromdatabase_import

        DatabaseBContext.Database.ExecuteSqlRaw("call insert_myTable_from_importTable();");
        
    }
  

После того, как я запустил его, вся таблица больше недоступна, и когда я запрашиваю блокировки на сервере, я вижу, что есть процесс, который ее удерживает.
Как я могу убедиться, что этот процесс всегда закрывается и снимает блокировки с таблиц?
Я думал, что ef-core сделает это автоматически.

————Редактировать————

Я просто хотел добавить, что это не временная проблема во время запуска консоли. Когда я запускаю этот код и он завершен, моя таблица по-прежнему заблокирована, и ничто не может получить к ней доступ. Я понимаю, что контекст ef-core освободит все после его удаления (если по ошибке или по завершении)

Комментарии:

1. Кажется, вам не хватает commit

2. @a_horse_with_no_name я не открываю транзакцию явно, поэтому я должен зафиксировать?

3. Я не знаю EF core, но если таблица недоступна после усечения, это явно означает, что коммит отсутствует.

4. @a_horse_with_no_name hm ок. Возможно, это возможно. Я буду ждать, пока кто-нибудь ответит с большим знанием ef core, потому что, как я понял, я должен получать ошибки из NpgsqlBulkUploader. Для меня ExecuteSqlRaw кажется немного странным, как он запускается на сервере

Ответ №1:

Проблема не имела ничего общего с ef core, а с неправильно настроенным backupscript. Программа запущена без каких-либо изменений и работает нормально

Ответ №2:

Для конкретной задачи вам нужны правильные инструменты. Вероятно, у вас возникают блокировки при получении идентификаторов, а также при попытке не загружать уже импортированные записи. Эти шаги медленные!

Я бы предложил использовать linq2db (отказ от ответственности, я соавтор этой библиотеки) Для создания двух проектов с моделями из разных баз данных: Source.Model.csproj — установите linq2db.SQLServer назначения.Model.csproj — установите linq2db.PostgreSQL Следуйте инструкциям в шаблонах T4, как сгенерировать модель из двух баз данных. Это просто, и вы можете задать вопросы на сайте github linq2db.

Я опубликую вспомогательный класс, который я использовал для переноса таблиц в моем предыдущем проекте. Он дополнительно использует библиотеку CodeJam для сопоставления, но в вашем проекте, конечно, вы можете использовать Automapper.

 public class DataImporter
{
    private readonly DataConnection _source;
    private readonly DataConnection _destination;

    public DataImporter(DataConnection source, DataConnection destination)
    {
        _source = source;
        _destination = destination;
    }

    private long ImportDataPrepared<TSource, TDest>(IOrderedQueryable<TSource> source, Expression<Func<TSource, TDest>> projection) where TDest : class
    {
        var destination = _destination.GetTable<TDest>();
        var tableName = destination.TableName;
        var sourceCount = source.Count();
        if (sourceCount == 0)
            return 0;
        
        var currentCount = destination.Count();

        if (currentCount > sourceCount)
            throw new Exception($"'{tableName}' what happened here?.");

        if (currentCount >= sourceCount)
            return 0;
        
        IQueryable<TSource> sourceQuery = source;
        if (currentCount > 0)
            sourceQuery = sourceQuery.Skip(currentCount);

        var projected = sourceQuery.Select(projection);

        var copied =
            _destination.BulkCopy(
                new BulkCopyOptions
                {
                    BulkCopyType = BulkCopyType.MultipleRows,
                    RowsCopiedCallback = (obj) => RowsCopiedCallback(obj, currentCount, sourceCount, tableName)
                }, projected);
        return copied.RowsCopied;
    }

    private void RowsCopiedCallback(BulkCopyRowsCopied obj, int currentRows, int totalRows, string tableName)
    {
        var percent = (currentRows   obj.RowsCopied) / (double)totalRows * 100;
        Console.WriteLine($"Copied {percent:N2}% tto {tableName}");
    }


    public class ImporterHelper<TSource>
    {
        private readonly DataImporter _improrter;
        private readonly IOrderedQueryable<TSource> _sourceQuery;

        public ImporterHelper(DataImporter improrter, IOrderedQueryable<TSource> sourceQuery)
        {
            _improrter = improrter;
            _sourceQuery = sourceQuery;
        }
        
        public long To<TDest>() where TDest : class
        {
            var mapperBuilder = new MapperBuilder<TSource, TDest>();
            return _improrter.ImportDataPrepared(_sourceQuery, mapperBuilder.GetMapper().GetMapperExpressionEx());
        }
        
        public long To<TDest>(Expression<Func<TSource, TDest>> projection) where TDest : class
        {
            return _improrter.ImportDataPrepared(_sourceQuery, projection);
        }
    }
    
    public ImporterHelper<TSource> ImprortData<TSource>(IOrderedQueryable<TSource> source)
    {
        return new ImporterHelper<TSource>(this, source);
    }
}
  

Итак, начинайте перенос. Обратите внимание, что я использовал OrderBy/ThenBy для указания порядка идентификаторов, чтобы не импортировать уже переданные записи — важные поля порядка должны содержать уникальную комбинацию клавиш. Итак, этот пример является реентерабельным и может быть повторно запущен при потере соединения.

 var sourceBuilder = new LinqToDbConnectionOptionsBuilder();
sourceBuilder.UseSqlServer(SourceConnectionString);

var destinationBuilder = new LinqToDbConnectionOptionsBuilder();
destinationBuilder.UsePostgreSQL(DestinationConnectionString);


using (var source = new DataConnection(sourceBuilder.Build()))
using (var destination = new DataConnection(destinationBuilder.Build()))
{
    var dataImporter = new DataImporter(source, destination);

    dataImporter.ImprortData(source.GetTable<Source.Model.FirstTable>()
       .OrderBy(e => e.Id1)
       .ThenBy(e => e.Id2))
            .To<Dest.Model.FirstTable>();

    dataImporter.ImprortData(source.GetTable<Source.Model.SecondTable>().OrderBy(e => e.Id))
            .To<Dest.Model.SecondTable>();
}
  

Конечно, скучная часть с OrderBy может быть сгенерирована автоматически, но это приведет к взрыву, это уже не короткий ответ.

Также играйте с BulkCopyOptions. Собственная КОПИЯ Npgsql может завершиться ошибкой, и следует использовать многострочный вариант.

Комментарии:

1. это вообще не отвечает на мой вопрос, и мне кажется, вы просто используете его для рекламы своей библиотеки. мой вопрос был не в том, как быстрее передавать данные или какие инструменты использовать, а в том, как убедиться, что блокировки в базе данных сняты.

2. Я хочу показать вам простой способ передачи миллиона записей без блокировок. Я уверен, что IdsNotInDatabaseB() — это проблема. Также попробуйте сделать это не в транзакции. Я не хочу рекламировать свою библиотеку, она уже популярна среди людей, которые знают SQL.

3. IdsNotInDatabaseB не должен быть проблемой, он возвращает только хэш-наборы идентификаторов. Это просто очень простой выбор для hashset

4. Выполняет ли эта операция транзакцию? В любом случае это также может быть проблемой Npgsql.Bulk . Ранее в нашей библиотеке мы обнаружили, что КОПИРОВАНИЕ очень чувствительно к двоичному формату и на некоторых типах происходит сбой.