sách gpt4 ai đã đi

java - Camel JMS 事务处理不起作用

In lại 作者:行者123 更新时间:2023-12-02 02:22:23 28 4
mua khóa gpt4 Nike

我正在尝试使用事务工作获取 Camel 路由 JMS->HTTP4,但当发生异常时消息不会传输到 ActiveMQ.DLQ,我不明白为什么。

下面的示例说明了如果 REST 服务的服务器宕机并且无法下发路由会发生什么情况。

我得到了正确的异常:

2018-01-18 12:30:50:962-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN o.a.c.s.s.TransactionErrorHandler - Transaction rollback (0x30a1c779) redelivered(false) for (MessageId: ID:MGR-MacBook-Pro.local-51837-1516262355358-4:2:1:1:16 on ExchangeId: ID-MGR-MacBook-Pro-local-1516275047663-0-1) caught: java.net.ConnectException: Cannot connect to CORE REST  

2018-01-18 12:30:50:965-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN o.a.c.c.j.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - java.net.ConnectException: Cannot connect to CORE REST]
org.apache.camel.RuntimeCamelException: java.net.ConnectException: Cannot connect to CORE REST …

但是该消息已被消耗并从队列中删除。我的假设是使用事务/事务化 Camel 和 AMQ 可以解决此问题并将消息移动到 ActiveMQ.DLQ。

我已阅读《Camel in Action》第一版的第 9 章。并用谷歌搜索,但没有找到任何解决我的问题的方法。

我知道我可以创建/定义自己的 TransactionErrorHandler() 并将消息存储在我选择的队列中,但我的印象是,这是使用事务处理时的默认设置......

我正在使用已安装和配置的独立 ActiveMQ 5.15.2 vanilla。
Camel 2.20.1
MacOS 10.13.2 上的 Java 8_144

我的配置:

@Cấu hình
public class Config {
/**
* The Camel context.
*/
final CamelContext camelContext;


/**
* The Broker url.
*/
@Value("${jms.broker.url}")
private String brokerURL;


/**
* Instantiates a new Config.
*
* @param camelContext the sisyfos context
* @param metricRegistry the metric registry
*/
@Autowired
public Config(final CamelContext camelContext, final MetricRegistry metricRegistry) {
this.camelContext = camelContext;
this.metricRegistry = metricRegistry;
}

@Đậu
public ActiveMQConnectionFactory activeMQConnectionFactory() {
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerURL);
return activeMQConnectionFactory;
}

/**
* Pooled connection factory pooled connection factory.
*
* @return the pooled connection factory
*/
@Đậu
@Primary
public PooledConnectionFactory pooledConnectionFactory() {
final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setMaxConnections(8);
pooledConnectionFactory.setMaximumActiveSessionPerConnection(500);
pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory());
return pooledConnectionFactory;
}

/**
* Jms configuration jms configuration.
*
* @return the jms configuration
*/
@Đậu
public JmsConfiguration jmsConfiguration() {
final JmsConfiguration jmsConfiguration = new JmsConfiguration();
jmsConfiguration.setConnectionFactory(pooledConnectionFactory());
jmsConfiguration.setTransacted(true);
jmsConfiguration.setTransactionManager(transactionManager());
jmsConfiguration.setConcurrentConsumers(10);

return jmsConfiguration;
}

/**
* Transaction manager jms transaction manager.
*
* @return the jms transaction manager
*/
@Đậu
public JmsTransactionManager transactionManager() {
final JmsTransactionManager transactionManager = new JmsTransactionManager();
transactionManager.setConnectionFactory(pooledConnectionFactory());
return transactionManager;
}

/**
* Active mq component active mq component.
*
* @return the active mq component
*/
@Đậu
public ActiveMQComponent activeMQComponent(JmsConfiguration jmsConfiguration,
PooledConnectionFactory pooledConnectionFactory,
JmsTransactionManager transactionManager) {
final ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setConfiguration(jmsConfiguration);
activeMQComponent.setTransacted(true);
activeMQComponent.setUsePooledConnection(true);
activeMQComponent.setConnectionFactory(pooledConnectionFactory);
activeMQComponent.setTransactionManager(transactionManager);
return activeMQComponent;
}


}

