meraki_api.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612
  1. from __future__ import print_function
  2. #
  3. # Copyright (c) 2018-2021 Joe Clarke <jclarke@cisco.com>
  4. #
  5. # Redistribution and use in source and binary forms, with or without
  6. # modification, are permitted provided that the following conditions
  7. # are met:
  8. # 1. Redistributions of source code must retain the above copyright
  9. # notice, this list of conditions and the following disclaimer.
  10. # 2. Redistributions in binary form must reproduce the above copyright
  11. # notice, this list of conditions and the following disclaimer in the
  12. # documentation and/or other materials provided with the distribution.
  13. #
  14. # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
  15. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  16. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  17. # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  18. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  19. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  20. # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  21. # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  22. # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  23. # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  24. # SUCH DAMAGE.
  25. from builtins import str
  26. from builtins import object
  27. import requests
  28. import logging
  29. class Meraki(object):
  30. MERAKI_API = "https://api.meraki.com/api/v0"
  31. _headers = {"X-Cisco-Meraki-API-Key": None, "Content-type": "application/json"}
  32. _logit = False
  33. _key = None
  34. _name = None
  35. _id = None
  36. _class_name = "Meraki"
  37. _name_prop = "name"
  38. _id_prop = "id"
  39. _api_url_endpoint = ""
  40. def __init__(self, **kwargs):
  41. if "logit" in kwargs:
  42. self._logit = kwargs["logit"]
  43. if "key" in kwargs:
  44. self._key = kwargs["key"]
  45. self._headers["X-Cisco-Meraki-API-Key"] = kwargs["key"]
  46. if "name" in kwargs:
  47. self._name = kwargs["name"]
  48. if "id" in kwargs:
  49. self._id = str(kwargs["id"])
  50. self.__dict = {}
  51. self._initialized = False
  52. def set_key(self, key):
  53. self._key = key
  54. self._headers["X-Cisco-Meraki-API-Key"] = key
  55. def set_name(self, name):
  56. self._name = name
  57. def _set_id(self, id):
  58. self._id = id
  59. def __check_headers(self):
  60. if self._headers["X-Cisco-Meraki-API-Key"] is None:
  61. msg = "Meraki API key is not set!"
  62. if self._logit:
  63. logging.error(msg)
  64. else:
  65. print(msg)
  66. return False
  67. return True
  68. def realize(self):
  69. return self._check_obj()
  70. @staticmethod
  71. def _get_json_errors(response):
  72. res = ""
  73. try:
  74. jobj = response.json()
  75. if "errors" in jobj:
  76. res = ", ".join(jobj["errors"])
  77. except Exception:
  78. pass
  79. return res
  80. def _check_obj(self):
  81. if self._initialized:
  82. return True
  83. if self._id is None:
  84. msg = "{} id is not set!".format(self._class_name)
  85. if self._logit:
  86. logging.error(msg)
  87. else:
  88. print(msg)
  89. return False
  90. return self.__get_current_obj()
  91. def __get_current_obj(self):
  92. if not self.__check_headers():
  93. return False
  94. url = self.MERAKI_API + self._api_url_endpoint + "/" + self._id
  95. try:
  96. response = requests.request("GET", url, headers=self._headers)
  97. response.raise_for_status()
  98. except Exception as e:
  99. msg = "Error getting {} for {}: {} ({})".format(self._class_name.lower(), self._id, e, Meraki._get_json_errors(response))
  100. if self._logit:
  101. logging.error(msg)
  102. else:
  103. print(msg)
  104. return False
  105. jobj = response.json()
  106. self.__populate_obj(jobj)
  107. # Object may not have a name yet.
  108. if self._name_prop in jobj:
  109. self._name = jobj[self._name_prop]
  110. self._initialized = True
  111. return True
  112. def __populate_obj(self, jobj):
  113. for k, v in list(jobj.items()):
  114. self.__dict[k] = v
  115. def get(self, field):
  116. if not self._check_obj():
  117. raise Exception("Failed to initialize object!")
  118. if field in self.__dict:
  119. return self.__dict[field]
  120. else:
  121. raise Exception("Field {} does not exist".format(field))
  122. def set(self, field, value):
  123. if not self._check_obj():
  124. raise Exception("Failed to initialize object!")
  125. self.__dict[field] = value
  126. def get_organizations(self):
  127. if not self.__check_headers():
  128. return None
  129. url = self.MERAKI_API + "/organizations"
  130. try:
  131. response = requests.request("GET", url, headers=self._headers)
  132. response.raise_for_status()
  133. except Exception as e:
  134. msg = "Error getting list of organizations: {} ({})".format(e, Meraki._get_json_errors(response))
  135. if self._logit:
  136. logging.error(msg)
  137. else:
  138. print(msg)
  139. return None
  140. res = []
  141. for org in response.json():
  142. org_obj = Organization(key=self._key, name=org[Organization._name_prop], id=org[Organization._id_prop])
  143. res.append(org_obj)
  144. return res
  145. class Organization(Meraki):
  146. def __init__(self, **kwargs):
  147. super(Organization, self).__init__(**kwargs)
  148. self._class_name = "Organization"
  149. self._api_url_endpoint = "/organizations"
  150. self.ORG_API = self.MERAKI_API + self._api_url_endpoint
  151. def get_inventory(self):
  152. if not self._check_obj():
  153. return None
  154. url = self.ORG_API + "/" + self._id + "/inventory"
  155. try:
  156. response = requests.request("GET", url, headers=self._headers)
  157. response.raise_for_status()
  158. except Exception as e:
  159. msg = "Error getting inventory for organization {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
  160. if self._logit:
  161. logging.error(msg)
  162. else:
  163. print(msg)
  164. return None
  165. return response.json()
  166. def get_networks(self):
  167. if not self._check_obj():
  168. return None
  169. url = self.ORG_API + "/" + self._id + "/networks"
  170. try:
  171. response = requests.request("GET", url, headers=self._headers)
  172. response.raise_for_status()
  173. except Exception as e:
  174. msg = "Error getting list of networks for {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
  175. if self._logit:
  176. logging.error(msg)
  177. else:
  178. print(msg)
  179. return None
  180. res = []
  181. for n in response.json():
  182. net = Network(key=self._key, name=n[Network._name_prop], id=n[Network._id_prop])
  183. res.append(net)
  184. return res
  185. def create_network(self, name, **kwargs):
  186. if not self._check_obj():
  187. return None
  188. payload = {"name": name}
  189. for key in ["type", "tags", "timezone", "copy_from_network_id"]:
  190. if key in kwargs:
  191. if key == "timezone":
  192. payload["timeZone"] = kwargs[key]
  193. elif key == "copy_from_network_id":
  194. payload["copyFromNetworkId"] = kwargs[key]
  195. else:
  196. payload[key] = kwargs[key]
  197. if "type" not in kwargs:
  198. payload["type"] = "wireless switch appliance"
  199. url = self.ORG_API + "/" + self._id + "/networks"
  200. try:
  201. response = requests.request("POST", url, json=payload, headers=self._headers)
  202. response.raise_for_status()
  203. except Exception as e:
  204. msg = "Error creating new network {} in {}: {} ({})".format(name, self._name, e, Meraki._get_json_errors(response))
  205. if self._logit:
  206. logging.error(msg)
  207. else:
  208. print(msg)
  209. return None
  210. jobj = response.json()
  211. net_obj = Network(key=self._key, id=jobj["id"], name=jobj["name"])
  212. return net_obj
  213. class Network(Meraki):
  214. def __init__(self, **kwargs):
  215. super(Network, self).__init__(**kwargs)
  216. self._class_name = "Network"
  217. self._api_url_endpoint = "/networks"
  218. self.NET_API = self.MERAKI_API + self._api_url_endpoint
  219. def get_devices(self):
  220. if not self._check_obj():
  221. return None
  222. url = self.NET_API + "/" + self._id + "/devices"
  223. try:
  224. response = requests.request("GET", url, headers=self._headers)
  225. response.raise_for_status()
  226. except Exception as e:
  227. msg = "Error getting device list for network {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
  228. if self._logit:
  229. logging.error(msg)
  230. else:
  231. print(msg)
  232. return None
  233. res = []
  234. for d in response.json():
  235. dev = Device(key=self._key, name=d[Device._name_prop], id=d[Device._id_prop])
  236. res.append(dev)
  237. return res
  238. def claim_device(self, dev):
  239. if not self._check_obj():
  240. return None
  241. url = self.NET_API + "/" + self._id + "/devices/claim"
  242. payload = {"serial": dev._id}
  243. try:
  244. response = requests.request("POST", url, json=payload, headers=self._headers)
  245. response.raise_for_status()
  246. except Exception as e:
  247. msg = "Error claiming device {} for network {}: {} ({})".format(dev._id, self._name, e, Meraki._get_json_errors(response))
  248. if self._logit:
  249. logging.error(msg)
  250. else:
  251. print(msg)
  252. return None
  253. new_dev = Device(key=self._key, id=dev._id, net=self)
  254. return new_dev
  255. def create_vlan(self, name, id, subnet, appliance_ip):
  256. if not self._check_obj():
  257. return None
  258. payload = {"id": id, "name": name, "subnet": subnet, "applianceIp": appliance_ip}
  259. url = self.NET_API + "/" + self._id + "/vlans"
  260. try:
  261. response = requests.request("POST", url, json=payload, headers=self._headers)
  262. response.raise_for_status()
  263. except Exception as e:
  264. msg = "Error adding VLAN {} (ID: {}) to network {}: {} ({})".format(name, id, self._name, e, Meraki._get_json_errors(response))
  265. if self._logit:
  266. logging.error(msg)
  267. else:
  268. print(msg)
  269. return None
  270. jobj = response.json()
  271. vlan_obj = Vlan(key=self._key, id=jobj["id"], name=jobj["name"], net=self)
  272. return vlan_obj
  273. def apply_shaping(self, client_limit):
  274. if not self._check_obj():
  275. return False
  276. payload = {"globalBandwidthLimits": {"limitUp": client_limit, "limitDown": client_limit}}
  277. url = self.NET_API + "/" + self._id + "/networks/appliance/trafficShaping"
  278. try:
  279. response = requests.request("POST", url, json=payload, headers=self._headers)
  280. response.raise_for_status()
  281. except Exception as e:
  282. msg = "Error setting traffic shaping limit in network {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
  283. if self._logit:
  284. logging.error(msg)
  285. else:
  286. print(msg)
  287. return False
  288. return True
  289. class SSID(Meraki):
  290. _id_prop = "number"
  291. def __init__(self, **kwargs):
  292. super(SSID, self).__init__(**kwargs)
  293. if "net" not in kwargs:
  294. raise TypeError("Missing mandatory net argument!")
  295. self._net = kwargs["net"]
  296. self._class_name = "SSID"
  297. self._api_url_endpoint = "/networks/" + self._net.get(Network._id_prop) + "/ssids"
  298. self.SSID_API = self.MERAKI_API + self._api_url_endpoint
  299. def update_ssid(self, **kwargs):
  300. if not self._check_obj():
  301. return False
  302. payload = {}
  303. for key in ["name", "enabled", "auth_mode", "encryption_mode", "psk", "ip_assignment_mode", "tags"]:
  304. if key in kwargs:
  305. if key == "auth_mode":
  306. payload["authMode"] = kwargs[key]
  307. elif key == "encryption_mode":
  308. payload["encryptionMode"] = kwargs[key]
  309. elif key == "ip_assignment_mode":
  310. payload["ipAssignmentMode"] = kwargs[key]
  311. elif key == "tags":
  312. payload["availabilityTags"] = kwargs[key]
  313. payload["availableOnAllAps"] = False
  314. else:
  315. payload[key] = kwargs[key]
  316. if len(payload) == 0:
  317. return False
  318. if "ip_assignment_mode" not in kwargs:
  319. payload["ipAssignmentMode"] = "Bridge mode"
  320. url = self.SSID_API + "/" + self._id
  321. try:
  322. response = requests.request("PUT", url, json=payload, headers=self._headers)
  323. response.raise_for_status()
  324. except Exception as e:
  325. msg = "Error updating SSID properties for {}: {} ({})".format(self._id, e, Meraki._get_json_errors(response))
  326. if self._logit:
  327. logging.error(msg)
  328. else:
  329. print(msg)
  330. return False
  331. jobj = response.json()
  332. for k, v in list(jobj.items()):
  333. self.set(k, v)
  334. return True
  335. class Vlan(Meraki):
  336. def __init__(self, **kwargs):
  337. super(Vlan, self).__init__(**kwargs)
  338. if "net" not in kwargs:
  339. raise TypeError("Missing mandatory net argument!")
  340. self._net = kwargs["net"]
  341. self._class_name = "Vlan"
  342. self._api_url_endpoint = "/networks/" + self._net.get(Network._id_prop) + "/vlans"
  343. self.VLAN_API = self.MERAKI_API + self._api_url_endpoint
  344. def update_vlan(self, **kwargs):
  345. if not self._check_obj():
  346. return False
  347. payload = {}
  348. for key in ["name", "subnet", "appliance_ip", "fixed_ip_assignments", "reserved_ip_ranges", "dns_nameservers"]:
  349. if key in kwargs:
  350. if key == "appliance_ip":
  351. payload["applianceIp"] = kwargs[key]
  352. elif key == "fixed_ip_assignments":
  353. payload["fixedIpAssignments"] = kwargs[key]
  354. elif key == "reserved_ip_ranges":
  355. payload["reservedIpRanges"] = kwargs[key]
  356. elif key == "dns_nameservers":
  357. payload["dnsNameservers"] = kwargs[key]
  358. else:
  359. payload[key] = kwargs[key]
  360. if len(payload) == 0:
  361. return False
  362. url = self.VLAN_API + "/" + self._id
  363. try:
  364. response = requests.request("PUT", url, json=payload, headers=self._headers)
  365. response.raise_for_status()
  366. except Exception as e:
  367. msg = "Error updating VLAN properties for {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
  368. if self._logit:
  369. logging.error(msg)
  370. else:
  371. print(msg)
  372. return False
  373. jobj = response.json()
  374. for k, v in list(jobj.items()):
  375. self.set(k, v)
  376. return True
  377. class Device(Meraki):
  378. _id_prop = "serial"
  379. def __init__(self, **kwargs):
  380. super(Device, self).__init__(**kwargs)
  381. if "net" not in kwargs:
  382. raise TypeError("Missing mandatory net argument!")
  383. self._net = kwargs["net"]
  384. self._class_name = "Device"
  385. self._api_url_endpoint = "/networks/" + self._net.get(Network._id_prop) + "/devices"
  386. self.DEV_API = self.MERAKI_API + self._api_url_endpoint
  387. def remove_device(self):
  388. if not self._check_obj():
  389. return False
  390. url = self.DEV_API + "/" + self._id + "/remove"
  391. try:
  392. response = requests.request("POST", url, headers=self._headers)
  393. response.raise_for_status()
  394. except Exception as e:
  395. msg = "Failed to remove device {} for network {}: {} ({})".format(
  396. self._name, self._net.get(Network._name_prop), e, Meraki._get_json_errors(response)
  397. )
  398. if self._logit:
  399. logging.error(msg)
  400. else:
  401. print(msg)
  402. return False
  403. return True
  404. def update_device(self, **kwargs):
  405. if not self._check_obj():
  406. return False
  407. payload = {}
  408. for key in ["name", "tags", "lat", "lng", "address", "move_map_marker"]:
  409. if key in kwargs:
  410. if key == "move_map_marker":
  411. payload["moveMapMarker"] = kwargs[key]
  412. else:
  413. payload[key] = kwargs[key]
  414. if len(payload) == 0:
  415. return False
  416. url = self.DEV_API + "/" + self._id
  417. try:
  418. response = requests.request("PUT", url, json=payload, headers=self._headers)
  419. response.raise_for_status()
  420. except Exception as e:
  421. msg = "Error updating device properties for {}: {} ({})".format(self._name, e, Meraki._get_json_errors(response))
  422. if self._logit:
  423. logging.error(msg)
  424. else:
  425. print(msg)
  426. return False
  427. jobj = response.json()
  428. for k, v in list(jobj.items()):
  429. if k == "moveMapMarker":
  430. continue
  431. self.set(k, v)
  432. return True
  433. class SwitchPort(Meraki):
  434. _id_prop = "number"
  435. def __init__(self, **kwargs):
  436. super(SwitchPort, self).__init__(**kwargs)
  437. if "dev" not in kwargs:
  438. raise TypeError("Missing mandatory dev argument!")
  439. self._dev = kwargs["dev"]
  440. self._class_name = "SwitchPort"
  441. self._api_url_endpoint = "/devices/" + self._net.get(Network._id_prop) + "/switchPorts"
  442. self.SWP_API = self.MERAKI_API + self._api_url_endpoint
  443. def update_switchport(self, **kwargs):
  444. if not self._check_obj():
  445. return False
  446. payload = {}
  447. for key in ["name", "tags", "enabled", "type", "vlan", "voice_vlan", "allowed_vlans", "poe_enabled"]:
  448. if key in kwargs:
  449. if key == "voice_vlan":
  450. payload["voiceVlan"] = kwargs[key]
  451. elif key == "allowed_vlans":
  452. payload["allowedVlans"] = kwargs[key]
  453. elif key == "poe_enabled":
  454. payload["poeEnabled"] = kwargs[key]
  455. else:
  456. payload[key] = kwargs[key]
  457. if len(payload) == 0:
  458. return False
  459. url = self.SWP_API + "/" + self._id
  460. try:
  461. response = requests.request("PUT", url, json=payload, headers=self._headers)
  462. response.raise_for_status()
  463. except Exception as e:
  464. msg = "Error updating switchport properties for {} on device {}: {} ({})".format(
  465. self._id, self._dev.get(Device._name_prop), e, Meraki._get_json_errors(response)
  466. )
  467. if self._logit:
  468. logging.error(msg)
  469. else:
  470. print(msg)
  471. return False
  472. jobj = response.json()
  473. for k, v in list(jobj.items()):
  474. self.set(k, v)
  475. return True