Взаимоблокировка транзакционного канала Spring AMQP

#java #spring-transactions #spring-amqp #spring-rabbit

#java #spring-транзакции #spring-amqp #spring-rabbit

Вопрос:

У меня есть требование отправить сообщение экземпляру RabbitMQ с объектом JPA после его сохранения / очистки, что привело меня к настройке rabbitTemplate как channelTransacted .

Потребитель является внешним, но для создания интеграционного теста я добавил встроенный брокер (Apache QPid) и прослушиватель, чтобы иметь возможность проверять отправку сообщений.

Как указано в документации, я, похоже, столкнулся с тупиковой ситуацией:

Если у нас есть производители и потребители в одном приложении, мы можем оказаться в взаимоблокировке, когда производители блокируют соединение (потому что у брокера больше нет ресурсов), а потребители не могут их освободить (потому что соединение заблокировано). […]

Отдельный CachingConnectionFactory невозможен для производителей транзакций, которые выполняются в потоке-потребителе, поскольку они должны повторно использовать канал, связанный с транзакциями-потребителями.

Если я установил rabbitTemplate.channelTransacted = false , прослушиватель вызывается нормально, в противном случае harness.getNextInvocationDataFor просто ждет, пока не истечет время ожидания.

Я надеюсь, что все еще существует способ выполнить такого рода интеграционный тест и что, возможно, я настроил что-то неправильно.

Я пробовал как с simple , так и с direct типами прослушивателя, не имело никакого значения:

 queues:
  transactions: 'transactions'

spring:
  rabbitmq:
    host: rabbitmq
    username: guest
    password: guest

    dynamic: true # automatically create queues and exchanges on the RabbitMQ server
    template:
      routing-key: ${queues.transactions}
      retry.enabled: true
      # mandatory: true # interesting only for cases where a reply is expected

    # publisher-confirms: true # does not work in transacted mode
    publisher-returns: true # required to get notifications in case of send problems

    # used for integration tests
    listener:
      type: direct
      direct:
        retry:
          enabled: true
          stateless: false # needed when transacted mode is enabled
          max-attempts: 1 # since this is used just for integration tests, we don't want more
  

Я использую Spring Boot 2.1.3 с spring-boot-starter-amqp , который подключается spring-rabbit-2.1.4 , и Apache Qpid 7.1.1 в качестве встроенного брокера для теста:

 @RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.main.allow-bean-definition-overriding=true")
@AutoConfigureTestDatabase
@Transactional
@ActiveProfiles("test")
public class SalesTransactionGatewayTest {

    private static final String TRANSACTIONS_LISTENER = "transactions";

    @TestConfiguration
    @RabbitListenerTest(spy = false, capture = true)
    public static class Config {
        @Bean
        public SystemLauncher broker() throws Exception {
            SystemLauncher broker = new SystemLauncher();
            Map<String, Object> attributes = new HashMap<>();
            attributes.put(SystemConfig.TYPE, "Memory");
            attributes.put(SystemConfig.INITIAL_CONFIGURATION_LOCATION, "classpath:qpid-config.json");
            attributes.put(SystemConfig.STARTUP_LOGGED_TO_SYSTEM_OUT, false);
            broker.startup(attributes);
            return broker;
        }

        @Bean
        public Listener listener() {
            return new Listener();
        }
    }

    public static class Listener {
        @RabbitListener(id = TRANSACTIONS_LISTENER, queues = "${queues.transactions}")
        public void receive(SalesTransaction transaction) {
            Logger.getLogger(Listener.class.getName()).log(Level.INFO, "Received tx: {0}", transaction);
        }
    }

    @Before
    public void setUp() {
        // this makes the test work, setting it to `true` makes it deadlock
        rabbitTemplate.setChannelTransacted(false);
    }

    @Test
    public void shouldBeSentToGateway() throws Exception {
        SalesTransaction savedTransaction = service.saveTransaction(salesTransaction);

        InvocationData invocationData = this.harness.getNextInvocationDataFor(TRANSACTIONS_LISTENER, 10, TimeUnit.SECONDS);
        assertNotNull(invocationData);
        assertEquals(salesTransaction, invocationData.getArguments()[0]);
    }
}
  
 11:02:56.844 [SimpleAsyncTaskExecutor-1] INFO  org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer - SimpleConsumer [queue=transactions, consumerTag=sgen_1 identity=16ef3497] started
Mar 25, 2019 11:02:57 AM AmqpSalesTransactionGateway send
INFO: Sending transaction: 01a92a56-c93b-4d02-af66-66ef007c2817 w/ status COMPLETED
11:02:57.961 [main] INFO  org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Attempting to connect to: [localhost:5672]
11:02:57.972 [main] INFO  org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Created new connection: rabbitConnectionFactory.publisher#6d543ec2:0/SimpleConnection@79dd79eb [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56501]

java.lang.AssertionError
    at org.junit.Assert.fail(Assert.java:86)
    at org.junit.Assert.assertTrue(Assert.java:41)
    at org.junit.Assert.assertNotNull(Assert.java:712)
    at org.junit.Assert.assertNotNull(Assert.java:722)
  

Комментарии:

1. Неясно, куда вы отправляете сообщение с помощью RabbitTemplate, но отправленное сообщение не будет получено потребителем до тех пор, пока транзакция не будет зафиксирована.

2. saveTransaction отправляет сообщение, и действительно, когда транзакция фиксируется, сообщения отправляются, взаимоблокировки нет 😄 Ключевым моментом здесь является то, что тест не может быть помечен как @Transactional , даже с @Commit , поскольку транзакция фиксируется после завершения метода тестирования, в то время как утверждения прослушивателя проверяются внутри него. Спасибо!

3. Рассмотрите возможность использования TransactionTemplate в рамках вашего тестового примера.

4. Я не уверен, что это необходимо, поскольку транзакция все равно запускается при входе service.saveTransaction(salesTransaction) и фиксируется при выходе метода. Насколько я понимаю, для тестирования этого случая правильным является простое удаление @Transactional аннотации из теста.