blob: c19835ff63ea7264ba4d8033c4c45e2b368ddc60 [file] [log] [blame]
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////////
//! COSE Headers functionality.
use crate::{
cbor::value::Value,
common::AsCborValue,
iana,
iana::EnumI64,
util::{cbor_type_error, to_cbor_array, ValueTryAs},
Algorithm, CborSerializable, CoseError, CoseSignature, Label, RegisteredLabel, Result,
};
use alloc::{collections::BTreeSet, string::String, vec, vec::Vec};
#[cfg(test)]
mod tests;
/// Content type.
pub type ContentType = crate::RegisteredLabel<iana::CoapContentFormat>;
/// Structure representing a common COSE header map.
///
/// ```cddl
/// header_map = {
/// Generic_Headers,
/// * label => values
/// }
///
/// Generic_Headers = (
/// ? 1 => int / tstr, ; algorithm identifier
/// ? 2 => [+label], ; criticality
/// ? 3 => tstr / int, ; content type
/// ? 4 => bstr, ; key identifier
/// ? 5 => bstr, ; IV
/// ? 6 => bstr, ; Partial IV
/// ? 7 => COSE_Signature / [+COSE_Signature] ; Counter signature
/// )
/// ```
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Header {
/// Cryptographic algorithm to use
pub alg: Option<Algorithm>,
/// Critical headers to be understood
pub crit: Vec<RegisteredLabel<iana::HeaderParameter>>,
/// Content type of the payload
pub content_type: Option<ContentType>,
/// Key identifier.
pub key_id: Vec<u8>,
/// Full initialization vector
pub iv: Vec<u8>,
/// Partial initialization vector
pub partial_iv: Vec<u8>,
/// Counter signature
pub counter_signatures: Vec<CoseSignature>,
/// Any additional header (label,value) pairs. If duplicate labels are present, CBOR-encoding
/// will fail.
pub rest: Vec<(Label, Value)>,
}
impl Header {
/// Indicate whether the `Header` is empty.
pub fn is_empty(&self) -> bool {
self.alg.is_none()
&& self.crit.is_empty()
&& self.content_type.is_none()
&& self.key_id.is_empty()
&& self.iv.is_empty()
&& self.partial_iv.is_empty()
&& self.counter_signatures.is_empty()
&& self.rest.is_empty()
}
}
impl crate::CborSerializable for Header {}
const ALG: Label = Label::Int(iana::HeaderParameter::Alg as i64);
const CRIT: Label = Label::Int(iana::HeaderParameter::Crit as i64);
const CONTENT_TYPE: Label = Label::Int(iana::HeaderParameter::ContentType as i64);
const KID: Label = Label::Int(iana::HeaderParameter::Kid as i64);
const IV: Label = Label::Int(iana::HeaderParameter::Iv as i64);
const PARTIAL_IV: Label = Label::Int(iana::HeaderParameter::PartialIv as i64);
const COUNTER_SIG: Label = Label::Int(iana::HeaderParameter::CounterSignature as i64);
impl AsCborValue for Header {
fn from_cbor_value(value: Value) -> Result<Self> {
let m = value.try_as_map()?;
let mut headers = Self::default();
let mut seen = BTreeSet::new();
for (l, value) in m.into_iter() {
// The `ciborium` CBOR library does not police duplicate map keys.
// RFC 8152 section 14 requires that COSE does police duplicates, so do it here.
let label = Label::from_cbor_value(l)?;
if seen.contains(&label) {
return Err(CoseError::DuplicateMapKey);
}
seen.insert(label.clone());
match label {
ALG => headers.alg = Some(Algorithm::from_cbor_value(value)?),
CRIT => match value {
Value::Array(a) => {
if a.is_empty() {
return Err(CoseError::UnexpectedItem(
"empty array",
"non-empty array",
));
}
for v in a {
headers.crit.push(
RegisteredLabel::<iana::HeaderParameter>::from_cbor_value(v)?,
);
}
}
v => return cbor_type_error(&v, "array value"),
},
CONTENT_TYPE => {
headers.content_type = Some(ContentType::from_cbor_value(value)?);
if let Some(ContentType::Text(text)) = &headers.content_type {
if text.is_empty() {
return Err(CoseError::UnexpectedItem("empty tstr", "non-empty tstr"));
}
if text.trim() != text {
return Err(CoseError::UnexpectedItem(
"leading/trailing whitespace",
"no leading/trailing whitespace",
));
}
// Basic check that the content type is of form type/subtype.
// We don't check the precise definition though (RFC 6838 s4.2)
if text.matches('/').count() != 1 {
return Err(CoseError::UnexpectedItem(
"arbitrary text",
"text of form type/subtype",
));
}
}
}
KID => {
headers.key_id = value.try_as_nonempty_bytes()?;
}
IV => {
headers.iv = value.try_as_nonempty_bytes()?;
}
PARTIAL_IV => {
headers.partial_iv = value.try_as_nonempty_bytes()?;
}
COUNTER_SIG => {
let sig_or_sigs = value.try_as_array()?;
if sig_or_sigs.is_empty() {
return Err(CoseError::UnexpectedItem(
"empty sig array",
"non-empty sig array",
));
}
// The encoding of counter signature[s] is pesky:
// - a single counter signature is encoded as `COSE_Signature` (a 3-tuple)
// - multiple counter signatures are encoded as `[+ COSE_Signature]`
//
// Determine which is which by looking at the first entry of the array:
// - If it's a bstr, sig_or_sigs is a single signature.
// - If it's an array, sig_or_sigs is an array of signatures
match &sig_or_sigs[0] {
Value::Bytes(_) => headers
.counter_signatures
.push(CoseSignature::from_cbor_value(Value::Array(sig_or_sigs))?),
Value::Array(_) => {
for sig in sig_or_sigs.into_iter() {
headers
.counter_signatures
.push(CoseSignature::from_cbor_value(sig)?);
}
}
v => return cbor_type_error(v, "array or bstr value"),
}
}
label => headers.rest.push((label, value)),
}
// RFC 8152 section 3.1: "The 'Initialization Vector' and 'Partial Initialization
// Vector' parameters MUST NOT both be present in the same security layer."
if !headers.iv.is_empty() && !headers.partial_iv.is_empty() {
return Err(CoseError::UnexpectedItem(
"IV and partial-IV specified",
"only one of IV and partial IV",
));
}
}
Ok(headers)
}
fn to_cbor_value(mut self) -> Result<Value> {
let mut map = Vec::<(Value, Value)>::new();
if let Some(alg) = self.alg {
map.push((ALG.to_cbor_value()?, alg.to_cbor_value()?));
}
if !self.crit.is_empty() {
map.push((CRIT.to_cbor_value()?, to_cbor_array(self.crit)?));
}
if let Some(content_type) = self.content_type {
map.push((CONTENT_TYPE.to_cbor_value()?, content_type.to_cbor_value()?));
}
if !self.key_id.is_empty() {
map.push((KID.to_cbor_value()?, Value::Bytes(self.key_id)));
}
if !self.iv.is_empty() {
map.push((IV.to_cbor_value()?, Value::Bytes(self.iv)));
}
if !self.partial_iv.is_empty() {
map.push((PARTIAL_IV.to_cbor_value()?, Value::Bytes(self.partial_iv)));
}
if !self.counter_signatures.is_empty() {
if self.counter_signatures.len() == 1 {
// A single counter signature is encoded differently.
map.push((
COUNTER_SIG.to_cbor_value()?,
self.counter_signatures.remove(0).to_cbor_value()?,
));
} else {
map.push((
COUNTER_SIG.to_cbor_value()?,
to_cbor_array(self.counter_signatures)?,
));
}
}
let mut seen = BTreeSet::new();
for (label, value) in self.rest.into_iter() {
if seen.contains(&label) {
return Err(CoseError::DuplicateMapKey);
}
seen.insert(label.clone());
map.push((label.to_cbor_value()?, value));
}
Ok(Value::Map(map))
}
}
/// Builder for [`Header`] objects.
#[derive(Debug, Default)]
pub struct HeaderBuilder(Header);
impl HeaderBuilder {
builder! {Header}
builder_set! {key_id: Vec<u8>}
/// Set the algorithm.
#[must_use]
pub fn algorithm(mut self, alg: iana::Algorithm) -> Self {
self.0.alg = Some(Algorithm::Assigned(alg));
self
}
/// Add a critical header.
#[must_use]
pub fn add_critical(mut self, param: iana::HeaderParameter) -> Self {
self.0.crit.push(RegisteredLabel::Assigned(param));
self
}
/// Add a critical header.
#[must_use]
pub fn add_critical_label(mut self, label: RegisteredLabel<iana::HeaderParameter>) -> Self {
self.0.crit.push(label);
self
}
/// Set the content type to a numeric value.
#[must_use]
pub fn content_format(mut self, content_type: iana::CoapContentFormat) -> Self {
self.0.content_type = Some(ContentType::Assigned(content_type));
self
}
/// Set the content type to a text value.
#[must_use]
pub fn content_type(mut self, content_type: String) -> Self {
self.0.content_type = Some(ContentType::Text(content_type));
self
}
/// Set the IV, and clear any partial IV already set.
#[must_use]
pub fn iv(mut self, iv: Vec<u8>) -> Self {
self.0.iv = iv;
self.0.partial_iv.clear();
self
}
/// Set the partial IV, and clear any IV already set.
#[must_use]
pub fn partial_iv(mut self, iv: Vec<u8>) -> Self {
self.0.partial_iv = iv;
self.0.iv.clear();
self
}
/// Add a counter signature.
#[must_use]
pub fn add_counter_signature(mut self, sig: CoseSignature) -> Self {
self.0.counter_signatures.push(sig);
self
}
/// Set a header label:value pair. If duplicate labels are added to a [`Header`],
/// subsequent attempts to CBOR-encode the header will fail.
///
/// # Panics
///
/// This function will panic if it used to set a header label from the range [1, 6].
#[must_use]
pub fn value(mut self, label: i64, value: Value) -> Self {
if label >= iana::HeaderParameter::Alg.to_i64()
&& label <= iana::HeaderParameter::CounterSignature.to_i64()
{
panic!("value() method used to set core header parameter"); // safe: invalid input
}
self.0.rest.push((Label::Int(label), value));
self
}
/// Set a header label:value pair where the `label` is text.
#[must_use]
pub fn text_value(mut self, label: String, value: Value) -> Self {
self.0.rest.push((Label::Text(label), value));
self
}
}
/// Structure representing a protected COSE header map.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ProtectedHeader {
/// If this structure was created by parsing serialized data, this field
/// holds the entire contents of the original `bstr` data.
pub original_data: Option<Vec<u8>>,
/// Parsed header information.
pub header: Header,
}
impl ProtectedHeader {
/// Constructor from a [`Value`] that holds a `bstr` encoded header.
#[inline]
pub fn from_cbor_bstr(val: Value) -> Result<Self> {
let data = val.try_as_bytes()?;
let header = if data.is_empty() {
// An empty bstr is used as a short cut for an empty header map.
Header::default()
} else {
Header::from_slice(&data)?
};
Ok(ProtectedHeader {
original_data: Some(data),
header,
})
}
/// Convert this header to a `bstr` encoded map, as a [`Value`], consuming the object along the
/// way.
#[inline]
pub fn cbor_bstr(self) -> Result<Value> {
Ok(Value::Bytes(
if let Some(original_data) = self.original_data {
original_data
} else if self.is_empty() {
vec![]
} else {
self.to_vec()?
},
))
}
/// Indicate whether the `ProtectedHeader` is empty.
pub fn is_empty(&self) -> bool {
self.header.is_empty()
}
}
impl crate::CborSerializable for ProtectedHeader {}
impl AsCborValue for ProtectedHeader {
fn from_cbor_value(value: Value) -> Result<Self> {
Ok(ProtectedHeader {
original_data: None,
header: Header::from_cbor_value(value)?,
})
}
fn to_cbor_value(self) -> Result<Value> {
self.header.to_cbor_value()
}
}