Logo Search packages:      
Sourcecode: python-openid version File versions

test_negotiation.py

import unittest
from support import CatchLogs

from openid.message import Message, OPENID2_NS, OPENID1_NS, OPENID_NS
from openid import association
from openid.consumer.consumer import GenericConsumer, ServerError
from openid.consumer.discover import OpenIDServiceEndpoint, OPENID_2_0_TYPE

00010 class ErrorRaisingConsumer(GenericConsumer):
    """
    A consumer whose _requestAssocation will return predefined results
    instead of trying to actually perform association requests.
    """

    # The list of objects to be returned by successive calls to
    # _requestAssocation.  Each call will pop the first element from
    # this list and return it to _negotiateAssociation.  If the
    # element is a Message object, it will be wrapped in a ServerError
    # exception.  Otherwise it will be returned as-is.
    return_messages = []

00023     def _requestAssociation(self, endpoint, assoc_type, session_type):
        m = self.return_messages.pop(0)
        if isinstance(m, Message):
            raise ServerError.fromMessage(m)
        else:
            return m

00030 class TestOpenID2SessionNegotiation(unittest.TestCase, CatchLogs):
    """
    Test the session type negotiation behavior of an OpenID 2
    consumer.
    """
    def setUp(self):
        CatchLogs.setUp(self)
        self.consumer = ErrorRaisingConsumer(store=None)

        self.endpoint = OpenIDServiceEndpoint()
        self.endpoint.type_uris = [OPENID_2_0_TYPE]
        self.endpoint.server_url = 'bogus'

00043     def testBadResponse(self):
        """
        Test the case where the response to an associate request is a
        server error or is otherwise undecipherable.
        """
        self.consumer.return_messages = [Message(self.endpoint.preferredNamespace())]
        self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)
        self.failUnlessLogMatches('Server error when requesting an association')

00052     def testEmptyAssocType(self):
        """
        Test the case where the association type (assoc_type) returned
        in an unsupported-type response is absent.
        """
        msg = Message(self.endpoint.preferredNamespace())
        msg.setArg(OPENID_NS, 'error', 'Unsupported type')
        msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
        msg.setArg(OPENID_NS, 'assoc_type', None)
        msg.setArg(OPENID_NS, 'session_type', 'new-session-type')

        self.consumer.return_messages = [msg]
        self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches('Unsupported association type',
                                  'Server responded with unsupported association ' +
                                  'session but did not supply a fallback.')

00070     def testEmptySessionType(self):
        """
        Test the case where the session type (session_type) returned
        in an unsupported-type response is absent.
        """
        msg = Message(self.endpoint.preferredNamespace())
        msg.setArg(OPENID_NS, 'error', 'Unsupported type')
        msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
        msg.setArg(OPENID_NS, 'assoc_type', 'new-assoc-type')
        msg.setArg(OPENID_NS, 'session_type', None)

        self.consumer.return_messages = [msg]
        self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches('Unsupported association type',
                                  'Server responded with unsupported association ' +
                                  'session but did not supply a fallback.')

00088     def testNotAllowed(self):
        """
        Test the case where an unsupported-type response specifies a
        preferred (assoc_type, session_type) combination that is not
        allowed by the consumer's SessionNegotiator.
        """
        allowed_types = [
            ('assoc_bogus', 'session_bogus'),
            ]

        negotiator = association.SessionNegotiator(allowed_types)
        self.consumer.negotiator = negotiator

        msg = Message(self.endpoint.preferredNamespace())
        msg.setArg(OPENID_NS, 'error', 'Unsupported type')
        msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
        msg.setArg(OPENID_NS, 'assoc_type', 'not-allowed')
        msg.setArg(OPENID_NS, 'session_type', 'not-allowed')

        self.consumer.return_messages = [msg]
        self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches('Unsupported association type',
                                  'Server sent unsupported session/association type:')

00113     def testUnsupportedWithRetry(self):
        """
        Test the case where an unsupported-type response triggers a
        retry to get an association with the new preferred type.
        """
        msg = Message(self.endpoint.preferredNamespace())
        msg.setArg(OPENID_NS, 'error', 'Unsupported type')
        msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
        msg.setArg(OPENID_NS, 'assoc_type', 'HMAC-SHA1')
        msg.setArg(OPENID_NS, 'session_type', 'DH-SHA1')

        assoc = association.Association(
            'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')

        self.consumer.return_messages = [msg, assoc]
        self.failUnless(self.consumer._negotiateAssociation(self.endpoint) is assoc)

        self.failUnlessLogMatches('Unsupported association type')

