Browse Source

Break this code out instead of leaving it in init.

Joe Clarke 3 years ago
parent
commit
e88333ad91
3 changed files with 601 additions and 600 deletions
  1. 1 588
      meraki_api/__init__.py
  2. 588 0
      meraki_api/meraki_api.py
  3. 12 12
      setup-meraki-nets.py

+ 1 - 588
meraki_api/__init__.py

@@ -1,588 +1 @@
-from __future__ import print_function
-
-#
-# Copyright (c) 2018-2020  Joe Clarke <jclarke@cisco.com>
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-# 1. Redistributions of source code must retain the above copyright
-#    notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-#    notice, this list of conditions and the following disclaimer in the
-#    documentation and/or other materials provided with the distribution.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
-
-from builtins import str
-from builtins import object
-import requests
-import logging
-
-
-class Meraki(object):
-    MERAKI_API = "https://api.meraki.com/api/v0"
-
-    _headers = {"X-Cisco-Meraki-API-Key": None, "Content-type": "application/json"}
-
-    _logit = False
-    _key = None
-
-    _name = None
-    _id = None
-
-    _class_name = "Meraki"
-
-    _name_prop = "name"
-    _id_prop = "id"
-
-    _api_url_endpoint = ""
-
-    def __init__(self, **kwargs):
-        if "logit" in kwargs:
-            self._logit = kwargs["logit"]
-        if "key" in kwargs:
-            self._key = kwargs["key"]
-            self._headers["X-Cisco-Meraki-API-Key"] = kwargs["key"]
-        if "name" in kwargs:
-            self._name = kwargs["name"]
-        if "id" in kwargs:
-            self._id = str(kwargs["id"])
-
-        self.__dict = {}
-        self._initialized = False
-
-    def set_key(self, key):
-        self._key = key
-        self._headers["X-Cisco-Meraki-API-Key"] = key
-
-    def set_name(self, name):
-        self._name = name
-
-    def _set_id(self, id):
-        self._id = id
-
-    def __check_headers(self):
-        if self._headers["X-Cisco-Meraki-API-Key"] is None:
-            msg = "Meraki API key is not set!"
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-            return False
-
-        return True
-
-    def realize(self):
-        return self._check_obj()
-
-    @staticmethod
-    def _get_json_errors(response):
-        res = ""
-        try:
-            jobj = response.json()
-            if "errors" in jobj:
-                res = ", ".join(jobj["errors"])
-        except Exception:
-            pass
-
-        return res
-
-    def _check_obj(self):
-        if self._initialized:
-            return True
-
-        if self._id is None:
-            msg = "{} id is not set!".format(self._class_name)
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-            return False
-
-        return self.__get_current_obj()
-
-    def __get_current_obj(self):
-        if not self.__check_headers():
-            return False
-
-        url = self.MERAKI_API + self._api_url_endpoint + "/" + self._id
-        try:
-            response = requests.request("GET", url, headers=self._headers)
-            response.raise_for_status()
-        except Exception as e:
-            msg = "Error getting {} for {}: {} ({})".format(self._class_name.lower(), self._id, e, Meraki._get_json_errors(response))
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-            return False
-
-        jobj = response.json()
-        self.__populate_obj(jobj)
-        # Object may not have a name yet.
-        if self._name_prop in jobj:
-            self._name = jobj[self._name_prop]
-
-        self._initialized = True
-        return True
-
-    def __populate_obj(self, jobj):
-        for k, v in list(jobj.items()):
-            self.__dict[k] = v
-
-    def get(self, field):
-        if not self._check_obj():
-            raise Exception("Failed to initialize object!")
-
-        if field in self.__dict:
-            return self.__dict[field]
-        else:
-            raise Exception("Field {} does not exist".format(field))
-
-    def set(self, field, value):
-        if not self._check_obj():
-            raise Exception("Failed to initialize object!")
-
-        self.__dict[field] = value
-
-    def get_organizations(self):
-        if not self.__check_headers():
-            return None
-
-        url = self.MERAKI_API + "/organizations"
-        try:
-            response = requests.request("GET", url, headers=self._headers)
-            response.raise_for_status()
-        except Exception as e:
-            msg = "Error getting list of organizations: {} ({})".format(e, Meraki._get_json_errors(response))
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-            return None
-
-        res = []
-        for org in response.json():
-            org_obj = Organization(key=self._key, name=org[Organization._name_prop], id=org[Organization._id_prop])
-            res.append(org_obj)
-
-        return res
-
-
-class Organization(Meraki):
-    def __init__(self, **kwargs):
-        super(Organization, self).__init__(**kwargs)
-
-        self._class_name = "Organization"
-        self._api_url_endpoint = "/organizations"
-        self.ORG_API = self.MERAKI_API + self._api_url_endpoint
-
-    def get_inventory(self):
-        if not self._check_obj():
-            return None
-
-        url = self.ORG_API + "/" + self._id + "/inventory"
-
-        try:
-            response = requests.request("GET", url, headers=self._headers)
-            response.raise_for_status()
-        except Exception as e:
-            msg = "Error getting inventory for organization {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-            return None
-
-        return response.json()
-
-    def get_networks(self):
-        if not self._check_obj():
-            return None
-
-        url = self.ORG_API + "/" + self._id + "/networks"
-        try:
-            response = requests.request("GET", url, headers=self._headers)
-            response.raise_for_status()
-        except Exception as e:
-            msg = "Error getting list of networks for {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-            return None
-
-        res = []
-        for n in response.json():
-            net = Network(key=self._key, name=n[Network._name_prop], id=n[Network._id_prop])
-            res.append(net)
-
-        return res
-
-    def create_network(self, name, **kwargs):
-        if not self._check_obj():
-            return None
-
-        payload = {"name": name}
-
-        for key in ["type", "tags", "timezone", "copy_from_network_id"]:
-            if key in kwargs:
-                if key == "timezone":
-                    payload["timeZone"] = kwargs[key]
-                elif key == "copy_from_network_id":
-                    payload["copyFromNetworkId"] = kwargs[key]
-                else:
-                    payload[key] = kwargs[key]
-
-        if "type" not in kwargs:
-            payload["type"] = "wireless switch appliance"
-
-        url = self.ORG_API + "/" + self._id + "/networks"
-        try:
-            response = requests.request("POST", url, json=payload, headers=self._headers)
-            response.raise_for_status()
-        except Exception as e:
-            msg = "Error creating new network {} in {}: {} ({})".format(name, self._name, e, Meraki._get_json_errors(response))
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-            return None
-
-        jobj = response.json()
-        net_obj = Network(key=self._key, id=jobj["id"], name=jobj["name"])
-
-        return net_obj
-
-
-class Network(Meraki):
-    def __init__(self, **kwargs):
-        super(Network, self).__init__(**kwargs)
-
-        self._class_name = "Network"
-        self._api_url_endpoint = "/networks"
-        self.NET_API = self.MERAKI_API + self._api_url_endpoint
-
-    def get_devices(self):
-        if not self._check_obj():
-            return None
-
-        url = self.NET_API + "/" + self._id + "/devices"
-        try:
-            response = requests.request("GET", url, headers=self._headers)
-            response.raise_for_status()
-        except Exception as e:
-            msg = "Error getting device list for network {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-            return None
-
-        res = []
-        for d in response.json():
-            dev = Device(key=self._key, name=d[Device._name_prop], id=d[Device._id_prop])
-            res.append(dev)
-
-        return res
-
-    def claim_device(self, dev):
-        if not self._check_obj():
-            return None
-
-        url = self.NET_API + "/" + self._id + "/devices/claim"
-        payload = {"serial": dev._id}
-        try:
-            response = requests.request("POST", url, json=payload, headers=self._headers)
-            response.raise_for_status()
-        except Exception as e:
-            msg = "Error claiming device {} for network {}: {} ({})".format(dev._id, self._name, e, Meraki._get_json_errors(response))
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-            return None
-
-        new_dev = Device(key=self._key, id=dev._id, net=self)
-        return new_dev
-
-    def create_vlan(self, name, id, subnet, appliance_ip):
-        if not self._check_obj():
-            return None
-
-        payload = {"id": id, "name": name, "subnet": subnet, "applianceIp": appliance_ip}
-
-        url = self.NET_API + "/" + self._id + "/vlans"
-        try:
-            response = requests.request("POST", url, json=payload, headers=self._headers)
-            response.raise_for_status()
-        except Exception as e:
-            msg = "Error adding VLAN {} (ID: {}) to network {}: {} ({})".format(name, id, self._name, e, Meraki._get_json_errors(response))
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-            return None
-
-        jobj = response.json()
-        vlan_obj = Vlan(key=self._key, id=jobj["id"], name=jobj["name"], net=self)
-
-        return vlan_obj
-
-
-class SSID(Meraki):
-    _id_prop = "number"
-
-    def __init__(self, **kwargs):
-        super(SSID, self).__init__(**kwargs)
-        if "net" not in kwargs:
-            raise TypeError("Missing mandatory net argument!")
-
-        self._net = kwargs["net"]
-
-        self._class_name = "SSID"
-        self._api_url_endpoint = "/networks/" + self._net.get(Network._id_prop) + "/ssids"
-        self.SSID_API = self.MERAKI_API + self._api_url_endpoint
-
-    def update_ssid(self, **kwargs):
-        if not self._check_obj():
-            return False
-
-        payload = {}
-        for key in ["name", "enabled", "auth_mode", "encryption_mode", "psk", "ip_assignment_mode"]:
-            if key in kwargs:
-                if key == "auth_mode":
-                    payload["authMode"] = kwargs[key]
-                elif key == "encryption_mode":
-                    payload["encryptionMode"] = kwargs[key]
-                elif key == "ip_assignment_mode":
-                    payload["ipAssignmentMode"] = kwargs[key]
-                else:
-                    payload[key] = kwargs[key]
-
-        if len(payload) == 0:
-            return False
-
-        if "ip_assignment_mode" not in kwargs:
-            payload["ipAssignmentMode"] = "Bridge mode"
-
-        url = self.SSID_API + "/" + self._id
-        try:
-            response = requests.request("PUT", url, json=payload, headers=self._headers)
-            response.raise_for_status()
-        except Exception as e:
-            msg = "Error updating SSID properties for {}: {} ({})".format(self._id, e, Meraki._get_json_errors(response))
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-                return False
-
-        jobj = response.json()
-        for k, v in list(jobj.items()):
-            self.set(k, v)
-
-        return True
-
-
-class Vlan(Meraki):
-    def __init__(self, **kwargs):
-        super(Vlan, self).__init__(**kwargs)
-        if "net" not in kwargs:
-            raise TypeError("Missing mandatory net argument!")
-
-        self._net = kwargs["net"]
-
-        self._class_name = "Vlan"
-        self._api_url_endpoint = "/networks/" + self._net.get(Network._id_prop) + "/vlans"
-        self.VLAN_API = self.MERAKI_API + self._api_url_endpoint
-
-    def update_vlan(self, **kwargs):
-        if not self._check_obj():
-            return False
-
-        payload = {}
-        for key in ["name", "subnet", "appliance_ip", "fixed_ip_assignments", "reserved_ip_ranges", "dns_nameservers"]:
-            if key in kwargs:
-                if key == "appliance_ip":
-                    payload["applianceIp"] = kwargs[key]
-                elif key == "fixed_ip_assignments":
-                    payload["fixedIpAssignments"] = kwargs[key]
-                elif key == "reserved_ip_ranges":
-                    payload["reservedIpRanges"] = kwargs[key]
-                elif key == "dns_nameservers":
-                    payload["dnsNameservers"] = kwargs[key]
-                else:
-                    payload[key] = kwargs[key]
-
-        if len(payload) == 0:
-            return False
-
-        url = self.VLAN_API + "/" + self._id
-        try:
-            response = requests.request("PUT", url, json=payload, headers=self._headers)
-            response.raise_for_status()
-        except Exception as e:
-            msg = "Error updating VLAN properties for {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-                return False
-
-        jobj = response.json()
-        for k, v in list(jobj.items()):
-            self.set(k, v)
-
-        return True
-
-
-class Device(Meraki):
-
-    _id_prop = "serial"
-
-    def __init__(self, **kwargs):
-        super(Device, self).__init__(**kwargs)
-        if "net" not in kwargs:
-            raise TypeError("Missing mandatory net argument!")
-
-        self._net = kwargs["net"]
-
-        self._class_name = "Device"
-        self._api_url_endpoint = "/networks/" + self._net.get(Network._id_prop) + "/devices"
-        self.DEV_API = self.MERAKI_API + self._api_url_endpoint
-
-    def remove_device(self):
-        if not self._check_obj():
-            return False
-
-        url = self.DEV_API + "/" + self._id + "/remove"
-        try:
-            response = requests.request("POST", url, headers=self._headers)
-            response.raise_for_status()
-        except Exception as e:
-            msg = "Failed to remove device {} for network {}: {} ({})".format(
-                self._name, self._net.get(Network._name_prop), e, Meraki._get_json_errors(response)
-            )
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-            return False
-
-        return True
-
-    def update_device(self, **kwargs):
-        if not self._check_obj():
-            return False
-
-        payload = {}
-        for key in ["name", "tags", "lat", "lng", "address", "move_map_marker"]:
-            if key in kwargs:
-                if key == "move_map_marker":
-                    payload["moveMapMarker"] = kwargs[key]
-                else:
-                    payload[key] = kwargs[key]
-
-        if len(payload) == 0:
-            return False
-
-        url = self.DEV_API + "/" + self._id
-        try:
-            response = requests.request("PUT", url, json=payload, headers=self._headers)
-            response.raise_for_status()
-        except Exception as e:
-            msg = "Error updating device properties for {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-                return False
-
-        jobj = response.json()
-        for k, v in list(jobj.items()):
-            if k == "moveMapMarker":
-                continue
-            self.set(k, v)
-
-        return True
-
-
-class SwitchPort(Meraki):
-    _id_prop = "number"
-
-    def __init__(self, **kwargs):
-        super(SwitchPort, self).__init__(**kwargs)
-        if "dev" not in kwargs:
-            raise TypeError("Missing mandatory dev argument!")
-
-        self._dev = kwargs["dev"]
-
-        self._class_name = "SwitchPort"
-        self._api_url_endpoint = "/devices/" + self._net.get(Network._id_prop) + "/switchPorts"
-        self.SWP_API = self.MERAKI_API + self._api_url_endpoint
-
-    def update_switchport(self, **kwargs):
-        if not self._check_obj():
-            return False
-
-        payload = {}
-        for key in ["name", "tags", "enabled", "type", "vlan", "voice_vlan", "allowed_vlans", "poe_enabled"]:
-            if key in kwargs:
-                if key == "voice_vlan":
-                    payload["voiceVlan"] = kwargs[key]
-                elif key == "allowed_vlans":
-                    payload["allowedVlans"] = kwargs[key]
-                elif key == "poe_enabled":
-                    payload["poeEnabled"] = kwargs[key]
-                else:
-                    payload[key] = kwargs[key]
-
-        if len(payload) == 0:
-            return False
-
-        url = self.SWP_API + "/" + self._id
-        try:
-            response = requests.request("PUT", url, json=payload, headers=self._headers)
-            response.raise_for_status()
-        except Exception as e:
-            msg = "Error updating switchport properties for {} on device {}: {} ({})".format(
-                self._id, self._dev.get(Device._name_prop), e, Meraki._get_json_errors(response)
-            )
-            if self._logit:
-                logging.error(msg)
-            else:
-                print(msg)
-
-                return False
-
-        jobj = response.json()
-        for k, v in list(jobj.items()):
-            self.set(k, v)
-
-        return True
+from .meraki_api import Meraki, Organization, Network, Vlan, SwitchPort, SSID, Device  # noqa

