--- /dev/null
+#!/usr/bin/python2.4
+#
+# Copyright 2009 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Implementation of Magic Signatures protocol.
+
+See Magic Signatures RFC for specification. This module
+implements the Magic Signature API on top of the crypto
+layer in magicsigalg.py, hiding the low level crypto details.
+"""
+
+__author__ = 'jpanzer@google.com (John Panzer)'
+
+
+import base64
+import re
+import sys
+import time
+
+# ElementTree is standard with Python >=2.5, needs
+# environment support for 2.4 and lower.
+try:
+ import xml.etree.ElementTree as et # Python >=2.5
+except ImportError:
+ try:
+ import elementtree as et # Allow local path override
+ except ImportError:
+ raise
+
+import magicsig.magicsigalg
+
+
# Matches any run of whitespace; used to squeeze armored envelope fields
# (base64url data and signatures) down to their significant characters.
_WHITESPACE_RE = re.compile(r'\s+', flags=0)
+
+
class Error(Exception):
  """Base error for generic magic envelope failures.

  Accepts the usual Exception arguments (typically a message), so it can
  be raised either bare -- Error() -- or with a description --
  Error('something went wrong').
  """

  def __init__(self, *args):
    # Forward whatever was given so str(err) reflects the message.
    Exception.__init__(self, *args)
+
+
def NormalizeUserIdToUri(userid):
  """Normalizes a user-provided user id to a reasonable guess at a URI.

  Args:
    userid: The identifier as typed by a user; surrounding whitespace is
      ignored.
  Returns:
    The identifier unchanged if it already starts with http:, https: or
    acct:; 'acct:' + id if it contains an '@' after its first character
    (i.e. it looks like an email-style address); otherwise
    'http://' + id.
  """
  userid = userid.strip()

  # If already in a URI form, we're done:
  if userid.startswith(('http:', 'https:', 'acct:')):
    return userid

  # Email-like identifier.  The first '@' must not be the leading
  # character, so ids such as '@name' fall through to the catchall.
  if userid.find('@') > 0:
    return 'acct:' + userid

  # Catchall: Guess at http: if nothing else works.
  return 'http://' + userid
+
+
def _GetElementByTagName(e, ns, tag_name):
  """Fetches the single element with a given name from a DOM subtree.

  Helper for formats that require an element to appear exactly once.

  Args:
    e: Root element of the DOM subtree to search.
    ns: Namespace of the wanted element.
    tag_name: Local name of the wanted element.
  Returns:
    The one matching element.
  Raises:
    ValueError: If no element, or more than one element, matched.
  """
  matches = e.getElementsByTagNameNS(str(ns), str(tag_name))
  count = matches.length
  if count == 0:
    raise ValueError('Element %s not found' % tag_name)
  if count > 1:
    raise ValueError('Element %s appears multiple times' % tag_name)
  return matches.item(0)
+
+
class KeyRetriever(object):
  """Retrieves public or private keys for a signer identifier (URI).

  This is still a stub: every non-empty signer URI maps to the same
  hard-coded test keypair.  Keys are returned in the serialized
  'RSA.mod.exp[.private_exp]' string format.
  """

  # Public half of the test keypair: 'RSA.<modulus>.<public exponent>'.
  _TEST_PUBLIC_KEY = ('RSA.mVgY8RN6URBTstndvmUUPb4UZTdwvwmddSKE5z_jvKUEK6yk1'
                      'u3rrC9yN8k6FilGj9K0eeUPe2hf4Pj-5CmHww=='
                      '.AQAB')
  # Private exponent of the test keypair; appended to the public half to
  # form the full signing key.
  _TEST_PRIVATE_EXPONENT = ('Lgy_yL3hsLBngkFdDw1Jy9TmSRMiH6yihYetQ8jy-jZXdsZXd8V5'
                            'ub3kuBHHk4M39i3TduIkcrjcsiWQb77D8Q==')

  def LookupPublicKey(self, signer_uri):
    """Looks up the public verification key for a given signer URI.

    Args:
      signer_uri: URI identifying the signer.
    Returns:
      The serialized public key (modulus and public exponent only), or
      None if signer_uri is empty.
    """
    # TODO(jpanzer): Really look this up with Webfinger.
    if not signer_uri:
      return None
    # Never hand out the private exponent from a public-key lookup.
    return self._TEST_PUBLIC_KEY

  def LookupPrivateKey(self, signer_uri):
    """Looks up the signing key for a given signer URI.

    Args:
      signer_uri: URI identifying the signer.
    Returns:
      The serialized full keypair (including the private exponent), or
      None if signer_uri is empty.
    """
    # TODO(jpanzer): Fix this up to really work, or eliminate.
    if not signer_uri:
      return None
    return self._TEST_PUBLIC_KEY + '.' + self._TEST_PRIVATE_EXPONENT
+
# Namespace URLs, plus the same URLs in the '{url}' prefix form that
# ElementTree uses for qualified tag names.
_ATOM_NS_URL = 'http://www.w3.org/2005/Atom'
_ME_NS_URL = 'http://salmon-protocol.org/ns/magic-env'
_ATOM_NS = '{' + _ATOM_NS_URL + '}'
_ME_NS = '{' + _ME_NS_URL + '}'

# Set up default namespace mappings for things we care about.  Use
# ElementTree's own register_namespace when it has one; otherwise fall
# back to writing the prefix into its namespace map directly.
__register_namespace = getattr(et, 'register_namespace', None)
if __register_namespace is None:
  def __register_namespace(prefix, uri):
    et._namespace_map[uri] = prefix
__register_namespace('atom', _ATOM_NS_URL)
__register_namespace('me', _ME_NS_URL)
__register_namespace('thr', 'http://purl.org/syndication/thread/1.0')
+
class MagicEnvelopeProtocol(object):
  """Implementation of Magic Envelope protocol.

  Groups the protocol-level operations layered on top of the raw
  signature algorithm: key lookup, signer identification, armoring
  (base64url) of payloads, and parsing of envelopes.  Only the
  RSA-SHA256 algorithm, the base64url encoding and application/atom+xml
  payloads are supported.
  """

  ENCODING = 'base64url'  # This is a constant for now.
  key_retriever = KeyRetriever()

  @staticmethod
  def _ToBytes(data):
    """Returns data as bytes, utf-8 encoding it if it is text."""
    if isinstance(data, bytes):
      return data
    return data.encode('utf8')

  @staticmethod
  def _ParseXml(data):
    """Parses XML given as text or bytes into an ElementTree."""
    return et.ElementTree(et.XML(MagicEnvelopeProtocol._ToBytes(data)))

  def GetPrivateKey(self, signer_uri):
    """Retrieves private signing key to be used."""
    return self.key_retriever.LookupPrivateKey(signer_uri)

  def GetPublicKey(self, signer_uri):
    """Retrieves public key to be used to verify signatures for signer."""
    return self.key_retriever.LookupPublicKey(signer_uri)

  def GetSignerURI(self, data):
    """Grabs signer == first author from given message.

    Currently we're assuming most messages are single author
    and punting on what it means to sign a multi-author
    message.  We only look at the first (lexical) author
    in the input and act as if that is the only author.

    Args:
      data: The message, either pre-parsed or a string.
    Returns:
      The normalized URI of the first author of the message, or None if
      the message has no author URI.
    """
    if isinstance(data, et.ElementTree):
      tree = data
    else:
      tree = self._ParseXml(data)

    uri_el = tree.getroot().find(_ATOM_NS + 'author/' + _ATOM_NS + 'uri')
    # An absent or empty <uri> means there is no identifiable signer.
    if uri_el is None or not uri_el.text:
      return None
    return NormalizeUserIdToUri(uri_el.text)

  def IsAllowedSigner(self, data, userid_uri):
    """Checks that userid_uri is identified as an allowed signer.

    Note that this does not do a signature check.

    Args:
      data: The message, either pre-parsed or a string.
      userid_uri: The URI of the author to be checked.
    Returns:
      True iff userid_uri is identified as the first author.
    """
    return self.GetSignerURI(data) == userid_uri

  def Verify(self, env):
    """Verifies magic envelope data.

    Checks that its signature matches the contents and that the
    author's public key generated the signature.

    Args:
      env: The magic envelope data in dict form (section 3.1 of spec)
    Raises:
      ValueError: The envelope's alg or encoding is not supported.
    Returns:
      True iff the signature is verified.
    """
    # Explicit checks rather than asserts, which vanish under python -O.
    if env['alg'] != 'RSA-SHA256':
      raise ValueError('Unknown alg %s' % env['alg'])
    if env['encoding'] != self.ENCODING:
      raise ValueError('Unknown encoding %s' % env['encoding'])

    # Decode data to text and grab the author:
    text = self.DecodeData(env['data'], env['encoding'])
    signer_uri = self.GetSignerURI(text)

    # Verification only needs (and should only see) the public key.
    verifier = self.GetVerifierAlg(self.GetPublicKey(signer_uri))

    return verifier.Verify(env['data'], env['sig'])

  def GetSigningAlg(self, signing_key):
    """Returns algorithm to use for signing messages.

    Args:
      signing_key: Keypair to use to construct the algorithm, or the
        literal string 'TEST' to use the built-in test keypair.
    Returns:
      An algorithm object that can be used to sign byte sequences.
    """
    # TODO(jpanzer): Massage signing_key into appropriate format if needed.

    # Use standard test key if testing:
    if signing_key == 'TEST':
      signing_key = ('RSA.mVgY8RN6URBTstndvmUUPb4UZTdwvwmddSKE5z_jvKUEK6yk1'
                     'u3rrC9yN8k6FilGj9K0eeUPe2hf4Pj-5CmHww=='
                     '.AQAB'
                     '.Lgy_yL3hsLBngkFdDw1Jy9TmSRMiH6yihYetQ8jy-jZXdsZXd8V5'
                     'ub3kuBHHk4M39i3TduIkcrjcsiWQb77D8Q==')

    # Referenced through the package name bound by
    # 'import magicsig.magicsigalg'.
    return magicsig.magicsigalg.SignatureAlgRsaSha256(signing_key)

  def GetVerifierAlg(self, public_key):
    """Returns algorithm to use for verifying messages.

    Args:
      public_key: Public key to use to construct the algorithm.
    Returns:
      An algorithm object that can be used to verify signatures.
    """
    # TODO(jpanzer): Massage public_key into appropriate format if needed.
    return magicsig.magicsigalg.SignatureAlgRsaSha256(public_key)

  def EncodeData(self, raw_text_data, encoding):
    """Encodes raw data into an armored form.

    Args:
      raw_text_data: Data to be encoded; text is utf-8 encoded first,
        bytes are used as-is.
      encoding: Encoding to use (must be base64url)
    Raises:
      ValueError: The encoding is unknown or missing.
    Returns:
      The encoded data in the specified format.
    """
    if encoding != 'base64url':
      raise ValueError('Unknown encoding %s' % encoding)

    return base64.urlsafe_b64encode(self._ToBytes(raw_text_data))

  def DecodeData(self, encoded_text_data, encoding):
    """Decodes armored data into raw text form.

    Args:
      encoded_text_data: Armored data to be decoded, as text or bytes.
      encoding: Encoding to use.
    Raises:
      ValueError: If the encoding is unknown.
    Returns:
      The raw decoded data.
    """
    if encoding != 'base64url':
      raise ValueError('Unknown encoding %s' % encoding)
    return base64.urlsafe_b64decode(self._ToBytes(encoded_text_data))

  def ParseData(self, raw_text_data, mime_type):
    """Parses the payload of a magic envelope's data field.

    Args:
      raw_text_data: Data in given MIME type.
      mime_type: Type of the textual data.  application/atom+xml supported
    Raises:
      ValueError: The MIME type is not supported.
    Returns:
      Parsed data suitable for passing in to other methods of this object.
    """
    if mime_type != 'application/atom+xml':
      raise ValueError('Unknown MIME type %s' % mime_type)

    return self._ParseXml(raw_text_data)

  def Parse(self, textinput, mime_type='application/magic-envelope+xml'):
    """Parses a magic envelope.

    Args:
      textinput: Input message in either application/magic-envelope
        or application/atom format.
      mime_type: MIME type of textinput data.  Currently not consulted;
        the format is detected from the document's root element.
    Raises:
      ValueError: The input format was unrecognized or a required
        envelope element is missing.
    Returns:
      Magic envelope fields in dict format per section 3.1 of spec.
    """
    # TODO(jpanzer): Support JSON format, do real sanity checks against
    # mime type
    root = self._ParseXml(textinput.strip()).getroot()

    if root.tag == _ATOM_NS + 'entry':
      env_el = root.find(_ME_NS + 'provenance')
      if env_el is None:
        raise ValueError('Atom entry has no me:provenance element')
    elif root.tag == _ME_NS + 'env':
      env_el = root
    else:
      raise ValueError('Unrecognized input format')

    data_el = env_el.find(_ME_NS + 'data')
    if data_el is None or data_el.text is None:
      raise ValueError('Magic envelope has no me:data content')
    sig_text = env_el.findtext(_ME_NS + 'sig')
    if sig_text is None:
      raise ValueError('Magic envelope has no me:sig element')

    def Squeeze(s):  # Remove all whitespace
      return _WHITESPACE_RE.sub('', s)

    # Pull magic envelope fields out into dict.  Whitespace inside the
    # armored data and signature is only formatting, so strip all of it.
    return dict(
        data=Squeeze(data_el.text),
        encoding=env_el.findtext(_ME_NS + 'encoding'),
        data_type=data_el.get('type'),
        alg=env_el.findtext(_ME_NS + 'alg'),
        sig=Squeeze(sig_text),
    )
+
+
class EnvelopeError(Error):
  """Raised when an Envelope cannot be initialized.

  Attributes (None until set by the constructor):
    invalid_envelope: The envelope that failed.
    error_text: Human readable description of the failure.
    context: Tuple of type,value from a chained exception, if any.
  """
  invalid_envelope = None
  error_text = None
  context = None

  def __init__(self, envelope, err, context=None):
    Error.__init__(self)
    self.invalid_envelope = envelope
    self.error_text = err
    self.context = context

  def __str__(self):
    details = (self.error_text, self.invalid_envelope, self.context)
    return ('<magicsig.Error "%s" for envelope %s (prior exception: %s)>'
            % details)
+
+
class Envelope(object):
  """Represents a Magic Envelope.

  An envelope is created in one of two modes, chosen by the keyword
  arguments given to the constructor:

    * verify mode: a signature is supplied (directly via data/sig/...
      fields, or inside a serialized 'document'); the signature is
      checked against the signer's public key.
    * signing mode: 'raw_data_to_sign', 'signer_uri' and 'signer_key'
      are supplied; the data is armored and signed.

  Construction raises EnvelopeError if neither mode succeeds.
  """

  # Envelope contents (verified)
  _data = None  # The payload data, armored per _encoding
  _data_type = None  # The MIME type of the payload
  _encoding = None  # The encoding to use ("base64url")
  _alg = None  # The algorithm used ("RSA-SHA256")
  _sig = None  # The signature string

  _parsed_data = None  # The data as a parsed object
  _signer_uri = None  # URI of signer
  _signer_key = None  # Key(pair) associated w/signature

  _init_timestamp = None  # Timestamp when signed or verified

  def __init__(self,
               protocol=MagicEnvelopeProtocol(),
               **kwargs):
    """Initializes an envelope from arbitrary input.

    Args:
      protocol: Protocol object providing key lookup, armoring and
        parsing.
      **kwargs: Any of document, mime_type, data, data_type, encoding,
        alg, sig, raw_data_to_sign, signer_uri, signer_key.
    Raises:
      EnvelopeError: The envelope could be neither verified nor signed.
    """
    self._protocol = protocol
    self._Initialize(kwargs)

    if self._sig:  # Verify signature if provided
      self._PerformVerification()
    elif self._signer_key:  # Sign w/signer key if provided
      self._Sign()
    else:
      raise EnvelopeError(self, 'Can neither verify nor sign envelope')

    # Record when object successfully initialized.  This
    # also serves as a validity flag.
    self._init_timestamp = time.time()

  @staticmethod
  def _AsText(value):
    """Returns value as text if it is a (non-str) bytes object.

    Armored data and signatures are pure ASCII, but may be held as
    bytes; they must be text before being formatted into XML.
    """
    if isinstance(value, bytes) and not isinstance(value, str):
      return value.decode('ascii')
    return value

  def _DecodedData(self):
    """Returns the raw (un-armored) payload."""
    return self._protocol.DecodeData(self._AsText(self._data),
                                     self._encoding)

  def _Initialize(self, kwargs):
    """Initializes envelope data from input.

    Args:
      kwargs: Dict of the keyword arguments given to the constructor.
    Raises:
      EnvelopeError: Required fields are missing or unsupported.
    """
    # Input from serialized text document if provided:
    self._mime_type = kwargs.get('mime_type', None)
    self._document = kwargs.get('document', None)

    if self._document:
      # If document provided, use it to parse out fields:
      fields = self._protocol.Parse(self._document, self._mime_type)
      kwargs.update(fields)

    # Pull structured data from kwargs and sanity check:
    self._data = kwargs.get('data', None)
    self._data_type = kwargs.get('data_type', None)
    self._encoding = kwargs.get('encoding', 'base64url')
    self._alg = kwargs.get('alg', 'RSA-SHA256')
    self._sig = kwargs.get('sig', None)

    # Sanity checks:
    if not self._data_type:
      raise EnvelopeError(self, 'Missing data_type')
    if self._alg != 'RSA-SHA256':
      raise EnvelopeError(self, 'Unknown alg %s; must be RSA-SHA256' %
                          self._alg)
    if self._encoding != 'base64url':
      raise EnvelopeError(self, 'Unknown encoding %s; must be base64url' %
                          self._encoding)

    raw_data = kwargs.get('raw_data_to_sign', None)
    if raw_data:
      # If passed raw data to sign, the envelope goes into signing mode.
      # These are input checks, so raise rather than assert (asserts
      # are stripped under python -O).
      if self._sig or self._data:
        raise EnvelopeError(
            self, 'raw_data_to_sign cannot be combined with data or sig')
      if 'signer_uri' not in kwargs or 'signer_key' not in kwargs:
        raise EnvelopeError(
            self, 'Signing requires both signer_uri and signer_key')

      self._data = self._protocol.EncodeData(raw_data, self._encoding)
      self._signer_uri = kwargs['signer_uri']
      self._signer_key = kwargs['signer_key']
    elif self._sig:
      # If passed a signature, the envelope goes into verify mode.
      if not self._data:
        raise EnvelopeError(self, 'No data to verify')
      raw_data = self._DecodedData()
    else:
      # No raw data and no signature, give up.
      raise EnvelopeError(self, 'Insufficient data to initialize envelope.')

    # Cache a parsed representation of the raw data:
    self._parsed_data = self._protocol.ParseData(raw_data, self._data_type)

    # At this point the envelope is initialized but is not yet valid.
    # (It needs to be either verified or signed.)
    self._init_timestamp = None

  def Age(self):
    """Returns seconds elapsed since successful signing/verification."""
    assert self._init_timestamp

    return time.time() - self._init_timestamp

  def _Sign(self):
    """Signs an envelope given appropriate key inputs.

    Raises:
      EnvelopeError: Signer information is missing, the signer is not
        the author of the data, or the new signature fails to verify.
    """
    if not self._signer_uri or not self._signer_key:
      raise EnvelopeError(self, 'Missing signer URI or signer key')
    # This is an authorization check, so it must not be an assert.
    if not self._protocol.IsAllowedSigner(self._parsed_data,
                                          self._signer_uri):
      raise EnvelopeError(self, 'Signer %s is not the author of the data' %
                          self._signer_uri)

    signature_alg = self._protocol.GetSigningAlg(self._signer_key)
    self._sig = signature_alg.Sign(self._data)
    self._alg = signature_alg.GetName()

    # Hmm.  This seems like a no-brainer check but what if you're
    # signing something with a not-yet-published public key?
    if not signature_alg.Verify(self._data, self._sig):
      raise EnvelopeError(self,
                          'Newly generated signature failed verification')

    # TODO(jpanzer): Clear private key data from object?

  def _PerformVerification(self):
    """Performs signature verification on parsed data.

    Raises:
      EnvelopeError: The signature does not verify.
    """
    # Decode data to text, cache parsed representation,
    # and find the key to use:
    text = self._DecodedData()
    self._parsed_data = self._protocol.ParseData(text, self._data_type)
    self._signer_uri = self._protocol.GetSignerURI(self._parsed_data)
    self._signer_public_key = self._protocol.GetPublicKey(self._signer_uri)

    # Get a verifier for that key:
    verifier = self._protocol.GetVerifierAlg(self._signer_public_key)

    # Check whether the signature verifies; if not, abandon
    # this envelope.
    if not verifier.Verify(self._data, self._sig):
      raise EnvelopeError(self, 'Signature verification failed.')

  def ToXML(self, fulldoc=True, indentation=0):
    """Turns envelope into serialized XML suitable for transmission.

    Args:
      fulldoc: Return a full XML document with <?xml...
      indentation: Indent each line this number of spaces.
    Returns:
      An XML document or fragment in string form.
    """
    assert self._init_timestamp  # Object must be successfully initialized
    # TODO(jpanzer): Determine leeway period before requiring another
    # verification
    # (we can't keep an object sitting around in memory for a month without
    # rechecking the signature).

    # Template for a Magic Envelope:
    if fulldoc:
      template = '<?xml version=\'1.0\' encoding=\'UTF-8\'?>'
    else:
      template = ''
    template += """
