Improve the implementation of itertools.tee().

Formerly, underlying queue was implemented in terms of two lists.  The
new queue is a series of singly-linked fixed length lists.

The new implementation runs much faster, supports multi-way tees, and
allows tees of tees without additional memory costs.

The root ideas for this structure were contributed by Andrew Koenig
and Guido van Rossum.
This commit is contained in:
Raymond Hettinger 2003-11-12 14:32:26 +00:00
parent 767126d7b9
commit ad983e79d6
3 changed files with 244 additions and 211 deletions

View file

@ -281,9 +281,9 @@ by functions or loops that truncate the stream.
\end{verbatim}
\end{funcdesc}
\begin{funcdesc}{tee}{iterable}
Return two independent iterators from a single iterable.
Equivalent to:
\begin{funcdesc}{tee}{iterable\optional{, n=2}}
Return \var{n} independent iterators from a single iterable.
The case where \var{n} is two is equivalent to:
\begin{verbatim}
def tee(iterable):
@ -299,6 +299,10 @@ by functions or loops that truncate the stream.
return (gen(it.next), gen(it.next))
\end{verbatim}
Note, once \function{tee()} has made a split, the original \var{iterable}
should not be used anywhere else; otherwise, the \var{iterable} could get
advanced without the tee objects being informed.
Note, this member of the toolkit may require significant auxiliary
storage (depending on how much temporary data needs to be stored).
In general, if one iterator is going use most or all of the data before
@ -408,6 +412,10 @@ def repeatfunc(func, times=None, *args):
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = tee(iterable)
return izip(a, islice(b, 1, None))
try:
b.next()
except StopIteration:
pass
return izip(a, b)
\end{verbatim}

View file

@ -200,7 +200,7 @@ def test_dropwhile(self):
self.assertRaises(ValueError, dropwhile(errfunc, [(4,5)]).next)
def test_tee(self):
n = 100
n = 200
def irange(n):
for i in xrange(n):
yield i
@ -217,16 +217,16 @@ def irange(n):
self.assertEqual(list(b), range(n))
a, b = tee(irange(n)) # test dealloc of leading iterator
self.assertEqual(a.next(), 0)
self.assertEqual(a.next(), 1)
for i in xrange(100):
self.assertEqual(a.next(), i)
del a
self.assertEqual(list(b), range(n))
a, b = tee(irange(n)) # test dealloc of trailing iterator
self.assertEqual(a.next(), 0)
self.assertEqual(a.next(), 1)
for i in xrange(100):
self.assertEqual(a.next(), i)
del b
self.assertEqual(list(a), range(2, n))
self.assertEqual(list(a), range(100, n))
for j in xrange(5): # test randomly interleaved
order = [0]*n + [1]*n
@ -239,21 +239,31 @@ def irange(n):
self.assertEqual(lists[0], range(n))
self.assertEqual(lists[1], range(n))
# test argument format checking
self.assertRaises(TypeError, tee)
self.assertRaises(TypeError, tee, 3)
self.assertRaises(TypeError, tee, [1,2], 'x')
self.assertRaises(TypeError, tee, [1,2], 3, 'x')
try:
class A(tee): pass
except TypeError:
pass
else:
self.fail("tee constructor should not be subclassable")
# tee object should be instantiable
a, b = tee('abc')
c = type(a)('def')
self.assertEqual(list(c), list('def'))
# test long-lagged and multi-way split
a, b, c = tee(xrange(2000), 3)
for i in xrange(100):
self.assertEqual(a.next(), i)
self.assertEqual(list(b), range(2000))
self.assertEqual([c.next(), c.next()], range(2))
self.assertEqual(list(a), range(100,2000))
self.assertEqual(list(c), range(2,2000))
# tee pass-through to copyable iterator
a, b = tee('abc')
c, d = tee(a)
self.assert_(a is c)
# tee_iterator should not be instantiable
a, b = tee(xrange(10))
self.assertRaises(TypeError, type(a))
self.assert_(a is iter(a)) # tee_iterator should support __iter__
def test_StopIteration(self):
self.assertRaises(StopIteration, izip().next)
@ -317,13 +327,6 @@ def test_starmap(self):
a = []
self.makecycle(starmap(lambda *t: t, [(a,a)]*2), a)
def test_tee(self):
a = []
p, q = t = tee([a]*2)
a += [a, p, q, t]
p.next()
del a, p, q, t
def R(seqn):
'Regular generator'
for i in seqn:
@ -626,7 +629,11 @@ def f(t):
>>> def pairwise(iterable):
... "s -> (s0,s1), (s1,s2), (s2, s3), ..."
... a, b = tee(iterable)
... return izip(a, islice(b, 1, None))
... try:
... b.next()
... except StopIteration:
... pass
... return izip(a, b)
This is not part of the examples but it tests to make sure the definitions
perform as purported.

