ハフマン符号によるデータをうまい具合にいい感じにするあれ
この記事はKogakuin Advent Calendar 2018,9日目の記事です。
皆さんこんばんは
一部マニアからは神と呼ばれる男、浦郷です
私が神と呼ばれているのは、浦郷という名前をローマ字で「URAGOU」とした時
URAGOU → GOU → GOD → 神
なだけです。それ以外に理由はありません
さて、アドベントカレンダーなので一応技術的な事を書かなきゃいけないそうです。それっぽいことを書きます。
今回選んだのはハフマン符号 うん、それっぽい
注意
私は別にその分野のプロとかではないので、ちゃんと知りたい方は自分で調べてください。
ハフマン符号とは?
詳しい歴史やら何やらは自分で調べてください。
要はデータを圧縮する際に、ファイルなどのデータ列の頻出度によって分類する方法。
たくさん出てくるデータ列は小さく、あまり出てこないデータ列は大きくなります。
では図にして見ましょう。
バイト列で考えるのは人間には厳しそうなので、ここでは1バイト=3ビットという体で進めていきます。
この8パターンで構成されているデータがあるとします。意図して作らない限り、構成しているデータの中では特定のバイト列が多くなったり少なくなったりします。
あるファイルAではバイト列101がすごいたくさん出てきて、バイト列001やバイト列100があまり出てこないということにしましょう。その他は適当に
元の状態なら、たくさん出てきても少なくても同じ3ビットで構成されていますが、ハフマン符号ではバイト列ごとに新しいパターンを付与します。
例えば、 101 101 000 101 110 101 011 000 … なら
圧縮後は 0 0 100 0 1101 0 101 100 ... という風にでき、この部分だけでも24ビットを17ビットにまで短くできていますね。
ここでのポイントは変換後のパターンを他のパターンと被らないようにすること
パターンとして1010と10がある場合、 1010 10 10 1010 .... と続いてしまうと、復元時に10なのか1010なのか分からなくなりますからね。
ハフマン木
変換後のパターンを他のパターンと被らないようにするために、一意に分かるようなデータ列に変換するアルゴリズムが必要です。それがハフマン木
- 手順としては
- データ列を出現率順に並べる
- 出現頻度が一番少ないペアを木にして、木にしたデータの出現率の合計値を出します。
- 木の親も含め、また出現頻度が一番少ないペアを木にして木にしたデータの出現率の合計値を出します。
- 木が一つになるまで続けます。
就活中なのに俺は何をやってんだろう。
こうして出来た木を辿っていくと、うまい具合のデータ列ができます。
例えば110なら上からこんな風にたどっていって、1101が得られますね
プログラムにする
頑張ってpython3で書きました。(Python 3.7.0)
いつも通り自己流で、アルゴリズムだけ確認してなんとなくで書いているため他とは大きく違うと思います。
私以外には読めないと思うので、意味ないけどコメント文ごと貼りました。
流れとしては
- ファイルを取り込んでバイト列を取り出す
- 出現率でソートして、木構造にする
- 得られたパターンデータを.himnに格納(適当な拡張子で意味はない)
- 圧縮したデータを.hhmnに格納(適当な拡張子で意味はない)
import os.path Fname = "test.bmp" ifp = open(Fname,"rb") siz = os.path.getsize(Fname) Btable = {} Htable = [] Wtable = None Hnum = 0 #木構造したい class HTree: def __init__(self,lval,lname,rval,rname,oya = None): self.oya = oya self.lval = lval self.lname = lname self.rval = rval self.rname = rname if not (lval == None and rval == None ): self.hi = lval + rval else: self.hi = None def __str__(self): return str( self.oya ) def Lmarge(self,lval,lname,rnode): root = HTree(lval,lname,rnode.hi,rnode) rnode.oya = root return root def Rmarge(self,lnode,rval,rname): root = HTree(lnode.hi,lnode,rval,rname) lnode.oya = root return root def marge(self,lnode,rnode): root = HTree(lnode.hi,lnode,rnode.hi,rnode) rnode.oya = root lnode.oya = root return root def seek(self,tar): creData = "" if self.lname == tar: print(self.lname) return str(0) elif self.rname == tar: return str(1) else: creData = self.lname.subseek(tar,"0") if creData == str(-1) : creData = self.rname.subseek(tar,"1") return creData def subseek(self,tar,data): if self.lname == tar: return data + str(0) elif self.rname == tar: return data + str(1) buf = "" if not (type(self.lname) is str): buf = self.lname.subseek(tar,data + str(0)) if not (buf == "-1"): return buf if not (type(self.rname) is str): buf = self.rname.subseek(tar,data + str(1)) if not (buf == "-1"): return buf return "-1" def __str__(self): return "edge" #2つデータとを取ってそいつを木にしたい。 #途中で木が複数できたり統合したりする関係でおかしなことになっている #奇数のときには一つ目のデータを取っている部分のため、ここでデータが一つしかないかを判断する #2つ目の部分ではデータが複数個あるのは確定なので木にしている #終了条件 元のテーブルが空になって木を入れた配列の長さが1のとき #つまりは全てのデータが一つの木に入った時 #if文のそれぞれは """ 1. 木がある無い場合 1.1 元データが一種しかない場合 : そのまま木にデータにぶち込む (多分整合性エラー) 1.2 元データが複数ある場合 2. 元データが無くて、木がある場合 : 木の中で色々する 3. 両方ある場合 """ def MakeTree(TBL): Itree = [] cnt = 0 lis = TBL[:] if not TBL: print("not data") return while True: if not Itree: if len(TBL) == 1: print("data error") exit() else: Tbuf = HTree(TBL[0][0] , TBL[0][1] , TBL[1][0] , TBL[1][1]) TBL.remove(TBL[1]) TBL.remove(TBL[0]) Itree.append( [Tbuf.hi , Tbuf] ) else: if not TBL: if len(Itree) == 1: break else: Tbuf = HTree(None,None,None,None) Tbuf = Tbuf.marge(Itree[0][1],Itree[1][1]) Itree.remove(Itree[1]) Itree.remove(Itree[0]) Itree.append( [Tbuf.hi , Tbuf] ) else: if len(Itree) == 1 and len(Htable) == 1 : buf1 = TBL[0] buf2 = Itree[0] Itree.remove(Itree[0]) TBL.remove(TBL[0]) Tbuf = HTree(None,None,None,None) Tbuf = Tbuf.Rmarge(buf2[1],buf1[0],buf1[1]) Itree.append( [Tbuf.hi , Tbuf] ) else : if Itree[0][0] <= TBL[0][0]: buf1 = Itree[0] Itree.remove(Itree[0]) if not Itree: buf2 = TBL[0] TBL.remove(TBL[0]) Tbuf = HTree(None,None,None,None) Tbuf = Tbuf.Rmarge(buf1[1],buf2[0],buf2[1]) Itree.append( [Tbuf.hi , Tbuf] ) elif Itree[0][0] <= TBL[0][0]: buf2 = Itree[0] Itree.remove(Itree[0]) Tbuf = HTree(None,None,None,None) Tbuf = Tbuf.marge(buf1[1],buf2[1]) Itree.append( [Tbuf.hi , Tbuf] ) else: buf2 = TBL[0] TBL.remove(TBL[0]) Tbuf = HTree(None,None,None,None) Tbuf = Tbuf.Lmarge(buf2[0],buf2[1],buf1[1]) Itree.append( [Tbuf.hi , Tbuf] ) else: buf1 = TBL[0] TBL.remove(TBL[0]) if not TBL: buf2 = Itree[0] Itree.remove(Itree[0]) Tbuf = HTree(None,None,None,None) Tbuf = Tbuf.Lmarge(buf1[0],buf1[1],buf2[1]) Itree.append( [Tbuf.hi , Tbuf] ) elif Itree[0][0] <= TBL[0][0]: buf2 = Itree[0] Itree.remove(Itree[0]) Tbuf = HTree(None,None,None,None) Tbuf = Tbuf.Rmarge(buf2[1],buf1[0],buf1[1]) Itree.append( [Tbuf.hi , Tbuf] ) else: buf2 = TBL[0] TBL.remove(TBL[0]) Tbuf = HTree(buf1[0] , buf1[1] , buf2[0] , buf2[1]) Itree.append( [Tbuf.hi , Tbuf] ) cnt += 1 HAtable = {} for lop in range(len(lis)): HAtable[ lis[lop][1] ] = Itree[0][1].seek(lis[lop][1]) return HAtable #データ取り出し部分 #1バイトごと取って数を数えている print("open " + Fname) for lop in range(siz): data = ifp.read(1) Bget = "b" + str(ord(data)) if Bget in Btable: Btable[Bget] = Btable[Bget] + 1 else: Btable[Bget] = 1 BStable = sorted(Btable.items() , key = lambda x:x[1]) BStable = sorted(Btable.items() , key = lambda x:x[1]) #単純に数えていたバイト列を出現率に変えている for lop in range( len( BStable ) ): Htable.append( [ Btable[ BStable[lop][0] ] / siz , BStable[lop][0] ] ) Htable = MakeTree(Htable) ofp = open(Fname + ".hhmn" , "w+b") tfp = open(Fname + ".himn" , "w+") ifp.seek(0) brry = "" print("create file 1/2") for lop in range(siz): data = ifp.read(1) Bget = "b" + str(ord(data)) brry += Htable[ Bget ] while len(brry) >= 8: ofp.write( int(brry[0:8] , 2).to_bytes(1, byteorder='big' ) ) brry = brry[8:] if not(brry == ""): while len(brry) < 8: brry += "0" ofp.write( int(brry[0:8] , 2).to_bytes(1, byteorder='big' ) ) brry = brry[8:] print("create file 2/2") for num , mo in Htable.items(): tfp.write( num + "," + mo + "\n" ) ifp.close() tfp.close() ofp.close()
続いて、復元します。
import os.path Fname = "test.bmp" hhfp = open(Fname + ".hhmn","rb") hifp = open(Fname + ".himn","r") txtfp = open(Fname ,"w+b") siz = os.path.getsize(Fname + ".hhmn") Htable = {} def fufile(strbin): nowbin = "" for lop in range( len(strbin) ): nowbin = nowbin + strbin[lop] if nowbin in Htable: wribin = Htable[ nowbin ] txtfp.write( int(wribin , 2).to_bytes(1, byteorder='big') ) nowbin = "" return nowbin data = hifp.readline() while data: cha , num = data.split(",") num = num.strip("\n") cha = str( bin( int(cha[1:]) ) ) cha = cha[2:] while len(cha) < 8 : cha = "0" + cha Htable[ num ] = cha data = hifp.readline() cnt = int(siz/10) strbyte = "" for lop in range( siz ): if lop > cnt : if siz > 10: print("read " + str( int(cnt / int(siz/10)) ) +"/ 10") cnt += int(siz/10) bite = hhfp.read(1) lopbyte = bin( int.from_bytes(bite , "big") ) lopbyte = lopbyte[2:] while len(lopbyte) < 8 : lopbyte = "0" + lopbyte strbyte = strbyte + lopbyte strbyte = fufile(strbyte) hhfp.close() hifp.close() txtfp.close()
では、実際にやって見ましょう。
単純なデータtest.bmpを用意します。
実行すると、test.bmp.himnとtest.bmp.hhmnが作成されます。ちゃんと圧縮されていますね
削除して・・・
実行すると・・・
できました‼(画像じゃよく分からないけど、test.bmpの更新時刻が変わっているでしょ?)
今回のケースではtest.bmpのデータ列が大幅に偏っているためかなり圧縮されましたが、普通のファイルならほんの少しです。
ていうかデータ列が均等に出てくる(大体のデータ)では、そもそも圧縮してないってこともあります。
後、でかいファイルは処理が遅いです
少なくとも私の環境ならうまくいきました。多分大丈夫だと思います。
プログラムを実行して復元してみたファイルを実行したり、バイナリエディタで元データと比較してみてください。
あとがき
このプログラムは結構前に作っていて、アドベントカレンダーのネタが思いつかなかった用に取っといてましたが
いざ実行してみたら動かない・圧縮しない・微妙に元データと違う・圧縮後のデータ量の方が大きいなど、散々な目にあいました。
まぁ原因はケアレスミスだったり、頭で考えていたコードをうまく反映できてなかったり、そもそもデバック用に途中で終了するようにしていたのを忘れていたりなどなど・・・こんな時間(夜中の3時)にやるもんじゃないですね。
さぁ、これでそれっぽいことは書いたので来週はポエム全開、ポエポエブログを書きます。
ちなみにネスぺ受けましたが、合否が分かる前に記事投稿しちゃうので、せっかくのネスぺ合否芸ができないです。まぁ結果はTwitterにでも書きます。
追記
世の中の大体の画像は圧縮されている状態ですが、bmp形式(つまり非圧縮画像)ならうまくできますよ