/[marcuscom]/meraki/meraki_api/__init__.py
ViewVC logotype

Contents of /meraki/meraki_api/__init__.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 20640 - (show annotations) (download) (as text)
Sun Jul 28 18:14:26 2019 UTC (7 weeks, 6 days ago) by jclarke
File MIME type: text/x-python
File size: 19240 byte(s)
Fix a number of typos and bugs.

Notably, update for recent API changes.

1 #
2 # Copyright (c) 2018 Joe Clarke <jclarke@cisco.com>
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions
6 # are met:
7 # 1. Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 # 2. Redistributions in binary form must reproduce the above copyright
10 # notice, this list of conditions and the following disclaimer in the
11 # documentation and/or other materials provided with the distribution.
12 #
13 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23 # SUCH DAMAGE.
24
25 import requests
26 import logging
27 import json
28
29
30 class Meraki(object):
31 MERAKI_API = 'https://api.meraki.com/api/v0'
32
33 _headers = {
34 'X-Cisco-Meraki-API-Key': None,
35 'Content-type': 'application/json'
36 }
37
38 _logit = False
39 _key = None
40
41 _name = None
42 _id = None
43
44 _class_name = 'Meraki'
45
46 _name_prop = 'name'
47 _id_prop = 'id'
48
49 _api_url_endpoint = ''
50
51 def __init__(self, **kwargs):
52 if 'logit' in kwargs:
53 self._logit = kwargs['logit']
54 if 'key' in kwargs:
55 self._key = kwargs['key']
56 self._headers['X-Cisco-Meraki-API-Key'] = kwargs['key']
57 if 'name' in kwargs:
58 self._name = kwargs['name']
59 if 'id' in kwargs:
60 self._id = str(kwargs['id'])
61
62 self.__dict = {}
63 self._initialized = False
64
65 def set_key(self, key):
66 self._key = key
67 self._headers['X-Cisco-Meraki-API-Key'] = key
68
69 def set_name(self, name):
70 self._name = name
71
72 def _set_id(self, id):
73 self._id = id
74
75 def __check_headers(self):
76 if self._headers['X-Cisco-Meraki-API-Key'] is None:
77 msg = 'Meraki API key is not set!'
78 if self._logit:
79 logging.error(msg)
80 else:
81 print(msg)
82
83 return False
84
85 return True
86
87 def realize(self):
88 return self._check_obj()
89
90 @staticmethod
91 def _get_json_errors(response):
92 res = ''
93 try:
94 jobj = response.json()
95 if 'errors' in jobj:
96 res = ', '.join(jobj['errors'])
97 except:
98 pass
99
100 return res
101
102 def _check_obj(self):
103 if self._initialized:
104 return True
105
106 if self._id is None:
107 msg = '{} id is not set!'.format(self._class_name)
108 if self._logit:
109 logging.error(msg)
110 else:
111 print(msg)
112
113 return False
114
115 return self.__get_current_obj()
116
117 def __get_current_obj(self):
118 if not self.__check_headers():
119 return False
120
121 url = self.MERAKI_API + self._api_url_endpoint + '/' + self._id
122 try:
123 response = requests.request('GET', url, headers=self._headers)
124 response.raise_for_status()
125 except Exception as e:
126 msg = 'Error getting {} for {}: {} ({})'.format(
127 self._class_name.lower(), self._id, e, Meraki._get_json_errors(response))
128 if self._logit:
129 logging.error(msg)
130 else:
131 print(msg)
132
133 return False
134
135 jobj = response.json()
136 self.__populate_obj(jobj)
137 # Object may not have a name yet.
138 if self._name_prop in jobj:
139 self._name = jobj[self._name_prop]
140
141 self._initialized = True
142 return True
143
144 def __populate_obj(self, jobj):
145 for k, v in jobj.items():
146 self.__dict[k] = v
147
148 def get(self, field):
149 if not self._check_obj():
150 raise Exception('Failed to initialize object!')
151
152 if field in self.__dict:
153 return self.__dict[field]
154 else:
155 raise Exception('Field {} does not exist'.format(field))
156
157 def set(self, field, value):
158 if not self._check_obj():
159 raise Exception('Failed to initialize object!')
160
161 self.__dict[field] = value
162
163 def get_organizations(self):
164 if not self.__check_headers():
165 return None
166
167 url = self.MERAKI_API + '/organizations'
168 try:
169 response = requests.request('GET', url, headers=self._headers)
170 response.raise_for_status()
171 except Exception as e:
172 msg = 'Error getting list of organizations: {} ({})'.format(
173 e, Meraki._get_json_errors(response))
174 if self._logit:
175 logging.error(msg)
176 else:
177 print(msg)
178
179 return None
180
181 res = []
182 for org in response.json():
183 org_obj = Organization(
184 key=self._key, name=org[Organization._name_prop], id=org[Organization._id_prop])
185 res.append(org_obj)
186
187 return res
188
189
190 class Organization(Meraki):
191 def __init__(self, **kwargs):
192 super(Organization, self).__init__(**kwargs)
193
194 self._class_name = 'Organization'
195 self._api_url_endpoint = '/organizations'
196 self.ORG_API = self.MERAKI_API + self._api_url_endpoint
197
198 def get_inventory(self):
199 if not self._check_obj():
200 return None
201
202 url = self.ORG_API + '/' + self._id + '/inventory'
203
204 try:
205 response = requests.request('GET', url, headers=self._headers)
206 response.raise_for_status()
207 except Exception as e:
208 msg = 'Error getting inventory for organization {}: {} ({})'.format(
209 self._name, e, Meraki._get_json_errors(response))
210 if self._logit:
211 logging.error(msg)
212 else:
213 print(msg)
214
215 return None
216
217 return response.json()
218
219 def get_networks(self):
220 if not self._check_obj():
221 return None
222
223 url = self.ORG_API + '/' + self._id + '/networks'
224 try:
225 response = requests.request('GET', url, headers=self._headers)
226 response.raise_for_status()
227 except Exception as e:
228 msg = 'Error getting list of networks for {}: {} ({})'.format(
229 self._name, e, Meraki._get_json_errors(response))
230 if self._logit:
231 logging.error(msg)
232 else:
233 print(msg)
234
235 return None
236
237 res = []
238 for n in response.json():
239 net = Network(
240 key=self._key, name=n[Network._name_prop], id=n[Network._id_prop
241 ])
242 res.append(net)
243
244 return res
245
246 def create_network(self, name, **kwargs):
247 if not self._check_obj():
248 return None
249
250 payload = {
251 'name': name
252 }
253
254 for key in ['type', 'tags', 'timezone', 'copy_from_network_id']:
255 if key in kwargs:
256 if key == 'timezone':
257 payload['timeZone'] = kwargs[key]
258 elif key == 'copy_from_network_id':
259 payload['copyFromNetworkId'] = kwargs[key]
260 else:
261 payload[key] = kwargs[key]
262
263 if 'type' not in kwargs:
264 payload['type'] = 'wireless switch appliance'
265
266 url = self.ORG_API + '/' + self._id + '/networks'
267 try:
268 response = requests.request(
269 'POST', url, json=payload, headers=self._headers)
270 response.raise_for_status()
271 except Exception as e:
272 msg = 'Error creating new network {} in {}: {} ({})'.format(
273 name, self._name, e, Meraki._get_json_errors(response))
274 if self._logit:
275 logging.error(msg)
276 else:
277 print(msg)
278
279 return None
280
281 jobj = response.json()
282 net_obj = Network(key=self._key, id=jobj['id'], name=jobj['name'])
283
284 return net_obj
285
286
287 class Network(Meraki):
288 def __init__(self, **kwargs):
289 super(Network, self).__init__(**kwargs)
290
291 self._class_name = 'Network'
292 self._api_url_endpoint = '/networks'
293 self.NET_API = self.MERAKI_API + self._api_url_endpoint
294
295 def get_devices(self):
296 if not self._check_obj():
297 return None
298
299 url = self.NET_API + '/' + self._id + '/devices'
300 try:
301 response = requests.request('GET', url, headers=self._headers)
302 response.raise_for_status()
303 except Exception as e:
304 msg = 'Error getting device list for network {}: {} ({})'.format(
305 self._name, e, Meraki._get_json_errors(response))
306 if self._logit:
307 logging.error(msg)
308 else:
309 print(msg)
310
311 return None
312
313 res = []
314 for d in response.json():
315 dev = Device(
316 key=self._key, name=d[Device._name_prop], id=d[Device._id_prop])
317 res.append(dev)
318
319 return res
320
321 def claim_device(self, dev):
322 if not self._check_obj():
323 return None
324
325 url = self.NET_API + '/' + self._id + '/devices/claim'
326 payload = {
327 'serial': dev._id
328 }
329 try:
330 response = requests.request(
331 'POST', url, json=payload, headers=self._headers)
332 response.raise_for_status()
333 except Exception as e:
334 msg = 'Error claiming device {} for network {}: {} ({})'.format(
335 dev._id, self._name, e, Meraki._get_json_errors(response))
336 if self._logit:
337 logging.error(msg)
338 else:
339 print(msg)
340
341 return None
342
343 new_dev = Device(key=self._key, id=dev._id, net=self)
344 return new_dev
345
346 def create_vlan(self, name, id, subnet, appliance_ip):
347 if not self._check_obj():
348 return None
349
350 payload = {
351 'id': id,
352 'name': name,
353 'subnet': subnet,
354 'applianceIp': appliance_ip
355 }
356
357 url = self.NET_API + '/' + self._id + '/vlans'
358 try:
359 response = requests.request(
360 'POST', url, json=payload, headers=self._headers)
361 response.raise_for_status()
362 except Exception as e:
363 msg = 'Error adding VLAN {} (ID: {}) to network {}: {} ({})'.format(
364 name, id, self._name, e, Meraki._get_json_errors(response))
365 if self._logit:
366 logging.error(msg)
367 else:
368 print(msg)
369
370 return None
371
372 jobj = response.json()
373 vlan_obj = Vlan(
374 key=self._key, id=jobj['id'], name=jobj['name'], net=self)
375
376 return vlan_obj
377
378
379 class SSID(Meraki):
380 _id_prop = 'number'
381
382 def __init__(self, **kwargs):
383 super(SSID, self).__init__(**kwargs)
384 if not 'net' in kwargs:
385 raise TypeError('Missing mandatory net argument!')
386
387 self._net = kwargs['net']
388
389 self._class_name = 'SSID'
390 self._api_url_endpoint = '/networks/' + \
391 self._net.get(Network._id_prop) + '/ssids'
392 self.SSID_API = self.MERAKI_API + self._api_url_endpoint
393
394 def update_ssid(self, **kwargs):
395 if not self._check_obj():
396 return False
397
398 payload = {}
399 for key in ['name', 'enabled', 'auth_mode', 'encryption_mode', 'psk', 'ip_assignment_mode']:
400 if key in kwargs:
401 if key == 'auth_mode':
402 payload['authMode'] = kwargs[key]
403 elif key == 'encryption_mode':
404 payload['encryptionMode'] = kwargs[key]
405 elif key == 'ip_assignment_mode':
406 payload['ipAssignmentMode'] = kwargs[key]
407 else:
408 payload[key] = kwargs[key]
409
410 if len(payload) == 0:
411 return False
412
413 if 'ip_assignment_mode' not in kwargs:
414 payload['ipAssignmentMode'] = 'Bridge mode'
415
416 url = self.SSID_API + '/' + self._id
417 try:
418 response = requests.request(
419 'PUT', url, json=payload, headers=self._headers)
420 response.raise_for_status()
421 except Exception as e:
422 msg = 'Error updating SSID properties for {}: {} ({})'.format(
423 self._id, e, Meraki._get_json_errors(response))
424 if self._logit:
425 logging.error(msg)
426 else:
427 print(msg)
428
429 return False
430
431 jobj = response.json()
432 for k, v in jobj.items():
433 self.set(k, v)
434
435 return True
436
437
438 class Vlan(Meraki):
439 def __init__(self, **kwargs):
440 super(Vlan, self).__init__(**kwargs)
441 if not 'net' in kwargs:
442 raise TypeError('Missing mandatory net argument!')
443
444 self._net = kwargs['net']
445
446 self._class_name = 'Vlan'
447 self._api_url_endpoint = '/networks/' + \
448 self._net.get(Network._id_prop) + '/vlans'
449 self.VLAN_API = self.MERAKI_API + self._api_url_endpoint
450
451 def update_vlan(self, **kwargs):
452 if not self._check_obj():
453 return False
454
455 payload = {}
456 for key in ['name', 'subnet', 'appliance_ip', 'fixed_ip_assignments', 'reserved_ip_ranges', 'dns_nameservers']:
457 if key in kwargs:
458 if key == 'appliance_ip':
459 payload['applianceIp'] = kwargs[key]
460 elif key == 'fixed_ip_assignments':
461 payload['fixedIpAssignments'] = kwargs[key]
462 elif key == 'reserved_ip_ranges':
463 payload['reservedIpRanges'] = kwargs[key]
464 elif key == 'dns_nameservers':
465 payload['dnsNameservers'] = kwargs[key]
466 else:
467 payload[key] = kwargs[key]
468
469 if len(payload) == 0:
470 return False
471
472 url = self.VLAN_API + '/' + self._id
473 try:
474 response = requests.request(
475 'PUT', url, json=payload, headers=self._headers)
476 response.raise_for_status()
477 except Exception as e:
478 msg = 'Error updating VLAN properties for {}: {} ({})'.format(
479 self._name, e, Meraki._get_json_errors(response))
480 if self._logit:
481 logging.error(msg)
482 else:
483 print(msg)
484
485 return False
486
487 jobj = response.json()
488 for k, v in jobj.items():
489 self.set(k, v)
490
491 return True
492
493
494 class Device(Meraki):
495
496 _id_prop = 'serial'
497
498 def __init__(self, **kwargs):
499 super(Device, self).__init__(**kwargs)
500 if not 'net' in kwargs:
501 raise TypeError('Missing mandatory net argument!')
502
503 self._net = kwargs['net']
504
505 self._class_name = 'Device'
506 self._api_url_endpoint = '/networks/' + \
507 self._net.get(Network._id_prop) + '/devices'
508 self.DEV_API = self.MERAKI_API + self._api_url_endpoint
509
510 def remove_device(self):
511 if not self._check_obj():
512 return False
513
514 url = self.DEV_API + '/' + self._id + '/remove'
515 try:
516 response = requests.request('POST', url, headers=self._headers)
517 response.raise_for_status()
518 except Exception as e:
519 msg = 'Failed to remove device {} for network {}: {} ({})'.format(
520 self._name, self._net.get(Network._name_prop), e, Meraki._get_json_errors(response))
521 if self._logit:
522 logging.error(msg)
523 else:
524 print(msg)
525
526 return False
527
528 return True
529
530 def update_device(self, **kwargs):
531 if not self._check_obj():
532 return False
533
534 payload = {}
535 for key in ['name', 'tags', 'lat', 'lng', 'address', 'move_map_marker']:
536 if key in kwargs:
537 if key == 'move_map_marker':
538 payload['moveMapMarker'] = kwargs[key]
539 else:
540 payload[key] = kwargs[key]
541
542 if len(payload) == 0:
543 return False
544
545 url = self.DEV_API + '/' + self._id
546 try:
547 response = requests.request(
548 'PUT', url, json=payload, headers=self._headers)
549 response.raise_for_status()
550 except Exception as e:
551 msg = 'Error updating device properties for {}: {} ({})'.format(
552 self._name, e, Meraki._get_json_errors(response))
553 if self._logit:
554 logging.error(msg)
555 else:
556 print(msg)
557
558 return False
559
560 jobj = response.json()
561 for k, v in jobj.items():
562 if k == 'moveMapMarker':
563 continue
564 self.set(k, v)
565
566 return True
567
568
569 class SwitchPort(Meraki):
570 _id_prop = 'number'
571
572 def __init__(self, **kwargs):
573 super(SwitchPort, self).__init__(**kwargs)
574 if not 'dev' in kwargs:
575 raise TypeError('Missing mandatory dev argument!')
576
577 self._dev = kwargs['dev']
578
579 self._class_name = 'SwitchPort'
580 self._api_url_endpoint = '/devices/' + \
581 self._net.get(Network._id_prop) + '/switchPorts'
582 self.SWP_API = self.MERAKI_API + self._api_url_endpoint
583
584 def update_switchport(self, **kwargs):
585 if not self._check_obj():
586 return False
587
588 payload = {}
589 for key in ['name', 'tags', 'enabled', 'type', 'vlan', 'voice_vlan', 'allowed_vlans', 'poe_enabled']:
590 if key in kwargs:
591 if key == 'voice_vlan':
592 payload['voiceVlan'] = kwargs[key]
593 elif key == 'allowed_vlans':
594 payload['allowedVlans'] = kwargs[key]
595 elif key == 'poe_enabled':
596 payload['poeEnabled'] = kwargs[key]
597 else:
598 payload[key] = kwargs[key]
599
600 if len(payload) == 0:
601 return False
602
603 url = self.SWP_API + '/' + self._id
604 try:
605 response = requests.request(
606 'PUT', url, json=payload, headers=self._headers)
607 response.raise_for_status()
608 except Exception as e:
609 msg = 'Error updating switchport properties for {} on device {}: {} ({})'.format(
610 self._id, self._dev.get(Device._name_prop), e, Meraki._get_json_errors(response))
611 if self._logit:
612 logging.error(msg)
613 else:
614 print(msg)
615
616 return False
617
618 jobj = response.json()
619 for k, v in jobj.items():
620 self.set(k, v)
621
622 return True

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.27