feat/activity-realtime-mercure #28
@@ -1,4 +1,5 @@
|
||||
import { defineStore } from 'pinia';
|
||||
import { Job } from '../../domain/entities/job';
|
||||
import { ApiJobRepository } from '../../infrastructure/api/ApiJobRepository';
|
||||
|
||||
const jobRepository = new ApiJobRepository();
|
||||
@@ -106,6 +107,40 @@ export const useActivityStore = defineStore('activity', {
|
||||
if (job) job.progress = progress;
|
||||
},
|
||||
|
||||
handleJobCreated(data) {
|
||||
const alreadyExists = this.jobs.some(j => j.id === data.id);
|
||||
if (alreadyExists) return;
|
||||
|
||||
const job = Job.create({
|
||||
id: data.id,
|
||||
type: data.type_job,
|
||||
status: data.status,
|
||||
createdAt: data.createdAt,
|
||||
context: data.context,
|
||||
attempts: data.attempts,
|
||||
maxAttempts: data.maxAttempts,
|
||||
});
|
||||
|
||||
this.jobs.unshift(job);
|
||||
this.total += 1;
|
||||
this.totalPages = Math.ceil(this.total / this.limit);
|
||||
},
|
||||
|
||||
handleJobStatusChange(jobId, newStatus) {
|
||||
const job = this.jobs.find(j => j.id === jobId);
|
||||
if (!job) return;
|
||||
|
||||
if (newStatus === 'in_progress') {
|
||||
job.status = 'in_progress';
|
||||
} else {
|
||||
setTimeout(() => {
|
||||
this.jobs = this.jobs.filter(j => j.id !== jobId);
|
||||
this.total = Math.max(0, this.total - 1);
|
||||
this.totalPages = Math.ceil(this.total / this.limit);
|
||||
}, 1500);
|
||||
}
|
||||
},
|
||||
|
||||
subscribeMercure() {
|
||||
if (this.mercureEventSource) return;
|
||||
const url = new URL('/.well-known/mercure', window.location.origin);
|
||||
@@ -113,8 +148,12 @@ export const useActivityStore = defineStore('activity', {
|
||||
this.mercureEventSource = new EventSource(url.toString());
|
||||
this.mercureEventSource.onmessage = (event) => {
|
||||
const data = JSON.parse(event.data);
|
||||
if (data.type === 'job.progress_updated') {
|
||||
if (data.type === 'job.created') {
|
||||
this.handleJobCreated(data);
|
||||
} else if (data.type === 'job.progress_updated') {
|
||||
this.updateJobProgress(data.jobId, data.progress);
|
||||
} else if (data.type === 'job.status_changed') {
|
||||
this.handleJobStatusChange(data.jobId, data.status);
|
||||
}
|
||||
};
|
||||
},
|
||||
|
||||
@@ -45,7 +45,7 @@ readonly class ScrapeChapterHandler
|
||||
$job->start();
|
||||
$this->jobRepository->save($job);
|
||||
|
||||
$this->eventBus->dispatch(new ChapterScrapingStarted($manga->getTitle(), $chapter->chapterNumber));
|
||||
$this->eventBus->dispatch(new ChapterScrapingStarted($job->id, $manga->getTitle(), $chapter->chapterNumber));
|
||||
|
||||
$sources = $this->getSourcesToTry($manga);
|
||||
$slugsToTry = array_merge([$manga->getSlug()], $manga->getAlternativeSlugs());
|
||||
@@ -106,7 +106,7 @@ readonly class ScrapeChapterHandler
|
||||
$errorMessage = $lastException?->getMessage() ?? 'Failed to scrape chapter from all available sources';
|
||||
$job->fail($errorMessage);
|
||||
$this->jobRepository->save($job);
|
||||
$this->eventBus->dispatch(new ChapterScrapingFailed($chapter->mangaId, $chapter->chapterNumber, $errorMessage));
|
||||
$this->eventBus->dispatch(new ChapterScrapingFailed($job->id, $chapter->mangaId, $chapter->chapterNumber, $errorMessage));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,12 +5,18 @@ namespace App\Domain\Scraping\Domain\Event;
|
||||
readonly class ChapterScrapingFailed
|
||||
{
|
||||
public function __construct(
|
||||
private string $jobId,
|
||||
private string $mangaId,
|
||||
private string $chapterNumber,
|
||||
private string $reason
|
||||
) {
|
||||
}
|
||||
|
||||
public function getJobId(): string
|
||||
{
|
||||
return $this->jobId;
|
||||
}
|
||||
|
||||
public function getMangaId(): string
|
||||
{
|
||||
return $this->mangaId;
|
||||
|
||||
@@ -5,11 +5,17 @@ namespace App\Domain\Scraping\Domain\Event;
|
||||
class ChapterScrapingStarted
|
||||
{
|
||||
public function __construct(
|
||||
private readonly string $jobId,
|
||||
private readonly string $mangaTitle,
|
||||
private readonly float $chapterNumber,
|
||||
) {
|
||||
}
|
||||
|
||||
public function getJobId(): string
|
||||
{
|
||||
return $this->jobId;
|
||||
}
|
||||
|
||||
public function getMangaTitle(): string
|
||||
{
|
||||
return $this->mangaTitle;
|
||||
|
||||
@@ -9,6 +9,8 @@ use App\Domain\Scraping\Domain\Model\ScrapingJob;
|
||||
use App\Domain\Scraping\Infrastructure\ApiPlatform\Dto\ScrapeChapterRequest;
|
||||
use App\Domain\Shared\Domain\Contract\JobRepositoryInterface;
|
||||
use Ramsey\Uuid\Uuid;
|
||||
use Symfony\Component\Mercure\HubInterface;
|
||||
use Symfony\Component\Mercure\Update;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
|
||||
final class ScrapeChapterStateProcessor implements ProcessorInterface
|
||||
@@ -16,6 +18,7 @@ final class ScrapeChapterStateProcessor implements ProcessorInterface
|
||||
public function __construct(
|
||||
private readonly MessageBusInterface $commandBus,
|
||||
private readonly JobRepositoryInterface $jobRepository,
|
||||
private readonly HubInterface $hub,
|
||||
) {
|
||||
}
|
||||
|
||||
@@ -29,6 +32,20 @@ final class ScrapeChapterStateProcessor implements ProcessorInterface
|
||||
$job->context['chapterId'] = $data->chapterId;
|
||||
$this->jobRepository->save($job);
|
||||
|
||||
$this->hub->publish(new Update(
|
||||
'jobs/activity',
|
||||
json_encode([
|
||||
'type' => 'job.created',
|
||||
'id' => $job->id,
|
||||
'type_job' => $job->type,
|
||||
'status' => $job->status->value,
|
||||
'createdAt' => $job->createdAt->format('c'),
|
||||
'context' => $job->context,
|
||||
'attempts' => $job->attempts,
|
||||
'maxAttempts' => $job->maxAttempts,
|
||||
])
|
||||
));
|
||||
|
||||
$this->commandBus->dispatch(new ScrapeChapter($data->chapterId, $jobId));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,11 +50,13 @@ class ScrapingEventSubscriber implements EventSubscriberInterface
|
||||
#[AsMessageHandler]
|
||||
public function onChapterScrapingStarted(ChapterScrapingStarted $event): void
|
||||
{
|
||||
$chapterNumber = $event->getChapterNumber();
|
||||
$mangaTitle = $event->getMangaTitle();
|
||||
$this->hub->publish(new Update(
|
||||
'jobs/activity',
|
||||
json_encode(['type' => 'job.status_changed', 'jobId' => $event->getJobId(), 'status' => 'in_progress'])
|
||||
));
|
||||
|
||||
$this->notification->sendInfo(
|
||||
sprintf('Scraping du chapitre %s de "%s" démarré', $chapterNumber, $mangaTitle)
|
||||
sprintf('Scraping du chapitre %s de "%s" démarré', $event->getChapterNumber(), $event->getMangaTitle())
|
||||
);
|
||||
}
|
||||
|
||||
@@ -101,6 +103,11 @@ class ScrapingEventSubscriber implements EventSubscriberInterface
|
||||
$update = new Update($topics, json_encode($data));
|
||||
$this->hub->publish($update);
|
||||
|
||||
$this->hub->publish(new Update(
|
||||
'jobs/activity',
|
||||
json_encode(['type' => 'job.status_changed', 'jobId' => $jobId, 'status' => 'completed'])
|
||||
));
|
||||
|
||||
$mangaTitle = $job->context['mangaTitle'] ?? 'manga inconnu';
|
||||
$this->notification->sendSuccess(
|
||||
sprintf('Chapitre %s de "%s" scrappé avec succès', $chapter->chapterNumber, $mangaTitle)
|
||||
@@ -110,6 +117,11 @@ class ScrapingEventSubscriber implements EventSubscriberInterface
|
||||
#[AsMessageHandler]
|
||||
public function onChapterScrapingFailed(ChapterScrapingFailed $event): void
|
||||
{
|
||||
$this->hub->publish(new Update(
|
||||
'jobs/activity',
|
||||
json_encode(['type' => 'job.status_changed', 'jobId' => $event->getJobId(), 'status' => 'failed'])
|
||||
));
|
||||
|
||||
$this->logger->info('ChapterScrapingFailed reçu pour mangaId: ' . $event->getMangaId() . ', chapter: ' . $event->getChapterNumber());
|
||||
|
||||
$data = [
|
||||
|
||||
@@ -78,6 +78,7 @@ class ScrapeChapterHandlerTest extends TestCase
|
||||
$this->assertCount(2, $dispatchedMessages);
|
||||
|
||||
$this->assertInstanceOf(ChapterScrapingStarted::class, $dispatchedMessages[0]);
|
||||
$this->assertSame($jobId, $dispatchedMessages[0]->getJobId());
|
||||
$this->assertSame(2.0, $dispatchedMessages[0]->getChapterNumber());
|
||||
|
||||
$this->assertInstanceOf(ChapterScraped::class, $dispatchedMessages[1]);
|
||||
|
||||
Reference in New Issue
Block a user