/[marcuscom]/meraki/meraki_api/__init__.py
ViewVC logotype

Annotation of /meraki/meraki_api/__init__.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 20640 - (hide annotations) (download) (as text)
Sun Jul 28 18:14:26 2019 UTC (8 months, 1 week ago) by jclarke
File MIME type: text/x-python
File size: 19240 byte(s)
Fix a number of typos and bugs.

Notably, update for recent API changes.

1 jclarke 20550 #
2     # Copyright (c) 2018 Joe Clarke <jclarke@cisco.com>
3     #
4     # Redistribution and use in source and binary forms, with or without
5     # modification, are permitted provided that the following conditions
6     # are met:
7     # 1. Redistributions of source code must retain the above copyright
8     # notice, this list of conditions and the following disclaimer.
9     # 2. Redistributions in binary form must reproduce the above copyright
10     # notice, this list of conditions and the following disclaimer in the
11     # documentation and/or other materials provided with the distribution.
12     #
13     # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14     # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15     # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16     # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17     # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18     # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19     # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20     # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21     # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22     # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23     # SUCH DAMAGE.
24    
25     import requests
26     import logging
27 jclarke 20640 import json
28 jclarke 20550
29    
class Meraki(object):
    """Base class for objects backed by the Meraki Dashboard API.

    Holds the API key and request headers, the object's name and id, and a
    cache of the properties returned by the API for the object.  Subclasses
    set ``_class_name`` and ``_api_url_endpoint`` so the shared lookup code
    can fetch ``MERAKI_API + _api_url_endpoint + '/' + id``.

    Recognized keyword arguments: ``logit`` (log errors with ``logging``
    instead of printing them), ``key`` (API key), ``name`` and ``id``.

    Most methods report failures by logging/printing a message and
    returning ``False`` or ``None``; only ``get`` and ``set`` raise.
    """

    MERAKI_API = 'https://api.meraki.com/api/v0'

    # Template only: __init__ gives every instance its own copy so that one
    # object's API key can never overwrite another's.
    _headers = {
        'X-Cisco-Meraki-API-Key': None,
        'Content-type': 'application/json'
    }

    _logit = False
    _key = None

    _name = None
    _id = None

    _class_name = 'Meraki'

    # Names of the JSON properties that carry an object's name and id.
    _name_prop = 'name'
    _id_prop = 'id'

    _api_url_endpoint = ''

    def __init__(self, **kwargs):
        # Per-instance copy; mutating the class-level dict through ``self``
        # would leak the key into every other instance.
        self._headers = dict(self._headers)

        if 'logit' in kwargs:
            self._logit = kwargs['logit']
        if 'key' in kwargs:
            self._key = kwargs['key']
            self._headers['X-Cisco-Meraki-API-Key'] = kwargs['key']
        if 'name' in kwargs:
            self._name = kwargs['name']
        if 'id' in kwargs:
            # Ids are concatenated into URLs, so keep them as strings.
            self._id = str(kwargs['id'])

        # Cache of the properties fetched from (or set for) this object.
        self.__dict = {}
        self._initialized = False

    def set_key(self, key):
        """Set the API key used for this object's requests."""
        self._key = key
        self._headers['X-Cisco-Meraki-API-Key'] = key

    def set_name(self, name):
        """Set the locally stored name of this object."""
        self._name = name

    def _set_id(self, id):
        """Set the locally stored id of this object."""
        self._id = id

    def _report(self, msg):
        """Emit an error message through ``logging`` or ``print``."""
        if self._logit:
            logging.error(msg)
        else:
            print(msg)

    def __check_headers(self):
        """Return True if an API key is configured, else report and return False."""
        if self._headers['X-Cisco-Meraki-API-Key'] is None:
            self._report('Meraki API key is not set!')
            return False

        return True

    def realize(self):
        """Fetch this object's properties if needed; return True on success."""
        return self._check_obj()

    @staticmethod
    def _get_json_errors(response):
        """Return the comma-joined ``errors`` list from a response body.

        Best effort: returns an empty string when ``response`` is None (the
        request never completed), the body is not JSON, or it carries no
        ``errors`` entry.
        """
        # Compare with None explicitly: a requests Response for a 4xx/5xx
        # status is falsy, and those are exactly the ones we want to read.
        if response is None:
            return ''

        try:
            jobj = response.json()
            if 'errors' in jobj:
                return ', '.join(jobj['errors'])
        except Exception:
            # Deliberately swallowed: this only decorates an error message.
            pass

        return ''

    def _check_obj(self):
        """Ensure the object's properties are loaded; return True on success."""
        if self._initialized:
            return True

        if self._id is None:
            self._report('{} id is not set!'.format(self._class_name))
            return False

        return self.__get_current_obj()

    def __get_current_obj(self):
        """GET this object from the API and cache its properties."""
        if not self.__check_headers():
            return False

        url = self.MERAKI_API + self._api_url_endpoint + '/' + self._id
        # Pre-bind so the handler below can run even when the request itself
        # raises (e.g. connection failure) before ``response`` is assigned.
        response = None
        try:
            response = requests.request('GET', url, headers=self._headers)
            response.raise_for_status()
        except Exception as e:
            self._report('Error getting {} for {}: {} ({})'.format(
                self._class_name.lower(), self._id, e,
                Meraki._get_json_errors(response)))
            return False

        jobj = response.json()
        self.__populate_obj(jobj)
        # Object may not have a name yet.
        if self._name_prop in jobj:
            self._name = jobj[self._name_prop]

        self._initialized = True
        return True

    def __populate_obj(self, jobj):
        """Copy every key/value pair of ``jobj`` into the property cache."""
        self.__dict.update(jobj)

    def get(self, field):
        """Return the cached value of ``field``.

        Raises ``Exception`` if the object cannot be initialized or the
        field does not exist.
        """
        if not self._check_obj():
            raise Exception('Failed to initialize object!')

        if field in self.__dict:
            return self.__dict[field]

        raise Exception('Field {} does not exist'.format(field))

    def set(self, field, value):
        """Store ``value`` under ``field`` in the local property cache.

        Raises ``Exception`` if the object cannot be initialized.
        """
        if not self._check_obj():
            raise Exception('Failed to initialize object!')

        self.__dict[field] = value

    def get_organizations(self):
        """Return a list of ``Organization`` objects, or None on failure."""
        if not self.__check_headers():
            return None

        url = self.MERAKI_API + '/organizations'
        response = None
        try:
            response = requests.request('GET', url, headers=self._headers)
            response.raise_for_status()
        except Exception as e:
            self._report('Error getting list of organizations: {} ({})'.format(
                e, Meraki._get_json_errors(response)))
            return None

        return [
            Organization(
                key=self._key,
                name=org[Organization._name_prop],
                id=org[Organization._id_prop])
            for org in response.json()
        ]
