summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthew McGarvey <matthewmcgarvey14@gmail.com>2020-10-17 07:25:57 -0500
committerGitHub <noreply@github.com>2020-10-17 12:25:57 +0000
commit0520e673035d0acc2255de77636f7d30b2ad0a17 (patch)
tree7bd29213bde070828b1505dc6c0951259b579e8f /src
parentd571573e5219f453d371de7061dfcdeae31a567a (diff)
downloadinvidious-0520e673035d0acc2255de77636f7d30b2ad0a17.tar.gz
invidious-0520e673035d0acc2255de77636f7d30b2ad0a17.tar.bz2
invidious-0520e673035d0acc2255de77636f7d30b2ad0a17.zip
[refactor] Finish converting jobs to new job setup (#1420)
[refactor] Finish converting jobs to new job setup
Diffstat (limited to 'src')
-rw-r--r--src/invidious.cr92
-rw-r--r--src/invidious/helpers/jobs.cr286
-rw-r--r--src/invidious/jobs/bypass_captcha_job.cr131
-rw-r--r--src/invidious/jobs/notification_job.cr24
-rw-r--r--src/invidious/jobs/refresh_feeds_job.cr77
-rw-r--r--src/invidious/jobs/statistics_refresh_job.cr59
-rw-r--r--src/invidious/jobs/subscribe_to_feeds_job.cr52
-rw-r--r--src/invidious/jobs/update_decrypt_function_job.cr19
8 files changed, 375 insertions, 365 deletions
diff --git a/src/invidious.cr b/src/invidious.cr
index 29a412a1..f7a7876f 100644
--- a/src/invidious.cr
+++ b/src/invidious.cr
@@ -160,90 +160,29 @@ end
# Start jobs
Invidious::Jobs.register Invidious::Jobs::RefreshChannelsJob.new(PG_DB, logger, config)
-refresh_feeds(PG_DB, logger, config)
-subscribe_to_feeds(PG_DB, logger, HMAC_KEY, config)
+Invidious::Jobs.register Invidious::Jobs::RefreshFeedsJob.new(PG_DB, logger, config)
+Invidious::Jobs.register Invidious::Jobs::SubscribeToFeedsJob.new(PG_DB, logger, config, HMAC_KEY)
+Invidious::Jobs.register Invidious::Jobs::PullPopularVideosJob.new(PG_DB)
+Invidious::Jobs.register Invidious::Jobs::UpdateDecryptFunctionJob.new
-statistics = {
- "error" => "Statistics are not availabile.",
-}
if config.statistics_enabled
- spawn do
- statistics = {
- "version" => "2.0",
- "software" => SOFTWARE,
- "openRegistrations" => config.registration_enabled,
- "usage" => {
- "users" => {
- "total" => PG_DB.query_one("SELECT count(*) FROM users", as: Int64),
- "activeHalfyear" => PG_DB.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '6 months'", as: Int64),
- "activeMonth" => PG_DB.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '1 month'", as: Int64),
- },
- },
- "metadata" => {
- "updatedAt" => Time.utc.to_unix,
- "lastChannelRefreshedAt" => PG_DB.query_one?("SELECT updated FROM channels ORDER BY updated DESC LIMIT 1", as: Time).try &.to_unix || 0_i64,
- },
- }
-
- loop do
- sleep 1.minute
- Fiber.yield
+ Invidious::Jobs.register Invidious::Jobs::StatisticsRefreshJob.new(PG_DB, config, SOFTWARE)
+end
- statistics["usage"].as(Hash)["users"].as(Hash)["total"] = PG_DB.query_one("SELECT count(*) FROM users", as: Int64)
- statistics["usage"].as(Hash)["users"].as(Hash)["activeHalfyear"] = PG_DB.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '6 months'", as: Int64)
- statistics["usage"].as(Hash)["users"].as(Hash)["activeMonth"] = PG_DB.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '1 month'", as: Int64)
- statistics["metadata"].as(Hash(String, Int64))["updatedAt"] = Time.utc.to_unix
- statistics["metadata"].as(Hash(String, Int64))["lastChannelRefreshedAt"] = PG_DB.query_one?("SELECT updated FROM channels ORDER BY updated DESC LIMIT 1", as: Time).try &.to_unix || 0_i64
- end
- end
+if config.captcha_key
+ Invidious::Jobs.register Invidious::Jobs::BypassCaptchaJob.new(logger, config)
end
-Invidious::Jobs.register Invidious::Jobs::PullPopularVideosJob.new(PG_DB)
+connection_channel = Channel({Bool, Channel(PQ::Notification)}).new(32)
+Invidious::Jobs.register Invidious::Jobs::NotificationJob.new(connection_channel, PG_URL)
+
Invidious::Jobs.start_all
def popular_videos
Invidious::Jobs::PullPopularVideosJob::POPULAR_VIDEOS.get
end
-DECRYPT_FUNCTION = [] of {SigProc, Int32}
-spawn do
- update_decrypt_function do |function|
- DECRYPT_FUNCTION.clear
- function.each { |i| DECRYPT_FUNCTION << i }
- end
-end
-
-if CONFIG.captcha_key
- spawn do
- bypass_captcha(CONFIG.captcha_key, logger) do |cookies|
- cookies.each do |cookie|
- config.cookies << cookie
- end
-
- # Persist cookies between runs
- CONFIG.cookies = config.cookies
- File.write("config/config.yml", config.to_yaml)
- end
- end
-end
-
-connection_channel = Channel({Bool, Channel(PQ::Notification)}).new(32)
-spawn do
- connections = [] of Channel(PQ::Notification)
-
- PG.connect_listen(PG_URL, "notifications") { |event| connections.each { |connection| connection.send(event) } }
-
- loop do
- action, connection = connection_channel.receive
-
- case action
- when true
- connections << connection
- when false
- connections.delete(connection)
- end
- end
-end
+DECRYPT_FUNCTION = Invidious::Jobs::UpdateDecryptFunctionJob::DECRYPT_FUNCTION
before_all do |env|
preferences = begin
@@ -3658,12 +3597,7 @@ get "/api/v1/stats" do |env|
next error_message
end
- if statistics["error"]?
- env.response.status_code = 500
- next statistics.to_json
- end
-
- statistics.to_json
+ Invidious::Jobs::StatisticsRefreshJob::STATISTICS.to_json
end
# YouTube provides "storyboards", which are sprites containing x * y
diff --git a/src/invidious/helpers/jobs.cr b/src/invidious/helpers/jobs.cr
deleted file mode 100644
index 11eb7def..00000000
--- a/src/invidious/helpers/jobs.cr
+++ /dev/null
@@ -1,286 +0,0 @@
-def refresh_feeds(db, logger, config)
- max_channel = Channel(Int32).new
- spawn do
- max_threads = max_channel.receive
- active_threads = 0
- active_channel = Channel(Bool).new
-
- loop do
- db.query("SELECT email FROM users WHERE feed_needs_update = true OR feed_needs_update IS NULL") do |rs|
- rs.each do
- email = rs.read(String)
- view_name = "subscriptions_#{sha256(email)}"
-
- if active_threads >= max_threads
- if active_channel.receive
- active_threads -= 1
- end
- end
-
- active_threads += 1
- spawn do
- begin
- # Drop outdated views
- column_array = get_column_array(db, view_name)
- ChannelVideo.type_array.each_with_index do |name, i|
- if name != column_array[i]?
- logger.puts("DROP MATERIALIZED VIEW #{view_name}")
- db.exec("DROP MATERIALIZED VIEW #{view_name}")
- raise "view does not exist"
- end
- end
-
- if !db.query_one("SELECT pg_get_viewdef('#{view_name}')", as: String).includes? "WHERE ((cv.ucid = ANY (u.subscriptions))"
- logger.puts("Materialized view #{view_name} is out-of-date, recreating...")
- db.exec("DROP MATERIALIZED VIEW #{view_name}")
- end
-
- db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
- db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
- rescue ex
- # Rename old views
- begin
- legacy_view_name = "subscriptions_#{sha256(email)[0..7]}"
-
- db.exec("SELECT * FROM #{legacy_view_name} LIMIT 0")
- logger.puts("RENAME MATERIALIZED VIEW #{legacy_view_name}")
- db.exec("ALTER MATERIALIZED VIEW #{legacy_view_name} RENAME TO #{view_name}")
- rescue ex
- begin
- # While iterating through, we may have an email stored from a deleted account
- if db.query_one?("SELECT true FROM users WHERE email = $1", email, as: Bool)
- logger.puts("CREATE #{view_name}")
- db.exec("CREATE MATERIALIZED VIEW #{view_name} AS #{MATERIALIZED_VIEW_SQL.call(email)}")
- db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
- end
- rescue ex
- logger.puts("REFRESH #{email} : #{ex.message}")
- end
- end
- end
-
- active_channel.send(true)
- end
- end
- end
-
- sleep 5.seconds
- Fiber.yield
- end
- end
-
- max_channel.send(config.feed_threads)
-end
-
-def subscribe_to_feeds(db, logger, key, config)
- if config.use_pubsub_feeds
- case config.use_pubsub_feeds
- when Bool
- max_threads = config.use_pubsub_feeds.as(Bool).to_unsafe
- when Int32
- max_threads = config.use_pubsub_feeds.as(Int32)
- end
- max_channel = Channel(Int32).new
-
- spawn do
- max_threads = max_channel.receive
- active_threads = 0
- active_channel = Channel(Bool).new
-
- loop do
- db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > interval '4 days' OR subscribed IS NULL") do |rs|
- rs.each do
- ucid = rs.read(String)
-
- if active_threads >= max_threads.as(Int32)
- if active_channel.receive
- active_threads -= 1
- end
- end
-
- active_threads += 1
-
- spawn do
- begin
- response = subscribe_pubsub(ucid, key, config)
-
- if response.status_code >= 400
- logger.puts("#{ucid} : #{response.body}")
- end
- rescue ex
- logger.puts("#{ucid} : #{ex.message}")
- end
-
- active_channel.send(true)
- end
- end
- end
-
- sleep 1.minute
- Fiber.yield
- end
- end
-
- max_channel.send(max_threads.as(Int32))
- end
-end
-
-def pull_popular_videos(db)
- loop do
- videos = db.query_all("SELECT DISTINCT ON (ucid) * FROM channel_videos WHERE ucid IN \
- (SELECT channel FROM (SELECT UNNEST(subscriptions) AS channel FROM users) AS d \
- GROUP BY channel ORDER BY COUNT(channel) DESC LIMIT 40) \
- ORDER BY ucid, published DESC", as: ChannelVideo).sort_by { |video| video.published }.reverse
-
- yield videos
-
- sleep 1.minute
- Fiber.yield
- end
-end
-
-def update_decrypt_function
- loop do
- begin
- decrypt_function = fetch_decrypt_function
- yield decrypt_function
- rescue ex
- # TODO: Log error
- next
- ensure
- sleep 1.minute
- Fiber.yield
- end
- end
-end
-
-def bypass_captcha(captcha_key, logger)
- loop do
- begin
- {"/watch?v=CvFH_6DNRCY&gl=US&hl=en&has_verified=1&bpctr=9999999999", produce_channel_videos_url(ucid: "UCXuqSBlHAE6Xw-yeJA0Tunw")}.each do |path|
- response = YT_POOL.client &.get(path)
- if response.body.includes?("To continue with your YouTube experience, please fill out the form below.")
- html = XML.parse_html(response.body)
- form = html.xpath_node(%(//form[@action="/das_captcha"])).not_nil!
- site_key = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-sitekey"]
- s_value = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-s"]
-
- inputs = {} of String => String
- form.xpath_nodes(%(.//input[@name])).map do |node|
- inputs[node["name"]] = node["value"]
- end
-
- headers = response.cookies.add_request_headers(HTTP::Headers.new)
-
- response = JSON.parse(HTTP::Client.post("https://api.anti-captcha.com/createTask", body: {
- "clientKey" => CONFIG.captcha_key,
- "task" => {
- "type" => "NoCaptchaTaskProxyless",
- "websiteURL" => "https://www.youtube.com#{path}",
- "websiteKey" => site_key,
- "recaptchaDataSValue" => s_value,
- },
- }.to_json).body)
-
- raise response["error"].as_s if response["error"]?
- task_id = response["taskId"].as_i
-
- loop do
- sleep 10.seconds
-
- response = JSON.parse(HTTP::Client.post("https://api.anti-captcha.com/getTaskResult", body: {
- "clientKey" => CONFIG.captcha_key,
- "taskId" => task_id,
- }.to_json).body)
-
- if response["status"]?.try &.== "ready"
- break
- elsif response["errorId"]?.try &.as_i != 0
- raise response["errorDescription"].as_s
- end
- end
-
- inputs["g-recaptcha-response"] = response["solution"]["gRecaptchaResponse"].as_s
- headers["Cookies"] = response["solution"]["cookies"].as_h?.try &.map { |k, v| "#{k}=#{v}" }.join("; ") || ""
- response = YT_POOL.client &.post("/das_captcha", headers, form: inputs)
-
- yield response.cookies.select { |cookie| cookie.name != "PREF" }
- elsif response.headers["Location"]?.try &.includes?("/sorry/index")
- location = response.headers["Location"].try { |u| URI.parse(u) }
- headers = HTTP::Headers{":authority" => location.host.not_nil!}
- response = YT_POOL.client &.get(location.full_path, headers)
-
- html = XML.parse_html(response.body)
- form = html.xpath_node(%(//form[@action="index"])).not_nil!
- site_key = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-sitekey"]
- s_value = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-s"]
-
- inputs = {} of String => String
- form.xpath_nodes(%(.//input[@name])).map do |node|
- inputs[node["name"]] = node["value"]
- end
-
- captcha_client = HTTPClient.new(URI.parse("https://api.anti-captcha.com"))
- captcha_client.family = CONFIG.force_resolve || Socket::Family::INET
- response = JSON.parse(captcha_client.post("/createTask", body: {
- "clientKey" => CONFIG.captcha_key,
- "task" => {
- "type" => "NoCaptchaTaskProxyless",
- "websiteURL" => location.to_s,
- "websiteKey" => site_key,
- "recaptchaDataSValue" => s_value,
- },
- }.to_json).body)
-
- raise response["error"].as_s if response["error"]?
- task_id = response["taskId"].as_i
-
- loop do
- sleep 10.seconds
-
- response = JSON.parse(captcha_client.post("/getTaskResult", body: {
- "clientKey" => CONFIG.captcha_key,
- "taskId" => task_id,
- }.to_json).body)
-
- if response["status"]?.try &.== "ready"
- break
- elsif response["errorId"]?.try &.as_i != 0
- raise response["errorDescription"].as_s
- end
- end
-
- inputs["g-recaptcha-response"] = response["solution"]["gRecaptchaResponse"].as_s
- headers["Cookies"] = response["solution"]["cookies"].as_h?.try &.map { |k, v| "#{k}=#{v}" }.join("; ") || ""
- response = YT_POOL.client &.post("/sorry/index", headers: headers, form: inputs)
- headers = HTTP::Headers{
- "Cookie" => URI.parse(response.headers["location"]).query_params["google_abuse"].split(";")[0],
- }
- cookies = HTTP::Cookies.from_headers(headers)
-
- yield cookies
- end
- end
- rescue ex
- logger.puts("Exception: #{ex.message}")
- ensure
- sleep 1.minute
- Fiber.yield
- end
- end
-end
-
-def find_working_proxies(regions)
- loop do
- regions.each do |region|
- proxies = get_proxies(region).first(20)
- proxies = proxies.map { |proxy| {ip: proxy[:ip], port: proxy[:port]} }
- # proxies = filter_proxies(proxies)
-
- yield region, proxies
- end
-
- sleep 1.minute
- Fiber.yield
- end
-end
diff --git a/src/invidious/jobs/bypass_captcha_job.cr b/src/invidious/jobs/bypass_captcha_job.cr
new file mode 100644
index 00000000..169e6afb
--- /dev/null
+++ b/src/invidious/jobs/bypass_captcha_job.cr
@@ -0,0 +1,131 @@
+class Invidious::Jobs::BypassCaptchaJob < Invidious::Jobs::BaseJob
+ private getter logger : Invidious::LogHandler
+ private getter config : Config
+
+ def initialize(@logger, @config)
+ end
+
+ def begin
+ loop do
+ begin
+ {"/watch?v=CvFH_6DNRCY&gl=US&hl=en&has_verified=1&bpctr=9999999999", produce_channel_videos_url(ucid: "UCXuqSBlHAE6Xw-yeJA0Tunw")}.each do |path|
+ response = YT_POOL.client &.get(path)
+ if response.body.includes?("To continue with your YouTube experience, please fill out the form below.")
+ html = XML.parse_html(response.body)
+ form = html.xpath_node(%(//form[@action="/das_captcha"])).not_nil!
+ site_key = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-sitekey"]
+ s_value = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-s"]
+
+ inputs = {} of String => String
+ form.xpath_nodes(%(.//input[@name])).map do |node|
+ inputs[node["name"]] = node["value"]
+ end
+
+ headers = response.cookies.add_request_headers(HTTP::Headers.new)
+
+ response = JSON.parse(HTTP::Client.post("https://api.anti-captcha.com/createTask", body: {
+ "clientKey" => config.captcha_key,
+ "task" => {
+ "type" => "NoCaptchaTaskProxyless",
+ "websiteURL" => "https://www.youtube.com#{path}",
+ "websiteKey" => site_key,
+ "recaptchaDataSValue" => s_value,
+ },
+ }.to_json).body)
+
+ raise response["error"].as_s if response["error"]?
+ task_id = response["taskId"].as_i
+
+ loop do
+ sleep 10.seconds
+
+ response = JSON.parse(HTTP::Client.post("https://api.anti-captcha.com/getTaskResult", body: {
+ "clientKey" => config.captcha_key,
+ "taskId" => task_id,
+ }.to_json).body)
+
+ if response["status"]?.try &.== "ready"
+ break
+ elsif response["errorId"]?.try &.as_i != 0
+ raise response["errorDescription"].as_s
+ end
+ end
+
+ inputs["g-recaptcha-response"] = response["solution"]["gRecaptchaResponse"].as_s
+ headers["Cookies"] = response["solution"]["cookies"].as_h?.try &.map { |k, v| "#{k}=#{v}" }.join("; ") || ""
+ response = YT_POOL.client &.post("/das_captcha", headers, form: inputs)
+
+ response.cookies
+ .select { |cookie| cookie.name != "PREF" }
+ .each { |cookie| config.cookies << cookie }
+
+ # Persist cookies between runs
+ File.write("config/config.yml", config.to_yaml)
+ elsif response.headers["Location"]?.try &.includes?("/sorry/index")
+ location = response.headers["Location"].try { |u| URI.parse(u) }
+ headers = HTTP::Headers{":authority" => location.host.not_nil!}
+ response = YT_POOL.client &.get(location.full_path, headers)
+
+ html = XML.parse_html(response.body)
+ form = html.xpath_node(%(//form[@action="index"])).not_nil!
+ site_key = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-sitekey"]
+ s_value = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-s"]
+
+ inputs = {} of String => String
+ form.xpath_nodes(%(.//input[@name])).map do |node|
+ inputs[node["name"]] = node["value"]
+ end
+
+ captcha_client = HTTPClient.new(URI.parse("https://api.anti-captcha.com"))
+ captcha_client.family = config.force_resolve || Socket::Family::INET
+ response = JSON.parse(captcha_client.post("/createTask", body: {
+ "clientKey" => config.captcha_key,
+ "task" => {
+ "type" => "NoCaptchaTaskProxyless",
+ "websiteURL" => location.to_s,
+ "websiteKey" => site_key,
+ "recaptchaDataSValue" => s_value,
+ },
+ }.to_json).body)
+
+ raise response["error"].as_s if response["error"]?
+ task_id = response["taskId"].as_i
+
+ loop do
+ sleep 10.seconds
+
+ response = JSON.parse(captcha_client.post("/getTaskResult", body: {
+ "clientKey" => config.captcha_key,
+ "taskId" => task_id,
+ }.to_json).body)
+
+ if response["status"]?.try &.== "ready"
+ break
+ elsif response["errorId"]?.try &.as_i != 0
+ raise response["errorDescription"].as_s
+ end
+ end
+
+ inputs["g-recaptcha-response"] = response["solution"]["gRecaptchaResponse"].as_s
+ headers["Cookies"] = response["solution"]["cookies"].as_h?.try &.map { |k, v| "#{k}=#{v}" }.join("; ") || ""
+ response = YT_POOL.client &.post("/sorry/index", headers: headers, form: inputs)
+ headers = HTTP::Headers{
+ "Cookie" => URI.parse(response.headers["location"]).query_params["google_abuse"].split(";")[0],
+ }
+ cookies = HTTP::Cookies.from_headers(headers)
+
+ cookies.each { |cookie| config.cookies << cookie }
+
+ # Persist cookies between runs
+ File.write("config/config.yml", config.to_yaml)
+ end
+ end
+ rescue ex
+ logger.puts("Exception: #{ex.message}")
+ ensure
+ sleep 1.minute
+ Fiber.yield
+ end
+ end
+ end
+end
diff --git a/src/invidious/jobs/notification_job.cr b/src/invidious/jobs/notification_job.cr
new file mode 100644
index 00000000..2f525e08
--- /dev/null
+++ b/src/invidious/jobs/notification_job.cr
@@ -0,0 +1,24 @@
+class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
+ private getter connection_channel : Channel({Bool, Channel(PQ::Notification)})
+ private getter pg_url : URI
+
+ def initialize(@connection_channel, @pg_url)
+ end
+
+ def begin
+ connections = [] of Channel(PQ::Notification)
+
+ PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) }
+
+ loop do
+ action, connection = connection_channel.receive
+
+ case action
+ when true
+ connections << connection
+ when false
+ connections.delete(connection)
+ end
+ end
+ end
+end
diff --git a/src/invidious/jobs/refresh_feeds_job.cr b/src/invidious/jobs/refresh_feeds_job.cr
new file mode 100644
index 00000000..eebdf0f3
--- /dev/null
+++ b/src/invidious/jobs/refresh_feeds_job.cr
@@ -0,0 +1,77 @@
+class Invidious::Jobs::RefreshFeedsJob < Invidious::Jobs::BaseJob
+ private getter db : DB::Database
+ private getter logger : Invidious::LogHandler
+ private getter config : Config
+
+ def initialize(@db, @logger, @config)
+ end
+
+ def begin
+ max_threads = config.feed_threads
+ active_threads = 0
+ active_channel = Channel(Bool).new
+
+ loop do
+ db.query("SELECT email FROM users WHERE feed_needs_update = true OR feed_needs_update IS NULL") do |rs|
+ rs.each do
+ email = rs.read(String)
+ view_name = "subscriptions_#{sha256(email)}"
+
+ if active_threads >= max_threads
+ if active_channel.receive
+ active_threads -= 1
+ end
+ end
+
+ active_threads += 1
+ spawn do
+ begin
+ # Drop outdated views
+ column_array = get_column_array(db, view_name)
+ ChannelVideo.type_array.each_with_index do |name, i|
+ if name != column_array[i]?
+ logger.puts("DROP MATERIALIZED VIEW #{view_name}")
+ db.exec("DROP MATERIALIZED VIEW #{view_name}")
+ raise "view does not exist"
+ end
+ end
+
+ if !db.query_one("SELECT pg_get_viewdef('#{view_name}')", as: String).includes? "WHERE ((cv.ucid = ANY (u.subscriptions))"
+ logger.puts("Materialized view #{view_name} is out-of-date, recreating...")
+ db.exec("DROP MATERIALIZED VIEW #{view_name}")
+ end
+
+ db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
+ db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
+ rescue ex
+ # Rename old views
+ begin
+ legacy_view_name = "subscriptions_#{sha256(email)[0..7]}"
+
+ db.exec("SELECT * FROM #{legacy_view_name} LIMIT 0")
+ logger.puts("RENAME MATERIALIZED VIEW #{legacy_view_name}")
+ db.exec("ALTER MATERIALIZED VIEW #{legacy_view_name} RENAME TO #{view_name}")
+ rescue ex
+ begin
+ # While iterating through, we may have an email stored from a deleted account
+ if db.query_one?("SELECT true FROM users WHERE email = $1", email, as: Bool)
+ logger.puts("CREATE #{view_name}")
+ db.exec("CREATE MATERIALIZED VIEW #{view_name} AS #{MATERIALIZED_VIEW_SQL.call(email)}")
+ db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
+ end
+ rescue ex
+ logger.puts("REFRESH #{email} : #{ex.message}")
+ end
+ end
+ end
+
+ active_channel.send(true)
+ end
+ end
+ end
+
+ sleep 5.seconds
+ Fiber.yield
+ end
+ end
+end
diff --git a/src/invidious/jobs/statistics_refresh_job.cr b/src/invidious/jobs/statistics_refresh_job.cr
new file mode 100644
index 00000000..021671be
--- /dev/null
+++ b/src/invidious/jobs/statistics_refresh_job.cr
@@ -0,0 +1,59 @@
+class Invidious::Jobs::StatisticsRefreshJob < Invidious::Jobs::BaseJob
+ STATISTICS = {
+ "version" => "2.0",
+ "software" => {
+ "name" => "invidious",
+ "version" => "",
+ "branch" => "",
+ },
+ "openRegistrations" => true,
+ "usage" => {
+ "users" => {
+ "total" => 0_i64,
+ "activeHalfyear" => 0_i64,
+ "activeMonth" => 0_i64,
+ },
+ },
+ "metadata" => {
+ "updatedAt" => Time.utc.to_unix,
+ "lastChannelRefreshedAt" => 0_i64,
+ },
+ }
+
+ private getter db : DB::Database
+ private getter config : Config
+
+ def initialize(@db, @config, @software_config : Hash(String, String))
+ end
+
+ def begin
+ load_initial_stats
+
+ loop do
+ refresh_stats
+ sleep 1.minute
+ Fiber.yield
+ end
+ end
+
+ # should only be called once at the very beginning
+ private def load_initial_stats
+ STATISTICS["software"] = {
+ "name" => @software_config["name"],
+ "version" => @software_config["version"],
+ "branch" => @software_config["branch"],
+ }
+ STATISTICS["openRegistration"] = config.registration_enabled
+ end
+
+ private def refresh_stats
+ users = STATISTICS.dig("usage", "users").as(Hash(String, Int64))
+ users["total"] = db.query_one("SELECT count(*) FROM users", as: Int64)
+ users["activeHalfyear"] = db.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '6 months'", as: Int64)
+ users["activeMonth"] = db.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '1 month'", as: Int64)
+ STATISTICS["metadata"] = {
+ "updatedAt" => Time.utc.to_unix,
+ "lastChannelRefreshedAt" => db.query_one?("SELECT updated FROM channels ORDER BY updated DESC LIMIT 1", as: Time).try &.to_unix || 0_i64,
+ }
+ end
+end
diff --git a/src/invidious/jobs/subscribe_to_feeds_job.cr b/src/invidious/jobs/subscribe_to_feeds_job.cr
new file mode 100644
index 00000000..3d3b2218
--- /dev/null
+++ b/src/invidious/jobs/subscribe_to_feeds_job.cr
@@ -0,0 +1,52 @@
+class Invidious::Jobs::SubscribeToFeedsJob < Invidious::Jobs::BaseJob
+ private getter db : DB::Database
+ private getter logger : Invidious::LogHandler
+ private getter hmac_key : String
+ private getter config : Config
+
+ def initialize(@db, @logger, @config, @hmac_key)
+ end
+
+ def begin
+ max_threads = 1
+ if config.use_pubsub_feeds.is_a?(Int32)
+ max_threads = config.use_pubsub_feeds.as(Int32)
+ end
+
+ active_threads = 0
+ active_channel = Channel(Bool).new
+
+ loop do
+ db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > interval '4 days' OR subscribed IS NULL") do |rs|
+ rs.each do
+ ucid = rs.read(String)
+
+ if active_threads >= max_threads.as(Int32)
+ if active_channel.receive
+ active_threads -= 1
+ end
+ end
+
+ active_threads += 1
+
+ spawn do
+ begin
+ response = subscribe_pubsub(ucid, hmac_key, config)
+
+ if response.status_code >= 400
+ logger.puts("#{ucid} : #{response.body}")
+ end
+ rescue ex
+ logger.puts("#{ucid} : #{ex.message}")
+ end
+
+ active_channel.send(true)
+ end
+ end
+ end
+
+ sleep 1.minute
+ Fiber.yield
+ end
+ end
+end
diff --git a/src/invidious/jobs/update_decrypt_function_job.cr b/src/invidious/jobs/update_decrypt_function_job.cr
new file mode 100644
index 00000000..5332c672
--- /dev/null
+++ b/src/invidious/jobs/update_decrypt_function_job.cr
@@ -0,0 +1,19 @@
+class Invidious::Jobs::UpdateDecryptFunctionJob < Invidious::Jobs::BaseJob
+ DECRYPT_FUNCTION = [] of {SigProc, Int32}
+
+ def begin
+ loop do
+ begin
+ decrypt_function = fetch_decrypt_function
+ DECRYPT_FUNCTION.clear
+ decrypt_function.each { |df| DECRYPT_FUNCTION << df }
+ rescue ex
+ # TODO: Log error
+ next
+ ensure
+ sleep 1.minute
+ Fiber.yield
+ end
+ end
+ end
+end