summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorsyeopite <syeopite@syeopite.dev>2025-02-26 13:55:25 -0800
committersyeopite <syeopite@syeopite.dev>2025-02-26 13:55:25 -0800
commit4ea4878d1a17bc90223b9fb8449858ba3a9a1125 (patch)
tree0252dfa19b265f0e21b73d195cfdf9d9048fe559 /src
parent1f0a89fb5ff89b3aeab686bdff656809beb8ed4c (diff)
parent3850739d7f4cd8be4f053fb1cb6775066f225939 (diff)
downloadinvidious-4ea4878d1a17bc90223b9fb8449858ba3a9a1125.tar.gz
invidious-4ea4878d1a17bc90223b9fb8449858ba3a9a1125.tar.bz2
invidious-4ea4878d1a17bc90223b9fb8449858ba3a9a1125.zip
User: Batch notifications together
Diffstat (limited to 'src')
-rw-r--r--src/invidious.cr5
-rw-r--r--src/invidious/channels/channels.cr12
-rw-r--r--src/invidious/database/users.cr10
-rw-r--r--src/invidious/jobs/notification_job.cr90
-rw-r--r--src/invidious/routes/feeds.cr16
5 files changed, 100 insertions, 33 deletions
diff --git a/src/invidious.cr b/src/invidious.cr
index b422dcbb..566d4dc9 100644
--- a/src/invidious.cr
+++ b/src/invidious.cr
@@ -192,8 +192,9 @@ if CONFIG.popular_enabled
Invidious::Jobs.register Invidious::Jobs::PullPopularVideosJob.new(PG_DB)
end
-CONNECTION_CHANNEL = ::Channel({Bool, ::Channel(PQ::Notification)}).new(32)
-Invidious::Jobs.register Invidious::Jobs::NotificationJob.new(CONNECTION_CHANNEL, CONFIG.database_url)
+NOTIFICATION_CHANNEL = ::Channel(VideoNotification).new(32)
+CONNECTION_CHANNEL = ::Channel({Bool, ::Channel(PQ::Notification)}).new(32)
+Invidious::Jobs.register Invidious::Jobs::NotificationJob.new(NOTIFICATION_CHANNEL, CONNECTION_CHANNEL, CONFIG.database_url)
Invidious::Jobs.register Invidious::Jobs::ClearExpiredItemsJob.new
diff --git a/src/invidious/channels/channels.cr b/src/invidious/channels/channels.cr
index 1478c8fc..65982325 100644
--- a/src/invidious/channels/channels.cr
+++ b/src/invidious/channels/channels.cr
@@ -249,11 +249,7 @@ def fetch_channel(ucid, pull_all_videos : Bool)
if was_insert
LOGGER.trace("fetch_channel: #{ucid} : video #{video_id} : Inserted, updating subscriptions")
- if CONFIG.enable_user_notifications
- Invidious::Database::Users.add_notification(video)
- else
- Invidious::Database::Users.feed_needs_update(video)
- end
+ NOTIFICATION_CHANNEL.send(VideoNotification.from_video(video))
else
LOGGER.trace("fetch_channel: #{ucid} : video #{video_id} : Updated")
end
@@ -285,11 +281,7 @@ def fetch_channel(ucid, pull_all_videos : Bool)
if Time.utc - video.published > 1.minute
was_insert = Invidious::Database::ChannelVideos.insert(video)
if was_insert
- if CONFIG.enable_user_notifications
- Invidious::Database::Users.add_notification(video)
- else
- Invidious::Database::Users.feed_needs_update(video)
- end
+ NOTIFICATION_CHANNEL.send(VideoNotification.from_video(video))
end
end
end
diff --git a/src/invidious/database/users.cr b/src/invidious/database/users.cr
index d54e6a76..4a3056ea 100644
--- a/src/invidious/database/users.cr
+++ b/src/invidious/database/users.cr
@@ -119,15 +119,15 @@ module Invidious::Database::Users
# Update (notifs)
# -------------------
- def add_notification(video : ChannelVideo)
+ def add_multiple_notifications(channel_id : String, video_ids : Array(String))
request = <<-SQL
UPDATE users
- SET notifications = array_append(notifications, $1),
+ SET notifications = array_cat(notifications, $1),
feed_needs_update = true
WHERE $2 = ANY(subscriptions)
SQL
- PG_DB.exec(request, video.id, video.ucid)
+ PG_DB.exec(request, video_ids, channel_id)
end
def remove_notification(user : User, vid : String)
@@ -154,14 +154,14 @@ module Invidious::Database::Users
# Update (misc)
# -------------------
- def feed_needs_update(video : ChannelVideo)
+ def feed_needs_update(channel_id : String)
request = <<-SQL
UPDATE users
SET feed_needs_update = true
WHERE $1 = ANY(subscriptions)
SQL
- PG_DB.exec(request, video.ucid)
+ PG_DB.exec(request, channel_id)
end
def update_preferences(user : User)
diff --git a/src/invidious/jobs/notification_job.cr b/src/invidious/jobs/notification_job.cr
index b445107b..f2c9d4be 100644
--- a/src/invidious/jobs/notification_job.cr
+++ b/src/invidious/jobs/notification_job.cr
@@ -1,8 +1,32 @@
+struct VideoNotification
+ getter video_id : String
+ getter channel_id : String
+ getter published : Time
+
+ def_hash @channel_id, @video_id
+
+ def ==(other)
+ video_id == other.video_id
+ end
+
+ def self.from_video(video : ChannelVideo) : self
+ VideoNotification.new(video.id, video.ucid, video.published)
+ end
+
+ def initialize(@video_id, @channel_id, @published)
+ end
+
+ def clone : VideoNotification
+ VideoNotification.new(video_id.clone, channel_id.clone, published.clone)
+ end
+end
+
class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
+ private getter notification_channel : ::Channel(VideoNotification)
private getter connection_channel : ::Channel({Bool, ::Channel(PQ::Notification)})
private getter pg_url : URI
- def initialize(@connection_channel, @pg_url)
+ def initialize(@notification_channel, @connection_channel, @pg_url)
end
def begin
@@ -10,6 +34,70 @@ class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) }
+ # hash of channels to their videos (id+published) that need notifying
+ to_notify = Hash(String, Set(VideoNotification)).new(
+ ->(hash : Hash(String, Set(VideoNotification)), key : String) {
+ hash[key] = Set(VideoNotification).new
+ }
+ )
+ notify_mutex = Mutex.new()
+
+ # fiber to locally cache all incoming notifications (from pubsub webhooks and refresh channels job)
+ spawn do
+ begin
+ loop do
+ notification = notification_channel.receive
+ notify_mutex.synchronize do
+ to_notify[notification.channel_id] << notification
+ end
+ end
+ end
+ end
+ # fiber to regularly persist all cached notifications
+ spawn do
+ loop do
+ begin
+ LOGGER.debug("NotificationJob: waking up")
+ cloned = {} of String => Set(VideoNotification)
+ notify_mutex.synchronize do
+ cloned = to_notify.clone
+ to_notify.clear
+ end
+
+ cloned.each do |channel_id, notifications|
+ if notifications.empty?
+ next
+ end
+
+ LOGGER.info("NotificationJob: updating channel #{channel_id} with #{notifications.size} notifications")
+ if CONFIG.enable_user_notifications
+ video_ids = notifications.map { |n| n.video_id }
+ Invidious::Database::Users.add_multiple_notifications(channel_id, video_ids)
+ PG_DB.using_connection do |conn|
+ notifications.each do |n|
+ # Deliver notifications to `/api/v1/auth/notifications`
+ payload = {
+ "topic" => n.channel_id,
+ "videoId" => n.video_id,
+ "published" => n.published.to_unix,
+ }.to_json
+ conn.exec("NOTIFY notifications, E'#{payload}'")
+ end
+ end
+ else
+ Invidious::Database::Users.feed_needs_update(channel_id)
+ end
+ end
+
+ LOGGER.trace("NotificationJob: Done, sleeping")
+ rescue ex
+ LOGGER.error("NotificationJob: #{ex.message}")
+ end
+ sleep 1.minute
+ Fiber.yield
+ end
+ end
+
loop do
action, connection = connection_channel.receive
diff --git a/src/invidious/routes/feeds.cr b/src/invidious/routes/feeds.cr
index fa5065fb..7f9a0edb 100644
--- a/src/invidious/routes/feeds.cr
+++ b/src/invidious/routes/feeds.cr
@@ -420,16 +420,6 @@ module Invidious::Routes::Feeds
next # skip this video since it raised an exception (e.g. it is a scheduled live event)
end
- if CONFIG.enable_user_notifications
- # Deliver notifications to `/api/v1/auth/notifications`
- payload = {
- "topic" => video.ucid,
- "videoId" => video.id,
- "published" => published.to_unix,
- }.to_json
- PG_DB.exec("NOTIFY notifications, E'#{payload}'")
- end
-
video = ChannelVideo.new({
id: id,
title: video.title,
@@ -445,11 +435,7 @@ module Invidious::Routes::Feeds
was_insert = Invidious::Database::ChannelVideos.insert(video, with_premiere_timestamp: true)
if was_insert
- if CONFIG.enable_user_notifications
- Invidious::Database::Users.add_notification(video)
- else
- Invidious::Database::Users.feed_needs_update(video)
- end
+ NOTIFICATION_CHANNEL.send(VideoNotification.from_video(video))
end
end
end