PERF: Preload S3 inventory data for multisite clusters
davidtaylorhq committed Jul 29, 2020
1 parent 620c223 commit 16c65a9
Showing 4 changed files with 139 additions and 21 deletions.
31 changes: 30 additions & 1 deletion app/jobs/scheduled/ensure_s3_uploads_existence.rb
@@ -5,9 +5,38 @@ module Jobs
  class EnsureS3UploadsExistence < ::Jobs::Scheduled
    every 1.day

+   def perform(*args)
+     super
+   ensure
+     if @db_inventories
+       @db_inventories.values.each { |f| f.close; f.unlink }
+     end
+   end
+
+   def prepare_for_all_sites
+     inventory = S3Inventory.new(s3_helper, :upload)
+     @db_inventories = inventory.prepare_for_all_sites
+     @inventory_date = inventory.inventory_date
+   end
+
    def execute(args)
      return unless SiteSetting.enable_s3_inventory
-     Discourse.store.list_missing_uploads(skip_optimized: true)
+     require 's3_inventory'
+
+     if !@db_inventories && Rails.configuration.multisite && GlobalSetting.use_s3?
+       prepare_for_all_sites
+     end
+
+     if @db_inventories && preloaded_inventory_file = @db_inventories[RailsMultisite::ConnectionManagement.current_db]
+       S3Inventory.new(
+         s3_helper,
+         :upload,
+         preloaded_inventory_file: preloaded_inventory_file,
+         preloaded_inventory_date: @inventory_date
+       ).backfill_etags_and_list_missing
+     else
+       S3Inventory.new(s3_helper, :upload).backfill_etags_and_list_missing
+     end
    end
  end
end
76 changes: 61 additions & 15 deletions lib/s3_inventory.rb
@@ -12,9 +12,15 @@ class S3Inventory
  INVENTORY_PREFIX ||= "inventory"
  INVENTORY_VERSION ||= "1"

- def initialize(s3_helper, type)
+ def initialize(s3_helper, type, preloaded_inventory_file: nil, preloaded_inventory_date: nil)
    @s3_helper = s3_helper

+   if preloaded_inventory_file && preloaded_inventory_date
+     # Data preloaded, so we don't need to fetch it again
+     @preloaded_inventory_file = preloaded_inventory_file
+     @inventory_date = preloaded_inventory_date
+   end
+
    if type == :upload
      @type = "original"
      @model = Upload
@@ -25,32 +31,25 @@ def initialize(s3_helper, type)
  end

  def backfill_etags_and_list_missing
-   if files.blank?
+   if !@preloaded_inventory_file && files.blank?
      error("Failed to list inventory from S3")
      return
    end

    DistributedMutex.synchronize("s3_inventory_list_missing_#{type}", validity: 30.minutes) do
      begin
-       files.each do |file|
-         next if File.exists?(file[:filename][0...-3])
-
-         download_inventory_file_to_tmp_directory(file)
-         decompress_inventory_file(file)
-       end
+       download_and_decompress_files if !@preloaded_inventory_file

        multisite_prefix = Discourse.store.upload_path
        ActiveRecord::Base.transaction do
          begin
            connection.exec("CREATE TEMP TABLE #{table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))")
            connection.copy_data("COPY #{table_name} FROM STDIN CSV") do
-             files.each do |file|
-               CSV.foreach(file[:filename][0...-3], headers: false) do |row|
-                 key = row[CSV_KEY_INDEX]
-                 next if Rails.configuration.multisite && key.exclude?(multisite_prefix)
-                 url = File.join(Discourse.store.absolute_base_url, key)
-                 connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
-               end
+             for_each_inventory_row do |row|
+               key = row[CSV_KEY_INDEX]
+               next if Rails.configuration.multisite && key.exclude?(multisite_prefix)
+               url = File.join(Discourse.store.absolute_base_url, key)
+               connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
              end
            end

@@ -87,6 +86,16 @@ def backfill_etags_and_list_missing
      end
    end

+ def for_each_inventory_row
+   if @preloaded_inventory_file
+     CSV.foreach(@preloaded_inventory_file) { |row| yield(row) }
+   else
+     files.each do |file|
+       CSV.foreach(file[:filename][0...-3]) { |row| yield(row) }
+     end
+   end
+ end
+
  def download_inventory_file_to_tmp_directory(file)
    return if File.exists?(file[:filename])

@@ -136,9 +145,36 @@ def update_bucket_inventory_configuration
    )
  end

+ def prepare_for_all_sites
+   db_names = RailsMultisite::ConnectionManagement.all_dbs
+   db_files = {}
+
+   db_names.each do |db|
+     db_files[db] = Tempfile.new("#{db}-inventory.csv")
+   end
+
+   download_and_decompress_files
+   for_each_inventory_row do |row|
+     key = row[CSV_KEY_INDEX]
+     row_db = key.match(/uploads\/([^\/]+)\//)&.[](1)
+     if row_db && file = db_files[row_db]
+       file.write(row.to_csv)
+     end
+   end
+
+   db_names.each do |db|
+     db_files[db].rewind
+   end
+
+   db_files
+ ensure
+   cleanup!
+ end
+
  private

  def cleanup!
+   return if @preloaded_inventory_file
    files.each do |file|
      File.delete(file[:filename]) if File.exists?(file[:filename])
      File.delete(file[:filename][0...-3]) if File.exists?(file[:filename][0...-3])
@@ -154,6 +190,7 @@ def table_name
  end

  def files
+   return if @preloaded_inventory_file
    @files ||= begin
      symlink_file = unsorted_files.sort_by { |file| -file.last_modified.to_i }.first
      return [] if symlink_file.blank?
@@ -171,6 +208,15 @@ def files
    end
  end

+ def download_and_decompress_files
+   files.each do |file|
+     next if File.exists?(file[:filename][0...-3])
+
+     download_inventory_file_to_tmp_directory(file)
+     decompress_inventory_file(file)
+   end
+ end
+
  def tmp_directory
    @tmp_directory ||= begin
      current_db = RailsMultisite::ConnectionManagement.current_db
46 changes: 44 additions & 2 deletions spec/components/s3_inventory_spec.rb
@@ -62,6 +62,7 @@
    freeze_time

    CSV.foreach(csv_filename, headers: false) do |row|
+     next unless row[S3Inventory::CSV_KEY_INDEX].include?("default")
      Fabricate(:upload, etag: row[S3Inventory::CSV_ETAG_INDEX], updated_at: 2.days.ago)
    end

@@ -82,8 +83,8 @@

it "should backfill etags to uploads table correctly" do
files = [
["#{Discourse.store.absolute_base_url}/original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg", "defcaac0b4aca535c284e95f30d608d0"],
["#{Discourse.store.absolute_base_url}/original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png", "25c02eaceef4cb779fc17030d33f7f06"]
["#{Discourse.store.absolute_base_url}/uploads/default/original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg", "defcaac0b4aca535c284e95f30d608d0"],
["#{Discourse.store.absolute_base_url}/uploads/default/original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png", "25c02eaceef4cb779fc17030d33f7f06"]
]
files.each { |file| Fabricate(:upload, url: file[0]) }

@@ -95,4 +96,45 @@

    expect(Upload.by_users.order(:url).pluck(:url, :etag)).to eq(files)
  end
+
+ it "should work when passed preloaded data" do
+   freeze_time
+
+   CSV.foreach(csv_filename, headers: false) do |row|
+     next unless row[S3Inventory::CSV_KEY_INDEX].include?("default")
+     Fabricate(:upload, etag: row[S3Inventory::CSV_ETAG_INDEX], updated_at: 2.days.ago)
+   end
+
+   upload = Fabricate(:upload, etag: "ETag", updated_at: 1.days.ago)
+   Fabricate(:upload, etag: "ETag2", updated_at: Time.now)
+   no_etag = Fabricate(:upload, updated_at: 2.days.ago)
+
+   output = capture_stdout do
+     File.open(csv_filename) do |f|
+       preloaded_inventory = S3Inventory.new(
+         helper,
+         :upload,
+         preloaded_inventory_file: f,
+         preloaded_inventory_date: Time.now
+       )
+       preloaded_inventory.backfill_etags_and_list_missing
+     end
+   end
+
+   expect(output).to eq("#{upload.url}\n#{no_etag.url}\n2 of 5 uploads are missing\n")
+   expect(Discourse.stats.get("missing_s3_uploads")).to eq(2)
+ end
+
+ it "can create per-site files", type: :multisite do
+   freeze_time
+
+   inventory.stubs(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }])
+
+   files = inventory.prepare_for_all_sites
+   db1 = files["default"].read
+   db2 = files["second"].read
+   expect(db1.lines.count).to eq(3)
+   expect(db2.lines.count).to eq(1)
+   files.values.each { |f| f.close; f.unlink }
+ end
end
7 changes: 4 additions & 3 deletions spec/fixtures/csv/s3_inventory.csv
@@ -1,3 +1,4 @@
"abc","original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg","defcaac0b4aca535c284e95f30d608d0"
"abc","original/1X/050afc0ab01debe8cf48fd2ce50fbbf5eb072815.jpg","0cdc623af39cde0adb382670a6dc702a"
"abc","original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png","25c02eaceef4cb779fc17030d33f7f06"
"abc","uploads/default/original/1X/0184537a4f419224404d013414e913a4f56018f2.jpg","defcaac0b4aca535c284e95f30d608d0"
"abc","uploads/default/original/1X/050afc0ab01debe8cf48fd2ce50fbbf5eb072815.jpg","0cdc623af39cde0adb382670a6dc702a"
"abc","uploads/default/original/1X/0789fbf5490babc68326b9cec90eeb0d6590db05.png","25c02eaceef4cb779fc17030d33f7f06"
"abc","uploads/second/original/1X/f789fbf5490babc68326b9cec90eeb0d6590db03.png","15c02eaceef4cb779fc17030d33f7f04"

2 comments on commit 16c65a9

@tgxworld (Contributor) commented on 16c65a9, Jul 30, 2020

@davidtaylorhq Nothing major, but I thought it'd be nice if the commit message provided a summary of what changed to make this faster, and by how much.

@davidtaylorhq (Member, Author)

@tgxworld you're right, I should have been more detailed there.

In case a comment is useful for future travellers, the commit message should have looked something like:

In a multisite cluster where the S3 config is shared across all sites, the inventory is also shared. Previously, we were downloading, decompressing, and reading that inventory once for every site. On one of CDCK's large clusters, these inventory files easily exceed 100MB and 1 million rows.

Since the files are the same for every site, we can save a significant amount of work by fetching/decompressing/reading the large inventory file once, and then creating much smaller per-site inventory files for the job to use.

For a typical CDCK cluster (~400 sites, >1 million total uploads), this commit reduced the job time from 3 hours to 5 minutes.
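To make the splitting step concrete, here is a minimal standalone sketch of the idea — not the committed code (`S3Inventory#prepare_for_all_sites` in the diff above is the real implementation). The inventory file name and site list are hypothetical stand-ins, and it assumes the object key is the second CSV column, as in the fixture above:

```ruby
require "csv"
require "tempfile"

# Hypothetical inputs: a downloaded, already-decompressed inventory CSV
# (rows of "bucket","key","etag") and the cluster's site database names.
inventory_csv = "inventory.csv"
db_names = %w[default second]

# One small Tempfile per site, keyed by database name.
db_files = db_names.to_h { |db| [db, Tempfile.new("#{db}-inventory.csv")] }

# Route each row to its site via the "uploads/<db>/" prefix embedded in the
# object key; rows that match no known site are skipped.
CSV.foreach(inventory_csv, headers: false) do |row|
  key = row[1]
  row_db = key[%r{uploads/([^/]+)/}, 1]
  db_files[row_db]&.write(row.to_csv)
end

# Rewind so each per-site job can stream its (much smaller) file from the top.
db_files.each_value(&:rewind)
```

Each site's run of the job then reads only its own tempfile, so the large shared inventory is downloaded and parsed once per cluster instead of once per site.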
