summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/invidious.cr4
-rw-r--r--src/invidious/helpers/helpers.cr2
-rw-r--r--src/invidious/helpers/jobs.cr78
3 files changed, 54 insertions, 30 deletions
diff --git a/src/invidious.cr b/src/invidious.cr
index ad4401a7..f487b0e9 100644
--- a/src/invidious.cr
+++ b/src/invidious.cr
@@ -129,8 +129,8 @@ end
# Start jobs
-refresh_channels(PG_DB, logger, config.channel_threads, config.full_refresh)
-refresh_feeds(PG_DB, logger, config.feed_threads, config.use_feed_events)
+refresh_channels(PG_DB, logger, config)
+refresh_feeds(PG_DB, logger, config)
subscribe_to_feeds(PG_DB, logger, HMAC_KEY, config)
statistics = {
diff --git a/src/invidious/helpers/helpers.cr b/src/invidious/helpers/helpers.cr
index 983d3d1e..3155cb67 100644
--- a/src/invidious/helpers/helpers.cr
+++ b/src/invidious/helpers/helpers.cr
@@ -105,7 +105,7 @@ struct Config
hmac_key: String?, # HMAC signing key for CSRF tokens and verifying pubsub subscriptions
domain: String?, # Domain to be used for links to resources on the site where an absolute URL is required
use_pubsub_feeds: {type: Bool | Int32, default: false}, # Subscribe to channels using PubSubHubbub (requires domain, hmac_key)
- use_feed_events: {type: Bool, default: false}, # Update feeds on receiving notifications
+ use_feed_events: {type: Bool | Int32, default: false}, # Update feeds on receiving notifications
default_home: {type: String, default: "Top"},
feed_menu: {type: Array(String), default: ["Popular", "Top", "Trending", "Subscriptions"]},
top_enabled: {type: Bool, default: true},
diff --git a/src/invidious/helpers/jobs.cr b/src/invidious/helpers/jobs.cr
index d725a023..b9f9a86f 100644
--- a/src/invidious/helpers/jobs.cr
+++ b/src/invidious/helpers/jobs.cr
@@ -1,4 +1,4 @@
-def refresh_channels(db, logger, max_threads = 1, full_refresh = false)
+def refresh_channels(db, logger, config)
max_channel = Channel(Int32).new
spawn do
@@ -20,7 +20,7 @@ def refresh_channels(db, logger, max_threads = 1, full_refresh = false)
active_threads += 1
spawn do
begin
- channel = fetch_channel(id, db, full_refresh)
+ channel = fetch_channel(id, db, config.full_refresh)
db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.now, channel.author, id)
rescue ex
@@ -39,47 +39,71 @@ def refresh_channels(db, logger, max_threads = 1, full_refresh = false)
end
end
- max_channel.send(max_threads)
+ max_channel.send(config.channel_threads)
end
-def refresh_feeds(db, logger, max_threads = 1, use_feed_events = false)
- max_channel = Channel(Int32).new
-
+def refresh_feeds(db, logger, config)
# Spawn thread to handle feed events
- if use_feed_events
+ if config.use_feed_events
+ case config.use_feed_events
+ when Bool
+ max_feed_event_threads = config.use_feed_events.as(Bool).to_unsafe
+ when Int32
+ max_feed_event_threads = config.use_feed_events.as(Int32)
+ end
+ max_feed_event_channel = Channel(Int32).new
+
spawn do
queue = Deque(String).new(30)
+ PG.connect_listen(PG_URL, "feeds") do |event|
+ if !queue.includes? event.payload
+ queue << event.payload
+ end
+ end
- spawn do
- loop do
- if event = queue.shift?
- feed = JSON.parse(event)
- email = feed["email"].as_s
- action = feed["action"].as_s
+ max_threads = max_feed_event_channel.receive
+ active_threads = 0
+ active_channel = Channel(Bool).new
- view_name = "subscriptions_#{sha256(email)}"
+ loop do
+ until queue.empty?
+ event = queue.shift
- case action
- when "refresh"
- db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
+ if active_threads >= max_threads
+ if active_channel.receive
+ active_threads -= 1
end
-
- # Delete any future events that we just processed
- queue.delete(event)
- else
- sleep 1.second
end
- Fiber.yield
+ active_threads += 1
+
+ spawn do
+ begin
+ feed = JSON.parse(event)
+ email = feed["email"].as_s
+ action = feed["action"].as_s
+
+ view_name = "subscriptions_#{sha256(email)}"
+
+ case action
+ when "refresh"
+ db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
+ end
+ rescue ex
+ end
+
+ active_channel.send(true)
+ end
end
- end
- PG.connect_listen(PG_URL, "feeds") do |event|
- queue << event.payload
+ sleep 5.seconds
end
end
+
+ max_feed_event_channel.send(max_feed_event_threads.as(Int32))
end
+ max_channel = Channel(Int32).new
spawn do
max_threads = max_channel.receive
active_threads = 0
@@ -144,7 +168,7 @@ def refresh_feeds(db, logger, max_threads = 1, use_feed_events = false)
end
end
- max_channel.send(max_threads)
+ max_channel.send(config.feed_threads)
end
def subscribe_to_feeds(db, logger, key, config)