134 | 134 |
def have_configuration_slot(self, slot):
|
135 | 135 |
return (slot in [1, 2])
|
136 | 136 |
|
137 | |
class YubiKeyUSBHID(YubiKey):
|
138 | |
"""
|
139 | |
Class for accessing a YubiKey over USB HID.
|
140 | |
|
141 | |
This class is for communicating specifically with standard YubiKeys
|
142 | |
(USB vendor id = 0x1050, product id = 0x10) using USB HID.
|
143 | |
|
144 | |
There is another class for the YubiKey NEO BETA, even though that
|
145 | |
product also goes by product id 0x10 for the BETA versions. The
|
146 | |
expectation is that the final YubiKey NEO will have it's own product id.
|
147 | |
|
148 | |
Tested with YubiKey versions 1.3 and 2.2.
|
149 | |
"""
|
150 | |
|
151 | |
model = 'YubiKey'
|
152 | |
description = 'YubiKey (or YubiKey NANO)'
|
|
137 |
|
|
138 |
class YubiKeyHIDDevice():
|
|
139 |
"""
|
|
140 |
High-level wrapper for low-level HID commands for a HID based YubiKey.
|
|
141 |
"""
|
153 | 142 |
|
154 | 143 |
def __init__(self, debug=False, skip=0):
|
155 | 144 |
"""
|
|
159 | 148 |
skip -- number of YubiKeys to skip
|
160 | 149 |
debug -- True or False
|
161 | 150 |
"""
|
162 | |
YubiKey.__init__(self, debug)
|
|
151 |
self.debug = debug
|
163 | 152 |
self._usb_handle = None
|
164 | 153 |
if not self._open(skip):
|
165 | 154 |
raise YubiKeyUSBHIDError('YubiKey USB HID initialization failed')
|
166 | 155 |
self.status()
|
167 | |
self.capabilities = \
|
168 | |
YubiKeyUSBHIDCapabilities(model = self.model, \
|
169 | |
version = self.version_num(), \
|
170 | |
default_answer = False)
|
|
156 |
|
|
157 |
def status(self):
|
|
158 |
"""
|
|
159 |
Poll YubiKey for status.
|
|
160 |
"""
|
|
161 |
data = self._read()
|
|
162 |
self._status = YubiKeyUSBHIDStatus(data)
|
|
163 |
return self._status
|
171 | 164 |
|
172 | 165 |
def __del__(self):
|
173 | 166 |
try:
|
|
176 | 169 |
except IOError:
|
177 | 170 |
pass
|
178 | 171 |
|
|
172 |
def _write_config(self, cfg, slot):
|
|
173 |
""" Write configuration to YubiKey. """
|
|
174 |
old_pgm_seq = self._status.pgm_seq
|
|
175 |
frame = cfg.to_frame(slot=slot)
|
|
176 |
self._debug("Writing %s frame :\n%s\n" % \
|
|
177 |
(yubikey_defs.command2str(frame.command), cfg))
|
|
178 |
self._write(frame)
|
|
179 |
self._waitfor_clear(yubikey_defs.SLOT_WRITE_FLAG)
|
|
180 |
# make sure we have a fresh pgm_seq value
|
|
181 |
self.status()
|
|
182 |
self._debug("Programmed slot %i, sequence %i -> %i\n" % (slot, old_pgm_seq, self._status.pgm_seq))
|
|
183 |
if self._status.pgm_seq != old_pgm_seq + 1:
|
|
184 |
raise YubiKeyUSBHIDError('YubiKey programming failed (seq %i not increased (%i))' % \
|
|
185 |
(old_pgm_seq, self._status.pgm_seq))
|
|
186 |
|
|
187 |
def _read_response(self, may_block=False):
|
|
188 |
""" Wait for a response to become available, and read it. """
|
|
189 |
# wait for response to become available
|
|
190 |
res = self._waitfor_set(yubikey_defs.RESP_PENDING_FLAG, may_block)[:7]
|
|
191 |
# continue reading while response pending is set
|
|
192 |
while True:
|
|
193 |
this = self._read()
|
|
194 |
flags = yubico_util.ord_byte(this[7])
|
|
195 |
if flags & yubikey_defs.RESP_PENDING_FLAG:
|
|
196 |
seq = flags & 0b00011111
|
|
197 |
if res and (seq == 0):
|
|
198 |
break
|
|
199 |
res += this[:7]
|
|
200 |
else:
|
|
201 |
break
|
|
202 |
self._write_reset()
|
|
203 |
return res
|
|
204 |
|
|
205 |
def _read(self):
|
|
206 |
""" Read a USB HID feature report from the YubiKey. """
|
|
207 |
request_type = _USB_TYPE_CLASS | _USB_RECIP_INTERFACE | _USB_ENDPOINT_IN
|
|
208 |
value = _REPORT_TYPE_FEATURE << 8 # apparently required for YubiKey 1.3.2, but not 2.2.x
|
|
209 |
recv = self._usb_handle.controlMsg(request_type,
|
|
210 |
_HID_GET_REPORT,
|
|
211 |
_FEATURE_RPT_SIZE,
|
|
212 |
value = value,
|
|
213 |
timeout = _USB_TIMEOUT_MS)
|
|
214 |
if len(recv) != _FEATURE_RPT_SIZE:
|
|
215 |
self._debug("Failed reading %i bytes (got %i) from USB HID YubiKey.\n"
|
|
216 |
% (_FEATURE_RPT_SIZE, recv))
|
|
217 |
raise YubiKeyUSBHIDError('Failed reading from USB HID YubiKey')
|
|
218 |
data = b''.join(yubico_util.chr_byte(c) for c in recv)
|
|
219 |
self._debug("READ : %s" % (yubico_util.hexdump(data, colorize=True)))
|
|
220 |
return data
|
|
221 |
|
|
222 |
def _write(self, frame):
|
|
223 |
"""
|
|
224 |
Write a YubiKeyFrame to the USB HID.
|
|
225 |
|
|
226 |
Includes polling for YubiKey readiness before each write.
|
|
227 |
"""
|
|
228 |
for data in frame.to_feature_reports(debug=self.debug):
|
|
229 |
debug_str = None
|
|
230 |
if self.debug:
|
|
231 |
(data, debug_str) = data
|
|
232 |
# first, we ensure the YubiKey will accept a write
|
|
233 |
self._waitfor_clear(yubikey_defs.SLOT_WRITE_FLAG)
|
|
234 |
self._raw_write(data, debug_str)
|
|
235 |
return True
|
|
236 |
|
|
237 |
def _write_reset(self):
|
|
238 |
"""
|
|
239 |
Reset read mode by issuing a dummy write.
|
|
240 |
"""
|
|
241 |
data = b'\x00\x00\x00\x00\x00\x00\x00\x8f'
|
|
242 |
self._raw_write(data)
|
|
243 |
self._waitfor_clear(yubikey_defs.SLOT_WRITE_FLAG)
|
|
244 |
return True
|
|
245 |
|
|
246 |
def _raw_write(self, data, debug_str = None):
|
|
247 |
"""
|
|
248 |
Write data to YubiKey.
|
|
249 |
"""
|
|
250 |
if self.debug:
|
|
251 |
if not debug_str:
|
|
252 |
debug_str = ''
|
|
253 |
hexdump = yubico_util.hexdump(data, colorize=True)[:-1] # strip LF
|
|
254 |
self._debug("WRITE : %s %s\n" % (hexdump, debug_str))
|
|
255 |
request_type = _USB_TYPE_CLASS | _USB_RECIP_INTERFACE | _USB_ENDPOINT_OUT
|
|
256 |
value = _REPORT_TYPE_FEATURE << 8 # apparently required for YubiKey 1.3.2, but not 2.2.x
|
|
257 |
sent = self._usb_handle.controlMsg(request_type,
|
|
258 |
_HID_SET_REPORT,
|
|
259 |
data,
|
|
260 |
value = value,
|
|
261 |
timeout = _USB_TIMEOUT_MS)
|
|
262 |
if sent != _FEATURE_RPT_SIZE:
|
|
263 |
self.debug("Failed writing %i bytes (wrote %i) to USB HID YubiKey.\n"
|
|
264 |
% (_FEATURE_RPT_SIZE, sent))
|
|
265 |
raise YubiKeyUSBHIDError('Failed talking to USB HID YubiKey')
|
|
266 |
return sent
|
|
267 |
|
|
268 |
def _waitfor_clear(self, mask, may_block=False):
|
|
269 |
"""
|
|
270 |
Wait for the YubiKey to turn OFF the bits in 'mask' in status responses.
|
|
271 |
|
|
272 |
Returns the 8 bytes last read.
|
|
273 |
"""
|
|
274 |
return self._waitfor('nand', mask, may_block)
|
|
275 |
|
|
276 |
def _waitfor_set(self, mask, may_block=False):
|
|
277 |
"""
|
|
278 |
Wait for the YubiKey to turn ON the bits in 'mask' in status responses.
|
|
279 |
|
|
280 |
Returns the 8 bytes last read.
|
|
281 |
"""
|
|
282 |
return self._waitfor('and', mask, may_block)
|
|
283 |
|
|
284 |
def _waitfor(self, mode, mask, may_block, timeout=2):
|
|
285 |
"""
|
|
286 |
Wait for the YubiKey to either turn ON or OFF certain bits in the status byte.
|
|
287 |
|
|
288 |
mode is either 'and' or 'nand'
|
|
289 |
timeout is a number of seconds (precision about ~0.5 seconds)
|
|
290 |
"""
|
|
291 |
finished = False
|
|
292 |
sleep = 0.01
|
|
293 |
# After six sleeps, we've slept 0.64 seconds.
|
|
294 |
wait_num = (timeout * 2) - 1 + 6
|
|
295 |
resp_timeout = False # YubiKey hasn't indicated RESP_TIMEOUT (yet)
|
|
296 |
while not finished:
|
|
297 |
this = self._read()
|
|
298 |
flags = yubico_util.ord_byte(this[7])
|
|
299 |
|
|
300 |
if flags & yubikey_defs.RESP_TIMEOUT_WAIT_FLAG:
|
|
301 |
if not resp_timeout:
|
|
302 |
resp_timeout = True
|
|
303 |
seconds_left = flags & yubikey_defs.RESP_TIMEOUT_WAIT_MASK
|
|
304 |
self._debug("Device indicates RESP_TIMEOUT (%i seconds left)\n" \
|
|
305 |
% (seconds_left))
|
|
306 |
if may_block:
|
|
307 |
# calculate new wait_num - never more than 20 seconds
|
|
308 |
seconds_left = min(20, seconds_left)
|
|
309 |
wait_num = (seconds_left * 2) - 1 + 6
|
|
310 |
|
|
311 |
if mode is 'nand':
|
|
312 |
if not flags & mask == mask:
|
|
313 |
finished = True
|
|
314 |
else:
|
|
315 |
self._debug("Status %s (0x%x) has not cleared bits %s (0x%x)\n"
|
|
316 |
% (bin(flags), flags, bin(mask), mask))
|
|
317 |
elif mode is 'and':
|
|
318 |
if flags & mask == mask:
|
|
319 |
finished = True
|
|
320 |
else:
|
|
321 |
self._debug("Status %s (0x%x) has not set bits %s (0x%x)\n"
|
|
322 |
% (bin(flags), flags, bin(mask), mask))
|
|
323 |
else:
|
|
324 |
assert()
|
|
325 |
|
|
326 |
if not finished:
|
|
327 |
wait_num -= 1
|
|
328 |
if wait_num == 0:
|
|
329 |
if mode is 'nand':
|
|
330 |
reason = 'Timed out waiting for YubiKey to clear status 0x%x' % mask
|
|
331 |
else:
|
|
332 |
reason = 'Timed out waiting for YubiKey to set status 0x%x' % mask
|
|
333 |
raise yubikey_base.YubiKeyTimeout(reason)
|
|
334 |
time.sleep(sleep)
|
|
335 |
sleep = min(sleep + sleep, 0.5)
|
|
336 |
else:
|
|
337 |
return this
|
|
338 |
|
|
339 |
def _open(self, skip=0):
|
|
340 |
""" Perform HID initialization """
|
|
341 |
usb_device = self._get_usb_device(skip)
|
|
342 |
|
|
343 |
if usb_device:
|
|
344 |
usb_conf = usb_device.configurations[0]
|
|
345 |
self._usb_int = usb_conf.interfaces[0][0]
|
|
346 |
else:
|
|
347 |
raise YubiKeyUSBHIDError('No USB YubiKey found')
|
|
348 |
|
|
349 |
try:
|
|
350 |
self._usb_handle = usb_device.open()
|
|
351 |
self._usb_handle.detachKernelDriver(0)
|
|
352 |
except Exception as error:
|
|
353 |
if 'could not detach kernel driver from interface' in str(error):
|
|
354 |
self._debug('The in-kernel-HID driver has already been detached\n')
|
|
355 |
else:
|
|
356 |
self._debug("detachKernelDriver not supported!\n")
|
|
357 |
|
|
358 |
try:
|
|
359 |
self._usb_handle.setConfiguration(1)
|
|
360 |
except usb.USBError:
|
|
361 |
self._debug("Unable to set configuration, ignoring...\n")
|
|
362 |
self._usb_handle.claimInterface(self._usb_int)
|
|
363 |
return True
|
|
364 |
|
|
365 |
def _close(self):
|
|
366 |
"""
|
|
367 |
Release the USB interface again.
|
|
368 |
"""
|
|
369 |
self._usb_handle.releaseInterface()
|
|
370 |
try:
|
|
371 |
# If we're using PyUSB >= 1.0 we can re-attach the kernel driver here.
|
|
372 |
self._usb_handle.dev.attach_kernel_driver(0)
|
|
373 |
except:
|
|
374 |
pass
|
|
375 |
self._usb_int = None
|
|
376 |
self._usb_handle = None
|
|
377 |
return True
|
|
378 |
|
|
379 |
def _get_usb_device(self, skip=0):
|
|
380 |
"""
|
|
381 |
Get YubiKey USB device.
|
|
382 |
|
|
383 |
Optionally allows you to skip n devices, to support multiple attached YubiKeys.
|
|
384 |
"""
|
|
385 |
try:
|
|
386 |
# PyUSB >= 1.0, this is a workaround for a problem with libusbx
|
|
387 |
# on Windows.
|
|
388 |
import usb.core
|
|
389 |
import usb.legacy
|
|
390 |
devices = [usb.legacy.Device(d) for d in usb.core.find(
|
|
391 |
find_all=True, idVendor=_YUBICO_VID)]
|
|
392 |
except ImportError:
|
|
393 |
# Using PyUsb < 1.0.
|
|
394 |
import usb
|
|
395 |
devices = [d for bus in usb.busses() for d in bus.devices]
|
|
396 |
for device in devices:
|
|
397 |
if device.idVendor == _YUBICO_VID:
|
|
398 |
if device.idProduct in _YK_PIDS:
|
|
399 |
if skip == 0:
|
|
400 |
return device
|
|
401 |
skip -= 1
|
|
402 |
return None
|
|
403 |
|
|
404 |
def _debug(self, out, print_prefix=True):
|
|
405 |
""" Print out to stderr, if debugging is enabled. """
|
|
406 |
if self.debug:
|
|
407 |
if print_prefix:
|
|
408 |
pre = self.__class__.__name__
|
|
409 |
if hasattr(self, 'debug_prefix'):
|
|
410 |
pre = getattr(self, 'debug_prefix')
|
|
411 |
sys.stderr.write("%s: " % pre)
|
|
412 |
sys.stderr.write(out)
|
|
413 |
|
|
414 |
|
|
415 |
class YubiKeyUSBHID(YubiKey):
|
|
416 |
"""
|
|
417 |
Class for accessing a YubiKey over USB HID.
|
|
418 |
|
|
419 |
This class is for communicating specifically with standard YubiKeys
|
|
420 |
(USB vendor id = 0x1050, product id = 0x10) using USB HID.
|
|
421 |
|
|
422 |
There is another class for the YubiKey NEO BETA, even though that
|
|
423 |
product also goes by product id 0x10 for the BETA versions. The
|
|
424 |
expectation is that the final YubiKey NEO will have it's own product id.
|
|
425 |
|
|
426 |
Tested with YubiKey versions 1.3 and 2.2.
|
|
427 |
"""
|
|
428 |
|
|
429 |
model = 'YubiKey'
|
|
430 |
description = 'YubiKey (or YubiKey NANO)'
|
|
431 |
_capabilities_cls = YubiKeyUSBHIDCapabilities
|
|
432 |
|
|
433 |
def __init__(self, debug=False, skip=0, hid_device=None):
|
|
434 |
"""
|
|
435 |
Find and connect to a YubiKey (USB HID).
|
|
436 |
|
|
437 |
Attributes :
|
|
438 |
skip -- number of YubiKeys to skip
|
|
439 |
debug -- True or False
|
|
440 |
"""
|
|
441 |
YubiKey.__init__(self, debug)
|
|
442 |
if hid_device is None:
|
|
443 |
self._device = YubiKeyHIDDevice(debug, skip)
|
|
444 |
else:
|
|
445 |
self._device = hid_device
|
|
446 |
self.capabilities = \
|
|
447 |
self._capabilities_cls(model=self.model,
|
|
448 |
version=self.version_num(),
|
|
449 |
default_answer=False)
|
|
450 |
|
179 | 451 |
def __repr__(self):
|
180 | 452 |
return '<%s instance at %s: YubiKey version %s>' % (
|
181 | 453 |
self.__class__.__name__,
|
|
187 | 459 |
"""
|
188 | 460 |
Poll YubiKey for status.
|
189 | 461 |
"""
|
190 | |
data = self._read()
|
191 | |
self._status = YubiKeyUSBHIDStatus(data)
|
192 | |
return self._status
|
|
462 |
return self._device.status()
|
193 | 463 |
|
194 | 464 |
def version_num(self):
|
195 | 465 |
""" Get the YubiKey version as a tuple (major, minor, build). """
|
196 | |
return self._status.ykver()
|
|
466 |
return self._device._status.ykver()
|
197 | 467 |
|
198 | 468 |
def version(self):
|
199 | 469 |
""" Get the YubiKey version. """
|
200 | |
return self._status.version()
|
|
470 |
return self._device._status.version()
|
201 | 471 |
|
202 | 472 |
def serial(self, may_block=True):
|
203 | 473 |
""" Get the YubiKey serial number (requires YubiKey 2.2). """
|
|
225 | 495 |
(cfg_req_ver[0], cfg_req_ver[1], self.version()))
|
226 | 496 |
if not self.capabilities.have_configuration_slot(slot):
|
227 | 497 |
raise YubiKeyUSBHIDError("Can't write configuration to slot %i" % (slot))
|
228 | |
return self._write_config(cfg, slot)
|
|
498 |
return self._device._write_config(cfg, slot)
|
229 | 499 |
|
230 | 500 |
def _read_serial(self, may_block):
|
231 | 501 |
""" Read the serial number from a YubiKey > 2.2. """
|
232 | 502 |
|
233 | 503 |
frame = yubikey_frame.YubiKeyFrame(command = _SLOT_DEVICE_SERIAL)
|
234 | |
self._write(frame)
|
235 | |
response = self._read_response(may_block=may_block)
|
|
504 |
self._device._write(frame)
|
|
505 |
response = self._device._read_response(may_block=may_block)
|
236 | 506 |
if not yubico_util.validate_crc16(response[:6]):
|
237 | 507 |
raise YubiKeyUSBHIDError("Read from device failed CRC check")
|
238 | 508 |
# the serial number is big-endian, although everything else is little-endian
|
|
268 | 538 |
raise yubico_exception.InputError('Invalid slot specified (%s)' % (slot))
|
269 | 539 |
|
270 | 540 |
frame = yubikey_frame.YubiKeyFrame(command=command, payload=challenge)
|
271 | |
self._write(frame)
|
272 | |
response = self._read_response(may_block=may_block)
|
|
541 |
self._device._write(frame)
|
|
542 |
response = self._device._read_response(may_block=may_block)
|
273 | 543 |
if not yubico_util.validate_crc16(response[:response_len + 2]):
|
274 | 544 |
raise YubiKeyUSBHIDError("Read from device failed CRC check")
|
275 | 545 |
return response[:response_len]
|
276 | 546 |
|
277 | |
def _write_config(self, cfg, slot):
|
278 | |
""" Write configuration to YubiKey. """
|
279 | |
old_pgm_seq = self._status.pgm_seq
|
280 | |
frame = cfg.to_frame(slot=slot)
|
281 | |
self._debug("Writing %s frame :\n%s\n" % \
|
282 | |
(yubikey_defs.command2str(frame.command), cfg))
|
283 | |
self._write(frame)
|
284 | |
self._waitfor_clear(yubikey_defs.SLOT_WRITE_FLAG)
|
285 | |
# make sure we have a fresh pgm_seq value
|
286 | |
self.status()
|
287 | |
self._debug("Programmed slot %i, sequence %i -> %i\n" % (slot, old_pgm_seq, self._status.pgm_seq))
|
288 | |
if self._status.pgm_seq != old_pgm_seq + 1:
|
289 | |
raise YubiKeyUSBHIDError('YubiKey programming failed (seq %i not increased (%i))' % \
|
290 | |
(old_pgm_seq, self._status.pgm_seq))
|
291 | |
|
292 | |
def _read_response(self, may_block=False):
|
293 | |
""" Wait for a response to become available, and read it. """
|
294 | |
# wait for response to become available
|
295 | |
res = self._waitfor_set(yubikey_defs.RESP_PENDING_FLAG, may_block)[:7]
|
296 | |
# continue reading while response pending is set
|
297 | |
while True:
|
298 | |
this = self._read()
|
299 | |
flags = yubico_util.ord_byte(this[7])
|
300 | |
if flags & yubikey_defs.RESP_PENDING_FLAG:
|
301 | |
seq = flags & 0b00011111
|
302 | |
if res and (seq == 0):
|
303 | |
break
|
304 | |
res += this[:7]
|
305 | |
else:
|
306 | |
break
|
307 | |
self._write_reset()
|
308 | |
return res
|
309 | |
|
310 | |
def _read(self):
|
311 | |
""" Read a USB HID feature report from the YubiKey. """
|
312 | |
request_type = _USB_TYPE_CLASS | _USB_RECIP_INTERFACE | _USB_ENDPOINT_IN
|
313 | |
value = _REPORT_TYPE_FEATURE << 8 # apparently required for YubiKey 1.3.2, but not 2.2.x
|
314 | |
recv = self._usb_handle.controlMsg(request_type,
|
315 | |
_HID_GET_REPORT,
|
316 | |
_FEATURE_RPT_SIZE,
|
317 | |
value = value,
|
318 | |
timeout = _USB_TIMEOUT_MS)
|
319 | |
if len(recv) != _FEATURE_RPT_SIZE:
|
320 | |
self._debug("Failed reading %i bytes (got %i) from USB HID YubiKey.\n"
|
321 | |
% (_FEATURE_RPT_SIZE, recv))
|
322 | |
raise YubiKeyUSBHIDError('Failed reading from USB HID YubiKey')
|
323 | |
data = b''.join(yubico_util.chr_byte(c) for c in recv)
|
324 | |
self._debug("READ : %s" % (yubico_util.hexdump(data, colorize=True)))
|
325 | |
return data
|
326 | |
|
327 | |
def _write(self, frame):
|
328 | |
"""
|
329 | |
Write a YubiKeyFrame to the USB HID.
|
330 | |
|
331 | |
Includes polling for YubiKey readiness before each write.
|
332 | |
"""
|
333 | |
for data in frame.to_feature_reports(debug=self.debug):
|
334 | |
debug_str = None
|
335 | |
if self.debug:
|
336 | |
(data, debug_str) = data
|
337 | |
# first, we ensure the YubiKey will accept a write
|
338 | |
self._waitfor_clear(yubikey_defs.SLOT_WRITE_FLAG)
|
339 | |
self._raw_write(data, debug_str)
|
340 | |
return True
|
341 | |
|
342 | |
def _write_reset(self):
|
343 | |
"""
|
344 | |
Reset read mode by issuing a dummy write.
|
345 | |
"""
|
346 | |
data = b'\x00\x00\x00\x00\x00\x00\x00\x8f'
|
347 | |
self._raw_write(data)
|
348 | |
self._waitfor_clear(yubikey_defs.SLOT_WRITE_FLAG)
|
349 | |
return True
|
350 | |
|
351 | |
def _raw_write(self, data, debug_str = None):
|
352 | |
"""
|
353 | |
Write data to YubiKey.
|
354 | |
"""
|
355 | |
if self.debug:
|
356 | |
if not debug_str:
|
357 | |
debug_str = ''
|
358 | |
hexdump = yubico_util.hexdump(data, colorize=True)[:-1] # strip LF
|
359 | |
self._debug("WRITE : %s %s\n" % (hexdump, debug_str))
|
360 | |
request_type = _USB_TYPE_CLASS | _USB_RECIP_INTERFACE | _USB_ENDPOINT_OUT
|
361 | |
value = _REPORT_TYPE_FEATURE << 8 # apparently required for YubiKey 1.3.2, but not 2.2.x
|
362 | |
sent = self._usb_handle.controlMsg(request_type,
|
363 | |
_HID_SET_REPORT,
|
364 | |
data,
|
365 | |
value = value,
|
366 | |
timeout = _USB_TIMEOUT_MS)
|
367 | |
if sent != _FEATURE_RPT_SIZE:
|
368 | |
self.debug("Failed writing %i bytes (wrote %i) to USB HID YubiKey.\n"
|
369 | |
% (_FEATURE_RPT_SIZE, sent))
|
370 | |
raise YubiKeyUSBHIDError('Failed talking to USB HID YubiKey')
|
371 | |
return sent
|
372 | |
|
373 | |
def _waitfor_clear(self, mask, may_block=False):
|
374 | |
"""
|
375 | |
Wait for the YubiKey to turn OFF the bits in 'mask' in status responses.
|
376 | |
|
377 | |
Returns the 8 bytes last read.
|
378 | |
"""
|
379 | |
return self._waitfor('nand', mask, may_block)
|
380 | |
|
381 | |
def _waitfor_set(self, mask, may_block=False):
|
382 | |
"""
|
383 | |
Wait for the YubiKey to turn ON the bits in 'mask' in status responses.
|
384 | |
|
385 | |
Returns the 8 bytes last read.
|
386 | |
"""
|
387 | |
return self._waitfor('and', mask, may_block)
|
388 | |
|
389 | |
def _waitfor(self, mode, mask, may_block, timeout=2):
|
390 | |
"""
|
391 | |
Wait for the YubiKey to either turn ON or OFF certain bits in the status byte.
|
392 | |
|
393 | |
mode is either 'and' or 'nand'
|
394 | |
timeout is a number of seconds (precision about ~0.5 seconds)
|
395 | |
"""
|
396 | |
finished = False
|
397 | |
sleep = 0.01
|
398 | |
# After six sleeps, we've slept 0.64 seconds.
|
399 | |
wait_num = (timeout * 2) - 1 + 6
|
400 | |
resp_timeout = False # YubiKey hasn't indicated RESP_TIMEOUT (yet)
|
401 | |
while not finished:
|
402 | |
this = self._read()
|
403 | |
flags = yubico_util.ord_byte(this[7])
|
404 | |
|
405 | |
if flags & yubikey_defs.RESP_TIMEOUT_WAIT_FLAG:
|
406 | |
if not resp_timeout:
|
407 | |
resp_timeout = True
|
408 | |
seconds_left = flags & yubikey_defs.RESP_TIMEOUT_WAIT_MASK
|
409 | |
self._debug("Device indicates RESP_TIMEOUT (%i seconds left)\n" \
|
410 | |
% (seconds_left))
|
411 | |
if may_block:
|
412 | |
# calculate new wait_num - never more than 20 seconds
|
413 | |
seconds_left = min(20, seconds_left)
|
414 | |
wait_num = (seconds_left * 2) - 1 + 6
|
415 | |
|
416 | |
if mode is 'nand':
|
417 | |
if not flags & mask == mask:
|
418 | |
finished = True
|
419 | |
else:
|
420 | |
self._debug("Status %s (0x%x) has not cleared bits %s (0x%x)\n"
|
421 | |
% (bin(flags), flags, bin(mask), mask))
|
422 | |
elif mode is 'and':
|
423 | |
if flags & mask == mask:
|
424 | |
finished = True
|
425 | |
else:
|
426 | |
self._debug("Status %s (0x%x) has not set bits %s (0x%x)\n"
|
427 | |
% (bin(flags), flags, bin(mask), mask))
|
428 | |
else:
|
429 | |
assert()
|
430 | |
|
431 | |
if not finished:
|
432 | |
wait_num -= 1
|
433 | |
if wait_num == 0:
|
434 | |
if mode is 'nand':
|
435 | |
reason = 'Timed out waiting for YubiKey to clear status 0x%x' % mask
|
436 | |
else:
|
437 | |
reason = 'Timed out waiting for YubiKey to set status 0x%x' % mask
|
438 | |
raise yubikey_base.YubiKeyTimeout(reason)
|
439 | |
time.sleep(sleep)
|
440 | |
sleep = min(sleep + sleep, 0.5)
|
441 | |
else:
|
442 | |
return this
|
443 | |
|
444 | |
def _open(self, skip=0):
|
445 | |
""" Perform HID initialization """
|
446 | |
usb_device = self._get_usb_device(skip)
|
447 | |
|
448 | |
if usb_device:
|
449 | |
usb_conf = usb_device.configurations[0]
|
450 | |
self._usb_int = usb_conf.interfaces[0][0]
|
451 | |
else:
|
452 | |
raise YubiKeyUSBHIDError('No USB YubiKey found')
|
453 | |
|
454 | |
try:
|
455 | |
self._usb_handle = usb_device.open()
|
456 | |
self._usb_handle.detachKernelDriver(0)
|
457 | |
except Exception as error:
|
458 | |
if 'could not detach kernel driver from interface' in str(error):
|
459 | |
self._debug('The in-kernel-HID driver has already been detached\n')
|
460 | |
else:
|
461 | |
self._debug("detachKernelDriver not supported!\n")
|
462 | |
|
463 | |
try:
|
464 | |
self._usb_handle.setConfiguration(1)
|
465 | |
except usb.USBError:
|
466 | |
self._debug("Unable to set configuration, ignoring...\n")
|
467 | |
self._usb_handle.claimInterface(self._usb_int)
|
468 | |
return True
|
469 | |
|
470 | |
def _close(self):
|
471 | |
"""
|
472 | |
Release the USB interface again.
|
473 | |
"""
|
474 | |
self._usb_handle.releaseInterface()
|
475 | |
try:
|
476 | |
# If we're using PyUSB >= 1.0 we can re-attach the kernel driver here.
|
477 | |
self._usb_handle.dev.attach_kernel_driver(0)
|
478 | |
except:
|
479 | |
pass
|
480 | |
self._usb_int = None
|
481 | |
self._usb_handle = None
|
482 | |
return True
|
483 | |
|
484 | |
def _get_usb_device(self, skip=0):
|
485 | |
"""
|
486 | |
Get YubiKey USB device.
|
487 | |
|
488 | |
Optionally allows you to skip n devices, to support multiple attached YubiKeys.
|
489 | |
"""
|
490 | |
try:
|
491 | |
# PyUSB >= 1.0, this is a workaround for a problem with libusbx
|
492 | |
# on Windows.
|
493 | |
import usb.core
|
494 | |
import usb.legacy
|
495 | |
devices = [usb.legacy.Device(d) for d in usb.core.find(
|
496 | |
find_all=True, idVendor=_YUBICO_VID)]
|
497 | |
except ImportError:
|
498 | |
# Using PyUsb < 1.0.
|
499 | |
import usb
|
500 | |
devices = [d for bus in usb.busses() for d in bus.devices]
|
501 | |
for device in devices:
|
502 | |
if device.idVendor == _YUBICO_VID:
|
503 | |
if device.idProduct in _YK_PIDS:
|
504 | |
if skip == 0:
|
505 | |
return device
|
506 | |
skip -= 1
|
507 | |
return None
|
508 | |
|
509 | |
def _debug(self, out, print_prefix=True):
|
510 | |
""" Print out to stderr, if debugging is enabled. """
|
511 | |
if self.debug:
|
512 | |
if print_prefix:
|
513 | |
pre = self.__class__.__name__
|
514 | |
if hasattr(self, 'debug_prefix'):
|
515 | |
pre = getattr(self, 'debug_prefix')
|
516 | |
sys.stderr.write("%s: " % (self.__class__.__name__))
|
517 | |
sys.stderr.write(out)
|
518 | 547 |
|
519 | 548 |
class YubiKeyUSBHIDStatus():
|
520 | 549 |
""" Class to represent the status information we get from the YubiKey. """
|