New upstream version 1.11.0
Samuel Henrique
5 years ago
0 | 0 | CHANGELOG |
1 | 1 | ========= |
2 | ||
3 | 1.11.0 | |
4 | ---------- | |
5 | * New command **shodan scan list** to list recently launched scans | |
6 | * New command **shodan alert triggers** to list the available notification triggers | |
7 | * New command **shodan alert enable** to enable a notification trigger | |
8 | * New command **shodan alert disable** to disable a notification trigger | |
9 | * New command **shodan alert info** to show details of a specific alert | |
10 | * Include timestamp, vulns and tags in CSV converter (#85) | |
11 | * Fixed bug that caused an exception when parsing uncompressed data files in Python3 | |
12 | * Code quality improvements | |
13 | * Thank you for contributions from @wagner-certat, @cclauss, @opt9, @voldmar and Antoine Neuenschwander | |
14 | ||
15 | 1.10.4 | |
16 | ------ | |
17 | * Fix a bug when showing old banner records that don't have the "transport" property | |
18 | * Code quality improvements (bare excepts) | |
2 | 19 | |
3 | 20 | 1.10.3 |
4 | 21 | ------ |
5 | 5 | README = open('README.rst', 'r').read() |
6 | 6 | |
7 | 7 | setup( |
8 | name = 'shodan', | |
9 | version = '1.10.4', | |
10 | description = 'Python library and command-line utility for Shodan (https://developer.shodan.io)', | |
11 | long_description = README, | |
12 | long_description_content_type = 'text/x-rst', | |
13 | author = 'John Matherly', | |
14 | author_email = 'jmath@shodan.io', | |
15 | url = 'http://github.com/achillean/shodan-python/tree/master', | |
16 | packages = ['shodan', 'shodan.cli', 'shodan.cli.converter'], | |
17 | entry_points = {'console_scripts': ['shodan = shodan.__main__:main']}, | |
18 | install_requires = DEPENDENCIES, | |
19 | keywords = ['security', 'network'], | |
20 | classifiers = [ | |
8 | name='shodan', | |
9 | version='1.11.0', | |
10 | description='Python library and command-line utility for Shodan (https://developer.shodan.io)', | |
11 | long_description=README, | |
12 | long_description_content_type='text/x-rst', | |
13 | author='John Matherly', | |
14 | author_email='jmath@shodan.io', | |
15 | url='http://github.com/achillean/shodan-python/tree/master', | |
16 | packages=['shodan', 'shodan.cli', 'shodan.cli.converter'], | |
17 | entry_points={'console_scripts': ['shodan=shodan.__main__:main']}, | |
18 | install_requires=DEPENDENCIES, | |
19 | keywords=['security', 'network'], | |
20 | classifiers=[ | |
21 | 21 | 'Development Status :: 5 - Production/Stable', |
22 | 22 | 'Intended Audience :: Developers', |
23 | 23 | 'License :: OSI Approved :: MIT License', |
48 | 48 | from click_plugins import with_plugins |
49 | 49 | from pkg_resources import iter_entry_points |
50 | 50 | |
51 | # Large subcommands are stored in separate modules | |
52 | from shodan.cli.alert import alert | |
53 | from shodan.cli.data import data | |
54 | from shodan.cli.organization import org | |
55 | from shodan.cli.scan import scan | |
56 | ||
57 | ||
51 | 58 | # Make "-h" work like "--help" |
52 | 59 | CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) |
53 | 60 | |
56 | 63 | basestring |
57 | 64 | except NameError: |
58 | 65 | basestring = str |
59 | ||
60 | 66 | |
61 | 67 | # Define the main entry point for all of our commands |
62 | 68 | # and expose a way for 3rd-party plugins to tie into the Shodan CLI. |
66 | 72 | pass |
67 | 73 | |
68 | 74 | |
69 | # Large subcommands are stored in separate modules | |
70 | from shodan.cli.alert import alert | |
71 | from shodan.cli.data import data | |
72 | from shodan.cli.organization import org | |
73 | from shodan.cli.scan import scan | |
75 | # Setup the large subcommands | |
74 | 76 | main.add_command(alert) |
75 | 77 | main.add_command(data) |
76 | 78 | main.add_command(org) |
150 | 152 | |
151 | 153 | os.chmod(keyfile, 0o600) |
152 | 154 | |
155 | ||
153 | 156 | @main.command() |
154 | 157 | @click.argument('query', metavar='<search query>', nargs=-1) |
155 | 158 | def count(query): |
202 | 205 | try: |
203 | 206 | total = api.count(query)['total'] |
204 | 207 | info = api.info() |
205 | except: | |
208 | except Exception: | |
206 | 209 | raise click.ClickException('The Shodan API is unresponsive at the moment, please try again later.') |
207 | 210 | |
208 | 211 | # Print some summary information about the download request |
272 | 275 | helpers.write_banner(fout, banner) |
273 | 276 | except shodan.APIError as e: |
274 | 277 | raise click.ClickException(e.value) |
275 | ||
276 | 278 | |
277 | 279 | |
278 | 280 | @main.command() |
307 | 309 | |
308 | 310 | has_filters = len(filters) > 0 |
309 | 311 | |
310 | ||
311 | 312 | # Setup the output file handle |
312 | 313 | fout = None |
313 | 314 | if filename: |
332 | 333 | helpers.write_banner(fout, banner) |
333 | 334 | |
334 | 335 | # Loop over all the fields and print the banner as a row |
335 | for field in fields: | |
336 | for i, field in enumerate(fields): | |
336 | 337 | tmp = u'' |
337 | 338 | value = get_banner_field(banner, field) |
338 | 339 | if value: |
350 | 351 | if color: |
351 | 352 | tmp = click.style(tmp, fg=COLORIZE_FIELDS.get(field, 'white')) |
352 | 353 | |
353 | # Add the field information to the row | |
354 | row += tmp | |
355 | row += separator | |
354 | # Add the field information to the row | |
355 | if i > 0: | |
356 | row += separator | |
357 | row += tmp | |
356 | 358 | |
357 | 359 | click.echo(row) |
358 | 360 | |
518 | 520 | if len(values) > counter: |
519 | 521 | has_items = True |
520 | 522 | row[pos] = values[counter]['value'] |
521 | row[pos+1] = values[counter]['count'] | |
523 | row[pos + 1] = values[counter]['count'] | |
522 | 524 | |
523 | 525 | pos += 2 |
524 | 526 | |
544 | 546 | @click.option('--asn', help='A comma-separated list of ASNs to grab data on.', default=None, type=str) |
545 | 547 | @click.option('--alert', help='The network alert ID or "all" to subscribe to all network alerts on your account.', default=None, type=str) |
546 | 548 | @click.option('--compresslevel', help='The gzip compression level (0-9; 0 = no compression, 9 = most compression', default=9, type=int) |
547 | def stream(color, fields, separator, limit, datadir, ports, quiet, timeout, streamer, countries, asn, alert, compresslevel): | |
549 | def stream(color, fields, separator, limit, datadir, ports, quiet, timeout, streamer, countries, asn, alert, compresslevel): | |
548 | 550 | """Stream data in real-time.""" |
549 | 551 | # Setup the Shodan API |
550 | 552 | key = get_api_key() |
640 | 642 | if datadir: |
641 | 643 | cur_time = timestr() |
642 | 644 | if cur_time != last_time: |
643 | last_time = cur_time | |
644 | fout.close() | |
645 | fout = open_streaming_file(datadir, last_time) | |
645 | last_time = cur_time | |
646 | fout.close() | |
647 | fout = open_streaming_file(datadir, last_time) | |
646 | 648 | helpers.write_banner(fout, banner) |
647 | 649 | |
648 | 650 | # Print the banner information to stdout |
705 | 707 | click.echo(click.style('Not a honeypot', fg='green')) |
706 | 708 | |
707 | 709 | click.echo('Score: {}'.format(score)) |
708 | except: | |
710 | except Exception: | |
709 | 711 | raise click.ClickException('Unable to calculate honeyscore') |
710 | 712 | |
711 | 713 | |
724 | 726 | except Exception as e: |
725 | 727 | raise click.ClickException(u'{}'.format(e)) |
726 | 728 | |
729 | ||
727 | 730 | if __name__ == '__main__': |
728 | 731 | main() |
0 | class Alert: | |
1 | def __init__(self): | |
2 | self.id = None | |
3 | self.name = None | |
4 | self.api_key = None | |
5 | self.filters = None | |
6 | self.credits = None | |
7 | self.created = None | |
8 | self.expires = None |
0 | 0 | import click |
1 | 1 | import shodan |
2 | 2 | |
3 | from operator import itemgetter | |
3 | 4 | from shodan.cli.helpers import get_api_key |
5 | ||
4 | 6 | |
5 | 7 | @click.group() |
6 | 8 | def alert(): |
23 | 25 | except shodan.APIError as e: |
24 | 26 | raise click.ClickException(e.value) |
25 | 27 | click.echo("Alerts deleted") |
28 | ||
26 | 29 | |
27 | 30 | @alert.command(name='create') |
28 | 31 | @click.argument('name', metavar='<name>') |
41 | 44 | click.secho('Successfully created network alert!', fg='green') |
42 | 45 | click.secho('Alert ID: {}'.format(alert['id']), fg='cyan') |
43 | 46 | |
47 | ||
48 | @alert.command(name='info') | |
49 | @click.argument('alert', metavar='<alert id>') | |
50 | def alert_info(alert): | |
51 | """Show information about a specific alert""" | |
52 | key = get_api_key() | |
53 | api = shodan.Shodan(key) | |
54 | ||
55 | try: | |
56 | info = api.alerts(aid=alert) | |
57 | except shodan.APIError as e: | |
58 | raise click.ClickException(e.value) | |
59 | ||
60 | click.secho(info['name'], fg='cyan') | |
61 | click.secho('Created: ', nl=False, dim=True) | |
62 | click.secho(info['created'], fg='magenta') | |
63 | ||
64 | click.secho('Notifications: ', nl=False, dim=True) | |
65 | if 'triggers' in info and info['triggers']: | |
66 | click.secho('enabled', fg='green') | |
67 | else: | |
68 | click.echo('disabled') | |
69 | ||
70 | click.echo('') | |
71 | click.secho('Network Range(s):', dim=True) | |
72 | ||
73 | for network in info['filters']['ip']: | |
74 | click.echo(u' > {}'.format(click.style(network, fg='yellow'))) | |
75 | ||
76 | click.echo('') | |
77 | if 'triggers' in info and info['triggers']: | |
78 | click.secho('Triggers:', dim=True) | |
79 | for trigger in info['triggers']: | |
80 | click.echo(u' > {}'.format(click.style(trigger, fg='yellow'))) | |
81 | click.echo('') | |
82 | ||
83 | ||
44 | 84 | @alert.command(name='list') |
45 | 85 | @click.option('--expired', help='Whether or not to show expired alerts.', default=True, type=bool) |
46 | 86 | def alert_list(expired): |
56 | 96 | |
57 | 97 | if len(results) > 0: |
58 | 98 | click.echo(u'# {:14} {:<21} {:<15s}'.format('Alert ID', 'Name', 'IP/ Network')) |
59 | # click.echo('#' * 65) | |
99 | ||
60 | 100 | for alert in results: |
61 | 101 | click.echo( |
62 | 102 | u'{:16} {:<30} {:<35} '.format( |
63 | click.style(alert['id'], fg='yellow'), | |
103 | click.style(alert['id'], fg='yellow'), | |
64 | 104 | click.style(alert['name'], fg='cyan'), |
65 | 105 | click.style(', '.join(alert['filters']['ip']), fg='white') |
66 | 106 | ), |
67 | 107 | nl=False |
68 | 108 | ) |
109 | ||
110 | if 'triggers' in alert and alert['triggers']: | |
111 | click.secho('Triggers: ', fg='magenta', nl=False) | |
112 | click.echo(', '.join(alert['triggers'].keys()), nl=False) | |
69 | 113 | |
70 | 114 | if 'expired' in alert and alert['expired']: |
71 | 115 | click.secho('expired', fg='red') |
88 | 132 | except shodan.APIError as e: |
89 | 133 | raise click.ClickException(e.value) |
90 | 134 | click.echo("Alert deleted") |
135 | ||
136 | ||
137 | @alert.command(name='triggers') | |
138 | def alert_list_triggers(): | |
139 | """List the available notification triggers""" | |
140 | key = get_api_key() | |
141 | ||
142 | # Get the list | |
143 | api = shodan.Shodan(key) | |
144 | try: | |
145 | results = api.alert_triggers() | |
146 | except shodan.APIError as e: | |
147 | raise click.ClickException(e.value) | |
148 | ||
149 | if len(results) > 0: | |
150 | click.secho('The following triggers can be enabled on alerts:', dim=True) | |
151 | click.echo('') | |
152 | ||
153 | for trigger in sorted(results, key=itemgetter('name')): | |
154 | click.secho('{:<12} '.format('Name'), dim=True, nl=False) | |
155 | click.secho(trigger['name'], fg='yellow') | |
156 | ||
157 | click.secho('{:<12} '.format('Description'), dim=True, nl=False) | |
158 | click.secho(trigger['description'], fg='cyan') | |
159 | ||
160 | click.secho('{:<12} '.format('Rule'), dim=True, nl=False) | |
161 | click.echo(trigger['rule']) | |
162 | ||
163 | click.echo('') | |
164 | else: | |
165 | click.echo("No triggers currently available.") | |
166 | ||
167 | ||
168 | @alert.command(name='enable') | |
169 | @click.argument('alert_id', metavar='<alert ID>') | |
170 | @click.argument('trigger', metavar='<trigger name>') | |
171 | def alert_enable_trigger(alert_id, trigger): | |
172 | """Enable a trigger for the alert""" | |
173 | key = get_api_key() | |
174 | ||
175 | # Get the list | |
176 | api = shodan.Shodan(key) | |
177 | try: | |
178 | api.enable_alert_trigger(alert_id, trigger) | |
179 | except shodan.APIError as e: | |
180 | raise click.ClickException(e.value) | |
181 | ||
182 | click.secho('Successfully enabled the trigger: {}'.format(trigger), fg='green') | |
183 | ||
184 | ||
185 | @alert.command(name='disable') | |
186 | @click.argument('alert_id', metavar='<alert ID>') | |
187 | @click.argument('trigger', metavar='<trigger name>') | |
188 | def alert_disable_trigger(alert_id, trigger): | |
189 | """Disable a trigger for the alert""" | |
190 | key = get_api_key() | |
191 | ||
192 | # Get the list | |
193 | api = shodan.Shodan(key) | |
194 | try: | |
195 | api.disable_alert_trigger(alert_id, trigger) | |
196 | except shodan.APIError as e: | |
197 | raise click.ClickException(e.value) | |
198 | ||
199 | click.secho('Successfully disabled the trigger: {}'.format(trigger), fg='green') |
1 | 1 | from .excel import ExcelConverter |
2 | 2 | from .geojson import GeoJsonConverter |
3 | 3 | from .images import ImagesConverter |
4 | from .kml import KmlConverter⏎ | |
4 | from .kml import KmlConverter |
23 | 23 | 'os', |
24 | 24 | 'asn', |
25 | 25 | 'port', |
26 | 'tags', | |
27 | 'timestamp', | |
26 | 28 | 'transport', |
27 | 29 | 'product', |
28 | 30 | 'version', |
29 | ||
31 | 'vulns', | |
32 | ||
30 | 33 | 'ssl.cipher.version', |
31 | 34 | 'ssl.cipher.bits', |
32 | 35 | 'ssl.cipher.name', |
35 | 38 | 'ssl.cert.serial', |
36 | 39 | 'ssl.cert.fingerprint.sha1', |
37 | 40 | 'ssl.cert.fingerprint.sha256', |
38 | ||
41 | ||
39 | 42 | 'html', |
40 | 43 | 'title', |
41 | 44 | ] |
42 | ||
45 | ||
43 | 46 | def process(self, files): |
44 | 47 | writer = csv_writer(self.fout, dialect=excel) |
45 | ||
48 | ||
46 | 49 | # Write the header |
47 | 50 | writer.writerow(self.fields) |
48 | ||
51 | ||
49 | 52 | for banner in iterate_files(files): |
53 | # The "vulns" property can't be nicely flattened as-is so we turn | |
54 | # it into a list before processing the banner. | |
55 | if 'vulns' in banner: | |
56 | banner['vulns'] = banner['vulns'].keys() | |
57 | ||
50 | 58 | try: |
51 | 59 | row = [] |
52 | 60 | for field in self.fields: |
55 | 63 | writer.writerow(row) |
56 | 64 | except Exception: |
57 | 65 | pass |
58 | ||
66 | ||
59 | 67 | def banner_field(self, banner, flat_field): |
60 | 68 | # The provided field is a collapsed form of the actual field |
61 | 69 | fields = flat_field.split('.') |
62 | ||
70 | ||
63 | 71 | try: |
64 | 72 | current_obj = banner |
65 | 73 | for field in fields: |
66 | 74 | current_obj = current_obj[field] |
67 | ||
75 | ||
68 | 76 | # Convert a list into a concatenated string |
69 | 77 | if isinstance(current_obj, list): |
70 | 78 | current_obj = ','.join([str(i) for i in current_obj]) |
71 | ||
79 | ||
72 | 80 | return current_obj |
73 | 81 | except Exception: |
74 | 82 | pass |
75 | ||
83 | ||
76 | 84 | return '' |
77 | ||
85 | ||
78 | 86 | def flatten(self, d, parent_key='', sep='.'): |
79 | 87 | items = [] |
80 | 88 | for k, v in d.items(): |
81 | 89 | new_key = parent_key + sep + k if parent_key else k |
82 | 90 | if isinstance(v, MutableMapping): |
83 | # pylint: disable=E0602 | |
84 | items.extend(flatten(v, new_key, sep=sep).items()) | |
91 | items.extend(self.flatten(v, new_key, sep=sep).items()) | |
85 | 92 | else: |
86 | 93 | items.append((new_key, v)) |
87 | 94 | return dict(items) |
22 | 22 | 'transport', |
23 | 23 | 'product', |
24 | 24 | 'version', |
25 | ||
25 | ||
26 | 26 | 'http.server', |
27 | 27 | 'http.title', |
28 | 28 | ] |
39 | 39 | 'http.server': 'Web Server', |
40 | 40 | 'http.title': 'Website Title', |
41 | 41 | } |
42 | ||
42 | ||
43 | 43 | def process(self, files): |
44 | 44 | # Get the filename from the already-open file handle |
45 | 45 | filename = self.fout.name |
54 | 54 | bold = workbook.add_format({ |
55 | 55 | 'bold': 1, |
56 | 56 | }) |
57 | ||
57 | ||
58 | 58 | # Create the main worksheet where all the raw data is shown |
59 | 59 | main_sheet = workbook.add_worksheet('Raw Data') |
60 | 60 | |
61 | 61 | # Write the header |
62 | main_sheet.write(0, 0, 'IP', bold) # The IP field can be either ip_str or ipv6 so we treat it differently | |
62 | main_sheet.write(0, 0, 'IP', bold) # The IP field can be either ip_str or ipv6 so we treat it differently | |
63 | 63 | main_sheet.set_column(0, 0, 20) |
64 | ||
64 | ||
65 | 65 | row = 0 |
66 | 66 | col = 1 |
67 | 67 | for field in self.fields: |
79 | 79 | for field in self.fields: |
80 | 80 | value = self.banner_field(banner, field) |
81 | 81 | data.append(value) |
82 | ||
82 | ||
83 | 83 | # Write those values to the main workbook |
84 | 84 | # Starting off w/ the special "IP" property |
85 | 85 | main_sheet.write_string(row, 0, get_ip(banner)) |
91 | 91 | row += 1 |
92 | 92 | except Exception: |
93 | 93 | pass |
94 | ||
94 | ||
95 | 95 | # Aggregate summary information |
96 | 96 | total += 1 |
97 | 97 | ports[banner['port']] += 1 |
98 | ||
98 | ||
99 | 99 | summary_sheet = workbook.add_worksheet('Summary') |
100 | 100 | summary_sheet.write(0, 0, 'Total', bold) |
101 | 101 | summary_sheet.write(0, 1, total) |
108 | 108 | summary_sheet.write(row, col, key) |
109 | 109 | summary_sheet.write(row, col + 1, value) |
110 | 110 | row += 1 |
111 | ||
111 | ||
112 | 112 | def banner_field(self, banner, flat_field): |
113 | 113 | # The provided field is a collapsed form of the actual field |
114 | 114 | fields = flat_field.split('.') |
115 | ||
115 | ||
116 | 116 | try: |
117 | 117 | current_obj = banner |
118 | 118 | for field in fields: |
119 | 119 | current_obj = current_obj[field] |
120 | ||
120 | ||
121 | 121 | # Convert a list into a concatenated string |
122 | 122 | if isinstance(current_obj, list): |
123 | 123 | current_obj = ','.join([str(i) for i in current_obj]) |
124 | ||
124 | ||
125 | 125 | return current_obj |
126 | 126 | except Exception: |
127 | 127 | pass |
128 | ||
128 | ||
129 | 129 | return '' |
1 | 1 | from .base import Converter |
2 | 2 | from ...helpers import get_ip, iterate_files |
3 | 3 | |
4 | ||
4 | 5 | class GeoJsonConverter(Converter): |
5 | ||
6 | ||
6 | 7 | def header(self): |
7 | 8 | self.fout.write("""{ |
8 | 9 | "type": "FeatureCollection", |
9 | 10 | "features": [ |
10 | 11 | """) |
11 | ||
12 | ||
12 | 13 | def footer(self): |
13 | 14 | self.fout.write("""{ }]}""") |
14 | ||
15 | ||
15 | 16 | def process(self, files): |
16 | 17 | # Write the header |
17 | 18 | self.header() |
18 | ||
19 | ||
19 | 20 | hosts = {} |
20 | 21 | for banner in iterate_files(files): |
21 | 22 | ip = get_ip(banner) |
22 | 23 | if not ip: |
23 | 24 | continue |
24 | ||
25 | ||
25 | 26 | if ip not in hosts: |
26 | 27 | hosts[ip] = banner |
27 | 28 | hosts[ip]['ports'] = [] |
28 | ||
29 | ||
29 | 30 | hosts[ip]['ports'].append(banner['port']) |
30 | ||
31 | ||
31 | 32 | for ip, host in iter(hosts.items()): |
32 | 33 | self.write(host) |
33 | ||
34 | ||
34 | 35 | self.footer() |
35 | ||
36 | ||
36 | ||
37 | 37 | def write(self, host): |
38 | 38 | try: |
39 | 39 | ip = get_ip(host) |
13 | 13 | # special code in the Shodan CLI that relies on the "dirname" property to let |
14 | 14 | # the user know where the images have been stored. |
15 | 15 | dirname = None |
16 | ||
16 | ||
17 | 17 | def process(self, files): |
18 | 18 | # Get the filename from the already-open file handle and use it as |
19 | 19 | # the directory name to store the images. |
1 | 1 | from .base import Converter |
2 | 2 | from ...helpers import iterate_files |
3 | 3 | |
4 | ||
4 | 5 | class KmlConverter(Converter): |
5 | ||
6 | ||
6 | 7 | def header(self): |
7 | 8 | self.fout.write("""<?xml version="1.0" encoding="UTF-8"?> |
8 | 9 | <kml xmlns="http://www.opengis.net/kml/2.2"> |
9 | 10 | <Document>""") |
10 | ||
11 | ||
11 | 12 | def footer(self): |
12 | 13 | self.fout.write("""</Document></kml>""") |
13 | ||
14 | ||
14 | 15 | def process(self, files): |
15 | 16 | # Write the header |
16 | 17 | self.header() |
17 | ||
18 | ||
18 | 19 | hosts = {} |
19 | 20 | for banner in iterate_files(files): |
20 | 21 | ip = banner.get('ip_str', banner.get('ipv6', None)) |
21 | 22 | if not ip: |
22 | 23 | continue |
23 | ||
24 | ||
24 | 25 | if ip not in hosts: |
25 | 26 | hosts[ip] = banner |
26 | 27 | hosts[ip]['ports'] = [] |
27 | ||
28 | ||
28 | 29 | hosts[ip]['ports'].append(banner['port']) |
29 | ||
30 | ||
30 | 31 | for ip, host in iter(hosts.items()): |
31 | 32 | self.write(host) |
32 | ||
33 | ||
33 | 34 | self.footer() |
34 | ||
35 | ||
35 | ||
36 | 36 | def write(self, host): |
37 | 37 | try: |
38 | 38 | ip = host.get('ip_str', host.get('ipv6', None)) |
8 | 8 | import sys |
9 | 9 | |
10 | 10 | from .settings import SHODAN_CONFIG_DIR |
11 | ||
12 | try: | |
13 | basestring # Python 2 | |
14 | except NameError: | |
15 | basestring = (str, ) # Python 3 | |
16 | ||
11 | 17 | |
12 | 18 | def get_api_key(): |
13 | 19 | '''Returns the API key of the current logged-in user.''' |
63 | 63 | for port in ports: |
64 | 64 | banner = { |
65 | 65 | 'port': port, |
66 | 'transport': 'tcp', # All the filtered services use TCP | |
67 | 'timestamp': host['data'][-1]['timestamp'], # Use the timestamp of the oldest banner | |
68 | 'placeholder': True, # Don't store this banner when the file is saved | |
66 | 'transport': 'tcp', # All the filtered services use TCP | |
67 | 'timestamp': host['data'][-1]['timestamp'], # Use the timestamp of the oldest banner | |
68 | 'placeholder': True, # Don't store this banner when the file is saved | |
69 | 69 | } |
70 | 70 | host['data'].append(banner) |
71 | 71 | |
93 | 93 | # Show optional ssl info |
94 | 94 | if 'ssl' in banner: |
95 | 95 | if 'versions' in banner['ssl'] and banner['ssl']['versions']: |
96 | click.echo('\t|-- SSL Versions: {}'.format(', '.join([version for version in sorted(banner['ssl']['versions']) if not version.startswith('-')]))) | |
96 | click.echo('\t|-- SSL Versions: {}'.format(', '.join([item for item in sorted(banner['ssl']['versions']) if not version.startswith('-')]))) | |
97 | 97 | if 'dhparams' in banner['ssl'] and banner['ssl']['dhparams']: |
98 | 98 | click.echo('\t|-- Diffie-Hellman Parameters:') |
99 | 99 | click.echo('\t\t{:15s}{}\n\t\t{:15s}{}'.format('Bits:', banner['ssl']['dhparams']['bits'], 'Generator:', banner['ssl']['dhparams']['generator'])) |
118 | 118 | HOST_PRINT = { |
119 | 119 | 'pretty': host_print_pretty, |
120 | 120 | 'tsv': host_print_tsv, |
121 | }⏎ | |
121 | } |
21 | 21 | api.org.add_member(user, notify=not silent) |
22 | 22 | except shodan.APIError as e: |
23 | 23 | raise click.ClickException(e.value) |
24 | ||
24 | ||
25 | 25 | click.secho('Successfully added the new member', fg='green') |
26 | 26 | |
27 | 27 | |
38 | 38 | click.secho(organization['name'], fg='cyan') |
39 | 39 | click.secho('Access Level: ', nl=False, dim=True) |
40 | 40 | click.secho(humanize_api_plan(organization['upgrade_type']), fg='magenta') |
41 | ||
41 | ||
42 | 42 | if organization['domains']: |
43 | 43 | click.secho('Authorized Domains: ', nl=False, dim=True) |
44 | 44 | click.echo(', '.join(organization['domains'])) |
45 | ||
45 | ||
46 | 46 | click.echo('') |
47 | 47 | click.secho('Administrators:', dim=True) |
48 | 48 | |
50 | 50 | click.echo(u' > {:30}\t{:30}'.format( |
51 | 51 | click.style(admin['username'], fg='yellow'), |
52 | 52 | admin['email']) |
53 | ) | |
54 | ||
53 | ) | |
54 | ||
55 | 55 | click.echo('') |
56 | 56 | if organization['members']: |
57 | 57 | click.secho('Members:', dim=True) |
75 | 75 | api.org.remove_member(user) |
76 | 76 | except shodan.APIError as e: |
77 | 77 | raise click.ClickException(e.value) |
78 | ||
78 | ||
79 | 79 | click.secho('Successfully removed the member', fg='green') |
14 | 14 | def scan(): |
15 | 15 | """Scan an IP/ netblock using Shodan.""" |
16 | 16 | pass |
17 | ||
18 | ||
19 | @scan.command(name='list') | |
20 | def scan_list(): | |
21 | """Show recently launched scans""" | |
22 | key = get_api_key() | |
23 | ||
24 | # Get the list | |
25 | api = shodan.Shodan(key) | |
26 | try: | |
27 | scans = api.scans() | |
28 | except shodan.APIError as e: | |
29 | raise click.ClickException(e.value) | |
30 | ||
31 | if len(scans) > 0: | |
32 | click.echo(u'# {} Scans Total - Showing 10 most recent scans:'.format(scans['total'])) | |
33 | click.echo(u'# {:20} {:<15} {:<10} {:<15s}'.format('Scan ID', 'Status', 'Size', 'Timestamp')) | |
34 | # click.echo('#' * 65) | |
35 | for scan in scans['matches'][:10]: | |
36 | click.echo( | |
37 | u'{:31} {:<24} {:<10} {:<15s}'.format( | |
38 | click.style(scan['id'], fg='yellow'), | |
39 | click.style(scan['status'], fg='cyan'), | |
40 | scan['size'], | |
41 | scan['created'] | |
42 | ) | |
43 | ) | |
44 | else: | |
45 | click.echo("You haven't yet launched any scans.") | |
17 | 46 | |
18 | 47 | |
19 | 48 | @scan.command(name='internet') |
57 | 86 | |
58 | 87 | if not quiet: |
59 | 88 | click.echo('{0:<40} {1:<20} {2}'.format( |
60 | click.style(helpers.get_ip(banner), fg=COLORIZE_FIELDS['ip_str']), | |
61 | click.style(str(banner['port']), fg=COLORIZE_FIELDS['port']), | |
62 | ';'.join(banner['hostnames']) | |
63 | ) | |
89 | click.style(helpers.get_ip(banner), fg=COLORIZE_FIELDS['ip_str']), | |
90 | click.style(str(banner['port']), fg=COLORIZE_FIELDS['port']), | |
91 | ';'.join(banner['hostnames'])) | |
64 | 92 | ) |
65 | 93 | except shodan.APIError as e: |
66 | 94 | # We stop waiting for results if the scan has been processed by the crawlers and |
107 | 107 | TODO: filter out stuff that doesn't fit |
108 | 108 | TODO: make it possible to use "zoomed" maps |
109 | 109 | """ |
110 | width = (self.corners[3]-self.corners[1]) | |
111 | height = (self.corners[2]-self.corners[0]) | |
110 | width = (self.corners[3] - self.corners[1]) | |
111 | height = (self.corners[2] - self.corners[0]) | |
112 | 112 | |
113 | 113 | # change to 0-180, 0-360 |
114 | abs_lat = -lat+90 | |
115 | abs_lon = lon+180 | |
116 | x = (abs_lon/360.0)*width + self.corners[1] | |
117 | y = (abs_lat/180.0)*height + self.corners[0] | |
114 | abs_lat = -lat + 90 | |
115 | abs_lon = lon + 180 | |
116 | x = (abs_lon / 360.0) * width + self.corners[1] | |
117 | y = (abs_lat / 180.0) * height + self.corners[0] | |
118 | 118 | return int(x), int(y) |
119 | 119 | |
120 | 120 | def set_data(self, data): |
154 | 154 | self.window.addstr(0, 0, self.map) |
155 | 155 | |
156 | 156 | # FIXME: position to be defined in map config? |
157 | row = self.corners[2]-6 | |
157 | row = self.corners[2] - 6 | |
158 | 158 | items_to_show = 5 |
159 | 159 | for lat, lon, char, desc, attrs, color in self.data: |
160 | 160 | # to make this work almost everywhere. see http://docs.python.org/2/library/curses.html |
176 | 176 | self.window.addstr(row, 1, det_show, attrs) |
177 | 177 | row += 1 |
178 | 178 | items_to_show -= 1 |
179 | except StandardError: | |
179 | except Exception: | |
180 | 180 | # FIXME: check window size before addstr() |
181 | 181 | break |
182 | 182 | self.window.overwrite(target) |
256 | 256 | api = Shodan(get_api_key()) |
257 | 257 | return launch_map(api) |
258 | 258 | |
259 | ||
259 | 260 | if __name__ == '__main__': |
260 | 261 | import sys |
261 | 262 | sys.exit(main()) |
175 | 175 | :param key: The Shodan API key. |
176 | 176 | :type key: str |
177 | 177 | :param proxies: A proxies array for the requests library, e.g. {'https': 'your proxy'} |
178 | :type key: dict | |
178 | :type proxies: dict | |
179 | 179 | """ |
180 | 180 | self.api_key = key |
181 | 181 | self.base_url = 'https://api.shodan.io' |
346 | 346 | |
347 | 347 | return self._request('/shodan/scan', params, method='post') |
348 | 348 | |
349 | def scans(self, page=1): | |
350 | """Get a list of scans submitted | |
351 | ||
352 | :param page: Page through the list of scans 100 results at a time | |
353 | :type page: int | |
354 | """ | |
355 | return self._request('/shodan/scans', { | |
356 | 'page': page, | |
357 | }) | |
358 | ||
349 | 359 | def scan_internet(self, port, protocol): |
350 | 360 | """Scan a network using Shodan |
351 | 361 | |
437 | 447 | try: |
438 | 448 | yield banner |
439 | 449 | except GeneratorExit: |
440 | return # exit out of the function | |
450 | return # exit out of the function | |
441 | 451 | page += 1 |
442 | 452 | tries = 0 |
443 | 453 | except Exception: |
446 | 456 | break |
447 | 457 | |
448 | 458 | tries += 1 |
449 | time.sleep(1.0) # wait 1 second if the search errored out for some reason | |
459 | time.sleep(1.0) # wait 1 second if the search errored out for some reason | |
450 | 460 | |
451 | 461 | def search_tokens(self, query): |
452 | 462 | """Returns information about the search query itself (filters used etc.) |
506 | 516 | def queries_tags(self, size=10): |
507 | 517 | """Search the directory of saved search queries in Shodan. |
508 | 518 | |
509 | :param query: The number of tags to return | |
510 | :type page: int | |
519 | :param size: The number of tags to return | |
520 | :type size: int | |
511 | 521 | |
512 | 522 | :returns: A list of tags. |
513 | 523 | """ |
517 | 527 | return self._request('/shodan/query/tags', args) |
518 | 528 | |
519 | 529 | def create_alert(self, name, ip, expires=0): |
520 | """Search the directory of saved search queries in Shodan. | |
521 | ||
522 | :param query: The number of tags to return | |
523 | :type page: int | |
524 | ||
525 | :returns: A list of tags. | |
530 | """Create a network alert/ private firehose for the specified IP range(s) | |
531 | ||
532 | :param name: Name of the alert | |
533 | :type name: str | |
534 | :param ip: Network range(s) to monitor | |
535 | :type ip: str OR list of str | |
536 | ||
537 | :returns: A dict describing the alert | |
526 | 538 | """ |
527 | 539 | data = { |
528 | 540 | 'name': name, |
546 | 558 | |
547 | 559 | response = api_request(self.api_key, func, params={ |
548 | 560 | 'include_expired': include_expired, |
549 | }, | |
550 | proxies=self._session.proxies) | |
561 | }, proxies=self._session.proxies) | |
551 | 562 | |
552 | 563 | return response |
553 | 564 | |
560 | 571 | |
561 | 572 | return response |
562 | 573 | |
574 | def alert_triggers(self): | |
575 | """Return a list of available triggers that can be enabled for alerts. | |
576 | ||
577 | :returns: A list of triggers | |
578 | """ | |
579 | return self._request('/shodan/alert/triggers', {}) | |
580 | ||
581 | def enable_alert_trigger(self, aid, trigger): | |
582 | """Enable the given trigger on the alert.""" | |
583 | return self._request('/shodan/alert/{}/trigger/{}'.format(aid, trigger), {}, method='put') | |
584 | ||
585 | def disable_alert_trigger(self, aid, trigger): | |
586 | """Disable the given trigger on the alert.""" | |
587 | return self._request('/shodan/alert/{}/trigger/{}'.format(aid, trigger), {}, method='delete') |
1 | 1 | """This exception gets raised whenever a non-200 status code was returned by the Shodan API.""" |
2 | 2 | def __init__(self, value): |
3 | 3 | self.value = value |
4 | ||
4 | ||
5 | 5 | def __str__(self): |
6 | 6 | return self.value |
7 | 7 | |
8 | 8 | |
9 | 9 | class APITimeout(APIError): |
10 | pass | |
11 | ||
10 | pass |
18 | 18 | if isinstance(facet, basestring): |
19 | 19 | facet_str += facet |
20 | 20 | else: |
21 | facet_str += '%s:%s' % (facet[0], facet[1]) | |
21 | facet_str += '{}:{}'.format(facet[0], facet[1]) | |
22 | 22 | facet_str += ',' |
23 | 23 | return facet_str[:-1] |
24 | 24 | |
75 | 75 | # Parse the text into JSON |
76 | 76 | try: |
77 | 77 | data = data.json() |
78 | except: | |
78 | except Exception: | |
79 | 79 | raise APIError('Unable to parse JSON response') |
80 | 80 | |
81 | 81 | # Raise an exception if an error occurred |
112 | 112 | |
113 | 113 | for line in fin: |
114 | 114 | # Ensure the line has been decoded into a string to prevent errors w/ Python3 |
115 | line = line.decode('utf-8') | |
115 | if not isinstance(line, basestring): | |
116 | line = line.decode('utf-8') | |
116 | 117 | |
117 | 118 | # Convert the JSON into a native Python object |
118 | 119 | banner = loads(line) |
119 | 120 | yield banner |
121 | ||
120 | 122 | |
121 | 123 | def get_screenshot(banner): |
122 | 124 | if 'opts' in banner and 'screenshot' in banner['opts']: |
158 | 160 | >>> humanize_bytes(1024*1234*1111,1) |
159 | 161 | '1.3 GB' |
160 | 162 | """ |
161 | ||
162 | 163 | if bytes == 1: |
163 | 164 | return '1 byte' |
164 | 165 | if bytes < 1024: |
165 | 166 | return '%.*f %s' % (precision, bytes, "bytes") |
166 | 167 | |
167 | 168 | suffixes = ['KB', 'MB', 'GB', 'TB', 'PB'] |
168 | multiple = 1024.0 #.0 force float on python 2 | |
169 | multiple = 1024.0 # .0 to force float on python 2 | |
169 | 170 | for suffix in suffixes: |
170 | 171 | bytes /= multiple |
171 | 172 | if bytes < multiple: |
20 | 20 | |
21 | 21 | # The user doesn't want to use a timeout |
22 | 22 | # If the timeout is specified as 0 then we also don't want to have a timeout |
23 | if ( timeout and timeout <= 0 ) or ( timeout == 0 ): | |
23 | if (timeout and timeout <= 0) or (timeout == 0): | |
24 | 24 | timeout = None |
25 | 25 | |
26 | 26 | # If the user requested a timeout then we need to disable heartbeat messages |
42 | 42 | # not specific to Cloudflare. |
43 | 43 | if req.status_code != 524 or timeout >= 0: |
44 | 44 | break |
45 | except Exception as e: | |
45 | except Exception: | |
46 | 46 | raise APIError('Unable to contact the Shodan Streaming API') |
47 | 47 | |
48 | 48 | if req.status_code != 200: |
49 | 49 | try: |
50 | 50 | data = json.loads(req.text) |
51 | 51 | raise APIError(data['error']) |
52 | except APIError as e: | |
52 | except APIError: | |
53 | 53 | raise |
54 | except Exception as e: | |
54 | except Exception: | |
55 | 55 | pass |
56 | 56 | raise APIError('Invalid API key or you do not have access to the Streaming API') |
57 | 57 | if req.encoding is None: |
77 | 77 | try: |
78 | 78 | for line in self._iter_stream(stream, raw): |
79 | 79 | yield line |
80 | except requests.exceptions.ConnectionError as e: | |
80 | except requests.exceptions.ConnectionError: | |
81 | 81 | raise APIError('Stream timed out') |
82 | except ssl.SSLError as e: | |
82 | except ssl.SSLError: | |
83 | 83 | raise APIError('Stream timed out') |
84 | 84 | |
85 | 85 | def asn(self, asn, raw=False, timeout=None): |
122 | 122 | stream = self._create_stream('/shodan/ports/%s' % ','.join([str(port) for port in ports]), timeout=timeout) |
123 | 123 | for line in self._iter_stream(stream, raw): |
124 | 124 | yield line |
125 |
23 | 23 | try: |
24 | 24 | req = requests.get(self.base_url + name, params={'key': self.parent.api_key}, |
25 | 25 | stream=True, proxies=self.proxies) |
26 | except: | |
26 | except Exception: | |
27 | 27 | raise APIError('Unable to contact the Shodan Streaming API') |
28 | 28 | |
29 | 29 | if req.status_code != 200: |
30 | 30 | try: |
31 | 31 | raise APIError(req.json()['error']) |
32 | except: | |
32 | except Exception: | |
33 | 33 | pass |
34 | 34 | raise APIError('Invalid API key or you do not have access to the Streaming API') |
35 | 35 | return req |
64 | 64 | self.api_key = key |
65 | 65 | self.base_url = 'https://api.shodan.io' |
66 | 66 | self.stream = self.Stream(self) |
67 |
8 | 8 | |
9 | 9 | class ShodanTests(unittest.TestCase): |
10 | 10 | |
11 | api = None | |
12 | FACETS = [ | |
13 | 'port', | |
14 | ('domain', 1) | |
15 | ] | |
16 | QUERIES = { | |
17 | 'simple': 'cisco-ios', | |
18 | 'minify': 'apache', | |
19 | 'advanced': 'apache port:443', | |
20 | 'empty': 'asdasdasdasdasdasdasdasdasdhjihjkjk', | |
21 | } | |
11 | api = None | |
12 | FACETS = [ | |
13 | 'port', | |
14 | ('domain', 1) | |
15 | ] | |
16 | QUERIES = { | |
17 | 'simple': 'cisco-ios', | |
18 | 'minify': 'apache', | |
19 | 'advanced': 'apache port:443', | |
20 | 'empty': 'asdasdasdasdasdasdasdasdasdhjihjkjk', | |
21 | } | |
22 | 22 | |
23 | def setUp(self): | |
24 | self.api = shodan.Shodan(open('SHODAN-API-KEY').read().strip()) | |
23 | def setUp(self): | |
24 | self.api = shodan.Shodan(open('SHODAN-API-KEY').read().strip()) | |
25 | 25 | |
26 | def test_search_simple(self): | |
27 | results = self.api.search(self.QUERIES['simple']) | |
26 | def test_search_simple(self): | |
27 | results = self.api.search(self.QUERIES['simple']) | |
28 | 28 | |
29 | # Make sure the properties exist | |
30 | self.assertIn('matches', results) | |
31 | self.assertIn('total', results) | |
29 | # Make sure the properties exist | |
30 | self.assertIn('matches', results) | |
31 | self.assertIn('total', results) | |
32 | 32 | |
33 | # Make sure no error occurred | |
34 | self.assertNotIn('error', results) | |
33 | # Make sure no error occurred | |
34 | self.assertNotIn('error', results) | |
35 | 35 | |
36 | # Make sure some values were returned | |
37 | self.assertTrue(results['matches']) | |
38 | self.assertTrue(results['total']) | |
36 | # Make sure some values were returned | |
37 | self.assertTrue(results['matches']) | |
38 | self.assertTrue(results['total']) | |
39 | 39 | |
40 | # A regular search shouldn't have the optional info | |
41 | self.assertNotIn('opts', results['matches'][0]) | |
40 | # A regular search shouldn't have the optional info | |
41 | self.assertNotIn('opts', results['matches'][0]) | |
42 | 42 | |
43 | def test_search_empty(self): | |
44 | results = self.api.search(self.QUERIES['empty']) | |
45 | self.assertTrue(len(results['matches']) == 0) | |
46 | self.assertEqual(results['total'], 0) | |
43 | def test_search_empty(self): | |
44 | results = self.api.search(self.QUERIES['empty']) | |
45 | self.assertTrue(len(results['matches']) == 0) | |
46 | self.assertEqual(results['total'], 0) | |
47 | 47 | |
48 | def test_search_facets(self): | |
49 | results = self.api.search(self.QUERIES['simple'], facets=self.FACETS) | |
48 | def test_search_facets(self): | |
49 | results = self.api.search(self.QUERIES['simple'], facets=self.FACETS) | |
50 | 50 | |
51 | self.assertTrue(results['facets']['port']) | |
52 | self.assertEqual(len(results['facets']['domain']), 1) | |
51 | self.assertTrue(results['facets']['port']) | |
52 | self.assertEqual(len(results['facets']['domain']), 1) | |
53 | 53 | |
54 | def test_count_simple(self): | |
55 | results = self.api.count(self.QUERIES['simple']) | |
54 | def test_count_simple(self): | |
55 | results = self.api.count(self.QUERIES['simple']) | |
56 | 56 | |
57 | # Make sure the properties exist | |
58 | self.assertIn('matches', results) | |
59 | self.assertIn('total', results) | |
57 | # Make sure the properties exist | |
58 | self.assertIn('matches', results) | |
59 | self.assertIn('total', results) | |
60 | 60 | |
61 | # Make sure no error occurred | |
62 | self.assertNotIn('error', results) | |
61 | # Make sure no error occurred | |
62 | self.assertNotIn('error', results) | |
63 | 63 | |
64 | # Make sure no values were returned | |
65 | self.assertFalse(results['matches']) | |
66 | self.assertTrue(results['total']) | |
64 | # Make sure no values were returned | |
65 | self.assertFalse(results['matches']) | |
66 | self.assertTrue(results['total']) | |
67 | 67 | |
68 | def test_count_facets(self): | |
69 | results = self.api.count(self.QUERIES['simple'], facets=self.FACETS) | |
68 | def test_count_facets(self): | |
69 | results = self.api.count(self.QUERIES['simple'], facets=self.FACETS) | |
70 | 70 | |
71 | self.assertTrue(results['facets']['port']) | |
72 | self.assertEqual(len(results['facets']['domain']), 1) | |
71 | self.assertTrue(results['facets']['port']) | |
72 | self.assertEqual(len(results['facets']['domain']), 1) | |
73 | 73 | |
74 | def test_host_details(self): | |
75 | host = self.api.host('147.228.101.7') | |
74 | def test_host_details(self): | |
75 | host = self.api.host('147.228.101.7') | |
76 | 76 | |
77 | self.assertEqual('147.228.101.7', host['ip_str']) | |
78 | self.assertFalse(isinstance(host['ip'], basestring)) | |
77 | self.assertEqual('147.228.101.7', host['ip_str']) | |
78 | self.assertFalse(isinstance(host['ip'], basestring)) | |
79 | 79 | |
80 | def test_search_minify(self): | |
81 | results = self.api.search(self.QUERIES['minify'], minify=False) | |
82 | self.assertIn('opts', results['matches'][0]) | |
80 | def test_search_minify(self): | |
81 | results = self.api.search(self.QUERIES['minify'], minify=False) | |
82 | self.assertIn('opts', results['matches'][0]) | |
83 | 83 | |
84 | def test_exploits_search(self): | |
85 | results = self.api.exploits.search('apache') | |
86 | self.assertIn('matches', results) | |
87 | self.assertIn('total', results) | |
88 | self.assertTrue(results['matches']) | |
84 | def test_exploits_search(self): | |
85 | results = self.api.exploits.search('apache') | |
86 | self.assertIn('matches', results) | |
87 | self.assertIn('total', results) | |
88 | self.assertTrue(results['matches']) | |
89 | 89 | |
90 | def test_exploits_search_paging(self): | |
91 | results = self.api.exploits.search('apache', page=1) | |
92 | match1 = results['matches'][0] | |
93 | results = self.api.exploits.search('apache', page=2) | |
94 | match2 = results['matches'][0] | |
90 | def test_exploits_search_paging(self): | |
91 | results = self.api.exploits.search('apache', page=1) | |
92 | match1 = results['matches'][0] | |
93 | results = self.api.exploits.search('apache', page=2) | |
94 | match2 = results['matches'][0] | |
95 | 95 | |
96 | self.assertNotEqual(match1['_id'], match2['_id']) | |
96 | self.assertNotEqual(match1['_id'], match2['_id']) | |
97 | 97 | |
98 | def test_exploits_search_facets(self): | |
99 | results = self.api.exploits.search('apache', facets=['source', ('author', 1)]) | |
100 | self.assertIn('facets', results) | |
101 | self.assertTrue(results['facets']['source']) | |
102 | self.assertTrue(len(results['facets']['author']) == 1) | |
98 | def test_exploits_search_facets(self): | |
99 | results = self.api.exploits.search('apache', facets=['source', ('author', 1)]) | |
100 | self.assertIn('facets', results) | |
101 | self.assertTrue(results['facets']['source']) | |
102 | self.assertTrue(len(results['facets']['author']) == 1) | |
103 | 103 | |
104 | def test_exploits_count(self): | |
105 | results = self.api.exploits.count('apache') | |
106 | self.assertIn('matches', results) | |
107 | self.assertIn('total', results) | |
108 | self.assertTrue(len(results['matches']) == 0) | |
104 | def test_exploits_count(self): | |
105 | results = self.api.exploits.count('apache') | |
106 | self.assertIn('matches', results) | |
107 | self.assertIn('total', results) | |
108 | self.assertTrue(len(results['matches']) == 0) | |
109 | 109 | |
110 | def test_exploits_count_facets(self): | |
111 | results = self.api.exploits.count('apache', facets=['source', ('author', 1)]) | |
112 | self.assertEqual(len(results['matches']), 0) | |
113 | self.assertIn('facets', results) | |
114 | self.assertTrue(results['facets']['source']) | |
115 | self.assertTrue(len(results['facets']['author']) == 1) | |
110 | def test_exploits_count_facets(self): | |
111 | results = self.api.exploits.count('apache', facets=['source', ('author', 1)]) | |
112 | self.assertEqual(len(results['matches']), 0) | |
113 | self.assertIn('facets', results) | |
114 | self.assertTrue(results['facets']['source']) | |
115 | self.assertTrue(len(results['facets']['author']) == 1) | |
116 | 116 | |
117 | # Test error responses | |
118 | def test_invalid_key(self): | |
119 | api = shodan.Shodan('garbage') | |
120 | raised = False | |
121 | try: | |
122 | api.search('something') | |
123 | except shodan.APIError as e: | |
124 | raised = True | |
117 | # Test error responses | |
118 | def test_invalid_key(self): | |
119 | api = shodan.Shodan('garbage') | |
120 | raised = False | |
121 | try: | |
122 | api.search('something') | |
123 | except shodan.APIError: | |
124 | raised = True | |
125 | 125 | |
126 | self.assertTrue(raised) | |
126 | self.assertTrue(raised) | |
127 | 127 | |
128 | def test_invalid_host_ip(self): | |
129 | raised = False | |
130 | try: | |
131 | host = self.api.host('test') | |
132 | except shodan.APIError as e: | |
133 | raised = True | |
128 | def test_invalid_host_ip(self): | |
129 | raised = False | |
130 | try: | |
131 | self.api.host('test') | |
132 | except shodan.APIError: | |
133 | raised = True | |
134 | 134 | |
135 | self.assertTrue(raised) | |
135 | self.assertTrue(raised) | |
136 | 136 | |
137 | def test_search_empty_query(self): | |
138 | raised = False | |
139 | try: | |
140 | self.api.search('') | |
141 | except shodan.APIError as e: | |
142 | raised = True | |
143 | self.assertTrue(raised) | |
137 | def test_search_empty_query(self): | |
138 | raised = False | |
139 | try: | |
140 | self.api.search('') | |
141 | except shodan.APIError: | |
142 | raised = True | |
143 | self.assertTrue(raised) | |
144 | 144 | |
145 | def test_search_advanced_query(self): | |
146 | # The free API plan can't use filters | |
147 | raised = False | |
148 | try: | |
149 | self.api.search(self.QUERIES['advanced']) | |
150 | except shodan.APIError as e: | |
151 | raised = True | |
152 | self.assertTrue(raised) | |
145 | def test_search_advanced_query(self): | |
146 | # The free API plan can't use filters | |
147 | raised = False | |
148 | try: | |
149 | self.api.search(self.QUERIES['advanced']) | |
150 | except shodan.APIError: | |
151 | raised = True | |
152 | self.assertTrue(raised) | |
153 | 153 | |
154 | 154 | |
155 | 155 | if __name__ == '__main__': |