188    
189    
class Organization(Meraki):
    """A Meraki organization, addressed under ``/organizations/<id>``.

    Accepts the same keyword arguments as ``Meraki``.  Methods print or log
    an error and return None when the object cannot be initialized or a
    request fails.
    """

    def __init__(self, **kwargs):
        super(Organization, self).__init__(**kwargs)

        self._class_name = 'Organization'
        self._api_url_endpoint = '/organizations'
        self.ORG_API = self.MERAKI_API + self._api_url_endpoint

    def get_inventory(self):
        """Return the decoded JSON inventory of the organization, or None."""
        if not self._check_obj():
            return None

        url = self.ORG_API + '/' + self._id + '/inventory'

        # Pre-bind so the handler can run even if the request itself raises
        # before ``response`` is assigned.
        response = None
        try:
            response = requests.request('GET', url, headers=self._headers)
            response.raise_for_status()
        except Exception as e:
            msg = 'Error getting inventory for organization {}: {} ({})'.format(
                self._name, e, Meraki._get_json_errors(response))
            if self._logit:
                logging.error(msg)
            else:
                print(msg)

            return None

        return response.json()

    def get_networks(self):
        """Return a list of ``Network`` objects for the organization, or None."""
        if not self._check_obj():
            return None

        url = self.ORG_API + '/' + self._id + '/networks'
        response = None
        try:
            response = requests.request('GET', url, headers=self._headers)
            response.raise_for_status()
        except Exception as e:
            msg = 'Error getting list of networks for {}: {} ({})'.format(
                self._name, e, Meraki._get_json_errors(response))
            if self._logit:
                logging.error(msg)
            else:
                print(msg)

            return None

        return [
            Network(
                key=self._key,
                name=n[Network._name_prop],
                id=n[Network._id_prop])
            for n in response.json()
        ]

    def create_network(self, name, **kwargs):
        """Create a network named ``name`` in this organization.

        Optional keyword arguments: ``type``, ``tags``, ``timezone`` and
        ``copy_from_network_id``.  When ``type`` is omitted it defaults to
        ``'wireless switch appliance'``.

        Returns the new ``Network`` object, or None on failure.
        """
        if not self._check_obj():
            return None

        # Keyword arguments whose API property name differs from the kwarg.
        api_names = {
            'timezone': 'timeZone',
            'copy_from_network_id': 'copyFromNetworkId',
        }

        payload = {
            'name': name
        }
        for key in ['type', 'tags', 'timezone', 'copy_from_network_id']:
            if key in kwargs:
                payload[api_names.get(key, key)] = kwargs[key]

        if 'type' not in kwargs:
            payload['type'] = 'wireless switch appliance'

        url = self.ORG_API + '/' + self._id + '/networks'
        response = None
        try:
            response = requests.request(
                'POST', url, json=payload, headers=self._headers)
            response.raise_for_status()
        except Exception as e:
            msg = 'Error creating new network {} in {}: {} ({})'.format(
                name, self._name, e, Meraki._get_json_errors(response))
            if self._logit:
                logging.error(msg)
            else:
                print(msg)

            return None

        jobj = response.json()
        return Network(key=self._key, id=jobj['id'], name=jobj['name'])