<me:env xmlns:me='http://salmon-protocol.org/ns/magic-env'>
 <me:encoding>%s</me:encoding>
 <me:data type='application/atom+xml'>
%s
 </me:data>
 <me:alg>%s</me:alg>
 <me:sig>
%s
 </me:sig>
</me:env>
"""
    text = template % (self._encoding,
                       _ToPretty(self._AsText(self._data), 4, 60),
                       self._alg,
                       _ToPretty(self._AsText(self._sig), 4, 60))
    prefix = ' ' * indentation
    return ''.join(prefix + line + '\n'
                   for line in text.strip().split('\n'))

  def ToAtom(self, fulldoc=True, indentation=0):
    """Turns envelope into serialized Atom entry.

    Args:
      fulldoc: Return a full XML document with <?xml...
      indentation: Indent each line this number of spaces.
    Raises:
      EnvelopeError: The payload is not an Atom entry.
    Returns:
      An Atom entry XML document with an me:provenance element
      containing the original magic signature data.
    """
    # Work on a freshly parsed tree so the cached parsed data is not
    # modified (repeated calls must not accumulate provenance elements).
    tree = self._protocol.ParseData(self._DecodedData(), self._data_type)
    root = tree.getroot()
    if root.tag != _ATOM_NS + 'entry':
      raise EnvelopeError(self, 'Payload is not an Atom entry')

    # Create a provenance element carrying every envelope field, so
    # the result can be parsed back into an envelope.
    prov_el = et.Element(_ME_NS + 'provenance')
    data_el = et.SubElement(prov_el, _ME_NS + 'data')
    data_el.set('type', self._data_type)
    data_el.text = '\n' + _ToPretty(self._AsText(self._data),
                                    indentation + 6, 60)
    et.SubElement(prov_el, _ME_NS + 'encoding').text = self._encoding
    et.SubElement(prov_el, _ME_NS + 'alg').text = self._alg
    et.SubElement(prov_el, _ME_NS + 'sig').text = (
        '\n' + _ToPretty(self._AsText(self._sig), indentation + 6, 60))

    # Add in the provenance element:
    root.append(prov_el)

    # Prettify (integer division: the level multiplies a string):
    self._PrettyIndent(root, indentation // 2)

    # Turn it back into text for consumption:
    text = et.tostring(root, encoding='utf-8')
    if isinstance(text, bytes) and not isinstance(text, str):
      text = text.decode('utf-8')

    prefix = ' ' * indentation
    indented_text = ''.join(prefix + line + '\n'
                            for line in text.strip().split('\n')
                            if line.strip() != '')

    if fulldoc:
      indented_text = ('<?xml version=\'1.0\' encoding=\'UTF-8\'?>\n' +
                       indented_text)
    return indented_text

  def GetData(self):
    """Returns envelope's verified data."""
    return self._DecodedData()

  def GetParsedData(self):
    """Returns envelope's verified data in parsed form."""
    if not self._parsed_data:
      self._parsed_data = self._protocol.ParseData(self._DecodedData(),
                                                   self._data_type)
    return self._parsed_data

  def GetDataWithProvenance(self):
    """Returns envelope's data as a string with provenance attached.

    Not implemented yet; currently returns None.
    """
    # TODO(jpanzer): Implement.

  def GetParsedDataWithProvenance(self):
    """Returns data with provenance in parsed form.

    Not implemented yet; currently returns None.
    """
    # TODO(jpanzer): Implement.

  def _PrettyIndent(self, elem, level=0):
    """Prettifies an element tree in-place.

    Args:
      elem: Element whose subtree is to be re-indented.
      level: Nesting depth of elem; sets the amount of indentation.
    """
    # TODO(jpanzer): Avoid munging text nodes where it matters?
    i = "\n" + level*" "
    if len(elem):
      if not elem.text or not elem.text.strip():
        elem.text = i + " "
      if not elem.tail or not elem.tail.strip():
        elem.tail = i
      child = None
      for child in elem:
        self._PrettyIndent(child, level+1)
      # The last child's tail is what precedes elem's closing tag, so
      # it gets elem's own indentation.
      if not child.tail or not child.tail.strip():
        child.tail = i
    else:
      if level and (not elem.tail or not elem.tail.strip()):
        elem.tail = i
