"""
Smart pipe storage scheme
"""
import os
import struct
import zlib
def _detect_data_path(prefix):
    """
    Locate the data file which belongs to the given path prefix
    :param prefix: path prefix of the smart pipe (without data/index extension)
    :return: tuple (path to data file, flag which is True when the compressed variant was found)
    :raises FileNotFoundError: when neither the plain nor the compressed data file exists
    """
    # build candidate names with the same helper the writer uses, so both sides
    # always agree on the naming scheme
    plain_path = _make_data_path(prefix, False)
    # the uncompressed file wins when both variants are present
    if os.path.exists(plain_path):
        return plain_path, False
    compressed_path = _make_data_path(prefix, True)
    if os.path.exists(compressed_path):
        return compressed_path, True
    raise FileNotFoundError(
        "smart pipe data file not found, tried {!r} and {!r}".format(plain_path, compressed_path))
def _make_data_path(prefix, compress):
    """
    Build the name of the data file for the given path prefix
    :param prefix: path prefix of the smart pipe
    :param compress: when true, the name of the compressed variant (extra 'z' suffix) is produced
    :return: prefix with the data extension appended
    """
    suffix = 'z' if compress else ''
    return prefix + SmartPipeWriter.DATA_EXT + suffix
def _make_index_path(prefix):
    """
    Build the name of the index file for the given path prefix
    :param prefix: path prefix of the smart pipe
    :return: prefix with the index extension appended
    """
    index_ext = SmartPipeWriter.INDEX_EXT
    return prefix + index_ext
class SmartPipeWriter:
    """
    Class which can create smart pipe file

    Records are written to the data file as length-prefixed key/value pairs.
    Every checkpoint stores a key together with the current data offsets in
    the index file. The writer can be used as a context manager, in which case
    it is closed on exit from the ``with`` block.
    """
    INDEX_EXT = ".idx"
    DATA_EXT = ".dat"

    def __init__(self, path_prefix, compress=False):
        """
        Constructs smart pipe writer
        :param path_prefix: path prefix to be used. Extension for data and index files will be appended to this prefix
        :param compress: boolean flag (False by default) indicating compression of data chunks
        """
        self.index_writer = IndexWriter(_make_index_path(path_prefix))
        self.data_path = _make_data_path(path_prefix, compress)
        try:
            self._fd = open(self.data_path, mode='wb')
        except Exception:
            # do not leak the already opened index file when the data file cannot be created
            self.index_writer.close()
            raise
        self._compressor = zlib.compressobj() if compress else None
        # True when the compressor has received data since the last flush
        self._compressor_needs_flush = False
        # offset in the uncompressed record stream
        self._raw_ofs = 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """
        Closes smart pipe writer and flushes data to disk. Safe to call more than once.
        """
        try:
            if self._fd:
                try:
                    self._flush()
                finally:
                    # release the data file even when the final flush fails
                    self._fd.close()
                    self._fd = None
        finally:
            self.index_writer.close()

    def append(self, key, value):
        """
        Appends given key-value pair to current chunk
        :param key: byte string with key
        :param value: byte string with value
        """
        k_len = len(key)
        v_len = len(value)
        # record layout: <u32 key length><key><u32 value length><value>, little-endian
        data = b"".join((struct.pack("<I", k_len), key, struct.pack("<I", v_len), value))
        if self._compressor is not None:
            data = self._compressor.compress(data)
            self._compressor_needs_flush = True
        self._fd.write(data)
        # raw offset counts uncompressed bytes, regardless of compression
        self._raw_ofs += 4 + 4 + k_len + v_len

    def checkpoint(self, key):
        """
        Flushes current chunk on disk and starts new with the given key
        :param key: byte string with binary key
        """
        self._flush()
        # data file position is taken after the flush, so it points at the start of the new chunk
        self.index_writer.append(key, self._fd.tell(), self._raw_ofs)

    def _flush(self):
        """
        Finish the current compressed stream (if there is pending data) and flush the data file
        """
        if self._compressor is not None and self._compressor_needs_flush:
            self._fd.write(self._compressor.flush(zlib.Z_FINISH))
            # a finished compressor cannot be reused, the next chunk gets a fresh zlib stream
            self._compressor = zlib.compressobj()
            self._compressor_needs_flush = False
        self._fd.flush()