285    
286    
class Network(Meraki):
    """A Meraki network, addressed under ``/networks/<id>``.

    Accepts the same keyword arguments as ``Meraki``.  Methods print or log
    an error and return None when the object cannot be initialized or a
    request fails.
    """

    def __init__(self, **kwargs):
        super(Network, self).__init__(**kwargs)

        self._class_name = 'Network'
        self._api_url_endpoint = '/networks'
        self.NET_API = self.MERAKI_API + self._api_url_endpoint

    def get_devices(self):
        """Return a list of ``Device`` objects in this network, or None."""
        if not self._check_obj():
            return None

        url = self.NET_API + '/' + self._id + '/devices'
        # Pre-bind so the handler can run even if the request itself raises
        # before ``response`` is assigned.
        response = None
        try:
            response = requests.request('GET', url, headers=self._headers)
            response.raise_for_status()
        except Exception as e:
            msg = 'Error getting device list for network {}: {} ({})'.format(
                self._name, e, Meraki._get_json_errors(response))
            if self._logit:
                logging.error(msg)
            else:
                print(msg)

            return None

        res = []
        for d in response.json():
            # Device requires its owning network; a device may not have a
            # name yet, so do not insist on the name property being present.
            dev = Device(
                key=self._key,
                name=d.get(Device._name_prop),
                id=d[Device._id_prop],
                net=self)
            res.append(dev)

        return res

    def claim_device(self, dev):
        """Claim ``dev`` (identified by ``dev._id``) into this network.

        Returns a new ``Device`` bound to this network, or None on failure.
        """
        if not self._check_obj():
            return None

        url = self.NET_API + '/' + self._id + '/devices/claim'
        payload = {
            'serial': dev._id
        }
        response = None
        try:
            response = requests.request(
                'POST', url, json=payload, headers=self._headers)
            response.raise_for_status()
        except Exception as e:
            msg = 'Error claiming device {} for network {}: {} ({})'.format(
                dev._id, self._name, e, Meraki._get_json_errors(response))
            if self._logit:
                logging.error(msg)
            else:
                print(msg)

            return None

        return Device(key=self._key, id=dev._id, net=self)

    def create_vlan(self, name, id, subnet, appliance_ip):
        """Add a VLAN to this network.

        Posts ``id``, ``name``, ``subnet`` and ``applianceIp`` to the
        network's ``/vlans`` endpoint.

        Returns the new ``Vlan`` object, or None on failure.
        """
        if not self._check_obj():
            return None

        payload = {
            'id': id,
            'name': name,
            'subnet': subnet,
            'applianceIp': appliance_ip
        }

        url = self.NET_API + '/' + self._id + '/vlans'
        response = None
        try:
            response = requests.request(
                'POST', url, json=payload, headers=self._headers)
            response.raise_for_status()
        except Exception as e:
            msg = 'Error adding VLAN {} (ID: {}) to network {}: {} ({})'.format(
                name, id, self._name, e, Meraki._get_json_errors(response))
            if self._logit:
                logging.error(msg)
            else:
                print(msg)

            return None

        jobj = response.json()
        return Vlan(
            key=self._key, id=jobj['id'], name=jobj['name'], net=self)