我的路线:

    @Thành phần
public class SendToCore extends SpringRouteBuilder {

@Ghi đè
public void configure() throws Exception {
Logger.getLogger(SendToCore.class).info("Sending to CORE");


//No retries if first fails due to connection error
interceptSendToEndpoint("http4:*")
.choice()
.when(header("JMSRedelivered").isEqualTo("false"))
.throwException(new ConnectException("Cannot connect to CORE REST"))
.end();

from("activemq:queue:myIncomingQueue")
.transacted()
.setHeader(Exchange.CONTENT_TYPE, constant("application/xml"))
.to("http4:localhost/myRESTservice")
.log("${header.CamelHttpResponseCode}")
.end();
}
}

您可能会在某些 bean 中发现冗余声明,这就是我试图解决的问题...

添加一个指向我的 Github 存储库的链接,并用一个小型测试项目来说明这一点:
https://github.com/hakuseki/transacted

1 Câu trả lời

这可能是SpringBoot自动配置的问题。

nếu như消息丢失,Camel 的 ActiveMQ 组件不会发送到 DLQ,而是会自动提交消息,而不是等到工作完成。

更新:使示例与 Java Config 配合使用的步骤

Notice: my config does not have a transaction manager because it is not needed for your case. Instead just set in the ActiveMQComponent transacted ĐẾN ĐÚNG VẬYlazyCreateTransactionManager ĐẾN SAI. Then you got a "local" transaction with your broker and that is all you need.

  • 我从您的路由中删除了 .transacted()(需要事务管理器,但不需要“JMS 本地事务”路由)
  • 我在路由类中注释掉了您的错误处理程序(需要事务管理器,您可以使用默认的错误处理程序)
  • hiện hữuMainApplication中禁用JMS和ActiveMQ的自动配置:@SpringBootApplication(exclude = { JmsAutoConfiguration.class, ActiveMQAutoConfiguration.class})
  • 将您的 Java 配置替换为以下配置(改编自此问题:ConnectionFactory get destroyed before camel)

Java 配置:

@Value("${jms.broker.url}") 
String brokerURL;

@Đậu
public ActiveMQConnectionFactory connectionFactory() {
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerURL);
return activeMQConnectionFactory;
}

@Đậu
@Primary
public PooledConnectionFactory pooledConnectionFactory(ConnectionFactory cf) {
final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setMaxConnections(1);
pooledConnectionFactory.setConnectionFactory(cf);
return pooledConnectionFactory;
}

@Bean(name = "activemq")
@ConditionalOnClass(ActiveMQComponent.class)
public ActiveMQComponent activeMQComponent(ConnectionFactory connectionFactory) {
ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setConnectionFactory(connectionFactory);
activeMQComponent.setTransacted(true);
activeMQComponent.setLazyCreateTransactionManager(false);
return activeMQComponent;
}

最后,为了“运行”该路线,我添加了一个小型 Camel Route 测试

@RunWith(CamelSpringBootRunner.class)
@SpringBootTest(classes = MainApplication.class)
public class SampleCamelApplicationTest {

@Produce(uri = "activemq:queue:myIncomingQueue")
protected ProducerTemplate template;

@Bài kiểm tra
public void shouldProduceMessages() throws Exception {
template.sendBody("test");
Thread.sleep(20000); //wait for ActiveMQ redeliveries
}

}

如果我运行此测试,消息将发送到 ActiveMQ.DLQ.

希望这有帮助

关于java - Camel JMS 事务处理不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48321633/

28 4 0
行者123
Hồ sơ cá nhân

Tôi là một lập trình viên xuất sắc, rất giỏi!

Nhận phiếu giảm giá Didi Taxi miễn phí
Mã giảm giá Didi Taxi
Giấy chứng nhận ICP Bắc Kinh số 000000
Hợp tác quảng cáo: 1813099741@qq.com 6ren.com