Merge pull request #42049 from Shopify/refactor-entry-compression

Refactor Cache::Entry compression handling
This commit is contained in:
Jean Boussier 2021-04-22 14:50:35 +02:00 committed by GitHub
commit d42a34a8c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 99 additions and 61 deletions

@ -24,6 +24,8 @@ module Cache
# implementations may support additional options.
UNIVERSAL_OPTIONS = [:namespace, :compress, :compress_threshold, :expires_in, :expire_in, :expired_in, :race_condition_ttl, :coder, :skip_nil]
DEFAULT_COMPRESS_LIMIT = 1.kilobyte
# Mapping of canonical option names to aliases that a store will recognize.
OPTION_ALIASES = {
expires_in: [:expire_in, :expired_in]
@ -169,7 +171,23 @@ def retrieve_store_class(store)
# threshold is configurable with the <tt>:compress_threshold</tt> option,
# specified in bytes.
class Store
DEFAULT_CODER = Marshal
module MarshalCoder # :nodoc:
extend self
def dump(entry)
Marshal.dump(entry)
end
def dump_compressed(entry, threshold)
Marshal.dump(entry.compressed(threshold))
end
def load(payload)
Marshal.load(payload)
end
end
DEFAULT_CODER = MarshalCoder
cattr_accessor :logger, instance_writer: true
@ -198,7 +216,11 @@ def ensure_connection_pool_added!
# namespace for the cache.
def initialize(options = nil)
@options = options ? normalize_options(options) : {}
@options[:compress] = true unless @options.key?(:compress)
@options[:compress_threshold] = DEFAULT_COMPRESS_LIMIT unless @options.key?(:compress_threshold)
@coder = @options.delete(:coder) { self.class::DEFAULT_CODER } || NullCoder
@coder_supports_compression = @coder.respond_to?(:dump_compressed)
end
# Silences the logger.
@ -609,8 +631,13 @@ def write_entry(key, entry, **options)
raise NotImplementedError.new
end
def serialize_entry(entry)
@coder.dump(entry)
def serialize_entry(entry, **options)
options = merged_options(options)
if @coder_supports_compression && options[:compress]
@coder.dump_compressed(entry, options[:compress_threshold] || DEFAULT_COMPRESS_LIMIT)
else
@coder.dump(entry)
end
end
def deserialize_entry(payload)
@ -789,14 +816,18 @@ def save_block_result_to_cache(name, options)
end
module NullCoder # :nodoc:
class << self
def load(payload)
payload
end
extend self
def dump(entry)
entry
end
def dump(entry)
entry
end
def dump_compressed(entry, threshold)
entry.compressed(threshold)
end
def load(payload)
payload
end
end
@ -810,17 +841,15 @@ def dump(entry)
class Entry # :nodoc:
attr_reader :version
DEFAULT_COMPRESS_LIMIT = 1.kilobyte
# Creates a new cache entry for the specified value. Options supported are
# +:compress+, +:compress_threshold+, +:version+, +:expires_at+ and +:expires_in+.
def initialize(value, compress: true, compress_threshold: DEFAULT_COMPRESS_LIMIT, version: nil, expires_in: nil, expires_at: nil, **)
# +:compressed+, +:version+, +:expires_at+ and +:expires_in+.
def initialize(value, compressed: false, version: nil, expires_in: nil, expires_at: nil, **)
@value = value
@version = version
@created_at = 0.0
@expires_in = expires_at&.to_f || expires_in && (expires_in.to_f + Time.now.to_f)
compress!(compress_threshold) if compress
@compressed = true if compressed
end
def value
@ -866,6 +895,30 @@ def compressed? # :nodoc:
defined?(@compressed)
end
def compressed(compress_threshold)
return self if compressed?
case @value
when nil, true, false, Numeric
uncompressed_size = 0
when String
uncompressed_size = @value.bytesize
else
serialized = Marshal.dump(@value)
uncompressed_size = serialized.bytesize
end
if uncompressed_size >= compress_threshold
serialized ||= Marshal.dump(@value)
compressed = Zlib::Deflate.deflate(serialized)
if compressed.bytesize < uncompressed_size
return Entry.new(compressed, compressed: true, expires_at: expires_at, version: version)
end
end
self
end
def local?
false
end
@ -883,28 +936,6 @@ def dup_value!
end
private
def compress!(compress_threshold)
case @value
when nil, true, false, Numeric
uncompressed_size = 0
when String
uncompressed_size = @value.bytesize
else
serialized = Marshal.dump(@value)
uncompressed_size = serialized.bytesize
end
if uncompressed_size >= compress_threshold
serialized ||= Marshal.dump(@value)
compressed = Zlib::Deflate.deflate(serialized)
if compressed.bytesize < uncompressed_size
@value = compressed
@compressed = true
end
end
end
def uncompress(value)
Marshal.load(Zlib::Inflate.inflate(value))
end

@ -73,7 +73,7 @@ def delete_matched(matcher, options = nil)
private
def read_entry(key, **options)
if File.exist?(key)
entry = File.open(key) { |f| deserialize_entry(f.read) }
entry = deserialize_entry(File.binread(key))
entry if entry.is_a?(Cache::Entry)
end
rescue => e
@ -84,7 +84,8 @@ def read_entry(key, **options)
def write_entry(key, entry, **options)
return false if options[:unless_exist] && File.exist?(key)
ensure_cache_path(File.dirname(key))
File.atomic_write(key, cache_path) { |f| f.write(serialize_entry(entry)) }
payload = serialize_entry(entry, **options)
File.atomic_write(key, cache_path) { |f| f.write(payload) }
true
end

@ -148,7 +148,7 @@ def read_entry(key, **options)
# Write an entry to the cache.
def write_entry(key, entry, **options)
method = options[:unless_exist] ? :add : :set
value = options[:raw] ? entry.value.to_s : serialize_entry(entry)
value = options[:raw] ? entry.value.to_s : serialize_entry(entry, **options)
expires_in = options[:expires_in].to_i
if options[:race_condition_ttl] && expires_in > 0 && !options[:raw]
# Set the memcache expire a few minutes in the future to support race condition ttls on read
@ -198,7 +198,7 @@ def normalize_key(key, options)
def deserialize_entry(payload)
entry = super
entry = Entry.new(entry, compress: false) if entry && !entry.is_a?(Entry)
entry = Entry.new(entry) if entry && !entry.is_a?(Entry)
entry
end

@ -25,17 +25,23 @@ module Cache
# MemoryStore is thread-safe.
class MemoryStore < Store
module DupCoder # :nodoc:
class << self
def load(entry)
entry = entry.dup
entry.dup_value!
entry
end
extend self
def dump(entry)
entry.dup_value!
entry
end
def dump(entry)
entry.dup_value! unless entry.compressed?
entry
end
def dump_compressed(entry, threshold)
entry = entry.compressed(threshold)
entry.dup_value! unless entry.compressed?
entry
end
def load(entry)
entry = entry.dup
entry.dup_value!
entry
end
end
@ -156,7 +162,7 @@ def read_entry(key, **options)
end
def write_entry(key, entry, **options)
payload = serialize_entry(entry)
payload = serialize_entry(entry, **options)
synchronize do
return false if options[:unless_exist] && @data.key?(key)

@ -388,7 +388,7 @@ def read_multi_mget(*names)
#
# Requires Redis 2.6.12+ for extended SET options.
def write_entry(key, entry, unless_exist: false, raw: false, expires_in: nil, race_condition_ttl: nil, **options)
serialized_entry = serialize_entry(entry, raw: raw)
serialized_entry = serialize_entry(entry, raw: raw, **options)
# If race condition TTL is in use, ensure that cache entries
# stick around a bit longer after they would have expired
@ -433,7 +433,7 @@ def write_multi_entries(entries, expires_in: nil, **options)
if entries.any?
if mset_capable? && expires_in.nil?
failsafe :write_multi_entries do
redis.with { |c| c.mapped_mset(serialize_entries(entries, raw: options[:raw])) }
redis.with { |c| c.mapped_mset(serialize_entries(entries, **options)) }
end
else
super
@ -458,23 +458,23 @@ def truncate_key(key)
def deserialize_entry(payload, raw:)
if payload && raw
Entry.new(payload, compress: false)
Entry.new(payload)
else
super(payload)
end
end
def serialize_entry(entry, raw: false)
def serialize_entry(entry, raw: false, **options)
if raw
entry.value.to_s
else
super(entry)
super(entry, raw: raw, **options)
end
end
def serialize_entries(entries, raw: false)
def serialize_entries(entries, **options)
entries.transform_values do |entry|
serialize_entry entry, raw: raw
serialize_entry(entry, **options)
end
end

@ -77,7 +77,7 @@ def test_raw_read_entry_compression
cache = lookup_store(raw: true)
cache.write("foo", 2)
assert_not_called_on_instance_of ActiveSupport::Cache::Entry, :compress! do
assert_not_called_on_instance_of ActiveSupport::Cache::Entry, :compressed do
cache.read("foo")
end
end

@ -329,7 +329,7 @@ class RawTest < StoreTest
test "does not compress values read with \"raw\" enabled" do
@cache.write("foo", "bar", raw: true)
assert_not_called_on_instance_of ActiveSupport::Cache::Entry, :compress! do
assert_not_called_on_instance_of ActiveSupport::Cache::Entry, :compressed do
@cache.read("foo", raw: true)
end
end