Skip to content

Commit

Permalink
Partially fix some incomplete metadata issues
Browse files Browse the repository at this point in the history
When data cannot be retrieved from immudb, or the data for a package is
incomplete, and an rpm package was specified with the --rpm-package option,
the missing information is now supplemented from that rpm package.

The following issues have been partially fixed.
 - AlmaLinux#42
 - AlmaLinux#44

And the following issues have been fixed.
 - AlmaLinux#26
  • Loading branch information
kawaharasouta authored and KAWAHARA-souta committed Aug 26, 2024
1 parent f755887 commit e9310e0
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 116 deletions.
276 changes: 162 additions & 114 deletions alma_sbom.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import dataclasses
import os
import sys
import contextlib
import rpm
from logging import basicConfig, getLogger, DEBUG, INFO, WARNING
from collections import defaultdict
from typing import Dict, List, Literal, Optional, Tuple
Expand All @@ -14,6 +16,7 @@

from libsbom import cyclonedx as alma_cyclonedx
from libsbom import spdx as alma_spdx
from libsbom import common

ALBS_URL = 'https://build.almalinux.org'
IS_SIGNED = 3
Expand Down Expand Up @@ -195,7 +198,33 @@ def _generate_purl(package_nevra: PackageNevra, source_rpm: str):
return purl


def add_package_source_info(immudb_metadata: Dict, component: Dict):
def _add_package_build_info(immudb_metadata: Dict, component: Dict, albs_url: str = None, build_url: str = None):
component['properties'].extend(
[
{
'name': 'almalinux:package:buildhost',
'value': immudb_metadata['build_host'],
},
{
'name': 'almalinux:albs:build:targetArch',
'value': immudb_metadata['build_arch'],
},
{
'name': 'almalinux:albs:build:ID',
'value': immudb_metadata['build_id'],
},
{
'name': 'almalinux:albs:build:URL',
'value': build_url or f'{albs_url}/build/{immudb_metadata["build_id"]}',
},
{
'name': 'almalinux:albs:build:author',
'value': immudb_metadata['built_by'],
},
]
)

def _add_package_source_info(immudb_metadata: Dict, component: Dict):
if immudb_metadata['source_type'] == 'git':
component['properties'].extend(
[
Expand Down Expand Up @@ -245,46 +274,37 @@ def add_package_source_info(immudb_metadata: Dict, component: Dict):
]
)


