aboutsummaryrefslogtreecommitdiffstats
path: root/activestorage/lib/active_storage/service
diff options
context:
space:
mode:
Diffstat (limited to 'activestorage/lib/active_storage/service')
-rw-r--r--activestorage/lib/active_storage/service/azure_service.rb115
-rw-r--r--activestorage/lib/active_storage/service/configurator.rb28
-rw-r--r--activestorage/lib/active_storage/service/disk_service.rb124
-rw-r--r--activestorage/lib/active_storage/service/gcs_service.rb79
-rw-r--r--activestorage/lib/active_storage/service/mirror_service.rb46
-rw-r--r--activestorage/lib/active_storage/service/s3_service.rb96
6 files changed, 488 insertions, 0 deletions
diff --git a/activestorage/lib/active_storage/service/azure_service.rb b/activestorage/lib/active_storage/service/azure_service.rb
new file mode 100644
index 0000000000..a505b9a0ee
--- /dev/null
+++ b/activestorage/lib/active_storage/service/azure_service.rb
@@ -0,0 +1,115 @@
+require "active_support/core_ext/numeric/bytes"
+require "azure/storage"
+require "azure/storage/core/auth/shared_access_signature"
+
+# Wraps the Microsoft Azure Storage Blob Service as a Active Storage service.
+# See `ActiveStorage::Service` for the generic API documentation that applies to all services.
+class ActiveStorage::Service::AzureService < ActiveStorage::Service
+ attr_reader :client, :path, :blobs, :container, :signer
+
+ def initialize(path:, storage_account_name:, storage_access_key:, container:)
+ @client = Azure::Storage::Client.create(storage_account_name: storage_account_name, storage_access_key: storage_access_key)
+ @signer = Azure::Storage::Core::Auth::SharedAccessSignature.new(storage_account_name, storage_access_key)
+ @blobs = client.blob_client
+ @container = container
+ @path = path
+ end
+
+ def upload(key, io, checksum: nil)
+ instrument :upload, key, checksum: checksum do
+ begin
+ blobs.create_block_blob(container, key, io, content_md5: checksum)
+ rescue Azure::Core::Http::HTTPError => e
+ raise ActiveStorage::IntegrityError
+ end
+ end
+ end
+
+ def download(key)
+ if block_given?
+ instrument :streaming_download, key do
+ stream(key, &block)
+ end
+ else
+ instrument :download, key do
+ _, io = blobs.get_blob(container, key)
+ io.force_encoding(Encoding::BINARY)
+ end
+ end
+ end
+
+ def delete(key)
+ instrument :delete, key do
+ begin
+ blobs.delete_blob(container, key)
+ rescue Azure::Core::Http::HTTPError
+ false
+ end
+ end
+ end
+
+ def exist?(key)
+ instrument :exist, key do |payload|
+ answer = blob_for(key).present?
+ payload[:exist] = answer
+ answer
+ end
+ end
+
+ def url(key, expires_in:, disposition:, filename:)
+ instrument :url, key do |payload|
+ base_url = url_for(key)
+ generated_url = signer.signed_uri(URI(base_url), false, permissions: "r",
+ expiry: format_expiry(expires_in), content_disposition: "#{disposition}; filename=\"#{filename}\"").to_s
+
+ payload[:url] = generated_url
+
+ generated_url
+ end
+ end
+
+ def url_for_direct_upload(key, expires_in:, content_type:, content_length:, checksum:)
+ instrument :url, key do |payload|
+ base_url = url_for(key)
+ generated_url = signer.signed_uri(URI(base_url), false, permissions: "rw",
+ expiry: format_expiry(expires_in)).to_s
+
+ payload[:url] = generated_url
+
+ generated_url
+ end
+ end
+
+ def headers_for_direct_upload(key, content_type:, checksum:, **)
+ { "Content-Type" => content_type, "Content-MD5" => checksum, "x-ms-blob-type" => "BlockBlob" }
+ end
+
+ private
+ def url_for(key)
+ "#{path}/#{container}/#{key}"
+ end
+
+ def blob_for(key)
+ blobs.get_blob_properties(container, key)
+ rescue Azure::Core::Http::HTTPError
+ false
+ end
+
+ def format_expiry(expires_in)
+ expires_in ? Time.now.utc.advance(seconds: expires_in).iso8601 : nil
+ end
+
+ # Reads the object for the given key in chunks, yielding each to the block.
+ def stream(key, options = {}, &block)
+ blob = blob_for(key)
+
+ chunk_size = 5.megabytes
+ offset = 0
+
+ while offset < blob.properties[:content_length]
+ _, io = blobs.get_blob(container, key, start_range: offset, end_range: offset + chunk_size - 1)
+ yield io
+ offset += chunk_size
+ end
+ end
+end
diff --git a/activestorage/lib/active_storage/service/configurator.rb b/activestorage/lib/active_storage/service/configurator.rb
new file mode 100644
index 0000000000..00ae24d251
--- /dev/null
+++ b/activestorage/lib/active_storage/service/configurator.rb
@@ -0,0 +1,28 @@
+class ActiveStorage::Service::Configurator #:nodoc:
+ attr_reader :configurations
+
+ def self.build(service_name, configurations)
+ new(configurations).build(service_name)
+ end
+
+ def initialize(configurations)
+ @configurations = configurations.deep_symbolize_keys
+ end
+
+ def build(service_name)
+ config = config_for(service_name.to_sym)
+ resolve(config.fetch(:service)).build(**config, configurator: self)
+ end
+
+ private
+ def config_for(name)
+ configurations.fetch name do
+ raise "Missing configuration for the #{name.inspect} Active Storage service. Configurations available for #{configurations.keys.inspect}"
+ end
+ end
+
+ def resolve(class_name)
+ require "active_storage/service/#{class_name.to_s.downcase}_service"
+ ActiveStorage::Service.const_get(:"#{class_name}Service")
+ end
+end
diff --git a/activestorage/lib/active_storage/service/disk_service.rb b/activestorage/lib/active_storage/service/disk_service.rb
new file mode 100644
index 0000000000..35b0909297
--- /dev/null
+++ b/activestorage/lib/active_storage/service/disk_service.rb
@@ -0,0 +1,124 @@
+require "fileutils"
+require "pathname"
+require "digest/md5"
+require "active_support/core_ext/numeric/bytes"
+
+# Wraps a local disk path as a Active Storage service. See `ActiveStorage::Service` for the generic API
+# documentation that applies to all services.
+class ActiveStorage::Service::DiskService < ActiveStorage::Service
+ attr_reader :root
+
+ def initialize(root:)
+ @root = root
+ end
+
+ def upload(key, io, checksum: nil)
+ instrument :upload, key, checksum: checksum do
+ IO.copy_stream(io, make_path_for(key))
+ ensure_integrity_of(key, checksum) if checksum
+ end
+ end
+
+ def download(key)
+ if block_given?
+ instrument :streaming_download, key do
+ File.open(path_for(key), "rb") do |file|
+ while data = file.read(64.kilobytes)
+ yield data
+ end
+ end
+ end
+ else
+ instrument :download, key do
+ File.binread path_for(key)
+ end
+ end
+ end
+
+ def delete(key)
+ instrument :delete, key do
+ begin
+ File.delete path_for(key)
+ rescue Errno::ENOENT
+ # Ignore files already deleted
+ end
+ end
+ end
+
+ def exist?(key)
+ instrument :exist, key do |payload|
+ answer = File.exist? path_for(key)
+ payload[:exist] = answer
+ answer
+ end
+ end
+
+ def url(key, expires_in:, disposition:, filename:, content_type:)
+ instrument :url, key do |payload|
+ verified_key_with_expiration = ActiveStorage.verifier.generate(key, expires_in: expires_in, purpose: :blob_key)
+
+ generated_url =
+ if defined?(Rails.application)
+ Rails.application.routes.url_helpers.rails_disk_service_path \
+ verified_key_with_expiration,
+ disposition: disposition, filename: filename, content_type: content_type
+ else
+ "/rails/active_storage/disk/#{verified_key_with_expiration}/#{filename}?disposition=#{disposition}&content_type=#{content_type}"
+ end
+
+ payload[:url] = generated_url
+
+ generated_url
+ end
+ end
+
+ def url_for_direct_upload(key, expires_in:, content_type:, content_length:, checksum:)
+ instrument :url, key do |payload|
+ verified_token_with_expiration = ActiveStorage.verifier.generate(
+ {
+ key: key,
+ content_type: content_type,
+ content_length: content_length,
+ checksum: checksum
+ },
+ expires_in: expires_in,
+ purpose: :blob_token
+ )
+
+ generated_url =
+ if defined?(Rails.application)
+ Rails.application.routes.url_helpers.update_rails_disk_service_path verified_token_with_expiration
+ else
+ "/rails/active_storage/disk/#{verified_token_with_expiration}"
+ end
+
+ payload[:url] = generated_url
+
+ generated_url
+ end
+ end
+
+ def headers_for_direct_upload(key, content_type:, **)
+ { "Content-Type" => content_type }
+ end
+
+ private
+ def path_for(key)
+ File.join root, folder_for(key), key
+ end
+
+ def folder_for(key)
+ [ key[0..1], key[2..3] ].join("/")
+ end
+
+ def make_path_for(key)
+ path_for(key).tap { |path| FileUtils.mkdir_p File.dirname(path) }
+ end
+
+ def ensure_integrity_of(key, checksum)
+ unless Digest::MD5.file(path_for(key)).base64digest == checksum
+ delete key
+ raise ActiveStorage::IntegrityError
+ end
+ end
+end
diff --git a/activestorage/lib/active_storage/service/gcs_service.rb b/activestorage/lib/active_storage/service/gcs_service.rb
new file mode 100644
index 0000000000..73629f7486
--- /dev/null
+++ b/activestorage/lib/active_storage/service/gcs_service.rb
@@ -0,0 +1,79 @@
+require "google/cloud/storage"
+require "active_support/core_ext/object/to_query"
+
+# Wraps the Google Cloud Storage as a Active Storage service. See `ActiveStorage::Service` for the generic API
+# documentation that applies to all services.
+class ActiveStorage::Service::GCSService < ActiveStorage::Service
+ attr_reader :client, :bucket
+
+ def initialize(project:, keyfile:, bucket:)
+ @client = Google::Cloud::Storage.new(project: project, keyfile: keyfile)
+ @bucket = @client.bucket(bucket)
+ end
+
+ def upload(key, io, checksum: nil)
+ instrument :upload, key, checksum: checksum do
+ begin
+ bucket.create_file(io, key, md5: checksum)
+ rescue Google::Cloud::InvalidArgumentError
+ raise ActiveStorage::IntegrityError
+ end
+ end
+ end
+
+ # FIXME: Add streaming when given a block
+ def download(key)
+ instrument :download, key do
+ io = file_for(key).download
+ io.rewind
+ io.read
+ end
+ end
+
+ def delete(key)
+ instrument :delete, key do
+ file_for(key).try(:delete)
+ end
+ end
+
+ def exist?(key)
+ instrument :exist, key do |payload|
+ answer = file_for(key).present?
+ payload[:exist] = answer
+ answer
+ end
+ end
+
+ def url(key, expires_in:, disposition:, filename:, content_type:)
+ instrument :url, key do |payload|
+ generated_url = file_for(key).signed_url expires: expires_in, query: {
+ "response-content-disposition" => "#{disposition}; filename=\"#{filename}\"",
+ "response-content-type" => content_type
+ }
+
+ payload[:url] = generated_url
+
+ generated_url
+ end
+ end
+
+ def url_for_direct_upload(key, expires_in:, content_type:, content_length:, checksum:)
+ instrument :url, key do |payload|
+ generated_url = bucket.signed_url key, method: "PUT", expires: expires_in,
+ content_type: content_type, content_md5: checksum
+
+ payload[:url] = generated_url
+
+ generated_url
+ end
+ end
+
+ def headers_for_direct_upload(key, content_type:, checksum:, **)
+ { "Content-Type" => content_type, "Content-MD5" => checksum }
+ end
+
+ private
+ def file_for(key)
+ bucket.file(key)
+ end
+end
diff --git a/activestorage/lib/active_storage/service/mirror_service.rb b/activestorage/lib/active_storage/service/mirror_service.rb
new file mode 100644
index 0000000000..7c407f2730
--- /dev/null
+++ b/activestorage/lib/active_storage/service/mirror_service.rb
@@ -0,0 +1,46 @@
+require "active_support/core_ext/module/delegation"
+
+# Wraps a set of mirror services and provides a single `ActiveStorage::Service` object that will all
+# have the files uploaded to them. A `primary` service is designated to answer calls to `download`, `exists?`,
+# and `url`.
+class ActiveStorage::Service::MirrorService < ActiveStorage::Service
+ attr_reader :primary, :mirrors
+
+ delegate :download, :exist?, :url, to: :primary
+
+ # Stitch together from named services.
+ def self.build(primary:, mirrors:, configurator:, **options) #:nodoc:
+ new \
+ primary: configurator.build(primary),
+ mirrors: mirrors.collect { |name| configurator.build name }
+ end
+
+ def initialize(primary:, mirrors:)
+ @primary, @mirrors = primary, mirrors
+ end
+
+ # Upload the `io` to the `key` specified to all services. If a `checksum` is provided, all services will
+ # ensure a match when the upload has completed or raise an `ActiveStorage::IntegrityError`.
+ def upload(key, io, checksum: nil)
+ each_service.collect do |service|
+ service.upload key, io.tap(&:rewind), checksum: checksum
+ end
+ end
+
+ # Delete the file at the `key` on all services.
+ def delete(key)
+ perform_across_services :delete, key
+ end
+
+ private
+ def each_service(&block)
+ [ primary, *mirrors ].each(&block)
+ end
+
+ def perform_across_services(method, *args)
+ # FIXME: Convert to be threaded
+ each_service.collect do |service|
+ service.public_send method, *args
+ end
+ end
+end
diff --git a/activestorage/lib/active_storage/service/s3_service.rb b/activestorage/lib/active_storage/service/s3_service.rb
new file mode 100644
index 0000000000..ca461c2994
--- /dev/null
+++ b/activestorage/lib/active_storage/service/s3_service.rb
@@ -0,0 +1,96 @@
+require "aws-sdk"
+require "active_support/core_ext/numeric/bytes"
+
+# Wraps the Amazon Simple Storage Service (S3) as a Active Storage service.
+# See `ActiveStorage::Service` for the generic API documentation that applies to all services.
+class ActiveStorage::Service::S3Service < ActiveStorage::Service
+ attr_reader :client, :bucket, :upload_options
+
+ def initialize(access_key_id:, secret_access_key:, region:, bucket:, upload: {}, **options)
+ @client = Aws::S3::Resource.new(access_key_id: access_key_id, secret_access_key: secret_access_key, region: region, **options)
+ @bucket = @client.bucket(bucket)
+
+ @upload_options = upload
+ end
+
+ def upload(key, io, checksum: nil)
+ instrument :upload, key, checksum: checksum do
+ begin
+ object_for(key).put(upload_options.merge(body: io, content_md5: checksum))
+ rescue Aws::S3::Errors::BadDigest
+ raise ActiveStorage::IntegrityError
+ end
+ end
+ end
+
+ def download(key)
+ if block_given?
+ instrument :streaming_download, key do
+ stream(key, &block)
+ end
+ else
+ instrument :download, key do
+ object_for(key).get.body.read.force_encoding(Encoding::BINARY)
+ end
+ end
+ end
+
+ def delete(key)
+ instrument :delete, key do
+ object_for(key).delete
+ end
+ end
+
+ def exist?(key)
+ instrument :exist, key do |payload|
+ answer = object_for(key).exists?
+ payload[:exist] = answer
+ answer
+ end
+ end
+
+ def url(key, expires_in:, disposition:, filename:, content_type:)
+ instrument :url, key do |payload|
+ generated_url = object_for(key).presigned_url :get, expires_in: expires_in,
+ response_content_disposition: "#{disposition}; filename=\"#{filename}\"",
+ response_content_type: content_type
+
+ payload[:url] = generated_url
+
+ generated_url
+ end
+ end
+
+ def url_for_direct_upload(key, expires_in:, content_type:, content_length:, checksum:)
+ instrument :url, key do |payload|
+ generated_url = object_for(key).presigned_url :put, expires_in: expires_in,
+ content_type: content_type, content_length: content_length, content_md5: checksum
+
+ payload[:url] = generated_url
+
+ generated_url
+ end
+ end
+
+ def headers_for_direct_upload(key, content_type:, checksum:, **)
+ { "Content-Type" => content_type, "Content-MD5" => checksum }
+ end
+
+ private
+ def object_for(key)
+ bucket.object(key)
+ end
+
+ # Reads the object for the given key in chunks, yielding each to the block.
+ def stream(key, options = {}, &block)
+ object = object_for(key)
+
+ chunk_size = 5.megabytes
+ offset = 0
+
+ while offset < object.content_length
+ yield object.read(options.merge(range: "bytes=#{offset}-#{offset + chunk_size - 1}"))
+ offset += chunk_size
+ end
+ end
+end