|
@@ -0,0 +1,622 @@
|
|
|
+#
|
|
|
+# Copyright (c) 2018 Joe Clarke <jclarke@cisco.com>
|
|
|
+#
|
|
|
+# Redistribution and use in source and binary forms, with or without
|
|
|
+# modification, are permitted provided that the following conditions
|
|
|
+# are met:
|
|
|
+# 1. Redistributions of source code must retain the above copyright
|
|
|
+# notice, this list of conditions and the following disclaimer.
|
|
|
+# 2. Redistributions in binary form must reproduce the above copyright
|
|
|
+# notice, this list of conditions and the following disclaimer in the
|
|
|
+# documentation and/or other materials provided with the distribution.
|
|
|
+#
|
|
|
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
|
|
|
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
|
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
|
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
|
|
|
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
|
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
|
|
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
|
|
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
|
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
|
|
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
|
|
+# SUCH DAMAGE.
|
|
|
+
|
|
|
+import requests
|
|
|
+import logging
|
|
|
+import json
|
|
|
+
|
|
|
+
|
|
|
+class Meraki(object):
|
|
|
+ MERAKI_API = 'https://api.meraki.com/api/v0'
|
|
|
+
|
|
|
+ _headers = {
|
|
|
+ 'X-Cisco-Meraki-API-Key': None,
|
|
|
+ 'Content-type': 'application/json'
|
|
|
+ }
|
|
|
+
|
|
|
+ _logit = False
|
|
|
+ _key = None
|
|
|
+
|
|
|
+ _name = None
|
|
|
+ _id = None
|
|
|
+
|
|
|
+ _class_name = 'Meraki'
|
|
|
+
|
|
|
+ _name_prop = 'name'
|
|
|
+ _id_prop = 'id'
|
|
|
+
|
|
|
+ _api_url_endpoint = ''
|
|
|
+
|
|
|
+ def __init__(self, **kwargs):
|
|
|
+ if 'logit' in kwargs:
|
|
|
+ self._logit = kwargs['logit']
|
|
|
+ if 'key' in kwargs:
|
|
|
+ self._key = kwargs['key']
|
|
|
+ self._headers['X-Cisco-Meraki-API-Key'] = kwargs['key']
|
|
|
+ if 'name' in kwargs:
|
|
|
+ self._name = kwargs['name']
|
|
|
+ if 'id' in kwargs:
|
|
|
+ self._id = str(kwargs['id'])
|
|
|
+
|
|
|
+ self.__dict = {}
|
|
|
+ self._initialized = False
|
|
|
+
|
|
|
+ def set_key(self, key):
|
|
|
+ self._key = key
|
|
|
+ self._headers['X-Cisco-Meraki-API-Key'] = key
|
|
|
+
|
|
|
+ def set_name(self, name):
|
|
|
+ self._name = name
|
|
|
+
|
|
|
+ def _set_id(self, id):
|
|
|
+ self._id = id
|
|
|
+
|
|
|
+ def __check_headers(self):
|
|
|
+ if self._headers['X-Cisco-Meraki-API-Key'] is None:
|
|
|
+ msg = 'Meraki API key is not set!'
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+ def realize(self):
|
|
|
+ return self._check_obj()
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _get_json_errors(response):
|
|
|
+ res = ''
|
|
|
+ try:
|
|
|
+ jobj = response.json()
|
|
|
+ if 'errors' in jobj:
|
|
|
+ res = ', '.join(jobj['errors'])
|
|
|
+ except:
|
|
|
+ pass
|
|
|
+
|
|
|
+ return res
|
|
|
+
|
|
|
+ def _check_obj(self):
|
|
|
+ if self._initialized:
|
|
|
+ return True
|
|
|
+
|
|
|
+ if self._id is None:
|
|
|
+ msg = '{} id is not set!'.format(self._class_name)
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ return self.__get_current_obj()
|
|
|
+
|
|
|
+ def __get_current_obj(self):
|
|
|
+ if not self.__check_headers():
|
|
|
+ return False
|
|
|
+
|
|
|
+ url = self.MERAKI_API + self._api_url_endpoint + '/' + self._id
|
|
|
+ try:
|
|
|
+ response = requests.request('GET', url, headers=self._headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ except Exception as e:
|
|
|
+ msg = 'Error getting {} for {}: {} ({})'.format(
|
|
|
+ self._class_name.lower(), self._id, e, Meraki._get_json_errors(response))
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ jobj = response.json()
|
|
|
+ self.__populate_obj(jobj)
|
|
|
+ # Object may not have a name yet.
|
|
|
+ if self._name_prop in jobj:
|
|
|
+ self._name = jobj[self._name_prop]
|
|
|
+
|
|
|
+ self._initialized = True
|
|
|
+ return True
|
|
|
+
|
|
|
+ def __populate_obj(self, jobj):
|
|
|
+ for k, v in jobj.items():
|
|
|
+ self.__dict[k] = v
|
|
|
+
|
|
|
+ def get(self, field):
|
|
|
+ if not self._check_obj():
|
|
|
+ raise Exception('Failed to initialize object!')
|
|
|
+
|
|
|
+ if field in self.__dict:
|
|
|
+ return self.__dict[field]
|
|
|
+ else:
|
|
|
+ raise Exception('Field {} does not exist'.format(field))
|
|
|
+
|
|
|
+ def set(self, field, value):
|
|
|
+ if not self._check_obj():
|
|
|
+ raise Exception('Failed to initialize object!')
|
|
|
+
|
|
|
+ self.__dict[field] = value
|
|
|
+
|
|
|
+ def get_organizations(self):
|
|
|
+ if not self.__check_headers():
|
|
|
+ return None
|
|
|
+
|
|
|
+ url = self.MERAKI_API + '/organizations'
|
|
|
+ try:
|
|
|
+ response = requests.request('GET', url, headers=self._headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ except Exception as e:
|
|
|
+ msg = 'Error getting list of organizations: {} ({})'.format(
|
|
|
+ e, Meraki._get_json_errors(response))
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+ res = []
|
|
|
+ for org in response.json():
|
|
|
+ org_obj = Organization(
|
|
|
+ key=self._key, name=org[Organization._name_prop], id=org[Organization._id_prop])
|
|
|
+ res.append(org_obj)
|
|
|
+
|
|
|
+ return res
|
|
|
+
|
|
|
+
|
|
|
+class Organization(Meraki):
|
|
|
+ def __init__(self, **kwargs):
|
|
|
+ super(Organization, self).__init__(**kwargs)
|
|
|
+
|
|
|
+ self._class_name = 'Organization'
|
|
|
+ self._api_url_endpoint = '/organizations'
|
|
|
+ self.ORG_API = self.MERAKI_API + self._api_url_endpoint
|
|
|
+
|
|
|
+ def get_inventory(self):
|
|
|
+ if not self._check_obj():
|
|
|
+ return None
|
|
|
+
|
|
|
+ url = self.ORG_API + '/' + self._id + '/inventory'
|
|
|
+
|
|
|
+ try:
|
|
|
+ response = requests.request('GET', url, headers=self._headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ except Exception as e:
|
|
|
+ msg = 'Error getting inventory for organization {}: {} ({})'.format(
|
|
|
+ self._name, e, Meraki._get_json_errors(response))
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def get_networks(self):
|
|
|
+ if not self._check_obj():
|
|
|
+ return None
|
|
|
+
|
|
|
+ url = self.ORG_API + '/' + self._id + '/networks'
|
|
|
+ try:
|
|
|
+ response = requests.request('GET', url, headers=self._headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ except Exception as e:
|
|
|
+ msg = 'Error getting list of networks for {}: {} ({})'.format(
|
|
|
+ self._name, e, Meraki._get_json_errors(response))
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+ res = []
|
|
|
+ for n in response.json():
|
|
|
+ net = Network(
|
|
|
+ key=self._key, name=n[Network._name_prop], id=n[Network._id_prop
|
|
|
+ ])
|
|
|
+ res.append(net)
|
|
|
+
|
|
|
+ return res
|
|
|
+
|
|
|
+ def create_network(self, name, **kwargs):
|
|
|
+ if not self._check_obj():
|
|
|
+ return None
|
|
|
+
|
|
|
+ payload = {
|
|
|
+ 'name': name
|
|
|
+ }
|
|
|
+
|
|
|
+ for key in ['type', 'tags', 'timezone', 'copy_from_network_id']:
|
|
|
+ if key in kwargs:
|
|
|
+ if key == 'timezone':
|
|
|
+ payload['timeZone'] = kwargs[key]
|
|
|
+ elif key == 'copy_from_network_id':
|
|
|
+ payload['copyFromNetworkId'] = kwargs[key]
|
|
|
+ else:
|
|
|
+ payload[key] = kwargs[key]
|
|
|
+
|
|
|
+ if 'type' not in kwargs:
|
|
|
+ payload['type'] = 'wireless switch appliance'
|
|
|
+
|
|
|
+ url = self.ORG_API + '/' + self._id + '/networks'
|
|
|
+ try:
|
|
|
+ response = requests.request(
|
|
|
+ 'POST', url, json=payload, headers=self._headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ except Exception as e:
|
|
|
+ msg = 'Error creating new network {} in {}: {} ({})'.format(
|
|
|
+ name, self._name, e, Meraki._get_json_errors(response))
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+ jobj = response.json()
|
|
|
+ net_obj = Network(key=self._key, id=jobj['id'], name=jobj['name'])
|
|
|
+
|
|
|
+ return net_obj
|
|
|
+
|
|
|
+
|
|
|
+class Network(Meraki):
|
|
|
+ def __init__(self, **kwargs):
|
|
|
+ super(Network, self).__init__(**kwargs)
|
|
|
+
|
|
|
+ self._class_name = 'Network'
|
|
|
+ self._api_url_endpoint = '/networks'
|
|
|
+ self.NET_API = self.MERAKI_API + self._api_url_endpoint
|
|
|
+
|
|
|
+ def get_devices(self):
|
|
|
+ if not self._check_obj():
|
|
|
+ return None
|
|
|
+
|
|
|
+ url = self.NET_API + '/' + self._id + '/devices'
|
|
|
+ try:
|
|
|
+ response = requests.request('GET', url, headers=self._headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ except Exception as e:
|
|
|
+ msg = 'Error getting device list for network {}: {} ({})'.format(
|
|
|
+ self._name, e, Meraki._get_json_errors(response))
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+ res = []
|
|
|
+ for d in response.json():
|
|
|
+ dev = Device(
|
|
|
+ key=self._key, name=d[Device._name_prop], id=d[Device._id_prop])
|
|
|
+ res.append(dev)
|
|
|
+
|
|
|
+ return res
|
|
|
+
|
|
|
+ def claim_device(self, dev):
|
|
|
+ if not self._check_obj():
|
|
|
+ return None
|
|
|
+
|
|
|
+ url = self.NET_API + '/' + self._id + '/devices/claim'
|
|
|
+ payload = {
|
|
|
+ 'serial': dev._id
|
|
|
+ }
|
|
|
+ try:
|
|
|
+ response = requests.request(
|
|
|
+ 'POST', url, json=payload, headers=self._headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ except Exception as e:
|
|
|
+ msg = 'Error claiming device {} for network {}: {} ({})'.format(
|
|
|
+ dev._id, self._name, e, Meraki._get_json_errors(response))
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+ new_dev = Device(key=self._key, id=dev._id, net=self)
|
|
|
+ return new_dev
|
|
|
+
|
|
|
+ def create_vlan(self, name, id, subnet, appliance_ip):
|
|
|
+ if not self._check_obj():
|
|
|
+ return None
|
|
|
+
|
|
|
+ payload = {
|
|
|
+ 'id': id,
|
|
|
+ 'name': name,
|
|
|
+ 'subnet': subnet,
|
|
|
+ 'applianceIp': appliance_ip
|
|
|
+ }
|
|
|
+
|
|
|
+ url = self.NET_API + '/' + self._id + '/vlans'
|
|
|
+ try:
|
|
|
+ response = requests.request(
|
|
|
+ 'POST', url, json=payload, headers=self._headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ except Exception as e:
|
|
|
+ msg = 'Error adding VLAN {} (ID: {}) to network {}: {} ({})'.format(
|
|
|
+ name, id, self._name, e, Meraki._get_json_errors(response))
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+ jobj = response.json()
|
|
|
+ vlan_obj = Vlan(
|
|
|
+ key=self._key, id=jobj['id'], name=jobj['name'], net=self)
|
|
|
+
|
|
|
+ return vlan_obj
|
|
|
+
|
|
|
+
|
|
|
+class SSID(Meraki):
|
|
|
+ _id_prop = 'number'
|
|
|
+
|
|
|
+ def __init__(self, **kwargs):
|
|
|
+ super(SSID, self).__init__(**kwargs)
|
|
|
+ if not 'net' in kwargs:
|
|
|
+ raise TypeError('Missing mandatory net argument!')
|
|
|
+
|
|
|
+ self._net = kwargs['net']
|
|
|
+
|
|
|
+ self._class_name = 'SSID'
|
|
|
+ self._api_url_endpoint = '/networks/' + \
|
|
|
+ self._net.get(Network._id_prop) + '/ssids'
|
|
|
+ self.SSID_API = self.MERAKI_API + self._api_url_endpoint
|
|
|
+
|
|
|
+ def update_ssid(self, **kwargs):
|
|
|
+ if not self._check_obj():
|
|
|
+ return False
|
|
|
+
|
|
|
+ payload = {}
|
|
|
+ for key in ['name', 'enabled', 'auth_mode', 'encryption_mode', 'psk', 'ip_assignment_mode']:
|
|
|
+ if key in kwargs:
|
|
|
+ if key == 'auth_mode':
|
|
|
+ payload['authMode'] = kwargs[key]
|
|
|
+ elif key == 'encryption_mode':
|
|
|
+ payload['encryptionMode'] = kwargs[key]
|
|
|
+ elif key == 'ip_assignment_mode':
|
|
|
+ payload['ipAssignmentMode'] = kwargs[key]
|
|
|
+ else:
|
|
|
+ payload[key] = kwargs[key]
|
|
|
+
|
|
|
+ if len(payload) == 0:
|
|
|
+ return False
|
|
|
+
|
|
|
+ if 'ip_assignment_mode' not in kwargs:
|
|
|
+ payload['ipAssignmentMode'] = 'Bridge mode'
|
|
|
+
|
|
|
+ url = self.SSID_API + '/' + self._id
|
|
|
+ try:
|
|
|
+ response = requests.request(
|
|
|
+ 'PUT', url, json=payload, headers=self._headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ except Exception as e:
|
|
|
+ msg = 'Error updating SSID properties for {}: {} ({})'.format(
|
|
|
+ self._id, e, Meraki._get_json_errors(response))
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ jobj = response.json()
|
|
|
+ for k, v in jobj.items():
|
|
|
+ self.set(k, v)
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+
|
|
|
+class Vlan(Meraki):
|
|
|
+ def __init__(self, **kwargs):
|
|
|
+ super(Vlan, self).__init__(**kwargs)
|
|
|
+ if not 'net' in kwargs:
|
|
|
+ raise TypeError('Missing mandatory net argument!')
|
|
|
+
|
|
|
+ self._net = kwargs['net']
|
|
|
+
|
|
|
+ self._class_name = 'Vlan'
|
|
|
+ self._api_url_endpoint = '/networks/' + \
|
|
|
+ self._net.get(Network._id_prop) + '/vlans'
|
|
|
+ self.VLAN_API = self.MERAKI_API + self._api_url_endpoint
|
|
|
+
|
|
|
+ def update_vlan(self, **kwargs):
|
|
|
+ if not self._check_obj():
|
|
|
+ return False
|
|
|
+
|
|
|
+ payload = {}
|
|
|
+ for key in ['name', 'subnet', 'appliance_ip', 'fixed_ip_assignments', 'reserved_ip_ranges', 'dns_nameservers']:
|
|
|
+ if key in kwargs:
|
|
|
+ if key == 'appliance_ip':
|
|
|
+ payload['applianceIp'] = kwargs[key]
|
|
|
+ elif key == 'fixed_ip_assignments':
|
|
|
+ payload['fixedIpAssignments'] = kwargs[key]
|
|
|
+ elif key == 'reserved_ip_ranges':
|
|
|
+ payload['reservedIpRanges'] = kwargs[key]
|
|
|
+ elif key == 'dns_nameservers':
|
|
|
+ payload['dnsNameservers'] = kwargs[key]
|
|
|
+ else:
|
|
|
+ payload[key] = kwargs[key]
|
|
|
+
|
|
|
+ if len(payload) == 0:
|
|
|
+ return False
|
|
|
+
|
|
|
+ url = self.VLAN_API + '/' + self._id
|
|
|
+ try:
|
|
|
+ response = requests.request(
|
|
|
+ 'PUT', url, json=payload, headers=self._headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ except Exception as e:
|
|
|
+ msg = 'Error updating VLAN properties for {}: {} ({})'.format(
|
|
|
+ self._name, e, Meraki._get_json_errors(response))
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ jobj = response.json()
|
|
|
+ for k, v in jobj.items():
|
|
|
+ self.set(k, v)
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+
|
|
|
+class Device(Meraki):
|
|
|
+
|
|
|
+ _id_prop = 'serial'
|
|
|
+
|
|
|
+ def __init__(self, **kwargs):
|
|
|
+ super(Device, self).__init__(**kwargs)
|
|
|
+ if not 'net' in kwargs:
|
|
|
+ raise TypeError('Missing mandatory net argument!')
|
|
|
+
|
|
|
+ self._net = kwargs['net']
|
|
|
+
|
|
|
+ self._class_name = 'Device'
|
|
|
+ self._api_url_endpoint = '/networks/' + \
|
|
|
+ self._net.get(Network._id_prop) + '/devices'
|
|
|
+ self.DEV_API = self.MERAKI_API + self._api_url_endpoint
|
|
|
+
|
|
|
+ def remove_device(self):
|
|
|
+ if not self._check_obj():
|
|
|
+ return False
|
|
|
+
|
|
|
+ url = self.DEV_API + '/' + self._id + '/remove'
|
|
|
+ try:
|
|
|
+ response = requests.request('POST', url, headers=self._headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ except Exception as e:
|
|
|
+ msg = 'Failed to remove device {} for network {}: {} ({})'.format(
|
|
|
+ self._name, self._net.get(Network._name_prop), e, Meraki._get_json_errors(response))
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+ def update_device(self, **kwargs):
|
|
|
+ if not self._check_obj():
|
|
|
+ return False
|
|
|
+
|
|
|
+ payload = {}
|
|
|
+ for key in ['name', 'tags', 'lat', 'lng', 'address', 'move_map_marker']:
|
|
|
+ if key in kwargs:
|
|
|
+ if key == 'move_map_marker':
|
|
|
+ payload['moveMapMarker'] = kwargs[key]
|
|
|
+ else:
|
|
|
+ payload[key] = kwargs[key]
|
|
|
+
|
|
|
+ if len(payload) == 0:
|
|
|
+ return False
|
|
|
+
|
|
|
+ url = self.DEV_API + '/' + self._id
|
|
|
+ try:
|
|
|
+ response = requests.request(
|
|
|
+ 'PUT', url, json=payload, headers=self._headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ except Exception as e:
|
|
|
+ msg = 'Error updating device properties for {}: {} ({})'.format(
|
|
|
+ self._name, e, Meraki._get_json_errors(response))
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ jobj = response.json()
|
|
|
+ for k, v in jobj.items():
|
|
|
+ if k == 'moveMapMarker':
|
|
|
+ continue
|
|
|
+ self.set(k, v)
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+
|
|
|
+class SwitchPort(Meraki):
|
|
|
+ _id_prop = 'number'
|
|
|
+
|
|
|
+ def __init__(self, **kwargs):
|
|
|
+ super(SwitchPort, self).__init__(**kwargs)
|
|
|
+ if not 'dev' in kwargs:
|
|
|
+ raise TypeError('Missing mandatory dev argument!')
|
|
|
+
|
|
|
+ self._dev = kwargs['dev']
|
|
|
+
|
|
|
+ self._class_name = 'SwitchPort'
|
|
|
+ self._api_url_endpoint = '/devices/' + \
|
|
|
+ self._net.get(Network._id_prop) + '/switchPorts'
|
|
|
+ self.SWP_API = self.MERAKI_API + self._api_url_endpoint
|
|
|
+
|
|
|
+ def update_switchport(self, **kwargs):
|
|
|
+ if not self._check_obj():
|
|
|
+ return False
|
|
|
+
|
|
|
+ payload = {}
|
|
|
+ for key in ['name', 'tags', 'enabled', 'type', 'vlan', 'voice_vlan', 'allowed_vlans', 'poe_enabled']:
|
|
|
+ if key in kwargs:
|
|
|
+ if key == 'voice_vlan':
|
|
|
+ payload['voiceVlan'] = kwargs[key]
|
|
|
+ elif key == 'allowed_vlans':
|
|
|
+ payload['allowedVlans'] = kwargs[key]
|
|
|
+ elif key == 'poe_enabled':
|
|
|
+ payload['poeEnabled'] = kwargs[key]
|
|
|
+ else:
|
|
|
+ payload[key] = kwargs[key]
|
|
|
+
|
|
|
+ if len(payload) == 0:
|
|
|
+ return False
|
|
|
+
|
|
|
+ url = self.SWP_API + '/' + self._id
|
|
|
+ try:
|
|
|
+ response = requests.request(
|
|
|
+ 'PUT', url, json=payload, headers=self._headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ except Exception as e:
|
|
|
+ msg = 'Error updating switchport properties for {} on device {}: {} ({})'.format(
|
|
|
+ self._id, self._dev.get(Device._name_prop), e, Meraki._get_json_errors(response))
|
|
|
+ if self._logit:
|
|
|
+ logging.error(msg)
|
|
|
+ else:
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ jobj = response.json()
|
|
|
+ for k, v in jobj.items():
|
|
|
+ self.set(k, v)
|
|
|
+
|
|
|
+ return True
|