Created
November 2, 2023 01:56
-
-
Save flaviut/c038ffb0aaff73defc9c93aa9de47cad to your computer and use it in GitHub Desktop.
Revisions
-
flaviut created this gist
Nov 2, 2023 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,163 @@ import socket import struct import time from dataclasses import dataclass from enum import IntEnum from typing import Optional NTP_SERVER = "pool.ntp.org" NTP_PORT = 123 TIME1970 = 2208988800 # 1970-01-01 in NTP epoch class LeapIndicator(IntEnum): NO_WARNING = 0 LAST_MINUTE_61 = 1 LAST_MINUTE_59 = 2 ALARM_CONDITION = 3 class Mode(IntEnum): RESERVED = 0 SYMMETRIC_ACTIVE = 1 SYMMETRIC_PASSIVE = 2 CLIENT = 3 SERVER = 4 BROADCAST = 5 RESERVED_NTP_CONTROL = 6 RESERVED_PRIVATE = 7 def unpack_pop(fmt, data_iter: iter): size = struct.calcsize(fmt) data = b"" for _ in range(size): data += next(data_iter).to_bytes(1, "big") return struct.unpack(fmt, data) @dataclass class NtpPacket: leap: LeapIndicator version: int # This could be an enum as well, depending on your requirements mode: Mode stratum: int = 0 poll: int = 0 precision: int = 0 root_delay: float = 0.0 root_dispersion: float = 0.0 reference_id: int = 0 reference_timestamp: float = 0.0 originate_timestamp: float = 0.0 receive_timestamp: float = 0.0 transmit_timestamp: float = 0.0 kiss_of_death: Optional[str] = None @staticmethod def decode(data: bytes) -> 'NtpPacket': data_iter = iter(data) flags = unpack_pop('!B', data_iter)[0] leap = LeapIndicator((flags >> 6) & 0x03) version = (flags >> 3) & 0x07 mode = Mode(flags & 0x07) stratum = unpack_pop('!B', data_iter)[0] if stratum == 0: # Kiss of death message kiss_of_death = unpack_pop('!4s', data_iter)[0].decode('utf-8') return NtpPacket(leap, version, mode, stratum, kiss_of_death=kiss_of_death) poll, precision = unpack_pop('!BB', data_iter) root_delay, root_dispersion = unpack_pop('!2I', data_iter) reference_id = unpack_pop('!I', data_iter)[0] # Timestamps are in fixed-point format, with the integer part in the first 32 bits # and the fractional part in the last 32 bits fields = {} for ts_name in ['reference_timestamp', 'originate_timestamp', 'receive_timestamp', 'transmit_timestamp']: seconds, seconds_frac = unpack_pop('!2I', data_iter) fields[ts_name] = seconds + (seconds_frac / 2.0 ** 32.0) - TIME1970 return NtpPacket(leap, version, mode, stratum, poll, precision, root_delay / 2 ** 16, root_dispersion / 2 ** 16, reference_id, **fields) def encode(self) -> bytes: flags = (self.leap.value << 6) | (self.version << 3) | self.mode.value root_delay = int(self.root_delay * 2 ** 16) root_dispersion = int(self.root_dispersion * 2 ** 16) packed_data = struct.pack('!B3B3I', flags, self.stratum, self.poll, self.precision, root_delay, root_dispersion, self.reference_id) # Encode timestamps for ts in [self.reference_timestamp, self.originate_timestamp, self.receive_timestamp, self.transmit_timestamp]: offset_ts = ts + TIME1970 int_part = int(offset_ts) frac_part = int((offset_ts - int_part) * 2 ** 32) packed_data += struct.pack('!2I', int_part, frac_part) return packed_data def send_ntp_request(client): # Getting the current time as T1 (originate timestamp) originate_timestamp = time.time() request_data = NtpPacket( leap=LeapIndicator.NO_WARNING, version=3, mode=Mode.CLIENT, originate_timestamp=originate_timestamp ).encode() client.send(request_data) return client, originate_timestamp def receive_ntp_response(client, t1): data, address = client.recvfrom(1024) t4 = time.time() if len(data) != 48: print(f"Unexpected packet size: {len(data)}") return None, None, None parsed = NtpPacket.decode(data) print(parsed) t2 = parsed.receive_timestamp # T2 t3 = parsed.transmit_timestamp # T3 # Compute the round trip delay and local clock offset round_trip_delay = (t4 - t1) - (t3 - t2) local_clock_offset = ((t2 - t1) + (t3 - t4)) / 2 # Corrected time using local clock offset corrected_time = t4 + local_clock_offset print(f"Server's Time: {time.ctime(corrected_time)}") print(f"Round Trip Delay: {round_trip_delay} seconds") print(f"Local Clock Offset: {local_clock_offset} seconds") def main(): for _ in range(5): client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) client.settimeout(5) client.connect((NTP_SERVER, NTP_PORT)) for _ in range(5): client, t1 = send_ntp_request(client) receive_ntp_response(client, t1) client.close() if __name__ == "__main__": main()