/[marcuscom]/meraki/meraki_api/__init__.py
ViewVC logotype

Contents of /meraki/meraki_api/__init__.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 20558 - (show annotations) (download) (as text)
Sat May 19 09:03:06 2018 UTC (22 months, 2 weeks ago) by jclarke
File MIME type: text/x-python
File size: 19139 byte(s)
Be consistent with other methods.

1 #
2 # Copyright (c) 2018 Joe Clarke <jclarke@cisco.com>
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions
6 # are met:
7 # 1. Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 # 2. Redistributions in binary form must reproduce the above copyright
10 # notice, this list of conditions and the following disclaimer in the
11 # documentation and/or other materials provided with the distribution.
12 #
13 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23 # SUCH DAMAGE.
24
25 import requests
26 import logging
27
28
29 class Meraki(object):
30 MERAKI_API = 'https://api.meraki.com/api/v0'
31
32 _headers = {
33 'X-Cisco-Meraki-API-Key': None,
34 'Content-type': 'application/json'
35 }
36
37 _logit = False
38 _key = None
39
40 _name = None
41 _id = None
42
43 _class_name = 'Meraki'
44
45 _name_prop = 'name'
46 _id_prop = 'id'
47
48 _api_url_endpoint = ''
49
50 def __init__(self, **kwargs):
51 if 'logit' in kwargs:
52 self._logit = kwargs['logit']
53 if 'key' in kwargs:
54 self._key = kwargs['key']
55 self._headers['X-Cisco-Meraki-API-Key'] = kwargs['key']
56 if 'name' in kwargs:
57 self._name = kwargs['name']
58 if 'id' in kwargs:
59 self._id = str(kwargs['id'])
60
61 self.__dict = {}
62 self._initialized = False
63
64 def set_key(self, key):
65 self._key = key
66 self._headers['X-Cisco-Meraki-API-Key'] = key
67
68 def set_name(self, name):
69 self._name = name
70
71 def _set_id(self, id):
72 self._id = id
73
74 def __check_headers(self):
75 if self._headers['X-Cisco-Meraki-API-Key'] is None:
76 msg = 'Meraki API key is not set!'
77 if self._logit:
78 logging.error(msg)
79 else:
80 print(msg)
81
82 return False
83
84 return True
85
86 def realize(self):
87 return self._check_obj()
88
89 @staticmethod
90 def _get_json_errors(response):
91 res = ''
92 try:
93 jobj = response.json()
94 if 'errors' in jobj:
95 res = ', '.join(jobj['errors'])
96 except:
97 pass
98
99 return res
100
101 def _check_obj(self):
102 if self._initialized:
103 return True
104
105 if self._id is None:
106 msg = '{} id is not set!'.format(self._class_name)
107 if self._logit:
108 logging.error(msg)
109 else:
110 print(msg)
111
112 return False
113
114 return self.__get_current_obj()
115
116 def __get_current_obj(self):
117 if not self.__check_headers():
118 return False
119
120 url = self.MERAKI_API + self._api_url_endpoint + '/' + self._id
121 try:
122 response = requests.request('GET', url, headers=self._headers)
123 response.raise_for_status()
124 except Exception as e:
125 msg = 'Error getting {} for {}: {} ({})'.format(
126 self._class_name.lower(), self._id, e, Meraki._get_json_errors(response))
127 if self._logit:
128 logging.error(msg)
129 else:
130 print(msg)
131
132 return False
133
134 jobj = response.json()
135 self.__populate_obj(jobj)
136 self._name = jobj[self._name_prop]
137 self._initialized = True
138 return True
139
140 def __populate_obj(self, jobj):
141 for k, v in jobj.items():
142 self.__dict[k] = v
143
144 def get(self, field):
145 if not self._check_obj():
146 raise Exception('Failed to initialize object!')
147
148 if field in self.__dict:
149 return self.__dict[field]
150 else:
151 raise Exception('Field {} does not exist'.format(field))
152
153 def set(self, field, value):
154 if not self._check_obj():
155 raise Exception('Failed to initialize object!')
156
157 self.__dict[field] = value
158
159 def get_organizations(self):
160 if not self.__check_headers():
161 return None
162
163 url = self.MERAKI_API + '/organizations'
164 try:
165 response = requests.request('GET', url, headers=self._headers)
166 response.raise_for_status()
167 except Exception as e:
168 msg = 'Error getting list of organizations: {} ({})'.format(
169 e, Meraki._get_json_errors(response))
170 if self._logit:
171 logging.error(msg)
172 else:
173 print(msg)
174
175 return None
176
177 res = []
178 for org in response.json():
179 org_obj = Organization(
180 key=self._key, name=org[Organization._name_prop], id=org[Organization._id_prop])
181 res.append(org_obj)
182
183 return res
184
185
186 class Organization(Meraki):
187 def __init__(self, **kwargs):
188 super(Organization, self).__init__(**kwargs)
189
190 self._class_name = 'Organization'
191 self._api_url_endpoint = '/organizations'
192 self.ORG_API = self.MERAKI_API + self._api_url_endpoint
193
194 def get_inventory(self):
195 if not self._check_obj():
196 return None
197
198 url = self.ORG_API + '/' + self._id + '/inventory'
199
200 try:
201 response = requests.request('GET', url, headers=self._headers)
202 response.raise_for_status()
203 except Exception as e:
204 msg = 'Error getting inventory for organization {}: {} ({})'.format(
205 self._name, e, Meraki._get_json_errors(response))
206 if self._logit:
207 logging.error(msg)
208 else:
209 print(msg)
210
211 return None
212
213 return response.json()
214
215 def get_networks(self):
216 if not self._check_obj():
217 return None
218
219 url = self.ORG_API + '/' + self._id + '/networks'
220 try:
221 response = requests.request('GET', url, headers=self._headers)
222 response.raise_for_status()
223 except Exception as e:
224 msg = 'Error getting list of networks for {}: {} ({})'.format(
225 self._name, e, Meraki._get_json_errors(response))
226 if self._logit:
227 logging.error(msg)
228 else:
229 print(msg)
230
231 return None
232
233 res = []
234 for n in response.json():
235 net = Network(
236 key=self._key, name=n[Network._name_prop], id=n[Network._id_prop
237 ])
238 res.append(net)
239
240 return res
241
242 def create_network(self, name, **kwargs):
243 if not self._check_obj():
244 return None
245
246 payload = {
247 'name': name
248 }
249
250 for key in ['type', 'tags', 'timezone', 'copy_from_network_id']:
251 if key in kwargs:
252 if key == 'timezone':
253 payload['timeZone'] = kwargs[key]
254 elif key == 'copy_from_network_id':
255 payload['copyFromNetworkId'] = kwargs[key]
256 else:
257 payload[key] = kwargs[key]
258
259 if 'type' not in kwargs:
260 payload['type'] = 'wireless switch appliance phone'
261
262 url = self.ORG_API + '/' + self._id + '/networks'
263 try:
264 response = requests.request(
265 'POST', url, json=payload, headers=self._headers)
266 response.raise_for_status()
267 except Exception as e:
268 msg = 'Error creating new network {} in {}: {} ({})'.format(
269 name, self._name, e, Meraki._get_json_errors(response))
270 if self._logit:
271 logging.error(msg)
272 else:
273 print(msg)
274
275 return None
276
277 jobj = response.json()
278 net_obj = Network(key=self._key, id=jobj['id'], name=jobj['name'])
279
280 return net_obj
281
282
283 class Network(Meraki):
284 def __init__(self, **kwargs):
285 super(Network, self).__init__(**kwargs)
286
287 self._class_name = 'Network'
288 self._api_url_endpoint = '/networks'
289 self.NET_API = self.MERAKI_API + self._api_url_endpoint
290
291 def get_devices(self):
292 if not self._check_obj():
293 return None
294
295 url = self.NET_API + '/' + self._id + '/devices'
296 try:
297 response = requests.request('GET', url, headers=self._headers)
298 response.raise_for_status()
299 except Exception as e:
300 msg = 'Error getting device list for network {}: {} ({})'.format(
301 self._name, e, Meraki._get_json_errors(response))
302 if self._logit:
303 logging.error(msg)
304 else:
305 print(msg)
306
307 return None
308
309 res = []
310 for d in response.json():
311 dev = Device(
312 key=self._key, name=d[Device._name_prop], id=d[Device._id_prop])
313 res.append(dev)
314
315 return res
316
317 def claim_device(self, dev):
318 if not self._check_obj():
319 return None
320
321 url = self.NET_API + '/' + self._id + '/devices/claim'
322 payload = {
323 'serial': dev._id
324 }
325 try:
326 response = requests.request(
327 'POST', url, json=payload, headers=self._headers)
328 response.raise_for_status()
329 except Exception as e:
330 msg = 'Error claiming device {} for network {}: {} ({})'.format(
331 dev._id, self._name, e, Meraki._get_json_errors(response))
332 if self._logit:
333 logging.error(msg)
334 else:
335 print(msg)
336
337 return None
338
339 new_dev = Device(key=self._key, id=dev._id, net=self)
340 return new_dev
341
342 def create_vlan(self, name, id, subnet, appliance_ip):
343 if not self._check_obj():
344 return None
345
346 payload = {
347 'id': id,
348 'name': name,
349 'subnet': subnet,
350 'applianceIp': appliance_ip
351 }
352
353 url = self.NET_API + '/' + self._id + '/vlans'
354 try:
355 response = requests.request(
356 'POST', url, json=payload, headers=self._headers)
357 response.raise_for_status()
358 except Exception as e:
359 msg = 'Error adding VLAN {} (ID: {}) to network {}: {} ({})'.format(
360 name, id, self._name, e, Meraki._get_json_errors(response))
361 if self._logit:
362 logging.error(msg)
363 else:
364 print(msg)
365
366 return None
367
368 jobj = response.json()
369 vlan_obj = Vlan(
370 key=self._key, id=jobj['id'], name=jobj['name'], net=self)
371
372 return vlan_obj
373
374
375 class SSID(Meraki):
376 _id_prop = 'number'
377
378 def __init__(self, **kwargs):
379 super(SSID, self).__init__(**kwargs)
380 if not 'net' in kwargs:
381 raise TypeError('Missing mandatory net argument!')
382
383 self._net = kwargs['net']
384
385 self._class_name = 'SSID'
386 self._api_url_endpoint = '/networks/' + \
387 self._net.get(Network._id_prop) + '/ssids'
388 self.SSID_API = self.MERAKI_API + self._api_url_endpoint
389
390 def update_ssid(self, **kwargs):
391 if not self._check_obj():
392 return False
393
394 payload = {}
395 for key in ['name', 'enabled', 'auth_mode', 'encryption_mode', 'psk', 'ip_assignment_mode']:
396 if key in kwargs:
397 if key == 'auth_mode':
398 payload['authMode'] = kwargs[key]
399 elif key == 'encryption_mode':
400 payload['encryptionMode'] = kwargs[key]
401 elif key == 'ip_assignment_mode':
402 payload['ipAssignmentMode'] = kwargs[key]
403 else:
404 payload[key] = kwargs[key]
405
406 if len(payload) == 0:
407 return False
408
409 if 'ip_assignment_mode' not in kwargs:
410 payload['ipAssignmentMode'] = 'Bridge mode'
411
412 url = self.SSID_API + '/' + self._id
413 try:
414 response = requests.request(
415 'PUT', url, json=payload, headers=self._headers)
416 response.raise_for_status()
417 except Exception as e:
418 msg = 'Error updating SSID properties for {}: {} ({})'.format(
419 self._id, e, Meraki._get_json_errors(response))
420 if self._logit:
421 logging.error(msg)
422 else:
423 print(msg)
424
425 return False
426
427 jobj = response.json()
428 for k, v in jobj.items():
429 self.set(k, v)
430
431 return True
432
433
434 class Vlan(Meraki):
435 def __init__(self, **kwargs):
436 super(Vlan, self).__init__(**kwargs)
437 if not 'net' in kwargs:
438 raise TypeError('Missing mandatory net argument!')
439
440 self._net = kwargs['net']
441
442 self._class_name = 'Vlan'
443 self._api_url_endpoint = '/networks/' + \
444 self._net.get(Network._id_prop) + '/vlans'
445 self.VLAN_API = self.MERAKI_API + self._api_url_endpoint
446
447 def update_vlan(self, **kwargs):
448 if not self._check_obj():
449 return False
450
451 payload = {}
452 for key in ['name', 'subnet', 'appliance_ip', 'fixed_ip_assignments', 'reserved_ip_ranges', 'dns_nameservers']:
453 if key in kwargs:
454 if key == 'appliance_ip':
455 payload['applianceIp'] = kwargs[key]
456 elif key == 'fixed_ip_assignments':
457 payload['fixedIpAssignments'] = kwargs[key]
458 elif key == 'reserved_ip_ranges':
459 payload['reservedIpRanges'] = kwargs[key]
460 elif key == 'dns_nameservers':
461 payload['dnsNameservers'] = kwargs[key]
462 else:
463 payload[key] = kwargs[key]
464
465 if len(payload) == 0:
466 return False
467
468 url = self.VLAN_API + '/' + self._id
469 try:
470 response = requests.request(
471 'PUT', url, json=payload, headers=self._headers)
472 response.raise_for_status()
473 except Exception as e:
474 msg = 'Error updating VLAN properties for {}: {} ({})'.format(
475 self._name, e, Meraki._get_json_errors(response))
476 if self._logit:
477 logging.error(msg)
478 else:
479 print(msg)
480
481 return False
482
483 jobj = response.json()
484 for k, v in jobj.items():
485 self.set(k, v)
486
487 return True
488
489
490 class Device(Meraki):
491
492 _id_prop = 'serial'
493
494 def __init__(self, **kwargs):
495 super(Device, self).__init__(**kwargs)
496 if not 'net' in kwargs:
497 raise TypeError('Missing mandatory net argument!')
498
499 self._net = kwargs['net']
500
501 self._class_name = 'Device'
502 self._api_url_endpoint = '/networks/' + \
503 self._net.get(Network._id_prop) + '/devices'
504 self.DEV_API = self.MERAKI_API + self._api_url_endpoint
505
506 def remove_device(self):
507 if not self._check_obj():
508 return False
509
510 url = self.DEV_API + '/' + self._id + '/remove'
511 try:
512 response = requests.request('POST', url, headers=self._headers)
513 response.raise_for_status()
514 except Exception as e:
515 msg = 'Failed to remove device {} for network {}: {} ({})'.format(
516 self._name, self._net.get(Network._name_prop), e, Meraki._get_json_errors(response))
517 if self._logit:
518 logging.error(msg)
519 else:
520 print(msg)
521
522 return False
523
524 return True
525
526 def update_device(self, **kwargs):
527 if not self._check_obj():
528 return False
529
530 payload = {}
531 for key in ['name', 'tags', 'lat', 'lng', 'address', 'move_map_marker']:
532 if key in kwargs:
533 if key == 'move_map_marker':
534 payload['moveMapMarker'] = kwargs[key]
535 else:
536 payload[key] = kwargs[key]
537
538 if len(payload) == 0:
539 return False
540
541 url = self.DEV_API + '/' + self._id
542 try:
543 response = requests.request(
544 'PUT', url, json=payload, headers=self._headers)
545 response.raise_for_status()
546 except Exception as e:
547 msg = 'Error updating device properties for {}: {} ({})'.format(
548 self._name, e, Meraki._get_json_errors(response))
549 if self._logit:
550 logging.error(msg)
551 else:
552 print(msg)
553
554 return False
555
556 jobj = response.json()
557 for k, v in jobj.items():
558 if k == 'moveMapMarker':
559 continue
560 self.set(k, v)
561
562 return True
563
564
565 class SwitchPort(Meraki):
566 _id_prop = 'number'
567
568 def __init__(self, **kwargs):
569 super(SwitchPort, self).__init__(**kwargs)
570 if not 'dev' in kwargs:
571 raise TypeError('Missing mandatory dev argument!')
572
573 self._dev = kwargs['dev']
574
575 self._class_name = 'SwitchPort'
576 self._api_url_endpoint = '/devices/' + \
577 self._net.get(Network._id_prop) + '/switchPorts'
578 self.SWP_API = self.MERAKI_API + self._api_url_endpoint
579
580 def update_switchport(self, **kwargs):
581 if not self._check_obj():
582 return False
583
584 payload = {}
585 for key in ['name', 'tags', 'enabled', 'type', 'vlan', 'voice_vlan', 'allowed_vlans', 'poe_enabled']:
586 if key in kwargs:
587 if key == 'voice_vlan':
588 payload['voiceVlan'] = kwargs[key]
589 elif key == 'allowed_vlans':
590 payload['allowedVlans'] = kwargs[key]
591 elif key == 'poe_enabled':
592 payload['poeEnabled'] = kwargs[key]
593 else:
594 payload[key] = kwargs[key]
595
596 if len(payload) == 0:
597 return False
598
599 url = self.SWP_API + '/' + self._id
600 try:
601 response = requests.request(
602 'PUT', url, json=payload, headers=self._headers)
603 response.raise_for_status()
604 except Exception as e:
605 msg = 'Error updating switchport properties for {} on device {}: {} ({})'.format(
606 self._id, self._dev.get(Device._name_prop), e, Meraki._get_json_errors(response))
607 if self._logit:
608 logging.error(msg)
609 else:
610 print(msg)
611
612 return False
613
614 jobj = response.json()
615 for k, v in jobj.items():
616 self.set(k, v)
617
618 return True

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.27