Skip to content

Commit e1b343b

Browse files
committed
Merge branch 'master' of github.com:vtsykun/mq-insight
2 parents 4ebe78d + 803bc8c commit e1b343b

File tree

6 files changed

+80
-14
lines changed

6 files changed

+80
-14
lines changed

src/Command/StatRetrieveCommand.php

+9-8
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Doctrine\DBAL\Connection;
66
use Doctrine\DBAL\Types\Type;
7+
use Okvpn\Bundle\MQInsightBundle\Exception\TerminateCommandException;
78
use Okvpn\Bundle\MQInsightBundle\Manager\ProcessManager;
89
use Okvpn\Bundle\MQInsightBundle\Model\AppConfig;
910
use Okvpn\Bundle\MQInsightBundle\Model\Provider\QueueProviderInterface;
@@ -47,6 +48,7 @@ class StatRetrieveCommand extends ContainerAwareCommand
4748
protected function configure()
4849
{
4950
$this->setName(self::NAME)
51+
->addArgument('application', InputArgument::REQUIRED)
5052
->addArgument('parentPid', InputArgument::REQUIRED)
5153
->addOption('pollingInterval', null, InputOption::VALUE_OPTIONAL, 'The polling interval in sec.')
5254
->setDescription('Retrieve message count statistics');
@@ -91,7 +93,6 @@ protected function initialize(InputInterface $input, OutputInterface $output)
9193
protected function execute(InputInterface $input, OutputInterface $output)
9294
{
9395
$shmid = function_exists('sem_get') ? sem_get(AppConfig::getApplicationID()) : null;
94-
9596
if ($shmid && !sem_acquire($shmid, true)) {
9697
$output->writeln('<info>Not allowed to run a more one command.</info>');
9798
return 0;
@@ -103,9 +104,13 @@ protected function execute(InputInterface $input, OutputInterface $output)
103104
$this->delayPool->sync();
104105
sleep(1);
105106
}
107+
} catch (TerminateCommandException $exception) {
108+
$output->writeln('<error>' . $exception->getMessage() .'</error>');
109+
return 1;
106110
} finally {
107111
if ($shmid) {
108-
sem_release($shmid);
112+
@sem_release($shmid);
113+
@sem_remove($shmid);
109114
}
110115
}
111116

@@ -136,16 +141,12 @@ protected function processCount()
136141
protected function terminateIfNeeded()
137142
{
138143
$message = '';
139-
if (ProcessManager::getNumberOfRunningProcess(self::NAME) > 1) {
140-
$message .= "Running more than one instance StatRetrieveCommand\n";
141-
}
142-
143144
if (ProcessManager::getProcessNameByPid($this->parentPid) === '') {
144-
$message = "The parent process died. Parent pid not found:{$this->parentPid}\n";
145+
$message = "The parent process died. Parent pid not found: {$this->parentPid}\n";
145146
}
146147

147148
if ($message) {
148-
throw new \RuntimeException($message);
149+
throw new TerminateCommandException($message);
149150
}
150151
}
151152
}

src/Controller/QueueController.php

+9-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ public function statusAction()
2828
$fetchFrom->modify('-1 day');
2929
$data = $this->get('okvpn_mq_insight.chart_provider')->getQueueSizeData($fetchFrom);
3030

