|
| 1 | +import os |
| 2 | +import argparse |
| 3 | + |
| 4 | +import numpy as np |
| 5 | +from scipy import signal |
| 6 | +from scipy.io import wavfile |
| 7 | +import resampy |
| 8 | + |
| 9 | + |
| 10 | + |
| 11 | + |
| 12 | +parser = argparse.ArgumentParser() |
| 13 | + |
| 14 | +parser.add_argument("filelist", type=str, help="file with filenames for concatenation in WAVE format") |
| 15 | +parser.add_argument("target_fs", type=int, help="target sampling rate of concatenated file") |
| 16 | +parser.add_argument("output", type=str, help="binary output file (integer16)") |
| 17 | +parser.add_argument("--basedir", type=str, help="basedir for filenames in filelist, defaults to ./", default="./") |
| 18 | +parser.add_argument("--normalize", action="store_true", help="apply normalization") |
| 19 | +parser.add_argument("--db_max", type=float, help="max DB for random normalization", default=0) |
| 20 | +parser.add_argument("--db_min", type=float, help="min DB for random normalization", default=0) |
| 21 | +parser.add_argument("--verbose", action="store_true") |
| 22 | + |
| 23 | +def read_filelist(basedir, filelist): |
| 24 | + with open(filelist, "r") as f: |
| 25 | + files = f.readlines() |
| 26 | + |
| 27 | + fullfiles = [os.path.join(basedir, f.rstrip('\n')) for f in files if len(f.rstrip('\n')) > 0] |
| 28 | + |
| 29 | + return fullfiles |
| 30 | + |
| 31 | +def read_wave(file, target_fs): |
| 32 | + fs, x = wavfile.read(file) |
| 33 | + |
| 34 | + if fs < target_fs: |
| 35 | + return None |
| 36 | + print(f"[read_wave] warning: file {file} will be up-sampled from {fs} to {target_fs} Hz") |
| 37 | + |
| 38 | + if fs != target_fs: |
| 39 | + x = resampy.resample(x, fs, target_fs) |
| 40 | + |
| 41 | + return x.astype(np.float32) |
| 42 | + |
| 43 | +def random_normalize(x, db_min, db_max, max_val=2**15 - 1): |
| 44 | + db = np.random.uniform(db_min, db_max, 1) |
| 45 | + m = np.abs(x).max() |
| 46 | + c = 10**(db/20) * max_val / m |
| 47 | + |
| 48 | + return c * x |
| 49 | + |
| 50 | + |
| 51 | +def concatenate(filelist : str, output : str, target_fs: int, normalize=True, db_min=0, db_max=0, verbose=False): |
| 52 | + |
| 53 | + overlap_size = int(40 * target_fs / 8000) |
| 54 | + overlap_mem = np.zeros(overlap_size, dtype=np.float32) |
| 55 | + overlap_win1 = (0.5 + 0.5 * np.cos(np.arange(0, overlap_size) * np.pi / overlap_size)).astype(np.float32) |
| 56 | + overlap_win2 = np.flipud(overlap_win1) |
| 57 | + |
| 58 | + with open(output, 'wb') as f: |
| 59 | + for file in filelist: |
| 60 | + x = read_wave(file, target_fs) |
| 61 | + if x is None: continue |
| 62 | + |
| 63 | + if len(x) < 10 * overlap_size: |
| 64 | + if verbose: print(f"skipping {file}...") |
| 65 | + continue |
| 66 | + elif verbose: |
| 67 | + print(f"processing {file}...") |
| 68 | + |
| 69 | + if normalize: |
| 70 | + x = random_normalize(x, db_min, db_max) |
| 71 | + |
| 72 | + x1 = x[:-overlap_size] |
| 73 | + x1[:overlap_size] = overlap_win1 * overlap_mem + overlap_win2 * x1[:overlap_size] |
| 74 | + |
| 75 | + f.write(x1.astype(np.int16).tobytes()) |
| 76 | + |
| 77 | + overlap_mem = x1[-overlap_size] |
| 78 | + |
| 79 | + |
| 80 | +if __name__ == "__main__": |
| 81 | + args = parser.parse_args() |
| 82 | + |
| 83 | + filelist = read_filelist(args.basedir, args.filelist) |
| 84 | + |
| 85 | + concatenate(filelist, args.output, args.target_fs, normalize=args.normalize, db_min=args.db_min, db_max=args.db_max, verbose=args.verbose) |
0 commit comments