|
| 1 | +import json |
| 2 | +import os |
| 3 | +import io |
| 4 | +import tarfile |
| 5 | +import shutil |
| 6 | +import re |
| 7 | +import urllib |
| 8 | +import pytest |
| 9 | +import tempfile |
| 10 | +import logging |
| 11 | +import requests_mock |
| 12 | + |
| 13 | +from os import makedirs |
| 14 | +from os.path import join |
| 15 | +from unittest.mock import patch, MagicMock, mock_open |
| 16 | +from zipfile import ZipFile |
| 17 | + |
| 18 | +from repo2docker.contentproviders.swhid import Swhid, parse_swhid |
| 19 | +from repo2docker.contentproviders.base import ContentProviderException |
| 20 | + |
| 21 | + |
| 22 | +# this is a slightly stripped down copy of swh.model.cli.swhid_of_dir(). |
| 23 | +# We do not use this later to prevent having to depend on swh.model[cli] |
| 24 | +def swhid_of_dir(path): |
| 25 | + object = Directory.from_disk(path=path).get_data() |
| 26 | + return swhid(DIRECTORY, object) |
| 27 | + |
| 28 | + |
| 29 | +def test_content_id(): |
| 30 | + swhid = Swhid() |
| 31 | + assert swhid.content_id is None |
| 32 | + |
| 33 | + |
| 34 | +swhids_ok = [ |
| 35 | + "swh:1:dir:" + "0" * 40, |
| 36 | + "swh:1:rev:" + "0" * 40, |
| 37 | +] |
| 38 | +swhids_invalid = [ |
| 39 | + "swh:1:dir:" + "0" * 39, |
| 40 | + "swh:2:dir:" + "0" * 40, |
| 41 | + "swh:1:rev:" + "0" * 41, |
| 42 | + "swh:1:cnt:" + "0" * 40, |
| 43 | + "swh:1:ori:" + "0" * 40, |
| 44 | + "swh:1:rel:" + "0" * 40, |
| 45 | + "swh:1:snp:" + "0" * 40, |
| 46 | +] |
| 47 | + |
| 48 | +detect_values = [ |
| 49 | + (swhid, {"swhid": swhid, "swhid_obj": parse_swhid(swhid)}) for swhid in swhids_ok |
| 50 | +] + [(swhid, None) for swhid in swhids_invalid] |
| 51 | + |
| 52 | + |
| 53 | +@pytest.mark.parametrize("swhid, expected", detect_values) |
| 54 | +def test_detect(swhid, expected): |
| 55 | + provider = Swhid() |
| 56 | + assert provider.detect(swhid) == expected |
| 57 | + |
| 58 | + |
| 59 | +def fake_urlopen(req): |
| 60 | + print(req) |
| 61 | + return req.headers |
| 62 | + |
| 63 | + |
| 64 | +def test_unresolving_swhid(): |
| 65 | + provider = Swhid() |
| 66 | + |
| 67 | + # swhid = "0" * 40 |
| 68 | + # assert provider.swhid2url(swhid) is swhid |
| 69 | + |
| 70 | + |
| 71 | +NULLID = "0" * 40 |
| 72 | + |
| 73 | + |
| 74 | +@pytest.fixture |
| 75 | +def gen_tarfile(tmpdir): |
| 76 | + rootdir = join(tmpdir, "tmp") |
| 77 | + makedirs(rootdir) |
| 78 | + with open(join(rootdir, "file1.txt"), "wb") as fobj: |
| 79 | + fobj.write(b"Some content\n") |
| 80 | + |
| 81 | + # this directory hash can be computed using the swh.model package, but we do |
| 82 | + # nto want to depend on this later to limit dependencies and because it |
| 83 | + # does not support python 3.6; |
| 84 | + dirhash = "89a3bd29a2c5ae0b1465febbe5df09730a8576fe" |
| 85 | + buf = io.BytesIO() |
| 86 | + tarf = tarfile.open(name=dirhash, fileobj=buf, mode="w") |
| 87 | + tarf.add(rootdir, arcname=dirhash) |
| 88 | + tarf.close() |
| 89 | + shutil.rmtree(rootdir) |
| 90 | + return dirhash, buf.getvalue() |
| 91 | + |
| 92 | + |
| 93 | +def mocked_provider(tmpdir, dirhash, tarfile_buf): |
| 94 | + provider = Swhid() |
| 95 | + adapter = requests_mock.Adapter() |
| 96 | + provider.base_url = "mock://api/1" |
| 97 | + provider.retry_delay = 0.1 |
| 98 | + provider.session.mount("mock://", adapter) |
| 99 | + |
| 100 | + adapter.register_uri( |
| 101 | + "GET", |
| 102 | + "mock://api/1/revision/{}/".format(NULLID), |
| 103 | + json={ |
| 104 | + "author": { "fullname": "John Doe <[email protected]>"}, |
| 105 | + "directory": dirhash, |
| 106 | + }, |
| 107 | + ) |
| 108 | + adapter.register_uri( |
| 109 | + "POST", |
| 110 | + "mock://api/1/vault/directory/{}/".format(dirhash), |
| 111 | + json={ |
| 112 | + "fetch_url": "mock://api/1/vault/directory/{}/raw/".format(dirhash), |
| 113 | + "status": "new", |
| 114 | + }, |
| 115 | + ) |
| 116 | + adapter.register_uri( |
| 117 | + "GET", |
| 118 | + "mock://api/1/vault/directory/{}/".format(dirhash), |
| 119 | + [ |
| 120 | + { |
| 121 | + "json": { |
| 122 | + "fetch_url": "mock://api/1/vault/directory/{}/raw/".format(dirhash), |
| 123 | + "status": "pending", |
| 124 | + } |
| 125 | + }, |
| 126 | + { |
| 127 | + "json": { |
| 128 | + "fetch_url": "mock://api/1/vault/directory/{}/raw/".format(dirhash), |
| 129 | + "status": "done", |
| 130 | + } |
| 131 | + }, |
| 132 | + ], |
| 133 | + ) |
| 134 | + adapter.register_uri( |
| 135 | + "GET", |
| 136 | + "mock://api/1/vault/directory/{}/raw/".format(dirhash), |
| 137 | + content=tarfile_buf, |
| 138 | + ) |
| 139 | + return provider |
| 140 | + |
| 141 | + |
| 142 | +def test_fetch_revision(tmpdir, gen_tarfile): |
| 143 | + dir_id, tarfile_buf = gen_tarfile |
| 144 | + provider = mocked_provider(tmpdir, dir_id, tarfile_buf) |
| 145 | + swhid = "swh:1:rev:" + NULLID |
| 146 | + for log in provider.fetch(provider.detect(swhid), tmpdir): |
| 147 | + print(log) |
| 148 | + assert provider.content_id == "swh:1:dir:" + dir_id |
| 149 | + |
| 150 | + |
| 151 | +def test_fetch_directory(tmpdir, gen_tarfile): |
| 152 | + dir_id, tarfile_buf = gen_tarfile |
| 153 | + provider = mocked_provider(tmpdir, dir_id, tarfile_buf) |
| 154 | + swhid = "swh:1:dir:" + dir_id |
| 155 | + for log in provider.fetch(provider.detect(swhid), tmpdir): |
| 156 | + print(log) |
| 157 | + assert provider.content_id == swhid |
0 commit comments