Replace daemon and scheduler package with service package

This commit is contained in:
Frédéric Guillot 2018-11-11 15:32:48 -08:00
parent ca45765c46
commit 487852f07e
16 changed files with 287 additions and 275 deletions

View file

@ -9,7 +9,6 @@ import (
"fmt" "fmt"
"miniflux.app/config" "miniflux.app/config"
"miniflux.app/daemon"
"miniflux.app/database" "miniflux.app/database"
"miniflux.app/logger" "miniflux.app/logger"
"miniflux.app/storage" "miniflux.app/storage"
@ -49,7 +48,7 @@ func Parse() {
flag.BoolVar(&flagCreateAdmin, "create-admin", false, flagCreateAdminHelp) flag.BoolVar(&flagCreateAdmin, "create-admin", false, flagCreateAdminHelp)
flag.BoolVar(&flagResetPassword, "reset-password", false, flagResetPasswordHelp) flag.BoolVar(&flagResetPassword, "reset-password", false, flagResetPasswordHelp)
flag.BoolVar(&flagResetFeedErrors, "reset-feed-errors", false, flagResetFeedErrorsHelp) flag.BoolVar(&flagResetFeedErrors, "reset-feed-errors", false, flagResetFeedErrorsHelp)
flag.BoolVar(&flagDebugMode,"debug", false, flagDebugModeHelp) flag.BoolVar(&flagDebugMode, "debug", false, flagDebugModeHelp)
flag.Parse() flag.Parse()
cfg := config.NewConfig() cfg := config.NewConfig()
@ -111,5 +110,5 @@ func Parse() {
createAdmin(store) createAdmin(store)
} }
daemon.Run(cfg, store) startDaemon(cfg, store)
} }

57
cli/daemon.go Normal file
View file

@ -0,0 +1,57 @@
// Copyright 2018 Frédéric Guillot. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package cli // import "miniflux.app/cli"
import (
"context"
"os"
"os/signal"
"runtime"
"syscall"
"time"
"miniflux.app/config"
"miniflux.app/logger"
"miniflux.app/reader/feed"
"miniflux.app/service/scheduler"
"miniflux.app/service/httpd"
"miniflux.app/storage"
"miniflux.app/worker"
)
func startDaemon(cfg *config.Config, store *storage.Storage) {
logger.Info("Starting Miniflux...")
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt)
signal.Notify(stop, syscall.SIGTERM)
feedHandler := feed.NewFeedHandler(store)
pool := worker.NewPool(feedHandler, cfg.WorkerPoolSize())
go scheduler.Serve(cfg, store, pool)
go showProcessStatistics()
httpServer := httpd.Serve(cfg, store, pool, feedHandler)
<-stop
logger.Info("Shutting down the process...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
httpServer.Shutdown(ctx)
logger.Info("Process gracefully stopped")
}
func showProcessStatistics() {
for {
var m runtime.MemStats
runtime.ReadMemStats(&m)
logger.Debug("Sys=%vK, InUse=%vK, HeapInUse=%vK, StackSys=%vK, StackInUse=%vK, GoRoutines=%d, NumCPU=%d",
m.Sys/1024, (m.Sys-m.HeapReleased)/1024, m.HeapInuse/1024, m.StackSys/1024, m.StackInuse/1024,
runtime.NumGoroutine(), runtime.NumCPU())
time.Sleep(30 * time.Second)
}
}

View file