00132     def testUnsupportedWithRetryAndFail(self):
        """
        Test the case where an unsupported-typ response triggers a
        retry, but the retry fails and None is returned instead.
        """
        msg = Message(self.endpoint.preferredNamespace())
        msg.setArg(OPENID_NS, 'error', 'Unsupported type')
        msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
        msg.setArg(OPENID_NS, 'assoc_type', 'HMAC-SHA1')
        msg.setArg(OPENID_NS, 'session_type', 'DH-SHA1')

        self.consumer.return_messages = [msg,
             Message(self.endpoint.preferredNamespace())]

        self.failUnlessEqual(self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches('Unsupported association type',
                                  'Server %s refused' % (self.endpoint.server_url))

00151     def testValid(self):
        """
        Test the valid case, wherein an association is returned on the
        first attempt to get one.
        """
        assoc = association.Association(
            'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')

        self.consumer.return_messages = [assoc]
        self.failUnless(self.consumer._negotiateAssociation(self.endpoint) is assoc)
        self.failUnlessLogEmpty()

00163 class TestOpenID1SessionNegotiation(unittest.TestCase, CatchLogs):
    """
    Tests for the OpenID 1 consumer association session behavior.  See
    the docs for TestOpenID2SessionNegotiation.  Notice that this
    class is not a subclass of the OpenID 2 tests.  Instead, it uses
    many of the same inputs but inspects the log messages logged with
    oidutil.log.  See the calls to self.failUnlessLogMatches.  Some of
    these tests pass openid2-style messages to the openid 1
    association processing logic to be sure it ignores the extra data.
    """
    def setUp(self):
        CatchLogs.setUp(self)
        self.consumer = ErrorRaisingConsumer(store=None)

        self.endpoint = OpenIDServiceEndpoint()
        self.endpoint.type_uris = [OPENID1_NS]
        self.endpoint.server_url = 'bogus'

    def testBadResponse(self):
        self.consumer.return_messages = [Message(self.endpoint.preferredNamespace())]
        self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)
        self.failUnlessLogMatches('Server error when requesting an association')

    def testEmptyAssocType(self):
        msg = Message(self.endpoint.preferredNamespace())
        msg.setArg(OPENID_NS, 'error', 'Unsupported type')
        msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
        msg.setArg(OPENID_NS, 'assoc_type', None)
        msg.setArg(OPENID_NS, 'session_type', 'new-session-type')

        self.consumer.return_messages = [msg]
        self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches('Server error when requesting an association')

    def testEmptySessionType(self):
        msg = Message(self.endpoint.preferredNamespace())
        msg.setArg(OPENID_NS, 'error', 'Unsupported type')
        msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
        msg.setArg(OPENID_NS, 'assoc_type', 'new-assoc-type')
        msg.setArg(OPENID_NS, 'session_type', None)

        self.consumer.return_messages = [msg]
        self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches('Server error when requesting an association')

    def testNotAllowed(self):
        allowed_types = [
            ('assoc_bogus', 'session_bogus'),
            ]

        negotiator = association.SessionNegotiator(allowed_types)
        self.consumer.negotiator = negotiator

        msg = Message(self.endpoint.preferredNamespace())
        msg.setArg(OPENID_NS, 'error', 'Unsupported type')
        msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
        msg.setArg(OPENID_NS, 'assoc_type', 'not-allowed')
        msg.setArg(OPENID_NS, 'session_type', 'not-allowed')

        self.consumer.return_messages = [msg]
        self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches('Server error when requesting an association')

    def testUnsupportedWithRetry(self):
        msg = Message(self.endpoint.preferredNamespace())
        msg.setArg(OPENID_NS, 'error', 'Unsupported type')
        msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
        msg.setArg(OPENID_NS, 'assoc_type', 'HMAC-SHA1')
        msg.setArg(OPENID_NS, 'session_type', 'DH-SHA1')

        assoc = association.Association(
            'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')

        self.consumer.return_messages = [msg, assoc]
        self.failUnless(self.consumer._negotiateAssociation(self.endpoint) is None)

        self.failUnlessLogMatches('Server error when requesting an association')

    def testValid(self):
        assoc = association.Association(
            'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')

        self.consumer.return_messages = [assoc]
        self.failUnless(self.consumer._negotiateAssociation(self.endpoint) is assoc)
        self.failUnlessLogEmpty()

class TestNegotiatorBehaviors(unittest.TestCase, CatchLogs):
    def setUp(self):
        self.allowed_types = [
            ('assoc1', 'session1'),
            ('assoc2', 'session2'),
            ]

        self.n = association.SessionNegotiator(self.allowed_types)

    def testAddAllowedTypeNoSessionTypes(self):
        self.assertRaises(ValueError, self.n.addAllowedType, 'invalid')

    def testAddAllowedTypeBadSessionType(self):
        self.assertRaises(ValueError, self.n.addAllowedType, 'assoc1', 'invalid')

    def testAddAllowedTypeContents(self):
        assoc_type = 'HMAC-SHA1'
        self.failUnless(self.n.addAllowedType(assoc_type) is None)

        for typ in association.getSessionTypes(assoc_type):
            self.failUnless((assoc_type, typ) in self.n.allowed_types)

if __name__ == '__main__':
    unittest.main()

Generated by  Doxygen 1.6.0   Back to index