forked from csimplestring/delta-go
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcheckpoint_reader.go
216 lines (197 loc) · 5.07 KB
/
checkpoint_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package deltago
import (
"context"
"github.com/csimplestring/delta-go/action"
"github.com/csimplestring/delta-go/internal/util/path"
"github.com/csimplestring/delta-go/iter"
goparquet "github.com/fraugster/parquet-go"
"github.com/fraugster/parquet-go/floor/interfaces"
"github.com/rotisserie/eris"
"gocloud.dev/blob"
_ "gocloud.dev/blob/azureblob"
_ "gocloud.dev/blob/fileblob"
_ "gocloud.dev/blob/gcsblob"
)
type checkpointReader interface {
Read(path string) (iter.Iter[action.Action], error)
}
func newCheckpointReader(urlstr string) (checkpointReader, error) {
blobURL, err := path.ConvertToBlobURL(urlstr)
if err != nil {
return nil, err
}
b, err := blob.OpenBucket(context.Background(), blobURL)
if err != nil {
return nil, err
}
return &defaultCheckpointReader{
bucket: b,
}, nil
}
// defaultCheckpointReader implements checkpoint reader
type defaultCheckpointReader struct {
bucket *blob.Bucket
}
func (l *defaultCheckpointReader) Read(path string) (iter.Iter[action.Action], error) {
r, err := l.bucket.NewReader(context.Background(), path, nil)
if err != nil {
return nil, eris.Wrap(err, "")
}
fr, err := goparquet.NewFileReader(r)
if err != nil {
return nil, err
}
return &defaultParquetIterater{
br: r,
reader: fr,
}, nil
}
type defaultParquetIterater struct {
br *blob.Reader
reader *goparquet.FileReader
}
func (p *defaultParquetIterater) Next() (action.Action, error) {
data, err := p.reader.NextRow()
if err != nil {
return nil, err
}
obj := interfaces.NewUnmarshallObject(data)
am := &actionMarshaller{a: &action.SingleAction{}}
if err := am.UnmarshalParquet(obj); err != nil {
return nil, eris.Wrap(err, "failed to read value")
}
return am.a.Unwrap(), nil
}
func (p *defaultParquetIterater) Close() error {
return p.br.Close()
}
// SchemaDefinition
const actionSchemaDefinitionString = `
message SingleAction {
optional group txn {
optional binary appId (STRING);
required int64 version ;
optional int64 lastUpdated ;
}
optional group add {
required binary path (STRING);
required int64 size ;
required group partitionValues (MAP) {
repeated group key_value {
required binary key (STRING);
optional binary value (STRING);
}
}
required int64 modificationTime ;
required boolean dataChange;
optional binary stats (STRING);
optional group tags (MAP) {
repeated group key_value {
required binary key (STRING);
optional binary value (STRING);
}
}
}
optional group remove {
required binary path (STRING);
required int64 deletionTimestamp ;
required boolean dataChange;
required boolean extendedFileMetadata;
optional group partitionValues (MAP) {
repeated group key_value {
required binary key (STRING);
optional binary value (STRING);
}
}
optional int64 size ;
optional group tags (MAP) {
repeated group key_value {
required binary key (STRING);
optional binary value (STRING);
}
}
}
optional group metaData {
optional binary id (STRING);
optional binary name (STRING);
optional binary description (STRING);
required group format {
optional binary provider (STRING);
optional group options (MAP) {
repeated group key_value {
required binary key (STRING);
optional binary value (STRING);
}
}
}
optional binary schemaString (STRING);
optional group partitionColumns (LIST) {
repeated group list {
optional binary element (STRING);
}
}
optional group configuration (MAP) {
repeated group key_value {
required binary key (STRING);
optional binary value (STRING);
}
}
optional int64 createdTime ;
}
optional group protocol {
required int32 minReaderVersion;
required int32 minWriterVersion;
}
optional group cdc {
required binary path (STRING);
required group partitionValues (MAP) {
repeated group key_value {
required binary key (STRING);
optional binary value (STRING);
}
}
required int64 size ;
optional group tags (MAP) {
repeated group key_value {
required binary key (STRING);
optional binary value (STRING);
}
}
}
optional group commitInfo {
optional int64 version ;
optional int64 timestamp;
optional binary userId (STRING);
optional binary userName (STRING);
optional binary operation (STRING);
optional group operationParameters (MAP) {
repeated group key_value {
required binary key (STRING);
optional binary value (STRING);
}
}
optional group job {
optional binary jobId (STRING);
optional binary jobName (STRING);
optional binary runId (STRING);
optional binary jobOwnerId (STRING);
optional binary triggerType (STRING);
}
optional group notebook {
optional binary notebookId (STRING);
}
optional binary clusterId (STRING);
optional int64 readVersion ;
optional binary isolationLevel (STRING);
optional boolean isBlindAppend;
optional group operationMetrics (MAP) {
repeated group key_value {
required binary key (STRING);
optional binary value (STRING);
}
}
optional binary userMetadata (STRING);
optional binary engineInfo (STRING);
}
}
`