class IndexWriter:
    """
    Writer of the index file: a sequence of (key, data offset, raw offset) entries
    """

    def __init__(self, path):
        """
        Creates (or truncates) the index file
        :param path: path of the index file
        """
        self.path = path
        # amount of entries appended so far, reported by __str__
        self._entries = 0
        self._fd = self._open_data_stream(path)

    def __str__(self):
        return "IndexWriter[{path}, entries={entries}]".format(path=self.path, entries=self._entries)

    @staticmethod
    def _open_data_stream(path):
        return open(path, "wb")

    def close(self):
        """
        Closes the index file. Safe to call more than once.
        """
        if self._fd:
            self._fd.close()
            self._fd = None

    def append(self, key, ofs, raw_ofs):
        """
        Append entry to index
        :param key: binary key to remember
        :param ofs: offset in data stream which will be associated with that key
        :param raw_ofs: current offset in a raw data stream
        """
        # entry layout: <u32 key length><key><u64 ofs><u64 raw_ofs>, little-endian
        entry = struct.pack("<I", len(key)) + key + struct.pack("<QQ", ofs, raw_ofs)
        self._fd.write(entry)
        # flush after every entry so the index on disk does not lag behind
        self._fd.flush()
        self._entries += 1
class IndexReader:
    """
    Reader of the index file

    After construction ``keys`` holds the keys in file order and ``ofs_map``
    maps every key to a tuple (data offset, raw offset).
    """

    def __init__(self, path):
        """
        Loads the whole index file into memory
        :param path: path of the index file
        """
        self.ofs_map, self.keys = IndexReader._read_data(path)

    @staticmethod
    def _read_data(path):
        """
        Parse index entries from the given file
        :param path: path of the index file
        :return: tuple (dict key -> (ofs, raw_ofs), list of keys in file order)
        """
        ofs_map = {}
        keys = []
        with open(path, 'rb') as fd:
            while True:
                head = fd.read(4)
                if len(head) != 4:
                    break
                k_len = struct.unpack("<I", head)[0]
                key = fd.read(k_len)
                tail = fd.read(8 + 8)
                # an incomplete trailing entry is treated like end of data, the same
                # way a short length prefix is, instead of failing inside struct.unpack
                if len(key) != k_len or len(tail) != 8 + 8:
                    break
                ofs_map[key] = struct.unpack("<QQ", tail)
                keys.append(key)
        return ofs_map, keys
class ZlibWrapper:
    """
    Block-oriented reader of a data file whose blocks are optionally zlib-compressed

    A block is loaded (and decompressed when needed) into memory with
    ``pull_block`` and then consumed piece by piece with ``read``.
    """
    CHUNK_SIZE = 256

    def __init__(self, path, compressed):
        """
        Opens the data file for reading
        :param path: path of the data file
        :param compressed: True when blocks of the file are zlib streams
        """
        self.path = path
        self.compressed = compressed
        self._fd = open(self.path, 'rb')
        # content of the current block and the position of the next unread byte in it
        self._buffer = b''
        self._pos = 0

    def close(self):
        """
        Closes the data file. Safe to call more than once.
        """
        if self._fd:
            self._fd.close()
            self._fd = None

    def read(self, size):
        """
        Consume bytes from the current block
        :param size: maximum amount of bytes to return, None or negative value means the rest of the block
        :return: next bytes of the block, shorter than size (possibly empty) when the block is exhausted
        """
        available = len(self._buffer) - self._pos
        if size is None or size < 0 or size > available:
            size = available
        start = self._pos
        # advance a cursor instead of re-slicing the remainder, which would copy
        # the rest of the block on every call
        self._pos = start + size
        return self._buffer[start:self._pos]

    def seek(self, ofs):
        """
        Drop the current block and move to the given offset of the data file
        :param ofs: offset in the (possibly compressed) data file
        """
        self._buffer = b''
        self._pos = 0
        self._fd.seek(ofs)

    def pull_block(self, block_size):
        """
        Read next optionally compressed chunk of data from disk
        :param block_size: size of block in bytes or None for last block
        :return:
        """
        size = block_size if block_size is not None else -1
        data = self._fd.read(size)
        # an empty block has no zlib stream at all, decompressing it would raise zlib.error
        if self.compressed and data:
            data = zlib.decompress(data)
        self._buffer = data
        self._pos = 0
