Loading scripts/cut_bs.py +69 −80 Original line number Diff line number Diff line Loading @@ -39,75 +39,63 @@ SID_BITS = {35, 48, 104} SYNC_WORDS = {b"!k", b" k"} def cut_to_length(fp, fp_out, length): assert length > 0 fr_cnt = 0 for f in range(length): def get_next_frame(fp): eof = False n_bits = 0 data = None sync_word = fp.read(2) if sync_word == b"": return fr_cnt eof = True return (sync_word, n_bits, data, eof) if sync_word not in SYNC_WORDS: raise ValueError("Bad Sync word!") n_bits_bs = fp.read(2) n_bits = struct.unpack("h", n_bits_bs)[0] fp_out.write(sync_word) fp_out.write(n_bits_bs) fp_out.write(fp.read(n_bits * 2)) fr_cnt += 1 n_bits = struct.unpack("h", fp.read(2))[0] data = fp.read(n_bits * 2) return (sync_word, n_bits, data, eof) return fr_cnt def write_frame(fp_out, sync_word, n_bits, data): fp_out.write(sync_word) fp_out.write(struct.pack("h", n_bits)) fp_out.write(data) def cut_from_start(fp, fp_out, start_frame=0, start_with_sid=False): # cut until start frame def cut_from_start(fp, fp_out, start_frame=0, length=-1, start_with_sid=False): fr_cnt = 0 cut_cnt = 0 for cur_frame in range(start_frame): sync_word = fp.read(2) if sync_word == b"": return (fr_cnt, cut_cnt) if sync_word not in SYNC_WORDS: raise ValueError("Bad Sync word!") n_bits = struct.unpack("h", fp.read(2))[0] fp.read(n_bits * 2) extracted_frames_cnt = 0 eof = False # cut until start frame while fr_cnt < start_frame and not eof: (sync_word, n_bits, data, eof) = get_next_frame(fp) if not eof: fr_cnt += 1 cut_cnt += 1 # cut until SID frame if start_with_sid: found = False while not found: sync_word = fp.read(2) if sync_word == b"": return (fr_cnt, cut_cnt) if sync_word not in SYNC_WORDS: raise ValueError("Bad Sync word!") n_bits_bs = fp.read(2) n_bits = struct.unpack("h", n_bits_bs)[0] while not found and not eof: (sync_word, n_bits, data, eof) = get_next_frame(fp) if not eof: if n_bits in SID_BITS: found = True fp_out.write(sync_word) fp_out.write(n_bits_bs) fp_out.write(fp.read(n_bits * 2)) fp.seek(-(4 + n_bits * 2), 1) # Found SID, rewind else: fp.read(n_bits * 2) cut_cnt += 1 fr_cnt += 1 # transfer frames while length != 0 and not eof: (sync_word, n_bits, data, eof) = get_next_frame(fp) if not eof: write_frame(fp_out, sync_word, n_bits, data) fr_cnt += 1 extracted_frames_cnt += 1 length -= 1 # transfer remaining frames while True: sync_word = fp.read(2) if sync_word == b"": break if sync_word not in SYNC_WORDS: raise ValueError("Bad Sync word!") n_bits_bs = fp.read(2) n_bits = struct.unpack("h", n_bits_bs)[0] fp_out.write(sync_word) fp_out.write(n_bits_bs) fp_out.write(fp.read(n_bits * 2)) # count remaining frames, if any while not eof: (sync_word, n_bits, data, eof) = get_next_frame(fp) if not eof: fr_cnt += 1 return (fr_cnt, cut_cnt) return (fr_cnt, extracted_frames_cnt) if __name__ == "__main__": Loading Loading @@ -138,24 +126,25 @@ if __name__ == "__main__": if not os.path.exists(os.path.realpath(my_args.bs_in)): print(f"\nInput file {my_args.bs_in} does not exist!") sys.exit(-1) if not os.path.exists(os.path.realpath(my_args.bs_out)): print(f"\nOutput file {my_args.bs_out} does not exist, creating it!") else: print(f"\nOutput file {my_args.bs_out} does exist, overwriting it!") if os.path.exists(os.path.realpath(my_args.bs_out)): print(f"\nOutput file {my_args.bs_out} exists, overwriting it!") with open(my_args.bs_in, "rb") as fp_in: with open(my_args.bs_out, "wb") as fp_out: if my_args.sid or my_args.frame: fr_cnt, cut_cnt = cut_from_start( fp_in, fp_out, start_frame=my_args.frame, start_with_sid=my_args.sid fr_cnt, extracted_frames_cnt = cut_from_start( fp_in, fp_out, start_frame=my_args.frame, length=my_args.length, start_with_sid=my_args.sid, ) if extracted_frames_cnt == 0: print(f"\nExtracted 0 frames!") sys.exit(-1) if my_args.length != -1 and extracted_frames_cnt != my_args.length: print( f"Warning! Requested {my_args.length}, only {extracted_frames_cnt} obtained!" ) if my_args.sid and (fr_cnt == cut_cnt): print("Warning! No SID frame found in bitstream!") print(f"Cut {cut_cnt} of {fr_cnt} frames from {my_args.bs_in}") elif my_args.length: fr_cnt = cut_to_length(fp_in, fp_out, my_args.length) if fr_cnt != my_args.length: print( f"Warning! Could not cut to length {my_args.length} as bitstream only contained {fr_cnt} frames!" f"Extracted {extracted_frames_cnt} out of {fr_cnt} frames from {my_args.bs_in}" ) print(f"Cut {my_args.bs_in} to {fr_cnt} frames") Loading
scripts/cut_bs.py +69 −80 Original line number Diff line number Diff line Loading @@ -39,75 +39,63 @@ SID_BITS = {35, 48, 104} SYNC_WORDS = {b"!k", b" k"} def cut_to_length(fp, fp_out, length): assert length > 0 fr_cnt = 0 for f in range(length): def get_next_frame(fp): eof = False n_bits = 0 data = None sync_word = fp.read(2) if sync_word == b"": return fr_cnt eof = True return (sync_word, n_bits, data, eof) if sync_word not in SYNC_WORDS: raise ValueError("Bad Sync word!") n_bits_bs = fp.read(2) n_bits = struct.unpack("h", n_bits_bs)[0] fp_out.write(sync_word) fp_out.write(n_bits_bs) fp_out.write(fp.read(n_bits * 2)) fr_cnt += 1 n_bits = struct.unpack("h", fp.read(2))[0] data = fp.read(n_bits * 2) return (sync_word, n_bits, data, eof) return fr_cnt def write_frame(fp_out, sync_word, n_bits, data): fp_out.write(sync_word) fp_out.write(struct.pack("h", n_bits)) fp_out.write(data) def cut_from_start(fp, fp_out, start_frame=0, start_with_sid=False): # cut until start frame def cut_from_start(fp, fp_out, start_frame=0, length=-1, start_with_sid=False): fr_cnt = 0 cut_cnt = 0 for cur_frame in range(start_frame): sync_word = fp.read(2) if sync_word == b"": return (fr_cnt, cut_cnt) if sync_word not in SYNC_WORDS: raise ValueError("Bad Sync word!") n_bits = struct.unpack("h", fp.read(2))[0] fp.read(n_bits * 2) extracted_frames_cnt = 0 eof = False # cut until start frame while fr_cnt < start_frame and not eof: (sync_word, n_bits, data, eof) = get_next_frame(fp) if not eof: fr_cnt += 1 cut_cnt += 1 # cut until SID frame if start_with_sid: found = False while not found: sync_word = fp.read(2) if sync_word == b"": return (fr_cnt, cut_cnt) if sync_word not in SYNC_WORDS: raise ValueError("Bad Sync word!") n_bits_bs = fp.read(2) n_bits = struct.unpack("h", n_bits_bs)[0] while not found and not eof: (sync_word, n_bits, data, eof) = get_next_frame(fp) if not eof: if n_bits in SID_BITS: found = True fp_out.write(sync_word) fp_out.write(n_bits_bs) fp_out.write(fp.read(n_bits * 2)) fp.seek(-(4 + n_bits * 2), 1) # Found SID, rewind else: fp.read(n_bits * 2) cut_cnt += 1 fr_cnt += 1 # transfer frames while length != 0 and not eof: (sync_word, n_bits, data, eof) = get_next_frame(fp) if not eof: write_frame(fp_out, sync_word, n_bits, data) fr_cnt += 1 extracted_frames_cnt += 1 length -= 1 # transfer remaining frames while True: sync_word = fp.read(2) if sync_word == b"": break if sync_word not in SYNC_WORDS: raise ValueError("Bad Sync word!") n_bits_bs = fp.read(2) n_bits = struct.unpack("h", n_bits_bs)[0] fp_out.write(sync_word) fp_out.write(n_bits_bs) fp_out.write(fp.read(n_bits * 2)) # count remaining frames, if any while not eof: (sync_word, n_bits, data, eof) = get_next_frame(fp) if not eof: fr_cnt += 1 return (fr_cnt, cut_cnt) return (fr_cnt, extracted_frames_cnt) if __name__ == "__main__": Loading Loading @@ -138,24 +126,25 @@ if __name__ == "__main__": if not os.path.exists(os.path.realpath(my_args.bs_in)): print(f"\nInput file {my_args.bs_in} does not exist!") sys.exit(-1) if not os.path.exists(os.path.realpath(my_args.bs_out)): print(f"\nOutput file {my_args.bs_out} does not exist, creating it!") else: print(f"\nOutput file {my_args.bs_out} does exist, overwriting it!") if os.path.exists(os.path.realpath(my_args.bs_out)): print(f"\nOutput file {my_args.bs_out} exists, overwriting it!") with open(my_args.bs_in, "rb") as fp_in: with open(my_args.bs_out, "wb") as fp_out: if my_args.sid or my_args.frame: fr_cnt, cut_cnt = cut_from_start( fp_in, fp_out, start_frame=my_args.frame, start_with_sid=my_args.sid fr_cnt, extracted_frames_cnt = cut_from_start( fp_in, fp_out, start_frame=my_args.frame, length=my_args.length, start_with_sid=my_args.sid, ) if extracted_frames_cnt == 0: print(f"\nExtracted 0 frames!") sys.exit(-1) if my_args.length != -1 and extracted_frames_cnt != my_args.length: print( f"Warning! Requested {my_args.length}, only {extracted_frames_cnt} obtained!" ) if my_args.sid and (fr_cnt == cut_cnt): print("Warning! No SID frame found in bitstream!") print(f"Cut {cut_cnt} of {fr_cnt} frames from {my_args.bs_in}") elif my_args.length: fr_cnt = cut_to_length(fp_in, fp_out, my_args.length) if fr_cnt != my_args.length: print( f"Warning! Could not cut to length {my_args.length} as bitstream only contained {fr_cnt} frames!" f"Extracted {extracted_frames_cnt} out of {fr_cnt} frames from {my_args.bs_in}" ) print(f"Cut {my_args.bs_in} to {fr_cnt} frames")