+ 588 - 0
meraki_api/meraki_api.py

@@ -0,0 +1,588 @@
+from __future__ import print_function
+
+#
+# Copyright (c) 2018-2020  Joe Clarke <jclarke@cisco.com>
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+
+from builtins import str
+from builtins import object
+import requests
+import logging
+
+
+class Meraki(object):
+    MERAKI_API = "https://api.meraki.com/api/v0"
+
+    _headers = {"X-Cisco-Meraki-API-Key": None, "Content-type": "application/json"}
+
+    _logit = False
+    _key = None
+
+    _name = None
+    _id = None
+
+    _class_name = "Meraki"
+
+    _name_prop = "name"
+    _id_prop = "id"
+
+    _api_url_endpoint = ""
+
+    def __init__(self, **kwargs):
+        if "logit" in kwargs:
+            self._logit = kwargs["logit"]
+        if "key" in kwargs:
+            self._key = kwargs["key"]
+            self._headers["X-Cisco-Meraki-API-Key"] = kwargs["key"]
+        if "name" in kwargs:
+            self._name = kwargs["name"]
+        if "id" in kwargs:
+            self._id = str(kwargs["id"])
+
+        self.__dict = {}
+        self._initialized = False
+
+    def set_key(self, key):
+        self._key = key
+        self._headers["X-Cisco-Meraki-API-Key"] = key
+
+    def set_name(self, name):
+        self._name = name
+
+    def _set_id(self, id):
+        self._id = id
+
+    def __check_headers(self):
+        if self._headers["X-Cisco-Meraki-API-Key"] is None:
+            msg = "Meraki API key is not set!"
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+            return False
+
+        return True
+
+    def realize(self):
+        return self._check_obj()
+
+    @staticmethod
+    def _get_json_errors(response):
+        res = ""
+        try:
+            jobj = response.json()
+            if "errors" in jobj:
+                res = ", ".join(jobj["errors"])
+        except Exception:
+            pass
+
+        return res
+
+    def _check_obj(self):
+        if self._initialized:
+            return True
+
+        if self._id is None:
+            msg = "{} id is not set!".format(self._class_name)
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+            return False
+
+        return self.__get_current_obj()
+
+    def __get_current_obj(self):
+        if not self.__check_headers():
+            return False
+
+        url = self.MERAKI_API + self._api_url_endpoint + "/" + self._id
+        try:
+            response = requests.request("GET", url, headers=self._headers)
+            response.raise_for_status()
+        except Exception as e:
+            msg = "Error getting {} for {}: {} ({})".format(self._class_name.lower(), self._id, e, Meraki._get_json_errors(response))
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+            return False
+
+        jobj = response.json()
+        self.__populate_obj(jobj)
+        # Object may not have a name yet.
+        if self._name_prop in jobj:
+            self._name = jobj[self._name_prop]
+
+        self._initialized = True
+        return True
+
+    def __populate_obj(self, jobj):
+        for k, v in list(jobj.items()):
+            self.__dict[k] = v
+
+    def get(self, field):
+        if not self._check_obj():
+            raise Exception("Failed to initialize object!")
+
+        if field in self.__dict:
+            return self.__dict[field]
+        else:
+            raise Exception("Field {} does not exist".format(field))
+
+    def set(self, field, value):
+        if not self._check_obj():
+            raise Exception("Failed to initialize object!")
+
+        self.__dict[field] = value
+
+    def get_organizations(self):
+        if not self.__check_headers():
+            return None
+
+        url = self.MERAKI_API + "/organizations"
+        try:
+            response = requests.request("GET", url, headers=self._headers)
+            response.raise_for_status()
+        except Exception as e:
+            msg = "Error getting list of organizations: {} ({})".format(e, Meraki._get_json_errors(response))
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+            return None
+
+        res = []
+        for org in response.json():
+            org_obj = Organization(key=self._key, name=org[Organization._name_prop], id=org[Organization._id_prop])
+            res.append(org_obj)
+
+        return res
+
+
+class Organization(Meraki):
+    def __init__(self, **kwargs):
+        super(Organization, self).__init__(**kwargs)
+
+        self._class_name = "Organization"
+        self._api_url_endpoint = "/organizations"
+        self.ORG_API = self.MERAKI_API + self._api_url_endpoint
+
+    def get_inventory(self):
+        if not self._check_obj():
+            return None
+
+        url = self.ORG_API + "/" + self._id + "/inventory"
+
+        try:
+            response = requests.request("GET", url, headers=self._headers)
+            response.raise_for_status()
+        except Exception as e:
+            msg = "Error getting inventory for organization {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+            return None
+
+        return response.json()
+
+    def get_networks(self):
+        if not self._check_obj():
+            return None
+
+        url = self.ORG_API + "/" + self._id + "/networks"
+        try:
+            response = requests.request("GET", url, headers=self._headers)
+            response.raise_for_status()
+        except Exception as e:
+            msg = "Error getting list of networks for {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+            return None
+
+        res = []
+        for n in response.json():
+            net = Network(key=self._key, name=n[Network._name_prop], id=n[Network._id_prop])
+            res.append(net)
+
+        return res
+
+    def create_network(self, name, **kwargs):
+        if not self._check_obj():
+            return None
+
+        payload = {"name": name}
+
+        for key in ["type", "tags", "timezone", "copy_from_network_id"]:
+            if key in kwargs:
+                if key == "timezone":
+                    payload["timeZone"] = kwargs[key]
+                elif key == "copy_from_network_id":
+                    payload["copyFromNetworkId"] = kwargs[key]
+                else:
+                    payload[key] = kwargs[key]
+
+        if "type" not in kwargs:
+            payload["type"] = "wireless switch appliance"
+
+        url = self.ORG_API + "/" + self._id + "/networks"
+        try:
+            response = requests.request("POST", url, json=payload, headers=self._headers)
+            response.raise_for_status()
+        except Exception as e:
+            msg = "Error creating new network {} in {}: {} ({})".format(name, self._name, e, Meraki._get_json_errors(response))
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+            return None
+
+        jobj = response.json()
+        net_obj = Network(key=self._key, id=jobj["id"], name=jobj["name"])
+
+        return net_obj
+
+
+class Network(Meraki):
+    def __init__(self, **kwargs):
+        super(Network, self).__init__(**kwargs)
+
+        self._class_name = "Network"
+        self._api_url_endpoint = "/networks"
+        self.NET_API = self.MERAKI_API + self._api_url_endpoint
+
+    def get_devices(self):
+        if not self._check_obj():
+            return None
+
+        url = self.NET_API + "/" + self._id + "/devices"
+        try:
+            response = requests.request("GET", url, headers=self._headers)
+            response.raise_for_status()
+        except Exception as e:
+            msg = "Error getting device list for network {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+            return None
+
+        res = []
+        for d in response.json():
+            dev = Device(key=self._key, name=d[Device._name_prop], id=d[Device._id_prop])
+            res.append(dev)
+
+        return res
+
+    def claim_device(self, dev):
+        if not self._check_obj():
+            return None
+
+        url = self.NET_API + "/" + self._id + "/devices/claim"
+        payload = {"serial": dev._id}
+        try:
+            response = requests.request("POST", url, json=payload, headers=self._headers)
+            response.raise_for_status()
+        except Exception as e:
+            msg = "Error claiming device {} for network {}: {} ({})".format(dev._id, self._name, e, Meraki._get_json_errors(response))
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+            return None
+
+        new_dev = Device(key=self._key, id=dev._id, net=self)
+        return new_dev
+
+    def create_vlan(self, name, id, subnet, appliance_ip):
+        if not self._check_obj():
+            return None
+
+        payload = {"id": id, "name": name, "subnet": subnet, "applianceIp": appliance_ip}
+
+        url = self.NET_API + "/" + self._id + "/vlans"
+        try:
+            response = requests.request("POST", url, json=payload, headers=self._headers)
+            response.raise_for_status()
+        except Exception as e:
+            msg = "Error adding VLAN {} (ID: {}) to network {}: {} ({})".format(name, id, self._name, e, Meraki._get_json_errors(response))
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+            return None
+
+        jobj = response.json()
+        vlan_obj = Vlan(key=self._key, id=jobj["id"], name=jobj["name"], net=self)
+
+        return vlan_obj
+
+
+class SSID(Meraki):
+    _id_prop = "number"
+
+    def __init__(self, **kwargs):
+        super(SSID, self).__init__(**kwargs)
+        if "net" not in kwargs:
+            raise TypeError("Missing mandatory net argument!")
+
+        self._net = kwargs["net"]
+
+        self._class_name = "SSID"
+        self._api_url_endpoint = "/networks/" + self._net.get(Network._id_prop) + "/ssids"
+        self.SSID_API = self.MERAKI_API + self._api_url_endpoint
+
+    def update_ssid(self, **kwargs):
+        if not self._check_obj():
+            return False
+
+        payload = {}
+        for key in ["name", "enabled", "auth_mode", "encryption_mode", "psk", "ip_assignment_mode"]:
+            if key in kwargs:
+                if key == "auth_mode":
+                    payload["authMode"] = kwargs[key]
+                elif key == "encryption_mode":
+                    payload["encryptionMode"] = kwargs[key]
+                elif key == "ip_assignment_mode":
+                    payload["ipAssignmentMode"] = kwargs[key]
+                else:
+                    payload[key] = kwargs[key]
+
+        if len(payload) == 0:
+            return False
+
+        if "ip_assignment_mode" not in kwargs:
+            payload["ipAssignmentMode"] = "Bridge mode"
+
+        url = self.SSID_API + "/" + self._id
+        try:
+            response = requests.request("PUT", url, json=payload, headers=self._headers)
+            response.raise_for_status()
+        except Exception as e:
+            msg = "Error updating SSID properties for {}: {} ({})".format(self._id, e, Meraki._get_json_errors(response))
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+                return False
+
+        jobj = response.json()
+        for k, v in list(jobj.items()):
+            self.set(k, v)
+
+        return True
+
+
+class Vlan(Meraki):
+    def __init__(self, **kwargs):
+        super(Vlan, self).__init__(**kwargs)
+        if "net" not in kwargs:
+            raise TypeError("Missing mandatory net argument!")
+
+        self._net = kwargs["net"]
+
+        self._class_name = "Vlan"
+        self._api_url_endpoint = "/networks/" + self._net.get(Network._id_prop) + "/vlans"
+        self.VLAN_API = self.MERAKI_API + self._api_url_endpoint
+
+    def update_vlan(self, **kwargs):
+        if not self._check_obj():
+            return False
+
+        payload = {}
+        for key in ["name", "subnet", "appliance_ip", "fixed_ip_assignments", "reserved_ip_ranges", "dns_nameservers"]:
+            if key in kwargs:
+                if key == "appliance_ip":
+                    payload["applianceIp"] = kwargs[key]
+                elif key == "fixed_ip_assignments":
+                    payload["fixedIpAssignments"] = kwargs[key]
+                elif key == "reserved_ip_ranges":
+                    payload["reservedIpRanges"] = kwargs[key]
+                elif key == "dns_nameservers":
+                    payload["dnsNameservers"] = kwargs[key]
+                else:
+                    payload[key] = kwargs[key]
+
+        if len(payload) == 0:
+            return False
+
+        url = self.VLAN_API + "/" + self._id
+        try:
+            response = requests.request("PUT", url, json=payload, headers=self._headers)
+            response.raise_for_status()
+        except Exception as e:
+            msg = "Error updating VLAN properties for {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+                return False
+
+        jobj = response.json()
+        for k, v in list(jobj.items()):
+            self.set(k, v)
+
+        return True
+
+
+class Device(Meraki):
+
+    _id_prop = "serial"
+
+    def __init__(self, **kwargs):
+        super(Device, self).__init__(**kwargs)
+        if "net" not in kwargs:
+            raise TypeError("Missing mandatory net argument!")
+
+        self._net = kwargs["net"]
+
+        self._class_name = "Device"
+        self._api_url_endpoint = "/networks/" + self._net.get(Network._id_prop) + "/devices"
+        self.DEV_API = self.MERAKI_API + self._api_url_endpoint
+
+    def remove_device(self):
+        if not self._check_obj():
+            return False
+
+        url = self.DEV_API + "/" + self._id + "/remove"
+        try:
+            response = requests.request("POST", url, headers=self._headers)
+            response.raise_for_status()
+        except Exception as e:
+            msg = "Failed to remove device {} for network {}: {} ({})".format(
+                self._name, self._net.get(Network._name_prop), e, Meraki._get_json_errors(response)
+            )
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+            return False
+
+        return True
+
+    def update_device(self, **kwargs):
+        if not self._check_obj():
+            return False
+
+        payload = {}
+        for key in ["name", "tags", "lat", "lng", "address", "move_map_marker"]:
+            if key in kwargs:
+                if key == "move_map_marker":
+                    payload["moveMapMarker"] = kwargs[key]
+                else:
+                    payload[key] = kwargs[key]
+
+        if len(payload) == 0:
+            return False
+
+        url = self.DEV_API + "/" + self._id
+        try:
+            response = requests.request("PUT", url, json=payload, headers=self._headers)
+            response.raise_for_status()
+        except Exception as e:
+            msg = "Error updating device properties for {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+                return False
+
+        jobj = response.json()
+        for k, v in list(jobj.items()):
+            if k == "moveMapMarker":
+                continue
+            self.set(k, v)
+
+        return True
+
+
+class SwitchPort(Meraki):
+    _id_prop = "number"
+
+    def __init__(self, **kwargs):
+        super(SwitchPort, self).__init__(**kwargs)
+        if "dev" not in kwargs:
+            raise TypeError("Missing mandatory dev argument!")
+
+        self._dev = kwargs["dev"]
+
+        self._class_name = "SwitchPort"
+        self._api_url_endpoint = "/devices/" + self._net.get(Network._id_prop) + "/switchPorts"
+        self.SWP_API = self.MERAKI_API + self._api_url_endpoint
+
+    def update_switchport(self, **kwargs):
+        if not self._check_obj():
+            return False
+
+        payload = {}
+        for key in ["name", "tags", "enabled", "type", "vlan", "voice_vlan", "allowed_vlans", "poe_enabled"]:
+            if key in kwargs:
+                if key == "voice_vlan":
+                    payload["voiceVlan"] = kwargs[key]
+                elif key == "allowed_vlans":
+                    payload["allowedVlans"] = kwargs[key]
+                elif key == "poe_enabled":
+                    payload["poeEnabled"] = kwargs[key]
+                else:
+                    payload[key] = kwargs[key]
+
+        if len(payload) == 0:
+            return False
+
+        url = self.SWP_API + "/" + self._id
+        try:
+            response = requests.request("PUT", url, json=payload, headers=self._headers)
+            response.raise_for_status()
+        except Exception as e:
+            msg = "Error updating switchport properties for {} on device {}: {} ({})".format(
+                self._id, self._dev.get(Device._name_prop), e, Meraki._get_json_errors(response)
+            )
+            if self._logit:
+                logging.error(msg)
+            else:
+                print(msg)
+
+                return False
+
+        jobj = response.json()
+        for k, v in list(jobj.items()):
+            self.set(k, v)
+
+        return True

