summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorOmar Roth <omarroth@protonmail.com>2019-06-02 07:41:53 -0500
committerOmar Roth <omarroth@protonmail.com>2019-06-02 07:41:53 -0500
commit71bf8b6b4d66a615a84b86446ca2a1c350868965 (patch)
tree195b92539c751ca679173b0c9090cb90a94c3ba6 /src
parent576067c1e5d3586b0773035f935addc0e0724396 (diff)
downloadinvidious-71bf8b6b4d66a615a84b86446ca2a1c350868965.tar.gz
invidious-71bf8b6b4d66a615a84b86446ca2a1c350868965.tar.bz2
invidious-71bf8b6b4d66a615a84b86446ca2a1c350868965.zip
Refactor connect_listen for notifications
Diffstat (limited to 'src')
-rw-r--r--src/invidious.cr31
-rw-r--r--src/invidious/helpers/helpers.cr174
2 files changed, 115 insertions, 90 deletions
diff --git a/src/invidious.cr b/src/invidious.cr
index 9f79a82f..296db08a 100644
--- a/src/invidious.cr
+++ b/src/invidious.cr
@@ -186,6 +186,13 @@ spawn do
end
end
+notification_channels = [] of Channel(PQ::Notification)
+PG.connect_listen(PG_URL, "notifications") do |event|
+ notification_channels.each do |channel|
+ channel.send(event)
+ end
+end
+
proxies = PROXY_LIST
before_all do |env|
@@ -4457,17 +4464,37 @@ get "/api/v1/mixes/:rdid" do |env|
end
get "/api/v1/auth/notifications" do |env|
+ env.response.content_type = "text/event-stream"
+
topics = env.params.query["topics"]?.try &.split(",").uniq.first(1000)
topics ||= [] of String
- create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics)
+ notification_channel = Channel(PQ::Notification).new
+ notification_channels << notification_channel
+
+ begin
+ create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics, notification_channel)
+ rescue ex
+ ensure
+ notification_channels.delete(notification_channel)
+ end
end
post "/api/v1/auth/notifications" do |env|
+ env.response.content_type = "text/event-stream"
+
topics = env.params.body["topics"]?.try &.split(",").uniq.first(1000)
topics ||= [] of String
- create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics)
+ notification_channel = Channel(PQ::Notification).new
+ notification_channels << notification_channel
+
+ begin
+ create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics, notification_channel)
+ rescue ex
+ ensure
+ notification_channels.delete(notification_channel)
+ end
end
get "/api/v1/auth/preferences" do |env|
diff --git a/src/invidious/helpers/helpers.cr b/src/invidious/helpers/helpers.cr
index 476038c7..539db250 100644
--- a/src/invidious/helpers/helpers.cr
+++ b/src/invidious/helpers/helpers.cr
@@ -661,89 +661,23 @@ def copy_in_chunks(input, output, chunk_size = 4096)
end
end
-def create_notification_stream(env, proxies, config, kemal_config, decrypt_function, topics)
+def create_notification_stream(env, proxies, config, kemal_config, decrypt_function, topics, notification_channel)
locale = LOCALES[env.get("preferences").as(Preferences).locale]?
- env.response.content_type = "text/event-stream"
-
since = env.params.query["since"]?.try &.to_i?
+ id = 0
- begin
- id = 0
-
- if topics.includes? "debug"
- spawn do
- loop do
- time_span = [0, 0, 0, 0]
- time_span[rand(4)] = rand(30) + 5
- published = Time.now - Time::Span.new(time_span[0], time_span[1], time_span[2], time_span[3])
- video_id = TEST_IDS[rand(TEST_IDS.size)]
-
- video = get_video(video_id, PG_DB, proxies)
- video.published = published
- response = JSON.parse(video.to_json(locale, config, kemal_config, decrypt_function))
-
- if fields_text = env.params.query["fields"]?
- begin
- JSONFilter.filter(response, fields_text)
- rescue ex
- env.response.status_code = 400
- response = {"error" => ex.message}
- end
- end
-
- env.response.puts "id: #{id}"
- env.response.puts "data: #{response.to_json}"
- env.response.puts
- env.response.flush
-
- id += 1
-
- sleep 1.minute
- end
- end
- end
-
+ if topics.includes? "debug"
spawn do
- if since
- topics.try &.each do |topic|
- case topic
- when .match(/UC[A-Za-z0-9_-]{22}/)
- PG_DB.query_all("SELECT * FROM channel_videos WHERE ucid = $1 AND published > $2 ORDER BY published DESC LIMIT 15",
- topic, Time.unix(since.not_nil!), as: ChannelVideo).each do |video|
- response = JSON.parse(video.to_json(locale, config, Kemal.config))
-
- if fields_text = env.params.query["fields"]?
- begin
- JSONFilter.filter(response, fields_text)
- rescue ex
- env.response.status_code = 400
- response = {"error" => ex.message}
- end
- end
-
- env.response.puts "id: #{id}"
- env.response.puts "data: #{response.to_json}"
- env.response.puts
- env.response.flush
-
- id += 1
- end
- else
- # TODO
- end
- end
- end
-
- PG.connect_listen(PG_URL, "notifications") do |event|
- notification = JSON.parse(event.payload)
- topic = notification["topic"].as_s
- video_id = notification["videoId"].as_s
- published = notification["published"].as_i64
+ loop do
+ time_span = [0, 0, 0, 0]
+ time_span[rand(4)] = rand(30) + 5
+ published = Time.now - Time::Span.new(time_span[0], time_span[1], time_span[2], time_span[3])
+ video_id = TEST_IDS[rand(TEST_IDS.size)]
video = get_video(video_id, PG_DB, proxies)
- video.published = Time.unix(published)
- response = JSON.parse(video.to_json(locale, config, Kemal.config, decrypt_function))
+ video.published = published
+ response = JSON.parse(video.to_json(locale, config, kemal_config, decrypt_function))
if fields_text = env.params.query["fields"]?
begin
@@ -754,24 +688,88 @@ def create_notification_stream(env, proxies, config, kemal_config, decrypt_funct
end
end
- if topics.try &.includes? topic
- env.response.puts "id: #{id}"
- env.response.puts "data: #{response.to_json}"
- env.response.puts
- env.response.flush
+ env.response.puts "id: #{id}"
+ env.response.puts "data: #{response.to_json}"
+ env.response.puts
+ env.response.flush
- id += 1
+ id += 1
+
+ sleep 1.minute
+ end
+ end
+ end
+
+ spawn do
+ if since
+ topics.try &.each do |topic|
+ case topic
+ when .match(/UC[A-Za-z0-9_-]{22}/)
+ PG_DB.query_all("SELECT * FROM channel_videos WHERE ucid = $1 AND published > $2 ORDER BY published DESC LIMIT 15",
+ topic, Time.unix(since.not_nil!), as: ChannelVideo).each do |video|
+ response = JSON.parse(video.to_json(locale, config, Kemal.config))
+
+ if fields_text = env.params.query["fields"]?
+ begin
+ JSONFilter.filter(response, fields_text)
+ rescue ex
+ env.response.status_code = 400
+ response = {"error" => ex.message}
+ end
+ end
+
+ env.response.puts "id: #{id}"
+ env.response.puts "data: #{response.to_json}"
+ env.response.puts
+ env.response.flush
+
+ id += 1
+ end
+ else
+ # TODO
end
end
end
+ end
- # Send heartbeat
+ spawn do
loop do
- env.response.puts ":keepalive #{Time.now.to_unix}"
- env.response.puts
- env.response.flush
- sleep (20 + rand(11)).seconds
+ event = notification_channel.receive
+
+ notification = JSON.parse(event.payload)
+ topic = notification["topic"].as_s
+ video_id = notification["videoId"].as_s
+ published = notification["published"].as_i64
+
+ video = get_video(video_id, PG_DB, proxies)
+ video.published = Time.unix(published)
+ response = JSON.parse(video.to_json(locale, config, Kemal.config, decrypt_function))
+
+ if fields_text = env.params.query["fields"]?
+ begin
+ JSONFilter.filter(response, fields_text)
+ rescue ex
+ env.response.status_code = 400
+ response = {"error" => ex.message}
+ end
+ end
+
+ if topics.try &.includes? topic
+ env.response.puts "id: #{id}"
+ env.response.puts "data: #{response.to_json}"
+ env.response.puts
+ env.response.flush
+
+ id += 1
+ end
end
- rescue
+ end
+
+ # Send heartbeat
+ loop do
+ env.response.puts ":keepalive #{Time.now.to_unix}"
+ env.response.puts
+ env.response.flush
+ sleep (20 + rand(11)).seconds
end
end