Skip to content

Instantly share code, notes, and snippets.

@joefusaro
Created September 14, 2016 01:27
Show Gist options
  • Select an option

  • Save joefusaro/1dd5b52e1e6ed1bb9632f86da1d586bc to your computer and use it in GitHub Desktop.

Select an option

Save joefusaro/1dd5b52e1e6ed1bb9632f86da1d586bc to your computer and use it in GitHub Desktop.

Revisions

  1. joefusaro created this gist Sep 14, 2016.
    90 changes: 90 additions & 0 deletions soap.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,90 @@
    """ Certain methods are not available via REST API, so we have to recreate!"""
    import requests
    from lxml.etree import Element, SubElement, tostring


    class LeadConverter(object):

    def __init__(self, session_id, lead_id, **kwargs):
    """ Provides functionality for converting a Lead to a new or existing
    Account and create a new Contact or update an existing Contact.
    lead_id (REQUIRED): The 15 or 18-character ID of the Lead record to be
    converted. All Salesforce Lead records begin with '00Q'
    converted_status (REQUIRED): A Salesforce admin-defined Status value
    that is set upon conversion. Unfortunately there is no method for us
    to know what the valid converted_status values are, so we resort to
    trial and error. When we find one that works, we must update the
    TableMetaData object for this user (where name="Lead", of course)
    account_id: Optional; if specified, converts the Lead to a Contact
    associated with this Account.
    contact_id: Optional; if specified, converts the Lead into an existing
    Contact record, preventing the creation of a duplicate.
    """

    self.session_id = session_id
    self.lead_id = lead_id
    self.converted_status = kwargs.get('converted_status', 'Qualified')
    self.account_id = kwargs.get('account_id', False)
    self.contact_id = kwargs.get('contact_id', False)
    self.do_not_create_opportunity = str(kwargs.get('do_not_create_opportunity', False))
    self.opportunity_name = kwargs.get('opportunity_name', False)
    self.owner_id = kwargs.get('owner_id', False)
    self.send_notification_email = str(kwargs.get('send_notification_email', False))

    def build_xml(self):
    schema_ns = 'http://schemas.xmlsoap.org/soap/envelope/'
    urn_ns = 'urn:partner.soap.sforce.com'
    S_PRE = '{' + schema_ns + '}'
    envelope = Element(
    S_PRE + 'Envelope',
    nsmap={'soapenv': schema_ns, 'urn': urn_ns}
    )
    header = SubElement(envelope, '{%s}Header' % schema_ns)
    s_header = SubElement(header, '{%s}SessionHeader' % urn_ns)
    sid = SubElement(s_header, '{%s}sessionId' % urn_ns).text=self.session_id
    body = SubElement(envelope, '{%s}Body' % schema_ns)
    convert_lead = SubElement(body, '{%s}convertLead' % urn_ns)
    lead_converts = SubElement(convert_lead, '{%s}leadConverts' % urn_ns)
    lead_id = SubElement(
    lead_converts,
    '{%s}leadId' % urn_ns
    ).text=self.lead_id
    do_not_create_opportunity = SubElement(
    lead_converts,
    '{%s}doNotCreateOpportunity' % urn_ns
    ).text=self.do_not_create_opportunity
    send_notification_email = SubElement(
    lead_converts,
    '{%s}sendNotificationEmail' % urn_ns
    ).text=self.send_notification_email

    if self.account_id:
    account_id = SubElement(
    lead_converts,
    '{%s}accountId' % urn_ns
    ).text=self.account_id
    if self.contact_id:
    contact_id = SubElement(
    lead_converts,
    '{%s}contactId' % urn_ns
    ).text=self.contact_id
    if self.converted_status:
    converted_status = SubElement(
    lead_converts,
    '{%s}convertedStatus' % urn_ns
    ).text=self.converted_status
    xml_meta = """<?xml version="1.1" encoding="utf-8"?>"""

    return xml_meta + tostring(envelope, encoding='utf-8')

    def post(self):
    xml = self.build_xml()
    headers = {'Content-Type':'text/xml', 'SOAPAction':'convertLead'}
    url = 'https://na16.salesforce.com/services/Soap/u/34.0'
    out = requests.post(url, data=xml, headers=headers)
    return out