使用 Python 处理 Ogg 文件
我有一个“oggscissors.py”脚本，用于剪切和合并 ogg 文件。它依赖 pyogg 和 pyvorbis 绑定。当我尝试运行这个脚本时，Python 解释器崩溃了。该脚本的源代码如下：
import getopt
import ogg.vorbis
import sys
def usage():
    """Write the command-line synopsis to stderr and terminate the script.

    Every line goes to stderr so the synopsis stays in one piece when stdout
    is redirected.  The last line is handed to sys.exit(), which prints it to
    stderr and ends the process with exit status 1.

    Raises:
        SystemExit: always.
    """
    sys.stderr.write('usage:\n')
    sys.stderr.write(' oggscissors.py --analyze file.ogg\n')
    sys.stderr.write(' oggscissors.py --from=startpos --upto=endpos in.ogg out.ogg\n')
    sys.exit(" oggscissors.py --join in1.ogg in2.ogg out.ogg")
def analyze_packets(fin, filenamein):
count = None
copied = 0
written = 0
pageno = 0
instream = None
insync = ogg.OggSyncState()
while count == None or copied < count:
b = fin.read(65536)
if not b:
break
insync.bytesin(b)
skipped = 1
while skipped != 0:
skipped, page = insync.pageseek()
print 'read', skipped, 'bytes:', page
if skipped > 0:
if instream and page.serialno() != serialno:
print 'we hit a chain boundary, leaving'
break
if not instream:
serialno = page.serialno()
instream = ogg.OggStreamState(serialno)
page.pageno = pageno
pageno = pageno + 1
instream.pagein(page)
while 1:
p = instream.packetout()
if not p:
break
print ' read', p
copied = copied + 1
elif skipped < 0:
print 'skipped', -skipped, 'bytes'
def count_packets(fin):
print 'Counting packets',
fin.seek(0)
count = 0
instream = None
insync = ogg.OggSyncState()
while 1:
b = fin.read(65536)
if not b:
break
insync.bytesin(b)
skipped = 1
while skipped != 0:
skipped, page = insync.pageseek()
if skipped > 0:
if instream and page.serialno() != serialno:
print 'we hit a chain boundary, leaving'
break
if not instream:
serialno = page.serialno()
instream = ogg.OggStreamState(serialno)
instream.pagein(page)
while 1:
p = instream.packetout()
if not p:
break
#print ' read', p
count = count + 1
elif skipped < 0:
print 'skipped', -skipped, 'bytes'
fin.seek(0)
print count
return count
def copy_packets(fin, fout, outstream=None, skip_packets=0, count=0, written_pages=0, written_packets=0, granule_delta=0, last_chunk=0):
    """Copy a run of packets from fin into outstream and dump the pages.

    The input is rewound, the first skip_packets packets of the first logical
    stream are discarded, and the following packets are pushed into
    outstream.  Pages are written through dump_pages() every 25000 copied
    packets and once more at the end.

    Args:
        fin: seekable binary file object with the Ogg input.
        fout: output file; this body never references it (dump_pages() uses
            the module-level name fout instead).
        outstream: ogg.OggStreamState to append to; created from the input's
            serial number when falsy.
        skip_packets: number of leading packets to discard.
        count: number of packets to copy; 0 means copy until input ends.
        written_pages: pages already written, used for page numbering.
        written_packets: packets already written to outstream overall.
        granule_delta: running granule position carried between calls.
        last_chunk: when true, the final copied packet gets its eos field set.

    Returns:
        Tuple (written, written_pages, written_packets, outstream,
        granule_delta), where written is the byte total reported by
        dump_pages() during this call.

    NOTE(review): the pasted source had lost its indentation; the nesting
    below is reconstructed from the control-flow keywords.
    """
    fin.seek(0)
    copied = 0
    written = 0
    orig_packetno = 0
    granule_skip = 0  # assigned while skipping but never read afterwards
    if last_chunk and count == 0:
        # Needed to recognise the final packet when no explicit count is given.
        total_packets = count_packets(fin) - skip_packets
    instream = None
    insync = ogg.OggSyncState()
    if skip_packets: print 'Skipping packets', skip_packets,
    while count == 0 or copied < count:
        b = fin.read(65536)
        if not b:
            break
        insync.bytesin(b)
        skipped = 1  # any non-zero value, so the loop below runs at least once
        while skipped != 0:
            skipped, page = insync.pageseek()
            if skipped > 0:
                if instream and page.serialno() != serialno:
                    print 'we hit a chain boundary'
                    # we hit a chain boundary
                    break
                if not instream:
                    # The first page seen fixes the serial number we follow.
                    serialno = page.serialno()
                    instream = ogg.OggStreamState(serialno)
                if not outstream:
                    outstream = ogg.OggStreamState(serialno)
                #print 'reading', page
                instream.pagein(page)
                while count == 0 or copied < count:
                    p = instream.packetout()
                    if not p:
                        break
                    if (skip_packets > 0):
                        #print ' skipping', p, 'yet to skip:', skip_packets
                        skip_packets = skip_packets - 1
                        orig_packetno = orig_packetno + 1
                        if p.granulepos != -1:
                            granule_skip = p.granulepos
                        if skip_packets % 1000 == 0: print skip_packets,
                        if skip_packets == 1: print
                        continue
                    #print ' reading', p
                    # Clear any end-of-stream mark inherited from the input.
                    if p.eos:
                        p.eos = 0
                    if last_chunk:
                        # Mark only the very last packet that will be copied.
                        # NOTE(review): 512 is presumably the value the
                        # binding expects for "end of stream" -- confirm.
                        if count > 0:
                            if copied + 1 == count:
                                p.eos = 512
                        else:
                            if copied + 1 == total_packets:
                                p.eos = 512
                    if written_packets >= 3:
                        # From the fourth written packet on, rebuild granule
                        # positions from the module-level samples table,
                        # falling back to 1024 for packets it does not list.
                        if samples.has_key(orig_packetno):
                            granule_delta = granule_delta + samples[orig_packetno]
                        else:
                            granule_delta = granule_delta + 1024
                            print 'Unknown size of packet', orig_packetno
                        p.granulepos = granule_delta
                    #print ' writing', p
                    outstream.packetin(p)
                    copied = copied + 1
                    written_packets = written_packets + 1
                    orig_packetno = orig_packetno + 1
                    if copied % 1000 == 0: print copied,
                    if copied % 25000 == 0:
                        # Write pages out periodically during long copies.
                        print
                        written_pages, written, outstream = dump_pages(outstream, written_pages, written)
            elif skipped < 0:
                print 'skipped', -skipped, 'bytes'
    print copied
    written_pages, written, outstream = dump_pages(outstream, written_pages, written)
    return (written, written_pages, written_packets, outstream, granule_delta)