def get_info_about_package(
def _get_each_package_component(
immudb_info_about_package: Dict,
albs_url: str,
immudb_wrapper: ImmudbWrapper,
build_url: str = None,
immudb_hash: str = None,
rpm_package: str = None,
):
result = {}
immudb_info_about_package = _extract_immudb_info_about_package(
immudb_wrapper=immudb_wrapper,
immudb_hash=immudb_hash,
rpm_package=rpm_package,
)
source_rpm, package_nevra = _get_specific_info_about_package(
immudb_info_about_package=immudb_info_about_package,
)
immudb_hash = immudb_hash or immudb_info_about_package['Hash']
immudb_metadata = immudb_info_about_package['Metadata']
result['version'] = 1
if 'unsigned_hash' in immudb_metadata:
result['version'] += 1
result['metadata'] = {}
result['metadata']['component'] = {
result = {
'name': package_nevra.name,
'version': (
f'{package_nevra.epoch if package_nevra.epoch else ""}'
f'{":" if package_nevra.epoch else ""}'
f'{package_nevra.version}-{package_nevra.release}'
),
'cpe': _generate_cpe(package_nevra=package_nevra),
'purl': _generate_purl(
package_nevra=package_nevra,
source_rpm=source_rpm,
),
'hashes': [
{
'alg': 'SHA-256',
'content': immudb_hash,
}
],
'cpe': _generate_cpe(package_nevra=package_nevra),
'purl': _generate_purl(
package_nevra=package_nevra,
source_rpm=source_rpm,
),
'properties': [
{
'name': 'almalinux:package:epoch',
Expand All @@ -306,18 +326,10 @@ def get_info_about_package(
'name': 'almalinux:package:sourcerpm',
'value': source_rpm,
},
{
'name': 'almalinux:package:buildhost',
'value': immudb_metadata['build_host'],
},
{
'name': 'almalinux:package:timestamp',
'value': immudb_info_about_package['timestamp'],
},
{
'name': 'almalinux:albs:build:targetArch',
'value': immudb_metadata['build_arch'],
},
{
'name': 'almalinux:albs:build:packageType',
'value': 'rpm',
Expand All @@ -326,25 +338,122 @@ def get_info_about_package(
'name': 'almalinux:sbom:immudbHash',
'value': immudb_hash,
},
{
'name': 'almalinux:albs:build:ID',
'value': immudb_metadata['build_id'],
},
{
'name': 'almalinux:albs:build:URL',
'value': f'{albs_url}/build/{immudb_metadata["build_id"]}',
},
{
'name': 'almalinux:albs:build:author',
'value': immudb_metadata['built_by'],
},
],
}

add_package_source_info(
immudb_metadata=immudb_metadata,
component=result['metadata']['component'],
build_info_fields = ['build_host', 'build_arch', 'build_id', 'built_by']
is_build_info, missing_fields = common.check_required_data(immudb_metadata, build_info_fields)
if is_build_info:
_add_package_build_info(
immudb_metadata=immudb_metadata,
component=result,
albs_url=albs_url,
build_url=build_url
)
else:
_logger.warning(f'build info are lacking.')

if 'source_type' in immudb_metadata:
_add_package_source_info(
immudb_metadata=immudb_metadata,
component=result,
)
else:
_logger.warning(f'source info are lacking.')

return result

def _read_rpm_header(rpm_package: str):
    """Read and return the RPM header of the package file at ``rpm_package``.

    Raises:
        RuntimeError: If the file cannot be opened or the header cannot be
            read by the rpm bindings.
    """
    ts = rpm.TransactionSet()
    # Bind fd before entering the try block so that the finally clause can
    # test it safely even when os.open() itself fails.
    fd = None
    try:
        fd = os.open(rpm_package, os.O_RDONLY)
        return ts.hdrFromFdno(fd)
    except OSError as e:
        raise RuntimeError(f'File open error: {e.strerror}') from e
    except rpm.error as e:
        raise RuntimeError(f'RPM error: {str(e)}') from e
    finally:
        if fd is not None:
            # Closing is best effort; a failure here must not mask the result
            # or the exception raised above.
            with contextlib.suppress(OSError):
                os.close(fd)


def comp_package_info(
    immudb_info_about_package: Dict,
    rpm_package: Optional[str] = None,
):
    """Complete missing package information using a local RPM package.

    ``immudb_info_about_package`` is modified in place: a missing ``Hash`` is
    computed from the RPM file, and missing required ``Metadata`` fields
    (name, epoch, version, release, arch, sourcerpm) are filled in from the
    RPM header. Nothing is returned.

    Args:
        immudb_info_about_package: Package information extracted from immudb;
            may lack ``Hash`` and/or ``Metadata``.
        rpm_package: Path to an RPM file used to supplement the information.
            A falsy value means no RPM package is available.

    Raises:
        RuntimeError: If the RPM file cannot be opened or parsed.
        ValueError: If required information is missing and no RPM package
            was given to supplement it.
    """
    hdr = _read_rpm_header(rpm_package) if rpm_package else None

    if 'Hash' not in immudb_info_about_package:
        if not rpm_package:
            raise ValueError('Cannot get required package info from immudb or The data is lacking. Cannot make SBOM.')
        # NOTE(review): hash_file is invoked on the class with the class
        # itself passed as `self`; presumably it does not use instance
        # state -- confirm against ImmudbWrapper.
        immudb_info_about_package['Hash'] = ImmudbWrapper.hash_file(self=ImmudbWrapper, file_path=rpm_package)

    # Treat a missing or empty/None 'Metadata' entry the same way.
    immudb_metadata = immudb_info_about_package.get('Metadata') or {}
    if not immudb_metadata:  # There is no metadata in immudb
        immudb_metadata['sbom_api'] = '0.0'

    required_fields = ['name', 'epoch', 'version', 'release', 'arch', 'sourcerpm']
    dict_field_rpmtag = {
        'name': rpm.RPMTAG_NAME,
        'epoch': rpm.RPMTAG_EPOCH,
        'version': rpm.RPMTAG_VERSION,
        'release': rpm.RPMTAG_RELEASE,
        'arch': rpm.RPMTAG_ARCH,
        'sourcerpm': rpm.RPMTAG_SOURCERPM,
    }
    is_required_data, missing_fields = common.check_required_data(immudb_metadata, required_fields)
    if not is_required_data:
        _logger.warning('Required data are missing')
        if hdr is None:
            raise ValueError('Cannot get required package info from immudb or The data is lacking.')
        _logger.warning('Complete the data from the RPM package information.')
        for field in missing_fields:
            immudb_metadata[field] = hdr[dict_field_rpmtag[field]]
        # NOTE: The build time recorded in immudb differs slightly from the
        # one stored in the RPM package, so the timestamp is deliberately not
        # filled in from the RPM header here. According to the original
        # author's note, _extract_immudb_info_about_package stores None when
        # immudb has no timestamp. To fill it in from the RPM header instead,
        # uncomment the block below.
        # if 'timestamp' not in immudb_info_about_package or immudb_info_about_package['timestamp'] is None:
        #     immudb_info_about_package['timestamp'] = hdr[rpm.RPMTAG_BUILDTIME]

    immudb_info_about_package['Metadata'] = immudb_metadata

def get_info_about_package(
    albs_url: str,
    immudb_wrapper: ImmudbWrapper,
    immudb_hash: str = None,
    rpm_package: str = None,
):
    """Build the SBOM data for a single package.

    The package information is extracted via
    ``_extract_immudb_info_about_package``, completed by
    ``comp_package_info`` (which may use ``rpm_package``), and turned into a
    component by ``_get_each_package_component``.

    Returns:
        A dict with ``version`` (2 when the metadata contains
        ``unsigned_hash``, otherwise 1) and ``metadata`` holding the
        package ``component``.
    """
    package_info = _extract_immudb_info_about_package(
        immudb_wrapper=immudb_wrapper,
        immudb_hash=immudb_hash,
        rpm_package=rpm_package,
    )
    # Fills in missing hash/metadata in place.
    comp_package_info(
        immudb_info_about_package=package_info,
        rpm_package=rpm_package,
    )

    sbom_version = 2 if 'unsigned_hash' in package_info['Metadata'] else 1
    package_component = _get_each_package_component(
        immudb_info_about_package=package_info,
        albs_url=albs_url,
        immudb_hash=immudb_hash,
        rpm_package=rpm_package,
    )

    return {
        'version': sbom_version,
        'metadata': {'component': package_component},
    }


Expand Down Expand Up @@ -389,82 +498,21 @@ def get_info_about_build(
if artifact['type'] != 'rpm':
continue
immudb_hash = artifact['cas_hash']
result_of_execution = _extract_immudb_info_about_package(

immudb_info_about_package = _extract_immudb_info_about_package(
immudb_wrapper=immudb_wrapper,
immudb_hash=immudb_hash,
)
immudb_metadata = result_of_execution['Metadata']
source_rpm, package_nevra = _get_specific_info_about_package(
immudb_info_about_package=result_of_execution,
comp_package_info(
immudb_info_about_package=immudb_info_about_package,
)
component = {
'name': package_nevra.name,
'version': package_nevra.version,
'cpe': _generate_cpe(package_nevra=package_nevra),
'purl': _generate_purl(
package_nevra=package_nevra,
source_rpm=source_rpm,
),
'hashes': [
{
'alg': 'SHA-256',
'content': immudb_hash,
}
],
'properties': [
{
'name': 'almalinux:package:epoch',
'value': package_nevra.epoch,
},
{
'name': 'almalinux:package:version',
'value': package_nevra.version,
},
{
'name': 'almalinux:package:release',
'value': package_nevra.release,
},
{
'name': 'almalinux:package:arch',
'value': package_nevra.arch,
},
{
'name': 'almalinux:package:sourcerpm',
'value': source_rpm,
},
{
'name': 'almalinux:package:buildhost',
'value': immudb_metadata['build_host'],
},
{
'name': 'almalinux:albs:build:targetArch',
'value': immudb_metadata['build_arch'],
},
{
'name': 'almalinux:albs:build:packageType',
'value': 'rpm',
},
{
'name': 'almalinux:sbom:immudbHash',
'value': result_of_execution['Hash'],
},
{
'name': 'almalinux:albs:build:ID',
'value': build_id,
},
{
'name': 'almalinux:albs:build:URL',
'value': build_url,
},
{
'name': 'almalinux:albs:build:author',
'value': immudb_metadata['built_by'],
},
],
}
add_package_source_info(
immudb_metadata=immudb_metadata,
component=component,

component = _get_each_package_component(
immudb_info_about_package=immudb_info_about_package,
albs_url = albs_url,
build_url = build_url,
immudb_hash=immudb_hash,
rpm_package=rpm_package,
)
components.append(component)
result['components'] = components
Expand Down
21 changes: 21 additions & 0 deletions libsbom/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,26 @@
import typing

def check_required_data(
    data_dict: typing.Dict[str, typing.Any],
    required_fields: typing.List[str],
) -> typing.Tuple[bool, typing.List[str]]:
    """
    Check if all the required fields exist in the specified data dictionary.

    Only key presence is checked; a key whose value is ``None`` counts as
    present.

    Args:
        data_dict (Dict[str, Any]): A dictionary containing the data to be checked
        required_fields (List[str]): A list of required field names
    Returns:
        Tuple[bool, List[str]]:
            - bool: True if all required fields exist, otherwise False
            - List[str]: The missing field names, in the order they appear
              in ``required_fields``
    """
    missing_fields = [field for field in required_fields if field not in data_dict]
    return not missing_fields, missing_fields


def replace_patterns(input_str: str, patterns: typing.Dict[str, str]) -> str:
"""Convenience function to perform multiple string replacements."""

Expand Down
2 changes: 0 additions & 2 deletions libsbom/spdx.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,6 @@ def add_package(self, component, build):
pkg.built_date = component_get_buildtime(
component
) or build_get_timestamp(build)
if not pkg.built_date:
raise ValueError(f"Cannot determine build time of {pkg.name}")

pkg.files_analyzed = False

Expand Down

0 comments on commit e9310e0

Please sign in to comment.