/[marcuscom]/meraki/meraki_api/__init__.py
ViewVC logotype

Annotation of /meraki/meraki_api/__init__.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 20555 - (hide annotations) (download) (as text)
Thu May 17 11:25:57 2018 UTC (22 months, 3 weeks ago) by jclarke
File MIME type: text/x-python
File size: 19079 byte(s)
Make _check_headers private.

This is really only needed when first initializing an object.

1 jclarke 20550 #
2     # Copyright (c) 2018 Joe Clarke <jclarke@cisco.com>
3     #
4     # Redistribution and use in source and binary forms, with or without
5     # modification, are permitted provided that the following conditions
6     # are met:
7     # 1. Redistributions of source code must retain the above copyright
8     # notice, this list of conditions and the following disclaimer.
9     # 2. Redistributions in binary form must reproduce the above copyright
10     # notice, this list of conditions and the following disclaimer in the
11     # documentation and/or other materials provided with the distribution.
12     #
13     # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14     # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15     # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16     # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17     # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18     # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19     # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20     # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21     # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22     # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23     # SUCH DAMAGE.
24    
25     import requests
26     import logging
27    
28    
29     class Meraki(object):
30     MERAKI_API = 'https://api.meraki.com/api/v0'
31    
32     _headers = {
33     'X-Cisco-Meraki-API-Key': None,
34     'Content-type': 'application/json'
35     }
36    
37     _logit = False
38     _key = None
39    
40     _name = None
41     _id = None
42    
43     _class_name = 'Meraki'
44    
45     _name_prop = 'name'
46     _id_prop = 'id'
47    
48     _api_url_endpoint = ''
49    
50     def __init__(self, **kwargs):
51     if 'logit' in kwargs:
52     self._logit = kwargs['logit']
53     if 'key' in kwargs:
54     self._key = kwargs['key']
55     self._headers['X-Cisco-Meraki-API-Key'] = kwargs['key']
56     if 'name' in kwargs:
57     self._name = kwargs['name']
58     if 'id' in kwargs:
59     self._id = str(kwargs['id'])
60    
61     self.__dict = {}
62     self._initialized = False
63    
64     def set_key(self, key):
65     self._key = key
66     self._headers['X-Cisco-Meraki-API-Key'] = key
67    
68     def set_name(self, name):
69     self._name = name
70    
71     def _set_id(self, id):
72     self._id = id
73    
74 jclarke 20555 def __check_headers(self):
75 jclarke 20550 if self._headers['X-Cisco-Meraki-API-Key'] is None:
76     msg = 'Meraki API key is not set!'
77     if self._logit:
78     logging.error(msg)
79     else:
80     print(msg)
81    
82     return False
83    
84     return True
85    
86     def realize(self):
87     return self._check_obj()
88    
89     @staticmethod
90     def _get_json_errors(response):
91     res = ''
92     try:
93     jobj = response.json()
94     if 'errors' in jobj:
95     res = ', '.join(jobj['errors'])
96     except:
97     pass
98    
99     return res
100    
101     def _check_obj(self):
102     if self._initialized:
103     return True
104    
105     if self._id is None:
106     msg = '{} id is not set!'.format(self._class_name)
107     if self._logit:
108     logging.error(msg)
109     else:
110     print(msg)
111    
112     return False
113    
114     return self.__get_current_obj()
115    
116     def __get_current_obj(self):
117 jclarke 20555 if not self.__check_headers():
118 jclarke 20550 return False
119    
120     url = self.MERAKI_API + self._api_url_endpoint + '/' + self._id
121     try:
122     response = requests.request('GET', url, headers=self._headers)
123     response.raise_for_status()
124     except Exception as e:
125     msg = 'Error getting {} for {}: {} ({})'.format(
126     self._class_name.lower(), self._id, e, Meraki._get_json_errors(response))
127     if self._logit:
128     logging.error(msg)
129     else:
130     print(msg)
131    
132     return False
133    
134     jobj = response.json()
135     self.__populate_obj(jobj)
136     self._name = jobj[self._name_prop]
137     self._initialized = True
138     return True
139    
140     def __populate_obj(self, jobj):
141     for k, v in jobj.items():
142     self.__dict[k] = v
143    
144     def get(self, field):
145     if not self._check_obj():
146     raise Exception('Failed to initialize object!')
147    
148     if field in self.__dict:
149     return self.__dict[field]
150     else:
151     raise Exception('Field {} does not exist'.format(field))
152    
153     def set(self, field, value):
154     if not self._check_obj():
155     raise Exception('Failed to initialize object!')
156    
157     self.__dict[field] = value
158    
159     def get_organizations(self):
160 jclarke 20555 if not self.__check_headers():
161 jclarke 20550 return None
162    
163     url = self.MERAKI_API + '/organizations'
164     try:
165     response = requests.request('GET', url, headers=self._headers)
166     response.raise_for_status()
167     except Exception as e:
168     msg = 'Error getting list of organizations: {} ({})'.format(
169     e, Meraki._get_json_errors(response))
170     if self._logit:
171     logging.error(msg)
172     else:
173     print(msg)
174    
175     return None
176    
177     res = []
178     for org in response.json():
179     org_obj = Organization(
180     key=self._key, name=org[Organization._name_prop], id=org[Organization._id_prop])
181     res.append(org_obj)
182    
183     return res
184    
185    
186     class Organization(Meraki):
187     def __init__(self, **kwargs):
188     super(Organization, self).__init__(**kwargs)
189    
190     self._class_name = 'Organization'
191     self._api_url_endpoint = '/organizations'
192     self.ORG_API = self.MERAKI_API + self._api_url_endpoint
193    
194     def get_inventory(self):
195     if not self._check_obj():
196     return None
197    
198     url = self.ORG_API + '/' + self._id + '/inventory'
199    
200     try:
201     response = requests.request('GET', url, headers=self._headers)
202     response.raise_for_status()
203     except Exception as e:
204     msg = 'Error getting inventory for organization {}: {} ({})'.format(
205     self._name, e, Meraki._get_json_errors(response))
206     if self._logit:
207     logging.error(msg)
208     else:
209     print(msg)
210    
211     return None
212    
213     return response.json()
214    
215     def get_networks(self):
216     if not self._check_obj():
217     return None
218    
219     url = self.ORG_API + '/' + self._id + '/networks'
220     try:
221     response = requests.request('GET', url, headers=self._headers)
222     response.raise_for_status()
223     except Exception as e:
224     msg = 'Error getting list of networks for {}: {} ({})'.format(
225     self._name, e, Meraki._get_json_errors(response))
226     if self._logit:
227     logging.error(msg)
228     else:
229     print(msg)
230    
231     return None
232    
233     res = []
234     for n in response.json():
235     net = Network(
236     key=self._key, name=n[Network._name_prop], id=n[Network._id_prop
237     ])
238     res.append(net)
239    
240     return res
241    
242     def create_network(self, name, **kwargs):
243     if not self._check_obj():
244     return None
245    
246     payload = {
247     'name': name
248     }
249    
250     for key in ['type', 'tags', 'timezone']:
251     if key in kwargs:
252     if key == 'timezone':
253     payload['timeZone'] = kwargs[key]
254     else:
255     payload[key] = kwargs[key]
256    
257     if 'type' not in kwargs:
258     payload['type'] = 'wireless switch appliance phone'
259    
260     url = self.ORG_API + '/' + self._id + '/networks'
261     try:
262     response = requests.request(
263     'POST', url, json=payload, headers=self._headers)
264     response.raise_for_status()
265     except Exception as e:
266     msg = 'Error creating new network {} in {}: {} ({})'.format(
267     name, self._name, e, Meraki._get_json_errors(response))
268     if self._logit:
269     logging.error(msg)
270     else:
271     print(msg)
272    
273     return None
274    
275     jobj = response.json()
276     net_obj = Network(key=self._key, id=jobj['id'], name=jobj['name'])
277    
278     return net_obj
279    
280    
281     class Network(Meraki):
282     def __init__(self, **kwargs):
283     super(Network, self).__init__(**kwargs)
284    
285     self._class_name = 'Network'
286     self._api_url_endpoint = '/networks'
287     self.NET_API = self.MERAKI_API + self._api_url_endpoint
288    
289     def get_devices(self):
290     if not self._check_obj():
291     return None
292    
293     url = self.NET_API + '/' + self._id + '/devices'
294     try:
295     response = requests.request('GET', url, headers=self._headers)
296     response.raise_for_status()
297     except Exception as e:
298     msg = 'Error getting device list for network {}: {} ({})'.format(
299     self._name, e, Meraki._get_json_errors(response))
300     if self._logit:
301     logging.error(msg)
302     else:
303     print(msg)
304    
305     return None
306    
307     res = []
308     for d in response.json():
309     dev = Device(
310     key=self._key, name=d[Device._name_prop], id=d[Device._id_prop])
311     res.append(dev)
312    
313     return res
314    
315     def claim_device(self, dev):
316     if not self._check_obj():
317     return None
318    
319     url = self.NET_API + '/' + self._id + '/devices/claim'
320     payload = {
321     'serial': dev._id
322     }
323     try:
324     response = requests.request(
325     'POST', url, json=payload, headers=self._headers)
326     response.raise_for_status()
327     except Exception as e:
328     msg = 'Error claiming device {} for network {}: {} ({})'.format(
329     dev._id, self._name, e, Meraki._get_json_errors(response))
330     if self._logit:
331     logging.error(msg)
332     else:
333     print(msg)
334    
335     return None
336    
337     new_dev = Device(key=self._key, id=dev._id, net=self)
338     return new_dev
339    
340     def create_vlan(self, name, id, subnet, appliance_ip):
341     if not self._check_obj():
342     return None
343    
344     payload = {
345     'id': id,
346     'name': name,
347     'subnet': subnet,
348     'applianceIp': appliance_ip
349     }
350    
351     url = self.NET_API + '/' + self._id + '/vlans'
352     try:
353     response = requests.request(
354     'POST', url, json=payload, headers=self._headers)
355     response.raise_for_status()
356     except Exception as e:
357     msg = 'Error adding VLAN {} (ID: {}) to network {}: {} ({})'.format(
358     name, id, self._name, e, Meraki._get_json_errors(response))
359     if self._logit:
360     logging.error(msg)
361     else:
362     print(msg)
363    
364     return None
365    
366     jobj = response.json()
367     vlan_obj = Vlan(
368     key=self._key, id=jobj['id'], name=jobj['name'], net=self)
369    
370     return vlan_obj
371    
372    
373     class SSID(Meraki):
374     _id_prop = 'number'
375    
376     def __init__(self, **kwargs):
377     super(SSID, self).__init__(**kwargs)
378     if not 'net' in kwargs:
379 jclarke 20551 raise TypeError('Missing mandatory net argument!')
380 jclarke 20550
381     self._net = kwargs['net']
382    
383     self._class_name = 'SSID'
384     self._api_url_endpoint = '/networks/' + \
385     self._net.get(Network._id_prop) + '/ssids'
386     self.SSID_API = self.MERAKI_API + self._api_url_endpoint
387    
388     def update_ssid(self, **kwargs):
389     if not self._check_obj():
390     return False
391    
392     payload = {}
393     for key in ['name', 'enabled', 'auth_mode', 'encryption_mode', 'psk', 'ip_assignment_mode']:
394     if key in kwargs:
395     if key == 'auth_mode':
396     payload['authMode'] = kwargs[key]
397     elif key == 'encryption_mode':
398     payload['encryptionMode'] = kwargs[key]
399     elif key == 'ip_assignment_mode':
400     payload['ipAssignmentMode'] = kwargs[key]
401     else:
402     payload[key] = kwargs[key]
403    
404     if len(payload) == 0:
405     return False
406    
407     if 'ip_assignment_mode' not in kwargs:
408     payload['ipAssignmentMode'] = 'Bridge mode'
409    
410     url = self.SSID_API + '/' + self._id
411     try:
412     response = requests.request(
413     'PUT', url, json=payload, headers=self._headers)
414     response.raise_for_status()
415     except Exception as e:
416     msg = 'Error updating SSID properties for {}: {} ({})'.format(
417     self._id, e, Meraki._get_json_errors(response))
418     if self._logit:
419     logging.error(msg)
420     else:
421     print(msg)
422    
423     return False
424    
425     jobj = response.json()
426     for k, v in jobj.items():
427     self.set(k, v)
428    
429     return True
430    
431    
432     class Vlan(Meraki):
433     def __init__(self, **kwargs):
434     super(Vlan, self).__init__(**kwargs)
435     if not 'net' in kwargs:
436 jclarke 20551 raise TypeError('Missing mandatory net argument!')
437 jclarke 20550
438     self._net = kwargs['net']
439    
440     self._class_name = 'Vlan'
441     self._api_url_endpoint = '/networks/' + \
442     self._net.get(Network._id_prop) + '/vlans'
443     self.VLAN_API = self.MERAKI_API + self._api_url_endpoint
444    
445     def update_vlan(self, **kwargs):
446     if not self._check_obj():
447     return False
448    
449     payload = {}
450     for key in ['name', 'subnet', 'appliance_ip', 'fixed_ip_assignments', 'reserved_ip_ranges', 'dns_nameservers']:
451     if key in kwargs:
452     if key == 'appliance_ip':
453     payload['applianceIp'] = kwargs[key]
454     elif key == 'fixed_ip_assignments':
455     payload['fixedIpAssignments'] = kwargs[key]
456     elif key == 'reserved_ip_ranges':
457     payload['reservedIpRanges'] = kwargs[key]
458     elif key == 'dns_nameservers':
459     payload['dnsNameservers'] = kwargs[key]
460     else:
461     payload[key] = kwargs[key]
462    
463     if len(payload) == 0:
464     return False
465    
466     url = self.VLAN_API + '/' + self._id
467     try:
468     response = requests.request(
469     'PUT', url, json=payload, headers=self._headers)
470     response.raise_for_status()
471     except Exception as e:
472     msg = 'Error updating VLAN properties for {}: {} ({})'.format(
473     self._name, e, Meraki._get_json_errors(response))
474     if self._logit:
475     logging.error(msg)
476     else:
477     print(msg)
478    
479     return False
480    
481     jobj = response.json()
482     for k, v in jobj.items():
483     self.set(k, v)
484    
485     return True
486    
487    
488     class Device(Meraki):
489    
490     _id_prop = 'serial'
491    
492     def __init__(self, **kwargs):
493     super(Device, self).__init__(**kwargs)
494     if not 'net' in kwargs:
495 jclarke 20551 raise TypeError('Missing mandatory net argument!')
496 jclarke 20550
497     self._net = kwargs['net']
498    
499     self._class_name = 'Device'
500     self._api_url_endpoint = '/networks/' + \
501     self._net.get(Network._id_prop) + '/devices'
502     self.DEV_API = self.MERAKI_API + self._api_url_endpoint
503    
504     def remove_device(self):
505     if not self._check_obj():
506     return False
507    
508     url = self.DEV_API + '/' + self._id + '/remove'
509     try:
510     response = requests.request('POST', url, headers=self._headers)
511     response.raise_for_status()
512     except Exception as e:
513     msg = 'Failed to remove device {} for network {}: {} ({})'.format(
514     self._name, self._net.get(Network._name_prop), e, Meraki._get_json_errors(response))
515     if self._logit:
516     logging.error(msg)
517     else:
518     print(msg)
519    
520     return False
521    
522     return True
523    
524     def update_device(self, **kwargs):
525     if not self._check_obj():
526     return False
527    
528     payload = {}
529     for k, v in kwargs.items():
530     if k in ['name', 'tags', 'lat', 'lng', 'address', 'moveMapMarker']:
531     payload[k] = v
532     else:
533     msg = 'Property {} is not valid for a device'.format(k)
534     if self._logit:
535     logging.warning(msg)
536     else:
537     print(msg)
538    
539     if len(payload) == 0:
540     return False
541    
542     url = self.DEV_API + '/' + self._id
543     try:
544     response = requests.request(
545     'PUT', url, json=payload, headers=self._headers)
546     response.raise_for_status()
547     except Exception as e:
548     msg = 'Error updating device properties for {}: {} ({})'.format(
549     self._name, e, Meraki._get_json_errors(response))
550     if self._logit:
551     logging.error(msg)
552     else:
553     print(msg)
554    
555     return False
556    
557     jobj = response.json()
558     for k, v in jobj.items():
559     if k == 'moveMapMarker':
560     continue
561     self.set(k, v)
562    
563     return True
564    
565    
566     class SwitchPort(Meraki):
567     _id_prop = 'number'
568    
569     def __init__(self, **kwargs):
570     super(SwitchPort, self).__init__(**kwargs)
571     if not 'dev' in kwargs:
572 jclarke 20551 raise TypeError('Missing mandatory dev argument!')
573 jclarke 20550
574     self._dev = kwargs['dev']
575    
576     self._class_name = 'SwitchPort'
577     self._api_url_endpoint = '/devices/' + \
578     self._net.get(Network._id_prop) + '/switchPorts'
579     self.SWP_API = self.MERAKI_API + self._api_url_endpoint
580    
581     def update_switchport(self, **kwargs):
582     if not self._check_obj():
583     return False
584    
585     payload = {}
586     for key in ['name', 'tags', 'enabled', 'type', 'vlan', 'voice_vlan', 'allowed_vlans', 'poe_enabled']:
587     if key in kwargs:
588     if key == 'voice_vlan':
589     payload['voiceVlan'] = kwargs[key]
590     elif key == 'allowed_vlans':
591     payload['allowedVlans'] = kwargs[key]
592     elif key == 'poe_enabled':
593     payload['poeEnabled'] = kwargs[key]
594     else:
595     payload[key] = kwargs[key]
596    
597     if len(payload) == 0:
598     return False
599    
600     url = self.SWP_API + '/' + self._id
601     try:
602     response = requests.request(
603     'PUT', url, json=payload, headers=self._headers)
604     response.raise_for_status()
605     except Exception as e:
606     msg = 'Error updating switchport properties for {} on device {}: {} ({})'.format(
607     self._id, self._dev.get(Device._name_prop), e, Meraki._get_json_errors(response))
608     if self._logit:
609     logging.error(msg)
610     else:
611     print(msg)
612    
613     return False
614    
615     jobj = response.json()
616     for k, v in jobj.items():
617     self.set(k, v)
618    
619     return True

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.27