summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/invidious/jobs/notification_job.cr36
1 files changed, 24 insertions, 12 deletions
diff --git a/src/invidious/jobs/notification_job.cr b/src/invidious/jobs/notification_job.cr
index b70e9ef4..f2c9d4be 100644
--- a/src/invidious/jobs/notification_job.cr
+++ b/src/invidious/jobs/notification_job.cr
@@ -35,14 +35,21 @@ class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) }
# hash of channels to their videos (id+published) that need notifying
- to_notify = Hash(String, Set(VideoNotification)).new(->(hash : Hash(String, Set(VideoNotification)), key : String) { hash[key] = Set(VideoNotification).new })
+ to_notify = Hash(String, Set(VideoNotification)).new(
+ ->(hash : Hash(String, Set(VideoNotification)), key : String) {
+ hash[key] = Set(VideoNotification).new
+ }
+ )
+ notify_mutex = Mutex.new()
# fiber to locally cache all incoming notifications (from pubsub webhooks and refresh channels job)
spawn do
begin
loop do
notification = notification_channel.receive
- to_notify[notification.channel_id] << notification
+ notify_mutex.synchronize do
+ to_notify[notification.channel_id] << notification
+ end
end
end
end
@@ -51,8 +58,11 @@ class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
loop do
begin
LOGGER.debug("NotificationJob: waking up")
- cloned = to_notify.clone
- to_notify.clear
+ cloned = {} of String => Set(VideoNotification)
+ notify_mutex.synchronize do
+ cloned = to_notify.clone
+ to_notify.clear
+ end
cloned.each do |channel_id, notifications|
if notifications.empty?
@@ -63,14 +73,16 @@ class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
if CONFIG.enable_user_notifications
video_ids = notifications.map { |n| n.video_id }
Invidious::Database::Users.add_multiple_notifications(channel_id, video_ids)
- notifications.each do |n|
- # Deliver notifications to `/api/v1/auth/notifications`
- payload = {
- "topic" => n.channel_id,
- "videoId" => n.video_id,
- "published" => n.published.to_unix,
- }.to_json
- PG_DB.exec("NOTIFY notifications, E'#{payload}'")
+ PG_DB.using_connection do |conn|
+ notifications.each do |n|
+ # Deliver notifications to `/api/v1/auth/notifications`
+ payload = {
+ "topic" => n.channel_id,
+ "videoId" => n.video_id,
+ "published" => n.published.to_unix,
+ }.to_json
+ conn.exec("NOTIFY notifications, E'#{payload}'")
+ end
end
else
Invidious::Database::Users.feed_needs_update(channel_id)