Skip to content

Instantly share code, notes, and snippets.

@flaviut
Created November 2, 2023 01:56
Show Gist options
  • Select an option

  • Save flaviut/c038ffb0aaff73defc9c93aa9de47cad to your computer and use it in GitHub Desktop.

Select an option

Save flaviut/c038ffb0aaff73defc9c93aa9de47cad to your computer and use it in GitHub Desktop.

Revisions

  1. flaviut created this gist Nov 2, 2023.
    163 changes: 163 additions & 0 deletions ntp.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,163 @@
    import socket
    import struct
    import time
    from dataclasses import dataclass
    from enum import IntEnum
    from typing import Optional

    NTP_SERVER = "pool.ntp.org"
    NTP_PORT = 123
    TIME1970 = 2208988800 # 1970-01-01 in NTP epoch


    class LeapIndicator(IntEnum):
    NO_WARNING = 0
    LAST_MINUTE_61 = 1
    LAST_MINUTE_59 = 2
    ALARM_CONDITION = 3


    class Mode(IntEnum):
    RESERVED = 0
    SYMMETRIC_ACTIVE = 1
    SYMMETRIC_PASSIVE = 2
    CLIENT = 3
    SERVER = 4
    BROADCAST = 5
    RESERVED_NTP_CONTROL = 6
    RESERVED_PRIVATE = 7


    def unpack_pop(fmt, data_iter: iter):
    size = struct.calcsize(fmt)
    data = b""
    for _ in range(size):
    data += next(data_iter).to_bytes(1, "big")

    return struct.unpack(fmt, data)


    @dataclass
    class NtpPacket:
    leap: LeapIndicator
    version: int # This could be an enum as well, depending on your requirements
    mode: Mode
    stratum: int = 0
    poll: int = 0
    precision: int = 0
    root_delay: float = 0.0
    root_dispersion: float = 0.0
    reference_id: int = 0
    reference_timestamp: float = 0.0
    originate_timestamp: float = 0.0
    receive_timestamp: float = 0.0
    transmit_timestamp: float = 0.0

    kiss_of_death: Optional[str] = None

    @staticmethod
    def decode(data: bytes) -> 'NtpPacket':
    data_iter = iter(data)

    flags = unpack_pop('!B', data_iter)[0]
    leap = LeapIndicator((flags >> 6) & 0x03)
    version = (flags >> 3) & 0x07
    mode = Mode(flags & 0x07)

    stratum = unpack_pop('!B', data_iter)[0]

    if stratum == 0:
    # Kiss of death message
    kiss_of_death = unpack_pop('!4s', data_iter)[0].decode('utf-8')
    return NtpPacket(leap, version, mode, stratum, kiss_of_death=kiss_of_death)
    poll, precision = unpack_pop('!BB', data_iter)

    root_delay, root_dispersion = unpack_pop('!2I', data_iter)
    reference_id = unpack_pop('!I', data_iter)[0]

    # Timestamps are in fixed-point format, with the integer part in the first 32 bits
    # and the fractional part in the last 32 bits
    fields = {}
    for ts_name in ['reference_timestamp', 'originate_timestamp',
    'receive_timestamp', 'transmit_timestamp']:
    seconds, seconds_frac = unpack_pop('!2I', data_iter)
    fields[ts_name] = seconds + (seconds_frac / 2.0 ** 32.0) - TIME1970

    return NtpPacket(leap, version, mode, stratum, poll, precision,
    root_delay / 2 ** 16, root_dispersion / 2 ** 16, reference_id,
    **fields)

    def encode(self) -> bytes:
    flags = (self.leap.value << 6) | (self.version << 3) | self.mode.value
    root_delay = int(self.root_delay * 2 ** 16)
    root_dispersion = int(self.root_dispersion * 2 ** 16)

    packed_data = struct.pack('!B3B3I', flags, self.stratum, self.poll,
    self.precision, root_delay, root_dispersion,
    self.reference_id)

    # Encode timestamps
    for ts in [self.reference_timestamp, self.originate_timestamp,
    self.receive_timestamp, self.transmit_timestamp]:
    offset_ts = ts + TIME1970
    int_part = int(offset_ts)
    frac_part = int((offset_ts - int_part) * 2 ** 32)
    packed_data += struct.pack('!2I', int_part, frac_part)

    return packed_data


    def send_ntp_request(client):
    # Getting the current time as T1 (originate timestamp)
    originate_timestamp = time.time()
    request_data = NtpPacket(
    leap=LeapIndicator.NO_WARNING,
    version=3,
    mode=Mode.CLIENT,
    originate_timestamp=originate_timestamp
    ).encode()

    client.send(request_data)
    return client, originate_timestamp


    def receive_ntp_response(client, t1):
    data, address = client.recvfrom(1024)
    t4 = time.time()

    if len(data) != 48:
    print(f"Unexpected packet size: {len(data)}")
    return None, None, None

    parsed = NtpPacket.decode(data)
    print(parsed)

    t2 = parsed.receive_timestamp # T2
    t3 = parsed.transmit_timestamp # T3

    # Compute the round trip delay and local clock offset
    round_trip_delay = (t4 - t1) - (t3 - t2)
    local_clock_offset = ((t2 - t1) + (t3 - t4)) / 2

    # Corrected time using local clock offset
    corrected_time = t4 + local_clock_offset

    print(f"Server's Time: {time.ctime(corrected_time)}")
    print(f"Round Trip Delay: {round_trip_delay} seconds")
    print(f"Local Clock Offset: {local_clock_offset} seconds")


    def main():
    for _ in range(5):
    client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    client.settimeout(5)
    client.connect((NTP_SERVER, NTP_PORT))

    for _ in range(5):
    client, t1 = send_ntp_request(client)
    receive_ntp_response(client, t1)
    client.close()


    if __name__ == "__main__":
    main()