+
def _ToPretty(text, indent, linelength):
  """Makes huge text lines pretty, or at least printable.

  Line wrapping is currently disabled: the text is returned unchanged.
  The indent and linelength parameters are kept so callers need no
  change if wrapping is reinstated.

  Args:
    text: The text to format.
    indent: Number of spaces each wrapped line would be indented by
      (currently unused).
    linelength: Maximum length of each wrapped line, including the
      indent (currently unused).
  Returns:
    The text, unmodified.
  """
  return text
--- /dev/null
+#!/usr/bin/python2.4
+#
+# Copyright 2009 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Implementation of Magic Signatures low level operations.
+
+See Magic Signatures RFC for specification. This implements
+the cryptographic layer of the spec, essentially signing and
+verifying byte buffers using a public key algorithm.
+"""
+
+__author__ = 'jpanzer@google.com (John Panzer)'
+
+
+import base64
+import re
+
+# PyCrypto: Note that this is not available in the
+# downloadable GAE SDK, must be installed separately.
+# See http://code.google.com/p/googleappengine/issues/detail?id=2493
+# for why this is most easily installed under the
+# project's path rather than somewhere more sane.
+import Crypto.PublicKey
+import Crypto.PublicKey.RSA
+from Crypto.Util import number
+
+import hashlib
+
+
+# Note that PyCrypto is a very low level library and its documentation
+# leaves something to be desired. As a cheat sheet, for the RSA
+# algorithm, here's a decoding of terminology:
+# n - modulus (public)
+# e - public exponent
+# d - private exponent
+# (n, e) - public key
+# (n, d) - private key
+# (p, q) - the (private) primes from which the keypair is derived.
+
+# Thus a public key is a tuple (n,e) and a public/private key pair
+# is a tuple (n,e,d). Often the exponent is 65537 so for convenience
+# we default e=65537 in this code.
+
+
def GenSampleSignature(text):
  """Signs text with the hard coded test public/private keypair (demo).

  Args:
    text: The bytes to sign.
  Returns:
    Whatever SignatureAlgRsaSha256.Sign returns for text under the
    test keypair.
  """
  key_pieces = (
      'RSA.mVgY8RN6URBTstndvmUUPb4UZTdwvwmddSKE5z_jvKUEK6yk1',
      'u3rrC9yN8k6FilGj9K0eeUPe2hf4Pj-5CmHww==',
      '.AQAB',
      '.Lgy_yL3hsLBngkFdDw1Jy9TmSRMiH6yihYetQ8jy-jZXdsZXd8V5',
      'ub3kuBHHk4M39i3TduIkcrjcsiWQb77D8Q==',
  )
  sample_signer = SignatureAlgRsaSha256(''.join(key_pieces))
  return sample_signer.Sign(text)
+
+
+# Utilities
def _NumToB64(num):
  """Serializes a bignum as a urlsafe base64 encoded string.

  Args:
    num: The number to encode.
  Returns:
    The urlsafe base64 encoding of the number's byte representation.
  """
  raw_bytes = number.long_to_bytes(num)
  return base64.urlsafe_b64encode(raw_bytes)
+
+
def _B64ToNum(b64):
  """Turns a urlsafe base64 encoded string into a bignum.

  Args:
    b64: The urlsafe base64 text to decode.
  Returns:
    The number represented by the decoded bytes.
  """
  # Deliberately no logging/printing here: this is used to decode key
  # components, including private exponents.
  return number.bytes_to_long(base64.urlsafe_b64decode(b64))
+
+# Patterns for parsing serialized keys
# Patterns for parsing serialized keys.

# Any run of whitespace.
_WHITESPACE_RE = re.compile(r'\s+', flags=0)

# A serialized key: 'RSA.<mod>.<exp>' optionally followed by
# '.<private_exp>'.  Written in verbose mode, so whitespace inside the
# pattern is not significant.
_KEY_RE = re.compile(
    r"""RSA\.
 (?P<mod>[^\.]+)
 \.
 (?P<exp>[^\.]+)
 (?:\.
 (?P<private_exp>[^\.]+)
 )?""",
    flags=re.VERBOSE)
+
+
+# Implementation of the Magic Envelope signature algorithm
class SignatureAlgRsaSha256(object):
  """Signature algorithm for RSA-SHA256 Magic Envelope.

  Wraps an RSA key (public only, or a full keypair) and signs/verifies
  byte buffers using PKCS1-v1_5 padding over a SHA-256 digest.
  """

  def __init__(self, rsa_key):
    """Initializes algorithm with key information.

    Args:
      rsa_key: Key in either string form or a tuple in the
        format expected by Crypto.PublicKey.RSA.
    Raises:
      ValueError: The input format was incorrect.
    """
    if isinstance(rsa_key, tuple):
      self.keypair = Crypto.PublicKey.RSA.construct(rsa_key)
    else:
      self._InitFromString(rsa_key)

  @staticmethod
  def _ToBytes(data, encoding='utf-8'):
    """Returns data as bytes, encoding it first if it is text."""
    if isinstance(data, bytes):
      return data
    return data.encode(encoding)

  @staticmethod
  def _ToText(data):
    """Returns ASCII data as native text, decoding (non-str) bytes."""
    if isinstance(data, bytes) and not isinstance(data, str):
      return data.decode('ascii')
    return data

  def ToString(self, full_key_pair=True):
    """Serializes key to a safe string storage format.

    Args:
      full_key_pair: Whether to save the private key portion as well.
    Returns:
      The string representation of the key in the format:

        RSA.mod.exp[.optional_private_exp]

      Each component is a urlsafe-base64 encoded representation of
      the corresponding RSA key field.
    """
    parts = ['RSA',
             self._ToText(_NumToB64(self.keypair.n)),
             self._ToText(_NumToB64(self.keypair.e))]
    # Ask the key whether it has a private part; reading .d on a
    # public-only key is an error rather than a false value.
    if full_key_pair and self.keypair.has_private():
      parts.append(self._ToText(_NumToB64(self.keypair.d)))
    return '.'.join(parts)

  def _InitFromString(self, text):
    """Parses key from a standard string storage format.

    Args:
      text: The key in text form.  See ToString for description
        of expected format.
    Raises:
      ValueError: The input format was incorrect.
    """
    # First, remove all whitespace:
    text = _WHITESPACE_RE.sub('', text)

    parts = text.split('.')
    # Expect 'RSA', modulus, exponent and optionally a private exponent.
    # Error messages never echo the key: it may hold private material.
    if parts[0] != 'RSA' or len(parts) not in (3, 4) or not all(parts):
      raise ValueError('Key is not in RSA.mod.exp[.private_exp] format')

    # Throw away the 'RSA' marker and convert the rest from base64.
    try:
      items = tuple([_B64ToNum(item) for item in parts[1:]])
    except (TypeError, ValueError):
      raise ValueError('Key contains a malformed base64 component')
    self.keypair = Crypto.PublicKey.RSA.construct(items)

  def GetName(self):
    """Returns string identifier for algorithm used."""
    return 'RSA-SHA256'

  def _MakeEmsaMessageSha256(self, msg, modulus_size, logf=None):
    """Algorithm EMSA_PKCS1-v1_5 from PKCS 1 version 2.

    This is derived from keyczar code, and implements the
    additional ASN.1 compatible magic header bytes and
    padding needed to implement PKCS1-v1_5.

    Args:
      msg: The message to sign, as bytes (text is utf-8 encoded first).
      modulus_size: The size of the key as reported by keypair.size().
      logf: Optional callable receiving log lines.
    Raises:
      ValueError: The key is too small to hold the encoded message.
    Returns:
      The byte sequence of the message to be signed.
    """
    magic_sha256_header = [0x30, 0x31, 0x30, 0xd, 0x6, 0x9, 0x60, 0x86, 0x48,
                           0x1, 0x65, 0x3, 0x4, 0x2, 0x1, 0x5, 0x0, 0x4, 0x20]

    # Hashing requires bytes.
    hash_of_msg = hashlib.sha256(self._ToBytes(msg)).digest()

    self._Log(logf, 'sha256 digest of msg %s: %s' % (msg, hash_of_msg))

    # Going through bytearray yields the raw header bytes whether
    # 'bytes' is the Python 2 or the Python 3 type.
    encoded = bytes(bytearray(magic_sha256_header)) + hash_of_msg

    # Always advances to the *next* byte boundary, even if modulus_size
    # is already a multiple of 8.
    # NOTE(review): looks intended for a size() that reports the key's
    # bit length minus one -- confirm against the crypto library.
    msg_size_bits = modulus_size + 8-(modulus_size % 8)

    pad_length = msg_size_bits // 8 - len(encoded) - 3
    # PKCS1-v1_5 requires at least 8 bytes of padding; a negative count
    # would otherwise silently produce a malformed block.
    if pad_length < 8:
      raise ValueError('Key is too small for an RSA-SHA256 signature')

    pad_string = b'\xFF' * pad_length
    return b'\x00\x01' + pad_string + b'\x00' + encoded

  def _Log(self, logf, s):
    """Append message to log if log exists.

    Args:
      logf: Callable taking one string, or None to discard the message.
      s: The message; a newline is appended.
    """
    if logf:
      logf(s + '\n')

  def Sign(self, bytes_to_sign, logf=None):
    """Signs the bytes using PKCS-v1_5.

    Args:
      bytes_to_sign: The bytes to be signed.
      logf: Optional callable receiving log lines.
    Returns:
      The signature in base64url encoded format.
    """
    # Implements PKCS1-v1_5 w/SHA256 over the bytes, and returns
    # the result as a base64url encoded bignum.

    self._Log(logf, 'bytes_to_sign = %s' % bytes_to_sign)
    self._Log(logf, 'len = %d' % len(bytes_to_sign))

    key_size = self.keypair.size()
    self._Log(logf, 'keypair size : %s' % key_size)

    # Generate the PKCS1-v1_5 compatible message, which includes
    # magic ASN.1 bytes and padding:
    emsa_msg = self._MakeEmsaMessageSha256(bytes_to_sign, key_size, logf)
    # TODO(jpanzer): Check whether we need to use max keysize above
    # or just keypair.size

    self._Log(logf, 'emsa_msg = %s' % emsa_msg)

    # Compute the signature:
    signature_long = self.keypair.sign(emsa_msg, None)[0]

    # Encode the signature as armored text:
    signature_bytes = number.long_to_bytes(signature_long)

    self._Log(logf, 'signature_bytes = [%s]' % signature_bytes)

    return base64.urlsafe_b64encode(signature_bytes)

  def Verify(self, signed_bytes, signature_b64):
    """Determines the validity of a signature over a signed buffer of bytes.

    Args:
      signed_bytes: string The buffer of bytes the signature_b64 covers.
      signature_b64: string The putative signature, base64-encoded, to check.
    Returns:
      True if the request validated, False otherwise (including when
      signature_b64 is not valid base64).
    """
    # Generate the PKCS1-v1_5 compatible message, which includes
    # magic ASN.1 bytes and padding:
    emsa_msg = self._MakeEmsaMessageSha256(signed_bytes,
                                           self.keypair.size())

    # Get putative signature.  A signature that cannot even be decoded
    # is simply invalid, not an error.
    try:
      putative_signature = base64.urlsafe_b64decode(
          self._ToBytes(signature_b64, 'ascii'))
    except (TypeError, ValueError):
      return False
    putative_signature = number.bytes_to_long(putative_signature)

    # Verify signature given public key:
    return self.keypair.verify(emsa_msg, (putative_signature,))