-
Notifications
You must be signed in to change notification settings - Fork 12
/
recordio.go
108 lines (92 loc) · 3.79 KB
/
recordio.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package recordio
import (
"encoding/binary"
"fmt"
"os"
"github.com/thomasjungblut/go-sstables/recordio/compressor"
)
// Format version numbers of the recordio package. All of them are typed as
// uint32.
const (
	Version1 uint32 = 0x01
	Version2 uint32 = 0x02
	Version3 uint32 = 0x03

	// CurrentVersion is an alias for the newest version declared here (Version3).
	CurrentVersion = Version3
)
// The magic number 0x130691 in a 32 bit and a 64 bit flavour.
// NOTE(review): judging by the name it separates records from each other -
// confirm against the reader/writer implementations.
const (
	MagicNumberSeparator     uint32 = 0x130691
	MagicNumberSeparatorLong uint64 = 0x130691
)
const (
	// FileHeaderSizeBytes is the size of the file header: a 4 byte version
	// number followed by a 4 byte compression code, 8 bytes in total.
	FileHeaderSizeBytes = 8

	// RecordHeaderSizeBytesV1V2 is the 20 byte record header size that, going
	// by its name, applies to the V1 and V2 formats.
	RecordHeaderSizeBytesV1V2 = 20

	// RecordHeaderV3MaxSizeBytes is the maximum buffer size needed so that
	// PutUvarint does not panic: up to 10 bytes each for the magic number,
	// the uncompressed size and the compressed size, plus 1 byte for nil,
	// which adds up to 31 bytes.
	RecordHeaderV3MaxSizeBytes = 3*binary.MaxVarintLen64 + 1
)
// Identifiers of the compression types. The numeric values follow from the
// declaration order, so never reorder these entries and only ever append new
// ones at the end.
const (
	CompressionTypeNone = iota
	CompressionTypeGZIP
	CompressionTypeSnappy
	CompressionTypeLzw
)
// DefaultBufferSize is the default buffer size of four mebibyte (4 MiB). It
// can be customized using the option BufferSizeBytes.
const DefaultBufferSize = 4 * 1024 * 1024
// SizeI is implemented by types that can report the size of their file.
type SizeI interface {
	// Size reports how many bytes the file currently occupies.
	Size() uint64
}
// CloseableI is implemented by types whose file can be closed.
type CloseableI interface {
	// Close closes the given file. An error can be returned when:
	//   - the file was already closed before, or is not yet open;
	//   - the file could not be closed on the filesystem (e.g. when flushes fail).
	Close() error
}
// OpenableI is implemented by types whose file can be opened.
type OpenableI interface {
	// Open opens the given file for reading or writing. An error can be
	// returned in multiple circumstances:
	//   - the file or directory doesn't exist or is not accessible;
	//   - the file was already opened or closed before;
	//   - the file is corrupt, its header wasn't readable or the versions are
	//     incompatible.
	Open() error
}
type OpenClosableI interface {
CloseableI
OpenableI
}
// WriterI appends records to a file. Besides the two write methods it offers
// everything from OpenClosableI and SizeI.
type WriterI interface {
	OpenClosableI
	SizeI

	// Write appends a record of bytes and returns the offset this item was
	// written to.
	Write(record []byte) (uint64, error)

	// WriteSync appends a record of bytes and additionally forces a disk sync.
	// It returns the offset this item was written to.
	WriteSync(record []byte) (uint64, error)
}
// ReaderI reads the records of a file one after another. It also offers
// everything from OpenClosableI.
type ReaderI interface {
	OpenClosableI

	// ReadNext returns the next record. Reaching the end is signalled by
	// (nil, io.EOF). The error can be wrapped however, so always check it
	// using errors.Is(err, io.EOF).
	ReadNext() ([]byte, error)

	// SkipNext skips the next record. Reaching the end is signalled by io.EOF
	// as the error. The error can be wrapped however, so always check it
	// using errors.Is(err, io.EOF).
	SkipNext() error
}
// ReadAtI reads records at caller-supplied offsets and also offers everything
// from OpenClosableI. Implementors must make their implementation thread-safe.
type ReadAtI interface {
	OpenClosableI

	// ReadNextAt returns the next record at the given offset. Reaching the end
	// is signalled by (nil, io.EOF). The error can be wrapped however, so
	// always check it using errors.Is(err, io.EOF).
	// Implementations must be thread-safe.
	ReadNextAt(offset uint64) ([]byte, error)
}
// ReaderWriterCloserFactory creates the reader or writer for a file path and
// hands it out together with an *os.File.
// NOTE(review): bufSize is presumably the buffer size in bytes and the
// returned *os.File presumably the handle backing the reader/writer - confirm
// against the implementations.
type ReaderWriterCloserFactory interface {
	// CreateNewReader returns an *os.File and a ByteReaderResetCount for
	// filePath, or an error.
	CreateNewReader(filePath string, bufSize int) (*os.File, ByteReaderResetCount, error)

	// CreateNewWriter returns an *os.File and a WriteCloserFlusher for
	// filePath, or an error.
	CreateNewWriter(filePath string, bufSize int) (*os.File, WriteCloserFlusher, error)
}
// NewCompressorForType returns an instance of the desired compressor defined by its identifier.
//
// The supported identifiers are CompressionTypeNone, CompressionTypeGZIP,
// CompressionTypeSnappy and CompressionTypeLzw. For CompressionTypeNone both
// the returned compressor and the error are nil, so callers have to nil-check
// the compressor before using it. Any other identifier is not implemented and
// results in an error.
func NewCompressorForType(compType int) (compressor.CompressionI, error) {
	switch compType {
	case CompressionTypeNone:
		// no compression: no compressor instance is returned
		return nil, nil
	case CompressionTypeGZIP:
		return &compressor.GzipCompressor{}, nil
	case CompressionTypeSnappy:
		return &compressor.SnappyCompressor{}, nil
	case CompressionTypeLzw:
		return &compressor.LzwCompressor{}, nil
	default:
		return nil, fmt.Errorf("unsupported compression type %d", compType)
	}
}