The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

NAME

ShardedKV - An interface to sharded key-value stores

VERSION

version 0.20

SYNOPSIS

  use ShardedKV;
  use ShardedKV::Continuum::Ketama;
  use ShardedKV::Storage::Redis;
  
  my $continuum_spec = [
    ["shard1", 100], # shard name, weight
    ["shard2", 150],
  ];
  my $continuum = ShardedKV::Continuum::Ketama->new(from => $continuum_spec);
  
  # Redis storage chosen here, but can also be "Memory" or "MySQL".
  # "Memory" is for testing. Mixing storages likely has weird side effects.
  my %storages = (
    shard1 => ShardedKV::Storage::Redis->new(
      redis_connect_str => 'redisserver:6379',
    ),
    shard2 => ShardedKV::Storage::Redis->new(
      redis_connect_str => 'redisserver:6380',
    ),
  );
  
  my $skv = ShardedKV->new(
    storages => \%storages,
    continuum => $continuum,
  );
  
  my $value = $skv->get($key);
  $skv->set($key, $value);
  $skv->delete($key);

DESCRIPTION

This module implements an abstract interface to a sharded key-value store. The storage backends as well as the "continuum" are pluggable. "Continuum" is to mean "the logic that decides in which shard a particular key lives". Typically, people use consistent hashing for this purpose and very commonly the choice is to use ketama specifically. See below for references.

Beside the abstract querying interface, this module also implements logic to add one or more servers to the continuum and use passive key migration to extend capacity without downtime. Do make it a point to understand the logic before using it. More on that below.

LOGGING

ShardedKV allows instrumentation for logging and debugging by setting the logger attribute of the main ShardedKV object, and/or its continuum and/or any or all storage sub-objects. If set, the logger attribute must be an object implementing the following methods:

  • trace

  • debug

  • info

  • warn

  • error

  • fatal

which take a string parameter that is to be logged. These logging levels might be familiar since they are taken from Log::Log4perl, which means that you can use a Log::Log4perl::Logger object here.

Additionally, the following methods must return whether or not the given log level is enabled, to potentially avoid costly construction of log messages:

  • is_trace

  • is_debug

  • is_info

  • is_warn

  • is_error

  • is_fatal

PUBLIC ATTRIBUTES

continuum

The continuum object decides on which shard a given key lives. This is required for a ShardedKV object and must be an object that implements the ShardedKV::Continuum role.

migration_continuum

This is a second continuum object that has additional shards configured. If this is set, a passive key migration is in effect. See begin_migration below!

storages

A hashref of storage objects, each of which represents one shard. Keys in the hash must be the same labels/shard names that are used in the continuum. Each storage object must implement the ShardedKV::Storage role.

logger

If set, this must be a user-supplied object that implements a certain number of methods which are called throughout ShardedKV for logging/debugging purposes. See "LOGGING" for details.

PUBLIC METHODS

get

Given a key, fetches the value for that key from the correct shard and returns that value or undef on failure.

Different storage backends may return a reference to the value instead. For example, the Redis and Memory backends return scalar references, whereas the mysql backend returns an array reference. This might still change, likely, all backends may be required to return scalar references in the future.

set

Given a key and a value, saves the value into the key within the correct shard.

The value needs to be a reference of the same type that would be returned by the storage backend when calling get(). See the discussion above.

delete

Given a key, deletes the key's entry from the correct shard.

In a migration situation, this might attempt to delete the key from multiple shards, see below.

reset_connection

Given a key, it retrieves to which shard it would have communicated and calls reset_connection() upon it. This allows doing a reconnect only for the shards that have problems. If there is a migration_continuum it will also reset the connection to that shard as well in an abundance of caution.

begin_migration

Given a ShardedKV::Continuum object, this sets the migration_continuum property of the ShardedKV, thus beginning a passive key migration. Right now, the only kind of migration that is supported is adding shards! Only one migration may be in effect at a time. The passive qualification there is very significant. If you are, for example, using the Redis storage backend with a key expiration of one hour, then you know, that after letting the passive migration run for one hour, all keys that are still relevant will have been migrated (or expired if they were not relevant).

Full migration example:

  use ShardedKV;
  use ShardedKV::Continuum::Ketama;
  use ShardedKV::Storage::Redis;
  
  my $continuum_spec = [
    ["shard1", 100], # shard name, weight
    ["shard2", 150],
  ];
  my $continuum = ShardedKV::Continuum::Ketama->new(from => $continuum_spec);
  
  # Redis storage chosen here, but can also be "Memory" or "MySQL".
  # "Memory" is for testing. Mixing storages likely has weird side effects.
  my %storages = (
    shard1 => ShardedKV::Storage::Redis->new(
      redis_connect_str => 'redisserver:6379',
      expiration_time => 60*60,
    ),
    shard2 => ShardedKV::Storage::Redis->new(
      redis_connect_str => 'redisserver:6380',
      expiration_time => 60*60,
    ),
  );
  
  my $skv = ShardedKV->new(
    storages => \%storages,
    continuum => $continuum,
  );
  # ... use the skv ...
  
  # Oh, we need to extend it!
  # Add storages:
  $skv->storages->{shard3} = ShardedKV::Storage::Redis->new(
    redis_connect_str => 'NEWredisserver:6379',
    expiration_time => 60*60,
  );
  # ... could add more at the same time...
  my $old_continuum = $skv->continuum;
  my $extended_continuum = $old_continuum->clone;
  $extended_continuum->extend([shard3 => 120]);
  $skv->begin_migration($extended_continuum);
  # ... use the skv normally...
  # ... after one hour (60*60 seconds), we can stop the migration:
  $skv->end_migration();

The logic for the migration is fairly simple:

If there is a migration continuum, then for get requests, that continuum is used to find the right shard for the given key. If that shard does not have the key, we check the original continuum and if that points the key at a different shard, we query that.

For delete requests, we also attempt to delete from the shard pointed to by the migration continuum AND the shard pointed to by the main continuum.

For set requests, we always only use the shard deduced from the migration continuum

end_migration() will promote the migration continuum to the regular continuum and set the migration_continuum property to undef.

end_migration

See the begin_migration docs above.

SEE ALSO

ACKNLOWLEDGMENT

This module was originally developed for Booking.com. With approval from Booking.com, this module was generalized and put on CPAN, for which the authors would like to express their gratitude.

AUTHORS

  • Steffen Mueller <smueller@cpan.org>

  • Nick Perez <nperez@cpan.org>

  • Damian Gryski <dgryski@cpan.org>

COPYRIGHT AND LICENSE

This software is copyright (c) 2013 by Steffen Mueller.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.