def dump_pages(outstream, written_pages, written):
print 'Dumping pages',
while 1:
pg = outstream.flush()
if not pg:
break
pg.pageno = written_pages
#print 'writing', pg
written = written + pg.writeout(fout)
#print 'written', written, 'bytes so far'
written_pages = written_pages + 1
if written_pages % 100 == 0: print written_pages,
print written_pages
return written_pages, written, outstream
def get_packetno(seconds, rate, packet_samples=None):
    """Return the number of the packet that contains a given time offset.

    Packets are walked in ascending packet-number order, subtracting each
    packet's sample count from the target until it is used up.  Sorting the
    keys makes the walk independent of dict iteration order.

    Args:
        seconds: time offset from the start of the stream.
        rate: sample rate, in samples per second.
        packet_samples: mapping of packet number to sample count; defaults
            to the module-level samples table.

    Returns:
        The packet number at which the accumulated samples reach
        seconds * rate, or the highest packet number if the table is
        shorter than the requested offset.

    Raises:
        ValueError: if the table is empty.
    """
    if packet_samples is None:
        packet_samples = samples
    if not packet_samples:
        raise ValueError('no packet sizes available to locate a position')
    needed_samples = seconds * rate
    packetno = None
    for packetno in sorted(packet_samples):
        needed_samples = needed_samples - packet_samples[packetno]
        if needed_samples <= 0:
            break
    return packetno
def get_samples(filenamein):
print 'Scanning packets'
vf = ogg.vorbis.VorbisFile(filenamein)
pcm_prev = 0
packetno = 3
samples = {}
while 1:
(buff, bytes, bit) = vf.read(4096)
if bytes == 0:
break
pcm_now = vf.pcm_tell()
samples[packetno] = pcm_now - pcm_prev
if samples[packetno] > 1024:
print "Clipping bogus packet", packetno, "sample size", samples[packetno], "to 1024"
samples[packetno] = 1024
pcm_prev = pcm_now
packetno = packetno + 1
if packetno % 1000 == 0: print packetno,
(buff, bytes,bit) = vf.read(4096)
vinfo = vf.info()
return samples, vinfo.rate
def renumber_pages(filenamein):
filenameout = filenamein+".fixed.ogg"
print
print "Hole detected! Fixing ogg file and writing it to", filenameout
fin = open(filenamein, 'rb')
fout = open(filenameout, 'wb')
insync = ogg.OggSyncState()
instream = None
outstream = None
pageno = 0
while 1:
b = fin.read(65536)
if not b:
break
insync.bytesin(b)
skipped = 1
while skipped != 0:
skipped, page = insync.pageseek()
if skipped > 0:
if instream and page.serialno() != serialno:
print "we hit a chain boundary, leaving"
break
if not instream:
serialno = page.serialno()
instream = ogg.OggStreamState(serialno)
if not outstream:
outstream = ogg.OggStreamState(serialno)
instream.pagein(page)
#print "in -- ", page
while 1:
try:
p = instream.packetout()
except ogg.OggError:
instream.reset()
instream.pagein(page)
p = instream.packetout()
if not p:
break
outstream.packetin(p)
while 1:
pg = outstream.flush()
if not pg:
break
pg.writeout(fout)
#print "out - ", pg
fin.close()
fout.close()
return filenameout
def _scan_with_repair(filename):
    """Return (samples, rate, filename), rewriting the file if the scan fails.

    When get_samples() raises ogg.vorbis.VorbisError the file is passed
    through renumber_pages() and the rewritten copy is scanned instead; the
    returned filename is the one that was actually scanned.
    """
    try:
        table, sample_rate = get_samples(filename)
    except ogg.vorbis.VorbisError:
        filename = renumber_pages(filename)
        table, sample_rate = get_samples(filename)
    return table, sample_rate, filename