@ -1,61 +0,0 @@
// Copyright 2018 Frédéric Guillot. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package daemon // import "miniflux.app/daemon"
import (
"context"
"os"
"os/signal"
"runtime"
"syscall"
"time"
"miniflux.app/config"
"miniflux.app/logger"
"miniflux.app/reader/feed"
"miniflux.app/scheduler"
"miniflux.app/storage"
)
// Run starts the daemon.
func Run(cfg *config.Config, store *storage.Storage) {
logger.Info("Starting Miniflux...")
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt)
signal.Notify(stop, syscall.SIGTERM)
go func() {
for {
var m runtime.MemStats
runtime.ReadMemStats(&m)
logger.Debug("Sys=%vK, InUse=%vK, HeapInUse=%vK, StackSys=%vK, StackInUse=%vK, GoRoutines=%d, NumCPU=%d",
m.Sys/1024, (m.Sys-m.HeapReleased)/1024, m.HeapInuse/1024, m.StackSys/1024, m.StackInuse/1024,
runtime.NumGoroutine(), runtime.NumCPU())
time.Sleep(30 * time.Second)
}
}()
feedHandler := feed.NewFeedHandler(store)
pool := scheduler.NewWorkerPool(feedHandler, cfg.WorkerPoolSize())
server := newServer(cfg, store, pool, feedHandler)
scheduler.NewFeedScheduler(
store,
pool,
cfg.PollingFrequency(),
cfg.BatchSize(),
)
scheduler.NewCleanupScheduler(store, cfg.CleanupFrequency())
<-stop
logger.Info("Shutting down the server...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
server.Shutdown(ctx)
logger.Info("Server gracefully stopped")
}

View file

@ -1,37 +0,0 @@
// Copyright 2018 Frédéric Guillot. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package daemon // import "miniflux.app/daemon"
import (
"miniflux.app/api"
"miniflux.app/config"
"miniflux.app/fever"
"miniflux.app/middleware"
"miniflux.app/reader/feed"
"miniflux.app/scheduler"
"miniflux.app/storage"
"miniflux.app/ui"
"github.com/gorilla/mux"
)
func routes(cfg *config.Config, store *storage.Storage, feedHandler *feed.Handler, pool *scheduler.WorkerPool) *mux.Router {
router := mux.NewRouter()
middleware := middleware.New(cfg, store, router)
if cfg.BasePath() != "" {
router = router.PathPrefix(cfg.BasePath()).Subrouter()
}
router.Use(middleware.ClientIP)
router.Use(middleware.HeaderConfig)
router.Use(middleware.Logging)
fever.Serve(router, cfg, store)
api.Serve(router, store, feedHandler)
ui.Serve(router, cfg, store, pool, feedHandler)
return router
}

View file

@ -1,94 +0,0 @@
// Copyright 2018 Frédéric Guillot. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package daemon // import "miniflux.app/daemon"
import (
"crypto/tls"
"net/http"
"time"
"miniflux.app/config"
"miniflux.app/logger"
"miniflux.app/reader/feed"
"miniflux.app/scheduler"
"miniflux.app/storage"
"golang.org/x/crypto/acme/autocert"
)
func newServer(cfg *config.Config, store *storage.Storage, pool *scheduler.WorkerPool, feedHandler *feed.Handler) *http.Server {
certFile := cfg.CertFile()
keyFile := cfg.KeyFile()
certDomain := cfg.CertDomain()
certCache := cfg.CertCache()
server := &http.Server{
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 60 * time.Second,
Addr: cfg.ListenAddr(),
Handler: routes(cfg, store, feedHandler, pool),
}
if certDomain != "" && certCache != "" {
cfg.IsHTTPS = true
server.Addr = ":https"
certManager := autocert.Manager{
Cache: autocert.DirCache(certCache),
Prompt: autocert.AcceptTOS,
HostPolicy: autocert.HostWhitelist(certDomain),
}
// Handle http-01 challenge.
s := &http.Server{
Handler: certManager.HTTPHandler(nil),
Addr: ":http",
}
go s.ListenAndServe()
go func() {
logger.Info(`Listening on "%s" by using auto-configured certificate for "%s"`, server.Addr, certDomain)
if err := server.Serve(certManager.Listener()); err != http.ErrServerClosed {
logger.Fatal(`Server failed to start: %v`, err)
}
}()
} else if certFile != "" && keyFile != "" {
cfg.IsHTTPS = true
// See https://blog.cloudflare.com/exposing-go-on-the-internet/
// And https://wiki.mozilla.org/Security/Server_Side_TLS
server.TLSConfig = &tls.Config{
MinVersion: tls.VersionTLS12,
PreferServerCipherSuites: true,
CurvePreferences: []tls.CurveID{
tls.CurveP256,
tls.X25519,
},
CipherSuites: []uint16{
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
},
}
go func() {
logger.Info(`Listening on "%s" by using certificate "%s" and key "%s"`, server.Addr, certFile, keyFile)
if err := server.ListenAndServeTLS(certFile, keyFile); err != http.ErrServerClosed {
logger.Fatal(`Server failed to start: %v`, err)
}
}()
} else {
go func() {
logger.Info(`Listening on "%s" without TLS`, server.Addr)
if err := server.ListenAndServe(); err != http.ErrServerClosed {
logger.Fatal(`Server failed to start: %v`, err)
}
}()
}
return server
}

