/[marcuscom]/meraki/meraki_api/__init__.py
ViewVC logotype

Contents of /meraki/meraki_api/__init__.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 20550 - (show annotations) (download) (as text)
Mon May 7 21:11:26 2018 UTC (23 months ago) by jclarke
File MIME type: text/x-python
File size: 19763 byte(s)
Add the initial code drop for my Meraki setup system.

1 #
2 # Copyright (c) 2018 Joe Clarke <jclarke@cisco.com>
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions
6 # are met:
7 # 1. Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 # 2. Redistributions in binary form must reproduce the above copyright
10 # notice, this list of conditions and the following disclaimer in the
11 # documentation and/or other materials provided with the distribution.
12 #
13 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23 # SUCH DAMAGE.
24
25 import requests
26 import logging
27
28
29 class Meraki(object):
30 MERAKI_API = 'https://api.meraki.com/api/v0'
31
32 _headers = {
33 'X-Cisco-Meraki-API-Key': None,
34 'Content-type': 'application/json'
35 }
36
37 _logit = False
38 _key = None
39
40 _name = None
41 _id = None
42
43 _class_name = 'Meraki'
44
45 _name_prop = 'name'
46 _id_prop = 'id'
47
48 _api_url_endpoint = ''
49
50 def __init__(self, **kwargs):
51 if 'logit' in kwargs:
52 self._logit = kwargs['logit']
53 if 'key' in kwargs:
54 self._key = kwargs['key']
55 self._headers['X-Cisco-Meraki-API-Key'] = kwargs['key']
56 if 'name' in kwargs:
57 self._name = kwargs['name']
58 if 'id' in kwargs:
59 self._id = str(kwargs['id'])
60
61 self.__dict = {}
62 self._initialized = False
63
64 def set_key(self, key):
65 self._key = key
66 self._headers['X-Cisco-Meraki-API-Key'] = key
67
68 def set_name(self, name):
69 self._name = name
70
71 def _set_id(self, id):
72 self._id = id
73
74 def _check_headers(self):
75 if self._headers['X-Cisco-Meraki-API-Key'] is None:
76 msg = 'Meraki API key is not set!'
77 if self._logit:
78 logging.error(msg)
79 else:
80 print(msg)
81
82 return False
83
84 return True
85
86 def realize(self):
87 return self._check_obj()
88
89 @staticmethod
90 def _get_json_errors(response):
91 res = ''
92 try:
93 jobj = response.json()
94 if 'errors' in jobj:
95 res = ', '.join(jobj['errors'])
96 except:
97 pass
98
99 return res
100
101 def _check_obj(self):
102 if self._initialized:
103 return True
104
105 if self._id is None:
106 msg = '{} id is not set!'.format(self._class_name)
107 if self._logit:
108 logging.error(msg)
109 else:
110 print(msg)
111
112 return False
113
114 return self.__get_current_obj()
115
116 def __get_current_obj(self):
117 if not self._check_headers():
118 return False
119
120 url = self.MERAKI_API + self._api_url_endpoint + '/' + self._id
121 try:
122 response = requests.request('GET', url, headers=self._headers)
123 response.raise_for_status()
124 except Exception as e:
125 msg = 'Error getting {} for {}: {} ({})'.format(
126 self._class_name.lower(), self._id, e, Meraki._get_json_errors(response))
127 if self._logit:
128 logging.error(msg)
129 else:
130 print(msg)
131
132 return False
133
134 jobj = response.json()
135 self.__populate_obj(jobj)
136 self._name = jobj[self._name_prop]
137 self._initialized = True
138 return True
139
140 def __populate_obj(self, jobj):
141 for k, v in jobj.items():
142 self.__dict[k] = v
143
144 def get(self, field):
145 if not self._check_obj():
146 raise Exception('Failed to initialize object!')
147
148 if field in self.__dict:
149 return self.__dict[field]
150 else:
151 raise Exception('Field {} does not exist'.format(field))
152
153 def set(self, field, value):
154 if not self._check_obj():
155 raise Exception('Failed to initialize object!')
156
157 self.__dict[field] = value
158
159 def get_organizations(self):
160 if not self._check_headers():
161 return None
162
163 url = self.MERAKI_API + '/organizations'
164 try:
165 response = requests.request('GET', url, headers=self._headers)
166 response.raise_for_status()
167 except Exception as e:
168 msg = 'Error getting list of organizations: {} ({})'.format(
169 e, Meraki._get_json_errors(response))
170 if self._logit:
171 logging.error(msg)
172 else:
173 print(msg)
174
175 return None
176
177 res = []
178 for org in response.json():
179 org_obj = Organization(
180 key=self._key, name=org[Organization._name_prop], id=org[Organization._id_prop])
181 res.append(org_obj)
182
183 return res
184
185
186 class Organization(Meraki):
187 def __init__(self, **kwargs):
188 super(Organization, self).__init__(**kwargs)
189
190 self._class_name = 'Organization'
191 self._api_url_endpoint = '/organizations'
192 self.ORG_API = self.MERAKI_API + self._api_url_endpoint
193
194 def get_inventory(self):
195 if not self._check_headers():
196 return None
197 if not self._check_obj():
198 return None
199
200 url = self.ORG_API + '/' + self._id + '/inventory'
201
202 try:
203 response = requests.request('GET', url, headers=self._headers)
204 response.raise_for_status()
205 except Exception as e:
206 msg = 'Error getting inventory for organization {}: {} ({})'.format(
207 self._name, e, Meraki._get_json_errors(response))
208 if self._logit:
209 logging.error(msg)
210 else:
211 print(msg)
212
213 return None
214
215 return response.json()
216
217 def get_networks(self):
218 if not self._check_headers():
219 return None
220 if not self._check_obj():
221 return None
222
223 url = self.ORG_API + '/' + self._id + '/networks'
224 try:
225 response = requests.request('GET', url, headers=self._headers)
226 response.raise_for_status()
227 except Exception as e:
228 msg = 'Error getting list of networks for {}: {} ({})'.format(
229 self._name, e, Meraki._get_json_errors(response))
230 if self._logit:
231 logging.error(msg)
232 else:
233 print(msg)
234
235 return None
236
237 res = []
238 for n in response.json():
239 net = Network(
240 key=self._key, name=n[Network._name_prop], id=n[Network._id_prop
241 ])
242 res.append(net)
243
244 return res
245
246 def create_network(self, name, **kwargs):
247 if not self._check_headers():
248 return None
249 if not self._check_obj():
250 return None
251
252 payload = {
253 'name': name
254 }
255
256 for key in ['type', 'tags', 'timezone']:
257 if key in kwargs:
258 if key == 'timezone':
259 payload['timeZone'] = kwargs[key]
260 else:
261 payload[key] = kwargs[key]
262
263 if 'type' not in kwargs:
264 payload['type'] = 'wireless switch appliance phone'
265
266 url = self.ORG_API + '/' + self._id + '/networks'
267 try:
268 response = requests.request(
269 'POST', url, json=payload, headers=self._headers)
270 response.raise_for_status()
271 except Exception as e:
272 msg = 'Error creating new network {} in {}: {} ({})'.format(
273 name, self._name, e, Meraki._get_json_errors(response))
274 if self._logit:
275 logging.error(msg)
276 else:
277 print(msg)
278
279 return None
280
281 jobj = response.json()
282 net_obj = Network(key=self._key, id=jobj['id'], name=jobj['name'])
283
284 return net_obj
285
286
287 class Network(Meraki):
288 def __init__(self, **kwargs):
289 super(Network, self).__init__(**kwargs)
290
291 self._class_name = 'Network'
292 self._api_url_endpoint = '/networks'
293 self.NET_API = self.MERAKI_API + self._api_url_endpoint
294
295 def get_devices(self):
296 if not self._check_headers():
297 return None
298 if not self._check_obj():
299 return None
300
301 url = self.NET_API + '/' + self._id + '/devices'
302 try:
303 response = requests.request('GET', url, headers=self._headers)
304 response.raise_for_status()
305 except Exception as e:
306 msg = 'Error getting device list for network {}: {} ({})'.format(
307 self._name, e, Meraki._get_json_errors(response))
308 if self._logit:
309 logging.error(msg)
310 else:
311 print(msg)
312
313 return None
314
315 res = []
316 for d in response.json():
317 dev = Device(
318 key=self._key, name=d[Device._name_prop], id=d[Device._id_prop])
319 res.append(dev)
320
321 return res
322
323 def claim_device(self, dev):
324 if not self._check_headers():
325 return None
326 if not self._check_obj():
327 return None
328
329 url = self.NET_API + '/' + self._id + '/devices/claim'
330 payload = {
331 'serial': dev._id
332 }
333 try:
334 response = requests.request(
335 'POST', url, json=payload, headers=self._headers)
336 response.raise_for_status()
337 except Exception as e:
338 msg = 'Error claiming device {} for network {}: {} ({})'.format(
339 dev._id, self._name, e, Meraki._get_json_errors(response))
340 if self._logit:
341 logging.error(msg)
342 else:
343 print(msg)
344
345 return None
346
347 new_dev = Device(key=self._key, id=dev._id, net=self)
348 return new_dev
349
350 def create_vlan(self, name, id, subnet, appliance_ip):
351 if not self._check_headers():
352 return None
353 if not self._check_obj():
354 return None
355
356 payload = {
357 'id': id,
358 'name': name,
359 'subnet': subnet,
360 'applianceIp': appliance_ip
361 }
362
363 url = self.NET_API + '/' + self._id + '/vlans'
364 try:
365 response = requests.request(
366 'POST', url, json=payload, headers=self._headers)
367 response.raise_for_status()
368 except Exception as e:
369 msg = 'Error adding VLAN {} (ID: {}) to network {}: {} ({})'.format(
370 name, id, self._name, e, Meraki._get_json_errors(response))
371 if self._logit:
372 logging.error(msg)
373 else:
374 print(msg)
375
376 return None
377
378 jobj = response.json()
379 vlan_obj = Vlan(
380 key=self._key, id=jobj['id'], name=jobj['name'], net=self)
381
382 return vlan_obj
383
384
385 class SSID(Meraki):
386 _id_prop = 'number'
387
388 def __init__(self, **kwargs):
389 super(SSID, self).__init__(**kwargs)
390 if not 'net' in kwargs:
391 raise Exception('Missing mandatory net argument!')
392
393 self._net = kwargs['net']
394
395 self._class_name = 'SSID'
396 self._api_url_endpoint = '/networks/' + \
397 self._net.get(Network._id_prop) + '/ssids'
398 self.SSID_API = self.MERAKI_API + self._api_url_endpoint
399
400 def update_ssid(self, **kwargs):
401 if not self._check_headers():
402 return False
403 if not self._check_obj():
404 return False
405
406 payload = {}
407 for key in ['name', 'enabled', 'auth_mode', 'encryption_mode', 'psk', 'ip_assignment_mode']:
408 if key in kwargs:
409 if key == 'auth_mode':
410 payload['authMode'] = kwargs[key]
411 elif key == 'encryption_mode':
412 payload['encryptionMode'] = kwargs[key]
413 elif key == 'ip_assignment_mode':
414 payload['ipAssignmentMode'] = kwargs[key]
415 else:
416 payload[key] = kwargs[key]
417
418 if len(payload) == 0:
419 return False
420
421 if 'ip_assignment_mode' not in kwargs:
422 payload['ipAssignmentMode'] = 'Bridge mode'
423
424 url = self.SSID_API + '/' + self._id
425 try:
426 response = requests.request(
427 'PUT', url, json=payload, headers=self._headers)
428 response.raise_for_status()
429 except Exception as e:
430 msg = 'Error updating SSID properties for {}: {} ({})'.format(
431 self._id, e, Meraki._get_json_errors(response))
432 if self._logit:
433 logging.error(msg)
434 else:
435 print(msg)
436
437 return False
438
439 jobj = response.json()
440 for k, v in jobj.items():
441 self.set(k, v)
442
443 return True
444
445
446 class Vlan(Meraki):
447 def __init__(self, **kwargs):
448 super(Vlan, self).__init__(**kwargs)
449 if not 'net' in kwargs:
450 raise Exception('Missing mandatory net argument!')
451
452 self._net = kwargs['net']
453
454 self._class_name = 'Vlan'
455 self._api_url_endpoint = '/networks/' + \
456 self._net.get(Network._id_prop) + '/vlans'
457 self.VLAN_API = self.MERAKI_API + self._api_url_endpoint
458
459 def update_vlan(self, **kwargs):
460 if not self._check_headers():
461 return False
462 if not self._check_obj():
463 return False
464
465 payload = {}
466 for key in ['name', 'subnet', 'appliance_ip', 'fixed_ip_assignments', 'reserved_ip_ranges', 'dns_nameservers']:
467 if key in kwargs:
468 if key == 'appliance_ip':
469 payload['applianceIp'] = kwargs[key]
470 elif key == 'fixed_ip_assignments':
471 payload['fixedIpAssignments'] = kwargs[key]
472 elif key == 'reserved_ip_ranges':
473 payload['reservedIpRanges'] = kwargs[key]
474 elif key == 'dns_nameservers':
475 payload['dnsNameservers'] = kwargs[key]
476 else:
477 payload[key] = kwargs[key]
478
479 if len(payload) == 0:
480 return False
481
482 url = self.VLAN_API + '/' + self._id
483 try:
484 response = requests.request(
485 'PUT', url, json=payload, headers=self._headers)
486 response.raise_for_status()
487 except Exception as e:
488 msg = 'Error updating VLAN properties for {}: {} ({})'.format(
489 self._name, e, Meraki._get_json_errors(response))
490 if self._logit:
491 logging.error(msg)
492 else:
493 print(msg)
494
495 return False
496
497 jobj = response.json()
498 for k, v in jobj.items():
499 self.set(k, v)
500
501 return True
502
503
504 class Device(Meraki):
505
506 _id_prop = 'serial'
507
508 def __init__(self, **kwargs):
509 super(Device, self).__init__(**kwargs)
510 if not 'net' in kwargs:
511 raise Exception('Missing mandatory net argument!')
512
513 self._net = kwargs['net']
514
515 self._class_name = 'Device'
516 self._api_url_endpoint = '/networks/' + \
517 self._net.get(Network._id_prop) + '/devices'
518 self.DEV_API = self.MERAKI_API + self._api_url_endpoint
519
520 def remove_device(self):
521 if not self._check_headers():
522 return False
523 if not self._check_obj():
524 return False
525
526 url = self.DEV_API + '/' + self._id + '/remove'
527 try:
528 response = requests.request('POST', url, headers=self._headers)
529 response.raise_for_status()
530 except Exception as e:
531 msg = 'Failed to remove device {} for network {}: {} ({})'.format(
532 self._name, self._net.get(Network._name_prop), e, Meraki._get_json_errors(response))
533 if self._logit:
534 logging.error(msg)
535 else:
536 print(msg)
537
538 return False
539
540 return True
541
542 def update_device(self, **kwargs):
543 if not self._check_headers():
544 return False
545 if not self._check_obj():
546 return False
547
548 payload = {}
549 for k, v in kwargs.items():
550 if k in ['name', 'tags', 'lat', 'lng', 'address', 'moveMapMarker']:
551 payload[k] = v
552 else:
553 msg = 'Property {} is not valid for a device'.format(k)
554 if self._logit:
555 logging.warning(msg)
556 else:
557 print(msg)
558
559 if len(payload) == 0:
560 return False
561
562 url = self.DEV_API + '/' + self._id
563 try:
564 response = requests.request(
565 'PUT', url, json=payload, headers=self._headers)
566 response.raise_for_status()
567 except Exception as e:
568 msg = 'Error updating device properties for {}: {} ({})'.format(
569 self._name, e, Meraki._get_json_errors(response))
570 if self._logit:
571 logging.error(msg)
572 else:
573 print(msg)
574
575 return False
576
577 jobj = response.json()
578 for k, v in jobj.items():
579 if k == 'moveMapMarker':
580 continue
581 self.set(k, v)
582
583 return True
584
585
586 class SwitchPort(Meraki):
587 _id_prop = 'number'
588
589 def __init__(self, **kwargs):
590 super(SwitchPort, self).__init__(**kwargs)
591 if not 'dev' in kwargs:
592 raise Exception('Missing mandatory dev argument!')
593
594 self._dev = kwargs['dev']
595
596 self._class_name = 'SwitchPort'
597 self._api_url_endpoint = '/devices/' + \
598 self._net.get(Network._id_prop) + '/switchPorts'
599 self.SWP_API = self.MERAKI_API + self._api_url_endpoint
600
601 def update_switchport(self, **kwargs):
602 if not self._check_headers():
603 return False
604 if not self._check_obj():
605 return False
606
607 payload = {}
608 for key in ['name', 'tags', 'enabled', 'type', 'vlan', 'voice_vlan', 'allowed_vlans', 'poe_enabled']:
609 if key in kwargs:
610 if key == 'voice_vlan':
611 payload['voiceVlan'] = kwargs[key]
612 elif key == 'allowed_vlans':
613 payload['allowedVlans'] = kwargs[key]
614 elif key == 'poe_enabled':
615 payload['poeEnabled'] = kwargs[key]
616 else:
617 payload[key] = kwargs[key]
618
619 if len(payload) == 0:
620 return False
621
622 url = self.SWP_API + '/' + self._id
623 try:
624 response = requests.request(
625 'PUT', url, json=payload, headers=self._headers)
626 response.raise_for_status()
627 except Exception as e:
628 msg = 'Error updating switchport properties for {} on device {}: {} ({})'.format(
629 self._id, self._dev.get(Device._name_prop), e, Meraki._get_json_errors(response))
630 if self._logit:
631 logging.error(msg)
632 else:
633 print(msg)
634
635 return False
636
637 jobj = response.json()
638 for k, v in jobj.items():
639 self.set(k, v)
640
641 return True

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.27