# --- command-line parsing ---------------------------------------------------
try:
    opts, pargs = getopt.getopt(sys.argv[1:], '', ['analyze', 'join', 'from=', 'upto='])
except getopt.GetoptError:
    # Unknown option or missing option argument: show the synopsis instead
    # of a traceback.
    usage()
opt_analyze = opt_join = 0
startpos = -1  # -1 means "not given"
endpos = -1
for option, argument in opts:
    if option == '--from':
        try:
            startpos = float(argument)
        except ValueError:
            usage()
    elif option == '--upto':
        try:
            endpos = float(argument)
        except ValueError:
            usage()
    elif option == '--analyze':
        opt_analyze = 1
    elif option == '--join':
        opt_join = 1
    else:
        usage()

if opt_analyze:
    # --- analyze: dump pages and packets of one file ------------------------
    if len(pargs) != 1:
        usage()
    filenamein = pargs[0]
    fin = open(filenamein, 'rb')
    analyze_packets(fin, filenamein)
    fin.close()
elif opt_join:
    # --- join: in1.ogg + in2.ogg -> out.ogg ---------------------------------
    if len(pargs) != 3:
        usage()
    # samples stays a module-level name because copy_packets() and
    # get_packetno() read it as a global.
    samples, rate, filenamein = _scan_with_repair(pargs[0])
    fin = open(filenamein, 'rb')
    filenameout = pargs[2]
    fout = open(filenameout, 'wb')
    # First the three leading packets of in1, then everything after them.
    n, pages, packets, ostream, grskip = copy_packets(fin, fout, None, 0, 3)
    n, pages, packets, ostream, grskip = copy_packets(fin, fout, ostream, 3, 0, pages, packets, grskip)
    fin.close()
    # The second input: skip its three leading packets and mark this copy as
    # the last chunk.
    samples, rate, filenamein = _scan_with_repair(pargs[1])
    fin = open(filenamein, 'rb')
    n, pages, packets, ostream, grskip = copy_packets(fin, fout, ostream, 3, 0, pages, packets, grskip, 1)
    fin.close()
    fout.close()
else:
    # --- cut: in.ogg [--from, --upto] -> out.ogg ----------------------------
    if len(pargs) != 2:
        usage()
    samples, rate, filenamein = _scan_with_repair(pargs[0])
    fin = open(filenamein, 'rb')
    filenameout = pargs[1]
    fout = open(filenameout, 'wb')
    # The three leading packets are always copied.
    n, pages, packets, ostream, grskip = copy_packets(fin, fout, None, 0, 3)
    if startpos > -1:
        startpacket = get_packetno(startpos, rate)
    else:
        startpacket = 3
    if startpacket > 5:
        # Start two packets early.
        # NOTE(review): presumably lead-in for the decoder -- confirm.
        startpacket = startpacket - 2
    if endpos > -1:
        endpacket = get_packetno(endpos, rate)
        count = endpacket - startpacket
    else:
        count = 0  # 0 tells copy_packets() to copy until the input ends
    n, pages, packets, ostream, grskip = copy_packets(fin, fout, ostream, startpacket, count, pages, packets, grskip, 1)
    fin.close()
    fout.close()
它在 get_samples 函数的“vf = ogg.vorbis.VorbisFile(filenamein)”这一行崩溃。谁能帮我解决这个问题？也许是我编译 vorbis 模块的方式有误？
如果你对这篇内容有疑问，欢迎到本站社区发帖提问、参与讨论，获取更多帮助，或者扫描二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定您的真实邮箱，如果其他用户或者作者回复了您的评论，将无法在第一时间通知您！
发布评论