Add new config option CLEANUP_ARCHIVE_BATCH_SIZE
This commit is contained in:
parent
c119a2c011
commit
36868e648c
5 changed files with 24 additions and 7 deletions
|
@ -43,6 +43,7 @@ const (
|
||||||
defaultCleanupFrequencyHours = 24
|
defaultCleanupFrequencyHours = 24
|
||||||
defaultCleanupArchiveReadDays = 60
|
defaultCleanupArchiveReadDays = 60
|
||||||
defaultCleanupArchiveUnreadDays = 180
|
defaultCleanupArchiveUnreadDays = 180
|
||||||
|
defaultCleanupArchiveBatchSize = 10000
|
||||||
defaultCleanupRemoveSessionsDays = 30
|
defaultCleanupRemoveSessionsDays = 30
|
||||||
defaultProxyImages = "http-only"
|
defaultProxyImages = "http-only"
|
||||||
defaultFetchYouTubeWatchTime = false
|
defaultFetchYouTubeWatchTime = false
|
||||||
|
@ -101,6 +102,7 @@ type Options struct {
|
||||||
cleanupFrequencyHours int
|
cleanupFrequencyHours int
|
||||||
cleanupArchiveReadDays int
|
cleanupArchiveReadDays int
|
||||||
cleanupArchiveUnreadDays int
|
cleanupArchiveUnreadDays int
|
||||||
|
cleanupArchiveBatchSize int
|
||||||
cleanupRemoveSessionsDays int
|
cleanupRemoveSessionsDays int
|
||||||
pollingFrequency int
|
pollingFrequency int
|
||||||
batchSize int
|
batchSize int
|
||||||
|
@ -160,6 +162,7 @@ func NewOptions() *Options {
|
||||||
cleanupFrequencyHours: defaultCleanupFrequencyHours,
|
cleanupFrequencyHours: defaultCleanupFrequencyHours,
|
||||||
cleanupArchiveReadDays: defaultCleanupArchiveReadDays,
|
cleanupArchiveReadDays: defaultCleanupArchiveReadDays,
|
||||||
cleanupArchiveUnreadDays: defaultCleanupArchiveUnreadDays,
|
cleanupArchiveUnreadDays: defaultCleanupArchiveUnreadDays,
|
||||||
|
cleanupArchiveBatchSize: defaultCleanupArchiveBatchSize,
|
||||||
cleanupRemoveSessionsDays: defaultCleanupRemoveSessionsDays,
|
cleanupRemoveSessionsDays: defaultCleanupRemoveSessionsDays,
|
||||||
pollingFrequency: defaultPollingFrequency,
|
pollingFrequency: defaultPollingFrequency,
|
||||||
batchSize: defaultBatchSize,
|
batchSize: defaultBatchSize,
|
||||||
|
@ -293,6 +296,11 @@ func (o *Options) CleanupArchiveUnreadDays() int {
|
||||||
return o.cleanupArchiveUnreadDays
|
return o.cleanupArchiveUnreadDays
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CleanupArchiveBatchSize returns the number of entries to archive for each interval.
|
||||||
|
func (o *Options) CleanupArchiveBatchSize() int {
|
||||||
|
return o.cleanupArchiveBatchSize
|
||||||
|
}
|
||||||
|
|
||||||
// CleanupRemoveSessionsDays returns the number of days after which to remove sessions.
|
// CleanupRemoveSessionsDays returns the number of days after which to remove sessions.
|
||||||
func (o *Options) CleanupRemoveSessionsDays() int {
|
func (o *Options) CleanupRemoveSessionsDays() int {
|
||||||
return o.cleanupRemoveSessionsDays
|
return o.cleanupRemoveSessionsDays
|
||||||
|
@ -488,6 +496,7 @@ func (o *Options) SortedOptions() []*Option {
|
||||||
"CERT_FILE": o.certFile,
|
"CERT_FILE": o.certFile,
|
||||||
"CLEANUP_ARCHIVE_READ_DAYS": o.cleanupArchiveReadDays,
|
"CLEANUP_ARCHIVE_READ_DAYS": o.cleanupArchiveReadDays,
|
||||||
"CLEANUP_ARCHIVE_UNREAD_DAYS": o.cleanupArchiveUnreadDays,
|
"CLEANUP_ARCHIVE_UNREAD_DAYS": o.cleanupArchiveUnreadDays,
|
||||||
|
"CLEANUP_ARCHIVE_BATCH_SIZE": o.cleanupArchiveBatchSize,
|
||||||
"CLEANUP_FREQUENCY_HOURS": o.cleanupFrequencyHours,
|
"CLEANUP_FREQUENCY_HOURS": o.cleanupFrequencyHours,
|
||||||
"CLEANUP_REMOVE_SESSIONS_DAYS": o.cleanupRemoveSessionsDays,
|
"CLEANUP_REMOVE_SESSIONS_DAYS": o.cleanupRemoveSessionsDays,
|
||||||
"CREATE_ADMIN": o.createAdmin,
|
"CREATE_ADMIN": o.createAdmin,
|
||||||
|
|
|
@ -119,6 +119,8 @@ func (p *Parser) parseLines(lines []string) (err error) {
|
||||||
p.opts.cleanupArchiveReadDays = parseInt(value, defaultCleanupArchiveReadDays)
|
p.opts.cleanupArchiveReadDays = parseInt(value, defaultCleanupArchiveReadDays)
|
||||||
case "CLEANUP_ARCHIVE_UNREAD_DAYS":
|
case "CLEANUP_ARCHIVE_UNREAD_DAYS":
|
||||||
p.opts.cleanupArchiveUnreadDays = parseInt(value, defaultCleanupArchiveUnreadDays)
|
p.opts.cleanupArchiveUnreadDays = parseInt(value, defaultCleanupArchiveUnreadDays)
|
||||||
|
case "CLEANUP_ARCHIVE_BATCH_SIZE":
|
||||||
|
p.opts.cleanupArchiveBatchSize = parseInt(value, defaultCleanupArchiveBatchSize)
|
||||||
case "CLEANUP_REMOVE_SESSIONS_DAYS":
|
case "CLEANUP_REMOVE_SESSIONS_DAYS":
|
||||||
p.opts.cleanupRemoveSessionsDays = parseInt(value, defaultCleanupRemoveSessionsDays)
|
p.opts.cleanupRemoveSessionsDays = parseInt(value, defaultCleanupRemoveSessionsDays)
|
||||||
case "WORKER_POOL_SIZE":
|
case "WORKER_POOL_SIZE":
|
||||||
|
|
|
@ -223,6 +223,11 @@ Set to -1 to keep all unread entries.
|
||||||
.br
|
.br
|
||||||
Default is 180 days\&.
|
Default is 180 days\&.
|
||||||
.TP
|
.TP
|
||||||
|
.B CLEANUP_ARCHIVE_BATCH_SIZE
|
||||||
|
Number of entries to archive for each job interval\&.
|
||||||
|
.br
|
||||||
|
Default is 10000 entries\&.
|
||||||
|
.TP
|
||||||
.B CLEANUP_REMOVE_SESSIONS_DAYS
|
.B CLEANUP_REMOVE_SESSIONS_DAYS
|
||||||
Number of days after removing old sessions from the database\&.
|
Number of days after removing old sessions from the database\&.
|
||||||
.br
|
.br
|
||||||
|
|
|
@ -31,6 +31,7 @@ func Serve(store *storage.Storage, pool *worker.Pool) {
|
||||||
config.Opts.CleanupFrequencyHours(),
|
config.Opts.CleanupFrequencyHours(),
|
||||||
config.Opts.CleanupArchiveReadDays(),
|
config.Opts.CleanupArchiveReadDays(),
|
||||||
config.Opts.CleanupArchiveUnreadDays(),
|
config.Opts.CleanupArchiveUnreadDays(),
|
||||||
|
config.Opts.CleanupArchiveBatchSize(),
|
||||||
config.Opts.CleanupRemoveSessionsDays(),
|
config.Opts.CleanupRemoveSessionsDays(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -47,14 +48,14 @@ func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func cleanupScheduler(store *storage.Storage, frequency, archiveReadDays, archiveUnreadDays, sessionsDays int) {
|
func cleanupScheduler(store *storage.Storage, frequency, archiveReadDays, archiveUnreadDays, archiveBatchSize, sessionsDays int) {
|
||||||
for range time.Tick(time.Duration(frequency) * time.Hour) {
|
for range time.Tick(time.Duration(frequency) * time.Hour) {
|
||||||
nbSessions := store.CleanOldSessions(sessionsDays)
|
nbSessions := store.CleanOldSessions(sessionsDays)
|
||||||
nbUserSessions := store.CleanOldUserSessions(sessionsDays)
|
nbUserSessions := store.CleanOldUserSessions(sessionsDays)
|
||||||
logger.Info("[Scheduler:Cleanup] Cleaned %d sessions and %d user sessions", nbSessions, nbUserSessions)
|
logger.Info("[Scheduler:Cleanup] Cleaned %d sessions and %d user sessions", nbSessions, nbUserSessions)
|
||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
if rowsAffected, err := store.ArchiveEntries(model.EntryStatusRead, archiveReadDays); err != nil {
|
if rowsAffected, err := store.ArchiveEntries(model.EntryStatusRead, archiveReadDays, archiveBatchSize); err != nil {
|
||||||
logger.Error("[Scheduler:ArchiveReadEntries] %v", err)
|
logger.Error("[Scheduler:ArchiveReadEntries] %v", err)
|
||||||
} else {
|
} else {
|
||||||
logger.Info("[Scheduler:ArchiveReadEntries] %d entries changed", rowsAffected)
|
logger.Info("[Scheduler:ArchiveReadEntries] %d entries changed", rowsAffected)
|
||||||
|
@ -65,7 +66,7 @@ func cleanupScheduler(store *storage.Storage, frequency, archiveReadDays, archiv
|
||||||
}
|
}
|
||||||
|
|
||||||
startTime = time.Now()
|
startTime = time.Now()
|
||||||
if rowsAffected, err := store.ArchiveEntries(model.EntryStatusUnread, archiveUnreadDays); err != nil {
|
if rowsAffected, err := store.ArchiveEntries(model.EntryStatusUnread, archiveUnreadDays, archiveBatchSize); err != nil {
|
||||||
logger.Error("[Scheduler:ArchiveUnreadEntries] %v", err)
|
logger.Error("[Scheduler:ArchiveUnreadEntries] %v", err)
|
||||||
} else {
|
} else {
|
||||||
logger.Info("[Scheduler:ArchiveUnreadEntries] %d entries changed", rowsAffected)
|
logger.Info("[Scheduler:ArchiveUnreadEntries] %d entries changed", rowsAffected)
|
||||||
|
|
|
@ -299,8 +299,8 @@ func (s *Storage) RefreshFeedEntries(userID, feedID int64, entries model.Entries
|
||||||
}
|
}
|
||||||
|
|
||||||
// ArchiveEntries changes the status of entries to "removed" after the given number of days.
|
// ArchiveEntries changes the status of entries to "removed" after the given number of days.
|
||||||
func (s *Storage) ArchiveEntries(status string, days int) (int64, error) {
|
func (s *Storage) ArchiveEntries(status string, days, limit int) (int64, error) {
|
||||||
if days < 0 {
|
if days < 0 || limit <= 0 {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -310,10 +310,10 @@ func (s *Storage) ArchiveEntries(status string, days int) (int64, error) {
|
||||||
SET
|
SET
|
||||||
status='removed'
|
status='removed'
|
||||||
WHERE
|
WHERE
|
||||||
id=ANY(SELECT id FROM entries WHERE status=$1 AND starred is false AND share_code='' AND created_at < now () - '%d days'::interval ORDER BY created_at ASC LIMIT 5000)
|
id=ANY(SELECT id FROM entries WHERE status=$1 AND starred is false AND share_code='' AND created_at < now () - '%d days'::interval ORDER BY created_at ASC LIMIT %d)
|
||||||
`
|
`
|
||||||
|
|
||||||
result, err := s.db.Exec(fmt.Sprintf(query, days), status)
|
result, err := s.db.Exec(fmt.Sprintf(query, days, limit), status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf(`store: unable to archive %s entries: %v`, status, err)
|
return 0, fmt.Errorf(`store: unable to archive %s entries: %v`, status, err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue