summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorOmar Roth <omarroth@protonmail.com>2019-06-01 10:19:18 -0500
committerOmar Roth <omarroth@protonmail.com>2019-06-01 10:19:18 -0500
commit18d66dddedb4c6c14975a0ab47377258fea59e2e (patch)
tree85a9e7928ae8c08e1daa9bbf432132f3a4b6ed73 /src
parent701b5ea56134f2c8d8e8f54036b9f6467095b47b (diff)
downloadinvidious-18d66dddedb4c6c14975a0ab47377258fea59e2e.tar.gz
invidious-18d66dddedb4c6c14975a0ab47377258fea59e2e.tar.bz2
invidious-18d66dddedb4c6c14975a0ab47377258fea59e2e.zip
Add 'needs_update' column for scheduling feed refresh
Diffstat (limited to 'src')
-rw-r--r--src/invidious.cr60
-rw-r--r--src/invidious/channels.cr29
-rw-r--r--src/invidious/helpers/helpers.cr1
-rw-r--r--src/invidious/helpers/jobs.cr66
-rw-r--r--src/invidious/users.cr11
5 files changed, 42 insertions, 125 deletions
diff --git a/src/invidious.cr b/src/invidious.cr
index de134947..822f7b85 100644
--- a/src/invidious.cr
+++ b/src/invidious.cr
@@ -1710,18 +1710,12 @@ post "/subscription_ajax" do |env|
when .starts_with? "action_create"
if !user.subscriptions.includes? channel_id
get_channel(channel_id, PG_DB, false, false)
- PG_DB.exec("UPDATE users SET subscriptions = array_append(subscriptions, $1) WHERE email = $2", channel_id, email)
+ PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = array_append(subscriptions, $1) WHERE email = $2", channel_id, email)
end
when .starts_with? "action_remove"
- PG_DB.exec("UPDATE users SET subscriptions = array_remove(subscriptions, $1) WHERE email = $2", channel_id, email)
+ PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = array_remove(subscriptions, $1) WHERE email = $2", channel_id, email)
end
- payload = {
- "email" => user.email,
- "action" => "refresh",
- }.to_json
- PG_DB.exec("NOTIFY feeds, E'#{payload}'")
-
if redirect
env.redirect referer
else
@@ -1884,7 +1878,7 @@ post "/data_control" do |env|
user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false)
- PG_DB.exec("UPDATE users SET subscriptions = $1 WHERE email = $2", user.subscriptions, user.email)
+ PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email)
end
if body["watch_history"]?
@@ -1906,7 +1900,7 @@ post "/data_control" do |env|
user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false)
- PG_DB.exec("UPDATE users SET subscriptions = $1 WHERE email = $2", user.subscriptions, user.email)
+ PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email)
when "import_freetube"
user.subscriptions += body.scan(/"channelId":"(?<channel_id>[a-zA-Z0-9_-]{24})"/).map do |md|
md["channel_id"]
@@ -1915,7 +1909,7 @@ post "/data_control" do |env|
user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false)
- PG_DB.exec("UPDATE users SET subscriptions = $1 WHERE email = $2", user.subscriptions, user.email)
+ PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email)
when "import_newpipe_subscriptions"
body = JSON.parse(body)
user.subscriptions += body["subscriptions"].as_a.compact_map do |channel|
@@ -1939,7 +1933,7 @@ post "/data_control" do |env|
user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false)
- PG_DB.exec("UPDATE users SET subscriptions = $1 WHERE email = $2", user.subscriptions, user.email)
+ PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email)
when "import_newpipe"
Zip::Reader.open(IO::Memory.new(body)) do |file|
file.each_entry do |entry|
@@ -1958,7 +1952,7 @@ post "/data_control" do |env|
user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false)
- PG_DB.exec("UPDATE users SET subscriptions = $1 WHERE email = $2", user.subscriptions, user.email)
+ PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email)
db.close
tempfile.delete
@@ -1967,12 +1961,6 @@ post "/data_control" do |env|
end
end
end
-
- payload = {
- "email" => user.email,
- "action" => "refresh",
- }.to_json
- PG_DB.exec("NOTIFY feeds, E'#{payload}'")
end
env.redirect referer
@@ -2874,7 +2862,7 @@ post "/feed/webhook/:token" do |env|
views: video.views,
)
- users = PG_DB.query_all("UPDATE users SET notifications = notifications || $1 \
+ emails = PG_DB.query_all("UPDATE users SET notifications = notifications || $1 \
WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email",
video.id, video.published, video.ucid, as: String)
@@ -2886,13 +2874,14 @@ post "/feed/webhook/:token" do |env|
updated = $4, ucid = $5, author = $6, length_seconds = $7, \
live_now = $8, premiere_timestamp = $9, views = $10", video_array)
- users.each do |user|
- payload = {
- "email" => user,
- "action" => "refresh",
- }.to_json
- PG_DB.exec("NOTIFY feeds, E'#{payload}'")
+ # Update all users affected by insert
+ if emails.empty?
+ values = "'{}'"
+ else
+ values = "VALUES #{emails.map { |id| %(('#{id}')) }.join(",")}"
end
+
+ PG_DB.exec("UPDATE users SET feed_needs_update = true WHERE email = ANY($1)", emails)
end
end
@@ -4490,7 +4479,6 @@ post "/api/v1/auth/preferences" do |env|
PG_DB.exec("UPDATE users SET preferences = $1 WHERE email = $2", preferences.to_json, user.email)
env.response.status_code = 204
- ""
end
get "/api/v1/auth/subscriptions" do |env|
@@ -4525,13 +4513,7 @@ post "/api/v1/auth/subscriptions/:ucid" do |env|
if !user.subscriptions.includes? ucid
get_channel(ucid, PG_DB, false, false)
- PG_DB.exec("UPDATE users SET subscriptions = array_append(subscriptions,$1) WHERE email = $2", ucid, user.email)
-
- payload = {
- "email" => user.email,
- "action" => "refresh",
- }.to_json
- PG_DB.exec("NOTIFY feeds, E'#{payload}'")
+ PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = array_append(subscriptions,$1) WHERE email = $2", ucid, user.email)
end
# For Google accounts, access tokens don't have enough information to
@@ -4539,7 +4521,6 @@ post "/api/v1/auth/subscriptions/:ucid" do |env|
# YouTube.
env.response.status_code = 204
- ""
end
delete "/api/v1/auth/subscriptions/:ucid" do |env|
@@ -4548,15 +4529,9 @@ delete "/api/v1/auth/subscriptions/:ucid" do |env|
ucid = env.params.url["ucid"]
- PG_DB.exec("UPDATE users SET subscriptions = array_remove(subscriptions, $1) WHERE email = $2", ucid, user.email)
- payload = {
- "email" => user.email,
- "action" => "refresh",
- }.to_json
- PG_DB.exec("NOTIFY feeds, E'#{payload}'")
+ PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = array_remove(subscriptions, $1) WHERE email = $2", ucid, user.email)
env.response.status_code = 204
- ""
end
get "/api/v1/auth/tokens" do |env|
@@ -4663,7 +4638,6 @@ post "/api/v1/auth/tokens/unregister" do |env|
end
env.response.status_code = 204
- ""
end
get "/api/manifest/dash/id/videoplayback" do |env|
diff --git a/src/invidious/channels.cr b/src/invidious/channels.cr
index d0eb7dd3..d33cd9c3 100644
--- a/src/invidious/channels.cr
+++ b/src/invidious/channels.cr
@@ -184,7 +184,7 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil)
views: views,
)
- users = db.query_all("UPDATE users SET notifications = notifications || $1 \
+ emails = db.query_all("UPDATE users SET notifications = notifications || $1 \
WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email",
video.id, video.published, ucid, as: String)
@@ -198,13 +198,14 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil)
updated = $4, ucid = $5, author = $6, length_seconds = $7, \
live_now = $8, views = $10", video_array)
- users.each do |user|
- payload = {
- "email" => user,
- "action" => "refresh",
- }.to_json
- PG_DB.exec("NOTIFY feeds, E'#{payload}'")
+ # Update all users affected by insert
+ if emails.empty?
+ values = "'{}'"
+ else
+ values = "VALUES #{emails.map { |id| %(('#{id}')) }.join(",")}"
end
+
+ db.exec("UPDATE users SET feed_needs_update = true WHERE email = ANY($1)", emails)
end
if pull_all_videos
@@ -252,7 +253,7 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil)
# We are notified of Red videos elsewhere (PubSub), which includes a correct published date,
# so since they don't provide a published date here we can safely ignore them.
if Time.now - video.published > 1.minute
- users = db.query_all("UPDATE users SET notifications = notifications || $1 \
+ emails = db.query_all("UPDATE users SET notifications = notifications || $1 \
WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email",
video.id, video.published, video.ucid, as: String)
@@ -266,13 +267,13 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil)
live_now = $8, views = $10", video_array)
# Update all users affected by insert
- users.each do |user|
- payload = {
- "email" => user,
- "action" => "refresh",
- }.to_json
- PG_DB.exec("NOTIFY feeds, E'#{payload}'")
+ if emails.empty?
+ values = "'{}'"
+ else
+ values = "VALUES #{emails.map { |id| %(('#{id}')) }.join(",")}"
end
+
+ db.exec("UPDATE users SET feed_needs_update = true WHERE email = ANY($1)", emails)
end
end
diff --git a/src/invidious/helpers/helpers.cr b/src/invidious/helpers/helpers.cr
index 3155cb67..476038c7 100644
--- a/src/invidious/helpers/helpers.cr
+++ b/src/invidious/helpers/helpers.cr
@@ -105,7 +105,6 @@ struct Config
hmac_key: String?, # HMAC signing key for CSRF tokens and verifying pubsub subscriptions
domain: String?, # Domain to be used for links to resources on the site where an absolute URL is required
use_pubsub_feeds: {type: Bool | Int32, default: false}, # Subscribe to channels using PubSubHubbub (requires domain, hmac_key)
- use_feed_events: {type: Bool | Int32, default: false}, # Update feeds on receiving notifications
default_home: {type: String, default: "Top"},
feed_menu: {type: Array(String), default: ["Popular", "Top", "Trending", "Subscriptions"]},
top_enabled: {type: Bool, default: true},
diff --git a/src/invidious/helpers/jobs.cr b/src/invidious/helpers/jobs.cr
index b9f9a86f..1dd81cf5 100644
--- a/src/invidious/helpers/jobs.cr
+++ b/src/invidious/helpers/jobs.cr
@@ -43,66 +43,6 @@ def refresh_channels(db, logger, config)
end
def refresh_feeds(db, logger, config)
- # Spawn thread to handle feed events
- if config.use_feed_events
- case config.use_feed_events
- when Bool
- max_feed_event_threads = config.use_feed_events.as(Bool).to_unsafe
- when Int32
- max_feed_event_threads = config.use_feed_events.as(Int32)
- end
- max_feed_event_channel = Channel(Int32).new
-
- spawn do
- queue = Deque(String).new(30)
- PG.connect_listen(PG_URL, "feeds") do |event|
- if !queue.includes? event.payload
- queue << event.payload
- end
- end
-
- max_threads = max_feed_event_channel.receive
- active_threads = 0
- active_channel = Channel(Bool).new
-
- loop do
- until queue.empty?
- event = queue.shift
-
- if active_threads >= max_threads
- if active_channel.receive
- active_threads -= 1
- end
- end
-
- active_threads += 1
-
- spawn do
- begin
- feed = JSON.parse(event)
- email = feed["email"].as_s
- action = feed["action"].as_s
-
- view_name = "subscriptions_#{sha256(email)}"
-
- case action
- when "refresh"
- db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
- end
- rescue ex
- end
-
- active_channel.send(true)
- end
- end
-
- sleep 5.seconds
- end
- end
-
- max_feed_event_channel.send(max_feed_event_threads.as(Int32))
- end
-
max_channel = Channel(Int32).new
spawn do
max_threads = max_channel.receive
@@ -110,7 +50,7 @@ def refresh_feeds(db, logger, config)
active_channel = Channel(Bool).new
loop do
- db.query("SELECT email FROM users") do |rs|
+ db.query("SELECT email FROM users WHERE feed_needs_update = true OR feed_needs_update IS NULL") do |rs|
rs.each do
email = rs.read(String)
view_name = "subscriptions_#{sha256(email)}"
@@ -135,6 +75,7 @@ def refresh_feeds(db, logger, config)
end
db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
+ db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
rescue ex
# Rename old views
begin
@@ -152,6 +93,7 @@ def refresh_feeds(db, logger, config)
SELECT * FROM channel_videos WHERE \
ucid = ANY ((SELECT subscriptions FROM users WHERE email = E'#{email.gsub("'", "\\'")}')::text[]) \
ORDER BY published DESC;")
+ db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
end
rescue ex
logger.write("REFRESH #{email} : #{ex.message}\n")
@@ -164,7 +106,7 @@ def refresh_feeds(db, logger, config)
end
end
- sleep 1.minute
+ sleep 5.seconds
end
end
diff --git a/src/invidious/users.cr b/src/invidious/users.cr
index 298d6b0d..ceaac9f1 100644
--- a/src/invidious/users.cr
+++ b/src/invidious/users.cr
@@ -20,9 +20,10 @@ struct User
type: Preferences,
converter: PreferencesConverter,
},
- password: String?,
- token: String,
- watched: Array(String),
+ password: String?,
+ token: String,
+ watched: Array(String),
+ feed_needs_update: Bool?,
})
end
@@ -205,7 +206,7 @@ def fetch_user(sid, headers, db)
token = Base64.urlsafe_encode(Random::Secure.random_bytes(32))
- user = User.new(Time.now, [] of String, channels, email, CONFIG.default_user_preferences, nil, token, [] of String)
+ user = User.new(Time.now, [] of String, channels, email, CONFIG.default_user_preferences, nil, token, [] of String, true)
return user, sid
end
@@ -213,7 +214,7 @@ def create_user(sid, email, password)
password = Crypto::Bcrypt::Password.create(password, cost: 10)
token = Base64.urlsafe_encode(Random::Secure.random_bytes(32))
- user = User.new(Time.now, [] of String, [] of String, email, CONFIG.default_user_preferences, password.to_s, token, [] of String)
+ user = User.new(Time.now, [] of String, [] of String, email, CONFIG.default_user_preferences, password.to_s, token, [] of String, true)
return user, sid
end