15 | 15 |
import io
|
16 | 16 |
import os
|
17 | 17 |
import re
|
|
18 |
import struct
|
18 | 19 |
import subprocess
|
19 | 20 |
import tempfile
|
20 | 21 |
from unittest import mock
|
|
59 | 60 |
self._created_files.append(fn)
|
60 | 61 |
subprocess.check_output(
|
61 | 62 |
'qemu-img create -f %s %s %i' % (fmt, fn, size),
|
|
63 |
shell=True)
|
|
64 |
return fn
|
|
65 |
|
|
66 |
def _create_allocated_vmdk(self, size_mb):
|
|
67 |
# We need a "big" VMDK file to exercise some parts of the code of the
|
|
68 |
# format_inspector. A way to create one is to first create an empty
|
|
69 |
# file, and then to convert it with the -S 0 option.
|
|
70 |
fn = tempfile.mktemp(prefix='glance-unittest-formatinspector-',
|
|
71 |
suffix='.vmdk')
|
|
72 |
self._created_files.append(fn)
|
|
73 |
zeroes = tempfile.mktemp(prefix='glance-unittest-formatinspector-',
|
|
74 |
suffix='.zero')
|
|
75 |
self._created_files.append(zeroes)
|
|
76 |
|
|
77 |
# Create an empty file
|
|
78 |
subprocess.check_output(
|
|
79 |
'dd if=/dev/zero of=%s bs=1M count=%i' % (zeroes, size_mb),
|
|
80 |
shell=True)
|
|
81 |
|
|
82 |
# Convert it to VMDK
|
|
83 |
subprocess.check_output(
|
|
84 |
'qemu-img convert -f raw -O vmdk -S 0 %s %s' % (zeroes, fn),
|
62 | 85 |
shell=True)
|
63 | 86 |
return fn
|
64 | 87 |
|
|
118 | 141 |
def test_vmdk(self):
|
119 | 142 |
self._test_format('vmdk')
|
120 | 143 |
|
|
144 |
def test_vmdk_bad_descriptor_offset(self):
|
|
145 |
format_name = 'vmdk'
|
|
146 |
image_size = 10 * units.Mi
|
|
147 |
descriptorOffsetAddr = 0x1c
|
|
148 |
BAD_ADDRESS = 0x400
|
|
149 |
img = self._create_img(format_name, image_size)
|
|
150 |
|
|
151 |
# Corrupt the header
|
|
152 |
fd = open(img, 'r+b')
|
|
153 |
fd.seek(descriptorOffsetAddr)
|
|
154 |
fd.write(struct.pack('<Q', BAD_ADDRESS // 512))
|
|
155 |
fd.close()
|
|
156 |
|
|
157 |
# Read the format in various sizes, some of which will read whole
|
|
158 |
# sections in a single read, others will be completely unaligned, etc.
|
|
159 |
for block_size in (64 * units.Ki, 512, 17, 1 * units.Mi):
|
|
160 |
fmt = self._test_format_at_block_size(format_name, img, block_size)
|
|
161 |
self.assertTrue(fmt.format_match,
|
|
162 |
'Failed to match %s at size %i block %i' % (
|
|
163 |
format_name, image_size, block_size))
|
|
164 |
self.assertEqual(0, fmt.virtual_size,
|
|
165 |
('Calculated a virtual size for a corrupt %s at '
|
|
166 |
'size %i block %i') % (format_name, image_size,
|
|
167 |
block_size))
|
|
168 |
|
|
169 |
def test_vmdk_bad_descriptor_mem_limit(self):
|
|
170 |
format_name = 'vmdk'
|
|
171 |
image_size = 5 * units.Mi
|
|
172 |
virtual_size = 5 * units.Mi
|
|
173 |
descriptorOffsetAddr = 0x1c
|
|
174 |
descriptorSizeAddr = descriptorOffsetAddr + 8
|
|
175 |
twoMBInSectors = (2 << 20) // 512
|
|
176 |
# We need a big VMDK because otherwise we will not have enough data to
|
|
177 |
# fill-up the CaptureRegion.
|
|
178 |
img = self._create_allocated_vmdk(image_size // units.Mi)
|
|
179 |
|
|
180 |
# Corrupt the end of descriptor address so it "ends" at 2MB
|
|
181 |
fd = open(img, 'r+b')
|
|
182 |
fd.seek(descriptorSizeAddr)
|
|
183 |
fd.write(struct.pack('<Q', twoMBInSectors))
|
|
184 |
fd.close()
|
|
185 |
|
|
186 |
# Read the format in various sizes, some of which will read whole
|
|
187 |
# sections in a single read, others will be completely unaligned, etc.
|
|
188 |
for block_size in (64 * units.Ki, 512, 17, 1 * units.Mi):
|
|
189 |
fmt = self._test_format_at_block_size(format_name, img, block_size)
|
|
190 |
self.assertTrue(fmt.format_match,
|
|
191 |
'Failed to match %s at size %i block %i' % (
|
|
192 |
format_name, image_size, block_size))
|
|
193 |
self.assertEqual(virtual_size, fmt.virtual_size,
|
|
194 |
('Failed to calculate size for %s at size %i '
|
|
195 |
'block %i') % (format_name, image_size,
|
|
196 |
block_size))
|
|
197 |
memory = sum(fmt.context_info.values())
|
|
198 |
self.assertLess(memory, 1.5 * units.Mi,
|
|
199 |
'Format used more than 1.5MiB of memory: %s' % (
|
|
200 |
fmt.context_info))
|
|
201 |
|
121 | 202 |
def test_vdi(self):
|
122 | 203 |
self._test_format('vdi')
|
123 | 204 |
|
|
274 | 355 |
self.assertEqual(format_inspector.QcowInspector,
|
275 | 356 |
format_inspector.get_inspector('qcow2'))
|
276 | 357 |
self.assertIsNone(format_inspector.get_inspector('foo'))
|
|
358 |
|
|
359 |
|
|
360 |
class TestFormatInspectorsTargeted(test_utils.BaseTestCase):
|
|
361 |
def _make_vhd_meta(self, guid_raw, item_length):
|
|
362 |
# Meta region header, padded to 32 bytes
|
|
363 |
data = struct.pack('<8sHH', b'metadata', 0, 1)
|
|
364 |
data += b'0' * 20
|
|
365 |
|
|
366 |
# Metadata table entry, 16-byte GUID, 12-byte information,
|
|
367 |
# padded to 32-bytes
|
|
368 |
data += guid_raw
|
|
369 |
data += struct.pack('<III', 256, item_length, 0)
|
|
370 |
data += b'0' * 6
|
|
371 |
|
|
372 |
return data
|
|
373 |
|
|
374 |
def test_vhd_table_over_limit(self):
|
|
375 |
ins = format_inspector.VHDXInspector()
|
|
376 |
meta = format_inspector.CaptureRegion(0, 0)
|
|
377 |
desired = b'012345678ABCDEF0'
|
|
378 |
# This is a poorly-crafted image that specifies a larger table size
|
|
379 |
# than is allowed
|
|
380 |
meta.data = self._make_vhd_meta(desired, 33 * 2048)
|
|
381 |
ins.new_region('metadata', meta)
|
|
382 |
new_region = ins._find_meta_entry(ins._guid(desired))
|
|
383 |
# Make sure we clamp to our limit of 32 * 2048
|
|
384 |
self.assertEqual(
|
|
385 |
format_inspector.VHDXInspector.VHDX_METADATA_TABLE_MAX_SIZE,
|
|
386 |
new_region.length)
|
|
387 |
|
|
388 |
def test_vhd_table_under_limit(self):
|
|
389 |
ins = format_inspector.VHDXInspector()
|
|
390 |
meta = format_inspector.CaptureRegion(0, 0)
|
|
391 |
desired = b'012345678ABCDEF0'
|
|
392 |
meta.data = self._make_vhd_meta(desired, 16 * 2048)
|
|
393 |
ins.new_region('metadata', meta)
|
|
394 |
new_region = ins._find_meta_entry(ins._guid(desired))
|
|
395 |
# Table size was under the limit, make sure we get it back
|
|
396 |
self.assertEqual(16 * 2048, new_region.length)
|