Skip to content

Instantly share code, notes, and snippets.

@MagerValp
Created November 6, 2017 11:29
Show Gist options
  • Save MagerValp/d6e2d04e3566b6a249d6f1fb4d9f34b6 to your computer and use it in GitHub Desktop.
Save MagerValp/d6e2d04e3566b6a249d6f1fb4d9f34b6 to your computer and use it in GitHub Desktop.

Revisions

  1. MagerValp created this gist Nov 6, 2017.
    108 changes: 108 additions & 0 deletions dshelper.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,108 @@
    # -*- coding: utf-8 -*-

    """Directory Services helper class."""


    from __future__ import unicode_literals
    from __future__ import print_function
    from __future__ import division


    from OpenDirectory import ODSession, ODNode, ODQuery, kODRecordTypeUsers, kODAttributeTypeRecordName, kODAttributeTypeStandardOnly, kODMatchEqualTo, kODRecordTypeGroups, kODRecordTypeUsers


    __all__ = ["DSHelper", "DSHelperError"]


    class DSHelperError(BaseException):
    pass


    class DSHelper(object):
    """Wrapper for Directory Services."""

    def __init__(self):
    super(DSHelper, self).__init__()
    self.odsession = ODSession.defaultSession()

    def get_node(self, nodename):
    node, error = ODNode.nodeWithSession_name_error_(self.odsession, nodename, None)
    if node is None:
    raise DSHelperError("Couldn't open {} node: {}".format(nodename,
    error.localizedFailureReason()))
    return node

    def get_search_node(self):
    return self.get_node("Search")

    def find_groups_named(self, groupname, node=None):
    """Look up a group name and return an array of group records."""

    if node is None:
    node = self.get_search_node()

    odquery, error = ODQuery.queryWithNode_forRecordTypes_attribute_matchType_queryValues_returnAttributes_maximumResults_error_( node,
    kODRecordTypeGroups,
    kODAttributeTypeRecordName,
    kODMatchEqualTo,
    groupname,
    kODAttributeTypeStandardOnly,
    0,
    None)
    if odquery is None:
    raise DSHelperError("Couldn't query {}: {}".format(node.nodeName,
    error.localizedFailureReason()))

    result, error = odquery.resultsAllowingPartial_error_(False, None)
    if result is None:
    raise DSHelperError("Couldn't retrieve query results: {}".format(error.localizedFailureReason()))

    return result

    def find_users_named(self, username, node=None):
    """Look up a user name and return an array of user records."""

    if node is None:
    node = self.get_search_node()

    odquery, error = ODQuery.queryWithNode_forRecordTypes_attribute_matchType_queryValues_returnAttributes_maximumResults_error_( node,
    kODRecordTypeUsers,
    kODAttributeTypeRecordName,
    kODMatchEqualTo,
    username,
    kODAttributeTypeStandardOnly,
    0,
    None)
    if odquery is None:
    raise DSHelperError("Couldn't query {}: {}".format(node.nodeName,
    error.localizedFailureReason()))

    result, error = odquery.resultsAllowingPartial_error_(False, None)
    if result is None:
    raise DSHelperError("Couldn't retrieve query results: {}".format(error.localizedFailureReason()))

    return result

    def add_user_to_group(self, user, group):
    result, error = group.addMemberRecord_error_(user, None)
    if not result:
    if error:
    error_msg = ": " + error.localizedFailureReason()
    else:
    error_msg = ""
    raise DSHelperError("Couldn't add {} to {}{}".format(user.recordName,
    group.recordName,
    error_msg))

    def remove_user_from_group(self, user, group):
    result, error = group.removeMemberRecord_error_(user, None)
    if not result:
    if error:
    error_msg = ": " + error.localizedFailureReason()
    else:
    error_msg = ""
    raise DSHelperError("Couldn't remove {} from {}{}" % (user.recordName,
    group.recordName,
    error_msg))