I would use collections.deque
with a maxlen
arg
>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
... d.append(i)
...
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)
There is a recipe in the docs for deque
that is similar to what you want. My assertion that it's the most efficient rests entirely on the fact that it's implemented in C by an incredibly skilled crew that is in the habit of cranking out top notch code.
Although there are already a great number of great answers here, I could not find any direct comparison of timings for the options mentioned. Therefore, please find my humble attempt at a comparison below.
For testing purposes only, the class can switch between a list
-based buffer, a collections.deque
-based buffer, and a Numpy.roll
-based buffer.
Note that the update
method adds only one value at a time, to keep it simple.
import numpy
import timeit
import collections
class CircularBuffer(object):
buffer_methods = ('list', 'deque', 'roll')
def __init__(self, buffer_size, buffer_method):
self.content = None
self.size = buffer_size
self.method = buffer_method
def update(self, scalar):
if self.method == self.buffer_methods[0]:
# Use list
try:
self.content.append(scalar)
self.content.pop(0)
except AttributeError:
self.content = [0.] * self.size
elif self.method == self.buffer_methods[1]:
# Use collections.deque
try:
self.content.append(scalar)
except AttributeError:
self.content = collections.deque([0.] * self.size,
maxlen=self.size)
elif self.method == self.buffer_methods[2]:
# Use Numpy.roll
try:
self.content = numpy.roll(self.content, -1)
self.content[-1] = scalar
except IndexError:
self.content = numpy.zeros(self.size, dtype=float)
# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
buffer_method=method)
for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
# We add a convenient number of convenient values (see equality test below)
code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
i, circular_buffer_size)
# Testing
eval(code)
buffer_content = [item for item in cb.content]
assert buffer_content == range(circular_buffer_size)
# Timing
timeit_results.append(
timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
cb.method, timeit_results[-1],
timeit_results[-1] / timeit_iterations * 1e3)
On my system this yields:
list: total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll: total 6.27s (0.63ms per iteration)
popping from the head of a list causes the whole list to be copied, so is inefficient
You should instead use a list/array of fixed size and an index which moves through the buffer as you add/remove items
Based on MoonCactus's answer, here is a circularlist
class. The difference with his version is that here c[0]
will always give the oldest-appended element, c[-1]
the latest-appended element, c[-2]
the penultimate... This is more natural for applications.
c = circularlist(4)
c.append(1); print(c, c[0], c[-1]) #[1] (1/4 items) 1 1
c.append(2); print(c, c[0], c[-1]) #[1, 2] (2/4 items) 1 2
c.append(3); print(c, c[0], c[-1]) #[1, 2, 3] (3/4 items) 1 3
c.append(8); print(c, c[0], c[-1]) #[1, 2, 3, 8] (4/4 items) 1 8
c.append(10); print(c, c[0], c[-1]) #[2, 3, 8, 10] (4/4 items) 2 10
c.append(11); print(c, c[0], c[-1]) #[3, 8, 10, 11] (4/4 items) 3 11
d = circularlist(4, [1, 2, 3, 4, 5]) #[2, 3, 4, 5]
Class:
class circularlist(object):
def __init__(self, size, data = []):
"""Initialization"""
self.index = 0
self.size = size
self._data = list(data)[-size:]
def append(self, value):
"""Append an element"""
if len(self._data) == self.size:
self._data[self.index] = value
else:
self._data.append(value)
self.index = (self.index + 1) % self.size
def __getitem__(self, key):
"""Get element by index, relative to the current index"""
if len(self._data) == self.size:
return(self._data[(key + self.index) % self.size])
else:
return(self._data[key])
def __repr__(self):
"""Return string representation"""
return (self._data[self.index:] + self._data[:self.index]).__repr__() + ' (' + str(len(self._data))+'/{} items)'.format(self.size)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With