- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用类似于此示例的 RabbitMQ 在 PHP 上构建 RPC 服务:http://www.rabbitmq.com/tutorials/tutorial-six-java.html我正在使用这个 PECL 扩展:http://pecl.php.net/package/amqp (版本 1.0.3)
问题是当我向服务器添加标志 AMQP_EXCLUSIVE 时,我的回调队列(在客户端脚本中声明)被锁定.
这是我的服务器
// connect to server
$cnn = new AMQPConnection('...');
$cnn->connect();
$channel = new AMQPChannel($cnn);
// create exchange
$exchangeName = 'k-exchange';
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
// declare queue to consume messages from
$queue = new \AMQPQueue($channel);
$queue->setName('tempQueue');
$queue->declare();
// start consuming messages
$queue->consume(function($envelope, $queue)
use ($channel, $exchange) {
// create callback queue
$callbackQueue = new \AMQPQueue($channel);
$callbackQueue->setName($envelope->getReplyTo());
$callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag
/* WARNING: Following code line causes error. See rabbit logs below:
* connection <0.1224.10>, channel 1 - error:
* {amqp_error,resource_locked,
* "cannot obtain exclusive access to locked queue 'amq.gen-Q6J...' in vhost '/'",
* 'queue.bind'}
*/
$callbackQueue->bind($exchange->getName(), 'rpc_reply');
// trying to publish response back to client's callback queue
$exchange->publish(
json_encode(array('processed by remote service!')),
'rpc_reply',
AMQP_MANDATORY & AMQP_IMMEDIATE
);
$queue->ack($envelope->getDeliveryTag());
});
这是我的Client.php
// connect to server
$cnn = new AMQPConnection('...');
$cnn->connect();
$channel = new AMQPChannel($cnn);
// create exchange
$exchangeName = 'k-exchange';
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
// create a queue which we send messages to server via
$queue = new \AMQPQueue($channel);
$queue->setName('tempQueue');
$queue->declare();
// binding exchange to queue
$queue->bind($exchangeName, 'temp_action');
// create correlation_id
$correlationId = sha1(time() . rand(0, 1000000));
// create anonymous callback queue to get server response response via
$callbackQueue = new \AMQPQueue($channel);
$callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag
$callbackQueue->declare();
// publishing message to exchange (passing it to server)
$exchange->publish(
json_encode(array('process me!')),
'temp_action',
AMQP_MANDATORY,
array(
'reply_to' => $callbackQueue->getName(), // pass callback queue name
'correlation_id' => $correlationId
)
);
// going to wait for remote service complete tasks. tick once a second
$attempts = 0;
while ($attempts < 5)
{
echo 'Attempt ' . $attempts . PHP_EOL;
$envelope = $callbackQueue->get();
if ($envelope) {
echo 'Got response! ';
print_r($envelope->getBody());
echo PHP_EOL;
ra;
}
sleep(1);
$attempts++;
}
所以最后我只在 RabbitMQ 的日志中看到错误:
connection <0.1224.10>, channel 1 - error:
{amqp_error,resource_locked,
"cannot obtain exclusive access to locked queue 'amq.gen-Q6J...' in vhost '/'",
'queue.bind'}
问题:在 Server.php 中创建回调队列对象的正确方法是什么?看来我的 Server.php 与 RabbitMQ 服务器的连接不同于 Client.php。我应该在这里做什么?我应该如何在 Server.php 端“共享”相同的(到 Client.php 的)连接。
làm mới这里有更多的 RabbitMQ 日志
我的 Server.php 连接(Id 是:<0.22322.27>)
=INFO REPORT==== 20-Jun-2012::13:30:22 ===
accepting AMQP connection <0.22322.27> (127.0.0.1:58457 -> 127.0.0.1:5672)
我的 Client.php 连接(Id 是:<0.22465.27>)
=INFO REPORT==== 20-Jun-2012::13:30:38 ===
accepting AMQP connection <0.22465.27> (127.0.0.1:58458 -> 127.0.0.1:5672)
现在我看到 Server.php 导致错误:
=ERROR REPORT==== 20-Jun-2012::13:30:38 ===
connection <0.22322.27>, channel 1 - error:
{amqp_error,resource_locked,
"cannot obtain exclusive access to locked queue 'amq.gen-g6Q...' in vhost '/'",
'queue.bind'}
我的假设我怀疑由于 Client.php 和 Server.php 不共享具有相同 ID 的连接,因此它们不可能都使用 Client.php 中声明的独占队列
1 Câu trả lời
您的实现存在一些问题:
您无需声明交换 (AMQPExchange) 即可发布消息。在此 RPC 示例中,您需要将其用作广播消息的方式(例如临时队列或临时交换)。所有通信都将直接在 QUEUE 上进行,理论上会绕过交换。
$exchange = new AMQPExchange($channel);
$exchange->publish(...);
当您将 AMQPQueue::setName() 与 AMQPQueue::declare() 一起使用时,您将绑定(bind)到一个具有用户定义名称的队列。如果您声明的队列没有名称,则称为临时队列。当您需要从特定路由键接收广播消息时,这很有用。为此,RabbitMQ/AMQP 生成一个随机的临时名称。由于队列名称是为给定实例专门使用信息而创建的,因此为了自身的利益,它会在连接关闭时被处理掉。
当 RPC 客户端想要发布消息 (AMQPExchange::publish()) 时,它必须将回复指定为发布参数之一。这样,RPC 服务器在收到请求时就可以获取随机生成的名称。它使用回复名称作为 QUEUE 的名称,服务器将在该 QUEUE 上回复给定的客户端。除了临时队列名称,实例还必须发送一个 correlationId 以确保它收到的回复消息对于请求实例是唯一的。
客户端
$exchange = new AMQPExchange($channel);
$rpcServerQueueName = 'rpc_queue';
$client_queue = new AMQPQueue($this->channel);
$client_queue->setFlags(AMQP_EXCLUSIVE);
$client_queue->declareQueue();
$callbackQueueName = $client_queue->getName(); //e.g. amq.gen-JzTY20BRgKO-HjmUJj0wLg
//Set Publish Attributes
$corrId = uniqid();
$attributes = array(
'correlation_id' => $corrId,
'reply_to' => $this->callbackQueueName
);
$exchange->publish(
json_encode(['request message']),
$rpcServerQueueName,
AMQP_NOPARAM,
$attributes
);
//listen for response
$callback = function(AMQPEnvelope $message, AMQPQueue $q) {
if($message->getCorrelationId() == $this->corrId) {
$this->response = $message->getBody();
$q->nack($message->getDeliveryTag());
return false; //return false to signal to consume that you're done. other wise it continues to block
}
};
$client_queue->consume($callback);
máy chủ
$exchange = new AMQPExchange($channel);
$rpcServerQueueName = 'rpc_queue';
$srvr_queue = new AMQPQueue($channel);
$srvr_queue->setName($rpcServerQueueName); //intentionally declares the rpc_server queue name
$srvr_queue->declareQueue();
...
$srvr_queue->consume(function(AMQPEnvelope $message, AMQPQueue $q) use (&$exchange) {
//publish with the exchange instance to the reply to queue
$exchange->publish(
json_encode(['response message']), //reponse message
$message->getReplyTo(), //get the reply to queue from the message
AMQP_NOPARAM, //disable all other params
$message->getCorrelationId() //obtain and respond with correlation id
);
//acknowledge receipt of the message
$q->ack($message->getDeliveryTag());
});
在这种情况下,EXCLUSIVE 仅用于每个实例的 Rpc 客户端临时队列,以便它可以发布消息。换句话说,客户端为自己创建一个一次性的临时队列,专门从 RPC 服务器接收一个应答。这确保没有其他 channel 线程可以在该队列上发布。它仅对客户端及其响应者锁定。请务必注意,AQMP_EXCLUSIVE 不会阻止 RPC 服务器响应客户端的回复队列。 AMQP_EXCLUSIVE 与尝试发布到同一队列资源的两个独立线程( channel 实例)有关。发生这种情况时,队列实质上已为后续连接锁定。交换声明也会发生相同的行为。
@Denis:在这种情况下,您的实现在一定程度上是正确的
不好 - 不要在服务器中重新声明队列。那是客户的工作
$callbackQueue = new \AMQPQueue($channel);
$callbackQueue->setName($envelope->getReplyTo());
$callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag
...
$callbackQueue->bind($exchange->getName(), 'rpc_reply');
您正在尝试绑定(bind)到名为 tempQueue 的 QUEUE。但是您已经在 client.php 中创建了一个名为 tempQueue 的队列。根据先启动哪个服务,另一个将抛出错误。所以你可以去掉所有这些,只保留最后一部分:
// trying to publish response back to client's callback queue
$exchange->publish(
json_encode(array('processed by remote service!')),
'rpc_reply', //<--BAD Should be: $envelope->getReplyTo()
AMQP_MANDATORY & AMQP_IMMEDIATE
);
然后通过替换修改上面的内容:
'rpc_reply'
với
$envelope->getReplyTo()
不要在客户端声明队列名称
// create a queue which we send messages to server via
$queue = new \AMQPQueue($channel);
//$queue->setName('tempQueue'); //remove this line
//add exclusivity
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declare();
//no need for binding... we're communicating on the queue directly
//there is no one listening to 'temp_action' so this implementation will send your message into limbo
//$queue->bind($exchangeName, 'temp_action'); //remove this line
关于locking - RabbitMQ 远程过程调用 : Exclusive queues locking @ PHP,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11104004/
如果我声明了类似的类型 type test(NSIZE) integer, len :: NSIZE real :: dummy(NSIZE) contains procedure,
我知道这是一个不太可能的事情,但是由于“选项私有(private)模块”的限制,甚至更糟糕的“私有(private)子/函数”的限制,有谁知道是否有一种方法可以从 Excel 应用程序隐藏 VBA 过
我有两个表,property 和 component。 component.id_property = property.id。 我正在尝试创建一个过程,该过程对所选属性的组件进行计数,如果所选属性没
我有一份报告,它是在 SSRS 2005 中开发的,我正在使用存储过程从数据库中获取结果。报告输出的结果非常简单,如下图所示。 如果假设我正在寻找不同的成员 例如:- MemberID c108 c
我需要一个通用函数/过程,该函数/过程将根据提供的数据计算出我的淡入淡出时间和值,如下所示: 我将字节值保存在字节数组中:这些是起始值。然后,我在其他数组中存储了一些值:这些将是新值。然后我有时间要提
我想在界面的多个按钮上创建相同的操作。是否只能通过创建单独的操作监听器方法并调用执行操作的方法才可行,还是还有其他方法?是否可以将按钮放在一个组中并执行以下操作:- groupButton.setOn
我有以下情况: procedure Test; begin repeat TryAgain := FALSE; try // Code // Code if this an
我正在尝试执行以下操作;假设我在 Oracle 中创建了一个对象类型 create type test as object( name varchar2(12), member procedure p
问题: 如果可能的话,如何声明一个用于任何类型参数的函数 T其中 T 的唯一约束是它被定义为 1D array如 type T is array ( integer range <> ) of a_r
我正在尝试创建这个 mysql 过程来制作一个包含今年所有日期和所有时间的表(以一小时为间隔。) CREATE TABLE FECHAS ( created_at datetime ); CREA
所以, 我在这里面临一个问题,这让我发疯,我认为这是一个愚蠢的错误,所以我不是 MySQL 的新手,但它并不像我想象的那样工作。 尝试将此语句部署到 MySQL 后,我收到此错误: ERROR 106
我有一个架构,其中包含星球大战中的人物列表、他们出现的电影、他们访问的行星等。这是架构: CREATE DATABASE IF NOT EXISTS `starwarsFINAL` /*!40100
我一直在为一家慈善机构创建一款应用程序,允许家庭在节日期间注册接收礼物。数据库组织有多个表。下面列出了这些表(及其架构/创建语句): CREATE TABLE IF NOT EXISTS ValidD
正如上面标题所解释的,我正在尝试编写一个sql函数来按日期删除表而不删除系统表。我在此消息下方放置了一张图片,以便直观地解释我的问题。任何帮助将不胜感激!感谢您的时间! 最佳答案 您可以通过查询INF
DELIMITER $$ CREATE PROCEDURE INSERT_NONE_HISTORY_CHECKBOX() BEGIN DECLARE note_id bigint(20); F
是否可以编写一个存储过程或触发器,在特定时间在数据库内部自动执行,而无需来自应用程序的任何调用?如果是,那么任何人都可以给我一个例子或链接到一些我可以阅读如何做到这一点的资源。 最佳答案 查看 pgA
我需要创建一个过程:1)从表中的字段中选择一些文本并将其存储在变量中2) 更新相同的记录字段,仅添加 yyyymmdd 格式的日期以及过程中的附加文本输入...类似这样的... delimiter /
好的,这就是我想做的: 如果条目已存在(例如基于字段name),则只需返回其id 如果没有,请添加 这是我迄今为止所管理的(对于“如果不存在,则创建它”部分): INSERT INTO `object
以下是我编写的程序,用于找出每位客户每天购买的前 10 件商品。 这是我尝试过的第一个 PL/SQL 操作。它没有达到我预期的效果。 我使用的逻辑是接受开始日期、结束日期以及我对每个客户感兴趣的前“x
我正在尝试在MySQL中创建一个过程那insert week s(当年)发送至我的 week table 。但存在一个问题,因为在为下一行添加第一行后,我收到错误: number column can
Tôi là một lập trình viên xuất sắc, rất giỏi!