#spring-boot #event-handling #microservices #spring-cloud #cqrs
Вопрос:
Весь этот код доступен по адресу: https://github.com/Naresh-Chaurasia/API-MicroServices-Kafka/tree/master/Microservices-CQRS-SAGA-Kafka
У меня есть следующая схема вариантов использования:
ProductsCommandКонтроллер
@RestController
@RequestMapping("/products")
public class ProductsCommandController {
private final Environment env;
private final CommandGateway commandGateway;
@Autowired
public ProductsCommandController(Environment env, CommandGateway commandGateway) {
this.env = env;
this.commandGateway = commandGateway;
}
@PostMapping
public String createProducts(@RequestBody CreateProductRestModel createProductRestModel) throws InterruptedException {
CreateProductCommand createProductCommand = CreateProductCommand.builder()
.price(createProductRestModel.getPrice())
.quantity(createProductRestModel.getQuantity())
.title(createProductRestModel.getTitle())
.productId(UUID.randomUUID().toString()).build();
String returnValue = commandGateway.sendAndWait(createProductCommand);
return returnValue;
}
}
Команда создания продукта
@Builder
@Data
public class CreateProductCommand {
@TargetAggregateIdentifier
private final String productId;
private final String title;
private final BigDecimal price;
private final Integer quantity;
}
Устройство для обработки деталей
@Component
public class ProductEventsHandler {
private final ProductsRepository productsRepository;
private static final Logger LOGGER = LoggerFactory.getLogger(ProductEventsHandler.class);
public ProductEventsHandler(ProductsRepository productsRepository) {
this.productsRepository = productsRepository;
}
@EventHandler
public void on(ProductCreatedEvent event) {
ProductEntity productEntity = new ProductEntity();
BeanUtils.copyProperties(event, productEntity);
try {
productsRepository.save(productEntity);
} catch (IllegalArgumentException ex) {
ex.printStackTrace();
}
}
}
Товары для ремонта
public interface ProductsRepository extends JpaRepository<ProductEntity, String> {
ProductEntity findByProductId(String productId);
ProductEntity findByProductIdOrTitle(String productId, String title);
}
Работая над CQR, я столкнулся со следующим заявлением:
Итак, теперь, когда у нас создан объект command, мы готовы отправить эту команду в command Gateway, и мы введем объект command gateway в наш класс контроллера. А командный шлюз-это объект, который будет использоваться для отправки этого созданного нами командного объекта на командную шину. Поэтому думайте о командном шлюзе как об API для отправки команд, а командная шина-это механизм, который получает эту команду и направляет ее обработчику команд.
Мой вопрос заключается в следующем:
Какой тип командной шины создается. Можем ли мы заменить эту шину другими устройствами, такими как очереди/темы/кафка. Если да, то как мы можем это сделать?
Кроме того, можно ли просмотреть содержимое автобуса.
Пожалуйста, ведите.
Заранее спасибо.
Ответ №1:
Кафка и тому подобное вполне пригодны для использования в качестве командной шины в этой архитектуре. В случае Кафки тема (или, возможно, правильнее сказать, раздел темы, поскольку порядок гарантируется только в разделе), как правило, представляет собой командную шину.
В общем случае командная шина (как она, по-видимому, используется в этом контексте) может не позволять видеть ее содержимое (например, команды могут отправляться через конечные точки HTTP; если обработчик для этой конечной точки также не сохранит команду где-либо (пример поиска команд), вы не сможете (при отсутствии таких вещей, как захват пакетов) увидеть содержимое шины); должна ли командная шина быть интроспективной и/или постоянной, безусловно, учитывается при разработке/выборе/реализации.