View file

@ -7,122 +7,102 @@
All rights reserved.
*/
/* independent iterator object supporting the tee object ***************/
/* tee object and with supporting function and objects ***************/
/* The tee object maintains a queue of data seen by the leading iterator
but not seen by the trailing iterator. When the leading iterator
gets data from PyIter_Next() it appends a copy to the inbasket stack.
When the trailing iterator needs data, it is popped from the outbasket
stack. If the outbasket stack is empty, then it is filled from the
inbasket (i.e. the queue is implemented using two stacks so that only
O(n) operations like append() and pop() are used to access data and
calls to reverse() never move any data element more than once).
If one of the independent iterators gets deallocated, it sets tee's
save_mode to zero so that future calls to PyIter_Next() stop getting
saved to the queue (because there is no longer a second iterator that
may need the data).
/* The teedataobject pre-allocates space for LINKCELLS number of objects.
To help the object fit neatly inside cache lines (space for 16 to 32
pointers), the value should be a multiple of 16 minus space for
the other structure members including PyHEAD overhead. The larger the
value, the less memory overhead per object and the less time spent
allocating/deallocating new links. The smaller the number, the less
wasted space and the more rapid freeing of older data.
*/
#define LINKCELLS 57
typedef struct {
PyObject_HEAD
PyObject *it;
PyObject *inbasket;
PyObject *outbasket;
int save_mode;
int num_seen;
} teeobject;
int numread;
PyObject *nextlink;
PyObject *(values[LINKCELLS]);
} teedataobject;
typedef struct {
PyObject_HEAD
teeobject *tee;
int num_seen;
} iiobject;
teedataobject *dataobj;
int index;
} teeobject;
static PyTypeObject ii_type;
static PyTypeObject teedataobject_type;
static PyObject *
ii_next(iiobject *lz)
teedataobject_new(PyObject *it)
{
teeobject *to = lz->tee;
PyObject *result, *tmp;
int n;
teedataobject *tdo;
if (lz->num_seen == to->num_seen) {
/* This instance is leading, use iter to get more data */
result = PyIter_Next(to->it);
if (result == NULL)
return NULL;
if (to->save_mode)
if(PyList_Append(to->inbasket, result) == -1){
Py_DECREF(result);
return NULL;
}
to->num_seen++;
lz->num_seen++;
return result;
}
/* This instance is trailing, get data from the queue */
if (PyList_GET_SIZE(to->outbasket) == 0) {
/* outbasket is empty, so refill from the inbasket */
tmp = to->outbasket;
to->outbasket = to->inbasket;
to->inbasket = tmp;
PyList_Reverse(to->outbasket);
}
/* Pop result from to->outbasket */
n = PyList_GET_SIZE(to->outbasket);
assert(n>0);
result = PyList_GET_ITEM(to->outbasket, n-1);
Py_INCREF(result);
if (PyList_SetSlice(to->outbasket, n-1, n, NULL) == -1) {
Py_DECREF(result);
tdo = PyObject_New(teedataobject, &teedataobject_type);
if (tdo == NULL)
return NULL;
tdo->numread = 0;
tdo->nextlink = NULL;
Py_INCREF(it);
tdo->it = it;
return (PyObject *)tdo;
}
static PyObject *
teedataobject_jumplink(teedataobject *tdo)
{
if (tdo->nextlink == NULL)
tdo->nextlink = teedataobject_new(tdo->it);
Py_INCREF(tdo->nextlink);
return tdo->nextlink;
}
static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
PyObject *value;
assert(i < LINKCELLS);
if (i < tdo->numread)
value = tdo->values[i];
else {
/* this is the lead iterator, so fetch more data */
assert(i == tdo->numread);
value = PyIter_Next(tdo->it);
if (value == NULL)
return NULL;
tdo->numread++;
tdo->values[i] = value;
}
lz->num_seen++;
return result;
Py_INCREF(value);
return value;
}
static void
ii_dealloc(iiobject *ii)
teedataobject_dealloc(teedataobject *tdo)
{
teeobject *to = ii->tee;
int n;
int i;
PyObject_GC_UnTrack(ii);
ii->tee->save_mode = 0; /* Stop saving data */
if (ii->num_seen < to->num_seen) {
/* It is the trailing iterator being freed; this means
that the stored queue data is no longer needed */
n = PyList_GET_SIZE(to->inbasket);
PyList_SetSlice(to->inbasket, 0, n, NULL);
n = PyList_GET_SIZE(to->outbasket);
PyList_SetSlice(to->outbasket, 0, n, NULL);
}
Py_XDECREF(ii->tee);
PyObject_GC_Del(ii);
for (i=0 ; i<tdo->numread ; i++)
Py_DECREF(tdo->values[i]);
Py_XDECREF(tdo->it);
Py_XDECREF(tdo->nextlink);
PyObject_Del(tdo);
}
static int
ii_traverse(iiobject *ii, visitproc visit, void *arg)
{
if (ii->tee)
return visit((PyObject *)(ii->tee), arg);
return 0;
}
PyDoc_STRVAR(teedataobject_doc, "Data container common to multiple tee objects.");
PyDoc_STRVAR(ii_doc, "Independent iterator created by tee(iterable).");
static PyTypeObject ii_type = {
static PyTypeObject teedataobject_type = {
PyObject_HEAD_INIT(&PyType_Type)
0, /* ob_size */
"itertools.tee_iterator", /* tp_name */
sizeof(iiobject), /* tp_basicsize */
"itertools.tee_dataobject", /* tp_name */
sizeof(teedataobject), /* tp_basicsize */
0, /* tp_itemsize */
/* methods */
(destructor)ii_dealloc, /* tp_dealloc */
(destructor)teedataobject_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
@ -137,112 +117,96 @@ static PyTypeObject ii_type = {
PyObject_GenericGetAttr, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC,/* tp_flags */
ii_doc, /* tp_doc */
(traverseproc)ii_traverse, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
PyObject_SelfIter, /* tp_iter */
(iternextfunc)ii_next, /* tp_iternext */
0, /* tp_methods */
Py_TPFLAGS_DEFAULT, /* tp_flags */
teedataobject_doc, /* tp_doc */
0, /* tp_traverse */
};
/* tee object **********************************************************/
static PyTypeObject tee_type;
static PyObject *
tee_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
tee_next(teeobject *to)
{
PyObject *value, *link;
if (to->index >= LINKCELLS) {
link = teedataobject_jumplink(to->dataobj);
Py_XDECREF(to->dataobj);
to->dataobj = (teedataobject *)link;
to->index = 0;
}
value = teedataobject_getitem(to->dataobj, to->index);
if (value == NULL)
return NULL;
to->index++;
return value;
}
static PyObject *
tee_copy(teeobject *to)
{
teeobject *newto;
newto = PyObject_New(teeobject, &tee_type);
if (newto == NULL)
return NULL;
Py_INCREF(to->dataobj);
newto->dataobj = to->dataobj;
newto->index = to->index;
return (PyObject *)newto;
}
PyDoc_STRVAR(teecopy_doc, "Returns an independent iterator.");
static PyObject *
tee_fromiterable(PyObject *iterable)
{
teeobject *to;
PyObject *it = NULL;
it = PyObject_GetIter(iterable);
if (it == NULL)
return NULL;
if (PyObject_TypeCheck(it, &tee_type)) {
to = (teeobject *)tee_copy((teeobject *)it);
goto done;
}
to = PyObject_New(teeobject, &tee_type);
if (to == NULL)
goto done;
to->dataobj = (teedataobject *)teedataobject_new(it);
to->index = 0;
done:
Py_XDECREF(it);
return (PyObject *)to;
}
static PyObject *
tee_new(PyTypeObject *type, PyObject *args, PyObject *kw)
{
PyObject *iterable;
PyObject *inbasket = NULL, *outbasket = NULL, *result = NULL;
teeobject *to = NULL;
int i;
if (!PyArg_UnpackTuple(args, "tee", 1, 1, &iterable))
return NULL;
it = PyObject_GetIter(iterable);
if (it == NULL) goto fail;
inbasket = PyList_New(0);
if (inbasket == NULL) goto fail;
outbasket = PyList_New(0);
if (outbasket == NULL) goto fail;
to = PyObject_GC_New(teeobject, &tee_type);
if (to == NULL) goto fail;
to->it = it;
to->inbasket = inbasket;
to->outbasket = outbasket;
to->save_mode = 1;
to->num_seen = 0;
PyObject_GC_Track(to);
/* create independent iterators */
result = PyTuple_New(2);
if (result == NULL) goto fail;
for (i=0 ; i<2 ; i++) {
iiobject *indep_it = PyObject_GC_New(iiobject, &ii_type);
if (indep_it == NULL) goto fail;
Py_INCREF(to);
indep_it->tee = to;
indep_it->num_seen = 0;
PyObject_GC_Track(indep_it);
PyTuple_SET_ITEM(result, i, (PyObject *)indep_it);
}
goto succeed;
fail:
Py_XDECREF(it);
Py_XDECREF(inbasket);
Py_XDECREF(outbasket);
Py_XDECREF(result);
succeed:
Py_XDECREF(to);
return result;
return tee_fromiterable(iterable);
}
static void
tee_dealloc(teeobject *to)
{
PyObject_GC_UnTrack(to);
Py_XDECREF(to->inbasket);
Py_XDECREF(to->outbasket);
Py_XDECREF(to->it);
PyObject_GC_Del(to);
Py_XDECREF(to->dataobj);
PyObject_Del(to);
}
static int
tee_traverse(teeobject *to, visitproc visit, void *arg)
{
int err;
PyDoc_STRVAR(teeobject_doc,
"Iterator wrapped to make it copyable");
if (to->it) {
err = visit(to->it, arg);
if (err)
return err;
}
if (to->inbasket) {
err = visit(to->inbasket, arg);
if (err)
return err;
}
if (to->outbasket) {
err = visit(to->outbasket, arg);
if (err)
return err;
}
return 0;
}
PyDoc_STRVAR(tee_doc,
"tee(iterable) --> (it1, it2)\n\
\n\
Split the iterable into two independent iterables.");
static PyMethodDef tee_methods[] = {
{"__copy__", (PyCFunction)tee_copy, METH_NOARGS, teecopy_doc},
{NULL, NULL} /* sentinel */
};
static PyTypeObject tee_type = {
PyObject_HEAD_INIT(NULL)
@ -266,15 +230,15 @@ static PyTypeObject tee_type = {
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /* tp_flags */
tee_doc, /* tp_doc */
(traverseproc)tee_traverse, /* tp_traverse */
Py_TPFLAGS_DEFAULT, /* tp_flags */
teeobject_doc, /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
0, /* tp_methods */
PyObject_SelfIter, /* tp_iter */
(iternextfunc)tee_next, /* tp_iternext */
tee_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
@ -285,8 +249,52 @@ static PyTypeObject tee_type = {
0, /* tp_init */
0, /* tp_alloc */
tee_new, /* tp_new */
PyObject_Del, /* tp_free */
};
static PyObject *
tee(PyObject *self, PyObject *args)
{
int i, n=2;
PyObject *it, *iterable, *copyable, *result;
if (!PyArg_ParseTuple(args, "O|i", &iterable, &n))
return NULL;
result = PyTuple_New(n);
if (result == NULL)
return NULL;
if (n == 0)
return result;
it = PyObject_GetIter(iterable);
if (it == NULL) {
Py_DECREF(result);
return NULL;
}
if (!PyObject_HasAttrString(it, "__copy__")) {
copyable = tee_fromiterable(it);
Py_DECREF(it);
if (copyable == NULL) {
Py_DECREF(result);
return NULL;
}
} else
copyable = it;
PyTuple_SET_ITEM(result, 0, copyable);
for (i=1 ; i<n ; i++) {
copyable = PyObject_CallMethod(copyable, "__copy__", NULL);
if (copyable == NULL) {
Py_DECREF(result);
return NULL;
}
PyTuple_SET_ITEM(result, i, copyable);
}
return result;
}
PyDoc_STRVAR(tee_doc,
"tee(iterable, n=2) --> tuple of n independent iterators.");
/* cycle object **********************************************************/
typedef struct {
@ -2091,13 +2099,18 @@ islice(seq, [start,] stop [, step]) --> elements from\n\
seq[start:stop:step]\n\
imap(fun, p, q, ...) --> fun(p0, q0), fun(p1, q1), ...\n\
starmap(fun, seq) --> fun(*seq[0]), fun(*seq[1]), ...\n\
tee(it) --> (it1, it2) splits one iterator into two \n\
tee(it, n=2) --> (it1, it2 , ... itn) splits one iterator into n\n\
chain(p, q, ...) --> p0, p1, ... plast, q0, q1, ... \n\
takewhile(pred, seq) --> seq[0], seq[1], until pred fails\n\
dropwhile(pred, seq) --> seq[n], seq[n+1], starting when pred fails\n\
");
static PyMethodDef module_methods[] = {
{"tee", (PyCFunction)tee, METH_VARARGS, tee_doc},
{NULL, NULL} /* sentinel */
};
PyMODINIT_FUNC
inititertools(void)
{
@ -2105,7 +2118,6 @@ inititertools(void)
PyObject *m;
char *name;
PyTypeObject *typelist[] = {
&tee_type,
&cycle_type,
&dropwhile_type,
&takewhile_type,
@ -2121,7 +2133,7 @@ inititertools(void)
NULL
};
m = Py_InitModule3("itertools", NULL, module_doc);
m = Py_InitModule3("itertools", module_methods, module_doc);
for (i=0 ; typelist[i] != NULL ; i++) {
if (PyType_Ready(typelist[i]) < 0)
@ -2131,4 +2143,10 @@ inititertools(void)
Py_INCREF(typelist[i]);
PyModule_AddObject(m, name+1, (PyObject *)typelist[i]);
}
if (PyType_Ready(&teedataobject_type) < 0)
return;
if (PyType_Ready(&tee_type) < 0)
return;
}