Add integration tests

Tests if repository json files conform to the json schema.
If all resources (images/icons/website URLs) they mention actually
exists.

And can also test writing images and the FAT modification code.
This commit is contained in:
Floris Bos 2022-11-19 23:49:43 +01:00
parent fce80b2a67
commit 05f1c4dbb5
11 changed files with 319 additions and 8 deletions

View file

@ -329,7 +329,6 @@
"required": [
"name",
"description",
"icon",
"subitems_url"
],
"properties": {
@ -415,7 +414,6 @@
"required": [
"name",
"description",
"icon",
"subitems"
],
"properties": {

View file

@ -57,6 +57,15 @@ int Cli::main()
parser.addOption(writeSystemDrive);
QCommandLineOption sha256Option("sha256", "Expected hash", "sha256", "");
parser.addOption(sha256Option);
QCommandLineOption cacheFileOption("cache-file", "Custom cache file (requires setting sha256 as well)", "cache-file", "");
parser.addOption(cacheFileOption);
QCommandLineOption firstRunScriptOption("first-run-script", "Add firstrun.sh to image", "first-run-script", "");
parser.addOption(firstRunScriptOption);
QCommandLineOption userdataOption("cloudinit-userdata", "Add cloud-init user-data file to image", "cloudinit-userdata", "");
parser.addOption(userdataOption);
QCommandLineOption networkconfigOption("cloudinit-networkconfig", "Add cloud-init network-config file to image", "cloudinit-networkconfig", "");
parser.addOption(networkconfigOption);
QCommandLineOption debugOption("debug", "Output debug messages to console");
parser.addOption(debugOption);
QCommandLineOption quietOption("quiet", "Only write to console on error");
@ -69,7 +78,7 @@ int Cli::main()
const QStringList args = parser.positionalArguments();
if (args.count() != 2)
{
std::cerr << "Usage: --cli [--disable-verify] [--sha256 <expected hash>] [--debug] [--quiet] <image file to write> <destination drive device>" << std::endl;
std::cerr << "Usage: --cli [--disable-verify] [--sha256 <expected hash> [--cache-file <cache file>]] [--first-run-script <script>] [--debug] [--quiet] <image file to write> <destination drive device>" << std::endl;
return 1;
}
@ -78,10 +87,17 @@ int Cli::main()
qInstallMessageHandler(devnullMsgHandler);
}
_quiet = parser.isSet(quietOption);
QByteArray initFormat = (parser.value(userdataOption).isEmpty()
&& parser.value(networkconfigOption).isEmpty() ) ? "systemd" : "cloudinit";
if (args[0].startsWith("http:", Qt::CaseInsensitive) || args[0].startsWith("https:", Qt::CaseInsensitive))
{
_imageWriter->setSrc(args[0], 0, 0, parser.value(sha256Option).toLatin1() );
_imageWriter->setSrc(args[0], 0, 0, parser.value(sha256Option).toLatin1(), false, "", "", initFormat);
if (!parser.value(cacheFileOption).isEmpty())
{
_imageWriter->setCustomCacheFile(parser.value(cacheFileOption), parser.value(sha256Option).toLatin1() );
}
}
else
{
@ -89,7 +105,7 @@ int Cli::main()
if (fi.isFile())
{
_imageWriter->setSrc(QUrl::fromLocalFile(args[0]), fi.size(), 0, parser.value(sha256Option).toLatin1() );
_imageWriter->setSrc(QUrl::fromLocalFile(args[0]), fi.size(), 0, parser.value(sha256Option).toLatin1(), false, "", "", initFormat);
}
else if (!fi.exists())
{
@ -140,6 +156,69 @@ int Cli::main()
}
}
if (!parser.value(userdataOption).isEmpty())
{
QByteArray userData, networkConfig;
QFile f(parser.value(userdataOption));
if (!f.exists())
{
std::cerr << "Error: user-data file does not exists" << std::endl;
return 1;
}
if (f.open(f.ReadOnly))
{
userData = f.readAll();
f.close();
}
else
{
std::cerr << "Error: opening user-data file" << std::endl;
return 1;
}
f.setFileName(parser.value(networkconfigOption));
if (!f.exists())
{
std::cerr << "Error: network-config file does not exists" << std::endl;
return 1;
}
if (f.open(f.ReadOnly))
{
networkConfig = f.readAll();
f.close();
}
else
{
std::cerr << "Error: opening network-config file" << std::endl;
return 1;
}
_imageWriter->setImageCustomization("", "", "", userData, networkConfig);
}
else if (!parser.value(firstRunScriptOption).isEmpty())
{
QByteArray firstRunScript;
QFile f(parser.value(firstRunScriptOption));
if (!f.exists())
{
std::cerr << "Error: firstrun script does not exists" << std::endl;
return 1;
}
if (f.open(f.ReadOnly))
{
firstRunScript = f.readAll();
f.close();
}
else
{
std::cerr << "Error: opening firstrun script" << std::endl;
return 1;
}
_imageWriter->setImageCustomization("", "", firstRunScript, "", "");
}
_imageWriter->setDst(args[1]);
_imageWriter->setVerifyEnabled(!parser.isSet(disableVerify));

View file

@ -66,7 +66,7 @@
ImageWriter::ImageWriter(QObject *parent)
: QObject(parent), _repo(QUrl(QString(OSLIST_URL))), _dlnow(0), _verifynow(0),
_engine(nullptr), _thread(nullptr), _verifyEnabled(false), _cachingEnabled(false),
_embeddedMode(false), _online(false), _trans(nullptr)
_embeddedMode(false), _online(false), _customCacheFile(false), _trans(nullptr)
{
connect(&_polltimer, SIGNAL(timeout()), SLOT(pollProgress()));
@ -349,8 +349,11 @@ void ImageWriter::startWrite()
void ImageWriter::onCacheFileUpdated(QByteArray sha256)
{
_settings.setValue("caching/lastDownloadSHA256", sha256);
_settings.sync();
if (!_customCacheFile)
{
_settings.setValue("caching/lastDownloadSHA256", sha256);
_settings.sync();
}
_cachedFileHash = sha256;
qDebug() << "Done writing cache file";
}
@ -421,6 +424,14 @@ void ImageWriter::setCustomOsListUrl(const QUrl &url)
_repo = url;
}
void ImageWriter::setCustomCacheFile(const QString &cacheFile, const QByteArray &sha256)
{
_cacheFileName = cacheFile;
_cachedFileHash = QFile::exists(cacheFile) ? sha256 : "";
_customCacheFile = true;
_cachingEnabled = true;
}
/* Start polling the list of available drives */
void ImageWriter::startDriveListPolling()
{

View file

@ -74,6 +74,9 @@ public:
/* Set custom repository */
Q_INVOKABLE void setCustomOsListUrl(const QUrl &url);
/* Set custom cache file */
void setCustomCacheFile(const QString &cacheFile, const QByteArray &sha256);
/* Utility function to open OS file dialog */
Q_INVOKABLE void openFileDialog();
@ -168,6 +171,7 @@ protected:
bool _verifyEnabled, _multipleFilesInZip, _cachingEnabled, _embeddedMode, _online;
QSettings _settings;
QMap<QString,QString> _translations;
bool _customCacheFile;
QTranslator *_trans;
#ifdef Q_OS_WIN
QWinTaskbarButton *_taskbarButton;

30
tests/README.md Normal file
View file

@ -0,0 +1,30 @@
Integration tests
===
Test if all json files in the public repository are correct (validate against json schema, and test all image files, icons and websites mentioned in the json do exist by performing HEAD requests)
```
$ cd tests
$ pytest
```
Test if a specific json file validates correctly
```
$ cd tests
$ pytest --repo=http://my-repo/os_list.json
```
Test image writes for all images in a repository
```
$ cd tests
$ truncate -s 16G loopfile
$ udisksctl loop-setup --file loopfile
Mapped file loopfile as /dev/loop24
$ sudo -g disk pytest test_write_images.py --repo=http://my-repo/os_list.json --device=/dev/loop24
```
Note: make sure automatic mounting of removable media is disabled in your Linux distribution during write tests.
You can also use real drives instead of loop files as device. But be very careful not to enter the wrong device. Writes are done for real, it is not a mock test...

1
tests/cache/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
*

90
tests/conftest.py Normal file
View file

@ -0,0 +1,90 @@
import pytest
import json
import urllib.request
os_list_files = []
icon_urls = set()
website_urls = set()
item_json = []
already_processed_urls = set()
total_download_size = 0
largest_extract_size = 0
def pytest_addoption(parser):
parser.addoption(
"--repo",
action="store",
default="https://downloads.raspberrypi.com/os_list_imagingutility_v3.json",
help="Repository URL to test"
)
parser.addoption(
"--device",
action="store",
default="",
help="(Loop) device if you want to perform actual image write tests"
)
def parse_json_entries(j):
global total_download_size, largest_extract_size
for item in j:
if "subitems" in item:
parse_json_entries(item["subitems"])
elif "subitems_url" in item:
parse_os_list(item["subitems_url"])
else:
if "icon" in item and not item["icon"].startswith("data:"):
icon_urls.add(item["icon"])
if "website" in item:
website_urls.add(item["website"])
if "url" in item:
item_json.append(item)
if "image_download_size" in item:
total_download_size += int(item["image_download_size"])
if "extract_size" in item:
largest_extract_size = max(largest_extract_size, int(item["extract_size"]))
def parse_os_list(url):
if url in already_processed_urls:
print("Circular reference! Already processed URL: {}".format(url))
return
already_processed_urls.add(url)
try:
print("Fetching OS list file {}".format(url))
req = urllib.request.Request(
url,
data=None,
headers={
'User-Agent': 'rpi-imager automated tests'
}
)
response = urllib.request.urlopen(req)
body = response.read()
j = json.loads(body)
os_list_files.append( (url,body) )
if "os_list" in j:
parse_json_entries(j["os_list"])
except Exception as err:
print("Error processing '{}': {}".format(url, repr(err) ))
def pytest_configure(config):
parse_os_list(config.getoption("--repo"))
print("Found {} os_list.json files {} OS images {} icons {} website URLs".format(
len(os_list_files), len(item_json), len(icon_urls), len(website_urls) ) )
print("Total compressed image download size: {} GB".format(round(total_download_size / 1024 ** 3) ))
print("Largest uncompressed image size: {} GB".format(round(largest_extract_size / 1024 ** 3) ))
def pytest_generate_tests(metafunc):
if "oslisttuple" in metafunc.fixturenames:
metafunc.parametrize("oslisttuple", os_list_files)
if "iconurl" in metafunc.fixturenames:
metafunc.parametrize("iconurl", icon_urls)
if "websiteurl" in metafunc.fixturenames:
metafunc.parametrize("websiteurl", website_urls)
if "imageitem" in metafunc.fixturenames:
metafunc.parametrize("imageitem", item_json)

5
tests/test_firstrun.txt Normal file
View file

@ -0,0 +1,5 @@
#!/bin/sh
echo "Bogus firstrun.sh file, used for automated testing of FAT16/FAT32 modification code only"
exit 0

28
tests/test_schema.py Normal file
View file

@ -0,0 +1,28 @@
import os
import json
import pytest
from jsonschema import validate
from jsonschema.exceptions import ValidationError
def test_os_list_json_against_schema(oslisttuple, schema):
oslisturl = oslisttuple[0]
oslistdata = oslisttuple[1]
errorMsg = ""
j = json.loads(oslistdata)
try:
validate(instance=j, schema=schema)
except ValidationError as err:
errorMsg = err.message
if errorMsg != "":
pytest.fail(oslisturl+" failed schema validation: "+errorMsg, False)
@pytest.fixture
def schema():
f = open(os.path.dirname(__file__)+"/../doc/json-schema/os-list-schema.json","r")
data = f.read()
return json.loads(data)

25
tests/test_urls.py Normal file
View file

@ -0,0 +1,25 @@
import urllib.request
def _head_request(url):
req = urllib.request.Request(
url,
data=None,
headers={
'User-Agent': 'rpi-imager automated tests'
},
method="HEAD"
)
return urllib.request.urlopen(req)
def test_icon_url_exists(iconurl):
assert _head_request(iconurl).status == 200
def test_website_url_exists(websiteurl):
assert _head_request(websiteurl).status == 200
def test_image_url_exists_and_has_correct_image_download_size(imageitem):
response = _head_request(imageitem["url"])
assert response.status == 200
assert str(imageitem["image_download_size"]) == response.headers["Content-Length"]

View file

@ -0,0 +1,40 @@
import pytest
import re
import subprocess
import os
import time
from shlex import quote
def shell(cmd, url):
msg = ''
try:
subprocess.run(cmd, shell=True, check=True, capture_output=True)
except subprocess.CalledProcessError as err:
msg = "{} Error running '{}' exit code {} stderr: '{}'".format(url, err.cmd, err.returncode, err.output)
if msg != '':
pytest.fail(msg, False)
def test_write_image_and_modify_fat(imageitem, device):
if not device:
pytest.skip("--device=<device> not specified. Skipping write tests")
return
assert "extract_sha256" in imageitem, "{}: missing extract_sha256. Cannot perform write test.".format(imageitem["url"])
assert "image_download_size" in imageitem, "{}: missing image_download_size. Cannot perform write test.".format(imageitem["url"])
assert re.search("^[a-z0-9]{64}$", imageitem["extract_sha256"]) != None
cacheFile = "cache/"+imageitem["extract_sha256"]
if os.path.exists(cacheFile) and os.path.getsize(cacheFile) != imageitem["image_download_size"]:
os.remove(cacheFile)
shell("rpi-imager --cli --quiet --enable-writing-system-drives --sha256 {} --cache-file {} --first-run-script test_firstrun.txt {} {}".format(
quote(imageitem["extract_sha256"]), quote(cacheFile), quote(imageitem["url"]), quote(device) ), imageitem["url"])
time.sleep(0.5)
shell("fsck.vfat -n "+quote(device+"p1"), imageitem["url"])
@pytest.fixture
def device(request):
return request.config.getoption("--device")