#!/usr/bin/env python # vim: set ts=8 sw=4 sts=4 et ai tw=79: ''' Usage: ./spandspflow2pcap.py SPANDSP_LOG SENDFAX_PCAP Takes a log from Asterisk with SpanDSP, extracts the "received" data and puts it in a pcap file. Use 'fax set debug on' and configure logger.conf to get fax logs. Input data should look something like this:: [2013-08-07 15:17:34] FAX[23479] res_fax.c: FLOW T.38 Rx 5: IFP c0 ... Output data will look like a valid pcap file ;-) This allows you to reconstruct received faxes into replayable pcaps. Replaying is expected to be done by SIPp with sipp-sendfax.xml. The SIPp binary used for replaying must have image (fax) support. This means you'll need a version higher than 3.5.0 (unreleased when writing this), or the git master branch: https://github.com/SIPp/sipp Author: Walter Doekes, OSSO B.V. (2013,2015,2016,2019) License: Public Domain ''' from base64 import b16decode from collections import namedtuple from datetime import datetime, timedelta from re import search from time import mktime from struct import pack import os import sys LOSSY = False EMPTY_RECOVERY = False IFP = namedtuple('IFP', 'date seqno data') # datetime, int, bytearray def n2b(text): """ Convert "aa bb cc" to bytearray('\xaa\xbb\xcc'). """ return bytearray( b16decode(text.replace(' ', '').replace('\n', '').upper())) class SkipPacket(Exception): pass class FaxPcap(object): PCAP_PREAMBLE = n2b( 'd4 c3 b2 a1 02 00 04 00' '00 00 00 00 00 00 00 00' 'ff ff 00 00 71 00 00 00') def __init__(self, outfile): self.outfile = outfile self.date = None self.seqno = None self.udpseqno = 128 self.prev_data = None # Only do this if at pos 0? def add(self, ifp): """ Add the IFP packet. T.38 basic format of UDPTL payload section with redundancy: UDPTL_SEQNO - 2 sequence number (big endian) UDPTL_PRIMARY_PAYLOAD (T30?) - 1 subpacket length (excluding this byte) - 1 type of message (e.g. 0xd0 for data(?)) - 1 items in data field (e.g. 0x01) - 2 length of data (big endian) - N data RECOVERY (optional) - 2 count of previous seqno packets (big endian) - N UDPTL_PRIMARY_PAYLOAD of (seqno-1) - N UDPTL_PRIMARY_PAYLOAD of (seqno-2) - ... """ # First packet? if self.seqno is None: # Add preamble. self._add_preamble() # Start a second late (optional). self._add_garbage(ifp.date) # Set sequence, and fill with missing leading zeroes. self.seqno = 0 for i in range(ifp.seqno): self.add(IFP(date=ifp.date, seqno=i, data=bytearray([0]))) # Auto-increasing dates if self.date is None or ifp.date > self.date: self.date = ifp.date elif ifp.date < self.date.replace(microsecond=0): assert False, 'More packets than expected in 1s? {!r}/{!r}'.format( ifp.date, self.date) else: self.date += timedelta(microseconds=9000) # Add packet. self.seqno = ifp.seqno try: self.outfile.write(self._make_packet(ifp.data)) except SkipPacket: pass def _add_preamble(self): self.outfile.write(self.PCAP_PREAMBLE) def _add_garbage(self, date): if self.date is None or date > self.date: self.date = date self.seqno = 0xffff self.outfile.write(self._make_packet( bytearray(b'GARBAGE'), is_ifp=False)) def _make_packet(self, ifp_data, is_ifp=True): sum16 = bytearray(b'\x43\x21') # the OS fixes the checksums for us data = bytearray() if is_ifp: data.append(len(ifp_data)) # length data.extend(ifp_data) # data self.prev_data, prev_data = data[:], self.prev_data else: data.extend(ifp_data) prev_data = None if prev_data: if LOSSY and (self.seqno % 3) == 2: self.udpseqno += 1 raise SkipPacket() if EMPTY_RECOVERY: # struct ast_frame f[16], we have room for a few # packets. packets = 14 data.extend([0, packets + 1] + [0] * packets) data.extend(prev_data) else: # Add 1 previous packet, without the seqno. data.extend([0, 1]) data.extend(prev_data) # Wrap it in UDP udp = bytearray( b'\x00\x01\x00\x02%(len)s%(sum16)s%(seqno)s%(data)s' % { b'len': pack('>H', len(data) + 10), b'sum16': sum16, b'seqno': pack('>H', self.seqno), b'data': data}) # Wrap it in IP ip = bytearray( b'\x45\xb8%(len)s%(udpseqno)s\x00\x00\xf9\x11%(sum16)s' b'\x01\x01\x01\x01\x02\x02\x02\x02%(udp)s' % { b'len': pack('>H', len(udp) + 20), b'udpseqno': pack('>H', self.udpseqno), b'sum16': sum16, b'udp': udp}) # Wrap it in Ethernet ethernet = bytearray( b'\x00\x00\x00\x01\x00\x06\x00\x30\x48\xb1\x1c\x34\x00\x00' b'\x08\x00%(ip)s' % {b'ip': ip}) # Wrap it in a pcap packet packet = bytearray(b'%(prelude)s%(ethernet)s' % { b'prelude': pack( '