Source code for pyhpke.cipher_suite

import struct
from typing import Optional, Tuple

from .aead import AEAD, AEADParams
from .aead_interface import AEADInterface
from .consts import AEADId, KDFId, KEMId, Mode
from .context_interface import ContextInterface
from .exporter_context import ExporterContext
from .kdf import KDF
from .kdf_interface import KDFInterface
from .kem import KEM
from .kem_interface import KEMInterface
from .kem_key import KEMKeyPair
from .kem_key_interface import KEMKeyInterface
from .recipient_context import RecipientContext
from .sender_context import SenderContext

class CipherSuite(object):
    """
    An HPKE cipher suite which consists of KEM, KDF and AEAD.
    """

    def __init__(self, kem: KEMInterface, kdf: KDFInterface, aead: AEADInterface):
        """
        Stores the three primitives that make up the cipher suite.

        Args:
            kem (KEMInterface): The KEM used for encapsulation/decapsulation.
            kdf (KDFInterface): The KDF exposed through the ``kdf`` property.
            aead (AEADInterface): The AEAD whose key/nonce sizes drive the
                key schedule.
        """
        self._kem = kem
        self._kdf = kdf
        self._aead = aead

    @classmethod
    def new(cls, kem_id: KEMId, kdf_id: KDFId, aead_id: AEADId) -> "CipherSuite":
        """
        Constructor of HPKE cipher suite.

        Args:
            kem_id (KEMId): A KEM (Key Encapsulation Mechanism) identifier.
            kdf_id (KDFId): A KDF (Key Derivation Function) identifier.
            aead_id (AEADId): An AEAD (Authenticated Encryption with Additional
                Data) identifier.
        Returns:
            CipherSuite: A CipherSuite object.
        """
        suite_id = cls._suite_id(kem_id, kdf_id, aead_id)
        return cls(KEM(kem_id), KDF(kdf_id, suite_id), AEAD(aead_id))

    @property
    def kem(self) -> KEMInterface:
        """
        The KEM context in the cipher suite.
        """
        return self._kem

    @property
    def kdf(self) -> KDFInterface:
        """
        The KDF context in the cipher suite.
        """
        return self._kdf

    @property
    def aead(self) -> AEADInterface:
        """
        The AEAD context in the cipher suite.
        """
        return self._aead

    def create_sender_context(
        self,
        pkr: KEMKeyInterface,
        info: bytes = b"",
        sks: Optional[KEMKeyInterface] = None,
        psk: bytes = b"",
        psk_id: bytes = b"",
        eks: Optional[KEMKeyPair] = None,
    ) -> Tuple[bytes, ContextInterface]:
        """
        Creates a sender context.

        The HPKE mode is derived from the inputs: an empty ``psk`` selects
        BASE (or AUTH when ``sks`` is given); a non-empty ``psk`` selects PSK
        (or AUTH_PSK when ``sks`` is given).

        Args:
            pkr (KEMKeyInterface): Key passed to the KEM ``encap`` call as the
                recipient key.
            info (bytes): Application-supplied information bound into the key
                schedule.
            sks (Optional[KEMKeyInterface]): Sender key; its presence switches
                to an authenticated mode.
            psk (bytes): A pre-shared key. Empty means "no PSK".
            psk_id (bytes): Identifier for ``psk``. Must be empty exactly when
                ``psk`` is empty.
            eks (Optional[KEMKeyPair]): Forwarded unchanged to the KEM
                ``encap`` call (presumably an explicit ephemeral key pair --
                TODO confirm against the KEM implementation).
        Returns:
            Tuple[bytes, ContextInterface]: The encapsulated key ``enc`` and
            the context derived from the shared secret.
        Raises:
            ValueError: ``psk`` and ``psk_id`` are inconsistent.
        """
        mode = self._resolve_mode(psk, psk_id, sks)
        shared_secret, enc = self._kem.encap(pkr, sks, eks)
        return enc, self._key_schedule_s(mode, shared_secret, info, psk, psk_id)

    def create_recipient_context(
        self,
        enc: bytes,
        skr: KEMKeyInterface,
        info: bytes = b"",
        pks: Optional[KEMKeyInterface] = None,
        psk: bytes = b"",
        psk_id: bytes = b"",
    ) -> ContextInterface:
        """
        Creates a recipient context.

        The HPKE mode is derived from the inputs exactly as in
        ``create_sender_context``, with ``pks`` playing the role of ``sks``.

        Args:
            enc (bytes): The encapsulated key received from the sender.
            skr (KEMKeyInterface): Key passed to the KEM ``decap`` call as the
                recipient key.
            info (bytes): Application-supplied information bound into the key
                schedule.
            pks (Optional[KEMKeyInterface]): Sender key; its presence switches
                to an authenticated mode.
            psk (bytes): A pre-shared key. Empty means "no PSK".
            psk_id (bytes): Identifier for ``psk``. Must be empty exactly when
                ``psk`` is empty.
        Returns:
            ContextInterface: The context derived from the shared secret.
        Raises:
            ValueError: ``psk`` and ``psk_id`` are inconsistent.
        """
        mode = self._resolve_mode(psk, psk_id, pks)
        shared_secret = self._kem.decap(enc, skr, pks)
        return self._key_schedule_r(mode, shared_secret, info, psk, psk_id)

    def seal(
        self,
        pkr: KEMKeyInterface,
        pt: bytes,
        info: bytes = b"",
        aad: bytes = b"",
        psk: bytes = b"",
        psk_id: bytes = b"",
        sks: Optional[KEMKeyInterface] = None,
    ) -> Tuple[bytes, bytes]:
        """
        Does a single-shot encryption.

        Raises:
            NotImplementedError: Always; this operation is not implemented yet.
        """
        raise NotImplementedError()

    def open(
        self,
        enc: bytes,
        skr: KEMKeyInterface,
        ct: bytes,
        info: bytes = b"",
        aad: bytes = b"",
        psk: bytes = b"",
        psk_id: bytes = b"",
        pks: Optional[KEMKeyInterface] = None,
    ) -> bytes:
        """
        Does a single-shot decryption.

        Raises:
            NotImplementedError: Always; this operation is not implemented yet.
        """
        raise NotImplementedError()

    @staticmethod
    def _suite_id(kem_id: KEMId, kdf_id: KDFId, aead_id: AEADId) -> bytes:
        """
        Builds the suite identifier: ``b"HPKE"`` followed by the three
        identifiers, each packed as a big-endian unsigned 16-bit integer.
        """
        return b"HPKE" + struct.pack(">HHH", kem_id.value, kdf_id.value, aead_id.value)

    @staticmethod
    def _resolve_mode(psk: bytes, psk_id: bytes, auth_key: Optional[KEMKeyInterface]) -> Mode:
        """
        Picks the HPKE mode from the presence of a PSK and of a sender key.

        Raises:
            ValueError: Exactly one of ``psk`` / ``psk_id`` is empty.
        """
        has_psk = psk != b""
        # A PSK without its identifier (or the reverse) must be rejected rather
        # than silently mixed into the key schedule (RFC 9180 VerifyPSKInputs).
        if has_psk != (psk_id != b""):
            raise ValueError("Inconsistent PSK inputs.")
        if has_psk:
            return Mode.PSK if auth_key is None else Mode.AUTH_PSK
        return Mode.BASE if auth_key is None else Mode.AUTH

    def _key_schedule(
        self,
        mode: Mode,
        shared_secret: bytes,
        info: bytes,
        psk: bytes,
        psk_id: bytes,
    ) -> Tuple[KDF, AEADParams]:
        """
        Derives the AEAD key, base nonce and exporter secret.

        Returns:
            Tuple[KDF, AEADParams]: The suite-bound KDF used for the
            derivation and the parameters built from its outputs (sequence
            number argument starts at 0).
        """
        # NOTE(review): the argument expressions on these two lines were lost
        # in the extracted source. They are reconstructed on the assumption
        # that each primitive exposes its identifier enum as `.id`, mirroring
        # what `new()` does with the raw identifiers -- confirm against
        # KEMInterface / KDFInterface / AEADInterface.
        suite_id = self._suite_id(self._kem.id, self._kdf.id, self._aead.id)
        kdf = KDF(self._kdf.id, suite_id)

        psk_id_hash = kdf.labeled_extract(b"", b"psk_id_hash", psk_id)
        info_hash = kdf.labeled_extract(b"", b"info_hash", info)
        # Context = one mode byte followed by the two hashes.
        key_schedule_context = bytes([mode.value]) + psk_id_hash + info_hash

        secret = kdf.labeled_extract(shared_secret, b"secret", psk)

        key = kdf.labeled_expand(secret, b"key", key_schedule_context, self._aead.key_size)
        base_nonce = kdf.labeled_expand(secret, b"base_nonce", key_schedule_context, self._aead.nonce_size)
        exporter_secret = kdf.labeled_expand(secret, b"exp", key_schedule_context, kdf.digest_size)

        return kdf, AEADParams(self._aead, key, base_nonce, 0, exporter_secret)

    def _key_schedule_s(
        self,
        mode: Mode,
        shared_secret: bytes,
        info: bytes,
        psk: bytes,
        psk_id: bytes,
    ) -> ContextInterface:
        """
        Runs the key schedule and wraps the result for the sender side.

        Returns an ``ExporterContext`` when the derived AEAD key is empty,
        otherwise a ``SenderContext``.
        """
        kdf, params = self._key_schedule(mode, shared_secret, info, psk, psk_id)
        if params.key == b"":
            # No AEAD key was derived, so only secret export is available.
            return ExporterContext(kdf, params.exporter_secret)
        return SenderContext(kdf, params)

    def _key_schedule_r(
        self,
        mode: Mode,
        shared_secret: bytes,
        info: bytes,
        psk: bytes,
        psk_id: bytes,
    ) -> ContextInterface:
        """
        Runs the key schedule and wraps the result for the recipient side.

        Returns an ``ExporterContext`` when the derived AEAD key is empty,
        otherwise a ``RecipientContext``.
        """
        kdf, params = self._key_schedule(mode, shared_secret, info, psk, psk_id)
        if params.key == b"":
            # No AEAD key was derived, so only secret export is available.
            return ExporterContext(kdf, params.exporter_secret)
        return RecipientContext(kdf, params)