miniflux/reader/processor/processor.go
Davide Masserut 034e46700c Process older entries first
Feed entries are usually ordered from most to least recent.

Processing older entries first ensures that their creation timestamp
is lower than that of newer entries.

This is useful when we order by creation, because then we get a
consistent timeline.
2023-03-25 16:19:07 -07:00

301 lines
8.4 KiB
Go

// Copyright 2018 Frédéric Guillot. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package processor
import (
"errors"
"fmt"
"math"
"regexp"
"strconv"
"strings"
"time"
"unicode/utf8"
"miniflux.app/integration"
"miniflux.app/config"
"miniflux.app/http/client"
"miniflux.app/logger"
"miniflux.app/metric"
"miniflux.app/model"
"miniflux.app/reader/browser"
"miniflux.app/reader/rewrite"
"miniflux.app/reader/sanitizer"
"miniflux.app/reader/scraper"
"miniflux.app/storage"
"github.com/PuerkitoBio/goquery"
"github.com/rylans/getlang"
)
var (
youtubeRegex = regexp.MustCompile(`youtube\.com/watch\?v=(.*)`)
iso8601Regex = regexp.MustCompile(`^P((?P<year>\d+)Y)?((?P<month>\d+)M)?((?P<week>\d+)W)?((?P<day>\d+)D)?(T((?P<hour>\d+)H)?((?P<minute>\d+)M)?((?P<second>\d+)S)?)?$`)
customReplaceRuleRegex = regexp.MustCompile(`rewrite\("(.*)"\|"(.*)"\)`)
)
// ProcessFeedEntries downloads original web page for entries and apply filters.
func ProcessFeedEntries(store *storage.Storage, feed *model.Feed, user *model.User) {
var filteredEntries model.Entries
// array used for bulk push
entriesToPush := model.Entries{}
// Process older entries first
for i := len(feed.Entries) - 1; i >= 0; i-- {
entry := feed.Entries[i]
logger.Debug("[Processor] Processing entry %q from feed %q", entry.URL, feed.FeedURL)
if isBlockedEntry(feed, entry) || !isAllowedEntry(feed, entry) {
continue
}
url := getUrlFromEntry(feed, entry)
entryIsNew := !store.EntryURLExists(feed.ID, entry.URL)
if feed.Crawler && entryIsNew {
logger.Debug("[Processor] Crawling entry %q from feed %q", url, feed.FeedURL)
startTime := time.Now()
content, scraperErr := scraper.Fetch(
url,
feed.ScraperRules,
feed.UserAgent,
feed.Cookie,
feed.AllowSelfSignedCertificates,
feed.FetchViaProxy,
)
if config.Opts.HasMetricsCollector() {
status := "success"
if scraperErr != nil {
status = "error"
}
metric.ScraperRequestDuration.WithLabelValues(status).Observe(time.Since(startTime).Seconds())
}
if scraperErr != nil {
logger.Error(`[Processor] Unable to crawl this entry: %q => %v`, entry.URL, scraperErr)
} else if content != "" {
// We replace the entry content only if the scraper doesn't return any error.
entry.Content = content
}
}
entry.Content = rewrite.Rewriter(url, entry.Content, feed.RewriteRules)
// The sanitizer should always run at the end of the process to make sure unsafe HTML is filtered.
entry.Content = sanitizer.Sanitize(url, entry.Content)
if entryIsNew {
intg, err := store.Integration(feed.UserID)
if err != nil {
logger.Error("[Processor] Get integrations for user %d failed: %v; the refresh process will go on, but no integrations will run this time.", feed.UserID, err)
} else if intg != nil {
localEntry := entry
go func() {
integration.PushEntry(localEntry, intg)
}()
entriesToPush = append(entriesToPush, localEntry)
}
}
updateEntryReadingTime(store, feed, entry, entryIsNew, user)
filteredEntries = append(filteredEntries, entry)
}
intg, err := store.Integration(feed.UserID)
if err != nil {
logger.Error("[Processor] Get integrations for user %d failed: %v; the refresh process will go on, but no integrations will run this time.", feed.UserID, err)
} else if intg != nil && len(entriesToPush) > 0 {
go func() {
integration.PushEntries(entriesToPush, intg)
}()
}
feed.Entries = filteredEntries
}
func isBlockedEntry(feed *model.Feed, entry *model.Entry) bool {
if feed.BlocklistRules != "" {
match, _ := regexp.MatchString(feed.BlocklistRules, entry.Title)
if match {
logger.Debug("[Processor] Blocking entry %q from feed %q based on rule %q", entry.Title, feed.FeedURL, feed.BlocklistRules)
return true
}
}
return false
}
func isAllowedEntry(feed *model.Feed, entry *model.Entry) bool {
if feed.KeeplistRules != "" {
match, _ := regexp.MatchString(feed.KeeplistRules, entry.Title)
if match {
logger.Debug("[Processor] Allow entry %q from feed %q based on rule %q", entry.Title, feed.FeedURL, feed.KeeplistRules)
return true
}
return false
}
return true
}
// ProcessEntryWebPage downloads the entry web page and apply rewrite rules.
func ProcessEntryWebPage(feed *model.Feed, entry *model.Entry, user *model.User) error {
startTime := time.Now()
url := getUrlFromEntry(feed, entry)
content, scraperErr := scraper.Fetch(
url,
entry.Feed.ScraperRules,
entry.Feed.UserAgent,
entry.Feed.Cookie,
feed.AllowSelfSignedCertificates,
feed.FetchViaProxy,
)
if config.Opts.HasMetricsCollector() {
status := "success"
if scraperErr != nil {
status = "error"
}
metric.ScraperRequestDuration.WithLabelValues(status).Observe(time.Since(startTime).Seconds())
}
if scraperErr != nil {
return scraperErr
}
content = rewrite.Rewriter(url, content, entry.Feed.RewriteRules)
content = sanitizer.Sanitize(url, content)
if content != "" {
entry.Content = content
entry.ReadingTime = calculateReadingTime(content, user)
}
return nil
}
func getUrlFromEntry(feed *model.Feed, entry *model.Entry) string {
var url = entry.URL
if feed.UrlRewriteRules != "" {
parts := customReplaceRuleRegex.FindStringSubmatch(feed.UrlRewriteRules)
if len(parts) >= 3 {
re := regexp.MustCompile(parts[1])
url = re.ReplaceAllString(entry.URL, parts[2])
logger.Debug(`[Processor] Rewriting entry URL %s to %s`, entry.URL, url)
} else {
logger.Debug("[Processor] Cannot find search and replace terms for replace rule %s", feed.UrlRewriteRules)
}
}
return url
}
func updateEntryReadingTime(store *storage.Storage, feed *model.Feed, entry *model.Entry, entryIsNew bool, user *model.User) {
if shouldFetchYouTubeWatchTime(entry) {
if entryIsNew {
watchTime, err := fetchYouTubeWatchTime(entry.URL)
if err != nil {
logger.Error("[Processor] Unable to fetch YouTube watch time: %q => %v", entry.URL, err)
}
entry.ReadingTime = watchTime
} else {
entry.ReadingTime = store.GetReadTime(entry, feed)
}
}
// Handle YT error case and non-YT entries.
if entry.ReadingTime == 0 {
entry.ReadingTime = calculateReadingTime(entry.Content, user)
}
}
func shouldFetchYouTubeWatchTime(entry *model.Entry) bool {
if !config.Opts.FetchYouTubeWatchTime() {
return false
}
matches := youtubeRegex.FindStringSubmatch(entry.URL)
urlMatchesYouTubePattern := len(matches) == 2
return urlMatchesYouTubePattern
}
func fetchYouTubeWatchTime(url string) (int, error) {
clt := client.NewClientWithConfig(url, config.Opts)
response, browserErr := browser.Exec(clt)
if browserErr != nil {
return 0, browserErr
}
doc, docErr := goquery.NewDocumentFromReader(response.Body)
if docErr != nil {
return 0, docErr
}
durs, exists := doc.Find(`meta[itemprop="duration"]`).First().Attr("content")
if !exists {
return 0, errors.New("duration has not found")
}
dur, err := parseISO8601(durs)
if err != nil {
return 0, fmt.Errorf("unable to parse duration %s: %v", durs, err)
}
return int(dur.Minutes()), nil
}
// parseISO8601 parses an ISO 8601 duration string.
func parseISO8601(from string) (time.Duration, error) {
var match []string
var d time.Duration
if iso8601Regex.MatchString(from) {
match = iso8601Regex.FindStringSubmatch(from)
} else {
return 0, errors.New("could not parse duration string")
}
for i, name := range iso8601Regex.SubexpNames() {
part := match[i]
if i == 0 || name == "" || part == "" {
continue
}
val, err := strconv.ParseInt(part, 10, 64)
if err != nil {
return 0, err
}
switch name {
case "hour":
d = d + (time.Duration(val) * time.Hour)
case "minute":
d = d + (time.Duration(val) * time.Minute)
case "second":
d = d + (time.Duration(val) * time.Second)
default:
return 0, fmt.Errorf("unknown field %s", name)
}
}
return d, nil
}
func calculateReadingTime(content string, user *model.User) int {
sanitizedContent := sanitizer.StripTags(content)
languageInfo := getlang.FromString(sanitizedContent)
var timeToReadInt int
if languageInfo.LanguageCode() == "ko" || languageInfo.LanguageCode() == "zh" || languageInfo.LanguageCode() == "jp" {
timeToReadInt = int(math.Ceil(float64(utf8.RuneCountInString(sanitizedContent)) / float64(user.CJKReadingSpeed)))
} else {
nbOfWords := len(strings.Fields(sanitizedContent))
timeToReadInt = int(math.Ceil(float64(nbOfWords) / float64(user.DefaultReadingSpeed)))
}
return timeToReadInt
}