From 3dc8e5ebaf6d0ed36acbe7079fde56fc58fa1a53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Guillot?= Date: Sun, 25 Jun 2023 10:24:05 -0700 Subject: [PATCH] Refresh feeds in the cronjob in parallel --- cli/cli.go | 14 +------------- cli/cronjob.go | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 13 deletions(-) create mode 100644 cli/cronjob.go diff --git a/cli/cli.go b/cli/cli.go index 6ec55035..f28619a4 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -11,7 +11,6 @@ import ( "miniflux.app/database" "miniflux.app/locale" "miniflux.app/logger" - feedHandler "miniflux.app/reader/handler" "miniflux.app/storage" "miniflux.app/ui/static" "miniflux.app/version" @@ -192,18 +191,7 @@ func Parse() { } if flagCronjob { - jobs, err := store.NewBatch(config.Opts.BatchSize()) - if err != nil { - logger.Error("[Cronjob] %v", err) - } - - logger.Info("[Cronjob]] Processing %d jobs", len(jobs)) - - for _, job := range jobs { - if err := feedHandler.RefreshFeed(store, job.UserID, job.FeedID); err != nil { - logger.Error("[Cronjob] Refreshing the feed #%d returned this error: %v", job.FeedID, err) - } - } + runCronjob(store) return } diff --git a/cli/cronjob.go b/cli/cronjob.go new file mode 100644 index 00000000..e894a4c9 --- /dev/null +++ b/cli/cronjob.go @@ -0,0 +1,51 @@ +// SPDX-FileCopyrightText: Copyright The Miniflux Authors. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package cli // import "miniflux.app/cli" + +import ( + "sync" + "time" + + "miniflux.app/config" + "miniflux.app/logger" + "miniflux.app/model" + feedHandler "miniflux.app/reader/handler" + "miniflux.app/storage" +) + +func runCronjob(store *storage.Storage) { + var wg sync.WaitGroup + + startTime := time.Now() + jobs, err := store.NewBatch(config.Opts.BatchSize()) + if err != nil { + logger.Error("[Cronjob] %v", err) + } + + nbJobs := len(jobs) + logger.Info("[Cronjob]] Created %d jobs from a batch size of %d", nbJobs, config.Opts.BatchSize()) + var jobQueue = make(chan model.Job, nbJobs) + + logger.Info("[Cronjob] Starting a pool of %d workers", config.Opts.WorkerPoolSize()) + for i := 0; i < config.Opts.WorkerPoolSize(); i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for job := range jobQueue { + logger.Info("[Cronjob] Refreshing feed #%d for user #%d in worker #%d", job.FeedID, job.UserID, workerID) + if err := feedHandler.RefreshFeed(store, job.UserID, job.FeedID); err != nil { + logger.Error("[Cronjob] Refreshing the feed #%d returned this error: %v", job.FeedID, err) + } + } + }(i) + } + + for _, job := range jobs { + jobQueue <- job + } + close(jobQueue) + + wg.Wait() + logger.Info("[Cronjob] Refreshed %d feed(s) in %s", nbJobs, time.Since(startTime)) +}