Singleton Boto Client Wrappers In a Multi-Threaded Environment

Well, the title already sounds thrilling, I know. But that’s the deal today.

Here at Plurk, we use AWS DynamoDB to store some key-value data. This includes, for example, mapping a plurk’s ID to its database shard ID, and maintaining global (cross-shard) atomic counters. We use boto to communicate with DynamoDB. Since boto3 wasn’t available (or rather, wasn’t declared stable) when we transitioned to AWS, we use boto2. We probably would’ve needed fewer threading constructs if we could have used boto3 sessions!

In our codebase, boto library calls are wrapped in our own modules that expose semantically meaningful data/field accessors. By convention, most of our modules are singleton utilities (I’m personally not sure exactly why, though). For example, to dig out a user’s ID from their username, we do user_id = Users().get_id_from_name(username), where the Users class extends (or rather, mixes in) the Singleton base class.

Preface: (Thread-Safe) Singletons

For the sake of completeness, here’s the implementation of that Singleton class:

(For brevity, imports are omitted, and names are used as if they had been imported into the current namespace via from ... import ...)

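class Singleton(object):
  __it_lock__ = threading.Lock()
  __it__ = None

  def __new__(cls, *args, **kwargs):
    # Check cls.__dict__ rather than cls.__it__: the latter would find a
    # parent class's instance through inheritance if the parent happened
    # to be instantiated first.
    if cls.__dict__.get('__it__') is None:
      with cls.__it_lock__:
        if cls.__dict__.get('__it__') is None:
          it = object.__new__(cls)
          it.init(*args, **kwargs)
          cls.__it__ = it
    return cls.__it__

  # Don't use __init__() because an instance returned
  # by __new__() will have its __init__() called
  # automatically every time the class is called (e.g. Users()).
  # (@overridable / @override are in-house decorators; definitions not shown.)
  @overridable
  def init(self, *args, **kwargs):
    pass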

Alright, hopefully this __new__-based singleton implementation is not too ugly, because it actually does more than a typical one: it is thread-safe, guarded by double-checked locking. Why do we need thread safety? Because we use CherryPy as our HTTP server, which serves each HTTP request on one of a pool of worker threads to attain the best possible throughput. So right after our server launches, it immediately gets lots of requests, and there will be occasions where singletons of the same class are requested almost simultaneously (attribute that “almost” to Python’s GIL), and we want to make sure only one gets instantiated in the end.
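To see double-checked locking hold up under pressure, here is a minimal, self-contained sketch that releases a batch of threads at once against a class with a deliberately slow init(). SlowInit and hammer() are hypothetical names made up for this demo, and the @overridable marker is dropped. The sketch reads __it__ from each class’s own __dict__, so a subclass never picks up its parent’s instance through inheritance.

```python
import threading
import time


class Singleton(object):
  __it_lock__ = threading.Lock()

  def __new__(cls, *args, **kwargs):
    # Fast path without the lock; only this class's own __dict__ is consulted.
    it = cls.__dict__.get('__it__')
    if it is None:
      with cls.__it_lock__:
        # Re-check under the lock: another thread may have won the race.
        it = cls.__dict__.get('__it__')
        if it is None:
          it = object.__new__(cls)
          it.init(*args, **kwargs)
          cls.__it__ = it
    return it

  def init(self, *args, **kwargs):
    pass


class SlowInit(Singleton):
  # Hypothetical class for this demo only.
  init_calls = 0

  def init(self, *args, **kwargs):
    SlowInit.init_calls += 1  # safe: init() only runs while holding the lock
    time.sleep(0.05)          # widen the window for other threads to pile up


def hammer(n_threads=20):
  go = threading.Event()
  results = [None] * n_threads

  def worker(i):
    go.wait()                 # every worker blocks here until go.set()
    results[i] = SlowInit()

  threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_threads)]
  for t in threads:
    t.start()
  go.set()                    # release all workers at (almost) the same time
  for t in threads:
    t.join()
  return results


instances = hammer()
print(SlowInit.init_calls)                            # 1
print(all(obj is instances[0] for obj in instances))  # True
```

Only the thread that wins the lock runs init(). Every other thread either waits on the lock and then finds the instance on the re-check, or skips the lock entirely once the instance has been published.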

Wrapping Boto (and Failing)

Back to our main topic. Taking the plurk mapping and atomic counter examples above, this was the first version of the boto-wrapping modules I wrote…

class _DDBBase(Singleton):
  @override
  def init(self, *args, **kwargs):
    self._conn = boto.dynamodb2.connect_to_region(
      config.DYNAMODB_REGION,
      aws_access_key_id=config.DYNAMODB_ACCESS_ID,
      aws_secret_access_key=config.DYNAMODB_ACCESS_KEY
    )

class DDBPlurksMap(_DDBBase):
  @override
  def init(self, *args, **kwargs):    
    super(DDBPlurksMap, self).init(*args, **kwargs)
    self._ddb_plurks_map = DDBTable(config.DYNAMODB_TABLE_PLURKS_MAP, connection=self._conn) # from boto.dynamodb2.table import Table as DDBTable

  def get_shard_id(self, plurk_id, strong_consistent=False):
    try:
      item = self._ddb_plurks_map.get_item(consistent=strong_consistent, id=plurk_id)
    except ItemNotFound: # from boto.dynamodb2.exceptions import ItemNotFound
      return None
    else:
      return int(item['shard_id'])

class DDBAtomicCounters(_DDBBase):
  def _inc_and_get(self, key_name):
    res = self._conn.update_item(config.DYNAMODB_TABLE_ATOMIC_COUNTERS, {'name': {'S': key_name}}, {'number': {'Action': 'ADD', 'Value': {'N': '1'}}}, return_values='UPDATED_NEW')
    return int(res['Attributes']['number']['N'])

  def next_magic_id(self):
    return self._inc_and_get('magic_counter')

Looks properly abstracted? Yeah, and it turned out this code worked… most of the time, but not always. It was not thread-safe, because boto2 connections are not thread-safe (they are built on httplib). Since all my threads shared the single DynamoDBConnection instance stored in the singleton’s self._conn, I was well on my way to all the (not so) nice race conditions down the road. (The dynamodb2.table instance using that connection was also shared, and was probably in peril, too.)

Thread-Local Storage to the Rescue (Not Really, Yet)

To solve this with boto2, I’d need one connection per thread, which meant I needed thread-local storage. Uhm… I tried writing something like this:

class _DDBBase(Singleton):
  @override
  def init(self, *args, **kwargs):
    self._tls = threading.local() # <- here it is
    self._tls.conn = boto.dynamodb2.connect_to_region(
      config.DYNAMODB_REGION,
      aws_access_key_id=config.DYNAMODB_ACCESS_ID,
      aws_secret_access_key=config.DYNAMODB_ACCESS_KEY
    )

class DDBPlurksMap(_DDBBase):
  @override
  def init(self, *args, **kwargs):    
    super(DDBPlurksMap, self).init(*args, **kwargs)
    self._tls.ddb_plurks_map = DDBTable(config.DYNAMODB_TABLE_PLURKS_MAP, connection=self._tls.conn)

  def get_shard_id(self, plurk_id, strong_consistent=False):
    try:
      item = self._tls.ddb_plurks_map.get_item(consistent=strong_consistent, id=plurk_id)
    except ItemNotFound:
      return None
    else:
      return int(item['shard_id'])

Of course, this didn’t work. Since Singleton.init() was called only once per class over the Python process’s entire lifetime, only one thread (the one that acquired the singleton lock and executed init()) had the DynamoDBConnection instance, and in DDBPlurksMap’s case the dynamodb2.table instance, in its thread-local storage. All other threads, while seeing their own (empty) thread-local storage through self._tls, never had those instances constructed and stored, and would hit an AttributeError as soon as they tried to access them.
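The failure mode is easy to reproduce without boto at all. In this minimal sketch, a string stands in for the DynamoDBConnection: an attribute set on a threading.local in one thread simply does not exist in another thread.

```python
import threading

tls = threading.local()
# Set in the main thread only, like init() running once in a single thread.
tls.conn = 'connection created by the main thread'  # stand-in for a real connection

seen = {}


def worker():
  # This thread starts with its own, empty attribute dict for `tls`.
  seen['has_conn'] = hasattr(tls, 'conn')
  try:
    tls.conn
  except AttributeError:
    seen['error'] = 'AttributeError'


t = threading.Thread(target=worker)
t.start()
t.join()

print(seen['has_conn'])  # False
print(seen['error'])     # AttributeError
print(tls.conn)          # the main thread still sees its own value
```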

Ugly Solution

Sure, I could write something like the following…

class _DDBBase(Singleton):
  @override
  def init(self, *args, **kwargs):
    self._tls = threading.local()
    self.reinit()

  @overridable
  def reinit(self):
    self._tls.conn = boto.dynamodb2.connect_to_region(
      config.DYNAMODB_REGION,
      aws_access_key_id=config.DYNAMODB_ACCESS_ID,
      aws_secret_access_key=config.DYNAMODB_ACCESS_KEY
    )

…and have subclasses check for the need to reinit() whenever a method was called…

class DDBPlurksMap(_DDBBase):
  @override
  def init(self, *args, **kwargs):    
    super(DDBPlurksMap, self).init(*args, **kwargs)

  @override
  def reinit(self):
    if not hasattr(self._tls, 'conn'):
      super(DDBPlurksMap, self).reinit()
    self._tls.ddb_plurks_map = DDBTable(config.DYNAMODB_TABLE_PLURKS_MAP, connection=self._tls.conn)

  def get_shard_id(self, plurk_id, strong_consistent=False):
    if not hasattr(self._tls, 'ddb_plurks_map'):
      self.reinit()
    # other things

  def get_shard_ids(self, plurk_ids, strong_consistent=False):
    if not hasattr(self._tls, 'ddb_plurks_map'):
      self.reinit()     
    # other things

  def delete_plurk_id(self, plurk_id):
    if not hasattr(self._tls, 'ddb_plurks_map'):
      self.reinit()
    # other things

  def set_shard_id(self, plurk_id, shard_id):
    if not hasattr(self._tls, 'ddb_plurks_map'):
      self.reinit()
    # other things

This would work, of course, but looking at the duplicated code (still duplicated even after being converted into a decorator, which is omitted here) made me think of Dr. Chuen-Liang Chen, the professor of my design patterns class back in school, who would definitely have graded the code with an F.
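For illustration, the decorator mentioned above might look roughly like the following minimal sketch. The name ensures_tls, the FakePlurksMap class, and the dict-backed _FakeTableHandle (standing in for a dynamodb2 Table) are all hypothetical. The point is that every method still has to carry the same decorator line.

```python
import functools
import threading

_SHARED_STORE = {}  # stands in for the DynamoDB table itself


class _FakeTableHandle(object):
  """Hypothetical per-thread handle, standing in for a dynamodb2 Table."""

  def put(self, key, value):
    _SHARED_STORE[key] = value

  def get(self, key):
    return _SHARED_STORE.get(key)


def ensures_tls(attr_name):
  """Hypothetical decorator: call self.reinit() first if this thread's
  self._tls has no `attr_name` attribute yet."""
  def decorator(method):
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
      if not hasattr(self._tls, attr_name):
        self.reinit()
      return method(self, *args, **kwargs)
    return wrapper
  return decorator


class FakePlurksMap(object):
  def __init__(self):
    self._tls = threading.local()
    self._count_lock = threading.Lock()
    self.reinit_calls = 0  # only for demonstration

  def reinit(self):
    with self._count_lock:
      self.reinit_calls += 1
    self._tls.ddb_plurks_map = _FakeTableHandle()

  # The same decorator line still has to be repeated on every method.
  @ensures_tls('ddb_plurks_map')
  def set_shard_id(self, plurk_id, shard_id):
    self._tls.ddb_plurks_map.put(plurk_id, shard_id)

  @ensures_tls('ddb_plurks_map')
  def get_shard_id(self, plurk_id):
    return self._tls.ddb_plurks_map.get(plurk_id)


m = FakePlurksMap()
m.set_shard_id(1, 2)  # first call in the main thread triggers reinit()
result = {}
t = threading.Thread(target=lambda: result.update(shard=m.get_shard_id(1)))
t.start()
t.join()
print(result['shard'])  # 2
print(m.reinit_calls)   # 2: once per thread
```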

Well, I would grade my code with an F too, so the quest for a better solution continued…

Subclassed Thread-Local Storage to the Rescue

At this point, I knew what I needed: a thread-local storage that could automagically initialize its contents when they were requested for the first time (or when the thread was first spawned, but as I couldn’t modify thread spawning, that was a no-go). Fortunately, Python’s threading.local provides exactly such a construct. Each threading.local instance keeps a separate attribute dict per thread, an internal data structure. In the thread that constructs the instance, __init__() runs right away as usual. In every other thread, __init__() runs again the first time that thread accesses the instance’s attributes and finds no attribute dict constructed yet. So such a call to __init__() can happen long after the threading.local instance was apparently created in the source code. Now, here’s the big addendum: if a subclass of threading.local overrides __init__(), it is that override that gets called, with the same arguments originally passed to the constructor.
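This behavior is easy to observe directly. In the minimal sketch below (CountingLocal is a made-up name), __init__() runs once in the thread that constructs the object. It then runs once more in each other thread, the first time that thread touches an attribute, receiving the same constructor argument every time.

```python
import threading


class CountingLocal(threading.local):
  # Class attributes are ordinary, shared attributes (not thread-local).
  created_in = []
  created_lock = threading.Lock()

  def __init__(self, label):
    # Runs in the constructing thread right away, then lazily in every
    # other thread on its first attribute access, with the same `label`.
    self.label = label
    with CountingLocal.created_lock:
      CountingLocal.created_in.append(threading.current_thread().name)


tls = CountingLocal('conn-holder')


def worker():
  tls.label  # first access in this thread: triggers __init__()
  tls.label  # second access: no further __init__() call


threads = [threading.Thread(target=worker, name='worker-%d' % i) for i in range(3)]
for t in threads:
  t.start()
for t in threads:
  t.join()

print(sorted(CountingLocal.created_in))
# ['MainThread', 'worker-0', 'worker-1', 'worker-2']
```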

Therefore, my automagic was actually simple:

class _DDBBase(Singleton):
  class _TLS(threading.local):
    def __init__(self):
      self.conn = boto.dynamodb2.connect_to_region(
        config.DYNAMODB_REGION,
        aws_access_key_id=config.DYNAMODB_ACCESS_ID,
        aws_secret_access_key=config.DYNAMODB_ACCESS_KEY
      )

  @override
  def init(self, *args, **kwargs):
    self._tls = _DDBBase._TLS()

  def get_api_ver(self):
    return self._tls.conn.APIVersion # automagic!

There, whenever I called _DDBBase().get_api_ver(), the conn attribute would be looked up in self._tls. If at that moment the current thread’s attribute dict had not yet been constructed (as per threading.local’s internal implementation), _DDBBase._TLS.__init__() would be called on that self._tls, properly constructing the conn attribute and tucking it into the thread-local storage. This would happen once (and only once) per thread, and only when actually needed, so it’s all perfect.
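The same lazy, once-per-thread construction can be checked without AWS credentials. In this minimal sketch, make_connection() is a hypothetical stand-in for boto.dynamodb2.connect_to_region(). DDBBaseSketch is a plain class rather than a Singleton to keep it short. Each thread ends up with exactly one connection, reused on every later access.

```python
import itertools
import threading

_ids = itertools.count(1)
_ids_lock = threading.Lock()


def make_connection():
  """Hypothetical stand-in for boto.dynamodb2.connect_to_region(...)."""
  with _ids_lock:
    return {'conn_id': next(_ids), 'thread': threading.current_thread().name}


class DDBBaseSketch(object):
  class _TLS(threading.local):
    def __init__(self):
      # At most once per thread: eagerly in the constructing thread,
      # lazily in any other thread on its first attribute access.
      self.conn = make_connection()

  def __init__(self):
    self._tls = DDBBaseSketch._TLS()

  def current_conn(self):
    return self._tls.conn


base = DDBBaseSketch()
per_thread = {}
per_thread_lock = threading.Lock()


def worker():
  first = base.current_conn()
  second = base.current_conn()
  with per_thread_lock:
    per_thread[threading.current_thread().name] = (first['conn_id'], first is second)


threads = [threading.Thread(target=worker, name='w%d' % i) for i in range(3)]
for t in threads:
  t.start()
for t in threads:
  t.join()

print(base.current_conn()['conn_id'])                # 1: the constructing thread's
print(all(same for _, same in per_thread.values()))  # True: reused within a thread
print(len({cid for cid, _ in per_thread.values()}))  # 3: one per worker thread
```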

Now, as DDBPlurksMap also needed to store its own data in the thread-local storage, I needed to be able to customize _TLS.__init__()’s behavior too. Here’s the complete code:

class _DDBBase(Singleton):
  # @hidable / @hide, like @override, are in-house decorators (definitions not shown).
  @hidable
  class _TLS(threading.local):
    def __init__(self):
      self.conn = boto.dynamodb2.connect_to_region(
        config.DYNAMODB_REGION,
        aws_access_key_id=config.DYNAMODB_ACCESS_ID,
        aws_secret_access_key=config.DYNAMODB_ACCESS_KEY
      )

  @override
  def init(self, *args, **kwargs):
    self._tls = self.__class__._TLS()

class DDBPlurksMap(_DDBBase):
  @hide
  class _TLS(_DDBBase._TLS):
    def __init__(self, *args, **kwargs):
      super(DDBPlurksMap._TLS, self).__init__(*args, **kwargs)
      self.ddb_plurks_map = DDBTable(config.DYNAMODB_TABLE_PLURKS_MAP, connection=self.conn)

  def get_shard_id(self, plurk_id, strong_consistent=False):
    try:
      item = self._tls.ddb_plurks_map.get_item(consistent=strong_consistent, id=plurk_id)
    except ItemNotFound:
      return None
    else:
      return int(item['shard_id'])

  def get_shard_ids(self, plurk_ids, strong_consistent=False):
    result_set = self._tls.ddb_plurks_map.batch_get(consistent=strong_consistent, keys=[{'id': plurk_id} for plurk_id in set(plurk_ids)])
    # other things

  def delete_plurk_id(self, plurk_id):
    self._tls.ddb_plurks_map.delete_item(id=plurk_id)

  def set_shard_id(self, plurk_id, shard_id):
    self._tls.ddb_plurks_map.put_item(data={'id': plurk_id, 'shard_id': shard_id}, overwrite=True)

# nothing really to customize, so it's pretty clean for this module
class DDBAtomicCounters(_DDBBase):
  def _inc_and_get(self, key_name):
    res = self._tls.conn.update_item(config.DYNAMODB_TABLE_ATOMIC_COUNTERS, {'name': {'S': key_name}}, {'number': {'Action': 'ADD', 'Value': {'N': '1'}}}, return_values='UPDATED_NEW')
    return int(res['Attributes']['number']['N'])

  def next_magic_id(self):
    return self._inc_and_get('magic_counter')

Well, things are getting metaclassy again here, so let me do a quick step-by-step walkthrough of what happens when we call, for example, DDBPlurksMap().set_shard_id(1, 2):

  1. For the DDBPlurksMap() part, Singleton.__new__() is called on the DDBPlurksMap class.

    • If an instance of DDBPlurksMap has ever been constructed in the whole Python process, nothing really happens except for us getting that instance.
    • Otherwise, the instance is constructed, and since DDBPlurksMap has no init() defined, _DDBBase.init() is called on that instance, which creates an instance of our subclassed thread-local storage. Note that self.__class__ is DDBPlurksMap, so we properly construct an instance of DDBPlurksMap._TLS rather than one of _DDBBase._TLS.
  2. Now, inside set_shard_id(), the ddb_plurks_map attribute is requested from self._tls.

    • If this thread’s subclassed thread-local storage has already been initialized (which means DDBPlurksMap._TLS.__init__() has already run in this thread), ddb_plurks_map will be a dynamodb2.table instance local to the current thread, parameterized by a DynamoDBConnection instance (i.e. conn) that is also local to the current thread. These two instances were constructed within DDBPlurksMap._TLS.__init__() and _DDBBase._TLS.__init__(), respectively (in reverse order, of course).
    • If not, then threading.local’s internal implementation calls DDBPlurksMap._TLS.__init__(), which calls _DDBBase._TLS.__init__() to create a DynamoDBConnection instance and store it in the thread-local storage. Then DDBPlurksMap._TLS.__init__() creates a dynamodb2.table instance and stores it in the thread-local storage as well. Those instances are only visible to the current thread, so no races!
  3. Either way, the attribute lookup for ddb_plurks_map on the thread-local storage returns a thread-local dynamodb2.table instance, and everything is good! And very little duplicated code!
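The subclass-resolution step in this walkthrough can be checked in isolation. In the following minimal sketch, Base and PlurksMap mirror _DDBBase and DDBPlurksMap. Plain object() instances stand in for the connection and a tuple stands in for the table, and the Singleton machinery and the @hidable/@hide decorators are left out. The calls list records the order in which the two __init__() methods run.

```python
import threading


class Base(object):
  class _TLS(threading.local):
    def __init__(self):
      self.calls = ['Base._TLS']
      self.conn = object()  # stand-in for a DynamoDBConnection

  def __init__(self):
    # self.__class__ is the concrete subclass, so its own _TLS is picked up.
    self._tls = self.__class__._TLS()


class PlurksMap(Base):
  class _TLS(Base._TLS):
    def __init__(self, *args, **kwargs):
      super(PlurksMap._TLS, self).__init__(*args, **kwargs)  # conn first...
      self.calls.append('PlurksMap._TLS')
      self.table = ('table bound to', self.conn)  # ...then the table using it


p = PlurksMap()
result = {}


def worker():
  result['calls'] = list(p._tls.calls)
  result['conn'] = p._tls.conn
  result['table_uses_own_conn'] = p._tls.table[1] is p._tls.conn


t = threading.Thread(target=worker)
t.start()
t.join()

print(type(p._tls) is PlurksMap._TLS)  # True
print(result['calls'])                 # ['Base._TLS', 'PlurksMap._TLS']
print(result['table_uses_own_conn'])   # True
print(result['conn'] is p._tls.conn)   # False: each thread has its own
```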

Afterword

Really, as Greg Weng, my former colleague at Mozilla, puts it, threads might be too heavy a weapon for maximizing request throughput by working around blocking I/O. In the future, we should really start investigating state-of-the-art async I/O constructs such as coroutines, futures, and the like. Too bad we’re still stuck on Python 2.7, though…