View file

@ -4,7 +4,7 @@
/* /*
Package daemon handles the main application process. Package browser handles website crawling.
*/ */
package daemon // import "miniflux.app/daemon" package browser // import "miniflux.app/reader/browser"

View file

@ -1,44 +0,0 @@
// Copyright 2017 Frédéric Guillot. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package scheduler // import "miniflux.app/scheduler"
import (
"time"
"miniflux.app/logger"
"miniflux.app/storage"
)
// NewFeedScheduler starts a new scheduler that push jobs to a pool of workers.
func NewFeedScheduler(store *storage.Storage, workerPool *WorkerPool, frequency, batchSize int) {
go func() {
c := time.Tick(time.Duration(frequency) * time.Minute)
for range c {
jobs, err := store.NewBatch(batchSize)
if err != nil {
logger.Error("[FeedScheduler] %v", err)
} else {
logger.Debug("[FeedScheduler] Pushing %d jobs", len(jobs))
workerPool.Push(jobs)
}
}
}()
}
// NewCleanupScheduler starts a new scheduler that clean old sessions and archive read items.
func NewCleanupScheduler(store *storage.Storage, frequency int) {
go func() {
c := time.Tick(time.Duration(frequency) * time.Hour)
for range c {
nbSessions := store.CleanOldSessions()
nbUserSessions := store.CleanOldUserSessions()
logger.Info("[CleanupScheduler] Cleaned %d sessions and %d user sessions", nbSessions, nbUserSessions)
if err := store.ArchiveEntries(); err != nil {
logger.Error("[CleanupScheduler] %v", err)
}
}
}()
}

View file

@ -4,7 +4,7 @@
/* /*
Package scheduler implements the application internal scheduler. Package httpd implements the HTTP service.
*/ */
package scheduler // import "miniflux.app/scheduler" package httpd // import "miniflux.app/service/httpd"

130
service/httpd/httpd.go Normal file
View file

