Merged revisions 57620-57771 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/trunk

........
  r57771 | thomas.wouters | 2007-08-30 23:54:39 +0200 (Thu, 30 Aug 2007) | 5 lines


  Don't lie in __all__ attributes when SSL is not available: only add the SSL
  classes when they are actually created.

........
  r57620 | walter.doerwald | 2007-08-28 18:38:26 +0200 (Tue, 28 Aug 2007) | 5 lines

  Fix title endtag in HTMLCalender.formatyearpage(). Fix documentation for
  HTMLCalender.formatyearpage() (there's no themonth parameter).

  This fixes issue1046.
........
  r57622 | georg.brandl | 2007-08-28 20:54:44 +0200 (Tue, 28 Aug 2007) | 2 lines

  Add a crasher for the thread-unsafety of file objects.
........
  r57626 | skip.montanaro | 2007-08-29 01:22:52 +0200 (Wed, 29 Aug 2007) | 1 line

  fixes 813986
........
  r57628 | walter.doerwald | 2007-08-29 01:35:33 +0200 (Wed, 29 Aug 2007) | 2 lines

  Fix test output.
........
  r57631 | skip.montanaro | 2007-08-29 03:24:11 +0200 (Wed, 29 Aug 2007) | 2 lines

  Install pygettext (once the scriptsinstall target is working again).
........
  r57633 | skip.montanaro | 2007-08-29 03:33:45 +0200 (Wed, 29 Aug 2007) | 2 lines

  Recent items.
........
  r57650 | neal.norwitz | 2007-08-29 08:15:33 +0200 (Wed, 29 Aug 2007) | 1 line

  Add Bill as a developer
........
  r57651 | facundo.batista | 2007-08-29 12:28:28 +0200 (Wed, 29 Aug 2007) | 5 lines


  Ignore test failures caused by 'resource temporarily unavailable'
  exceptions raised during FailingServerTestCase tests.
  [GSoC - Alan McIntyre]
........
  r57680 | bill.janssen | 2007-08-30 00:35:05 +0200 (Thu, 30 Aug 2007) | 17 lines

  This contains a number of things:

  1) Improve the documentation of the SSL module, with a fuller
     explanation of certificate usage, another reference, proper
     formatting of this and that.

  2) Fix Windows bug in ssl.py, and general bug in sslsocket.close().
     Remove some unused code from ssl.py.  Allow accept() to be called on
     sslsocket sockets.

  3) Use try-except-else in import of ssl in socket.py.  Deprecate use of
     socket.ssl().

  4) Remove use of socket.ssl() in every library module, except for
     test_socket_ssl.py and test_ssl.py.
........
  r57714 | georg.brandl | 2007-08-30 12:09:42 +0200 (Thu, 30 Aug 2007) | 2 lines

  Stronger urge to convert filenames to str before using them as argument to ZipFile.write().
........
  r57716 | georg.brandl | 2007-08-30 12:38:56 +0200 (Thu, 30 Aug 2007) | 2 lines

  Patch #1680959: add test suite for pipes module.
........
  r57717 | georg.brandl | 2007-08-30 14:32:23 +0200 (Thu, 30 Aug 2007) | 3 lines

  * Skip test_pipes on non-POSIX.
  * Don't raise TestSkipped within a test function.
........
  r57723 | mark.summerfield | 2007-08-30 17:03:03 +0200 (Thu, 30 Aug 2007) | 3 lines

  Added more cross-references.
........
  r57726 | walter.doerwald | 2007-08-30 17:30:09 +0200 (Thu, 30 Aug 2007) | 2 lines

  Rewrap line.
........
  r57727 | walter.doerwald | 2007-08-30 17:34:55 +0200 (Thu, 30 Aug 2007) | 2 lines

  Set startinpos before calling the error handler.
........
  r57730 | bill.janssen | 2007-08-30 19:07:28 +0200 (Thu, 30 Aug 2007) | 3 lines

  Added docstrings to methods and functions.
........
  r57743 | bill.janssen | 2007-08-30 20:08:06 +0200 (Thu, 30 Aug 2007) | 1 line

  added note on new ssl module and deprecation of socket.ssl
........
  r57747 | martin.v.loewis | 2007-08-30 20:14:01 +0200 (Thu, 30 Aug 2007) | 1 line

  Fix popen usage.
........
  r57748 | martin.v.loewis | 2007-08-30 20:15:22 +0200 (Thu, 30 Aug 2007) | 1 line

  Fix typo.
........
  r57750 | martin.v.loewis | 2007-08-30 20:25:47 +0200 (Thu, 30 Aug 2007) | 1 line

  Bug #1746880: Correctly install DLLs into system32 folder on Win64.
........
  r57760 | martin.v.loewis | 2007-08-30 21:04:09 +0200 (Thu, 30 Aug 2007) | 1 line

  Bug #1709599: Run test_1565150 only if the file system is NTFS.
........
  r57762 | martin.v.loewis | 2007-08-30 22:10:57 +0200 (Thu, 30 Aug 2007) | 2 lines

  Bump autoconf minimum version to 2.61.
........
  r57764 | lars.gustaebel | 2007-08-30 22:24:31 +0200 (Thu, 30 Aug 2007) | 2 lines

  Warn about possible risks when extracting untrusted archives.
