]>
git.frykholm.com Git - friends.git/blob - friends/magicsig/__init__.py
620244b3bc8253395a664201aa7e8da5f1dcfb34
3 # Copyright 2009 Google Inc. All Rights Reserved.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 """Implementation of Magic Signatures protocol.
20 See Magic Signatures RFC for specification. This module
21 implements the Magic Signature API on top of the crypto
22 layer in magicsigalg.py, hiding the low level crypto details.
25 __author__
= 'jpanzer@google.com (John Panzer)'
33 # ElementTree is standard with Python >=2.5, needs
34 # environment support for 2.4 and lower.
36 import xml
.etree
.ElementTree
as et
# Python >=2.5
39 import elementtree
as et
# Allow local path override
43 import magicsig
.magicsigalg
46 _WHITESPACE_RE
= re
.compile(r
'\s+')
49 class Error(Exception):
50 """Error thrown for generic magic envelope failures."""
53 Exception.__init
__(self
)
56 def NormalizeUserIdToUri(userid
):
57 """Normalizes a user-provided user id to a reasonable guess at a URI."""
58 userid
= userid
.strip()
60 # If already in a URI form, we're done:
61 if (userid
.startswith('http:') or
62 userid
.startswith('https:') or
63 userid
.startswith('acct:')):
66 if userid
.find('@') > 0:
69 # Catchall: Guess at http: if nothing else works.
70 return 'http://'+userid
73 def _GetElementByTagName(e
, ns
, tag_name
):
74 """Retrieves a unique element from a DOM subtree by name.
76 Convenience wrapper for the case where the format
77 dictates exactly-once semantics.
80 e: Root element of DOM subtree.
81 ns: Namespace of desired element.
82 tag_name: Name of desired element.
84 ValueError: If the element was not unique or not found.
88 seq
= e
.getElementsByTagNameNS(str(ns
), str(tag_name
))
89 if seq
.length
== 0: raise ValueError('Element %s not found' % tag_name
)
90 if seq
.length
> 1: raise ValueError('Element %s appears multiple times' %
95 class KeyRetriever(object):
96 """Retrieves public or private keys for a signer identifier (URI)."""
98 def LookupPublicKey(self
, signer_uri
):
99 # TODO(jpanzer): Really look this up with Webfinger.
102 return ('RSA.mVgY8RN6URBTstndvmUUPb4UZTdwvwmddSKE5z_jvKUEK6yk1'
103 'u3rrC9yN8k6FilGj9K0eeUPe2hf4Pj-5CmHww=='
105 '.Lgy_yL3hsLBngkFdDw1Jy9TmSRMiH6yihYetQ8jy-jZXdsZXd8V5'
106 'ub3kuBHHk4M39i3TduIkcrjcsiWQb77D8Q==')
108 def LookupPrivateKey(self
, signer_uri
):
109 """Look up signing key for a given signer URI."""
110 # TODO(jpanzer): Fix this up to really work, or eliminate.
111 return self
.LookupPublicKey(signer_uri
)
113 _ATOM_NS_URL
= 'http://www.w3.org/2005/Atom'
114 _ME_NS_URL
= 'http://salmon-protocol.org/ns/magic-env'
115 _ATOM_NS
='{%s}' % _ATOM_NS_URL
116 _ME_NS
='{%s}' % _ME_NS_URL
118 # Set up default namespace mappings for things we care about:
120 __register_namespace
= et
.register_namespace
121 except AttributeError:
122 def __register_namespace(prefix
, uri
):
123 et
._namespace
_map
[uri
] = prefix
124 __register_namespace('atom', _ATOM_NS_URL
)
125 __register_namespace('me', _ME_NS_URL
)
126 __register_namespace('thr', 'http://purl.org/syndication/thread/1.0')
128 class MagicEnvelopeProtocol(object):
129 """Implementation of Magic Envelope protocol."""
131 ENCODING
= 'base64url' # This is a constant for now.
132 key_retriever
= KeyRetriever()
134 def GetPrivateKey(self
, signer_uri
):
135 """Retrieves private signing key to be used."""
136 return self
.key_retriever
.LookupPrivateKey(signer_uri
)
138 def GetPublicKey(self
, signer_uri
):
139 """Retrieves public key to be used to verify signatures for signer."""
140 return self
.key_retriever
.LookupPublicKey(signer_uri
)
142 def GetSignerURI(self
, data
):
143 """Grabs signer == first author from given message.
145 Currently we're assuming most messages are single author
146 and punting on what it means to sign a multi-author
147 message. We only look at the first (lexical) author
148 in the input and act as if that is the only author.
151 data: The message, either pre-parsed or a string.
153 The URI of the author of the message.
155 if isinstance(data
, et
.ElementTree
):
159 data
= data
.encode('utf8') if type(data
) is str else data
160 d
._setroot
(et
.XML(data
))
162 auth_uris
= d
.getroot().findall(_ATOM_NS
+'author/'+_ATOM_NS
+'uri')
164 return NormalizeUserIdToUri(u
.text
)
166 def IsAllowedSigner(self
, data
, userid_uri
):
167 """Checks that userid_uri is identified as an allowed signer.
169 Note that this does not do a signature check.
172 data: The message, either pre-parsed or a string.
173 userid_uri: The URI of the author to be checked.
175 True iff userid_uri is identified as the first author.
177 return self
.GetSignerURI(data
) == userid_uri
179 def Verify(self
, env
):
180 """Verifies magic envelope data.
182 Checks that its signature matches the contents and that the
183 author's public key generated the signature.
186 env: The magic envelope data in dict form (section 3.1 of spec)
188 True iff the signature is verified.
190 assert env
['alg'] == 'RSA-SHA256'
191 assert env
['encoding'] == self
.ENCODING
193 # Decode data to text and grab the author:
194 text
= base64
.urlsafe_b64decode(env
['data'].encode('utf-8'))
195 signer_uri
= self
.GetSignerURI(text
)
197 verifier
= magicsigalg
.SignatureAlgRsaSha256(self
.GetKeypair(signer_uri
))
199 return verifier
.Verify(env
['data'], env
['sig'])
201 def GetSigningAlg(self
, signing_key
):
202 """Returns algorithm to use for signing messages.
205 signing_key: Keypair to use to construct the algorithm.
207 An algorithm object that can be used to sign byte sequences.
209 # TODO(jpanzer): Massage signing_key into appropriate format if needed.
211 # Use standard test key if testing:
212 if signing_key
== 'TEST':
213 signing_key
= ('RSA.mVgY8RN6URBTstndvmUUPb4UZTdwvwmddSKE5z_jvKUEK6yk1'
214 'u3rrC9yN8k6FilGj9K0eeUPe2hf4Pj-5CmHww=='
216 '.Lgy_yL3hsLBngkFdDw1Jy9TmSRMiH6yihYetQ8jy-jZXdsZXd8V5'
217 'ub3kuBHHk4M39i3TduIkcrjcsiWQb77D8Q==')
219 return magicsigalg
.SignatureAlgRsaSha256(signing_key
)
221 def GetVerifierAlg(self
, public_key
):
222 """Returns algorithm to use for verifying messages.
225 public_key: Public key to use to construct the algorithm.
227 An algorithm object that can be used to sign byte sequences.
229 # TODO(jpanzer): Massage public_key into appropriate format if needed.
230 return magicsigalg
.SignatureAlgRsaSha256(public_key
)
232 def EncodeData(self
, raw_text_data
, encoding
):
233 """Encodes raw data into an armored form.
236 raw_text_data: Textual data to be encoded; should be in utf-8 form.
237 encoding: Encoding to use (must be base64url)
239 ValueError: The encoding is unknown or missing.
241 The encoded data in the specified format.
243 if encoding
!= 'base64url':
244 raise ValueError('Unknown encoding %s' % encoding
)
246 return base64
.urlsafe_b64encode(
247 raw_text_data
.encode('utf8'))
249 def DecodeData(self
, encoded_text_data
, encoding
):
250 """Decodes armored data into raw text form.
253 encoded_text_data: Armored data to be decoded.
254 encoding: Encoding to use.
256 ValueError: If the encoding is unknown.
258 The raw decoded text as a string.
260 if encoding
!= 'base64url':
261 raise ValueError('Unknown encoding %s' % encoding
)
262 return base64
.urlsafe_b64decode(encoded_text_data
.encode('utf-8'))
264 def ParseData(self
, raw_text_data
, mime_type
):
265 """Parses the payload of a magic envelope's data field.
268 raw_text_data: Data in given MIME type.
269 mime_type: Type of the textual data. application/atom+xml supported
271 ValueError: The input format was unrecognized or badly formed.
273 Parsed data suitable for passing in to other methods of this object.
275 if mime_type
!= 'application/atom+xml':
276 raise ValueError('Unknown MIME type %s' % mime_type
)
279 raw_text_data
= raw_text_data
.encode('utf8') if type(raw_text_data
) is str else raw_text_data
280 d
._setroot
(et
.XML(raw_text_data
))
284 def Parse(self
, textinput
, mime_type
='application/magic-envelope+xml'):
285 """Parses a magic envelope.
288 textinput: Input message in either application/magic-envelope
289 or application/atom format.
290 mime_type: MIME type of textinput data.
292 ValueError: The input format was unrecognized or badly formed.
294 Magic envelope fields in dict format per section 3.1 of spec.
296 ns
= 'http://salmon-protocol.org/ns/magic-env'
298 # TODO(jpanzer): Support JSON format, do real sanity checks against
301 textinput
= textinput
.strip()
302 textinput
= textinput
.encode('utf8') if type(textinput
) is str else textinput
303 d
._setroot
(et
.XML(textinput
))
305 if d
.getroot().tag
== _ATOM_NS
+'entry':
306 env_el
= d
.find(_ME_NS
+'provenance')
307 elif d
.getroot().tag
== _ME_NS
+'env':
310 raise ValueError('Unrecognized input format')
312 def Squeeze(s
): # Remove all whitespace
313 return re
.sub(_WHITESPACE_RE
, '', s
)
315 data_el
= env_el
.find(_ME_NS
+'data')
317 # Pull magic envelope fields out into dict. Don't forget
318 # to remove leading and trailing whitepace from each field's
321 data
=Squeeze(data_el
.text
),
322 encoding
=env_el
.findtext(_ME_NS
+'encoding'),
323 data_type
=data_el
.get('type'),
324 alg
=env_el
.findtext(_ME_NS
+'alg'),
325 sig
=Squeeze(env_el
.findtext(_ME_NS
+'sig')),
329 class EnvelopeError(Error
):
330 """Error thrown on failure to initialize an Envelope."""
331 invalid_envelope
= None # The failed envelope
332 error_text
= None # Human readable error text
333 context
= None # Tuple of type,value from chained exception if any
335 def __init__(self
, envelope
, err
, context
=None):
336 self
.invalid_envelope
= envelope
337 self
.error_text
= err
338 self
.context
= context
342 return '<magicsig.Error "%s" for envelope %s (prior exception: %s)>' % (
343 self
.error_text
, self
.invalid_envelope
, self
.context
)
346 class Envelope(object):
347 """Represents a Magic Envelope."""
349 # Envelope contents (verified)
350 _data
= None # The payload data as a string
351 _data_type
= None # The MIME type of the payload
352 _encoding
= None # The encoding to use ("base64url")
353 _alg
= None # The algorithm used ("RSA")
354 _sig
= None # The signature string
356 _parsed_data
= None # The data as a parsed object
357 _signer_uri
= None # URI of signer
358 _signer_key
= None # Key(pair) associated w/signature
360 _init_timestamp
= None # Timestamp when signed or verified
363 protocol
=MagicEnvelopeProtocol(),
365 """Initializes an envelope from arbitrary input."""
367 self
._protocol
= protocol
368 self
._Initialize
(kwargs
)
370 if self
._sig
: # Verify signature if provided
371 self
._PerformVerification
()
372 elif self
._signer
_key
: # Sign w/signer key if provided
375 raise EnvelopeError(self
, 'Can neither verify nor sign envelope')
376 except EnvelopeError
:
379 # raise EnvelopeError(self, 'Unknown envelope failure %s' %
380 # sys.exc_info()[:1],
381 # sys.exc_info()[:2])
383 # Record when object successfully initialized. This
384 # also serves as a validity flag.
385 self
._init
_timestamp
= time
.time()
387 def _Initialize(self
, kwargs
):
388 """Initializes envelope data from input."""
389 # Input from serialized text document if provided:
390 self
._mime
_type
= kwargs
.get('mime_type', None)
391 self
._document
= kwargs
.get('document', None)
394 # If document provided, use it to parse out fields:
395 fields
= self
._protocol
.Parse(self
._document
, self
._mime
_type
)
396 kwargs
.update(fields
)
398 # Pull structured data from kwargs and sanity check:
399 self
._data
= kwargs
.get('data', None)
400 self
._data
_type
= kwargs
.get('data_type', None)
401 self
._encoding
= kwargs
.get('encoding', 'base64url')
402 self
._alg
= kwargs
.get('alg', 'RSA-SHA256')
403 self
._sig
= kwargs
.get('sig', None)
406 if not self
._data
_type
:
407 raise EnvelopeError(self
, 'Missing data_type')
408 if self
._alg
!= 'RSA-SHA256':
409 raise EnvelopeError(self
, 'Unknown alg %s; must be RSA-SHA256' %
411 if self
._encoding
!= 'base64url':
412 raise EnvelopeError(self
, 'Unknown encoding %s; must be base64url' %
415 raw_data
= kwargs
.get('raw_data_to_sign', None)
417 # If passed raw data to sign, the envelope goes into signing mode.
418 assert self
._data
_type
420 assert not self
._data
421 assert 'signer_uri' in kwargs
422 assert 'signer_key' in kwargs
# And it better be a keypair too!
424 self
._parsed
_data
= self
._protocol
.ParseData(raw_data
,
426 self
._data
= self
._protocol
.EncodeData(raw_data
,
428 self
._signer
_uri
= kwargs
['signer_uri']
429 self
._signer
_key
= kwargs
['signer_key']
431 # If passed a signature, the envelope goes into verify mode.
433 raise EnvelopeError(self
, 'No data to verify')
434 raw_data
= self
._protocol
.DecodeData(self
._data
, self
._encoding
)
436 # No raw data and no signature, give up.
437 raise EnvelopeError(self
, 'Insufficient data to initialize envelope.')
439 # Cache a parsed representation of the raw data:
440 self
._parsed
_data
= self
._protocol
.ParseData(raw_data
, self
._data
_type
)
442 # At this point the envelope is initialized but is not yet valid.
443 # (It needs to be either verified or signed.)
444 self
._init
_timestamp
= None
447 """Age of object since successful verification."""
448 assert self
._init
_timestamp
450 return self
._init
_timestamp
- time
.time()
453 """Signs an envelope given appropriate key inputs."""
454 assert self
._signer
_uri
455 assert self
._signer
_key
456 assert self
._protocol
.IsAllowedSigner(self
._parsed
_data
, self
._signer
_uri
)
458 signature_alg
= self
._protocol
.GetSigningAlg(self
._signer
_key
)
459 self
._sig
= signature_alg
.Sign(self
._data
)
460 self
._alg
= signature_alg
.GetName()
462 # Hmm. This seems like a no-brainer assert but what if you're
463 # signing something with a not-yet-published public key?
464 assert signature_alg
.Verify(self
._data
, self
._sig
)
466 # TODO(jpanzer): Clear private key data from object?
468 def _PerformVerification(self
):
469 """Performs signature verification on parsed data."""
470 # Decode data to text, cache parsed representation,
471 # and find the key to use:
472 text
= base64
.urlsafe_b64decode(self
._data
.encode('utf-8'))
473 self
._parsed
_data
= self
._protocol
.ParseData(text
, self
._data
_type
)
474 self
._signer
_uri
= self
._protocol
.GetSignerURI(self
._parsed
_data
)
475 self
._signer
_public
_key
= self
._protocol
.GetPublicKey(self
._signer
_uri
)
477 # Get a verifier for that key:
478 verifier
= self
._protocol
.GetVerifierAlg(self
._signer
_public
_key
)
480 # Check whether the signature verifies; if not, abandon
482 if not verifier
.Verify(self
._data
, self
._sig
):
483 raise EnvelopeError(self
, 'Signature verification failed.')
485 def ToXML(self
, fulldoc
=True, indentation
=0):
486 """Turns envelope into serialized XML suitable for transmission.
489 fulldoc: Return a full XML document with <?xml...
490 indentation: Indent each line this number of spaces.
492 An XML document or fragment in string form.
494 assert self
._init
_timestamp
# Object must be successfully initialized
495 # TODO(jpanzer): Determine leeway period before requiring another
497 # (we can't keep an object sitting around in memory for a month without
498 # rechecking the signature).
500 # Template for a Magic Envelope:
502 template
= '<?xml version=\'1.0\' encoding=\'UTF-8\'?>'
506 <me:env xmlns:me='http://salmon-protocol.org/ns/magic-env'>
507 <me:encoding>%s</me:encoding>
508 <me:data type='application/atom+xml'>
517 text
= template
% (self
._encoding
,
518 _ToPretty(self
._data
, 4, 60),
520 _ToPretty(self
._sig
, 4, 60))
522 for line
in text
.strip().split('\n'):
523 indented_text
+= ' '*indentation
+ line
+ '\n'
527 def ToAtom(self
, fulldoc
=True, indentation
=0):
528 """Turns envelope into serialized Atom entry.
531 fulldoc: Return a full XML document with <?xml...
532 indentation: Indent each line this number of spaces.
534 An Atom entry XML document with an me:provenance element
535 containing the original magic signature data.
537 if not self
._parsed
_data
:
538 self
._parsed
_data
= self
._protocol
.ParseData(text
, self
._data
_type
)
540 d
= self
._parsed
_data
541 assert d
.getroot().tag
== _ATOM_NS
+'entry'
543 # Create a provenance and add it in.
544 prov_el
= et
.Element(_ME_NS
+'provenance')
545 data_el
= et
.SubElement(prov_el
, _ME_NS
+'data')
546 data_el
.set('type', self
._data
_type
)
547 data_el
.text
= '\n'+_ToPretty(self
._data
, indentation
+6, 60)
548 et
.SubElement(prov_el
, _ME_NS
+'encoding').text
= self
._encoding
549 et
.SubElement(prov_el
, _ME_NS
+'sig').text
= '\n'+_ToPretty(self
._sig
,
553 # Add in the provenance element:
554 d
.getroot().append(prov_el
)
557 self
._PrettyIndent
(d
.getroot(), indentation
/2)
559 # Turn it back into text for consumption:
560 text
= et
.tostring(d
.getroot(),encoding
='utf-8')
563 for line
in text
.strip().split('\n'):
564 if line
.strip() != '':
565 indented_text
+= ' '*indentation
+ line
+ '\n'
568 indented_text
= ('<?xml version=\'1.0\' encoding=\'UTF-8\'?>\n' +
573 """Returns envelope's verified data."""
574 return self
._protocol
.Decode(self
._data
, self
._encoding
)
576 def GetParsedData(self
):
577 """Returns envelope's verified data in parsed form."""
578 if not self
._parsed
_data
:
579 self
._parsed
_data
= self
._protocol
.ParseData(
580 self
._protocol
.Decode(self
._data
),
582 return self
._parsed
_data
584 def GetDataWithProvenance(self
):
585 """Returns envelope's data as a string with provenance attached."""
586 # TODO(jpanzer): Implement.
588 def GetParsedDataWithProvenance(self
):
589 """Returns data with provenance in parsed form."""
590 # TODO(jpanzer): Implement.
593 def _PrettyIndent(self
, elem
, level
=0):
594 """Prettifies an element tree in-place"""
595 # TODO(jpanzer): Avoid munging text nodes where it matters?
598 if not elem
.text
or not elem
.text
.strip():
600 if not elem
.tail
or not elem
.tail
.strip():
603 self
._PrettyIndent
(elem
, level
+1)
604 if not elem
.tail
or not elem
.tail
.strip():
607 if level
and (not elem
.tail
or not elem
.tail
.strip()):
610 def _ToPretty(text
, indent
, linelength
):
611 """Makes huge text lines pretty, or at least printable."""
613 tl
= linelength
- indent
615 for i
in range(0, len(text
), tl
):
618 output
+= ' ' * indent
+ text
[i
:i
+tl
]