class SmartPipeReader:
    """
    Reader for smart pipe data

    Data is consumed block by block, where block boundaries are taken from the
    index file. The reader can be used as a context manager, in which case it
    is closed on exit from the ``with`` block.
    """

    def __init__(self, path_prefix):
        """
        Constructs smart pipe reader
        :param path_prefix: path prefix for smart pipe data and index files
        """
        self.data_path, self.compressed = _detect_data_path(path_prefix)
        self._fd = ZlibWrapper(self.data_path, self.compressed)
        self.index = IndexReader(_make_index_path(path_prefix))
        # position in self.index.keys of the block which will be pulled next
        self._next_key_ofs = 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """
        Closes smart pipe reader. Safe to call more than once.
        """
        if self._fd:
            self._fd.close()
            self._fd = None

    def pull_block(self, index_key=None):
        """
        Get list of key,value pairs from next block
        :param index_key: optional key of block to seek
        :return: yield block records
        """
        if index_key is not None and not self._seek(index_key):
            # unknown key: nothing to yield, reader position is left untouched
            return
        comp_block_size, raw_block_size = self._get_block_sizes()
        self._fd.pull_block(comp_block_size)
        try:
            # raw_block_size of None means last block: read until data is exhausted
            while raw_block_size is None or raw_block_size > 0:
                dat = self._fd.read(4)
                if len(dat) != 4:
                    break
                k_len = struct.unpack("<I", dat)[0]
                key = self._fd.read(k_len)
                v_len = struct.unpack("<I", self._fd.read(4))[0]
                val = self._fd.read(v_len)
                if raw_block_size is not None:
                    raw_block_size -= 4 + 4 + k_len + v_len
                yield (key, val)
        finally:
            # the whole block has already been taken from the file, so the index
            # position must advance even when the consumer stops iterating early
            self._next_key_ofs += 1

    def _get_block_sizes(self):
        """
        Return size of the current block in bytes, both uncompressed and compressed sizes
        :return: tuple of (compressed size, uncompressed size) or None,None if it's a last block
        """
        if self._next_key_ofs + 1 >= len(self.index.keys):
            return None, None
        key1, key2 = self.index.keys[self._next_key_ofs:self._next_key_ofs + 2]
        comp_ofs1, raw_ofs1 = self.index.ofs_map[key1]
        comp_ofs2, raw_ofs2 = self.index.ofs_map[key2]
        # block size is the distance between two consecutive index entries
        return comp_ofs2 - comp_ofs1, raw_ofs2 - raw_ofs1

    def get_next_block_key(self):
        """
        Return index key for the next block
        :return: key of the next block or None if we reached end of pipe
        """
        if self._next_key_ofs >= len(self.index.keys):
            return None
        return self.index.keys[self._next_key_ofs]

    def _seek(self, key):
        """
        Perform seek on a given key.
        :param key: index key of the block to position the reader on
        :return: True if seek was successful or not needed, False if the key is not in the index
        """
        if key == self.get_next_block_key():
            # reader is already positioned on the requested block
            return True
        ofs = self.index.ofs_map.get(key)
        if ofs is None:
            return False
        # first item of the index entry is the offset in the data file
        self._fd.seek(ofs[0])
        self._next_key_ofs = self.index.keys.index(key)
        return True