@ -0,0 +1,130 @@
// Copyright 2018 Frédéric Guillot. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package httpd // import "miniflux.app/service/httpd"
import (
"crypto/tls"
"net/http"
"time"
"miniflux.app/api"
"miniflux.app/config"
"miniflux.app/fever"
"miniflux.app/logger"
"miniflux.app/middleware"
"miniflux.app/reader/feed"
"miniflux.app/storage"
"miniflux.app/ui"
"miniflux.app/worker"
"github.com/gorilla/mux"
"golang.org/x/crypto/acme/autocert"
)
// Serve starts a new HTTP server.
func Serve(cfg *config.Config, store *storage.Storage, pool *worker.Pool, feedHandler *feed.Handler) *http.Server {
certFile := cfg.CertFile()
keyFile := cfg.KeyFile()
certDomain := cfg.CertDomain()
certCache := cfg.CertCache()
server := &http.Server{
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 60 * time.Second,
Addr: cfg.ListenAddr(),
Handler: setupHandler(cfg, store, feedHandler, pool),
}
if certDomain != "" && certCache != "" {
cfg.IsHTTPS = true
startAutoCertTLSServer(server, certDomain, certCache)
} else if certFile != "" && keyFile != "" {
cfg.IsHTTPS = true
startTLSServer(server, certFile, keyFile)
} else {
startHTTPServer(server)
}
return server
}
func startAutoCertTLSServer(server *http.Server, certDomain, certCache string) {
server.Addr = ":https"
certManager := autocert.Manager{
Cache: autocert.DirCache(certCache),
Prompt: autocert.AcceptTOS,
HostPolicy: autocert.HostWhitelist(certDomain),
}
// Handle http-01 challenge.
s := &http.Server{
Handler: certManager.HTTPHandler(nil),
Addr: ":http",
}
go s.ListenAndServe()
go func() {
logger.Info(`Listening on %q by using auto-configured certificate for %q`, server.Addr, certDomain)
if err := server.Serve(certManager.Listener()); err != http.ErrServerClosed {
logger.Fatal(`Server failed to start: %v`, err)
}
}()
}
func startTLSServer(server *http.Server, certFile, keyFile string) {
// See https://blog.cloudflare.com/exposing-go-on-the-internet/
// And https://wiki.mozilla.org/Security/Server_Side_TLS
server.TLSConfig = &tls.Config{
MinVersion: tls.VersionTLS12,
PreferServerCipherSuites: true,
CurvePreferences: []tls.CurveID{
tls.CurveP256,
tls.X25519,
},
CipherSuites: []uint16{
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
},
}
go func() {
logger.Info(`Listening on %q by using certificate %q and key %q`, server.Addr, certFile, keyFile)
if err := server.ListenAndServeTLS(certFile, keyFile); err != http.ErrServerClosed {
logger.Fatal(`Server failed to start: %v`, err)
}
}()
}
func startHTTPServer(server *http.Server) {
go func() {
logger.Info(`Listening on %q without TLS`, server.Addr)
if err := server.ListenAndServe(); err != http.ErrServerClosed {
logger.Fatal(`Server failed to start: %v`, err)
}
}()
}
func setupHandler(cfg *config.Config, store *storage.Storage, feedHandler *feed.Handler, pool *worker.Pool) *mux.Router {
router := mux.NewRouter()
middleware := middleware.New(cfg, store, router)
if cfg.BasePath() != "" {
router = router.PathPrefix(cfg.BasePath()).Subrouter()
}
router.Use(middleware.ClientIP)
router.Use(middleware.HeaderConfig)
router.Use(middleware.Logging)
fever.Serve(router, cfg, store)
api.Serve(router, store, feedHandler)
ui.Serve(router, cfg, store, pool, feedHandler)
return router
}

10
service/scheduler/doc.go Normal file
View file

@ -0,0 +1,10 @@
// Copyright 2018 Frédéric Guillot. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
/*
Package scheduler implements the scheduler service.
*/
package scheduler // import "miniflux.app/service/scheduler"

View file

@ -0,0 +1,46 @@
// Copyright 2018 Frédéric Guillot. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package scheduler // import "miniflux.app/service/scheduler"
import (
"time"
"miniflux.app/config"
"miniflux.app/logger"
"miniflux.app/storage"
"miniflux.app/worker"
)
// Serve starts the internal scheduler.
func Serve(cfg *config.Config, store *storage.Storage, pool *worker.Pool) {
go feedScheduler(store, pool, cfg.PollingFrequency(), cfg.BatchSize())
go cleanupScheduler(store, cfg.CleanupFrequency())
}
func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSize int) {
c := time.Tick(time.Duration(frequency) * time.Minute)
for range c {
jobs, err := store.NewBatch(batchSize)
if err != nil {
logger.Error("[Scheduler:Feed] %v", err)
} else {
logger.Debug("[Scheduler:Feed] Pushing %d jobs", len(jobs))
pool.Push(jobs)
}
}
}
func cleanupScheduler(store *storage.Storage, frequency int) {
c := time.Tick(time.Duration(frequency) * time.Hour)
for range c {
nbSessions := store.CleanOldSessions()
nbUserSessions := store.CleanOldUserSessions()
logger.Info("[Scheduler:Cleanup] Cleaned %d sessions and %d user sessions", nbSessions, nbUserSessions)
if err := store.ArchiveEntries(); err != nil {
logger.Error("[Scheduler:Cleanup] %v", err)
}
}
}

