# -*- coding: utf-8 -*- """Directory Services helper class.""" from __future__ import unicode_literals from __future__ import print_function from __future__ import division from OpenDirectory import ODSession, ODNode, ODQuery, kODRecordTypeUsers, kODAttributeTypeRecordName, kODAttributeTypeStandardOnly, kODMatchEqualTo, kODRecordTypeGroups, kODRecordTypeUsers __all__ = ["DSHelper", "DSHelperError"] class DSHelperError(BaseException): pass class DSHelper(object): """Wrapper for Directory Services.""" def __init__(self): super(DSHelper, self).__init__() self.odsession = ODSession.defaultSession() def get_node(self, nodename): node, error = ODNode.nodeWithSession_name_error_(self.odsession, nodename, None) if node is None: raise DSHelperError("Couldn't open {} node: {}".format(nodename, error.localizedFailureReason())) return node def get_search_node(self): return self.get_node("Search") def find_groups_named(self, groupname, node=None): """Look up a group name and return an array of group records.""" if node is None: node = self.get_search_node() odquery, error = ODQuery.queryWithNode_forRecordTypes_attribute_matchType_queryValues_returnAttributes_maximumResults_error_( node, kODRecordTypeGroups, kODAttributeTypeRecordName, kODMatchEqualTo, groupname, kODAttributeTypeStandardOnly, 0, None) if odquery is None: raise DSHelperError("Couldn't query {}: {}".format(node.nodeName, error.localizedFailureReason())) result, error = odquery.resultsAllowingPartial_error_(False, None) if result is None: raise DSHelperError("Couldn't retrieve query results: {}".format(error.localizedFailureReason())) return result def find_users_named(self, username, node=None): """Look up a user name and return an array of user records.""" if node is None: node = self.get_search_node() odquery, error = ODQuery.queryWithNode_forRecordTypes_attribute_matchType_queryValues_returnAttributes_maximumResults_error_( node, kODRecordTypeUsers, kODAttributeTypeRecordName, kODMatchEqualTo, username, kODAttributeTypeStandardOnly, 0, None) if odquery is None: raise DSHelperError("Couldn't query {}: {}".format(node.nodeName, error.localizedFailureReason())) result, error = odquery.resultsAllowingPartial_error_(False, None) if result is None: raise DSHelperError("Couldn't retrieve query results: {}".format(error.localizedFailureReason())) return result def add_user_to_group(self, user, group): result, error = group.addMemberRecord_error_(user, None) if not result: if error: error_msg = ": " + error.localizedFailureReason() else: error_msg = "" raise DSHelperError("Couldn't add {} to {}{}".format(user.recordName, group.recordName, error_msg)) def remove_user_from_group(self, user, group): result, error = group.removeMemberRecord_error_(user, None) if not result: if error: error_msg = ": " + error.localizedFailureReason() else: error_msg = "" raise DSHelperError("Couldn't remove {} from {}{}" % (user.recordName, group.recordName, error_msg))