........
  r57769 | thomas.wouters | 2007-08-30 23:01:17 +0200 (Thu, 30 Aug 2007) | 7 lines


  Somewhat-preliminary slice-object and extended slicing support for ctypes.
  The exact behaviour of omitted and negative indices for the Pointer type may
  need a closer look (especially as it's subtly different from simple slices)
  but there's time yet before 2.6, and not enough before 3.0a1 :-)
........
This commit is contained in:
Thomas Wouters 2007-08-30 22:15:33 +00:00
parent cf1be88b43
commit 47b49bf6dc
40 changed files with 9565 additions and 8793 deletions

View file

@ -162,7 +162,7 @@ it's the base calendar for all computations.
the number of months per row.
.. method:: HTMLCalendar.formatyearpage(theyear, themonth[, width[, css[, encoding]]])
.. method:: HTMLCalendar.formatyearpage(theyear[, width[, css[, encoding]]])
Return a year's calendar as a complete HTML page. *width* (defaulting to 3)
specifies the number of months per row. *css* is the name for the cascading

View file

@ -405,10 +405,14 @@ Setting the :attr:`default_factory` to :class:`set` makes the
print record
To cast an individual record stored as :class:`list`, :class:`tuple`, or some
other iterable type, use the star-operator to unpack the values::
other iterable type, use the star-operator [#]_ to unpack the values::
>>> Color = NamedTuple('Color', 'name code')
>>> m = dict(red=1, green=2, blue=3)
>>> print Color(*m.popitem())
Color(name='blue', code=3)
.. rubric:: Footnotes
.. [#] For information on the star-operator see
:ref:`tut-unpacking-arguments` and :ref:`calls`.

View file

@ -12,8 +12,8 @@ numbers representations in 100% pure Python.
.. note::
This module is unneeded: everything here could be done via the ``%`` string
interpolation operator.
This module is unnecessary: everything here can be done using the ``%`` string
interpolation operator described in the :ref:`string-formatting` section.
The :mod:`fpformat` module defines the following functions and an exception:

View file

@ -1,12 +1,16 @@
:mod:`ssl` --- SSL wrapper for socket objects, and utility functions
:mod:`ssl` --- SSL wrapper for socket objects
====================================================================
.. module:: ssl
:synopsis: SSL wrapper for socket objects, and utility functions
:synopsis: SSL wrapper for socket objects
.. moduleauthor:: Bill Janssen <bill.janssen@gmail.com>
.. versionadded:: 2.6
.. sectionauthor:: Bill Janssen <bill.janssen@gmail.com>
This module provides access to Transport Layer Security (often known
as "Secure Sockets Layer") encryption and peer authentication
@ -20,10 +24,9 @@ platforms, as long as OpenSSL is installed on that platform.
Some behavior may be platform dependent, since calls are made to the operating
system socket APIs.
This section documents the objects and functions in the `ssl` module;
This section documents the objects and functions in the ``ssl`` module;
for more general information about TLS, SSL, and certificates, the
reader is referred to the paper, *Introducing SSL and Certificates using OpenSSL*, by Frederick J. Hirsch, at
http://old.pseudonym.org/ssl/wwwj-index.html.
reader is referred to the documents in the :ref:`ssl-references` section.
This module defines a class, :class:`ssl.sslsocket`, which is
derived from the :class:`socket.socket` type, and supports additional
@ -57,25 +60,25 @@ This module defines the following functions, exceptions, and constants:
.. data:: CERT_NONE
Value to pass to the `cert_reqs` parameter to :func:`sslobject`
Value to pass to the ``cert_reqs`` parameter to :func:`sslobject`
when no certificates will be required or validated from the other
side of the socket connection.
.. data:: CERT_OPTIONAL
Value to pass to the `cert_reqs` parameter to :func:`sslobject`
Value to pass to the ``cert_reqs`` parameter to :func:`sslobject`
when no certificates will be required from the other side of the
socket connection, but if they are provided, will be validated.
Note that use of this setting requires a valid certificate
validation file also be passed as a value of the `ca_certs`
validation file also be passed as a value of the ``ca_certs``
parameter.
.. data:: CERT_REQUIRED
Value to pass to the `cert_reqs` parameter to :func:`sslobject`
Value to pass to the ``cert_reqs`` parameter to :func:`sslobject`
when certificates will be required from the other side of the
socket connection. Note that use of this setting requires a valid certificate
validation file also be passed as a value of the `ca_certs`
validation file also be passed as a value of the ``ca_certs``
parameter.
.. data:: PROTOCOL_SSLv2
@ -99,10 +102,12 @@ This module defines the following functions, exceptions, and constants:
protection, if both sides can speak it.
.. _ssl-certificates:
Certificates
------------
Certificates in general are part of a public-key / private-key system. In this system, each `principal`,
Certificates in general are part of a public-key / private-key system. In this system, each *principal*,
(which may be a machine, or a person, or an organization) is assigned a unique two-part encryption key.
One part of the key is public, and is called the *public key*; the other part is kept secret, and is called
the *private key*. The two parts are related, in that if you encrypt a message with one of the parts, you can
@ -120,17 +125,54 @@ the certificate. The certificate also contains information about the
time period over which it is valid. This is expressed as two fields,
called "notBefore" and "notAfter".
The underlying system which is used in the Python SSL support is
called "OpenSSL". It contains facilities for constructing and
validating certificates. In the Python use of certificates, the other
side of a network connection can be required to produce a certificate,
and that certificate can be validated against a file filled with
self-signed *root* certificates (so-called because the issuer is the
same as the subject), and and "CA" (certification authority)
certificates assured by those root certificates (and by other CA
certificates). Either side of a connection, client or server, can
request certificates and validation, and the connection can be optionally
set up to fail if a valid certificate is not presented by the other side.
In the Python use of certificates, a client or server
can use a certificate to prove who they are. The other
side of a network connection can also be required to produce a certificate,
and that certificate can be validated to the satisfaction
of the client or server that requires such validation.
The connection can be set to fail automatically if such
validation is not achieved.
Python uses files to contain certificates. They should be formatted
as "PEM" (see :rfc:`1422`), which is a base-64 encoded form wrapped
with a header line and a footer line::
-----BEGIN CERTIFICATE-----
... (certificate in base64 PEM encoding) ...
-----END CERTIFICATE-----
The Python files which contain certificates can contain a sequence
of certificates, sometimes called a *certificate chain*. This chain
should start with the specific certificate for the principal who "is"
the client or server, and then the certificate for the issuer of that
certificate, and then the certificate for the issuer of *that* certificate,
and so on up the chain till you get to a certificate which is *self-signed*,
that is, a certificate which has the same subject and issuer,
sometimes called a *root certificate*. The certificates should just
be concatenated together in the certificate file. For example, suppose
we had a three certificate chain, from our server certificate to the
certificate of the certification authority that signed our server certificate,
to the root certificate of the agency which issued the certification authority's
certificate::
-----BEGIN CERTIFICATE-----
... (certificate for your server)...
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
... (the certificate for the CA)...
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
... (the root certificate for the CA's issuer)...
-----END CERTIFICATE-----
If you are going to require validation of the other side of the connection's
certificate, you need to provide a "CA certs" file, filled with the certificate
chains for each issuer you are willing to trust. Again, this file just
contains these chains concatenated together. For validation, Python will
use the first chain it finds in the file which matches.
Some "standard" root certificates are available at
http://www.thawte.com/roots/ (for Thawte roots) and
http://www.verisign.com/support/roots.html (for Verisign roots).
sslsocket Objects
@ -138,76 +180,67 @@ sslsocket Objects
.. class:: sslsocket(sock [, keyfile=None, certfile=None, server_side=False, cert_reqs=CERT_NONE, ssl_version=PROTOCOL_SSLv23, ca_certs=None])
Takes an instance *sock* of :class:`socket.socket`, and returns an instance of a subtype
Takes an instance ``sock`` of :class:`socket.socket`, and returns an instance of a subtype
of :class:`socket.socket` which wraps the underlying socket in an SSL context.
For client-side sockets, the context construction is lazy; if the underlying socket isn't
connected yet, the context construction will be performed after :meth:`connect` is called
on the socket.
The `keyfile` and `certfile` parameters specify optional files which contain a certificate
to be used to identify the local side of the connection. Often the private key is stored
in the same file as the certificate; in this case, only the `certfile` parameter need be
passed. If the private key is stored in a separate file, both parameters must be used.
The ``keyfile`` and ``certfile`` parameters specify optional files which contain a certificate
to be used to identify the local side of the connection. See the above discussion of :ref:`ssl-certificates`
for more information on how the certificate is stored in the ``certfile``.
The parameter `server_side` is a boolean which identifies whether server-side or client-side
Often the private key is stored
in the same file as the certificate; in this case, only the ``certfile`` parameter need be
passed. If the private key is stored in a separate file, both parameters must be used.
If the private key is stored in the ``certfile``, it should come before the first certificate
in the certificate chain::
-----BEGIN RSA PRIVATE KEY-----
... (private key in base64 encoding) ...
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
... (certificate in base64 PEM encoding) ...
-----END CERTIFICATE-----
The parameter ``server_side`` is a boolean which identifies whether server-side or client-side
behavior is desired from this socket.
The parameter `cert_reqs` specifies whether a certificate is
The parameter ``cert_reqs`` specifies whether a certificate is
required from the other side of the connection, and whether it will
be validated if provided. It must be one of the three values
:const:`CERT_NONE` (certificates ignored), :const:`CERT_OPTIONAL` (not required,
but validated if provided), or :const:`CERT_REQUIRED` (required and
validated). If the value of this parameter is not :const:`CERT_NONE`, then
the `ca_certs` parameter must point to a file of CA certificates.
the ``ca_certs`` parameter must point to a file of CA certificates.
The parameter `ssl_version` specifies which version of the SSL protocol to use. Typically,
The parameter ``ssl_version`` specifies which version of the SSL protocol to use. Typically,
the server specifies this, and a client connecting to it must use the same protocol. An
SSL server using :const:`PROTOCOL_SSLv23` can understand a client connecting via SSL2, SSL3, or TLS1,
but a client using :const:`PROTOCOL_SSLv23` can only connect to an SSL2 server.
The `ca_certs` file contains a set of concatenated "certification authority" certificates,
The ``ca_certs`` file contains a set of concatenated "certification authority" certificates,
which are used to validate certificates passed from the other end of the connection.
This file
contains the certificates in PEM format (IETF RFC 1422) where each certificate is
encoded in base64 encoding and surrounded with a header and footer::
-----BEGIN CERTIFICATE-----
... (CA certificate in base64 encoding) ...
-----END CERTIFICATE-----
The various certificates in the file are just concatenated together::
-----BEGIN CERTIFICATE-----
... (CA certificate in base64 encoding) ...
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
... (a second CA certificate in base64 encoding) ...
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
... (a root certificate in base64 encoding) ...
-----END CERTIFICATE-----
Some "standard" root certificates are available at
http://www.thawte.com/roots/ (for Thawte roots) and
http://www.verisign.com/support/roots.html (for Verisign roots).
See the above discussion of :ref:`ssl-certificates` for more information about how to arrange
the certificates in this file.
.. method:: sslsocket.read([nbytes])
Reads up to `nbytes` bytes from the SSL-encrypted channel and returns them.
Reads up to ``nbytes`` bytes from the SSL-encrypted channel and returns them.
.. method:: sslsocket.write(data)
Writes the `data` to the other side of the connection, using the SSL channel to encrypt. Returns the number
Writes the ``data`` to the other side of the connection, using the SSL channel to encrypt. Returns the number
of bytes written.
.. method:: sslsocket.getpeercert()
If there is no certificate for the peer on the other end of the connection, returns `None`.
If a certificate was received from the peer, but not validated, returns an empty `dict` instance.
If a certificate was received and validated, returns a `dict` instance with the fields
`subject` (the principal for which the certificate was issued), `issuer` (the signer of
the certificate), `notBefore` (the time before which the certificate should not be trusted),
and `notAfter` (the time after which the certificate should not be trusted) filled in.
If there is no certificate for the peer on the other end of the connection, returns ``None``.
If a certificate was received from the peer, but not validated, returns an empty ``dict`` instance.
If a certificate was received and validated, returns a ``dict`` instance with the fields
``subject`` (the principal for which the certificate was issued), ``issuer`` (the signer of
the certificate), ``notBefore`` (the time before which the certificate should not be trusted),
and ``notAfter`` (the time after which the certificate should not be trusted) filled in.
The "subject" and "issuer" fields are themselves dictionaries containing the fields given
in the certificate's data structure for each principal::
@ -229,12 +262,34 @@ sslsocket Objects
'version': 2}
This certificate is said to be *self-signed*, because the subject
and issuer are the same entity. The *version* field refers the the X509 version
and issuer are the same entity. The *version* field refers to the X509 version
that's used for the certificate.
.. method:: sslsocket.ssl_shutdown()
Closes the SSL context (if any) over the socket, but leaves the socket connection
open for further use, if both sides are willing. This is different from :meth:`socket.socket.shutdown`,
which will close the connection, but leave the local socket available for further use.
Examples
--------
Testing for SSL support
^^^^^^^^^^^^^^^^^^^^^^^
To test for the presence of SSL support in a Python installation, user code should use the following idiom::
try:
import ssl
except ImportError:
pass
else:
[ do something that requires SSL support ]
Client-side operation
^^^^^^^^^^^^^^^^^^^^^
This example connects to an SSL server, prints the server's address and certificate,
sends some bytes, and reads part of the response::
@ -281,6 +336,9 @@ looked like this::
'notBefore': 'May 9 00:00:00 2007 GMT',
'version': 2}
Server-side operation
^^^^^^^^^^^^^^^^^^^^^
For server operation, typically you'd need to have a server certificate, and private key, each in a file.
You'd open a socket, bind it to a port, call :meth:`listen` on it, then start waiting for clients
to connect::
@ -300,7 +358,7 @@ end, and use :func:`sslsocket` to create a server-side SSL context for it::
keyfile="mykeyfile", ssl_protocol=ssl.PROTOCOL_TLSv1)
deal_with_client(connstream)
Then you'd read data from the `connstream` and do something with it till you are finished with the client (or the client is finished with you)::
Then you'd read data from the ``connstream`` and do something with it till you are finished with the client (or the client is finished with you)::
def deal_with_client(connstream):
@ -317,3 +375,14 @@ Then you'd read data from the `connstream` and do something with it till you are
And go back to listening for new client connections.
.. _ssl-references:
References
----------
Class :class:`socket.socket`
Documentation of underlying :mod:`socket` class
`Introducing SSL and Certificates using OpenSSL <http://old.pseudonym.org/ssl/wwwj-index.html>`_, by Frederick J. Hirsch
`Privacy Enhancement for Internet Electronic Mail: Part II: Certificate-Based Key Management`, :rfc:`1422`, by Steve Kent

View file

@ -8,7 +8,8 @@
This module implements a file-like class, :class:`StringIO`, that reads and
writes a string buffer (also known as *memory files*). See the description of
file objects for operations (section :ref:`bltin-file-objects`).
file objects for operations (section :ref:`bltin-file-objects`). (For
standard strings, see :class:`str` and :class:`unicode`.)
.. class:: StringIO([buffer])

View file

@ -338,6 +338,13 @@ object, see :ref:`tarinfo-objects` for details.
reset each time a file is created in it. And, if a directory's permissions do
not allow writing, extracting files to it will fail.
.. warning::
Never extract archives from untrusted sources without prior inspection.
It is possible that files are created outside of *path*, e.g. members
that have absolute filenames starting with ``"/"`` or filenames with two
dots ``".."``.
.. versionadded:: 2.5
@ -354,6 +361,10 @@ object, see :ref:`tarinfo-objects` for details.
are some issues you must take care of yourself. See the description for
:meth:`extractall` above.
.. warning::
See the warning for :meth:`extractall`.
.. method:: TarFile.extractfile(member)

View file

@ -223,7 +223,7 @@ ZipFile Objects
.. note::
There is no official file name encoding for ZIP files. If you have unicode file
names, please convert them to byte strings in your desired encoding before
names, you must convert them to byte strings in your desired encoding before
passing them to :meth:`write`. WinZip interprets all file names as encoded in
CP437, also known as DOS Latin.

View file

@ -471,7 +471,7 @@ def formatyearpage(self, theyear, width=3, css='calendar.css', encoding=None):
a('<meta http-equiv="Content-Type" content="text/html; charset=%s" />\n' % encoding)
if css is not None:
a('<link rel="stylesheet" type="text/css" href="%s" />\n' % css)
a('<title>Calendar for %d</title\n' % theyear)
a('<title>Calendar for %d</title>\n' % theyear)
a('</head>\n')
a('<body>\n')
a(self.formatyear(theyear, width))

View file

@ -95,6 +95,10 @@ def test_from_address(self):
p = create_string_buffer("foo")
sz = (c_char * 3).from_address(addressof(p))
self.failUnlessEqual(sz[:], "foo")
self.failUnlessEqual(sz[::], "foo")
self.failUnlessEqual(sz[::-1], "oof")
self.failUnlessEqual(sz[::3], "f")
self.failUnlessEqual(sz[1:4:2], "o")
self.failUnlessEqual(sz.value, "foo")
try:
@ -106,6 +110,10 @@ def test_from_addressW(self):
p = create_unicode_buffer("foo")
sz = (c_wchar * 3).from_address(addressof(p))
self.failUnlessEqual(sz[:], "foo")
self.failUnlessEqual(sz[::], "foo")
self.failUnlessEqual(sz[::-1], "oof")
self.failUnlessEqual(sz[::3], "f")
self.failUnlessEqual(sz[1:4:2], "o")
self.failUnlessEqual(sz.value, "foo")
if __name__ == '__main__':

View file

@ -15,6 +15,10 @@ def test_buffer(self):
self.failUnless(type(b[0]) is bytes)
self.failUnlessEqual(b[0], b"a")
self.failUnlessEqual(b[:], "abc\0")
self.failUnlessEqual(b[::], "abc\0")
self.failUnlessEqual(b[::-1], "\0cba")
self.failUnlessEqual(b[::2], "ac")
self.failUnlessEqual(b[::5], "a")
def test_string_conversion(self):
b = create_string_buffer("abc")
@ -23,6 +27,10 @@ def test_string_conversion(self):
self.failUnless(type(b[0]) is bytes)
self.failUnlessEqual(b[0], b"a")
self.failUnlessEqual(b[:], "abc\0")
self.failUnlessEqual(b[::], "abc\0")
self.failUnlessEqual(b[::-1], "\0cba")
self.failUnlessEqual(b[::2], "ac")
self.failUnlessEqual(b[::5], "a")
try:
c_wchar
@ -41,6 +49,10 @@ def test_unicode_buffer(self):
self.failUnless(type(b[0]) is str)
self.failUnlessEqual(b[0], "a")
self.failUnlessEqual(b[:], "abc\0")
self.failUnlessEqual(b[::], "abc\0")
self.failUnlessEqual(b[::-1], "\0cba")
self.failUnlessEqual(b[::2], "ac")
self.failUnlessEqual(b[::5], "a")
def test_unicode_conversion(self):
b = create_unicode_buffer("abc")
@ -49,6 +61,10 @@ def test_unicode_conversion(self):
self.failUnless(type(b[0]) is str)
self.failUnlessEqual(b[0], "a")
self.failUnlessEqual(b[:], "abc\0")
self.failUnlessEqual(b[::], "abc\0")
self.failUnlessEqual(b[::-1], "\0cba")
self.failUnlessEqual(b[::2], "ac")
self.failUnlessEqual(b[::5], "a")
if __name__ == "__main__":
unittest.main()

View file

@ -50,12 +50,24 @@ def test_p2a_objects(self):
def test_other(self):
p = cast((c_int * 4)(1, 2, 3, 4), POINTER(c_int))
self.failUnlessEqual(p[:4], [1,2, 3, 4])
self.failUnlessEqual(p[:4:], [1, 2, 3, 4])
self.failUnlessEqual(p[3:-1:-1], [4, 3, 2, 1])
self.failUnlessEqual(p[:4:3], [1, 4])
c_int()
self.failUnlessEqual(p[:4], [1, 2, 3, 4])
self.failUnlessEqual(p[:4:], [1, 2, 3, 4])
self.failUnlessEqual(p[3:-1:-1], [4, 3, 2, 1])
self.failUnlessEqual(p[:4:3], [1, 4])
p[2] = 96
self.failUnlessEqual(p[:4], [1, 2, 96, 4])
self.failUnlessEqual(p[:4:], [1, 2, 96, 4])
self.failUnlessEqual(p[3:-1:-1], [4, 96, 2, 1])
self.failUnlessEqual(p[:4:3], [1, 4])
c_int()
self.failUnlessEqual(p[:4], [1, 2, 96, 4])
self.failUnlessEqual(p[:4:], [1, 2, 96, 4])
self.failUnlessEqual(p[3:-1:-1], [4, 96, 2, 1])
self.failUnlessEqual(p[:4:3], [1, 4])
def test_char_p(self):
# This didn't work: bad argument to internal function

View file

@ -30,6 +30,14 @@ def test_cast(self):
self.failUnlessEqual(cast(a, c_char_p).value, "abcdef")
self.failUnlessEqual(cast(a, POINTER(c_byte))[:7],
[97, 98, 99, 100, 101, 102, 0])
self.failUnlessEqual(cast(a, POINTER(c_byte))[:7:],
[97, 98, 99, 100, 101, 102, 0])
self.failUnlessEqual(cast(a, POINTER(c_byte))[6:-1:-1],
[0, 102, 101, 100, 99, 98, 97])
self.failUnlessEqual(cast(a, POINTER(c_byte))[:7:2],
[97, 99, 101, 0])
self.failUnlessEqual(cast(a, POINTER(c_byte))[:7:7],
[97])
def test_string_at(self):
s = string_at(b"foo bar")

View file

@ -8,13 +8,22 @@ def test_getslice_cint(self):
a = (c_int * 100)(*range(1100, 1200))
b = list(range(1100, 1200))
self.failUnlessEqual(a[0:2], b[0:2])
self.failUnlessEqual(a[0:2:], b[0:2:])
self.failUnlessEqual(len(a), len(b))
self.failUnlessEqual(a[5:7], b[5:7])
self.failUnlessEqual(a[5:7:], b[5:7:])
self.failUnlessEqual(a[-1], b[-1])
self.failUnlessEqual(a[:], b[:])
self.failUnlessEqual(a[::], b[::])
self.failUnlessEqual(a[10::-1], b[10::-1])
self.failUnlessEqual(a[30:20:-1], b[30:20:-1])
self.failUnlessEqual(a[:12:6], b[:12:6])
self.failUnlessEqual(a[2:6:4], b[2:6:4])
a[0:5] = range(5, 10)
self.failUnlessEqual(a[0:5], list(range(5, 10)))
self.failUnlessEqual(a[0:5:], list(range(5, 10)))
self.failUnlessEqual(a[4::-1], list(range(9, 4, -1)))
def test_setslice_cint(self):
a = (c_int * 100)(*range(1100, 1200))
@ -22,17 +31,36 @@ def test_setslice_cint(self):
a[32:47] = list(range(32, 47))
self.failUnlessEqual(a[32:47], list(range(32, 47)))
a[32:47] = range(132, 147)
self.failUnlessEqual(a[32:47:], list(range(132, 147)))
a[46:31:-1] = range(232, 247)
self.failUnlessEqual(a[32:47:1], list(range(246, 231, -1)))
from operator import setslice
a[32:47] = range(1132, 1147)
self.failUnlessEqual(a[:], b)
a[32:47:7] = range(3)
b[32:47:7] = range(3)
self.failUnlessEqual(a[:], b)
a[33::-3] = range(12)
b[33::-3] = range(12)
self.failUnlessEqual(a[:], b)
from operator import setslice, setitem
# TypeError: int expected instead of str instance
self.assertRaises(TypeError, setslice, a, 0, 5, "abcde")
self.assertRaises(TypeError, setitem, a, slice(0, 5), "abcde")
# TypeError: int expected instead of str instance
self.assertRaises(TypeError, setslice, a, 0, 5, ["a", "b", "c", "d", "e"])
self.assertRaises(TypeError, setitem, a, slice(0, 5),
["a", "b", "c", "d", "e"])
# TypeError: int expected instead of float instance
self.assertRaises(TypeError, setslice, a, 0, 5, [1, 2, 3, 4, 3.14])
self.assertRaises(TypeError, setitem, a, slice(0, 5),
[1, 2, 3, 4, 3.14])
# ValueError: Can only assign sequence of same size
self.assertRaises(ValueError, setslice, a, 0, 5, range(32))
self.assertRaises(ValueError, setitem, a, slice(0, 5), range(32))
def test_char_ptr(self):
s = b"abcdefghijklmnopqrstuvwxyz"
@ -42,15 +70,32 @@ def test_char_ptr(self):
dll.my_free.restype = None
res = dll.my_strdup(s)
self.failUnlessEqual(res[:len(s)], s)
self.failUnlessEqual(res[:3], s[:3])
self.failUnlessEqual(res[:len(s):], s)
self.failUnlessEqual(res[len(s)-1:-1:-1], s[::-1])
self.failUnlessEqual(res[len(s)-1:5:-7], s[:5:-7])
self.failUnlessEqual(res[0:-1:-1], s[0::-1])
import operator
self.assertRaises(ValueError, operator.getitem,
res, slice(None, None, None))
self.assertRaises(ValueError, operator.getitem,
res, slice(0, None, None))
self.assertRaises(ValueError, operator.getitem,
res, slice(None, 5, -1))
self.assertRaises(ValueError, operator.getitem,
res, slice(-5, None, None))
self.assertRaises(TypeError, operator.setslice,
res, 0, 5, "abcde")
self.assertRaises(TypeError, operator.setitem,
res, slice(0, 5), "abcde")
dll.my_free(res)
dll.my_strdup.restype = POINTER(c_byte)
res = dll.my_strdup(s)
self.failUnlessEqual(res[:len(s)], list(range(ord("a"), ord("z")+1)))
self.failUnlessEqual(res[:len(s):], list(range(ord("a"), ord("z")+1)))
dll.my_free(res)
def test_char_ptr_with_free(self):
@ -80,6 +125,10 @@ def test_char_array(self):
p = (c_char * 27)(*s)
self.failUnlessEqual(p[:], s)
self.failUnlessEqual(p[::], s)
self.failUnlessEqual(p[::-1], s[::-1])
self.failUnlessEqual(p[5::-2], s[5::-2])
self.failUnlessEqual(p[2:5:-3], s[2:5:-3])
try:
@ -96,10 +145,15 @@ def test_wchar_ptr(self):
dll.my_free.restype = None
res = dll.my_wcsdup(s)
self.failUnlessEqual(res[:len(s)], s)
self.failUnlessEqual(res[:len(s):], s)
self.failUnlessEqual(res[len(s)-1:-1:-1], s[::-1])
self.failUnlessEqual(res[len(s)-1:5:-7], s[:5:-7])
import operator
self.assertRaises(TypeError, operator.setslice,
res, 0, 5, "abcde")
self.assertRaises(TypeError, operator.setitem,
res, slice(0, 5), "abcde")
dll.my_free(res)
if sizeof(c_wchar) == sizeof(c_short):
@ -111,8 +165,11 @@ def test_wchar_ptr(self):
else:
return
res = dll.my_wcsdup(s)
self.failUnlessEqual(res[:len(s)-1],
list(range(ord("a"), ord("z")+1)))
tmpl = list(range(ord("a"), ord("z")+1))
self.failUnlessEqual(res[:len(s)-1], tmpl)
self.failUnlessEqual(res[:len(s)-1:], tmpl)
self.failUnlessEqual(res[len(s)-2:-1:-1], tmpl[::-1])
self.failUnlessEqual(res[len(s)-2:5:-7], tmpl[:5:-7])
dll.my_free(res)
################################################################

View file

@ -121,6 +121,9 @@ def XX_test_sized_strings(self):
def XX_test_initialized_strings(self):
self.failUnless(c_string("ab", 4).raw[:2] == "ab")
self.failUnless(c_string("ab", 4).raw[:2:] == "ab")
self.failUnless(c_string("ab", 4).raw[:2:-1] == "ba")
self.failUnless(c_string("ab", 4).raw[:2:2] == "a")
self.failUnless(c_string("ab", 4).raw[-1] == "\000")
self.failUnless(c_string("ab", 2).raw == "a\000")

View file

@ -236,7 +236,13 @@ class SomeInts(Structure):
# can use tuple to initialize array (but not list!)
self.failUnlessEqual(SomeInts((1, 2)).a[:], [1, 2, 0, 0])
self.failUnlessEqual(SomeInts((1, 2)).a[::], [1, 2, 0, 0])
self.failUnlessEqual(SomeInts((1, 2)).a[::-1], [0, 0, 2, 1])
self.failUnlessEqual(SomeInts((1, 2)).a[::2], [1, 0])
self.failUnlessEqual(SomeInts((1, 2)).a[1:5:6], [2])
self.failUnlessEqual(SomeInts((1, 2)).a[6:4:-1], [])
self.failUnlessEqual(SomeInts((1, 2, 3, 4)).a[:], [1, 2, 3, 4])
self.failUnlessEqual(SomeInts((1, 2, 3, 4)).a[::], [1, 2, 3, 4])
# too long
# XXX Should raise ValueError?, not RuntimeError
self.assertRaises(RuntimeError, SomeInts, (1, 2, 3, 4, 5))

View file

@ -58,11 +58,19 @@ def test_buffers(self):
ctypes.set_conversion_mode("ascii", "replace")
buf = ctypes.create_unicode_buffer(b"ab\xe4\xf6\xfc")
self.failUnlessEqual(buf[:], "ab\uFFFD\uFFFD\uFFFD\0")
self.failUnlessEqual(buf[::], "ab\uFFFD\uFFFD\uFFFD\0")
self.failUnlessEqual(buf[::-1], "\0\uFFFD\uFFFD\uFFFDba")
self.failUnlessEqual(buf[::2], "a\uFFFD\uFFFD")
self.failUnlessEqual(buf[6:5:-1], "")
ctypes.set_conversion_mode("ascii", "ignore")
buf = ctypes.create_unicode_buffer(b"ab\xe4\xf6\xfc")
# is that correct? not sure. But with 'ignore', you get what you pay for..
self.failUnlessEqual(buf[:], "ab\0\0\0\0")
self.failUnlessEqual(buf[::], "ab\0\0\0\0")
self.failUnlessEqual(buf[::-1], "\0\0\0\0ba")
self.failUnlessEqual(buf[::2], "a\0\0")
self.failUnlessEqual(buf[6:5:-1], "")
import _ctypes_test
func = ctypes.CDLL(_ctypes_test.__file__)._testfunc_p_p
@ -104,11 +112,17 @@ def test_buffers(self):
ctypes.set_conversion_mode("ascii", "replace")
buf = ctypes.create_string_buffer("ab\xe4\xf6\xfc")
self.failUnlessEqual(buf[:], "ab???\0")
self.failUnlessEqual(buf[::], "ab???\0")
self.failUnlessEqual(buf[::-1], "\0???ba")
self.failUnlessEqual(buf[::2], "a??")
self.failUnlessEqual(buf[6:5:-1], "")
ctypes.set_conversion_mode("ascii", "ignore")
buf = ctypes.create_string_buffer("ab\xe4\xf6\xfc")
# is that correct? not sure. But with 'ignore', you get what you pay for..
self.failUnlessEqual(buf[:], "ab\0\0\0\0")
self.failUnlessEqual(buf[::], "ab\0\0\0\0")
self.failUnlessEqual(buf[::-1], "\0\0\0\0ba")
if __name__ == '__main__':
unittest.main()

View file

@ -72,7 +72,7 @@
import socket
from urlparse import urlsplit
__all__ = ["HTTPResponse", "HTTPConnection", "HTTPSConnection",
__all__ = ["HTTPResponse", "HTTPConnection",
"HTTPException", "NotConnected", "UnknownProtocol",
"UnknownTransferEncoding", "UnimplementedFileMode",
"IncompleteRead", "InvalidURL", "ImproperConnectionState",
@ -964,207 +964,33 @@ def getresponse(self):
return response
# The next several classes are used to define FakeSocket, a socket-like
# interface to an SSL connection.
try:
import ssl
except ImportError:
pass
else:
class HTTPSConnection(HTTPConnection):
"This class allows communication via SSL."
# The primary complexity comes from faking a makefile() method. The
# standard socket makefile() implementation calls dup() on the socket
# file descriptor. As a consequence, clients can call close() on the
# parent socket and its makefile children in any order. The underlying
# socket isn't closed until they are all closed.
default_port = HTTPS_PORT
# The implementation uses reference counting to keep the socket open
# until the last client calls close(). SharedSocket keeps track of
# the reference counting and SharedSocketClient provides an constructor
# and close() method that call incref() and decref() correctly.
def __init__(self, host, port=None, key_file=None, cert_file=None,
strict=None, timeout=None):
HTTPConnection.__init__(self, host, port, strict, timeout)
self.key_file = key_file
self.cert_file = cert_file
class SharedSocket:
def connect(self):
"Connect to a host on a given (SSL) port."
def __init__(self, sock):
self.sock = sock
self._refcnt = 0
sock = socket.create_connection((self.host, self.port), self.timeout)
self.sock = ssl.sslsocket(sock, self.key_file, self.cert_file)
def incref(self):
self._refcnt += 1
def decref(self):
self._refcnt -= 1
assert self._refcnt >= 0
if self._refcnt == 0:
self.sock.close()
def __del__(self):
self.sock.close()
class SharedSocketClient:
def __init__(self, shared):
self._closed = 0
self._shared = shared
self._shared.incref()
self._sock = shared.sock
def close(self):
if not self._closed:
self._shared.decref()
self._closed = 1
self._shared = None
class SSLFile(SharedSocketClient):
"""File-like object wrapping an SSL socket."""
BUFSIZE = 8192
def __init__(self, sock, ssl, bufsize=None):
SharedSocketClient.__init__(self, sock)
self._ssl = ssl
self._buf = b""
self._bufsize = bufsize or self.__class__.BUFSIZE
def _read(self):
buf = b""
# put in a loop so that we retry on transient errors
while True:
try:
buf = self._ssl.read(self._bufsize)
except socket.sslerror as err:
err_type = err.args[0]
if (err_type == socket.SSL_ERROR_WANT_READ
or err_type == socket.SSL_ERROR_WANT_WRITE):
continue
if (err_type == socket.SSL_ERROR_ZERO_RETURN
or err_type == socket.SSL_ERROR_EOF):
break
raise
except socket.error as err:
err_type = err.args[0]
if err_type == errno.EINTR:
continue
if err_type == errno.EBADF:
# XXX socket was closed?
break
raise
else:
break
return buf
def read(self, size=None):
L = [self._buf]
avail = len(self._buf)
while size is None or avail < size:
s = self._read()
if s == b"":
break
L.append(s)
avail += len(s)
all = b"".join(L)
if size is None:
self._buf = b""
return all
else:
self._buf = all[size:]
return all[:size]
def readline(self):
L = [self._buf]
self._buf = b""
while 1:
i = L[-1].find("\n")
if i >= 0:
break
s = self._read()
if s == b"":
break
L.append(s)
if i == -1:
# loop exited because there is no more data
return b"".join(L)
else:
all = b"".join(L)
# XXX could do enough bookkeeping not to do a 2nd search
i = all.find("\n") + 1
line = all[:i]
self._buf = all[i:]
return line
def readlines(self, sizehint=0):
total = 0
list = []
while True:
line = self.readline()
if not line:
break
list.append(line)
total += len(line)
if sizehint and total >= sizehint:
break
return list
def fileno(self):
return self._sock.fileno()
def __iter__(self):
return self
def __next__(self):
line = self.readline()
if not line:
raise StopIteration
return line
class FakeSocket(SharedSocketClient):
class _closedsocket:
def __getattr__(self, name):
raise error(9, 'Bad file descriptor')
def __init__(self, sock, ssl):
sock = SharedSocket(sock)
SharedSocketClient.__init__(self, sock)
self._ssl = ssl
def close(self):
SharedSocketClient.close(self)
self._sock = self.__class__._closedsocket()
def makefile(self, mode, bufsize=None):
if mode != 'r' and mode != 'rb':
raise UnimplementedFileMode()
return SSLFile(self._shared, self._ssl, bufsize)
def send(self, stuff, flags = 0):
return self._ssl.write(stuff)
sendall = send
def recv(self, len = 1024, flags = 0):
return self._ssl.read(len)
def __getattr__(self, attr):
return getattr(self._sock, attr)
def close(self):
SharedSocketClient.close(self)
self._ssl = None
class HTTPSConnection(HTTPConnection):
"This class allows communication via SSL."
default_port = HTTPS_PORT
def __init__(self, host, port=None, key_file=None, cert_file=None,
strict=None, timeout=None):
HTTPConnection.__init__(self, host, port, strict, timeout)
self.key_file = key_file
self.cert_file = cert_file
def connect(self):
"Connect to a host on a given (SSL) port."
sock = socket.create_connection((self.host, self.port), self.timeout)
ssl = socket.ssl(sock, self.key_file, self.cert_file)
self.sock = FakeSocket(sock, ssl)
def FakeSocket (sock, sslobj):
return sslobj
__all__.append("HTTPSConnection")
class HTTPException(Exception):
# Subclasses that define an __init__ must call Exception.__init__

View file

@ -24,7 +24,7 @@
import binascii, os, random, re, socket, sys, time
__all__ = ["IMAP4", "IMAP4_SSL", "IMAP4_stream", "Internaldate2tuple",
__all__ = ["IMAP4", "IMAP4_stream", "Internaldate2tuple",
"Int2AP", "ParseFlags", "Time2Internaldate"]
# Globals
@ -1111,95 +1111,101 @@ def print_log(self):
class IMAP4_SSL(IMAP4):
try:
import ssl
except ImportError:
pass
else:
class IMAP4_SSL(IMAP4):
"""IMAP4 client class over SSL connection
"""IMAP4 client class over SSL connection
Instantiate with: IMAP4_SSL([host[, port[, keyfile[, certfile]]]])
Instantiate with: IMAP4_SSL([host[, port[, keyfile[, certfile]]]])
host - host's name (default: localhost);
port - port number (default: standard IMAP4 SSL port).
keyfile - PEM formatted file that contains your private key (default: None);
certfile - PEM formatted certificate chain file (default: None);
host - host's name (default: localhost);
port - port number (default: standard IMAP4 SSL port).
keyfile - PEM formatted file that contains your private key (default: None);
certfile - PEM formatted certificate chain file (default: None);
for more documentation see the docstring of the parent class IMAP4.
"""
def __init__(self, host = '', port = IMAP4_SSL_PORT, keyfile = None, certfile = None):
self.keyfile = keyfile
self.certfile = certfile
IMAP4.__init__(self, host, port)
def open(self, host = '', port = IMAP4_SSL_PORT):
"""Setup connection to remote server on "host:port".
(default: localhost:standard IMAP4 SSL port).
This connection will be used by the routines:
read, readline, send, shutdown.
for more documentation see the docstring of the parent class IMAP4.
"""
self.host = host
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
self.sslobj = socket.ssl(self.sock, self.keyfile, self.certfile)
def read(self, size):
"""Read 'size' bytes from remote."""
# sslobj.read() sometimes returns < size bytes
chunks = []
read = 0
while read < size:
data = self.sslobj.read(size-read)
read += len(data)
chunks.append(data)
return ''.join(chunks)
def __init__(self, host = '', port = IMAP4_SSL_PORT, keyfile = None, certfile = None):
self.keyfile = keyfile
self.certfile = certfile
IMAP4.__init__(self, host, port)
def readline(self):
"""Read line from remote."""
# NB: socket.ssl needs a "readline" method, or perhaps a "makefile" method.
line = []
while 1:
char = self.sslobj.read(1)
line.append(char)
if char == "\n": return ''.join(line)
def open(self, host = '', port = IMAP4_SSL_PORT):
"""Setup connection to remote server on "host:port".
(default: localhost:standard IMAP4 SSL port).
This connection will be used by the routines:
read, readline, send, shutdown.
"""
self.host = host
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
self.sslobj = ssl.sslsocket(self.sock, self.keyfile, self.certfile)
def send(self, data):
"""Send data to remote."""
# NB: socket.ssl needs a "sendall" method to match socket objects.
bytes = len(data)
while bytes > 0:
sent = self.sslobj.write(data)
if sent == bytes:
break # avoid copy
data = data[sent:]
bytes = bytes - sent
def read(self, size):
"""Read 'size' bytes from remote."""
# sslobj.read() sometimes returns < size bytes
chunks = []
read = 0
while read < size:
data = self.sslobj.read(size-read)
read += len(data)
chunks.append(data)
return ''.join(chunks)
def shutdown(self):
"""Close I/O established in "open"."""
self.sock.close()
def readline(self):
"""Read line from remote."""
# NB: socket.ssl needs a "readline" method, or perhaps a "makefile" method.
line = []
while 1:
char = self.sslobj.read(1)
line.append(char)
if char == "\n": return ''.join(line)
def socket(self):
"""Return socket instance used to connect to IMAP4 server.
socket = <instance>.socket()
"""
return self.sock
def send(self, data):
"""Send data to remote."""
# NB: socket.ssl needs a "sendall" method to match socket objects.
bytes = len(data)
while bytes > 0:
sent = self.sslobj.write(data)
if sent == bytes:
break # avoid copy
data = data[sent:]
bytes = bytes - sent
def ssl(self):
"""Return SSLObject instance used to communicate with the IMAP4 server.
def shutdown(self):
"""Close I/O established in "open"."""
self.sock.close()
ssl = <instance>.socket.ssl()
"""
return self.sslobj
def socket(self):
"""Return socket instance used to connect to IMAP4 server.
socket = <instance>.socket()
"""
return self.sock
def ssl(self):
"""Return SSLObject instance used to communicate with the IMAP4 server.
ssl = <instance>.socket.ssl()
"""
return self.sslobj
__all__.append("IMAP4_SSL")
class IMAP4_stream(IMAP4):

View file

@ -60,7 +60,6 @@
import re
import os
import tempfile
import string
@ -267,18 +266,3 @@ def quote(file):
c = '\\' + c
res = res + c
return '"' + res + '"'
# Small test program and example
def test():
print('Testing...')
t = Template()
t.append('togif $IN $OUT', 'ff')
t.append('giftoppm', '--')
t.append('ppmtogif >$OUT', '-f')
t.append('fromgif $IN $OUT', 'ff')
t.debug(1)
FILE = '/usr/local/images/rgb/rogues/guido.rgb'
t.copy(FILE, '@temp')
print('Done.')

View file

@ -15,7 +15,7 @@
import re, socket
__all__ = ["POP3","error_proto","POP3_SSL"]
__all__ = ["POP3","error_proto"]
# Exception raised when an error or invalid response is received:
@ -307,90 +307,97 @@ def uidl(self, which=None):
return self._shortcmd('UIDL %s' % which)
return self._longcmd('UIDL')
class POP3_SSL(POP3):
"""POP3 client class over SSL connection
try:
import ssl
except ImportError:
pass
else:
Instantiate with: POP3_SSL(hostname, port=995, keyfile=None, certfile=None)
class POP3_SSL(POP3):
"""POP3 client class over SSL connection
hostname - the hostname of the pop3 over ssl server
port - port number
keyfile - PEM formatted file that countains your private key
certfile - PEM formatted certificate chain file
Instantiate with: POP3_SSL(hostname, port=995, keyfile=None, certfile=None)
See the methods of the parent class POP3 for more documentation.
"""
hostname - the hostname of the pop3 over ssl server
port - port number
keyfile - PEM formatted file that countains your private key
certfile - PEM formatted certificate chain file
def __init__(self, host, port = POP3_SSL_PORT, keyfile = None, certfile = None):
self.host = host
self.port = port
self.keyfile = keyfile
self.certfile = certfile
self.buffer = ""
msg = "getaddrinfo returns an empty list"
self.sock = None
for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
self.sock = socket.socket(af, socktype, proto)
self.sock.connect(sa)
except socket.error as msg:
if self.sock:
self.sock.close()
self.sock = None
continue
break
if not self.sock:
raise socket.error(msg)
self.file = self.sock.makefile('rb')
self.sslobj = socket.ssl(self.sock, self.keyfile, self.certfile)
self._debugging = 0
self.welcome = self._getresp()
See the methods of the parent class POP3 for more documentation.
"""
def _fillBuffer(self):
localbuf = self.sslobj.read()
if len(localbuf) == 0:
raise error_proto('-ERR EOF')
self.buffer += localbuf
def __init__(self, host, port = POP3_SSL_PORT, keyfile = None, certfile = None):
self.host = host
self.port = port
self.keyfile = keyfile
self.certfile = certfile
self.buffer = ""
msg = "getaddrinfo returns an empty list"
self.sock = None
for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
self.sock = socket.socket(af, socktype, proto)
self.sock.connect(sa)
except socket.error as msg:
if self.sock:
self.sock.close()
self.sock = None
continue
break
if not self.sock:
raise socket.error(msg)
self.file = self.sock.makefile('rb')
self.sslobj = ssl.sslsocket(self.sock, self.keyfile, self.certfile)
self._debugging = 0
self.welcome = self._getresp()
def _getline(self):
line = ""
renewline = re.compile(r'.*?\n')
match = renewline.match(self.buffer)
while not match:
self._fillBuffer()
def _fillBuffer(self):
localbuf = self.sslobj.read()
if len(localbuf) == 0:
raise error_proto('-ERR EOF')
self.buffer += localbuf
def _getline(self):
line = ""
renewline = re.compile(r'.*?\n')
match = renewline.match(self.buffer)
line = match.group(0)
self.buffer = renewline.sub('' ,self.buffer, 1)
if self._debugging > 1: print('*get*', repr(line))
while not match:
self._fillBuffer()
match = renewline.match(self.buffer)
line = match.group(0)
self.buffer = renewline.sub('' ,self.buffer, 1)
if self._debugging > 1: print('*get*', repr(line))
octets = len(line)
if line[-2:] == CRLF:
return line[:-2], octets
if line[0] == CR:
return line[1:-1], octets
return line[:-1], octets
octets = len(line)
if line[-2:] == CRLF:
return line[:-2], octets
if line[0] == CR:
return line[1:-1], octets
return line[:-1], octets
def _putline(self, line):
if self._debugging > 1: print('*put*', repr(line))
line += CRLF
bytes = len(line)
while bytes > 0:
sent = self.sslobj.write(line)
if sent == bytes:
break # avoid copy
line = line[sent:]
bytes = bytes - sent
def _putline(self, line):
if self._debugging > 1: print('*put*', repr(line))
line += CRLF
bytes = len(line)
while bytes > 0:
sent = self.sslobj.write(line)
if sent == bytes:
break # avoid copy
line = line[sent:]
bytes = bytes - sent
def quit(self):
"""Signoff: commit changes on server, unlock mailbox, close connection."""
try:
resp = self._shortcmd('QUIT')
except error_proto as val:
resp = val
self.sock.close()
del self.sslobj, self.sock
return resp
def quit(self):
"""Signoff: commit changes on server, unlock mailbox, close connection."""
try:
resp = self._shortcmd('QUIT')
except error_proto as val:
resp = val
self.sock.close()
del self.sslobj, self.sock
return resp
__all__.append("POP3_SSL")
if __name__ == "__main__":
import sys

View file

@ -230,6 +230,11 @@ def __init__(self, *args):
urllib.FancyURLopener.__init__(self, *args)
self.errcode = 200
def prompt_user_passwd(self, host, realm):
## If robots.txt file is accessible only with a password,
## we act as if the file wasn't there.
return None, None
def http_error_default(self, url, fp, errcode, errmsg, headers):
self.errcode = errcode
return urllib.FancyURLopener.http_error_default(self, url, fp, errcode,

View file

@ -52,7 +52,7 @@
__all__ = ["SMTPException","SMTPServerDisconnected","SMTPResponseException",
"SMTPSenderRefused","SMTPRecipientsRefused","SMTPDataError",
"SMTPConnectError","SMTPHeloError","SMTPAuthenticationError",
"quoteaddr","quotedata","SMTP","SMTP_SSL"]
"quoteaddr","quotedata","SMTP"]
SMTP_PORT = 25
SMTP_SSL_PORT = 465
@ -128,43 +128,6 @@ class SMTPAuthenticationError(SMTPResponseException):
combination provided.
"""
class SSLFakeSocket:
"""A fake socket object that really wraps a SSLObject.
It only supports what is needed in smtplib.
"""
def __init__(self, realsock, sslobj):
self.realsock = realsock
self.sslobj = sslobj
def send(self, str):
self.sslobj.write(str)
return len(str)
sendall = send
def close(self):
self.realsock.close()
class SSLFakeFile:
"""A fake file like object that really wraps a SSLObject.
It only supports what is needed in smtplib.
"""
def __init__(self, sslobj):
self.sslobj = sslobj
def readline(self):
str = ""
chr = None
while chr != "\n":
chr = self.sslobj.read(1)
str += chr
return str
def close(self):
pass
def quoteaddr(addr):
"""Quote a subset of the email addresses defined by RFC 821.
@ -193,6 +156,33 @@ def quotedata(data):
return re.sub(r'(?m)^\.', '..',
re.sub(r'(?:\r\n|\n|\r(?!\n))', CRLF, data))
try:
import ssl
except ImportError:
_have_ssl = False
else:
class SSLFakeFile:
"""A fake file like object that really wraps a SSLObject.
It only supports what is needed in smtplib.
"""
def __init__(self, sslobj):
self.sslobj = sslobj
def readline(self):
str = b""
chr = None
while chr != b"\n":
chr = self.sslobj.read(1)
str += chr
return str
def close(self):
pass
_have_ssl = True
class SMTP:
"""This class manages a connection to an SMTP or ESMTP server.
@ -597,9 +587,10 @@ def starttls(self, keyfile = None, certfile = None):
"""
(resp, reply) = self.docmd("STARTTLS")
if resp == 220:
sslobj = socket.ssl(self.sock, keyfile, certfile)
self.sock = SSLFakeSocket(self.sock, sslobj)
self.file = SSLFakeFile(sslobj)
if not _have_ssl:
raise RuntimeError("No SSL support included in this Python")
self.sock = ssl.sslsocket(self.sock, keyfile, certfile)
self.file = SSLFakeFile(self.sock)
return (resp, reply)
def sendmail(self, from_addr, to_addrs, msg, mail_options=[],
@ -711,27 +702,31 @@ def quit(self):
self.docmd("quit")
self.close()
class SMTP_SSL(SMTP):
""" This is a subclass derived from SMTP that connects over an SSL encrypted
socket (to use this class you need a socket module that was compiled with SSL
support). If host is not specified, '' (the local host) is used. If port is
omitted, the standard SMTP-over-SSL port (465) is used. keyfile and certfile
are also optional - they can contain a PEM formatted private key and
certificate chain file for the SSL connection.
"""
def __init__(self, host='', port=0, local_hostname=None,
keyfile=None, certfile=None, timeout=None):
self.keyfile = keyfile
self.certfile = certfile
SMTP.__init__(self, host, port, local_hostname, timeout)
self.default_port = SMTP_SSL_PORT
if _have_ssl:
def _get_socket(self, host, port, timeout):
if self.debuglevel > 0: print('connect:', (host, port), file=stderr)
self.sock = socket.create_connection((host, port), timeout)
sslobj = socket.ssl(self.sock, self.keyfile, self.certfile)
self.sock = SSLFakeSocket(self.sock, sslobj)
self.file = SSLFakeFile(sslobj)
class SMTP_SSL(SMTP):
""" This is a subclass derived from SMTP that connects over an SSL encrypted
socket (to use this class you need a socket module that was compiled with SSL
support). If host is not specified, '' (the local host) is used. If port is
omitted, the standard SMTP-over-SSL port (465) is used. keyfile and certfile
are also optional - they can contain a PEM formatted private key and
certificate chain file for the SSL connection.
"""
def __init__(self, host='', port=0, local_hostname=None,
keyfile=None, certfile=None, timeout=None):
self.keyfile = keyfile
self.certfile = certfile
SMTP.__init__(self, host, port, local_hostname, timeout)
self.default_port = SMTP_SSL_PORT
def _get_socket(self, host, port, timeout):
if self.debuglevel > 0: print('connect:', (host, port), file=stderr)
self.sock = socket.create_connection((host, port), timeout)
sslobj = socket.ssl(self.sock, self.keyfile, self.certfile)
self.sock = SSLFakeSocket(self.sock, sslobj)
self.file = SSLFakeFile(sslobj)
__all__.append("SMTP_SSL")
#
# LMTP extension

View file

@ -46,13 +46,35 @@
import _socket
from _socket import *
_have_ssl = False
## try:
## import _ssl
## from _ssl import *
## _have_ssl = True
## except ImportError:
## pass
try:
import _ssl
import ssl as _realssl
except ImportError:
# no SSL support
pass
else:
def ssl(sock, keyfile=None, certfile=None):
# we do an internal import here because the ssl
# module imports the socket module
warnings.warn("socket.ssl() is deprecated. Use ssl.sslsocket() instead.",
DeprecationWarning, stacklevel=2)
return _realssl.sslwrap_simple(sock, keyfile, certfile)
# we need to import the same constants we used to...
from _ssl import \
sslerror, \
RAND_add, \
RAND_egd, \
RAND_status, \
SSL_ERROR_ZERO_RETURN, \
SSL_ERROR_WANT_READ, \
SSL_ERROR_WANT_WRITE, \
SSL_ERROR_WANT_X509_LOOKUP, \
SSL_ERROR_SYSCALL, \
SSL_ERROR_SSL, \
SSL_ERROR_WANT_CONNECT, \
SSL_ERROR_EOF, \
SSL_ERROR_INVALID_ERROR_CODE
import os, sys, io
@ -63,12 +85,9 @@
__all__ = ["getfqdn"]
__all__.extend(os._get_exports_list(_socket))
if _have_ssl:
__all__.extend(os._get_exports_list(_ssl))
def ssl(sock, keyfile=None, certfile=None):
import ssl as realssl
return realssl.sslwrap_simple(sock, keyfile, certfile)
__all__.append("ssl")
_realsocket = socket
# WSA error codes
if sys.platform.lower().startswith("win"):

View file

@ -60,55 +60,47 @@
import os, sys
import _ssl # if we can't import it, let the error propagate
from socket import socket
from _ssl import sslerror
from _ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
from _ssl import PROTOCOL_SSLv2, PROTOCOL_SSLv3, PROTOCOL_SSLv23, PROTOCOL_TLSv1
from _ssl import \
SSL_ERROR_ZERO_RETURN, \
SSL_ERROR_WANT_READ, \
SSL_ERROR_WANT_WRITE, \
SSL_ERROR_WANT_X509_LOOKUP, \
SSL_ERROR_SYSCALL, \
SSL_ERROR_SSL, \
SSL_ERROR_WANT_CONNECT, \
SSL_ERROR_EOF, \
SSL_ERROR_INVALID_ERROR_CODE
from socket import socket
from socket import getnameinfo as _getnameinfo
# Root certs:
#
# The "ca_certs" argument to sslsocket() expects a file containing one or more
# certificates that are roots of various certificate signing chains. This file
# contains the certificates in PEM format (RFC ) where each certificate is
# encoded in base64 encoding and surrounded with a header and footer:
# -----BEGIN CERTIFICATE-----
# ... (CA certificate in base64 encoding) ...
# -----END CERTIFICATE-----
# The various certificates in the file are just concatenated together:
# -----BEGIN CERTIFICATE-----
# ... (CA certificate in base64 encoding) ...
# -----END CERTIFICATE-----
# -----BEGIN CERTIFICATE-----
# ... (a second CA certificate in base64 encoding) ...
# -----END CERTIFICATE-----
#
# Some "standard" root certificates are available at
#
# http://www.thawte.com/roots/ (for Thawte roots)
# http://www.verisign.com/support/roots.html (for Verisign)
class sslsocket (socket):
"""This class implements a subtype of socket.socket that wraps
the underlying OS socket in an SSL context when necessary, and
provides read and write methods over that channel."""
def __init__(self, sock, keyfile=None, certfile=None,
server_side=False, cert_reqs=CERT_NONE,
ssl_version=PROTOCOL_SSLv23, ca_certs=None):
socket.__init__(self, _sock=sock._sock)
if certfile and not keyfile:
keyfile = certfile
if server_side:
self._sslobj = _ssl.sslwrap(self._sock, 1, keyfile, certfile,
cert_reqs, ssl_version, ca_certs)
# see if it's connected
try:
socket.getpeername(self)
except:
# no, no connection yet
self._sslobj = None
else:
# see if it's connected
try:
socket.getpeername(self)
except:
# no, no connection yet
self._sslobj = None
else:
# yes, create the SSL object
self._sslobj = _ssl.sslwrap(self._sock, 0, keyfile, certfile,
cert_reqs, ssl_version, ca_certs)
# yes, create the SSL object
self._sslobj = _ssl.sslwrap(self._sock, server_side,
keyfile, certfile,
cert_reqs, ssl_version, ca_certs)
self.keyfile = keyfile
self.certfile = certfile
self.cert_reqs = cert_reqs
@ -116,73 +108,123 @@ def __init__(self, sock, keyfile=None, certfile=None,
self.ca_certs = ca_certs
def read(self, len=1024):
"""Read up to LEN bytes and return them.
Return zero-length string on EOF."""
return self._sslobj.read(len)
def write(self, data):
"""Write DATA to the underlying SSL channel. Returns
number of bytes of DATA actually transmitted."""
return self._sslobj.write(data)
def getpeercert(self):
"""Returns a formatted version of the data in the
certificate provided by the other end of the SSL channel.
Return None if no certificate was provided, {} if a
certificate was provided, but not validated."""
return self._sslobj.peer_certificate()
def send (self, data, flags=0):
if flags != 0:
raise ValueError(
"non-zero flags not allowed in calls to send() on %s" %
self.__class__)
return self._sslobj.write(data)
if self._sslobj:
if flags != 0:
raise ValueError(
"non-zero flags not allowed in calls to send() on %s" %
self.__class__)
return self._sslobj.write(data)
else:
return socket.send(self, data, flags)
def send_to (self, data, addr, flags=0):
raise ValueError("send_to not allowed on instances of %s" %
self.__class__)
if self._sslobj:
raise ValueError("send_to not allowed on instances of %s" %
self.__class__)
else:
return socket.send_to(self, data, addr, flags)
def sendall (self, data, flags=0):
if flags != 0:
raise ValueError(
"non-zero flags not allowed in calls to sendall() on %s" %
self.__class__)
return self._sslobj.write(data)
if self._sslobj:
if flags != 0:
raise ValueError(
"non-zero flags not allowed in calls to sendall() on %s" %
self.__class__)
return self._sslobj.write(data)
else:
return socket.sendall(self, data, flags)
def recv (self, buflen=1024, flags=0):
if flags != 0:
raise ValueError(
"non-zero flags not allowed in calls to sendall() on %s" %
self.__class__)
return self._sslobj.read(data, buflen)
if self._sslobj:
if flags != 0:
raise ValueError(
"non-zero flags not allowed in calls to sendall() on %s" %
self.__class__)
return self._sslobj.read(data, buflen)
else:
return socket.recv(self, buflen, flags)
def recv_from (self, addr, buflen=1024, flags=0):
raise ValueError("recv_from not allowed on instances of %s" %
self.__class__)
if self._sslobj:
raise ValueError("recv_from not allowed on instances of %s" %
self.__class__)
else:
return socket.recv_from(self, addr, buflen, flags)
def ssl_shutdown(self):
"""Shuts down the SSL channel over this socket (if active),
without closing the socket connection."""
def shutdown(self):
if self._sslobj:
self._sslobj.shutdown()
self._sslobj = None
else:
socket.shutdown(self)
def shutdown(self, how):
self.ssl_shutdown()
socket.shutdown(self, how)
def close(self):
if self._sslobj:
self.shutdown()
else:
socket.close(self)
self.ssl_shutdown()
socket.close(self)
def connect(self, addr):
"""Connects to remote ADDR, and then wraps the connection in
an SSL channel."""
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._sslobj or (self.getsockname()[1] != 0):
if self._sslobj:
raise ValueError("attempt to connect already-connected sslsocket!")
socket.connect(self, addr)
self._sslobj = _ssl.sslwrap(self._sock, 0, self.keyfile, self.certfile,
self._sslobj = _ssl.sslwrap(self._sock, False, self.keyfile, self.certfile,
self.cert_reqs, self.ssl_version,
self.ca_certs)
def accept(self):
raise ValueError("accept() not supported on an sslsocket")
"""Accepts a new connection from a remote client, and returns
a tuple containing that new connection wrapped with a server-side
SSL channel, and the address of the remote client."""
newsock, addr = socket.accept(self)
return (sslsocket(newsock, True, self.keyfile, self.certfile,
self.cert_reqs, self.ssl_version,
self.ca_certs), addr)
# some utility functions
def cert_time_to_seconds(cert_time):
"""Takes a date-time string in standard ASN1_print form
("MON DAY 24HOUR:MINUTE:SEC YEAR TIMEZONE") and return
a Python time value in seconds past the epoch."""
import time
return time.mktime(time.strptime(cert_time, "%b %d %H:%M:%S %Y GMT"))
@ -190,66 +232,9 @@ def cert_time_to_seconds(cert_time):
def sslwrap_simple (sock, keyfile=None, certfile=None):
"""A replacement for the old socket.ssl function. Designed
for compability with Python 2.5 and earlier. Will disappear in
Python 3.0."""
return _ssl.sslwrap(sock._sock, 0, keyfile, certfile, CERT_NONE,
PROTOCOL_SSLv23, None)
# fetch the certificate that the server is providing in PEM form
def fetch_server_certificate (host, port):
import re, tempfile, os
def subproc(cmd):
from subprocess import Popen, PIPE, STDOUT
proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True)
status = proc.wait()
output = proc.stdout.read()
return status, output
def strip_to_x509_cert(certfile_contents, outfile=None):
m = re.search(r"^([-]+BEGIN CERTIFICATE[-]+[\r]*\n"
r".*[\r]*^[-]+END CERTIFICATE[-]+)$",
certfile_contents, re.MULTILINE | re.DOTALL)
if not m:
return None
else:
tn = tempfile.mktemp()
fp = open(tn, "w")
fp.write(m.group(1) + "\n")
fp.close()
try:
tn2 = (outfile or tempfile.mktemp())
status, output = subproc(r'openssl x509 -in "%s" -out "%s"' %
(tn, tn2))
if status != 0:
raise OperationError(status, tsig, output)
fp = open(tn2, 'rb')
data = fp.read()
fp.close()
os.unlink(tn2)
return data
finally:
os.unlink(tn)
if sys.platform.startswith("win"):
tfile = tempfile.mktemp()
fp = open(tfile, "w")
fp.write("quit\n")
fp.close()
try:
status, output = subproc(
'openssl s_client -connect "%s:%s" -showcerts < "%s"' %
(host, port, tfile))
finally:
os.unlink(tfile)
else:
status, output = subproc(
'openssl s_client -connect "%s:%s" -showcerts < /dev/null' %
(host, port))
if status != 0:
raise OSError(status)
certtext = strip_to_x509_cert(output)
if not certtext:
raise ValueError("Invalid response received from server at %s:%s" %
(host, port))
return certtext

View file

@ -0,0 +1,8 @@
# An example for http://bugs.python.org/issue815646
import thread
while 1:
f = open("/tmp/dupa", "w")
thread.start_new_thread(f.close, ())
f.close()

View file

@ -862,6 +862,7 @@ def printlist(x, width=70, indent=4):
test_mhlib
test_openpty
test_ossaudiodev
test_pipes
test_poll
test_posix
test_pty

View file

@ -49,7 +49,7 @@
<head>
<meta http-equiv="Content-Type" content="text/html; charset=ascii" />
<link rel="stylesheet" type="text/css" href="calendar.css" />
<title>Calendar for 2004</title
<title>Calendar for 2004</title>
</head>
<body>
<table border="0" cellpadding="0" cellspacing="0" class="year">

View file

@ -235,10 +235,20 @@ def test_utime_dir(self):
# Restrict test to Win32, since there is no guarantee other
# systems support centiseconds
if sys.platform == 'win32':
def test_1565150(self):
t1 = 1159195039.25
os.utime(self.fname, (t1, t1))
self.assertEquals(os.stat(self.fname).st_mtime, t1)
def get_file_system(path):
import os
root = os.path.splitdrive(os.path.realpath("."))[0] + '\\'
import ctypes
kernel32 = ctypes.windll.kernel32
buf = ctypes.create_string_buffer("", 100)
if kernel32.GetVolumeInformationA(root, None, 0, None, None, None, buf, len(buf)):
return buf.value
if get_file_system(test_support.TESTFN) == "NTFS":
def test_1565150(self):
t1 = 1159195039.25
os.utime(self.fname, (t1, t1))
self.assertEquals(os.stat(self.fname).st_mtime, t1)
def test_1686475(self):
# Verify that an open file can be stat'ed

187
Lib/test/test_pipes.py Normal file
View file

@ -0,0 +1,187 @@
import pipes
import os
import string
import unittest
from test.test_support import TESTFN, run_unittest, unlink, TestSkipped
if os.name != 'posix':
raise TestSkipped('pipes module only works on posix')
TESTFN2 = TESTFN + "2"
class SimplePipeTests(unittest.TestCase):
def tearDown(self):
for f in (TESTFN, TESTFN2):
unlink(f)
def testSimplePipe1(self):
t = pipes.Template()
t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
f = t.open(TESTFN, 'w')
f.write('hello world #1')
f.close()
self.assertEqual(open(TESTFN).read(), 'HELLO WORLD #1')
def testSimplePipe2(self):
open(TESTFN, 'w').write('hello world #2')
t = pipes.Template()
t.append('tr a-z A-Z < $IN > $OUT', pipes.FILEIN_FILEOUT)
t.copy(TESTFN, TESTFN2)
self.assertEqual(open(TESTFN2).read(), 'HELLO WORLD #2')
def testSimplePipe3(self):
open(TESTFN, 'w').write('hello world #2')
t = pipes.Template()
t.append('tr a-z A-Z < $IN', pipes.FILEIN_STDOUT)
self.assertEqual(t.open(TESTFN, 'r').read(), 'HELLO WORLD #2')
def testEmptyPipeline1(self):
# copy through empty pipe
d = 'empty pipeline test COPY'
open(TESTFN, 'w').write(d)
open(TESTFN2, 'w').write('')
t=pipes.Template()
t.copy(TESTFN, TESTFN2)
self.assertEqual(open(TESTFN2).read(), d)
def testEmptyPipeline2(self):
# read through empty pipe
d = 'empty pipeline test READ'
open(TESTFN, 'w').write(d)
t=pipes.Template()
self.assertEqual(t.open(TESTFN, 'r').read(), d)
def testEmptyPipeline3(self):
# write through empty pipe
d = 'empty pipeline test WRITE'
t = pipes.Template()
t.open(TESTFN, 'w').write(d)
self.assertEqual(open(TESTFN).read(), d)
def testQuoting(self):
safeunquoted = string.ascii_letters + string.digits + '!@%_-+=:,./'
unsafe = '"`$\\'
self.assertEqual(pipes.quote(safeunquoted), safeunquoted)
self.assertEqual(pipes.quote('test file name'), "'test file name'")
for u in unsafe:
self.assertEqual(pipes.quote('test%sname' % u),
"'test%sname'" % u)
for u in unsafe:
self.assertEqual(pipes.quote("test%s'name'" % u),
'"test\\%s\'name\'"' % u)
def testRepr(self):
t = pipes.Template()
self.assertEqual(repr(t), "<Template instance, steps=[]>")
t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
self.assertEqual(repr(t),
"<Template instance, steps=[('tr a-z A-Z', '--')]>")
def testSetDebug(self):
t = pipes.Template()
t.debug(False)
self.assertEqual(t.debugging, False)
t.debug(True)
self.assertEqual(t.debugging, True)
def testReadOpenSink(self):
# check calling open('r') on a pipe ending with
# a sink raises ValueError
t = pipes.Template()
t.append('boguscmd', pipes.SINK)
self.assertRaises(ValueError, t.open, 'bogusfile', 'r')
def testWriteOpenSource(self):
# check calling open('w') on a pipe ending with
# a source raises ValueError
t = pipes.Template()
t.prepend('boguscmd', pipes.SOURCE)
self.assertRaises(ValueError, t.open, 'bogusfile', 'w')
def testBadAppendOptions(self):
t = pipes.Template()
# try a non-string command
self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT)
# try a type that isn't recognized
self.assertRaises(ValueError, t.append, 'boguscmd', 'xx')
# shouldn't be able to append a source
self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE)
# check appending two sinks
t = pipes.Template()
t.append('boguscmd', pipes.SINK)
self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK)
# command needing file input but with no $IN
t = pipes.Template()
self.assertRaises(ValueError, t.append, 'boguscmd $OUT',
pipes.FILEIN_FILEOUT)
t = pipes.Template()
self.assertRaises(ValueError, t.append, 'boguscmd',
pipes.FILEIN_STDOUT)
# command needing file output but with no $OUT
t = pipes.Template()
self.assertRaises(ValueError, t.append, 'boguscmd $IN',
pipes.FILEIN_FILEOUT)
t = pipes.Template()
self.assertRaises(ValueError, t.append, 'boguscmd',
pipes.STDIN_FILEOUT)
def testBadPrependOptions(self):
t = pipes.Template()
# try a non-string command
self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT)
# try a type that isn't recognized
self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx')
# shouldn't be able to prepend a sink
self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK)
# check prepending two sources
t = pipes.Template()
t.prepend('boguscmd', pipes.SOURCE)
self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE)
# command needing file input but with no $IN
t = pipes.Template()
self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT',
pipes.FILEIN_FILEOUT)
t = pipes.Template()
self.assertRaises(ValueError, t.prepend, 'boguscmd',
pipes.FILEIN_STDOUT)
# command needing file output but with no $OUT
t = pipes.Template()
self.assertRaises(ValueError, t.prepend, 'boguscmd $IN',
pipes.FILEIN_FILEOUT)
t = pipes.Template()
self.assertRaises(ValueError, t.prepend, 'boguscmd',
pipes.STDIN_FILEOUT)
def testBadOpenMode(self):
t = pipes.Template()
self.assertRaises(ValueError, t.open, 'bogusfile', 'x')
def testClone(self):
t = pipes.Template()
t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
u = t.clone()
self.assertNotEqual(id(t), id(u))
self.assertEqual(t.steps, u.steps)
self.assertNotEqual(id(t.steps), id(u.steps))
self.assertEqual(t.debugging, u.debugging)
def test_main():
run_unittest(SimplePipeTests)
if __name__ == "__main__":
test_main()

View file

@ -135,8 +135,19 @@ def RobotTest(index, robots_txt, good_urls, bad_urls,
RobotTest(7, doc, good, bad)
class TestCase(unittest.TestCase):
def runTest(self):
test_support.requires('network')
# whole site is password-protected.
url = 'http://mueblesmoraleda.com'
parser = robotparser.RobotFileParser()
parser.set_url(url)
parser.read()
self.assertEqual(parser.can_fetch("*", url+"/robots.txt"), False)
def test_main():
test_support.run_unittest(tests)
TestCase().run()
if __name__=='__main__':
test_support.Verbose = 1

View file

@ -110,12 +110,12 @@ def test_978833(self):
if test_support.verbose:
print("test_978833 ...")
import os, httplib
import os, httplib, ssl
with test_support.transient_internet():
s = socket.socket(socket.AF_INET)
s.connect(("www.sf.net", 443))
fd = s.fileno()
sock = httplib.FakeSocket(s, socket.ssl(s))
sock = ssl.sslsocket(s)
s = None
sock.close()
try:

View file

@ -83,7 +83,6 @@
import opcode
import os2emxpath
import pdb
import pipes
import pstats
import py_compile
import pydoc

View file

@ -147,8 +147,7 @@ def testConnectRegistryToLocalMachineWorks(self):
def testRemoteMachineRegistryWorks(self):
if not self.remote_name:
raise test_support.TestSkipped("Remote machine name "
"not specified.")
return # remote machine name not specified
remote_key = ConnectRegistry(self.remote_name, HKEY_CURRENT_USER)
self.TestAll(remote_key)

View file

@ -441,8 +441,10 @@ def test_basic(self):
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
self.assertEqual(p.pow(6,8), 6**8)
except xmlrpclib.ProtocolError as e:
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, e.headers))
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, e.headers))
def test_fail_no_info(self):
# use the broken message class
@ -452,9 +454,11 @@ def test_fail_no_info(self):
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
p.pow(6,8)
except xmlrpclib.ProtocolError as e:
# The two server-side error headers shouldn't be sent back in this case
self.assertTrue(e.headers.get("X-exception") is None)
self.assertTrue(e.headers.get("X-traceback") is None)
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# The two server-side error headers shouldn't be sent back in this case
self.assertTrue(e.headers.get("X-exception") is None)
self.assertTrue(e.headers.get("X-traceback") is None)
else:
self.fail('ProtocolError not raised')
@ -470,10 +474,12 @@ def test_fail_with_info(self):
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
p.pow(6,8)
except xmlrpclib.ProtocolError as e:
# We should get error info in the response
expected_err = "invalid literal for int() with base 10: 'I am broken'"
self.assertEqual(e.headers.get("x-exception"), expected_err)
self.assertTrue(e.headers.get("x-traceback") is not None)
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# We should get error info in the response
expected_err = "invalid literal for int() with base 10: 'I am broken'"
self.assertEqual(e.headers.get("x-exception"), expected_err)
self.assertTrue(e.headers.get("x-traceback") is not None)
else:
self.fail('ProtocolError not raised')

View file

@ -91,6 +91,14 @@ def urlcleanup():
if _urlopener:
_urlopener.cleanup()
# check for SSL
try:
import ssl
except:
_have_ssl = False
else:
_have_ssl = True
# exception raised when downloaded size does not match content-length
class ContentTooShortError(IOError):
def __init__(self, message, content):
@ -382,7 +390,7 @@ def http_error_default(self, url, fp, errcode, errmsg, headers):
fp.close()
raise IOError('http error', errcode, errmsg, headers)
if hasattr(socket, "ssl"):
if _have_ssl:
def _https_connection(self, host):
return httplib.HTTPSConnection(host,
key_file=self.key_file,

View file

@ -17,6 +17,9 @@ the format to accommodate documentation needs as they arise.
Permissions History
-------------------
- Bill Janssen was given SVN access on 28 August 2007 by NCN,
for his work on the SSL module and other things related to (SSL) sockets.
- Jeffrey Yasskin was given SVN access on 9 August 2007 by NCN,
for his work on PEPs and other general patches.

View file

@ -3747,6 +3747,108 @@ Array_slice(PyObject *_self, Py_ssize_t ilow, Py_ssize_t ihigh)
return (PyObject *)np;
}
static PyObject *
Array_subscript(PyObject *_self, PyObject *item)
{
CDataObject *self = (CDataObject *)_self;
if (PyIndex_Check(item)) {
Py_ssize_t i = PyNumber_AsSsize_t(item, PyExc_IndexError);
if (i == -1 && PyErr_Occurred())
return NULL;
if (i < 0)
i += self->b_length;
return Array_item(_self, i);
}
else if PySlice_Check(item) {
StgDictObject *stgdict, *itemdict;
PyObject *proto;
PyObject *np;
Py_ssize_t start, stop, step, slicelen, cur, i;
if (PySlice_GetIndicesEx((PySliceObject *)item,
self->b_length, &start, &stop,
&step, &slicelen) < 0) {
return NULL;
}
stgdict = PyObject_stgdict((PyObject *)self);
assert(stgdict); /* Cannot be NULL for array object instances */
proto = stgdict->proto;
itemdict = PyType_stgdict(proto);
assert(itemdict); /* proto is the item type of the array, a
ctypes type, so this cannot be NULL */
if (itemdict->getfunc == getentry("c")->getfunc) {
char *ptr = (char *)self->b_ptr;
char *dest;
if (slicelen <= 0)
return PyString_FromString("");
if (step == 1) {
return PyString_FromStringAndSize(ptr + start,
slicelen);
}
dest = (char *)PyMem_Malloc(slicelen);
if (dest == NULL)
return PyErr_NoMemory();
for (cur = start, i = 0; i < slicelen;
cur += step, i++) {
dest[i] = ptr[cur];
}
np = PyString_FromStringAndSize(dest, slicelen);
PyMem_Free(dest);
return np;
}
#ifdef CTYPES_UNICODE
if (itemdict->getfunc == getentry("u")->getfunc) {
wchar_t *ptr = (wchar_t *)self->b_ptr;
wchar_t *dest;
if (slicelen <= 0)
return PyUnicode_FromUnicode(NULL, 0);
if (step == 1) {
return PyUnicode_FromWideChar(ptr + start,
slicelen);
}
dest = (wchar_t *)PyMem_Malloc(
slicelen * sizeof(wchar_t));
for (cur = start, i = 0; i < slicelen;
cur += step, i++) {
dest[i] = ptr[cur];
}
np = PyUnicode_FromWideChar(dest, slicelen);
PyMem_Free(dest);
return np;
}
#endif
np = PyList_New(slicelen);
if (np == NULL)
return NULL;
for (cur = start, i = 0; i < slicelen;
cur += step, i++) {
PyObject *v = Array_item(_self, cur);
PyList_SET_ITEM(np, i, v);
}
return np;
}
else {
PyErr_SetString(PyExc_TypeError,
"indices must be integers");
return NULL;
}
}
static int
Array_ass_item(PyObject *_self, Py_ssize_t index, PyObject *value)
{
@ -3818,6 +3920,63 @@ Array_ass_slice(PyObject *_self, Py_ssize_t ilow, Py_ssize_t ihigh, PyObject *va
return 0;
}
static int
Array_ass_subscript(PyObject *_self, PyObject *item, PyObject *value)
{
CDataObject *self = (CDataObject *)_self;
if (value == NULL) {
PyErr_SetString(PyExc_TypeError,
"Array does not support item deletion");
return -1;
}
if (PyIndex_Check(item)) {
Py_ssize_t i = PyNumber_AsSsize_t(item, PyExc_IndexError);
if (i == -1 && PyErr_Occurred())
return -1;
if (i < 0)
i += self->b_length;
return Array_ass_item(_self, i, value);
}
else if (PySlice_Check(item)) {
Py_ssize_t start, stop, step, slicelen, otherlen, i, cur;
if (PySlice_GetIndicesEx((PySliceObject *)item,
self->b_length, &start, &stop,
&step, &slicelen) < 0) {
return -1;
}
if ((step < 0 && start < stop) ||
(step > 0 && start > stop))
stop = start;
otherlen = PySequence_Length(value);
if (otherlen != slicelen) {
PyErr_SetString(PyExc_ValueError,
"Can only assign sequence of same size");
return -1;
}
for (cur = start, i = 0; i < otherlen; cur += step, i++) {
PyObject *item = PySequence_GetItem(value, i);
int result;
if (item == NULL)
return -1;
result = Array_ass_item(_self, cur, item);
Py_DECREF(item);
if (result == -1)
return -1;
}
return 0;
}
else {
PyErr_SetString(PyExc_TypeError,
"indices must be integer");
return -1;
}
}
static Py_ssize_t
Array_length(PyObject *_self)
{
@ -3839,6 +3998,12 @@ static PySequenceMethods Array_as_sequence = {
0, /* sq_inplace_repeat; */
};
static PyMappingMethods Array_as_mapping = {
Array_length,
Array_subscript,
Array_ass_subscript,
};
PyTypeObject Array_Type = {
PyVarObject_HEAD_INIT(NULL, 0)
"_ctypes.Array",
@ -3852,7 +4017,7 @@ PyTypeObject Array_Type = {
0, /* tp_repr */
0, /* tp_as_number */
&Array_as_sequence, /* tp_as_sequence */
0, /* tp_as_mapping */
&Array_as_mapping, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
@ -4303,6 +4468,139 @@ Pointer_slice(PyObject *_self, Py_ssize_t ilow, Py_ssize_t ihigh)
return (PyObject *)np;
}
static PyObject *
Pointer_subscript(PyObject *_self, PyObject *item)
{
CDataObject *self = (CDataObject *)_self;
if (PyIndex_Check(item)) {
Py_ssize_t i = PyNumber_AsSsize_t(item, PyExc_IndexError);
if (i == -1 && PyErr_Occurred())
return NULL;
return Pointer_item(_self, i);
}
else if (PySlice_Check(item)) {
PySliceObject *slice = (PySliceObject *)item;
Py_ssize_t start, stop, step;
PyObject *np;
StgDictObject *stgdict, *itemdict;
PyObject *proto;
Py_ssize_t i, len, cur;
/* Since pointers have no length, and we want to apply
different semantics to negative indices than normal
slicing, we have to dissect the slice object ourselves.*/
if (slice->step == Py_None) {
step = 1;
}
else {
step = PyNumber_AsSsize_t(slice->step,
PyExc_ValueError);
if (step == -1 && PyErr_Occurred())
return NULL;
if (step == 0) {
PyErr_SetString(PyExc_ValueError,
"slice step cannot be zero");
return NULL;
}
}
if (slice->start == Py_None) {
if (step < 0) {
PyErr_SetString(PyExc_ValueError,
"slice start is required "
"for step < 0");
return NULL;
}
start = 0;
}
else {
start = PyNumber_AsSsize_t(slice->start,
PyExc_ValueError);
if (start == -1 && PyErr_Occurred())
return NULL;
}
if (slice->stop == Py_None) {
PyErr_SetString(PyExc_ValueError,
"slice stop is required");
return NULL;
}
stop = PyNumber_AsSsize_t(slice->stop,
PyExc_ValueError);
if (stop == -1 && PyErr_Occurred())
return NULL;
if ((step > 0 && start > stop) ||
(step < 0 && start < stop))
len = 0;
else if (step > 0)
len = (stop - start - 1) / step + 1;
else
len = (stop - start + 1) / step + 1;
stgdict = PyObject_stgdict((PyObject *)self);
assert(stgdict); /* Cannot be NULL for pointer instances */
proto = stgdict->proto;
assert(proto);
itemdict = PyType_stgdict(proto);
assert(itemdict);
if (itemdict->getfunc == getentry("c")->getfunc) {
char *ptr = *(char **)self->b_ptr;
char *dest;
if (len <= 0)
return PyString_FromString("");
if (step == 1) {
return PyString_FromStringAndSize(ptr + start,
len);
}
dest = (char *)PyMem_Malloc(len);
if (dest == NULL)
return PyErr_NoMemory();
for (cur = start, i = 0; i < len; cur += step, i++) {
dest[i] = ptr[cur];
}
np = PyString_FromStringAndSize(dest, len);
PyMem_Free(dest);
return np;
}
#ifdef CTYPES_UNICODE
if (itemdict->getfunc == getentry("u")->getfunc) {
wchar_t *ptr = *(wchar_t **)self->b_ptr;
wchar_t *dest;
if (len <= 0)
return PyUnicode_FromUnicode(NULL, 0);
if (step == 1) {
return PyUnicode_FromWideChar(ptr + start,
len);
}
dest = (wchar_t *)PyMem_Malloc(len * sizeof(wchar_t));
if (dest == NULL)
return PyErr_NoMemory();
for (cur = start, i = 0; i < len; cur += step, i++) {
dest[i] = ptr[cur];
}
np = PyUnicode_FromWideChar(dest, len);
PyMem_Free(dest);
return np;
}
#endif
np = PyList_New(len);
if (np == NULL)
return NULL;
for (cur = start, i = 0; i < len; cur += step, i++) {
PyObject *v = Pointer_item(_self, cur);
PyList_SET_ITEM(np, i, v);
}
return np;
}
else {
PyErr_SetString(PyExc_TypeError,
"Pointer indices must be integer");
return NULL;
}
}
static PySequenceMethods Pointer_as_sequence = {
0, /* inquiry sq_length; */
0, /* binaryfunc sq_concat; */
@ -4317,6 +4615,11 @@ static PySequenceMethods Pointer_as_sequence = {
0, /* intargfunc sq_inplace_repeat; */
};
static PyMappingMethods Pointer_as_mapping = {
0,
Pointer_subscript,
};
static int
Pointer_bool(CDataObject *self)
{
@ -4349,7 +4652,7 @@ PyTypeObject Pointer_Type = {
0, /* tp_repr */
&Pointer_as_number, /* tp_as_number */
&Pointer_as_sequence, /* tp_as_sequence */
0, /* tp_as_mapping */
&Pointer_as_mapping, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */

View file

@ -11,6 +11,7 @@
'ftpmirror.py',
'h2py.py',
'lfcr.py',
'../i18n/pygettext.py',
'logmerge.py',
'../../Lib/tabnanny.py',
'../../Lib/timeit.py',

16480
configure vendored

File diff suppressed because it is too large Load diff

View file

@ -4,7 +4,7 @@ dnl Process this file with autoconf 2.0 or later to make a configure script.
m4_define(PYTHON_VERSION, 3.0)
AC_REVISION($Revision$)
AC_PREREQ(2.59)
AC_PREREQ(2.61)
AC_INIT(python, PYTHON_VERSION, http://www.python.org/python-bugs)
AC_CONFIG_SRCDIR([Include/object.h])
AC_CONFIG_HEADER(pyconfig.h)