meraki_api.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588
  1. from __future__ import print_function
  2. #
  3. # Copyright (c) 2018-2020 Joe Clarke <jclarke@cisco.com>
  4. #
  5. # Redistribution and use in source and binary forms, with or without
  6. # modification, are permitted provided that the following conditions
  7. # are met:
  8. # 1. Redistributions of source code must retain the above copyright
  9. # notice, this list of conditions and the following disclaimer.
  10. # 2. Redistributions in binary form must reproduce the above copyright
  11. # notice, this list of conditions and the following disclaimer in the
  12. # documentation and/or other materials provided with the distribution.
  13. #
  14. # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
  15. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  16. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  17. # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  18. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  19. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  20. # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  21. # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  22. # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  23. # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  24. # SUCH DAMAGE.
  25. from builtins import str
  26. from builtins import object
  27. import requests
  28. import logging
  29. class Meraki(object):
  30. MERAKI_API = "https://api.meraki.com/api/v0"
  31. _headers = {"X-Cisco-Meraki-API-Key": None, "Content-type": "application/json"}
  32. _logit = False
  33. _key = None
  34. _name = None
  35. _id = None
  36. _class_name = "Meraki"
  37. _name_prop = "name"
  38. _id_prop = "id"
  39. _api_url_endpoint = ""
  40. def __init__(self, **kwargs):
  41. if "logit" in kwargs:
  42. self._logit = kwargs["logit"]
  43. if "key" in kwargs:
  44. self._key = kwargs["key"]
  45. self._headers["X-Cisco-Meraki-API-Key"] = kwargs["key"]
  46. if "name" in kwargs:
  47. self._name = kwargs["name"]
  48. if "id" in kwargs:
  49. self._id = str(kwargs["id"])
  50. self.__dict = {}
  51. self._initialized = False
  52. def set_key(self, key):
  53. self._key = key
  54. self._headers["X-Cisco-Meraki-API-Key"] = key
  55. def set_name(self, name):
  56. self._name = name
  57. def _set_id(self, id):
  58. self._id = id
  59. def __check_headers(self):
  60. if self._headers["X-Cisco-Meraki-API-Key"] is None:
  61. msg = "Meraki API key is not set!"
  62. if self._logit:
  63. logging.error(msg)
  64. else:
  65. print(msg)
  66. return False
  67. return True
  68. def realize(self):
  69. return self._check_obj()
  70. @staticmethod
  71. def _get_json_errors(response):
  72. res = ""
  73. try:
  74. jobj = response.json()
  75. if "errors" in jobj:
  76. res = ", ".join(jobj["errors"])
  77. except Exception:
  78. pass
  79. return res
  80. def _check_obj(self):
  81. if self._initialized:
  82. return True
  83. if self._id is None:
  84. msg = "{} id is not set!".format(self._class_name)
  85. if self._logit:
  86. logging.error(msg)
  87. else:
  88. print(msg)
  89. return False
  90. return self.__get_current_obj()
  91. def __get_current_obj(self):
  92. if not self.__check_headers():
  93. return False
  94. url = self.MERAKI_API + self._api_url_endpoint + "/" + self._id
  95. try:
  96. response = requests.request("GET", url, headers=self._headers)
  97. response.raise_for_status()
  98. except Exception as e:
  99. msg = "Error getting {} for {}: {} ({})".format(self._class_name.lower(), self._id, e, Meraki._get_json_errors(response))
  100. if self._logit:
  101. logging.error(msg)
  102. else:
  103. print(msg)
  104. return False
  105. jobj = response.json()
  106. self.__populate_obj(jobj)
  107. # Object may not have a name yet.
  108. if self._name_prop in jobj:
  109. self._name = jobj[self._name_prop]
  110. self._initialized = True
  111. return True
  112. def __populate_obj(self, jobj):
  113. for k, v in list(jobj.items()):
  114. self.__dict[k] = v
  115. def get(self, field):
  116. if not self._check_obj():
  117. raise Exception("Failed to initialize object!")
  118. if field in self.__dict:
  119. return self.__dict[field]
  120. else:
  121. raise Exception("Field {} does not exist".format(field))
  122. def set(self, field, value):
  123. if not self._check_obj():
  124. raise Exception("Failed to initialize object!")
  125. self.__dict[field] = value
  126. def get_organizations(self):
  127. if not self.__check_headers():
  128. return None
  129. url = self.MERAKI_API + "/organizations"
  130. try:
  131. response = requests.request("GET", url, headers=self._headers)
  132. response.raise_for_status()
  133. except Exception as e:
  134. msg = "Error getting list of organizations: {} ({})".format(e, Meraki._get_json_errors(response))
  135. if self._logit:
  136. logging.error(msg)
  137. else:
  138. print(msg)
  139. return None
  140. res = []
  141. for org in response.json():
  142. org_obj = Organization(key=self._key, name=org[Organization._name_prop], id=org[Organization._id_prop])
  143. res.append(org_obj)
  144. return res
  145. class Organization(Meraki):
  146. def __init__(self, **kwargs):
  147. super(Organization, self).__init__(**kwargs)
  148. self._class_name = "Organization"
  149. self._api_url_endpoint = "/organizations"
  150. self.ORG_API = self.MERAKI_API + self._api_url_endpoint
  151. def get_inventory(self):
  152. if not self._check_obj():
  153. return None
  154. url = self.ORG_API + "/" + self._id + "/inventory"
  155. try:
  156. response = requests.request("GET", url, headers=self._headers)
  157. response.raise_for_status()
  158. except Exception as e:
  159. msg = "Error getting inventory for organization {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
  160. if self._logit:
  161. logging.error(msg)
  162. else:
  163. print(msg)
  164. return None
  165. return response.json()
  166. def get_networks(self):
  167. if not self._check_obj():
  168. return None
  169. url = self.ORG_API + "/" + self._id + "/networks"
  170. try:
  171. response = requests.request("GET", url, headers=self._headers)
  172. response.raise_for_status()
  173. except Exception as e:
  174. msg = "Error getting list of networks for {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
  175. if self._logit:
  176. logging.error(msg)
  177. else:
  178. print(msg)
  179. return None
  180. res = []
  181. for n in response.json():
  182. net = Network(key=self._key, name=n[Network._name_prop], id=n[Network._id_prop])
  183. res.append(net)
  184. return res
  185. def create_network(self, name, **kwargs):
  186. if not self._check_obj():
  187. return None
  188. payload = {"name": name}
  189. for key in ["type", "tags", "timezone", "copy_from_network_id"]:
  190. if key in kwargs:
  191. if key == "timezone":
  192. payload["timeZone"] = kwargs[key]
  193. elif key == "copy_from_network_id":
  194. payload["copyFromNetworkId"] = kwargs[key]
  195. else:
  196. payload[key] = kwargs[key]
  197. if "type" not in kwargs:
  198. payload["type"] = "wireless switch appliance"
  199. url = self.ORG_API + "/" + self._id + "/networks"
  200. try:
  201. response = requests.request("POST", url, json=payload, headers=self._headers)
  202. response.raise_for_status()
  203. except Exception as e:
  204. msg = "Error creating new network {} in {}: {} ({})".format(name, self._name, e, Meraki._get_json_errors(response))
  205. if self._logit:
  206. logging.error(msg)
  207. else:
  208. print(msg)
  209. return None
  210. jobj = response.json()
  211. net_obj = Network(key=self._key, id=jobj["id"], name=jobj["name"])
  212. return net_obj
  213. class Network(Meraki):
  214. def __init__(self, **kwargs):
  215. super(Network, self).__init__(**kwargs)
  216. self._class_name = "Network"
  217. self._api_url_endpoint = "/networks"
  218. self.NET_API = self.MERAKI_API + self._api_url_endpoint
  219. def get_devices(self):
  220. if not self._check_obj():
  221. return None
  222. url = self.NET_API + "/" + self._id + "/devices"
  223. try:
  224. response = requests.request("GET", url, headers=self._headers)
  225. response.raise_for_status()
  226. except Exception as e:
  227. msg = "Error getting device list for network {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
  228. if self._logit:
  229. logging.error(msg)
  230. else:
  231. print(msg)
  232. return None
  233. res = []
  234. for d in response.json():
  235. dev = Device(key=self._key, name=d[Device._name_prop], id=d[Device._id_prop])
  236. res.append(dev)
  237. return res
  238. def claim_device(self, dev):
  239. if not self._check_obj():
  240. return None
  241. url = self.NET_API + "/" + self._id + "/devices/claim"
  242. payload = {"serial": dev._id}
  243. try:
  244. response = requests.request("POST", url, json=payload, headers=self._headers)
  245. response.raise_for_status()
  246. except Exception as e:
  247. msg = "Error claiming device {} for network {}: {} ({})".format(dev._id, self._name, e, Meraki._get_json_errors(response))
  248. if self._logit:
  249. logging.error(msg)
  250. else:
  251. print(msg)
  252. return None
  253. new_dev = Device(key=self._key, id=dev._id, net=self)
  254. return new_dev
  255. def create_vlan(self, name, id, subnet, appliance_ip):
  256. if not self._check_obj():
  257. return None
  258. payload = {"id": id, "name": name, "subnet": subnet, "applianceIp": appliance_ip}
  259. url = self.NET_API + "/" + self._id + "/vlans"
  260. try:
  261. response = requests.request("POST", url, json=payload, headers=self._headers)
  262. response.raise_for_status()
  263. except Exception as e:
  264. msg = "Error adding VLAN {} (ID: {}) to network {}: {} ({})".format(name, id, self._name, e, Meraki._get_json_errors(response))
  265. if self._logit:
  266. logging.error(msg)
  267. else:
  268. print(msg)
  269. return None
  270. jobj = response.json()
  271. vlan_obj = Vlan(key=self._key, id=jobj["id"], name=jobj["name"], net=self)
  272. return vlan_obj
  273. class SSID(Meraki):
  274. _id_prop = "number"
  275. def __init__(self, **kwargs):
  276. super(SSID, self).__init__(**kwargs)
  277. if "net" not in kwargs:
  278. raise TypeError("Missing mandatory net argument!")
  279. self._net = kwargs["net"]
  280. self._class_name = "SSID"
  281. self._api_url_endpoint = "/networks/" + self._net.get(Network._id_prop) + "/ssids"
  282. self.SSID_API = self.MERAKI_API + self._api_url_endpoint
  283. def update_ssid(self, **kwargs):
  284. if not self._check_obj():
  285. return False
  286. payload = {}
  287. for key in ["name", "enabled", "auth_mode", "encryption_mode", "psk", "ip_assignment_mode"]:
  288. if key in kwargs:
  289. if key == "auth_mode":
  290. payload["authMode"] = kwargs[key]
  291. elif key == "encryption_mode":
  292. payload["encryptionMode"] = kwargs[key]
  293. elif key == "ip_assignment_mode":
  294. payload["ipAssignmentMode"] = kwargs[key]
  295. else:
  296. payload[key] = kwargs[key]
  297. if len(payload) == 0:
  298. return False
  299. if "ip_assignment_mode" not in kwargs:
  300. payload["ipAssignmentMode"] = "Bridge mode"
  301. url = self.SSID_API + "/" + self._id
  302. try:
  303. response = requests.request("PUT", url, json=payload, headers=self._headers)
  304. response.raise_for_status()
  305. except Exception as e:
  306. msg = "Error updating SSID properties for {}: {} ({})".format(self._id, e, Meraki._get_json_errors(response))
  307. if self._logit:
  308. logging.error(msg)
  309. else:
  310. print(msg)
  311. return False
  312. jobj = response.json()
  313. for k, v in list(jobj.items()):
  314. self.set(k, v)
  315. return True
  316. class Vlan(Meraki):
  317. def __init__(self, **kwargs):
  318. super(Vlan, self).__init__(**kwargs)
  319. if "net" not in kwargs:
  320. raise TypeError("Missing mandatory net argument!")
  321. self._net = kwargs["net"]
  322. self._class_name = "Vlan"
  323. self._api_url_endpoint = "/networks/" + self._net.get(Network._id_prop) + "/vlans"
  324. self.VLAN_API = self.MERAKI_API + self._api_url_endpoint
  325. def update_vlan(self, **kwargs):
  326. if not self._check_obj():
  327. return False
  328. payload = {}
  329. for key in ["name", "subnet", "appliance_ip", "fixed_ip_assignments", "reserved_ip_ranges", "dns_nameservers"]:
  330. if key in kwargs:
  331. if key == "appliance_ip":
  332. payload["applianceIp"] = kwargs[key]
  333. elif key == "fixed_ip_assignments":
  334. payload["fixedIpAssignments"] = kwargs[key]
  335. elif key == "reserved_ip_ranges":
  336. payload["reservedIpRanges"] = kwargs[key]
  337. elif key == "dns_nameservers":
  338. payload["dnsNameservers"] = kwargs[key]
  339. else:
  340. payload[key] = kwargs[key]
  341. if len(payload) == 0:
  342. return False
  343. url = self.VLAN_API + "/" + self._id
  344. try:
  345. response = requests.request("PUT", url, json=payload, headers=self._headers)
  346. response.raise_for_status()
  347. except Exception as e:
  348. msg = "Error updating VLAN properties for {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
  349. if self._logit:
  350. logging.error(msg)
  351. else:
  352. print(msg)
  353. return False
  354. jobj = response.json()
  355. for k, v in list(jobj.items()):
  356. self.set(k, v)
  357. return True
  358. class Device(Meraki):
  359. _id_prop = "serial"
  360. def __init__(self, **kwargs):
  361. super(Device, self).__init__(**kwargs)
  362. if "net" not in kwargs:
  363. raise TypeError("Missing mandatory net argument!")
  364. self._net = kwargs["net"]
  365. self._class_name = "Device"
  366. self._api_url_endpoint = "/networks/" + self._net.get(Network._id_prop) + "/devices"
  367. self.DEV_API = self.MERAKI_API + self._api_url_endpoint
  368. def remove_device(self):
  369. if not self._check_obj():
  370. return False
  371. url = self.DEV_API + "/" + self._id + "/remove"
  372. try:
  373. response = requests.request("POST", url, headers=self._headers)
  374. response.raise_for_status()
  375. except Exception as e:
  376. msg = "Failed to remove device {} for network {}: {} ({})".format(
  377. self._name, self._net.get(Network._name_prop), e, Meraki._get_json_errors(response)
  378. )
  379. if self._logit:
  380. logging.error(msg)
  381. else:
  382. print(msg)
  383. return False
  384. return True
  385. def update_device(self, **kwargs):
  386. if not self._check_obj():
  387. return False
  388. payload = {}
  389. for key in ["name", "tags", "lat", "lng", "address", "move_map_marker"]:
  390. if key in kwargs:
  391. if key == "move_map_marker":
  392. payload["moveMapMarker"] = kwargs[key]
  393. else:
  394. payload[key] = kwargs[key]
  395. if len(payload) == 0:
  396. return False
  397. url = self.DEV_API + "/" + self._id
  398. try:
  399. response = requests.request("PUT", url, json=payload, headers=self._headers)
  400. response.raise_for_status()
  401. except Exception as e:
  402. msg = "Error updating device properties for {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
  403. if self._logit:
  404. logging.error(msg)
  405. else:
  406. print(msg)
  407. return False
  408. jobj = response.json()
  409. for k, v in list(jobj.items()):
  410. if k == "moveMapMarker":
  411. continue
  412. self.set(k, v)
  413. return True
  414. class SwitchPort(Meraki):
  415. _id_prop = "number"
  416. def __init__(self, **kwargs):
  417. super(SwitchPort, self).__init__(**kwargs)
  418. if "dev" not in kwargs:
  419. raise TypeError("Missing mandatory dev argument!")
  420. self._dev = kwargs["dev"]
  421. self._class_name = "SwitchPort"
  422. self._api_url_endpoint = "/devices/" + self._net.get(Network._id_prop) + "/switchPorts"
  423. self.SWP_API = self.MERAKI_API + self._api_url_endpoint
  424. def update_switchport(self, **kwargs):
  425. if not self._check_obj():
  426. return False
  427. payload = {}
  428. for key in ["name", "tags", "enabled", "type", "vlan", "voice_vlan", "allowed_vlans", "poe_enabled"]:
  429. if key in kwargs:
  430. if key == "voice_vlan":
  431. payload["voiceVlan"] = kwargs[key]
  432. elif key == "allowed_vlans":
  433. payload["allowedVlans"] = kwargs[key]
  434. elif key == "poe_enabled":
  435. payload["poeEnabled"] = kwargs[key]
  436. else:
  437. payload[key] = kwargs[key]
  438. if len(payload) == 0:
  439. return False
  440. url = self.SWP_API + "/" + self._id
  441. try:
  442. response = requests.request("PUT", url, json=payload, headers=self._headers)
  443. response.raise_for_status()
  444. except Exception as e:
  445. msg = "Error updating switchport properties for {} on device {}: {} ({})".format(
  446. self._id, self._dev.get(Device._name_prop), e, Meraki._get_json_errors(response)
  447. )
  448. if self._logit:
  449. logging.error(msg)
  450. else:
  451. print(msg)
  452. return False
  453. jobj = response.json()
  454. for k, v in list(jobj.items()):
  455. self.set(k, v)
  456. return True