View file

@ -7,9 +7,9 @@ package ui // import "miniflux.app/ui"
import ( import (
"miniflux.app/config" "miniflux.app/config"
"miniflux.app/reader/feed" "miniflux.app/reader/feed"
"miniflux.app/scheduler"
"miniflux.app/storage" "miniflux.app/storage"
"miniflux.app/template" "miniflux.app/template"
"miniflux.app/worker"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
@ -19,6 +19,6 @@ type handler struct {
cfg *config.Config cfg *config.Config
store *storage.Storage store *storage.Storage
tpl *template.Engine tpl *template.Engine
pool *scheduler.WorkerPool pool *worker.Pool
feedHandler *feed.Handler feedHandler *feed.Handler
} }

View file

@ -9,15 +9,15 @@ import (
"miniflux.app/config" "miniflux.app/config"
"miniflux.app/reader/feed" "miniflux.app/reader/feed"
"miniflux.app/scheduler"
"miniflux.app/storage" "miniflux.app/storage"
"miniflux.app/template" "miniflux.app/template"
"miniflux.app/worker"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
// Serve declares all routes for the user interface. // Serve declares all routes for the user interface.
func Serve(router *mux.Router, cfg *config.Config, store *storage.Storage, pool *scheduler.WorkerPool, feedHandler *feed.Handler) { func Serve(router *mux.Router, cfg *config.Config, store *storage.Storage, pool *worker.Pool, feedHandler *feed.Handler) {
middleware := newMiddleware(router, cfg, store) middleware := newMiddleware(router, cfg, store)
handler := &handler{router, cfg, store, template.NewEngine(cfg, router), pool, feedHandler} handler := &handler{router, cfg, store, template.NewEngine(cfg, router), pool, feedHandler}

10
worker/doc.go Normal file
View file

@ -0,0 +1,10 @@
// Copyright 2018 Frédéric Guillot. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
/*
Package worker implements the background workers.
*/
package worker // import "miniflux.app/worker"

View file

@ -1,29 +1,29 @@
// Copyright 2017 Frédéric Guillot. All rights reserved. // Copyright 2018 Frédéric Guillot. All rights reserved.
// Use of this source code is governed by the Apache 2.0 // Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package scheduler // import "miniflux.app/scheduler" package worker // import "miniflux.app/worker"
import ( import (
"miniflux.app/model" "miniflux.app/model"
"miniflux.app/reader/feed" "miniflux.app/reader/feed"
) )
// WorkerPool handle a pool of workers. // Pool handles a pool of workers.
type WorkerPool struct { type Pool struct {
queue chan model.Job queue chan model.Job
} }
// Push send a list of jobs to the queue. // Push send a list of jobs to the queue.
func (w *WorkerPool) Push(jobs model.JobList) { func (p *Pool) Push(jobs model.JobList) {
for _, job := range jobs { for _, job := range jobs {
w.queue <- job p.queue <- job
} }
} }
// NewWorkerPool creates a pool of background workers. // NewPool creates a pool of background workers.
func NewWorkerPool(feedHandler *feed.Handler, nbWorkers int) *WorkerPool { func NewPool(feedHandler *feed.Handler, nbWorkers int) *Pool {
workerPool := &WorkerPool{ workerPool := &Pool{
queue: make(chan model.Job), queue: make(chan model.Job),
} }

View file

@ -2,11 +2,9 @@
// Use of this source code is governed by the Apache 2.0 // Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package scheduler // import "miniflux.app/scheduler" package worker // import "miniflux.app/worker"
import ( import (
"time"
"miniflux.app/logger" "miniflux.app/logger"
"miniflux.app/model" "miniflux.app/model"
"miniflux.app/reader/feed" "miniflux.app/reader/feed"
@ -30,7 +28,5 @@ func (w *Worker) Run(c chan model.Job) {
if err != nil { if err != nil {
logger.Error("[Worker] %v", err) logger.Error("[Worker] %v", err)
} }
time.Sleep(time.Millisecond * 1000)
} }
} }