377    
378    
class SSID(Meraki):
    """A wireless SSID, addressed under ``/networks/<net id>/ssids/<id>``.

    Requires a ``net`` keyword argument (the owning ``Network``) in addition
    to the keyword arguments accepted by ``Meraki``.
    """

    _id_prop = 'number'

    def __init__(self, **kwargs):
        super(SSID, self).__init__(**kwargs)
        if 'net' not in kwargs:
            raise TypeError('Missing mandatory net argument!')

        self._net = kwargs['net']

        self._class_name = 'SSID'
        self._api_url_endpoint = '/networks/' + \
            self._net.get(Network._id_prop) + '/ssids'
        self.SSID_API = self.MERAKI_API + self._api_url_endpoint

    def update_ssid(self, **kwargs):
        """Update properties of this SSID.

        Recognized keyword arguments: ``name``, ``enabled``, ``auth_mode``,
        ``encryption_mode``, ``psk`` and ``ip_assignment_mode``.  When
        ``ip_assignment_mode`` is omitted, ``'Bridge mode'`` is sent.

        Returns True on success; False if nothing recognized was passed, the
        object cannot be initialized, or the request fails.
        """
        if not self._check_obj():
            return False

        # Keyword arguments whose API property name differs from the kwarg.
        api_names = {
            'auth_mode': 'authMode',
            'encryption_mode': 'encryptionMode',
            'ip_assignment_mode': 'ipAssignmentMode',
        }

        payload = {}
        for key in ['name', 'enabled', 'auth_mode', 'encryption_mode', 'psk', 'ip_assignment_mode']:
            if key in kwargs:
                payload[api_names.get(key, key)] = kwargs[key]

        if not payload:
            return False

        if 'ip_assignment_mode' not in kwargs:
            payload['ipAssignmentMode'] = 'Bridge mode'

        url = self.SSID_API + '/' + self._id
        # Pre-bind so the handler can run even if the request itself raises
        # before ``response`` is assigned.
        response = None
        try:
            response = requests.request(
                'PUT', url, json=payload, headers=self._headers)
            response.raise_for_status()
        except Exception as e:
            msg = 'Error updating SSID properties for {}: {} ({})'.format(
                self._id, e, Meraki._get_json_errors(response))
            if self._logit:
                logging.error(msg)
            else:
                print(msg)

            return False

        # Mirror the values returned by the API into the local cache.
        jobj = response.json()
        for k, v in jobj.items():
            self.set(k, v)

        return True