+ 12 - 12
setup-meraki-nets.py

@@ -26,7 +26,7 @@
 
 from __future__ import print_function
 from builtins import input
-import meraki_api
+from meraki_api import Meraki, Network, Vlan, SwitchPort, SSID, Device
 import yaml
 import argparse
 import sys
@@ -59,7 +59,7 @@ def main():
             print("Invalid config: {} is missing!".format(key))
             sys.exit(1)
 
-    meraki = meraki_api.Meraki(key=config["api_key"])
+    meraki = Meraki(key=config["api_key"])
     orgs = meraki.get_organizations()
     org = None
     for o in orgs:
@@ -132,8 +132,8 @@ def main():
                     dev_obj = None
                     if inv_dev[0]["networkId"] is not None and inv_dev[0]["networkId"] != net_obj.get("id"):
                         try:
-                            inv_net_obj = meraki_api.Network(key=config["api_key"], id=inv_dev[0]["networkId"])
-                            dev_obj = meraki_api.Device(key=config["api_key"], id=inv_dev[0]["serial"], net=inv_net_obj)
+                            inv_net_obj = Network(key=config["api_key"], id=inv_dev[0]["networkId"])
+                            dev_obj = Device(key=config["api_key"], id=inv_dev[0]["serial"], net=inv_net_obj)
 
                             res = dev_obj.remove_device()
                             if not res:
@@ -166,7 +166,7 @@ def main():
                             continue
                     elif inv_dev[0]["networkId"] is None:
                         try:
-                            dev_obj = meraki_api.Device(key=config["api_key"], id=inv_dev[0]["serial"], net=net_obj)
+                            dev_obj = Device(key=config["api_key"], id=inv_dev[0]["serial"], net=net_obj)
                             res = net_obj.claim_device(dev_obj)
                             if not res:
                                 print("{}Error claiming device {}{}".format(Fore.RED, inv_dev[0]["serial"], Style.RESET_ALL))
@@ -180,7 +180,7 @@ def main():
                             continue
 
                     else:
-                        dev_obj = meraki_api.Device(key=config["api_key"], id=inv_dev[0]["serial"], net=net_obj)
+                        dev_obj = Device(key=config["api_key"], id=inv_dev[0]["serial"], net=net_obj)
                         print("{}ok: {} is in network{}".format(Fore.GREEN, inv_dev[0]["serial"], Style.RESET_ALL))
 
                     dev_location = net[nname]["address"]
@@ -220,7 +220,7 @@ def main():
                         Fore.YELLOW, vname, vlan[vname]["id"], vlan[vname]["subnet"], vlan[vname]["appliance_ip"], Style.RESET_ALL
                     )
                 else:
-                    vlan_obj = meraki_api.Vlan(key=config["api_key"], id=1, net=net_obj)
+                    vlan_obj = Vlan(key=config["api_key"], id=1, net=net_obj)
                     done_msg = "{}ok: VLAN with ID {} exists{}".format(Fore.GREEN, vlan[vname]["id"], Style.RESET_ALL)
                 if vlan_obj is None:
                     print(
@@ -251,7 +251,7 @@ def main():
                 si = 0
                 for ssid in net[nname]["ssids"]:
                     sname = list(ssid.keys())[0]
-                    ssid_obj = meraki_api.SSID(key=config["api_key"], id=si, name=sname, net=net_obj)
+                    ssid_obj = SSID(key=config["api_key"], id=si, name=sname, net=net_obj)
                     sargs = {}
                     for key in ["name", "enabled", "auth_mode", "encryption_mode", "psk", "ip_assignment_mode"]:
                         if key in ssid[sname]:
@@ -268,7 +268,7 @@ def main():
         if "switches" in net[nname]:
             for switch in net[nname]["switches"]:
                 serial = list(switch.keys())[0]
-                dev_obj = meraki_api.Device(key=config["api_key"], id=serial, net=net_obj)
+                dev_obj = Device(key=config["api_key"], id=serial, net=net_obj)
                 if not dev_obj.realize():
                     print("{}Device {} is not in network {}{}".format(Fore.RED, serial, net_obj.get("name"), Style.RESET_ALL))
                     nerrors += 1
@@ -278,14 +278,14 @@ def main():
                     port_range = list(switchport.keys())[0]
                     ports = []
                     if isinstance(port_range, int):
-                        port_obj = meraki_api.SwitchPort(key=config["api_key"], id=port_range, dev=dev_obj)
+                        port_obj = SwitchPort(key=config["api_key"], id=port_range, dev=dev_obj)
                         ports.append(port_obj)
                     else:
                         prs = port_range.split(",")
                         for pr in prs:
                             pr = pr.strip()
                             if isinstance(pr, int):
-                                port_obj = meraki_api.SwitchPort(key=config["api_key"], id=pr, dev=dev_obj)
+                                port_obj = SwitchPort(key=config["api_key"], id=pr, dev=dev_obj)
                                 ports.append(pr)
                             else:
                                 if "-" not in pr:
@@ -312,7 +312,7 @@ def main():
 
                                 pi = start
                                 while pi <= end:
-                                    port_obj = meraki_api.SwitchPort(key=config["api_key"], id=pi, dev=dev_obj)
+                                    port_obj = SwitchPort(key=config["api_key"], id=pi, dev=dev_obj)
                                     ports.append(port_obj)
                                     pi += 1