From 5dc45c35e64c1e5882e0c01287b36ec88dff70a7 Mon Sep 17 00:00:00 2001 From: Omar Roth Date: Wed, 10 Apr 2019 16:23:37 -0500 Subject: Automatically migrate database --- src/invidious.cr | 10 +- src/invidious/helpers/helpers.cr | 104 +++++++++++++++++ src/invidious/helpers/jobs.cr | 225 ++++++++++++++++++++++++++++++++++++ src/invidious/helpers/macros.cr | 16 ++- src/invidious/helpers/signatures.cr | 70 +++++++++++ src/invidious/jobs.cr | 225 ------------------------------------ src/invidious/signatures.cr | 70 ----------- 7 files changed, 417 insertions(+), 303 deletions(-) create mode 100644 src/invidious/helpers/jobs.cr create mode 100644 src/invidious/helpers/signatures.cr delete mode 100644 src/invidious/jobs.cr delete mode 100644 src/invidious/signatures.cr (limited to 'src') diff --git a/src/invidious.cr b/src/invidious.cr index 06e96a53..adc69829 100644 --- a/src/invidious.cr +++ b/src/invidious.cr @@ -105,10 +105,16 @@ end Kemal::CLI.new ARGV +# Check table integrity +analyze_table(PG_DB, logger, "channel_videos", ChannelVideo) +analyze_table(PG_DB, logger, "nonces", Nonce) +analyze_table(PG_DB, logger, "session_ids", SessionId) +analyze_table(PG_DB, logger, "users", User) +analyze_table(PG_DB, logger, "videos", Video) + +# Start jobs refresh_channels(PG_DB, logger, config.channel_threads, config.full_refresh) - refresh_feeds(PG_DB, logger, config.feed_threads) - subscribe_to_feeds(PG_DB, logger, HMAC_KEY, config) statistics = { diff --git a/src/invidious/helpers/helpers.cr b/src/invidious/helpers/helpers.cr index 3b53a468..f515f28a 100644 --- a/src/invidious/helpers/helpers.cr +++ b/src/invidious/helpers/helpers.cr @@ -1,5 +1,20 @@ require "./macros" +struct Nonce + db_mapping({ + nonce: String, + expire: Time, + }) +end + +struct SessionId + db_mapping({ + id: String, + email: String, + issued: String, + }) +end + struct ConfigPreferences module StringToArray def self.to_yaml(value : Array(String), yaml : YAML::Nodes::Builder) @@ -483,3 +498,92 @@ def extract_shelf_items(nodeset, ucid = nil, author_name = nil) return items end + +def analyze_table(db, logger, table_name, struct_type = nil) + # Create table if it doesn't exist + if !db.query_one?("SELECT true FROM information_schema.tables WHERE table_name = $1", table_name, as: Bool) + db.using_connection do |conn| + conn.as(PG::Connection).exec_all(File.read("config/sql/#{table_name}.sql")) + end + + logger.write("CREATE TABLE #{table_name}\n") + end + + if !struct_type + return + end + + struct_array = struct_type.to_type_tuple + column_array = get_column_array(db, table_name) + column_types = File.read("config/sql/#{table_name}.sql").match(/CREATE TABLE public\.#{table_name}\n\((?[\d\D]*?)\);/) + .try &.["types"].split(",").map { |line| line.strip } + + if !column_types + return + end + + struct_array.each_with_index do |name, i| + if name != column_array[i]? + if !column_array[i]? + new_column = column_types.select { |line| line.starts_with? name }[0] + db.exec("ALTER TABLE #{table_name} ADD COLUMN #{new_column}") + logger.write("ALTER TABLE #{table_name} ADD COLUMN #{new_column}\n") + next + end + + # Column doesn't exist + if !column_array.includes? name + new_column = column_types.select { |line| line.starts_with? name }[0] + db.exec("ALTER TABLE #{table_name} ADD COLUMN #{new_column}") + end + + # Column exists but in the wrong position, rotate + if struct_array.includes? column_array[i] + until name == column_array[i] + new_column = column_types.select { |line| line.starts_with? column_array[i] }[0]?.try &.gsub("#{column_array[i]}", "#{column_array[i]}_new") + + # There's a column we didn't expect + if !new_column + db.exec("ALTER TABLE #{table_name} DROP COLUMN #{column_array[i]} CASCADE") + logger.write("ALTER TABLE #{table_name} DROP COLUMN #{column_array[i]}\n") + + column_array = get_column_array(db, table_name) + next + end + + db.exec("ALTER TABLE #{table_name} ADD COLUMN #{new_column}") + logger.write("ALTER TABLE #{table_name} ADD COLUMN #{new_column}\n") + db.exec("UPDATE #{table_name} SET #{column_array[i]}_new=#{column_array[i]}") + logger.write("UPDATE #{table_name} SET #{column_array[i]}_new=#{column_array[i]}\n") + db.exec("ALTER TABLE #{table_name} DROP COLUMN #{column_array[i]} CASCADE") + logger.write("ALTER TABLE #{table_name} DROP COLUMN #{column_array[i]} CASCADE\n") + db.exec("ALTER TABLE #{table_name} RENAME COLUMN #{column_array[i]}_new TO #{column_array[i]}") + logger.write("ALTER TABLE #{table_name} RENAME COLUMN #{column_array[i]}_new TO #{column_array[i]}\n") + + column_array = get_column_array(db, table_name) + end + else + db.exec("ALTER TABLE #{table_name} DROP COLUMN #{column_array[i]} CASCADE") + logger.write("ALTER TABLE #{table_name} DROP COLUMN #{column_array[i]} CASCADE\n") + end + end + end +end + +class PG::ResultSet + def field(index = @column_index) + @fields.not_nil![index] + end +end + +def get_column_array(db, table_name) + column_array = [] of String + db.query("SELECT * FROM #{table_name} LIMIT 0") do |rs| + rs.column_count.times do |i| + column = rs.as(PG::ResultSet).field(i) + column_array << column.name + end + end + + return column_array +end diff --git a/src/invidious/helpers/jobs.cr b/src/invidious/helpers/jobs.cr new file mode 100644 index 00000000..ad9e8ada --- /dev/null +++ b/src/invidious/helpers/jobs.cr @@ -0,0 +1,225 @@ +def refresh_channels(db, logger, max_threads = 1, full_refresh = false) + max_channel = Channel(Int32).new + + spawn do + max_threads = max_channel.receive + active_threads = 0 + active_channel = Channel(Bool).new + + loop do + db.query("SELECT id FROM channels ORDER BY updated") do |rs| + rs.each do + id = rs.read(String) + + if active_threads >= max_threads + if active_channel.receive + active_threads -= 1 + end + end + + active_threads += 1 + spawn do + begin + channel = fetch_channel(id, db, full_refresh) + + db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.now, channel.author, id) + rescue ex + if ex.message == "Deleted or invalid channel" + db.exec("UPDATE channels SET updated = $1, deleted = true WHERE id = $2", Time.now, id) + end + logger.write("#{id} : #{ex.message}\n") + end + + active_channel.send(true) + end + end + end + + sleep 1.minute + end + end + + max_channel.send(max_threads) +end + +def refresh_feeds(db, logger, max_threads = 1) + max_channel = Channel(Int32).new + + spawn do + max_threads = max_channel.receive + active_threads = 0 + active_channel = Channel(Bool).new + + loop do + db.query("SELECT email FROM users") do |rs| + rs.each do + email = rs.read(String) + view_name = "subscriptions_#{sha256(email)[0..7]}" + + if active_threads >= max_threads + if active_channel.receive + active_threads -= 1 + end + end + + active_threads += 1 + spawn do + begin + db.query("SELECT * FROM #{view_name} LIMIT 1") do |rs| + # Drop view that doesn't contain same number of rows as ChannelVideo + if ChannelVideo.from_rs(rs)[0]?.try &.to_a.size.try &.!= rs.column_count + db.exec("DROP MATERIALIZED VIEW #{view_name}") + raise "valid schema does not exist" + end + end + + db.exec("REFRESH MATERIALIZED VIEW #{view_name}") + rescue ex + # Create view if it doesn't exist + if ex.message.try &.ends_with?("does not exist") + # While iterating through, we may have an email stored from a deleted account + if db.query_one?("SELECT true FROM users WHERE email = $1", email, as: Bool) + db.exec("CREATE MATERIALIZED VIEW #{view_name} AS \ + SELECT * FROM channel_videos WHERE \ + ucid = ANY ((SELECT subscriptions FROM users WHERE email = E'#{email.gsub("'", "\\'")}')::text[]) \ + ORDER BY published DESC;") + logger.write("CREATE #{view_name}\n") + end + else + logger.write("REFRESH #{email} : #{ex.message}\n") + end + end + + active_channel.send(true) + end + end + end + + sleep 1.minute + end + end + + max_channel.send(max_threads) +end + +def subscribe_to_feeds(db, logger, key, config) + if config.use_pubsub_feeds + case config.use_pubsub_feeds + when Bool + max_threads = config.use_pubsub_feeds.as(Bool).to_unsafe + when Int32 + max_threads = config.use_pubsub_feeds.as(Int32) + end + max_channel = Channel(Int32).new + + spawn do + max_threads = max_channel.receive + active_threads = 0 + active_channel = Channel(Bool).new + + loop do + db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > interval '4 days' OR subscribed IS NULL") do |rs| + rs.each do + ucid = rs.read(String) + + if active_threads >= max_threads.as(Int32) + if active_channel.receive + active_threads -= 1 + end + end + + active_threads += 1 + + spawn do + begin + response = subscribe_pubsub(ucid, key, config) + + if response.status_code >= 400 + logger.write("#{ucid} : #{response.body}\n") + end + rescue ex + end + + active_channel.send(true) + end + end + end + + sleep 1.minute + end + end + + max_channel.send(max_threads.as(Int32)) + end +end + +def pull_top_videos(config, db) + loop do + begin + top = rank_videos(db, 40) + rescue ex + next + end + + if top.size > 0 + args = arg_array(top) + else + next + end + + videos = [] of Video + + top.each do |id| + begin + videos << get_video(id, db) + rescue ex + next + end + end + + yield videos + sleep 1.minute + end +end + +def pull_popular_videos(db) + loop do + subscriptions = db.query_all("SELECT channel FROM \ + (SELECT UNNEST(subscriptions) AS channel FROM users) AS d \ + GROUP BY channel ORDER BY COUNT(channel) DESC LIMIT 40", as: String) + + videos = db.query_all("SELECT DISTINCT ON (ucid) * FROM \ + channel_videos WHERE ucid IN (#{arg_array(subscriptions)}) \ + ORDER BY ucid, published DESC", subscriptions, as: ChannelVideo).sort_by { |video| video.published }.reverse + + yield videos + sleep 1.minute + end +end + +def update_decrypt_function + loop do + begin + decrypt_function = fetch_decrypt_function + rescue ex + next + end + + yield decrypt_function + sleep 1.minute + end +end + +def find_working_proxies(regions) + loop do + regions.each do |region| + proxies = get_proxies(region).first(20) + proxies = proxies.map { |proxy| {ip: proxy[:ip], port: proxy[:port]} } + # proxies = filter_proxies(proxies) + + yield region, proxies + end + + sleep 1.minute + end +end diff --git a/src/invidious/helpers/macros.cr b/src/invidious/helpers/macros.cr index fda34ed7..fe1fc94e 100644 --- a/src/invidious/helpers/macros.cr +++ b/src/invidious/helpers/macros.cr @@ -3,10 +3,14 @@ macro db_mapping(mapping) end def to_a - return [{{*mapping.keys.map { |id| "@#{id}".id }}}] + return [ {{*mapping.keys.map { |id| "@#{id}".id }}} ] end - DB.mapping({{mapping}}) + def self.to_type_tuple + return { {{*mapping.keys.map { |id| "#{id}" }}} } + end + + DB.mapping( {{mapping}} ) end macro json_mapping(mapping) @@ -14,11 +18,11 @@ macro json_mapping(mapping) end def to_a - return [{{*mapping.keys.map { |id| "@#{id}".id }}}] + return [ {{*mapping.keys.map { |id| "@#{id}".id }}} ] end - JSON.mapping({{mapping}}) - YAML.mapping({{mapping}}) + JSON.mapping( {{mapping}} ) + YAML.mapping( {{mapping}} ) end macro yaml_mapping(mapping) @@ -26,7 +30,7 @@ macro yaml_mapping(mapping) end def to_a - return [{{*mapping.keys.map { |id| "@#{id}".id }}}] + return [ {{*mapping.keys.map { |id| "@#{id}".id }}} ] end def to_tuple diff --git a/src/invidious/helpers/signatures.cr b/src/invidious/helpers/signatures.cr new file mode 100644 index 00000000..8b760398 --- /dev/null +++ b/src/invidious/helpers/signatures.cr @@ -0,0 +1,70 @@ +def fetch_decrypt_function(id = "CvFH_6DNRCY") + client = make_client(YT_URL) + document = client.get("/watch?v=#{id}&gl=US&hl=en&disable_polymer=1").body + url = document.match(/src="(?\/yts\/jsbin\/player_ias-.{9}\/en_US\/base.js)"/).not_nil!["url"] + player = client.get(url).body + + function_name = player.match(/^(?[^=]+)=function\(a\){a=a\.split\(""\)/m).not_nil!["name"] + function_body = player.match(/^#{Regex.escape(function_name)}=function\(a\){(?[^}]+)}/m).not_nil!["body"] + function_body = function_body.split(";")[1..-2] + + var_name = function_body[0][0, 2] + var_body = player.delete("\n").match(/var #{Regex.escape(var_name)}={(?(.*?))};/).not_nil!["body"] + + operations = {} of String => String + var_body.split("},").each do |operation| + op_name = operation.match(/^[^:]+/).not_nil![0] + op_body = operation.match(/\{[^}]+/).not_nil![0] + + case op_body + when "{a.reverse()" + operations[op_name] = "a" + when "{a.splice(0,b)" + operations[op_name] = "b" + else + operations[op_name] = "c" + end + end + + decrypt_function = [] of {name: String, value: Int32} + function_body.each do |function| + function = function.lchop(var_name).delete("[].") + + op_name = function.match(/[^\(]+/).not_nil![0] + value = function.match(/\(a,(?[\d]+)\)/).not_nil!["value"].to_i + + decrypt_function << {name: operations[op_name], value: value} + end + + return decrypt_function +end + +def decrypt_signature(fmt, code) + if !fmt["s"]? + return "" + end + + a = fmt["s"] + a = a.split("") + + code.each do |item| + case item[:name] + when "a" + a.reverse! + when "b" + a.delete_at(0..(item[:value] - 1)) + when "c" + a = splice(a, item[:value]) + end + end + + signature = a.join("") + return "&#{fmt["sp"]?}=#{signature}" +end + +def splice(a, b) + c = a[0] + a[0] = a[b % a.size] + a[b % a.size] = c + return a +end diff --git a/src/invidious/jobs.cr b/src/invidious/jobs.cr deleted file mode 100644 index 82e58c93..00000000 --- a/src/invidious/jobs.cr +++ /dev/null @@ -1,225 +0,0 @@ -def refresh_channels(db, logger, max_threads = 1, full_refresh = false) - max_channel = Channel(Int32).new - - spawn do - max_threads = max_channel.receive - active_threads = 0 - active_channel = Channel(Bool).new - - loop do - db.query("SELECT id FROM channels ORDER BY updated") do |rs| - rs.each do - id = rs.read(String) - - if active_threads >= max_threads - if active_channel.receive - active_threads -= 1 - end - end - - active_threads += 1 - spawn do - begin - channel = fetch_channel(id, db, full_refresh) - - db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.now, channel.author, id) - rescue ex - if ex.message == "Deleted or invalid channel" - db.exec("UPDATE channels SET updated = $1, deleted = true WHERE id = $2", Time.now, id) - end - logger.write("#{id} : #{ex.message}\n") - end - - active_channel.send(true) - end - end - end - - sleep 1.minute - end - end - - max_channel.send(max_threads) -end - -def refresh_feeds(db, logger, max_threads = 1) - max_channel = Channel(Int32).new - - spawn do - max_threads = max_channel.receive - active_threads = 0 - active_channel = Channel(Bool).new - - loop do - db.query("SELECT email FROM users") do |rs| - rs.each do - email = rs.read(String) - view_name = "subscriptions_#{sha256(email)[0..7]}" - - if active_threads >= max_threads - if active_channel.receive - active_threads -= 1 - end - end - - active_threads += 1 - spawn do - begin - db.query("SELECT * FROM #{view_name} LIMIT 1") do |rs| - # View doesn't contain same number of rows as ChannelVideo - if ChannelVideo.from_rs(rs)[0]?.try &.to_a.size.try &.!= rs.column_count - db.exec("DROP MATERIALIZED VIEW #{view_name}") - raise "valid schema does not exist" - end - end - - db.exec("REFRESH MATERIALIZED VIEW #{view_name}") - rescue ex - # Create view if it doesn't exist - if ex.message.try &.ends_with?("does not exist") - # While iterating through, we may have an email stored from a deleted account - if db.query_one?("SELECT true FROM users WHERE email = $1", email, as: Bool) - db.exec("CREATE MATERIALIZED VIEW #{view_name} AS \ - SELECT * FROM channel_videos WHERE \ - ucid = ANY ((SELECT subscriptions FROM users WHERE email = E'#{email.gsub("'", "\\'")}')::text[]) \ - ORDER BY published DESC;") - logger.write("CREATE #{view_name}\n") - end - else - logger.write("REFRESH #{email} : #{ex.message}\n") - end - end - - active_channel.send(true) - end - end - end - - sleep 1.minute - end - end - - max_channel.send(max_threads) -end - -def subscribe_to_feeds(db, logger, key, config) - if config.use_pubsub_feeds - case config.use_pubsub_feeds - when Bool - max_threads = config.use_pubsub_feeds.as(Bool).to_unsafe - when Int32 - max_threads = config.use_pubsub_feeds.as(Int32) - end - max_channel = Channel(Int32).new - - spawn do - max_threads = max_channel.receive - active_threads = 0 - active_channel = Channel(Bool).new - - loop do - db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > interval '4 days' OR subscribed IS NULL") do |rs| - rs.each do - ucid = rs.read(String) - - if active_threads >= max_threads.as(Int32) - if active_channel.receive - active_threads -= 1 - end - end - - active_threads += 1 - - spawn do - begin - response = subscribe_pubsub(ucid, key, config) - - if response.status_code >= 400 - logger.write("#{ucid} : #{response.body}\n") - end - rescue ex - end - - active_channel.send(true) - end - end - end - - sleep 1.minute - end - end - - max_channel.send(max_threads.as(Int32)) - end -end - -def pull_top_videos(config, db) - loop do - begin - top = rank_videos(db, 40) - rescue ex - next - end - - if top.size > 0 - args = arg_array(top) - else - next - end - - videos = [] of Video - - top.each do |id| - begin - videos << get_video(id, db) - rescue ex - next - end - end - - yield videos - sleep 1.minute - end -end - -def pull_popular_videos(db) - loop do - subscriptions = db.query_all("SELECT channel FROM \ - (SELECT UNNEST(subscriptions) AS channel FROM users) AS d \ - GROUP BY channel ORDER BY COUNT(channel) DESC LIMIT 40", as: String) - - videos = db.query_all("SELECT DISTINCT ON (ucid) * FROM \ - channel_videos WHERE ucid IN (#{arg_array(subscriptions)}) \ - ORDER BY ucid, published DESC", subscriptions, as: ChannelVideo).sort_by { |video| video.published }.reverse - - yield videos - sleep 1.minute - end -end - -def update_decrypt_function - loop do - begin - decrypt_function = fetch_decrypt_function - rescue ex - next - end - - yield decrypt_function - sleep 1.minute - end -end - -def find_working_proxies(regions) - loop do - regions.each do |region| - proxies = get_proxies(region).first(20) - proxies = proxies.map { |proxy| {ip: proxy[:ip], port: proxy[:port]} } - # proxies = filter_proxies(proxies) - - yield region, proxies - end - - sleep 1.minute - end -end diff --git a/src/invidious/signatures.cr b/src/invidious/signatures.cr deleted file mode 100644 index 8b760398..00000000 --- a/src/invidious/signatures.cr +++ /dev/null @@ -1,70 +0,0 @@ -def fetch_decrypt_function(id = "CvFH_6DNRCY") - client = make_client(YT_URL) - document = client.get("/watch?v=#{id}&gl=US&hl=en&disable_polymer=1").body - url = document.match(/src="(?\/yts\/jsbin\/player_ias-.{9}\/en_US\/base.js)"/).not_nil!["url"] - player = client.get(url).body - - function_name = player.match(/^(?[^=]+)=function\(a\){a=a\.split\(""\)/m).not_nil!["name"] - function_body = player.match(/^#{Regex.escape(function_name)}=function\(a\){(?[^}]+)}/m).not_nil!["body"] - function_body = function_body.split(";")[1..-2] - - var_name = function_body[0][0, 2] - var_body = player.delete("\n").match(/var #{Regex.escape(var_name)}={(?(.*?))};/).not_nil!["body"] - - operations = {} of String => String - var_body.split("},").each do |operation| - op_name = operation.match(/^[^:]+/).not_nil![0] - op_body = operation.match(/\{[^}]+/).not_nil![0] - - case op_body - when "{a.reverse()" - operations[op_name] = "a" - when "{a.splice(0,b)" - operations[op_name] = "b" - else - operations[op_name] = "c" - end - end - - decrypt_function = [] of {name: String, value: Int32} - function_body.each do |function| - function = function.lchop(var_name).delete("[].") - - op_name = function.match(/[^\(]+/).not_nil![0] - value = function.match(/\(a,(?[\d]+)\)/).not_nil!["value"].to_i - - decrypt_function << {name: operations[op_name], value: value} - end - - return decrypt_function -end - -def decrypt_signature(fmt, code) - if !fmt["s"]? - return "" - end - - a = fmt["s"] - a = a.split("") - - code.each do |item| - case item[:name] - when "a" - a.reverse! - when "b" - a.delete_at(0..(item[:value] - 1)) - when "c" - a = splice(a, item[:value]) - end - end - - signature = a.join("") - return "&#{fmt["sp"]?}=#{signature}" -end - -def splice(a, b) - c = a[0] - a[0] = a[b % a.size] - a[b % a.size] = c - return a -end -- cgit v1.2.3