436    
437    
class Vlan(Meraki):
    """A VLAN, addressed under ``/networks/<net id>/vlans/<id>``.

    Requires a ``net`` keyword argument (the owning ``Network``) in addition
    to the keyword arguments accepted by ``Meraki``.
    """

    def __init__(self, **kwargs):
        super(Vlan, self).__init__(**kwargs)
        if 'net' not in kwargs:
            raise TypeError('Missing mandatory net argument!')

        self._net = kwargs['net']

        self._class_name = 'Vlan'
        self._api_url_endpoint = '/networks/' + \
            self._net.get(Network._id_prop) + '/vlans'
        self.VLAN_API = self.MERAKI_API + self._api_url_endpoint

    def update_vlan(self, **kwargs):
        """Update properties of this VLAN.

        Recognized keyword arguments: ``name``, ``subnet``,
        ``appliance_ip``, ``fixed_ip_assignments``, ``reserved_ip_ranges``
        and ``dns_nameservers``.

        Returns True on success; False if nothing recognized was passed, the
        object cannot be initialized, or the request fails.
        """
        if not self._check_obj():
            return False

        # Keyword arguments whose API property name differs from the kwarg.
        api_names = {
            'appliance_ip': 'applianceIp',
            'fixed_ip_assignments': 'fixedIpAssignments',
            'reserved_ip_ranges': 'reservedIpRanges',
            'dns_nameservers': 'dnsNameservers',
        }

        payload = {}
        for key in ['name', 'subnet', 'appliance_ip', 'fixed_ip_assignments', 'reserved_ip_ranges', 'dns_nameservers']:
            if key in kwargs:
                payload[api_names.get(key, key)] = kwargs[key]

        if not payload:
            return False

        url = self.VLAN_API + '/' + self._id
        # Pre-bind so the handler can run even if the request itself raises
        # before ``response`` is assigned.
        response = None
        try:
            response = requests.request(
                'PUT', url, json=payload, headers=self._headers)
            response.raise_for_status()
        except Exception as e:
            msg = 'Error updating VLAN properties for {}: {} ({})'.format(
                self._name, e, Meraki._get_json_errors(response))
            if self._logit:
                logging.error(msg)
            else:
                print(msg)

            return False

        # Mirror the values returned by the API into the local cache.
        jobj = response.json()
        for k, v in jobj.items():
            self.set(k, v)

        return True
492    
493    
class Device(Meraki):
    """A device, addressed under ``/networks/<net id>/devices/<serial>``.

    Requires a ``net`` keyword argument (the owning ``Network``) in addition
    to the keyword arguments accepted by ``Meraki``.  The device's id is its
    serial number.
    """

    _id_prop = 'serial'

    def __init__(self, **kwargs):
        super(Device, self).__init__(**kwargs)
        if 'net' not in kwargs:
            raise TypeError('Missing mandatory net argument!')

        self._net = kwargs['net']

        self._class_name = 'Device'
        self._api_url_endpoint = '/networks/' + \
            self._net.get(Network._id_prop) + '/devices'
        self.DEV_API = self.MERAKI_API + self._api_url_endpoint

    def remove_device(self):
        """Remove this device from its network; return True on success."""
        if not self._check_obj():
            return False

        url = self.DEV_API + '/' + self._id + '/remove'
        # Pre-bind so the handler can run even if the request itself raises
        # before ``response`` is assigned.
        response = None
        try:
            response = requests.request('POST', url, headers=self._headers)
            response.raise_for_status()
        except Exception as e:
            msg = 'Failed to remove device {} for network {}: {} ({})'.format(
                self._name, self._net.get(Network._name_prop), e, Meraki._get_json_errors(response))
            if self._logit:
                logging.error(msg)
            else:
                print(msg)

            return False

        return True

    def update_device(self, **kwargs):
        """Update properties of this device.

        Recognized keyword arguments: ``name``, ``tags``, ``lat``, ``lng``,
        ``address`` and ``move_map_marker``.

        Returns True on success; False if nothing recognized was passed, the
        object cannot be initialized, or the request fails.
        """
        if not self._check_obj():
            return False

        payload = {}
        for key in ['name', 'tags', 'lat', 'lng', 'address', 'move_map_marker']:
            if key in kwargs:
                if key == 'move_map_marker':
                    payload['moveMapMarker'] = kwargs[key]
                else:
                    payload[key] = kwargs[key]

        if not payload:
            return False

        url = self.DEV_API + '/' + self._id
        response = None
        try:
            response = requests.request(
                'PUT', url, json=payload, headers=self._headers)
            response.raise_for_status()
        except Exception as e:
            msg = 'Error updating device properties for {}: {} ({})'.format(
                self._name, e, Meraki._get_json_errors(response))
            if self._logit:
                logging.error(msg)
            else:
                print(msg)

            return False

        # Mirror the returned values into the local cache, except for
        # moveMapMarker, which is not stored.
        jobj = response.json()
        for k, v in jobj.items():
            if k == 'moveMapMarker':
                continue
            self.set(k, v)

        return True
