Skip to content

Instantly share code, notes, and snippets.

@chidea
Forked from pklaus/ping.py
Last active September 5, 2022 15:34
Show Gist options
  • Save chidea/955cea841e5c76a7e5ee8aa02234409d to your computer and use it in GitHub Desktop.
Save chidea/955cea841e5c76a7e5ee8aa02234409d to your computer and use it in GitHub Desktop.

Revisions

  1. chidea revised this gist Sep 24, 2016. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions ping.py
    Original file line number Diff line number Diff line change
    @@ -46,6 +46,7 @@
    Rewrite by ChIdea:
    - Editied for Python3
    - Renamed verbose_ping() to ping(). VERBOSE is the switch for verbose/quiet mode.
    - ping() returns average ms of succeed pings or None on failure.
    November 1, 2010
    Rewrite by Johannes Meyer:
  2. chidea revised this gist Sep 24, 2016. No changes.
  3. chidea revised this gist Sep 24, 2016. 1 changed file with 35 additions and 23 deletions.
    58 changes: 35 additions & 23 deletions ping.py
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    #!/usr/bin/env python2
    #!/usr/bin/env python3

    """
    Other Repositories of python-ping
    @@ -42,7 +42,11 @@
    Revision history
    ~~~~~~~~~~~~~~~~
    September 24, 2016
    Rewrite by ChIdea:
    - Editied for Python3
    - Renamed verbose_ping() to ping(). VERBOSE is the switch for verbose/quiet mode.
    November 1, 2010
    Rewrite by Johannes Meyer:
    - changed entire code layout
    @@ -63,6 +67,10 @@
    - delete "return None" (or change to "return" only)
    - in checksum() rename "str" to "source_string"
    December 4, 2000
    Changed the struct.pack() calls to pack the checksum and ID as
    unsigned. My thanks to Jerome Poincheval for the fix.
    November 22, 1997
    Initial hack. Doesn't do much, but rather than try to guess
    what features I (or others) will want in the future, I've only
    @@ -74,10 +82,6 @@
    Linux x86. Since I don't know just what's wrong, I'll swap the
    bytes always and then do an htons().
    December 4, 2000
    Changed the struct.pack() calls to pack the checksum and ID as
    unsigned. My thanks to Jerome Poincheval for the fix.
    """

    import time
    @@ -87,6 +91,8 @@
    import random
    import asyncore

    VERBOSE = False

    # From /usr/include/linux/icmp.h; your milage may vary.
    ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris.

    @@ -98,23 +104,24 @@
    ' users or processes with administrator rights.'
    }

    __all__ = ['create_packet', 'do_one', 'verbose_ping', 'PingQuery',
    __all__ = ['create_packet', 'do_one', 'ping', 'PingQuery',
    'multi_ping_query']


    def checksum(source_string):
    # I'm not too confident that this is right but testing seems to
    # suggest that it gives the same answers as in_cksum in ping.c.
    sum = 0
    count_to = (len(source_string) / 2) * 2
    l = len(source_string)
    count_to = (l / 2) * 2
    count = 0
    while count < count_to:
    this_val = ord(source_string[count + 1])*256+ord(source_string[count])
    this_val = source_string[count + 1]*256+source_string[count]
    sum = sum + this_val
    sum = sum & 0xffffffff # Necessary?
    count = count + 2
    if count_to < len(source_string):
    sum = sum + ord(source_string[len(source_string) - 1])
    if count_to < l:
    sum = sum + source_string[l - 1]
    sum = sum & 0xffffffff # Necessary?
    sum = (sum >> 16) + (sum & 0xffff)
    sum = sum + (sum >> 16)
    @@ -129,7 +136,7 @@ def create_packet(id):
    """Create a new echo request packet based on the given "id"."""
    # Header is type (8), code (8), checksum (16), id (16), sequence (16)
    header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0, 0, id, 1)
    data = 192 * 'Q'
    data = 192 * b'Q'
    # Calculate the checksum on the data and the dummy header.
    my_checksum = checksum(header + data)
    # Now that we have the right checksum, we put that in. It's just easier
    @@ -194,7 +201,7 @@ def receive_ping(my_socket, packet_id, time_sent, timeout):
    return


    def verbose_ping(dest_addr, timeout=2, count=4):
    def ping(dest_addr, timeout=2, count=4):
    """
    Sends one ping to the given "dest_addr" which can be an ip or hostname.
    @@ -204,15 +211,19 @@ def verbose_ping(dest_addr, timeout=2, count=4):
    Displays the result on the screen.
    """
    avg = 0
    suc = 0
    for i in range(count):
    print('ping {}...'.format(dest_addr))
    if VERBOSE : print('ping {}...'.format(dest_addr))
    delay = do_one(dest_addr, timeout)
    if delay == None:
    print('failed. (Timeout within {} seconds.)'.format(timeout))
    if VERBOSE : print('failed. (Timeout within {} seconds.)'.format(timeout))
    else:
    delay = round(delay * 1000.0, 4)
    print('get ping in {} milliseconds.'.format(delay))
    print('')
    avg += delay
    suc += 1
    if VERBOSE : print('get ping in {} milliseconds.'.format(delay))
    return (avg/suc) if avg else None


    class PingQuery(asyncore.dispatcher):
    @@ -357,12 +368,13 @@ def multi_ping_query(hosts, timeout=1, step=512, ignore_errors=False):


    if __name__ == '__main__':
    VERBOSE = True
    # Testing
    verbose_ping('www.heise.de')
    verbose_ping('google.com')
    verbose_ping('an-invalid-test-url.com')
    verbose_ping('127.0.0.1')
    ping('www.heise.de'); print('')
    ping('google.com'); print('')
    ping('an-invalid-test-url.com'); print('')
    ping('127.0.0.1'); print('')
    host_list = ['www.heise.de', 'google.com', '127.0.0.1',
    'an-invalid-test-url.com']
    for host, ping in multi_ping_query(host_list).iteritems():
    print(host, '=', ping)
    for host, ping in multi_ping_query(host_list).items():
    print(host, '=', ping)
  4. @pklaus pklaus revised this gist Jan 15, 2014. 1 changed file with 10 additions and 0 deletions.
    10 changes: 10 additions & 0 deletions ping.py
    Original file line number Diff line number Diff line change
    @@ -1,6 +1,16 @@
    #!/usr/bin/env python2

    """
    Other Repositories of python-ping
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    * https://github.com/l4m3rx/python-ping supports Python2 and Python3
    * https://bitbucket.org/delroth/python-ping
    About
    ~~~~~
    A pure python ping implementation using raw socket.
  5. @pklaus pklaus revised this gist Jan 15, 2014. 1 changed file with 0 additions and 6 deletions.
    6 changes: 0 additions & 6 deletions ping.py
    Original file line number Diff line number Diff line change
    @@ -68,12 +68,6 @@
    Changed the struct.pack() calls to pack the checksum and ID as
    unsigned. My thanks to Jerome Poincheval for the fix.
    Last commit info:
    ~~~~~~~~~~~~~~~~~
    $LastChangedDate: $
    $Rev: $
    $Author: $
    """

    import time
  6. @pklaus pklaus revised this gist Jan 15, 2014. 1 changed file with 7 additions and 7 deletions.
    14 changes: 7 additions & 7 deletions ping.py
    100644 → 100755
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    #!/usr/bin/env python
    #!/usr/bin/env python2

    """
    A pure python ping implementation using raw socket.
    @@ -147,9 +147,9 @@ def do_one(dest_addr, timeout=1):
    try:
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE)
    except socket.error as e:
    if errno in ERROR_DESCR:
    if e.errno in ERROR_DESCR:
    # Operation not permitted
    raise socket.error(''.join((msg, ERROR_DESCR[errno])))
    raise socket.error(''.join((e.args[1], ERROR_DESCR[e.errno])))
    raise # raise the original error
    try:
    host = socket.gethostbyname(dest_addr)
    @@ -201,7 +201,7 @@ def verbose_ping(dest_addr, timeout=2, count=4):
    """
    for i in range(count):
    print('ping {}...'.format(dest_addr), end=' ')
    print('ping {}...'.format(dest_addr))
    delay = do_one(dest_addr, timeout)
    if delay == None:
    print('failed. (Timeout within {} seconds.)'.format(timeout))
    @@ -235,10 +235,10 @@ def __init__(self, host, p_id, timeout=0.5, ignore_errors=False):
    asyncore.dispatcher.__init__(self)
    try:
    self.create_socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE)
    except socket.error as (errno, msg):
    if errno in ERROR_DESCR:
    except socket.error as e:
    if e.errno in ERROR_DESCR:
    # Operation not permitted
    raise socket.error(''.join((msg, ERROR_DESCR[errno])))
    raise socket.error(''.join((e.args[1], ERROR_DESCR[e.errno])))
    raise # raise the original error
    self.time_received = 0
    self.time_sent = 0
  7. @riyadparvez riyadparvez revised this gist Oct 29, 2013. 1 changed file with 8 additions and 8 deletions.
    16 changes: 8 additions & 8 deletions ping.py
    Original file line number Diff line number Diff line change
    @@ -146,7 +146,7 @@ def do_one(dest_addr, timeout=1):
    """
    try:
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE)
    except socket.error, (errno, msg):
    except socket.error as e:
    if errno in ERROR_DESCR:
    # Operation not permitted
    raise socket.error(''.join((msg, ERROR_DESCR[errno])))
    @@ -200,15 +200,15 @@ def verbose_ping(dest_addr, timeout=2, count=4):
    Displays the result on the screen.
    """
    for i in xrange(count):
    print 'ping {}...'.format(dest_addr),
    for i in range(count):
    print('ping {}...'.format(dest_addr), end=' ')
    delay = do_one(dest_addr, timeout)
    if delay == None:
    print 'failed. (Timeout within {} seconds.)'.format(timeout)
    print('failed. (Timeout within {} seconds.)'.format(timeout))
    else:
    delay = round(delay * 1000.0, 4)
    print 'get ping in {} milliseconds.'.format(delay)
    print
    print('get ping in {} milliseconds.'.format(delay))
    print('')


    class PingQuery(asyncore.dispatcher):
    @@ -235,7 +235,7 @@ def __init__(self, host, p_id, timeout=0.5, ignore_errors=False):
    asyncore.dispatcher.__init__(self)
    try:
    self.create_socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE)
    except socket.error, (errno, msg):
    except socket.error as (errno, msg):
    if errno in ERROR_DESCR:
    # Operation not permitted
    raise socket.error(''.join((msg, ERROR_DESCR[errno])))
    @@ -361,4 +361,4 @@ def multi_ping_query(hosts, timeout=1, step=512, ignore_errors=False):
    host_list = ['www.heise.de', 'google.com', '127.0.0.1',
    'an-invalid-test-url.com']
    for host, ping in multi_ping_query(host_list).iteritems():
    print host, '=', ping
    print(host, '=', ping)
  8. @pklaus pklaus revised this gist Jan 15, 2014. 1 changed file with 346 additions and 346 deletions.
    692 changes: 346 additions & 346 deletions ping.py
    Original file line number Diff line number Diff line change
    @@ -1,364 +1,364 @@
    #!/usr/bin/env python

    #!/usr/bin/env python

    """
    A pure python ping implementation using raw socket.
    Note that ICMP messages can only be sent from processes running as root.
    Derived from ping.c distributed in Linux's netkit. That code is
    copyright (c) 1989 by The Regents of the University of California.
    That code is in turn derived from code written by Mike Muuss of the
    US Army Ballistic Research Laboratory in December, 1983 and
    placed in the public domain. They have my thanks.
    Bugs are naturally mine. I'd be glad to hear about them. There are
    certainly word - size dependenceies here.
    Copyright (c) Matthew Dixon Cowles, <http://www.visi.com/~mdc/>.
    Distributable under the terms of the GNU General Public License
    version 2. Provided with no warranties of any sort.
    Original Version from Matthew Dixon Cowles:
    -> ftp://ftp.visi.com/users/mdc/ping.py
    Rewrite by Jens Diemer:
    -> http://www.python-forum.de/post-69122.html#69122
    Rewrite by Johannes Meyer:
    -> http://www.python-forum.de/viewtopic.php?p=183720
    Revision history
    ~~~~~~~~~~~~~~~~
    November 1, 2010
    Rewrite by Johannes Meyer:
    - changed entire code layout
    - changed some comments and docstrings
    - replaced time.clock() with time.time() in order
    to be able to use this module on linux, too.
    - added global __all__, ICMP_CODE and ERROR_DESCR
    - merged functions "do_one" and "send_one_ping"
    - placed icmp packet creation in its own function
    - removed timestamp from the icmp packet
    - added function "multi_ping_query"
    - added class "PingQuery"
    May 30, 2007
    little rewrite by Jens Diemer:
    - change socket asterisk import to a normal import
    - replace time.time() with time.clock()
    - delete "return None" (or change to "return" only)
    - in checksum() rename "str" to "source_string"
    November 22, 1997
    Initial hack. Doesn't do much, but rather than try to guess
    what features I (or others) will want in the future, I've only
    put in what I need now.
    December 16, 1997
    For some reason, the checksum bytes are in the wrong order when
    this is run under Solaris 2.X for SPARC but it works right under
    Linux x86. Since I don't know just what's wrong, I'll swap the
    bytes always and then do an htons().
    December 4, 2000
    Changed the struct.pack() calls to pack the checksum and ID as
    unsigned. My thanks to Jerome Poincheval for the fix.
    Last commit info:
    ~~~~~~~~~~~~~~~~~
    $LastChangedDate: $
    $Rev: $
    $Author: $
    """

    import time
    import socket
    import struct
    import select
    import random
    import asyncore

    # From /usr/include/linux/icmp.h; your milage may vary.
    ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris.

    ICMP_CODE = socket.getprotobyname('icmp')
    ERROR_DESCR = {
    1: ' - Note that ICMP messages can only be '
    'sent from processes running as root.',
    10013: ' - Note that ICMP messages can only be sent by'
    ' users or processes with administrator rights.'
    }

    __all__ = ['create_packet', 'do_one', 'verbose_ping', 'PingQuery',
    'multi_ping_query']


    def checksum(source_string):
    # I'm not too confident that this is right but testing seems to
    # suggest that it gives the same answers as in_cksum in ping.c.
    sum = 0
    count_to = (len(source_string) / 2) * 2
    count = 0
    while count < count_to:
    this_val = ord(source_string[count + 1])*256+ord(source_string[count])
    sum = sum + this_val
    sum = sum & 0xffffffff # Necessary?
    count = count + 2
    if count_to < len(source_string):
    sum = sum + ord(source_string[len(source_string) - 1])
    sum = sum & 0xffffffff # Necessary?
    sum = (sum >> 16) + (sum & 0xffff)
    sum = sum + (sum >> 16)
    answer = ~sum
    answer = answer & 0xffff
    # Swap bytes. Bugger me if I know why.
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer


    def create_packet(id):
    """Create a new echo request packet based on the given "id"."""
    # Header is type (8), code (8), checksum (16), id (16), sequence (16)
    header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0, 0, id, 1)
    data = 192 * 'Q'
    # Calculate the checksum on the data and the dummy header.
    my_checksum = checksum(header + data)
    # Now that we have the right checksum, we put that in. It's just easier
    # to make up a new header than to stuff it into the dummy.
    header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0,
    socket.htons(my_checksum), id, 1)
    return header + data


    def do_one(dest_addr, timeout=1):
    """
    A pure python ping implementation using raw socket.
    Note that ICMP messages can only be sent from processes running as root.
    Derived from ping.c distributed in Linux's netkit. That code is
    copyright (c) 1989 by The Regents of the University of California.
    That code is in turn derived from code written by Mike Muuss of the
    US Army Ballistic Research Laboratory in December, 1983 and
    placed in the public domain. They have my thanks.
    Bugs are naturally mine. I'd be glad to hear about them. There are
    certainly word - size dependenceies here.
    Copyright (c) Matthew Dixon Cowles, <http://www.visi.com/~mdc/>.
    Distributable under the terms of the GNU General Public License
    version 2. Provided with no warranties of any sort.
    Original Version from Matthew Dixon Cowles:
    -> ftp://ftp.visi.com/users/mdc/ping.py
    Rewrite by Jens Diemer:
    -> http://www.python-forum.de/post-69122.html#69122
    Rewrite by Johannes Meyer:
    -> http://www.python-forum.de/viewtopic.php?p=183720
    Revision history
    ~~~~~~~~~~~~~~~~
    November 1, 2010
    Rewrite by Johannes Meyer:
    - changed entire code layout
    - changed some comments and docstrings
    - replaced time.clock() with time.time() in order
    to be able to use this module on linux, too.
    - added global __all__, ICMP_CODE and ERROR_DESCR
    - merged functions "do_one" and "send_one_ping"
    - placed icmp packet creation in its own function
    - removed timestamp from the icmp packet
    - added function "multi_ping_query"
    - added class "PingQuery"
    May 30, 2007
    little rewrite by Jens Diemer:
    - change socket asterisk import to a normal import
    - replace time.time() with time.clock()
    - delete "return None" (or change to "return" only)
    - in checksum() rename "str" to "source_string"
    November 22, 1997
    Initial hack. Doesn't do much, but rather than try to guess
    what features I (or others) will want in the future, I've only
    put in what I need now.
    December 16, 1997
    For some reason, the checksum bytes are in the wrong order when
    this is run under Solaris 2.X for SPARC but it works right under
    Linux x86. Since I don't know just what's wrong, I'll swap the
    bytes always and then do an htons().
    Sends one ping to the given "dest_addr" which can be an ip or hostname.
    "timeout" can be any integer or float except negatives and zero.
    Returns either the delay (in seconds) or None on timeout and an invalid
    address, respectively.
    """
    try:
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE)
    except socket.error, (errno, msg):
    if errno in ERROR_DESCR:
    # Operation not permitted
    raise socket.error(''.join((msg, ERROR_DESCR[errno])))
    raise # raise the original error
    try:
    host = socket.gethostbyname(dest_addr)
    except socket.gaierror:
    return
    # Maximum for an unsigned short int c object counts to 65535 so
    # we have to sure that our packet id is not greater than that.
    packet_id = int((id(timeout) * random.random()) % 65535)
    packet = create_packet(packet_id)
    while packet:
    # The icmp protocol does not use a port, but the function
    # below expects it, so we just give it a dummy port.
    sent = my_socket.sendto(packet, (dest_addr, 1))
    packet = packet[sent:]
    delay = receive_ping(my_socket, packet_id, time.time(), timeout)
    my_socket.close()
    return delay


    def receive_ping(my_socket, packet_id, time_sent, timeout):
    # Receive the ping from the socket.
    time_left = timeout
    while True:
    started_select = time.time()
    ready = select.select([my_socket], [], [], time_left)
    how_long_in_select = time.time() - started_select
    if ready[0] == []: # Timeout
    return
    time_received = time.time()
    rec_packet, addr = my_socket.recvfrom(1024)
    icmp_header = rec_packet[20:28]
    type, code, checksum, p_id, sequence = struct.unpack(
    'bbHHh', icmp_header)
    if p_id == packet_id:
    return time_received - time_sent
    time_left -= time_received - time_sent
    if time_left <= 0:
    return


    def verbose_ping(dest_addr, timeout=2, count=4):
    """
    Sends one ping to the given "dest_addr" which can be an ip or hostname.
    "timeout" can be any integer or float except negatives and zero.
    "count" specifies how many pings will be sent.
    Displays the result on the screen.
    """
    for i in xrange(count):
    print 'ping {}...'.format(dest_addr),
    delay = do_one(dest_addr, timeout)
    if delay == None:
    print 'failed. (Timeout within {} seconds.)'.format(timeout)
    else:
    delay = round(delay * 1000.0, 4)
    print 'get ping in {} milliseconds.'.format(delay)
    print


    class PingQuery(asyncore.dispatcher):
    def __init__(self, host, p_id, timeout=0.5, ignore_errors=False):
    """
    Derived class from "asyncore.dispatcher" for sending and
    receiving an icmp echo request/reply.
    December 4, 2000
    Changed the struct.pack() calls to pack the checksum and ID as
    unsigned. My thanks to Jerome Poincheval for the fix.
    Usually this class is used in conjunction with the "loop"
    function of asyncore.
    Once the loop is over, you can retrieve the results with
    the "get_result" method. Assignment is possible through
    the "get_host" method.
    Last commit info:
    ~~~~~~~~~~~~~~~~~
    $LastChangedDate: $
    $Rev: $
    $Author: $
    """

    import time
    import socket
    import struct
    import select
    import random
    import asyncore

    # From /usr/include/linux/icmp.h; your milage may vary.
    ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris.

    ICMP_CODE = socket.getprotobyname('icmp')
    ERROR_DESCR = {
    1: ' - Note that ICMP messages can only be '
    'sent from processes running as root.',
    10013: ' - Note that ICMP messages can only be sent by'
    ' users or processes with administrator rights.'
    }

    __all__ = ['create_packet', 'do_one', 'verbose_ping', 'PingQuery',
    'multi_ping_query']


    def checksum(source_string):
    # I'm not too confident that this is right but testing seems to
    # suggest that it gives the same answers as in_cksum in ping.c.
    sum = 0
    count_to = (len(source_string) / 2) * 2
    count = 0
    while count < count_to:
    this_val = ord(source_string[count + 1])*256+ord(source_string[count])
    sum = sum + this_val
    sum = sum & 0xffffffff # Necessary?
    count = count + 2
    if count_to < len(source_string):
    sum = sum + ord(source_string[len(source_string) - 1])
    sum = sum & 0xffffffff # Necessary?
    sum = (sum >> 16) + (sum & 0xffff)
    sum = sum + (sum >> 16)
    answer = ~sum
    answer = answer & 0xffff
    # Swap bytes. Bugger me if I know why.
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer


    def create_packet(id):
    """Create a new echo request packet based on the given "id"."""
    # Header is type (8), code (8), checksum (16), id (16), sequence (16)
    header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0, 0, id, 1)
    data = 192 * 'Q'
    # Calculate the checksum on the data and the dummy header.
    my_checksum = checksum(header + data)
    # Now that we have the right checksum, we put that in. It's just easier
    # to make up a new header than to stuff it into the dummy.
    header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0,
    socket.htons(my_checksum), id, 1)
    return header + data


    def do_one(dest_addr, timeout=1):
    """
    Sends one ping to the given "dest_addr" which can be an ip or hostname.
    "timeout" can be any integer or float except negatives and zero.
    "host" represents the address under which the server can be reached.
    "timeout" is the interval which the host gets granted for its reply.
    "p_id" must be any unique integer or float except negatives and zeros.
    Returns either the delay (in seconds) or None on timeout and an invalid
    address, respectively.
    If "ignore_errors" is True, the default behaviour of asyncore
    will be overwritten with a function which does just nothing.
    """
    asyncore.dispatcher.__init__(self)
    try:
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE)
    self.create_socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE)
    except socket.error, (errno, msg):
    if errno in ERROR_DESCR:
    # Operation not permitted
    raise socket.error(''.join((msg, ERROR_DESCR[errno])))
    raise # raise the original error
    try:
    host = socket.gethostbyname(dest_addr)
    except socket.gaierror:
    return
    self.time_received = 0
    self.time_sent = 0
    self.timeout = timeout
    # Maximum for an unsigned short int c object counts to 65535 so
    # we have to sure that our packet id is not greater than that.
    packet_id = int((id(timeout) * random.random()) % 65535)
    packet = create_packet(packet_id)
    while packet:
    self.packet_id = int((id(timeout) / p_id) % 65535)
    self.host = host
    self.packet = create_packet(self.packet_id)
    if ignore_errors:
    # If it does not care whether an error occured or not.
    self.handle_error = self.do_not_handle_errors
    self.handle_expt = self.do_not_handle_errors

    def writable(self):
    return self.time_sent == 0

    def handle_write(self):
    self.time_sent = time.time()
    while self.packet:
    # The icmp protocol does not use a port, but the function
    # below expects it, so we just give it a dummy port.
    sent = my_socket.sendto(packet, (dest_addr, 1))
    packet = packet[sent:]
    delay = receive_ping(my_socket, packet_id, time.time(), timeout)
    my_socket.close()
    return delay


    def receive_ping(my_socket, packet_id, time_sent, timeout):
    # Receive the ping from the socket.
    time_left = timeout
    while True:
    started_select = time.time()
    ready = select.select([my_socket], [], [], time_left)
    how_long_in_select = time.time() - started_select
    if ready[0] == []: # Timeout
    return
    time_received = time.time()
    rec_packet, addr = my_socket.recvfrom(1024)
    icmp_header = rec_packet[20:28]
    type, code, checksum, p_id, sequence = struct.unpack(
    'bbHHh', icmp_header)
    if p_id == packet_id:
    return time_received - time_sent
    time_left -= time_received - time_sent
    if time_left <= 0:
    return


    def verbose_ping(dest_addr, timeout=2, count=4):
    """
    Sends one ping to the given "dest_addr" which can be an ip or hostname.
    "timeout" can be any integer or float except negatives and zero.
    "count" specifies how many pings will be sent.
    Displays the result on the screen.
    """
    for i in xrange(count):
    print 'ping {}...'.format(dest_addr),
    delay = do_one(dest_addr, timeout)
    if delay == None:
    print 'failed. (Timeout within {} seconds.)'.format(timeout)
    else:
    delay = round(delay * 1000.0, 4)
    print 'get ping in {} milliseconds.'.format(delay)
    print


    class PingQuery(asyncore.dispatcher):
    def __init__(self, host, p_id, timeout=0.5, ignore_errors=False):
    """
    Derived class from "asyncore.dispatcher" for sending and
    receiving an icmp echo request/reply.
    Usually this class is used in conjunction with the "loop"
    function of asyncore.
    Once the loop is over, you can retrieve the results with
    the "get_result" method. Assignment is possible through
    the "get_host" method.
    "host" represents the address under which the server can be reached.
    "timeout" is the interval which the host gets granted for its reply.
    "p_id" must be any unique integer or float except negatives and zeros.
    If "ignore_errors" is True, the default behaviour of asyncore
    will be overwritten with a function which does just nothing.
    """
    asyncore.dispatcher.__init__(self)
    try:
    self.create_socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE)
    except socket.error, (errno, msg):
    if errno in ERROR_DESCR:
    # Operation not permitted
    raise socket.error(''.join((msg, ERROR_DESCR[errno])))
    raise # raise the original error
    self.time_received = 0
    self.time_sent = 0
    self.timeout = timeout
    # Maximum for an unsigned short int c object counts to 65535 so
    # we have to sure that our packet id is not greater than that.
    self.packet_id = int((id(timeout) / p_id) % 65535)
    self.host = host
    self.packet = create_packet(self.packet_id)
    if ignore_errors:
    # If it does not care whether an error occured or not.
    self.handle_error = self.do_not_handle_errors
    self.handle_expt = self.do_not_handle_errors

    def writable(self):
    return self.time_sent == 0

    def handle_write(self):
    self.time_sent = time.time()
    while self.packet:
    # The icmp protocol does not use a port, but the function
    # below expects it, so we just give it a dummy port.
    sent = self.sendto(self.packet, (self.host, 1))
    self.packet = self.packet[sent:]

    def readable(self):
    # As long as we did not sent anything, the channel has to be left open.
    if (not self.writable()
    # Once we sent something, we should periodically check if the reply
    # timed out.
    and self.timeout < (time.time() - self.time_sent)):
    self.close()
    return False
    # If the channel should not be closed, we do not want to read something
    # until we did not sent anything.
    return not self.writable()

    def handle_read(self):
    read_time = time.time()
    packet, addr = self.recvfrom(1024)
    header = packet[20:28]
    type, code, checksum, p_id, sequence = struct.unpack("bbHHh", header)
    if p_id == self.packet_id:
    # This comparison is necessary because winsocks do not only get
    # the replies for their own sent packets.
    self.time_received = read_time
    self.close()

    def get_result(self):
    """Return the ping delay if possible, otherwise None."""
    if self.time_received > 0:
    return self.time_received - self.time_sent

    def get_host(self):
    """Return the host where to the request has or should been sent."""
    return self.host

    def do_not_handle_errors(self):
    # Just a dummy handler to stop traceback printing, if desired.
    pass

    def create_socket(self, family, type, proto):
    # Overwritten, because the original does not support the "proto" arg.
    sock = socket.socket(family, type, proto)
    sock.setblocking(0)
    self.set_socket(sock)
    # Part of the original but is not used. (at least at python 2.7)
    # Copied for possible compatiblity reasons.
    self.family_and_type = family, type

    # If the following methods would not be there, we would see some very
    # "useful" warnings from asyncore, maybe. But we do not want to, or do we?
    def handle_connect(self):
    pass

    def handle_accept(self):
    pass

    def handle_close(self):
    sent = self.sendto(self.packet, (self.host, 1))
    self.packet = self.packet[sent:]

    def readable(self):
    # As long as we did not sent anything, the channel has to be left open.
    if (not self.writable()
    # Once we sent something, we should periodically check if the reply
    # timed out.
    and self.timeout < (time.time() - self.time_sent)):
    self.close()


    def multi_ping_query(hosts, timeout=1, step=512, ignore_errors=False):
    """
    Sends multiple icmp echo requests at once.
    "hosts" is a list of ips or hostnames which should be pinged.
    "timeout" must be given and a integer or float greater than zero.
    "step" is the amount of sockets which should be watched at once.
    See the docstring of "PingQuery" for the meaning of "ignore_erros".
    """
    results, host_list, id = {}, [], 0
    for host in hosts:
    try:
    host_list.append(socket.gethostbyname(host))
    except socket.gaierror:
    results[host] = None
    while host_list:
    sock_list = []
    for ip in host_list[:step]: # select supports only a max of 512
    id += 1
    sock_list.append(PingQuery(ip, id, timeout, ignore_errors))
    host_list.remove(ip)
    # Remember to use a timeout here. The risk to get an infinite loop
    # is high, because noone can guarantee that each host will reply!
    asyncore.loop(timeout)
    for sock in sock_list:
    results[sock.get_host()] = sock.get_result()
    return results


    if __name__ == '__main__':
    # Testing
    verbose_ping('www.heise.de')
    verbose_ping('google.com')
    verbose_ping('an-invalid-test-url.com')
    verbose_ping('127.0.0.1')
    host_list = ['www.heise.de', 'google.com', '127.0.0.1',
    'an-invalid-test-url.com']
    for host, ping in multi_ping_query(host_list).iteritems():
    print host, '=', ping
    return False
    # If the channel should not be closed, we do not want to read something
    # until we did not sent anything.
    return not self.writable()

    def handle_read(self):
    read_time = time.time()
    packet, addr = self.recvfrom(1024)
    header = packet[20:28]
    type, code, checksum, p_id, sequence = struct.unpack("bbHHh", header)
    if p_id == self.packet_id:
    # This comparison is necessary because winsocks do not only get
    # the replies for their own sent packets.
    self.time_received = read_time
    self.close()

    def get_result(self):
    """Return the ping delay if possible, otherwise None."""
    if self.time_received > 0:
    return self.time_received - self.time_sent

    def get_host(self):
    """Return the host where to the request has or should been sent."""
    return self.host

    def do_not_handle_errors(self):
    # Just a dummy handler to stop traceback printing, if desired.
    pass

    def create_socket(self, family, type, proto):
    # Overwritten, because the original does not support the "proto" arg.
    sock = socket.socket(family, type, proto)
    sock.setblocking(0)
    self.set_socket(sock)
    # Part of the original but is not used. (at least at python 2.7)
    # Copied for possible compatiblity reasons.
    self.family_and_type = family, type

    # If the following methods would not be there, we would see some very
    # "useful" warnings from asyncore, maybe. But we do not want to, or do we?
    def handle_connect(self):
    pass

    def handle_accept(self):
    pass

    def handle_close(self):
    self.close()


    def multi_ping_query(hosts, timeout=1, step=512, ignore_errors=False):
    """
    Sends multiple icmp echo requests at once.
    "hosts" is a list of ips or hostnames which should be pinged.
    "timeout" must be given and a integer or float greater than zero.
    "step" is the amount of sockets which should be watched at once.
    See the docstring of "PingQuery" for the meaning of "ignore_erros".
    """
    results, host_list, id = {}, [], 0
    for host in hosts:
    try:
    host_list.append(socket.gethostbyname(host))
    except socket.gaierror:
    results[host] = None
    while host_list:
    sock_list = []
    for ip in host_list[:step]: # select supports only a max of 512
    id += 1
    sock_list.append(PingQuery(ip, id, timeout, ignore_errors))
    host_list.remove(ip)
    # Remember to use a timeout here. The risk to get an infinite loop
    # is high, because noone can guarantee that each host will reply!
    asyncore.loop(timeout)
    for sock in sock_list:
    results[sock.get_host()] = sock.get_result()
    return results


    if __name__ == '__main__':
    # Testing
    verbose_ping('www.heise.de')
    verbose_ping('google.com')
    verbose_ping('an-invalid-test-url.com')
    verbose_ping('127.0.0.1')
    host_list = ['www.heise.de', 'google.com', '127.0.0.1',
    'an-invalid-test-url.com']
    for host, ping in multi_ping_query(host_list).iteritems():
    print host, '=', ping
  9. @pklaus pklaus revised this gist May 17, 2012. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions ping.py
    Original file line number Diff line number Diff line change
    @@ -157,7 +157,7 @@ def do_one(dest_addr, timeout=1):
    return
    # Maximum for an unsigned short int c object counts to 65535 so
    # we have to sure that our packet id is not greater than that.
    packet_id = int((id(timeout) * random.random()) / 65535)
    packet_id = int((id(timeout) * random.random()) % 65535)
    packet = create_packet(packet_id)
    while packet:
    # The icmp protocol does not use a port, but the function
    @@ -245,7 +245,7 @@ def __init__(self, host, p_id, timeout=0.5, ignore_errors=False):
    self.timeout = timeout
    # Maximum for an unsigned short int c object counts to 65535 so
    # we have to sure that our packet id is not greater than that.
    self.packet_id = int((id(timeout) / p_id) / 65535)
    self.packet_id = int((id(timeout) / p_id) % 65535)
    self.host = host
    self.packet = create_packet(self.packet_id)
    if ignore_errors:
  10. @pklaus pklaus revised this gist Mar 5, 2011. 1 changed file with 361 additions and 202 deletions.
    563 changes: 361 additions & 202 deletions ping.py
    Original file line number Diff line number Diff line change
    @@ -1,205 +1,364 @@
    #!/usr/bin/env python

    """
    A pure python ping implementation using raw socket.
    Note that ICMP messages can only be sent from processes running as root.
    Derived from ping.c distributed in Linux's netkit. That code is
    copyright (c) 1989 by The Regents of the University of California.
    That code is in turn derived from code written by Mike Muuss of the
    US Army Ballistic Research Laboratory in December, 1983 and
    placed in the public domain. They have my thanks.
    Bugs are naturally mine. I'd be glad to hear about them. There are
    certainly word - size dependenceies here.
    Copyright (c) Matthew Dixon Cowles, <http://www.visi.com/~mdc/>.
    Distributable under the terms of the GNU General Public License
    version 2. Provided with no warranties of any sort.
    Original Version from Matthew Dixon Cowles:
    -> ftp://ftp.visi.com/users/mdc/ping.py
    Rewrite by Jens Diemer:
    -> http://www.python-forum.de/post-69122.html#69122
    Revision history
    ~~~~~~~~~~~~~~~~
    May 30, 2007
    little rewrite by Jens Diemer:
    - change socket asterisk import to a normal import
    - replace time.time() with time.clock()
    - delete "return None" (or change to "return" only)
    - in checksum() rename "str" to "source_string"
    November 22, 1997
    Initial hack. Doesn't do much, but rather than try to guess
    what features I (or others) will want in the future, I've only
    put in what I need now.
    December 16, 1997
    For some reason, the checksum bytes are in the wrong order when
    this is run under Solaris 2.X for SPARC but it works right under
    Linux x86. Since I don't know just what's wrong, I'll swap the
    bytes always and then do an htons().
    December 4, 2000
    Changed the struct.pack() calls to pack the checksum and ID as
    unsigned. My thanks to Jerome Poincheval for the fix.
    Last commit info:
    ~~~~~~~~~~~~~~~~~
    $LastChangedDate: $
    $Rev: $
    $Author: $
    """


    import os, sys, socket, struct, select, time

    # From /usr/include/linux/icmp.h; your milage may vary.
    ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris.


    def checksum(source_string):
    #!/usr/bin/env python

    """
    I'm not too confident that this is right but testing seems
    to suggest that it gives the same answers as in_cksum in ping.c
    A pure python ping implementation using raw socket.
    Note that ICMP messages can only be sent from processes running as root.
    Derived from ping.c distributed in Linux's netkit. That code is
    copyright (c) 1989 by The Regents of the University of California.
    That code is in turn derived from code written by Mike Muuss of the
    US Army Ballistic Research Laboratory in December, 1983 and
    placed in the public domain. They have my thanks.
    Bugs are naturally mine. I'd be glad to hear about them. There are
    certainly word - size dependenceies here.
    Copyright (c) Matthew Dixon Cowles, <http://www.visi.com/~mdc/>.
    Distributable under the terms of the GNU General Public License
    version 2. Provided with no warranties of any sort.
    Original Version from Matthew Dixon Cowles:
    -> ftp://ftp.visi.com/users/mdc/ping.py
    Rewrite by Jens Diemer:
    -> http://www.python-forum.de/post-69122.html#69122
    Rewrite by Johannes Meyer:
    -> http://www.python-forum.de/viewtopic.php?p=183720
    Revision history
    ~~~~~~~~~~~~~~~~
    November 1, 2010
    Rewrite by Johannes Meyer:
    - changed entire code layout
    - changed some comments and docstrings
    - replaced time.clock() with time.time() in order
    to be able to use this module on linux, too.
    - added global __all__, ICMP_CODE and ERROR_DESCR
    - merged functions "do_one" and "send_one_ping"
    - placed icmp packet creation in its own function
    - removed timestamp from the icmp packet
    - added function "multi_ping_query"
    - added class "PingQuery"
    May 30, 2007
    little rewrite by Jens Diemer:
    - change socket asterisk import to a normal import
    - replace time.time() with time.clock()
    - delete "return None" (or change to "return" only)
    - in checksum() rename "str" to "source_string"
    November 22, 1997
    Initial hack. Doesn't do much, but rather than try to guess
    what features I (or others) will want in the future, I've only
    put in what I need now.
    December 16, 1997
    For some reason, the checksum bytes are in the wrong order when
    this is run under Solaris 2.X for SPARC but it works right under
    Linux x86. Since I don't know just what's wrong, I'll swap the
    bytes always and then do an htons().
    December 4, 2000
    Changed the struct.pack() calls to pack the checksum and ID as
    unsigned. My thanks to Jerome Poincheval for the fix.
    Last commit info:
    ~~~~~~~~~~~~~~~~~
    $LastChangedDate: $
    $Rev: $
    $Author: $
    """
    sum = 0
    countTo = (len(source_string)/2)*2
    count = 0
    while count<countTo:
    thisVal = ord(source_string[count + 1])*256 + ord(source_string[count])
    sum = sum + thisVal
    sum = sum & 0xffffffff # Necessary?
    count = count + 2

    if countTo<len(source_string):
    sum = sum + ord(source_string[len(source_string) - 1])
    sum = sum & 0xffffffff # Necessary?

    sum = (sum >> 16) + (sum & 0xffff)
    sum = sum + (sum >> 16)
    answer = ~sum
    answer = answer & 0xffff

    # Swap bytes. Bugger me if I know why.
    answer = answer >> 8 | (answer << 8 & 0xff00)

    return answer


    def receive_one_ping(my_socket, ID, timeout):
    """
    receive the ping from the socket.
    """
    timeLeft = timeout
    while True:
    startedSelect = time.clock()
    whatReady = select.select([my_socket], [], [], timeLeft)
    howLongInSelect = (time.clock() - startedSelect)
    if whatReady[0] == []: # Timeout
    return

    timeReceived = time.clock()
    recPacket, addr = my_socket.recvfrom(1024)
    icmpHeader = recPacket[20:28]
    type, code, checksum, packetID, sequence = struct.unpack(
    "bbHHh", icmpHeader
    )
    if packetID == ID:
    bytesInDouble = struct.calcsize("d")
    timeSent = struct.unpack("d", recPacket[28:28 + bytesInDouble])[0]
    return timeReceived - timeSent

    timeLeft = timeLeft - howLongInSelect
    if timeLeft <= 0:
    return


    def send_one_ping(my_socket, dest_addr, ID):
    """
    Send one ping to the given >dest_addr<.
    """
    dest_addr = socket.gethostbyname(dest_addr)

    # Header is type (8), code (8), checksum (16), id (16), sequence (16)
    my_checksum = 0

    # Make a dummy heder with a 0 checksum.
    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
    bytesInDouble = struct.calcsize("d")
    data = (192 - bytesInDouble) * "Q"
    data = struct.pack("d", time.clock()) + data

    # Calculate the checksum on the data and the dummy header.
    my_checksum = checksum(header + data)

    # Now that we have the right checksum, we put that in. It's just easier
    # to make up a new header than to stuff it into the dummy.
    header = struct.pack(
    "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
    )
    packet = header + data
    my_socket.sendto(packet, (dest_addr, 1)) # Don't know about the 1


    def do_one(dest_addr, timeout):
    """
    Returns either the delay (in seconds) or none on timeout.
    """
    icmp = socket.getprotobyname("icmp")
    try:
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error, (errno, msg):
    if errno == 1:
    # Operation not permitted
    msg = msg + (
    " - Note that ICMP messages can only be sent from processes"
    " running as root."
    )
    raise socket.error(msg)
    raise # raise the original error

    my_ID = os.getpid() & 0xFFFF

    send_one_ping(my_socket, dest_addr, my_ID)
    delay = receive_one_ping(my_socket, my_ID, timeout)

    my_socket.close()
    return delay


    def verbose_ping(dest_addr, timeout = 2, count = 4):
    """
    Send >count< ping to >dest_addr< with the given >timeout< and display
    the result.
    """
    for i in xrange(count):
    print "ping %s..." % dest_addr,

    import time
    import socket
    import struct
    import select
    import random
    import asyncore

    # From /usr/include/linux/icmp.h; your milage may vary.
    ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris.

    ICMP_CODE = socket.getprotobyname('icmp')
    ERROR_DESCR = {
    1: ' - Note that ICMP messages can only be '
    'sent from processes running as root.',
    10013: ' - Note that ICMP messages can only be sent by'
    ' users or processes with administrator rights.'
    }

    __all__ = ['create_packet', 'do_one', 'verbose_ping', 'PingQuery',
    'multi_ping_query']


    def checksum(source_string):
    # I'm not too confident that this is right but testing seems to
    # suggest that it gives the same answers as in_cksum in ping.c.
    sum = 0
    count_to = (len(source_string) / 2) * 2
    count = 0
    while count < count_to:
    this_val = ord(source_string[count + 1])*256+ord(source_string[count])
    sum = sum + this_val
    sum = sum & 0xffffffff # Necessary?
    count = count + 2
    if count_to < len(source_string):
    sum = sum + ord(source_string[len(source_string) - 1])
    sum = sum & 0xffffffff # Necessary?
    sum = (sum >> 16) + (sum & 0xffff)
    sum = sum + (sum >> 16)
    answer = ~sum
    answer = answer & 0xffff
    # Swap bytes. Bugger me if I know why.
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer


    def create_packet(id):
    """Create a new echo request packet based on the given "id"."""
    # Header is type (8), code (8), checksum (16), id (16), sequence (16)
    header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0, 0, id, 1)
    data = 192 * 'Q'
    # Calculate the checksum on the data and the dummy header.
    my_checksum = checksum(header + data)
    # Now that we have the right checksum, we put that in. It's just easier
    # to make up a new header than to stuff it into the dummy.
    header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0,
    socket.htons(my_checksum), id, 1)
    return header + data


    def do_one(dest_addr, timeout=1):
    """
    Sends one ping to the given "dest_addr" which can be an ip or hostname.
    "timeout" can be any integer or float except negatives and zero.
    Returns either the delay (in seconds) or None on timeout and an invalid
    address, respectively.
    """
    try:
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE)
    except socket.error, (errno, msg):
    if errno in ERROR_DESCR:
    # Operation not permitted
    raise socket.error(''.join((msg, ERROR_DESCR[errno])))
    raise # raise the original error
    try:
    delay = do_one(dest_addr, timeout)
    except socket.gaierror, e:
    print "failed. (socket error: '%s')" % e[1]
    break

    if delay == None:
    print "failed. (timeout within %ssec.)" % timeout
    else:
    delay = delay * 1000
    print "get ping in %0.4fms" % delay
    print


    if __name__ == '__main__':
    verbose_ping("heise.de")
    verbose_ping("google.com")
    verbose_ping("a-test-url-taht-is-not-available.com")
    verbose_ping("192.168.1.1")
    host = socket.gethostbyname(dest_addr)
    except socket.gaierror:
    return
    # Maximum for an unsigned short int c object counts to 65535 so
    # we have to sure that our packet id is not greater than that.
    packet_id = int((id(timeout) * random.random()) / 65535)
    packet = create_packet(packet_id)
    while packet:
    # The icmp protocol does not use a port, but the function
    # below expects it, so we just give it a dummy port.
    sent = my_socket.sendto(packet, (dest_addr, 1))
    packet = packet[sent:]
    delay = receive_ping(my_socket, packet_id, time.time(), timeout)
    my_socket.close()
    return delay


    def receive_ping(my_socket, packet_id, time_sent, timeout):
    # Receive the ping from the socket.
    time_left = timeout
    while True:
    started_select = time.time()
    ready = select.select([my_socket], [], [], time_left)
    how_long_in_select = time.time() - started_select
    if ready[0] == []: # Timeout
    return
    time_received = time.time()
    rec_packet, addr = my_socket.recvfrom(1024)
    icmp_header = rec_packet[20:28]
    type, code, checksum, p_id, sequence = struct.unpack(
    'bbHHh', icmp_header)
    if p_id == packet_id:
    return time_received - time_sent
    time_left -= time_received - time_sent
    if time_left <= 0:
    return


    def verbose_ping(dest_addr, timeout=2, count=4):
    """
    Sends one ping to the given "dest_addr" which can be an ip or hostname.
    "timeout" can be any integer or float except negatives and zero.
    "count" specifies how many pings will be sent.
    Displays the result on the screen.
    """
    for i in xrange(count):
    print 'ping {}...'.format(dest_addr),
    delay = do_one(dest_addr, timeout)
    if delay == None:
    print 'failed. (Timeout within {} seconds.)'.format(timeout)
    else:
    delay = round(delay * 1000.0, 4)
    print 'get ping in {} milliseconds.'.format(delay)
    print


    class PingQuery(asyncore.dispatcher):
    def __init__(self, host, p_id, timeout=0.5, ignore_errors=False):
    """
    Derived class from "asyncore.dispatcher" for sending and
    receiving an icmp echo request/reply.
    Usually this class is used in conjunction with the "loop"
    function of asyncore.
    Once the loop is over, you can retrieve the results with
    the "get_result" method. Assignment is possible through
    the "get_host" method.
    "host" represents the address under which the server can be reached.
    "timeout" is the interval which the host gets granted for its reply.
    "p_id" must be any unique integer or float except negatives and zeros.
    If "ignore_errors" is True, the default behaviour of asyncore
    will be overwritten with a function which does just nothing.
    """
    asyncore.dispatcher.__init__(self)
    try:
    self.create_socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE)
    except socket.error, (errno, msg):
    if errno in ERROR_DESCR:
    # Operation not permitted
    raise socket.error(''.join((msg, ERROR_DESCR[errno])))
    raise # raise the original error
    self.time_received = 0
    self.time_sent = 0
    self.timeout = timeout
    # Maximum for an unsigned short int c object counts to 65535 so
    # we have to sure that our packet id is not greater than that.
    self.packet_id = int((id(timeout) / p_id) / 65535)
    self.host = host
    self.packet = create_packet(self.packet_id)
    if ignore_errors:
    # If it does not care whether an error occured or not.
    self.handle_error = self.do_not_handle_errors
    self.handle_expt = self.do_not_handle_errors

    def writable(self):
    return self.time_sent == 0

    def handle_write(self):
    self.time_sent = time.time()
    while self.packet:
    # The icmp protocol does not use a port, but the function
    # below expects it, so we just give it a dummy port.
    sent = self.sendto(self.packet, (self.host, 1))
    self.packet = self.packet[sent:]

    def readable(self):
    # As long as we did not sent anything, the channel has to be left open.
    if (not self.writable()
    # Once we sent something, we should periodically check if the reply
    # timed out.
    and self.timeout < (time.time() - self.time_sent)):
    self.close()
    return False
    # If the channel should not be closed, we do not want to read something
    # until we did not sent anything.
    return not self.writable()

    def handle_read(self):
    read_time = time.time()
    packet, addr = self.recvfrom(1024)
    header = packet[20:28]
    type, code, checksum, p_id, sequence = struct.unpack("bbHHh", header)
    if p_id == self.packet_id:
    # This comparison is necessary because winsocks do not only get
    # the replies for their own sent packets.
    self.time_received = read_time
    self.close()

    def get_result(self):
    """Return the ping delay if possible, otherwise None."""
    if self.time_received > 0:
    return self.time_received - self.time_sent

    def get_host(self):
    """Return the host where to the request has or should been sent."""
    return self.host

    def do_not_handle_errors(self):
    # Just a dummy handler to stop traceback printing, if desired.
    pass

    def create_socket(self, family, type, proto):
    # Overwritten, because the original does not support the "proto" arg.
    sock = socket.socket(family, type, proto)
    sock.setblocking(0)
    self.set_socket(sock)
    # Part of the original but is not used. (at least at python 2.7)
    # Copied for possible compatiblity reasons.
    self.family_and_type = family, type

    # If the following methods would not be there, we would see some very
    # "useful" warnings from asyncore, maybe. But we do not want to, or do we?
    def handle_connect(self):
    pass

    def handle_accept(self):
    pass

    def handle_close(self):
    self.close()


    def multi_ping_query(hosts, timeout=1, step=512, ignore_errors=False):
    """
    Sends multiple icmp echo requests at once.
    "hosts" is a list of ips or hostnames which should be pinged.
    "timeout" must be given and a integer or float greater than zero.
    "step" is the amount of sockets which should be watched at once.
    See the docstring of "PingQuery" for the meaning of "ignore_erros".
    """
    results, host_list, id = {}, [], 0
    for host in hosts:
    try:
    host_list.append(socket.gethostbyname(host))
    except socket.gaierror:
    results[host] = None
    while host_list:
    sock_list = []
    for ip in host_list[:step]: # select supports only a max of 512
    id += 1
    sock_list.append(PingQuery(ip, id, timeout, ignore_errors))
    host_list.remove(ip)
    # Remember to use a timeout here. The risk to get an infinite loop
    # is high, because noone can guarantee that each host will reply!
    asyncore.loop(timeout)
    for sock in sock_list:
    results[sock.get_host()] = sock.get_result()
    return results


    if __name__ == '__main__':
    # Testing
    verbose_ping('www.heise.de')
    verbose_ping('google.com')
    verbose_ping('an-invalid-test-url.com')
    verbose_ping('127.0.0.1')
    host_list = ['www.heise.de', 'google.com', '127.0.0.1',
    'an-invalid-test-url.com']
    for host, ping in multi_ping_query(host_list).iteritems():
    print host, '=', ping
  11. @pklaus pklaus created this gist Mar 5, 2011.
    205 changes: 205 additions & 0 deletions ping.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,205 @@
    #!/usr/bin/env python

    """
    A pure python ping implementation using raw socket.
    Note that ICMP messages can only be sent from processes running as root.
    Derived from ping.c distributed in Linux's netkit. That code is
    copyright (c) 1989 by The Regents of the University of California.
    That code is in turn derived from code written by Mike Muuss of the
    US Army Ballistic Research Laboratory in December, 1983 and
    placed in the public domain. They have my thanks.
    Bugs are naturally mine. I'd be glad to hear about them. There are
    certainly word - size dependenceies here.
    Copyright (c) Matthew Dixon Cowles, <http://www.visi.com/~mdc/>.
    Distributable under the terms of the GNU General Public License
    version 2. Provided with no warranties of any sort.
    Original Version from Matthew Dixon Cowles:
    -> ftp://ftp.visi.com/users/mdc/ping.py
    Rewrite by Jens Diemer:
    -> http://www.python-forum.de/post-69122.html#69122
    Revision history
    ~~~~~~~~~~~~~~~~
    May 30, 2007
    little rewrite by Jens Diemer:
    - change socket asterisk import to a normal import
    - replace time.time() with time.clock()
    - delete "return None" (or change to "return" only)
    - in checksum() rename "str" to "source_string"
    November 22, 1997
    Initial hack. Doesn't do much, but rather than try to guess
    what features I (or others) will want in the future, I've only
    put in what I need now.
    December 16, 1997
    For some reason, the checksum bytes are in the wrong order when
    this is run under Solaris 2.X for SPARC but it works right under
    Linux x86. Since I don't know just what's wrong, I'll swap the
    bytes always and then do an htons().
    December 4, 2000
    Changed the struct.pack() calls to pack the checksum and ID as
    unsigned. My thanks to Jerome Poincheval for the fix.
    Last commit info:
    ~~~~~~~~~~~~~~~~~
    $LastChangedDate: $
    $Rev: $
    $Author: $
    """


    import os, sys, socket, struct, select, time

    # From /usr/include/linux/icmp.h; your milage may vary.
    ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris.


    def checksum(source_string):
    """
    I'm not too confident that this is right but testing seems
    to suggest that it gives the same answers as in_cksum in ping.c
    """
    sum = 0
    countTo = (len(source_string)/2)*2
    count = 0
    while count<countTo:
    thisVal = ord(source_string[count + 1])*256 + ord(source_string[count])
    sum = sum + thisVal
    sum = sum & 0xffffffff # Necessary?
    count = count + 2

    if countTo<len(source_string):
    sum = sum + ord(source_string[len(source_string) - 1])
    sum = sum & 0xffffffff # Necessary?

    sum = (sum >> 16) + (sum & 0xffff)
    sum = sum + (sum >> 16)
    answer = ~sum
    answer = answer & 0xffff

    # Swap bytes. Bugger me if I know why.
    answer = answer >> 8 | (answer << 8 & 0xff00)

    return answer


    def receive_one_ping(my_socket, ID, timeout):
    """
    receive the ping from the socket.
    """
    timeLeft = timeout
    while True:
    startedSelect = time.clock()
    whatReady = select.select([my_socket], [], [], timeLeft)
    howLongInSelect = (time.clock() - startedSelect)
    if whatReady[0] == []: # Timeout
    return

    timeReceived = time.clock()
    recPacket, addr = my_socket.recvfrom(1024)
    icmpHeader = recPacket[20:28]
    type, code, checksum, packetID, sequence = struct.unpack(
    "bbHHh", icmpHeader
    )
    if packetID == ID:
    bytesInDouble = struct.calcsize("d")
    timeSent = struct.unpack("d", recPacket[28:28 + bytesInDouble])[0]
    return timeReceived - timeSent

    timeLeft = timeLeft - howLongInSelect
    if timeLeft <= 0:
    return


    def send_one_ping(my_socket, dest_addr, ID):
    """
    Send one ping to the given >dest_addr<.
    """
    dest_addr = socket.gethostbyname(dest_addr)

    # Header is type (8), code (8), checksum (16), id (16), sequence (16)
    my_checksum = 0

    # Make a dummy heder with a 0 checksum.
    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
    bytesInDouble = struct.calcsize("d")
    data = (192 - bytesInDouble) * "Q"
    data = struct.pack("d", time.clock()) + data

    # Calculate the checksum on the data and the dummy header.
    my_checksum = checksum(header + data)

    # Now that we have the right checksum, we put that in. It's just easier
    # to make up a new header than to stuff it into the dummy.
    header = struct.pack(
    "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
    )
    packet = header + data
    my_socket.sendto(packet, (dest_addr, 1)) # Don't know about the 1


    def do_one(dest_addr, timeout):
    """
    Returns either the delay (in seconds) or none on timeout.
    """
    icmp = socket.getprotobyname("icmp")
    try:
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error, (errno, msg):
    if errno == 1:
    # Operation not permitted
    msg = msg + (
    " - Note that ICMP messages can only be sent from processes"
    " running as root."
    )
    raise socket.error(msg)
    raise # raise the original error

    my_ID = os.getpid() & 0xFFFF

    send_one_ping(my_socket, dest_addr, my_ID)
    delay = receive_one_ping(my_socket, my_ID, timeout)

    my_socket.close()
    return delay


    def verbose_ping(dest_addr, timeout = 2, count = 4):
    """
    Send >count< ping to >dest_addr< with the given >timeout< and display
    the result.
    """
    for i in xrange(count):
    print "ping %s..." % dest_addr,
    try:
    delay = do_one(dest_addr, timeout)
    except socket.gaierror, e:
    print "failed. (socket error: '%s')" % e[1]
    break

    if delay == None:
    print "failed. (timeout within %ssec.)" % timeout
    else:
    delay = delay * 1000
    print "get ping in %0.4fms" % delay
    print


    if __name__ == '__main__':
    verbose_ping("heise.de")
    verbose_ping("google.com")
    verbose_ping("a-test-url-taht-is-not-available.com")
    verbose_ping("192.168.1.1")