Added:
- activity on menu - starting activity page
This commit is contained in:
146
src/EventSubscriber/QueueStatusSubscriber.php
Normal file
146
src/EventSubscriber/QueueStatusSubscriber.php
Normal file
@@ -0,0 +1,146 @@
|
||||
<?php
|
||||
|
||||
namespace App\EventSubscriber;
|
||||
|
||||
use App\Event\PageScrappingProgressEvent;
|
||||
use App\Message\DownloadChapter;
|
||||
use App\Repository\ChapterRepository;
|
||||
use App\Service\ActivityService;
|
||||
use Doctrine\DBAL\Connection;
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
||||
|
||||
class QueueStatusSubscriber implements EventSubscriberInterface
|
||||
{
|
||||
public function __construct(
|
||||
private ActivityService $activityService,
|
||||
private Connection $connection,
|
||||
private ChapterRepository $chapterRepository
|
||||
)
|
||||
{
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents(): array
|
||||
{
|
||||
return [
|
||||
WorkerMessageReceivedEvent::class => 'onMessageReceived',
|
||||
WorkerMessageHandledEvent::class => 'onMessageHandled',
|
||||
WorkerMessageFailedEvent::class => 'onMessageFailed',
|
||||
PageScrappingProgressEvent::NAME => 'onPageScrapingProgress',
|
||||
];
|
||||
}
|
||||
|
||||
public function onMessageReceived(WorkerMessageReceivedEvent $event): void
|
||||
{
|
||||
$envelope = $event->getEnvelope();
|
||||
$message = $envelope->getMessage();
|
||||
|
||||
if ($message instanceof DownloadChapter) {
|
||||
$this->activityService->sendUpdate($this->getActivity());
|
||||
}
|
||||
}
|
||||
|
||||
public function onMessageHandled(WorkerMessageHandledEvent $event): void
|
||||
{
|
||||
$envelope = $event->getEnvelope();
|
||||
$message = $envelope->getMessage();
|
||||
|
||||
if ($message instanceof DownloadChapter) {
|
||||
$this->activityService->sendUpdate($this->getActivity());
|
||||
}
|
||||
}
|
||||
|
||||
public function onMessageFailed(WorkerMessageFailedEvent $event): void
|
||||
{
|
||||
$envelope = $event->getEnvelope();
|
||||
$message = $envelope->getMessage();
|
||||
|
||||
if ($message instanceof DownloadChapter) {
|
||||
$this->activityService->sendUpdate($this->getActivity());
|
||||
}
|
||||
}
|
||||
|
||||
public function onPageScrapingProgress(PageScrappingProgressEvent $event): void
|
||||
{
|
||||
$data = [
|
||||
'status' => 'Page scraping progress',
|
||||
'chapterId' => $event->getChapterId(),
|
||||
'pageIndex' => $event->getPageIndex(),
|
||||
'totalPages' => $event->getTotalPages(),
|
||||
];
|
||||
$this->activityService->sendUpdate($data);
|
||||
}
|
||||
|
||||
private function getActivity(): array
|
||||
{
|
||||
$queueStatus = $this->getQueueStatus();
|
||||
return [
|
||||
'processing' => $this->buildStatusActivity($this->decodeMessages($queueStatus['processing'])),
|
||||
'pending' => $this->buildStatusActivity($this->decodeMessages($queueStatus['pending']))
|
||||
];
|
||||
}
|
||||
|
||||
//TODO refactorer ce code avec celui du ActivityController
|
||||
private function buildStatusActivity(array $activity): array
|
||||
{
|
||||
$status = [];
|
||||
foreach ($activity as $envelope) {
|
||||
$envelope = $envelope['body'];
|
||||
if ($envelope instanceof Envelope) {
|
||||
if (!$envelope->getMessage() instanceof DownloadChapter) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$chapter = $this->chapterRepository->find($envelope->getMessage()->getChapterId());
|
||||
$manga = $chapter->getManga();
|
||||
$status[] = [
|
||||
'manga' => $manga->getTitle(),
|
||||
'volume' => $chapter->getVolume(),
|
||||
'chapter' => $chapter->getNumber(),
|
||||
'title' => $chapter->getTitle(),
|
||||
];
|
||||
}
|
||||
}
|
||||
|
||||
return $status;
|
||||
}
|
||||
|
||||
private function getQueueStatus(): array
|
||||
{
|
||||
// Requête pour récupérer les messages en attente
|
||||
$sqlPending = 'SELECT * FROM messenger_messages WHERE queue_name = :queue AND available_at IS NULL';
|
||||
$pending = $this->connection->fetchAllAssociative($sqlPending, ['queue' => 'default']);
|
||||
|
||||
// Requête pour récupérer les messages en cours de traitement
|
||||
$sqlProcessing = 'SELECT * FROM messenger_messages WHERE queue_name = :queue AND available_at IS NOT NULL';
|
||||
$processing = $this->connection->fetchAllAssociative($sqlProcessing, ['queue' => 'default']);
|
||||
|
||||
return [
|
||||
'pending' => $pending,
|
||||
'processing' => $processing
|
||||
];
|
||||
}
|
||||
|
||||
private function decodeMessages(array $messages): array
|
||||
{
|
||||
$decodedMessages = [];
|
||||
|
||||
foreach ($messages as $message) {
|
||||
$decodedMessages[] = [
|
||||
'id' => $message['id'],
|
||||
'body' => $this->decodeMessageBody($message['body']),
|
||||
'headers' => json_decode($message['headers'], true),
|
||||
];
|
||||
}
|
||||
|
||||
return $decodedMessages;
|
||||
}
|
||||
|
||||
private function decodeMessageBody(string $body)
|
||||
{
|
||||
return unserialize(stripcslashes($body));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user