567    
568    
class SwitchPort(Meraki):
    """A switch port, addressed under ``/devices/<serial>/switchPorts/<id>``.

    Requires a ``dev`` keyword argument (the owning ``Device``) in addition
    to the keyword arguments accepted by ``Meraki``.
    """

    _id_prop = 'number'

    def __init__(self, **kwargs):
        super(SwitchPort, self).__init__(**kwargs)
        if 'dev' not in kwargs:
            raise TypeError('Missing mandatory dev argument!')

        self._dev = kwargs['dev']

        self._class_name = 'SwitchPort'
        # The endpoint hangs off the owning device, so build it from the
        # device's serial.  (This object has no ``_net`` attribute.)
        self._api_url_endpoint = '/devices/' + \
            self._dev.get(Device._id_prop) + '/switchPorts'
        self.SWP_API = self.MERAKI_API + self._api_url_endpoint

    def update_switchport(self, **kwargs):
        """Update properties of this switch port.

        Recognized keyword arguments: ``name``, ``tags``, ``enabled``,
        ``type``, ``vlan``, ``voice_vlan``, ``allowed_vlans`` and
        ``poe_enabled``.

        Returns True on success; False if nothing recognized was passed, the
        object cannot be initialized, or the request fails.
        """
        if not self._check_obj():
            return False

        # Keyword arguments whose API property name differs from the kwarg.
        api_names = {
            'voice_vlan': 'voiceVlan',
            'allowed_vlans': 'allowedVlans',
            'poe_enabled': 'poeEnabled',
        }

        payload = {}
        for key in ['name', 'tags', 'enabled', 'type', 'vlan', 'voice_vlan', 'allowed_vlans', 'poe_enabled']:
            if key in kwargs:
                payload[api_names.get(key, key)] = kwargs[key]

        if not payload:
            return False

        url = self.SWP_API + '/' + self._id
        # Pre-bind so the handler can run even if the request itself raises
        # before ``response`` is assigned.
        response = None
        try:
            response = requests.request(
                'PUT', url, json=payload, headers=self._headers)
            response.raise_for_status()
        except Exception as e:
            # Use the device's stored name rather than get(): get() raises
            # when the device has no name, which would mask the real error.
            msg = 'Error updating switchport properties for {} on device {}: {} ({})'.format(
                self._id, self._dev._name, e, Meraki._get_json_errors(response))
            if self._logit:
                logging.error(msg)
            else:
                print(msg)

            return False

        # Mirror the values returned by the API into the local cache.
        jobj = response.json()
        for k, v in jobj.items():
            self.set(k, v)

        return True

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.27