summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorOmar Roth <omarroth@protonmail.com>2019-06-03 13:12:06 -0500
committerOmar Roth <omarroth@protonmail.com>2019-06-03 13:12:06 -0500
commitd892ba6aa5f83fab35d353e2f2c6f1ab16d0b832 (patch)
tree07d063f1d8f3dc4c29b3ce4d598918eb357f5e7a /src
parent84b2583973ad615b729faae01d934b82a1ad00d6 (diff)
downloadinvidious-d892ba6aa5f83fab35d353e2f2c6f1ab16d0b832.tar.gz
invidious-d892ba6aa5f83fab35d353e2f2c6f1ab16d0b832.tar.bz2
invidious-d892ba6aa5f83fab35d353e2f2c6f1ab16d0b832.zip
Refactor connection channel for delivering notifications
Diffstat (limited to 'src')
-rw-r--r--src/invidious.cr39
-rw-r--r--src/invidious/helpers/helpers.cr172
2 files changed, 112 insertions, 99 deletions
diff --git a/src/invidious.cr b/src/invidious.cr
index 0049bfdc..e1e9af2a 100644
--- a/src/invidious.cr
+++ b/src/invidious.cr
@@ -186,10 +186,21 @@ spawn do
end
end
-notification_channels = [] of Channel(PQ::Notification)
-PG.connect_listen(PG_URL, "notifications") do |event|
- notification_channels.each do |channel|
- channel.send(event)
+connection_channel = Channel({Bool, Channel(PQ::Notification)}).new
+spawn do
+ connections = [] of Channel(PQ::Notification)
+
+ PG.connect_listen(PG_URL, "notifications") { |event| connections.each { |connection| connection.send(event) } }
+
+ loop do
+ action, connection = connection_channel.receive
+
+ case action
+ when true
+ connections << connection
+ when false
+ connections.delete(connection)
+ end
end
end
@@ -4469,15 +4480,7 @@ get "/api/v1/auth/notifications" do |env|
topics = env.params.query["topics"]?.try &.split(",").uniq.first(1000)
topics ||= [] of String
- notification_channel = Channel(PQ::Notification).new
- notification_channels << notification_channel
-
- begin
- create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics, notification_channel)
- rescue ex
- ensure
- notification_channels.delete(notification_channel)
- end
+ create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics, connection_channel)
end
post "/api/v1/auth/notifications" do |env|
@@ -4486,15 +4489,7 @@ post "/api/v1/auth/notifications" do |env|
topics = env.params.body["topics"]?.try &.split(",").uniq.first(1000)
topics ||= [] of String
- notification_channel = Channel(PQ::Notification).new
- notification_channels << notification_channel
-
- begin
- create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics, notification_channel)
- rescue ex
- ensure
- notification_channels.delete(notification_channel)
- end
+ create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics, connection_channel)
end
get "/api/v1/auth/preferences" do |env|
diff --git a/src/invidious/helpers/helpers.cr b/src/invidious/helpers/helpers.cr
index 539db250..699fc374 100644
--- a/src/invidious/helpers/helpers.cr
+++ b/src/invidious/helpers/helpers.cr
@@ -661,7 +661,10 @@ def copy_in_chunks(input, output, chunk_size = 4096)
end
end
-def create_notification_stream(env, proxies, config, kemal_config, decrypt_function, topics, notification_channel)
+def create_notification_stream(env, proxies, config, kemal_config, decrypt_function, topics, connection_channel)
+ connection = Channel(PQ::Notification).new
+ connection_channel.send({true, connection})
+
locale = LOCALES[env.get("preferences").as(Preferences).locale]?
since = env.params.query["since"]?.try &.to_i?
@@ -669,107 +672,122 @@ def create_notification_stream(env, proxies, config, kemal_config, decrypt_funct
if topics.includes? "debug"
spawn do
- loop do
- time_span = [0, 0, 0, 0]
- time_span[rand(4)] = rand(30) + 5
- published = Time.now - Time::Span.new(time_span[0], time_span[1], time_span[2], time_span[3])
- video_id = TEST_IDS[rand(TEST_IDS.size)]
-
- video = get_video(video_id, PG_DB, proxies)
- video.published = published
- response = JSON.parse(video.to_json(locale, config, kemal_config, decrypt_function))
-
- if fields_text = env.params.query["fields"]?
- begin
- JSONFilter.filter(response, fields_text)
- rescue ex
- env.response.status_code = 400
- response = {"error" => ex.message}
+ begin
+ loop do
+ time_span = [0, 0, 0, 0]
+ time_span[rand(4)] = rand(30) + 5
+ published = Time.now - Time::Span.new(time_span[0], time_span[1], time_span[2], time_span[3])
+ video_id = TEST_IDS[rand(TEST_IDS.size)]
+
+ video = get_video(video_id, PG_DB, proxies)
+ video.published = published
+ response = JSON.parse(video.to_json(locale, config, kemal_config, decrypt_function))
+
+ if fields_text = env.params.query["fields"]?
+ begin
+ JSONFilter.filter(response, fields_text)
+ rescue ex
+ env.response.status_code = 400
+ response = {"error" => ex.message}
+ end
end
- end
- env.response.puts "id: #{id}"
- env.response.puts "data: #{response.to_json}"
- env.response.puts
- env.response.flush
+ env.response.puts "id: #{id}"
+ env.response.puts "data: #{response.to_json}"
+ env.response.puts
+ env.response.flush
- id += 1
+ id += 1
- sleep 1.minute
+ sleep 1.minute
+ end
+ rescue ex
end
end
end
spawn do
- if since
- topics.try &.each do |topic|
- case topic
- when .match(/UC[A-Za-z0-9_-]{22}/)
- PG_DB.query_all("SELECT * FROM channel_videos WHERE ucid = $1 AND published > $2 ORDER BY published DESC LIMIT 15",
- topic, Time.unix(since.not_nil!), as: ChannelVideo).each do |video|
- response = JSON.parse(video.to_json(locale, config, Kemal.config))
-
- if fields_text = env.params.query["fields"]?
- begin
- JSONFilter.filter(response, fields_text)
- rescue ex
- env.response.status_code = 400
- response = {"error" => ex.message}
+ begin
+ if since
+ topics.try &.each do |topic|
+ case topic
+ when .match(/UC[A-Za-z0-9_-]{22}/)
+ PG_DB.query_all("SELECT * FROM channel_videos WHERE ucid = $1 AND published > $2 ORDER BY published DESC LIMIT 15",
+ topic, Time.unix(since.not_nil!), as: ChannelVideo).each do |video|
+ response = JSON.parse(video.to_json(locale, config, Kemal.config))
+
+ if fields_text = env.params.query["fields"]?
+ begin
+ JSONFilter.filter(response, fields_text)
+ rescue ex
+ env.response.status_code = 400
+ response = {"error" => ex.message}
+ end
end
- end
- env.response.puts "id: #{id}"
- env.response.puts "data: #{response.to_json}"
- env.response.puts
- env.response.flush
+ env.response.puts "id: #{id}"
+ env.response.puts "data: #{response.to_json}"
+ env.response.puts
+ env.response.flush
- id += 1
+ id += 1
+ end
+ else
+ # TODO
end
- else
- # TODO
end
end
end
end
spawn do
- loop do
- event = notification_channel.receive
-
- notification = JSON.parse(event.payload)
- topic = notification["topic"].as_s
- video_id = notification["videoId"].as_s
- published = notification["published"].as_i64
-
- video = get_video(video_id, PG_DB, proxies)
- video.published = Time.unix(published)
- response = JSON.parse(video.to_json(locale, config, Kemal.config, decrypt_function))
-
- if fields_text = env.params.query["fields"]?
- begin
- JSONFilter.filter(response, fields_text)
- rescue ex
- env.response.status_code = 400
- response = {"error" => ex.message}
+ begin
+ loop do
+ event = connection.receive
+
+ notification = JSON.parse(event.payload)
+ topic = notification["topic"].as_s
+ video_id = notification["videoId"].as_s
+ published = notification["published"].as_i64
+
+ video = get_video(video_id, PG_DB, proxies)
+ video.published = Time.unix(published)
+ response = JSON.parse(video.to_json(locale, config, Kemal.config, decrypt_function))
+
+ if fields_text = env.params.query["fields"]?
+ begin
+ JSONFilter.filter(response, fields_text)
+ rescue ex
+ env.response.status_code = 400
+ response = {"error" => ex.message}
+ end
end
- end
- if topics.try &.includes? topic
- env.response.puts "id: #{id}"
- env.response.puts "data: #{response.to_json}"
- env.response.puts
- env.response.flush
+ if topics.try &.includes? topic
+ env.response.puts "id: #{id}"
+ env.response.puts "data: #{response.to_json}"
+ env.response.puts
+ env.response.flush
- id += 1
+ id += 1
+ end
end
+ rescue ex
+ ensure
+ connection_channel.send({false, connection})
end
end
- # Send heartbeat
- loop do
- env.response.puts ":keepalive #{Time.now.to_unix}"
- env.response.puts
- env.response.flush
- sleep (20 + rand(11)).seconds
+ begin
+ # Send heartbeat
+ loop do
+ env.response.puts ":keepalive #{Time.now.to_unix}"
+ env.response.puts
+ env.response.flush
+ sleep (20 + rand(11)).seconds
+ end
+ rescue ex
+ ensure
+ connection_channel.send({false, connection})
end
end