While optimising the Python RF Gateway script, I needed to store some data temporarily and have it time out after 3 seconds. Here is a Python class that implements an expiring dictionary. It is a straightforward way to deduplicate MQTT messages.
The class subclasses Python's built-in dict, and you can copy and paste it into your code. There are existing implementations, most notably ExpiringDict; however, I was unable to use them because of my Python version.
# A small helper class implementing a dictionary with expiring items.
# This class might cause memory leaks because items are deleted only when
# tick() is called with the key. The calling code is responsible for calling
# clear() periodically so that stale entries do not accumulate.
import time

class ExpiringDict(dict):
    def __init__(self):
        # Start empty: entries must go through __setitem__ to get a timestamp.
        dict.__init__(self)

    def __getitem__(self, key):
        # Entries are stored as (value, timestamp); return only the value.
        return dict.__getitem__(self, key)[0]

    def __setitem__(self, key, val):
        #logging.info("SET %s['%s'] = %s" % (str(dict.get(self, 'name_label')), str(key), str(val)))
        dict.__setitem__(self, key, (val, time.time()))

    def tick(self, key):
        try:
            item = dict.__getitem__(self, key)
            item_age = time.time() - item[1]
            if item_age < 3:  # younger than 3 seconds (still valid)
                #logging.info("Item still valid")
                return 1
            else:  # 3 seconds or older (it expired, delete the record)
                #logging.info("item expired, deleting item")
                del self[key]
                return 0
        except KeyError:
            return 0  # same as if the record was there and it was deleted as a result of the tick call
Usage
# Paste the class here, or place it in a separate file and import it.
cache = ExpiringDict()
while True:
    mydata = somedata.get()  # placeholder: fetch the next item from your own data source
    if cache.tick(mydata.id) == 0:  # 0 means the data is not in the cache (or has expired), so process it
        # do something with your data because it is new data
        # then save the data in the cache
        cache[mydata.id] = mydata
Use Case Description
The use case for this class is deduplication of MQTT messages sent in rapid succession. In my home automation scenario, we receive a stream of radio codes and there could be duplicate messages that we want to filter out. (Radio devices send the same messages some 20 times for error correction to ensure the data is received.)
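The RF gateway must process only one of these messages within a timeout period (150 ms). This is where the expiring dictionary comes in. We want to process the first message and save it in the dictionary, where it lives for 150 ms. (The class as listed hard-codes 3 seconds; for a 150 ms window, change the comparison in tick() to item_age < 0.15.)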
Any duplicate messages received within the 150 ms timeout period will be ignored because cache.tick(mydata.id) returns a status code of 1, indicating we have already processed the data. After 150 ms, tick() returns 0 and we can process the data because it represents new data. The expiring dictionary behaves as if it has never seen this data, thanks to the internal timeout.
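To see the timeout window in action without waiting on a real clock, here is a minimal sketch of a variant that takes the timeout as a constructor argument. The names TimedDict, FakeClock, max_age and clock are my own inventions for this illustration and are not part of the class above. It also assumes Python 3.3 or newer for time.monotonic.

```python
import time


class TimedDict(dict):
    """Hypothetical variant of the expiring dictionary with a configurable timeout."""

    def __init__(self, max_age=0.15, clock=time.monotonic):
        dict.__init__(self)
        self.max_age = max_age  # timeout in seconds; 0.15 s = 150 ms
        self.clock = clock      # callable returning the current time in seconds

    def __getitem__(self, key):
        return dict.__getitem__(self, key)[0]  # stored as (value, timestamp)

    def __setitem__(self, key, val):
        dict.__setitem__(self, key, (val, self.clock()))

    def tick(self, key):
        """Return 1 if key was stored less than max_age seconds ago, else 0."""
        try:
            _, stored_at = dict.__getitem__(self, key)
        except KeyError:
            return 0
        if self.clock() - stored_at < self.max_age:
            return 1
        del self[key]  # expired: forget it so the next message counts as new
        return 0


class FakeClock:
    """Hypothetical test clock whose time is set by hand (assumption for the demo)."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()
cache = TimedDict(max_age=0.15, clock=clock)
processed = []

# (arrival time in seconds, radio code): three rapid repeats, then a late one.
arrivals = [(0.000, "A1B2"), (0.010, "A1B2"), (0.020, "A1B2"), (0.200, "A1B2")]
for arrival_time, code in arrivals:
    clock.now = arrival_time
    if cache.tick(code) == 0:  # new (or expired) code: process and remember it
        processed.append((arrival_time, code))
        cache[code] = code

print(processed)  # [(0.0, 'A1B2'), (0.2, 'A1B2')]
```

The repeats at 10 ms and 20 ms are dropped, while the message at 200 ms falls outside the 150 ms window and is treated as new. In real use you would leave out the clock argument so the default time.monotonic is used.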
Note that items are only removed when tick() is called with the corresponding ID number. It is your responsibility to clear this cache periodically to avoid memory leaks. This can be done by calling clear() or by creating a new ExpiringDict. I left this out of the class to keep it simple.
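If you would rather remove only the stale entries instead of wiping the whole cache, something along these lines might work. This is a sketch under assumptions: purge_expired is a hypothetical helper I am introducing here, and it relies on each raw entry being stored as a (value, timestamp) tuple with timestamps from time.time(), which is how the class stores them.

```python
import time


def purge_expired(cache, max_age, now=None):
    """Delete every entry older than max_age seconds; return how many were removed.

    Assumes each raw stored value is a (value, timestamp) tuple.
    """
    if now is None:
        now = time.time()
    # dict.items() reads the raw tuples and bypasses any overridden __getitem__.
    # Collect the keys first: deleting while iterating over a dict raises RuntimeError.
    stale = [key for key, (_, stored_at) in dict.items(cache)
             if now - stored_at >= max_age]
    for key in stale:
        dict.__delitem__(cache, key)
    return len(stale)


# Demo on a plain dict using the same (value, timestamp) layout and a fixed "now".
raw = {"old": ("x", 100.0), "fresh": ("y", 109.95)}
removed = purge_expired(raw, max_age=3, now=110.0)
print(removed, sorted(raw))  # 1 ['fresh']
```

You could call this every few seconds from your main loop, passing the same timeout that tick() uses.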
Conclusion
I think it is pretty neat. Let me know in the comments about your particular use case. Hope this helps.