31+
$provider = $this->get('okvpn_mq_insight.queued_messages_provider');
3132
$runningConsumers = ProcessManager::getPidsOfRunningProcess('oro:message-queue:consume');
33+
$runningConsumers = $provider->filterNotActivePids($runningConsumers);
34+
3235
/** @var MQStateStat $size */
3336
$size = $this->get('doctrine')
3437
->getRepository('OkvpnMQInsightBundle:MQStateStat')
@@ -66,9 +69,13 @@ public function plotAction()
6669
*/
6770
public function queuedAction(Request $request)
6871
{
69-
$result = $this->get('okvpn_mq_insight.queued_messages_provider')->getQueuedMessages();
72+
$provider = $this->get('okvpn_mq_insight.queued_messages_provider');
73+
$result = $provider->getQueuedMessages();
74+
75+
$runningConsumers = $provider->filterNotActivePids(
76+
ProcessManager::getPidsOfRunningProcess('oro:message-queue:consume')
77+
);
7078

71-
$runningConsumers = ProcessManager::getPidsOfRunningProcess('oro:message-queue:consume');
7279
if ($request->get('isLast')) {
7380
$result = end($result) ?: [];
7481
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
namespace Okvpn\Bundle\MQInsightBundle\Exception;
4+
5+
class TerminateCommandException extends \RuntimeException
6+
{
7+
}

src/Extension/MQStatExtension.php

+6-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Okvpn\Bundle\MQInsightBundle\Client\DebugProducerInterface;
88
use Okvpn\Bundle\MQInsightBundle\Command\StatRetrieveCommand;
99
use Okvpn\Bundle\MQInsightBundle\Manager\ProcessManager;
10+
use Okvpn\Bundle\MQInsightBundle\Model\AppConfig;
1011
use Okvpn\Bundle\MQInsightBundle\Model\Counter;
1112
use Okvpn\Bundle\MQInsightBundle\Model\Worker\CallbackTask;
1213
use Okvpn\Bundle\MQInsightBundle\Model\Worker\DelayPool;
@@ -123,6 +124,9 @@ public function onInterrupted(Context $context)
123124
$uid = $message ? $message->getMessageId() : null;
124125
$this->publishErrorStat($context->getException(), $name, $uid);
125126
}
127+
128+
$provider = $this->container->get('okvpn_mq_insight.queued_messages_provider');
129+
$provider->flush(getmypid());
126130
} catch (\Exception $e) {
127131
// do nothing
128132
} finally {
@@ -266,7 +270,7 @@ protected function runStatRetrieveCommandIfNeeded()
266270
{
267271
if (!getenv('SKIP_STAT_RETRIEVE')
268272
&& !$this->container->getParameter('okvpn_mq_insight.skip_stat_retrieve')
269-
&& !ProcessManager::isProcessRunning(StatRetrieveCommand::NAME)
273+
&& !ProcessManager::isProcessRunning(StatRetrieveCommand::NAME . ' ' . AppConfig::getApplicationID())
270274
) {
271275
$env = $this->container->get('kernel')->getEnvironment();
272276
$pb = new ProcessBuilder();
@@ -277,6 +281,7 @@ protected function runStatRetrieveCommandIfNeeded()
277281
->add($phpPath)
278282
->add($_SERVER['argv'][0])
279283
->add(StatRetrieveCommand::NAME)
284+
->add(AppConfig::getApplicationID())
280285
->add(getmypid())
281286
->add("--env=$env");
282287

src/Model/Provider/DbalQueueProvider.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public function getApproxQueueCount()
4545
$lastValue = $this->registry->getRepository('OkvpnMQInsightBundle:MQStateStat')
4646
->getLastValue();
4747

48-
if ($lastValue instanceof MQStateStat && $lastValue->getQueue() > 10000) {
48+
if ($lastValue instanceof MQStateStat && $lastValue->getQueue() > 500000) {
4949

5050
/*
5151
* Fast estimated row count.

src/Provider/QueuedMessagesProvider.php

+48-2
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,17 @@ class QueuedMessagesProvider
1212

1313
protected $storage;
1414

15-
15+
/**
16+
* @param KeyValueStorageInterface $storage
17+
*/
1618
public function __construct(KeyValueStorageInterface $storage)
1719
{
1820
$this->storage = $storage;
1921
}
2022

23+
/**
24+
* @param array $pids
25+
*/
2126
public function collect(array $pids)
2227
{
2328
$applicationId = $this->getApplicationId();
@@ -31,24 +36,65 @@ public function collect(array $pids)
3136
foreach ($pids as $pid) {
3237
$result = $this->storage->get((string)$pid);
3338
$currentItem += (is_array($result) && $currentTime - $result[1] < 3 * self::POLLING_TIME) ? $result[0] : 0;
39+
if (is_array($result) && $currentTime - $result[1] > self::KEEP_RESULT_TIME) {
40+
$this->flush($pid);
41+
}
3442
}
3543

3644
$previousResult[] = [$currentTime, $currentItem];
3745
$result = [];
3846
foreach ($previousResult as $item) {
39-
if ($currentTime - $item[0] < self::KEEP_RESULT_TIME) {
47+
if ($currentTime - $item[0] < 2 * self::KEEP_RESULT_TIME) {
4048
$result[] = $item;
4149
}
4250
}
4351

4452
$this->storage->set($applicationId, array_values($result));
4553
}
4654

55+
/**
56+
* @param array $pids
57+
* @return array
58+
*/
59+
public function filterNotActivePids(array $pids)
60+
{
61+
$currentTime = time();
62+
$runningPids = [];
63+
foreach ($pids as $pid) {
64+
$result = $this->storage->get((string) $pid);
65+
if (is_array($result) && $currentTime - $result[1] < 2 * self::KEEP_RESULT_TIME) {
66+
$runningPids[] = $pid;
67+
}
68+
}
69+
70+
return $runningPids;
71+
}
72+
73+
/**
74+
* @param string $pid
75+
* @param mixed $value
76+
*/
4777
public function saveResultForPid($pid, $value)
4878
{
4979
$this->storage->set((string) $pid, $value);
5080
}
5181

82+
/**
83+
* @param string $pid
84+
* @return bool
85+
*/
86+
public function flush($pid)
87+
{
88+
try {
89+
return $this->storage->delete((string) $pid);
90+
} catch (\Throwable $e) {
91+
return false;
92+
}
93+
}
94+
95+
/**
96+
* @return array|mixed
97+
*/
5298
public function getQueuedMessages()
5399
{
54100
$applicationId = $this->getApplicationId();

0 commit comments

Comments
 (0)