Was not possible to write frame! Write operation timed out while connecting ActiveMQ #169

Open
chandra10207 opened this issue Jun 29, 2022 · 1 comment


chandra10207 commented Jun 29, 2022

Fatal error: Uncaught Stomp\Exception\ConnectionException: Was not possible to write frame! Write operation timed out.

Fatal error: Uncaught Stomp\Exception\ConnectionException: Was not possible to write frame! Write operation timed out. (Host: 127.0.0.1) in /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/HeartbeatEmitter.php on line 178

Stomp\Network\Observer\Exception\HeartbeatException: Could not send heartbeat to server. in /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/HeartbeatEmitter.php on line 178

Call Stack:
    0.0029     410088   1. {main}() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/bin/process-location-stock-message.php:0
  242.5327   98653392   2. App\Broker->read() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/bin/process-location-stock-message.php:32
  242.5327   98653392   3. Stomp\StatefulStomp->read() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/src/Broker.php:122
  242.5327   98653392   4. Stomp\States\ConsumerState->read() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/StatefulStomp.php:163
  242.5327   98653392   5. Stomp\Client->readFrame() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/States/ConsumerState.php:170
  242.5327   98653392   6. Stomp\Network\Connection->readFrame() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Client.php:361
  242.5328   98653424   7. Stomp\Network\Observer\ConnectionObserverCollection->emptyRead() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Connection.php:537
  242.5328   98653424   8. Stomp\Network\Observer\HeartbeatEmitter->emptyRead() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/ConnectionObserverCollection.php:123
  242.5328   98653424   9. Stomp\Network\Observer\HeartbeatEmitter->onPotentialConnectionStateActivity() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/AbstractBeats.php:245
  242.5328   98653424  10. Stomp\Network\Observer\HeartbeatEmitter->checkDelayed() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/HeartbeatEmitter.php:164
  242.5328   98653424  11. Stomp\Network\Observer\HeartbeatEmitter->onDelay() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/AbstractBeats.php:155

But I have already set up a heartbeat on my connection:

<?php

namespace App;

use Stomp\Client;
use Stomp\Exception\ConnectionException;
use Stomp\Network\Connection;
use Stomp\Network\Observer\HeartbeatEmitter;
use Stomp\StatefulStomp;
use Stomp\Transport\Frame;
use Stomp\Transport\Message;

class Broker
{
    private $client;
    private $subscriptions = [];
    private $host;
    private $port;
    private $brokerUri;
    private $username;
    private $password;
    private $selector;
    private $topic;
    private $path = '';
    private $log_path = '';
    private $log_filepath = '';

    public function __construct()
    {
        $this->load_amq_config();
        $connection = new Connection($this->host . ':' . $this->port);
        $client = new Client($connection);
        $client->setLogin($this->username, $this->password);

        $client->setHeartbeat(500);
        $connection->setReadTimeout(0, 250000);

        $emitter = new HeartbeatEmitter($client->getConnection());
        $client->getConnection()->getObservers()->addObserver($emitter);

        $this->client = new StatefulStomp($client);
        $client->connect();
    }


    private function load_amq_config(): void {
        $this->host = AMQ_HOST;
        $this->port = AMQ_PORT;
        $this->brokerUri = AMQ_HOST . ':' . AMQ_PORT;
        $this->username = AMQ_USERNAME;
        $this->password = AMQ_PASSWORD;
        $this->selector = AMQ_LOCATION_SELECTOR;
        $this->topic = AMQ_TOPIC;
    }

    public function getSelector(){
        return $this->selector;
    }

    public function getTopic(){
        return $this->topic;
    }


    public function sendQueue(string $queueName, string $message, array $headers = []): bool
    {
        $destination = '/queue/' . $queueName;
        return $this->client->send($destination, new Message($message, $headers + ['persistent' => 'true']));
    }

    public function sendTopic(string $topicName, string $message, array $headers = []): bool
    {
        $destination = '/topic/' . $topicName;
        return $this->client->send($destination, new Message($message, $headers + ['persistent' => 'true']));
    }

    public function subscribeQueue(string $queueName, ?string $selector = null): void
    {
        $destination = '/queue/' . $queueName;
        $this->subscriptions[$destination] = $this->client->subscribe($destination, $selector, 'client-individual');
    }

    public function subscribeTopic(string $topicName, ?string $selector = null): void
    {
        $destination = '/topic/' . $topicName;
        $this->subscriptions[$destination] = $this->client->subscribe($destination, $selector, 'client-individual');
    }

    public function unsubscribeQueue(?string $queueName = null): void
    {
        if ($queueName) {
            $destination = '/queue/' . $queueName;
            if (isset($this->subscriptions[$destination])) {
                $this->client->unsubscribe($this->subscriptions[$destination]);
            }
        } else {
            $this->client->unsubscribe();
        }
    }

    public function unsubscribeTopic(?string $topicName = null): void
    {
        if ($topicName) {
            $destination = '/topic/' . $topicName;
            if (isset($this->subscriptions[$destination])) {
                $this->client->unsubscribe($this->subscriptions[$destination]);
            }
        } else {
            $this->client->unsubscribe();
        }
    }

    public function read(): ?Frame
    {
        return ($frame = $this->client->read()) ? $frame : null;
    }

    public function ack(Frame $message): void
    {
        $this->client->ack($message);
    }

    public function nack(Frame $message): void
    {
        $this->client->nack($message);
    }
}

Main Message Processor Cron File:

<?php

require_once __DIR__ . '/../vendor/autoload.php';
use App\Broker;
use App\class_amqMessageProcessor;
use Exception;
use Stomp\Transport\Frame;

$amqProcessor = new class_amqMessageProcessor();
try {
    $broker = new Broker();
} catch (Exception $e) {
    $amqProcessor->save_log("Error Message: ".$e->getMessage());
    exit(1);
}

$selector = $broker->getSelector();
$topic = $broker->getTopic();
try{
    $broker->subscribeTopic( $topic , $selector);
} catch (Exception $e) {
    $amqProcessor->save_log("Error Message: ".$e->getMessage());
    exit(1);
}

while (true) {
    $message = $broker->read();
    if($message AND $message != ''){
        if ($message instanceof Frame) {
            if ($message['type'] === 'terminate') {
                $amqProcessor->save_log("Received shutdown command on message.");
            }
            $messageHeader = $message->getHeaders();
            $messageBody = $message->getBody();
            echo json_encode($messageBody);
            $broker->ack($message);
        }
        usleep(100000);
    }
    else{
        echo ("No Messages Received.\n");
        $amqProcessor->save_log("No Messages Received.");
    }
}

// $broker->unsubscribeTopic();

$amqProcessor->save_log("Cron End");

Environment and Composer dependency details:

ActiveMQ 5.17.1
"php": ">=7.2.0"
"name": "stomp-php/stomp-php",
"version": "5.0.0",
Can you please tell me what I am missing here?


gmorisseau-sigma commented Oct 21, 2022

Hello @chandra10207, I had a similar error with my ActiveMQ Artemis broker running inside a Docker container ("from scratch"). With the default configuration, I had to comment out the following lines:


        //$client->setHeartbeat(500);
        //$connection->setReadTimeout(0, 250000);

        //$emitter = new HeartbeatEmitter($client->getConnection());
        //$client->getConnection()->getObservers()->addObserver($emitter);

After that, I could read frames.

Note: this is fine as long as the internal JSON processing is fast, i.e. it takes less than the Artemis TTL (60 sec).
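
One rough way to check the timing condition in the note above is to measure how long each message takes to process and compare it against a budget. The sketch below is plain PHP and does not call stomp-php. `runWithinBudget` is a hypothetical helper name, and the 60-second figure is only the Artemis TTL quoted in this thread, so adjust it to whatever your broker actually uses.

```php
<?php

/**
 * Hypothetical helper (not part of stomp-php): runs $work and reports
 * whether it finished within $budgetSeconds of wall-clock time.
 */
function runWithinBudget(callable $work, float $budgetSeconds): array
{
    $start = microtime(true);
    $result = $work();
    $elapsed = microtime(true) - $start;

    return [
        'result' => $result,
        'elapsed' => $elapsed,
        'withinBudget' => $elapsed <= $budgetSeconds,
    ];
}

// Assumption: 60 seconds is the Artemis TTL mentioned in this thread.
$report = runWithinBudget(function () {
    // Stand-in for the real per-message processing.
    return json_encode(['status' => 'processed']);
}, 60.0);

if (!$report['withinBudget']) {
    error_log(sprintf('Processing took %.3f s, over budget', $report['elapsed']));
}
```

In the consumer loop, the callback would wrap the per-message work (JSON handling, logging, `ack`). Any iteration flagged as over budget is a candidate for the timeout.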
