/[marcuscom]/meraki/meraki_api/__init__.py
ViewVC logotype

Contents of /meraki/meraki_api/__init__.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 20555 - (show annotations) (download) (as text)
Thu May 17 11:25:57 2018 UTC (22 months, 3 weeks ago) by jclarke
File MIME type: text/x-python
File size: 19079 byte(s)
Make _check_headers private.

This is really only needed when first initializing an object.

1 #
2 # Copyright (c) 2018 Joe Clarke <jclarke@cisco.com>
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions
6 # are met:
7 # 1. Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 # 2. Redistributions in binary form must reproduce the above copyright
10 # notice, this list of conditions and the following disclaimer in the
11 # documentation and/or other materials provided with the distribution.
12 #
13 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23 # SUCH DAMAGE.
24
25 import requests
26 import logging
27
28
29 class Meraki(object):
30 MERAKI_API = 'https://api.meraki.com/api/v0'
31
32 _headers = {
33 'X-Cisco-Meraki-API-Key': None,
34 'Content-type': 'application/json'
35 }
36
37 _logit = False
38 _key = None
39
40 _name = None
41 _id = None
42
43 _class_name = 'Meraki'
44
45 _name_prop = 'name'
46 _id_prop = 'id'
47
48 _api_url_endpoint = ''
49
50 def __init__(self, **kwargs):
51 if 'logit' in kwargs:
52 self._logit = kwargs['logit']
53 if 'key' in kwargs:
54 self._key = kwargs['key']
55 self._headers['X-Cisco-Meraki-API-Key'] = kwargs['key']
56 if 'name' in kwargs:
57 self._name = kwargs['name']
58 if 'id' in kwargs:
59 self._id = str(kwargs['id'])
60
61 self.__dict = {}
62 self._initialized = False
63
64 def set_key(self, key):
65 self._key = key
66 self._headers['X-Cisco-Meraki-API-Key'] = key
67
68 def set_name(self, name):
69 self._name = name
70
71 def _set_id(self, id):
72 self._id = id
73
74 def __check_headers(self):
75 if self._headers['X-Cisco-Meraki-API-Key'] is None:
76 msg = 'Meraki API key is not set!'
77 if self._logit:
78 logging.error(msg)
79 else:
80 print(msg)
81
82 return False
83
84 return True
85
86 def realize(self):
87 return self._check_obj()
88
89 @staticmethod
90 def _get_json_errors(response):
91 res = ''
92 try:
93 jobj = response.json()
94 if 'errors' in jobj:
95 res = ', '.join(jobj['errors'])
96 except:
97 pass
98
99 return res
100
101 def _check_obj(self):
102 if self._initialized:
103 return True
104
105 if self._id is None:
106 msg = '{} id is not set!'.format(self._class_name)
107 if self._logit:
108 logging.error(msg)
109 else:
110 print(msg)
111
112 return False
113
114 return self.__get_current_obj()
115
116 def __get_current_obj(self):
117 if not self.__check_headers():
118 return False
119
120 url = self.MERAKI_API + self._api_url_endpoint + '/' + self._id
121 try:
122 response = requests.request('GET', url, headers=self._headers)
123 response.raise_for_status()
124 except Exception as e:
125 msg = 'Error getting {} for {}: {} ({})'.format(
126 self._class_name.lower(), self._id, e, Meraki._get_json_errors(response))
127 if self._logit:
128 logging.error(msg)
129 else:
130 print(msg)
131
132 return False
133
134 jobj = response.json()
135 self.__populate_obj(jobj)
136 self._name = jobj[self._name_prop]
137 self._initialized = True
138 return True
139
140 def __populate_obj(self, jobj):
141 for k, v in jobj.items():
142 self.__dict[k] = v
143
144 def get(self, field):
145 if not self._check_obj():
146 raise Exception('Failed to initialize object!')
147
148 if field in self.__dict:
149 return self.__dict[field]
150 else:
151 raise Exception('Field {} does not exist'.format(field))
152
153 def set(self, field, value):
154 if not self._check_obj():
155 raise Exception('Failed to initialize object!')
156
157 self.__dict[field] = value
158
159 def get_organizations(self):
160 if not self.__check_headers():
161 return None
162
163 url = self.MERAKI_API + '/organizations'
164 try:
165 response = requests.request('GET', url, headers=self._headers)
166 response.raise_for_status()
167 except Exception as e:
168 msg = 'Error getting list of organizations: {} ({})'.format(
169 e, Meraki._get_json_errors(response))
170 if self._logit:
171 logging.error(msg)
172 else:
173 print(msg)
174
175 return None
176
177 res = []
178 for org in response.json():
179 org_obj = Organization(
180 key=self._key, name=org[Organization._name_prop], id=org[Organization._id_prop])
181 res.append(org_obj)
182
183 return res
184
185
186 class Organization(Meraki):
187 def __init__(self, **kwargs):
188 super(Organization, self).__init__(**kwargs)
189
190 self._class_name = 'Organization'
191 self._api_url_endpoint = '/organizations'
192 self.ORG_API = self.MERAKI_API + self._api_url_endpoint
193
194 def get_inventory(self):
195 if not self._check_obj():
196 return None
197
198 url = self.ORG_API + '/' + self._id + '/inventory'
199
200 try:
201 response = requests.request('GET', url, headers=self._headers)
202 response.raise_for_status()
203 except Exception as e:
204 msg = 'Error getting inventory for organization {}: {} ({})'.format(
205 self._name, e, Meraki._get_json_errors(response))
206 if self._logit:
207 logging.error(msg)
208 else:
209 print(msg)
210
211 return None
212
213 return response.json()
214
215 def get_networks(self):
216 if not self._check_obj():
217 return None
218
219 url = self.ORG_API + '/' + self._id + '/networks'
220 try:
221 response = requests.request('GET', url, headers=self._headers)
222 response.raise_for_status()
223 except Exception as e:
224 msg = 'Error getting list of networks for {}: {} ({})'.format(
225 self._name, e, Meraki._get_json_errors(response))
226 if self._logit:
227 logging.error(msg)
228 else:
229 print(msg)
230
231 return None
232
233 res = []
234 for n in response.json():
235 net = Network(
236 key=self._key, name=n[Network._name_prop], id=n[Network._id_prop
237 ])
238 res.append(net)
239
240 return res
241
242 def create_network(self, name, **kwargs):
243 if not self._check_obj():
244 return None
245
246 payload = {
247 'name': name
248 }
249
250 for key in ['type', 'tags', 'timezone']:
251 if key in kwargs:
252 if key == 'timezone':
253 payload['timeZone'] = kwargs[key]
254 else:
255 payload[key] = kwargs[key]
256
257 if 'type' not in kwargs:
258 payload['type'] = 'wireless switch appliance phone'
259
260 url = self.ORG_API + '/' + self._id + '/networks'
261 try:
262 response = requests.request(
263 'POST', url, json=payload, headers=self._headers)
264 response.raise_for_status()
265 except Exception as e:
266 msg = 'Error creating new network {} in {}: {} ({})'.format(
267 name, self._name, e, Meraki._get_json_errors(response))
268 if self._logit:
269 logging.error(msg)
270 else:
271 print(msg)
272
273 return None
274
275 jobj = response.json()
276 net_obj = Network(key=self._key, id=jobj['id'], name=jobj['name'])
277
278 return net_obj
279
280
281 class Network(Meraki):
282 def __init__(self, **kwargs):
283 super(Network, self).__init__(**kwargs)
284
285 self._class_name = 'Network'
286 self._api_url_endpoint = '/networks'
287 self.NET_API = self.MERAKI_API + self._api_url_endpoint
288
289 def get_devices(self):
290 if not self._check_obj():
291 return None
292
293 url = self.NET_API + '/' + self._id + '/devices'
294 try:
295 response = requests.request('GET', url, headers=self._headers)
296 response.raise_for_status()
297 except Exception as e:
298 msg = 'Error getting device list for network {}: {} ({})'.format(
299 self._name, e, Meraki._get_json_errors(response))
300 if self._logit:
301 logging.error(msg)
302 else:
303 print(msg)
304
305 return None
306
307 res = []
308 for d in response.json():
309 dev = Device(
310 key=self._key, name=d[Device._name_prop], id=d[Device._id_prop])
311 res.append(dev)
312
313 return res
314
315 def claim_device(self, dev):
316 if not self._check_obj():
317 return None
318
319 url = self.NET_API + '/' + self._id + '/devices/claim'
320 payload = {
321 'serial': dev._id
322 }
323 try:
324 response = requests.request(
325 'POST', url, json=payload, headers=self._headers)
326 response.raise_for_status()
327 except Exception as e:
328 msg = 'Error claiming device {} for network {}: {} ({})'.format(
329 dev._id, self._name, e, Meraki._get_json_errors(response))
330 if self._logit:
331 logging.error(msg)
332 else:
333 print(msg)
334
335 return None
336
337 new_dev = Device(key=self._key, id=dev._id, net=self)
338 return new_dev
339
340 def create_vlan(self, name, id, subnet, appliance_ip):
341 if not self._check_obj():
342 return None
343
344 payload = {
345 'id': id,
346 'name': name,
347 'subnet': subnet,
348 'applianceIp': appliance_ip
349 }
350
351 url = self.NET_API + '/' + self._id + '/vlans'
352 try:
353 response = requests.request(
354 'POST', url, json=payload, headers=self._headers)
355 response.raise_for_status()
356 except Exception as e:
357 msg = 'Error adding VLAN {} (ID: {}) to network {}: {} ({})'.format(
358 name, id, self._name, e, Meraki._get_json_errors(response))
359 if self._logit:
360 logging.error(msg)
361 else:
362 print(msg)
363
364 return None
365
366 jobj = response.json()
367 vlan_obj = Vlan(
368 key=self._key, id=jobj['id'], name=jobj['name'], net=self)
369
370 return vlan_obj
371
372
373 class SSID(Meraki):
374 _id_prop = 'number'
375
376 def __init__(self, **kwargs):
377 super(SSID, self).__init__(**kwargs)
378 if not 'net' in kwargs:
379 raise TypeError('Missing mandatory net argument!')
380
381 self._net = kwargs['net']
382
383 self._class_name = 'SSID'
384 self._api_url_endpoint = '/networks/' + \
385 self._net.get(Network._id_prop) + '/ssids'
386 self.SSID_API = self.MERAKI_API + self._api_url_endpoint
387
388 def update_ssid(self, **kwargs):
389 if not self._check_obj():
390 return False
391
392 payload = {}
393 for key in ['name', 'enabled', 'auth_mode', 'encryption_mode', 'psk', 'ip_assignment_mode']:
394 if key in kwargs:
395 if key == 'auth_mode':
396 payload['authMode'] = kwargs[key]
397 elif key == 'encryption_mode':
398 payload['encryptionMode'] = kwargs[key]
399 elif key == 'ip_assignment_mode':
400 payload['ipAssignmentMode'] = kwargs[key]
401 else:
402 payload[key] = kwargs[key]
403
404 if len(payload) == 0:
405 return False
406
407 if 'ip_assignment_mode' not in kwargs:
408 payload['ipAssignmentMode'] = 'Bridge mode'
409
410 url = self.SSID_API + '/' + self._id
411 try:
412 response = requests.request(
413 'PUT', url, json=payload, headers=self._headers)
414 response.raise_for_status()
415 except Exception as e:
416 msg = 'Error updating SSID properties for {}: {} ({})'.format(
417 self._id, e, Meraki._get_json_errors(response))
418 if self._logit:
419 logging.error(msg)
420 else:
421 print(msg)
422
423 return False
424
425 jobj = response.json()
426 for k, v in jobj.items():
427 self.set(k, v)
428
429 return True
430
431
432 class Vlan(Meraki):
433 def __init__(self, **kwargs):
434 super(Vlan, self).__init__(**kwargs)
435 if not 'net' in kwargs:
436 raise TypeError('Missing mandatory net argument!')
437
438 self._net = kwargs['net']
439
440 self._class_name = 'Vlan'
441 self._api_url_endpoint = '/networks/' + \
442 self._net.get(Network._id_prop) + '/vlans'
443 self.VLAN_API = self.MERAKI_API + self._api_url_endpoint
444
445 def update_vlan(self, **kwargs):
446 if not self._check_obj():
447 return False
448
449 payload = {}
450 for key in ['name', 'subnet', 'appliance_ip', 'fixed_ip_assignments', 'reserved_ip_ranges', 'dns_nameservers']:
451 if key in kwargs:
452 if key == 'appliance_ip':
453 payload['applianceIp'] = kwargs[key]
454 elif key == 'fixed_ip_assignments':
455 payload['fixedIpAssignments'] = kwargs[key]
456 elif key == 'reserved_ip_ranges':
457 payload['reservedIpRanges'] = kwargs[key]
458 elif key == 'dns_nameservers':
459 payload['dnsNameservers'] = kwargs[key]
460 else:
461 payload[key] = kwargs[key]
462
463 if len(payload) == 0:
464 return False
465
466 url = self.VLAN_API + '/' + self._id
467 try:
468 response = requests.request(
469 'PUT', url, json=payload, headers=self._headers)
470 response.raise_for_status()
471 except Exception as e:
472 msg = 'Error updating VLAN properties for {}: {} ({})'.format(
473 self._name, e, Meraki._get_json_errors(response))
474 if self._logit:
475 logging.error(msg)
476 else:
477 print(msg)
478
479 return False
480
481 jobj = response.json()
482 for k, v in jobj.items():
483 self.set(k, v)
484
485 return True
486
487
488 class Device(Meraki):
489
490 _id_prop = 'serial'
491
492 def __init__(self, **kwargs):
493 super(Device, self).__init__(**kwargs)
494 if not 'net' in kwargs:
495 raise TypeError('Missing mandatory net argument!')
496
497 self._net = kwargs['net']
498
499 self._class_name = 'Device'
500 self._api_url_endpoint = '/networks/' + \
501 self._net.get(Network._id_prop) + '/devices'
502 self.DEV_API = self.MERAKI_API + self._api_url_endpoint
503
504 def remove_device(self):
505 if not self._check_obj():
506 return False
507
508 url = self.DEV_API + '/' + self._id + '/remove'
509 try:
510 response = requests.request('POST', url, headers=self._headers)
511 response.raise_for_status()
512 except Exception as e:
513 msg = 'Failed to remove device {} for network {}: {} ({})'.format(
514 self._name, self._net.get(Network._name_prop), e, Meraki._get_json_errors(response))
515 if self._logit:
516 logging.error(msg)
517 else:
518 print(msg)
519
520 return False
521
522 return True
523
524 def update_device(self, **kwargs):
525 if not self._check_obj():
526 return False
527
528 payload = {}
529 for k, v in kwargs.items():
530 if k in ['name', 'tags', 'lat', 'lng', 'address', 'moveMapMarker']:
531 payload[k] = v
532 else:
533 msg = 'Property {} is not valid for a device'.format(k)
534 if self._logit:
535 logging.warning(msg)
536 else:
537 print(msg)
538
539 if len(payload) == 0:
540 return False
541
542 url = self.DEV_API + '/' + self._id
543 try:
544 response = requests.request(
545 'PUT', url, json=payload, headers=self._headers)
546 response.raise_for_status()
547 except Exception as e:
548 msg = 'Error updating device properties for {}: {} ({})'.format(
549 self._name, e, Meraki._get_json_errors(response))
550 if self._logit:
551 logging.error(msg)
552 else:
553 print(msg)
554
555 return False
556
557 jobj = response.json()
558 for k, v in jobj.items():
559 if k == 'moveMapMarker':
560 continue
561 self.set(k, v)
562
563 return True
564
565
566 class SwitchPort(Meraki):
567 _id_prop = 'number'
568
569 def __init__(self, **kwargs):
570 super(SwitchPort, self).__init__(**kwargs)
571 if not 'dev' in kwargs:
572 raise TypeError('Missing mandatory dev argument!')
573
574 self._dev = kwargs['dev']
575
576 self._class_name = 'SwitchPort'
577 self._api_url_endpoint = '/devices/' + \
578 self._net.get(Network._id_prop) + '/switchPorts'
579 self.SWP_API = self.MERAKI_API + self._api_url_endpoint
580
581 def update_switchport(self, **kwargs):
582 if not self._check_obj():
583 return False
584
585 payload = {}
586 for key in ['name', 'tags', 'enabled', 'type', 'vlan', 'voice_vlan', 'allowed_vlans', 'poe_enabled']:
587 if key in kwargs:
588 if key == 'voice_vlan':
589 payload['voiceVlan'] = kwargs[key]
590 elif key == 'allowed_vlans':
591 payload['allowedVlans'] = kwargs[key]
592 elif key == 'poe_enabled':
593 payload['poeEnabled'] = kwargs[key]
594 else:
595 payload[key] = kwargs[key]
596
597 if len(payload) == 0:
598 return False
599
600 url = self.SWP_API + '/' + self._id
601 try:
602 response = requests.request(
603 'PUT', url, json=payload, headers=self._headers)
604 response.raise_for_status()
605 except Exception as e:
606 msg = 'Error updating switchport properties for {} on device {}: {} ({})'.format(
607 self._id, self._dev.get(Device._name_prop), e, Meraki._get_json_errors(response))
608 if self._logit:
609 logging.error(msg)
610 else:
611 print(msg)
612
613 return False
614
615 jobj = response.json()
616 for k, v in jobj.items():
617 self.set(k, v)
618
619 return True

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.27