Created
November 6, 2017 11:29
-
-
Save MagerValp/d6e2d04e3566b6a249d6f1fb4d9f34b6 to your computer and use it in GitHub Desktop.
Revisions
-
MagerValp created this gist
Nov 6, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,108 @@ # -*- coding: utf-8 -*- """Directory Services helper class.""" from __future__ import unicode_literals from __future__ import print_function from __future__ import division from OpenDirectory import ODSession, ODNode, ODQuery, kODRecordTypeUsers, kODAttributeTypeRecordName, kODAttributeTypeStandardOnly, kODMatchEqualTo, kODRecordTypeGroups, kODRecordTypeUsers __all__ = ["DSHelper", "DSHelperError"] class DSHelperError(BaseException): pass class DSHelper(object): """Wrapper for Directory Services.""" def __init__(self): super(DSHelper, self).__init__() self.odsession = ODSession.defaultSession() def get_node(self, nodename): node, error = ODNode.nodeWithSession_name_error_(self.odsession, nodename, None) if node is None: raise DSHelperError("Couldn't open {} node: {}".format(nodename, error.localizedFailureReason())) return node def get_search_node(self): return self.get_node("Search") def find_groups_named(self, groupname, node=None): """Look up a group name and return an array of group records.""" if node is None: node = self.get_search_node() odquery, error = ODQuery.queryWithNode_forRecordTypes_attribute_matchType_queryValues_returnAttributes_maximumResults_error_( node, kODRecordTypeGroups, kODAttributeTypeRecordName, kODMatchEqualTo, groupname, kODAttributeTypeStandardOnly, 0, None) if odquery is None: raise DSHelperError("Couldn't query {}: {}".format(node.nodeName, error.localizedFailureReason())) result, error = odquery.resultsAllowingPartial_error_(False, None) if result is None: raise DSHelperError("Couldn't retrieve query results: {}".format(error.localizedFailureReason())) return result def find_users_named(self, username, node=None): """Look up a user name and return an array of user records.""" if node is None: node = self.get_search_node() odquery, error = ODQuery.queryWithNode_forRecordTypes_attribute_matchType_queryValues_returnAttributes_maximumResults_error_( node, kODRecordTypeUsers, kODAttributeTypeRecordName, kODMatchEqualTo, username, kODAttributeTypeStandardOnly, 0, None) if odquery is None: raise DSHelperError("Couldn't query {}: {}".format(node.nodeName, error.localizedFailureReason())) result, error = odquery.resultsAllowingPartial_error_(False, None) if result is None: raise DSHelperError("Couldn't retrieve query results: {}".format(error.localizedFailureReason())) return result def add_user_to_group(self, user, group): result, error = group.addMemberRecord_error_(user, None) if not result: if error: error_msg = ": " + error.localizedFailureReason() else: error_msg = "" raise DSHelperError("Couldn't add {} to {}{}".format(user.recordName, group.recordName, error_msg)) def remove_user_from_group(self, user, group): result, error = group.removeMemberRecord_error_(user, None) if not result: if error: error_msg = ": " + error.localizedFailureReason() else: error_msg = "" raise DSHelperError("Couldn't remove {} from {}{}" % (user.recordName, group.recordName, error_msg))