Decompress dump

Decompress dump

#how-tos

⚠️
 1import fs from 'fs';
 2import zstd from 'simple-zstd';
 3
 4// fileName is the file name from which to decompress
 5async function decompressFile(fileName) {
 6  const newFileName = String(fileName).split('.')[0] + '.json';
 7  let localFile = fs.createReadStream(fileName)
 8    .pipe(zstd.ZSTDDecompress())
 9    .pipe(fs.createWriteStream(newFileName))
10};
11
12await decompressFile(fileName, newFileName);
 1import io
 2from pyzstd import decompress_stream
 3
 4
 5# path is the file path from which to decompress
 6def decompress_dump(path):
 7    decompressed_file = ".".join(path.split(".")[:-1])
 8    with io.open(path, "rb") as f:
 9        with io.open(decompressed_file, "wb") as g:
10            decompress_stream(f, g)
11    return decompressed_file
12
13
14if __name__ == "__main__":
15    decompressed_dump = decompress_dump(dump_path)
 1import asyncio
 2
 3# Install the dependency
 4from zstandard import ZstdDecompressor
 5import json
 6from asyncio import Semaphore
 7
 8
 9class Decoder:
10    def __init__(self, semaphore_value: int) -> None:
11        # Create the semaphore that restricts the number of active coroutines
12        self.sem = Semaphore(semaphore_value)
13        self._raw = []
14
15    # The function processes raw hotel data
16    async def _process_raw_hotels(self) -> None:
17        raw_hotels = self._raw[1:]
18        raw_hotels = [self._raw[0]] + [
19            "".join(t) for t in zip(raw_hotels[::2], raw_hotels[1::2])
20        ]
21        await self._process_hotel(*raw_hotels)
22
23    # The function works with raw hotel byte data
24    async def _process_hotel(self, *raw_hotels: str) -> None:
25        for h in raw_hotels:
26            hotel_data = json.loads(h)
27            # Implememnt your logic with the hotel data
28            # ...
29            print(f"current hotel is {hotel_data['name']}")
30
31    # The function works with with raw chunks
32    async def _process_chunk(self, chunk: bytes) -> None:
33        raw_data = chunk.decode("utf-8")
34        # Read the hotels’ JSONs one by one
35        lines = raw_data.split("\n")
36        for i, line in enumerate(lines[1:-1]):
37            if i == 0:
38                # Put the last line to the raw list
39                self._raw.append(lines[0])
40            await self._process_hotel(line)
41
42        # Put the last line to the raw list
43        self._raw.append(lines[-1])
44        # Increment the semaphore value
45        self.sem.release()
46
47    # The function parses the dump
48    async def parse_dump(self, filename: str) -> None:
49        # Open the dump
50        with open(filename, "rb") as fh:
51            dctx = ZstdDecompressor()
52            with dctx.stream_reader(fh) as reader:
53                while True:
54                    # Read the dump by 16 MB chunks
55                    chunk = reader.read(2 ** 24)
56                    if not chunk:
57                        await self._process_raw_hotels()
58                        break
59                    # Decrement the semaphore value
60                    await self.sem.acquire()
61                    # Run immediately
62                    asyncio.create_task(self._process_chunk(chunk))
63
64
65if __name__ == "__main__":
66    loop = asyncio.get_event_loop()
67    d = Decoder(semaphore_value=10)
68    loop.run_until_complete(d.parse_dump("dump_en.json.zst"))
 1package main
 2import (
 3  "bytes"
 4  "encoding/json"
 5  "io"
 6  "log"
 7  "math"
 8  "os"
 9  // Install the dependency
10  "github.com/DataDog/zstd"
11)
12
13// Not full struct
14type Hotel struct {
15  Name string `json:"name"`
16}
17
18func parseDump(filename string) {
19  // Open the dump
20  file, err: = os.Open(filename)
21  if err != nil {
22    log.Fatal(err)
23  }
24  defer file.Close()
25  reader: = zstd.NewReader(file)
26  previousLine: = make([] byte, 0)
27  // Read the dump by 16 MB chunks
28  bufferSize: = make([] byte, int(math.Pow(2, 24)))
29  for {
30    n, readErr: = reader.Read(bufferSize)
31    if readErr != nil && readErr != io.EOF {
32      log.Fatal(readErr)
33    }
34    if readErr == io.EOF {
35      break
36    }
37    rawReadData: = bufferSize[: n]\
38    // Read the hotels’ JSONs one by one
39    lines: = bytes.Split(rawReadData, [] byte("\n"))
40    for i, line: = range lines[: len(lines) - 1] {
41      if i == 0 {
42        line = append(previousLine, line...)
43      }
44      // Unmarshal the current hotel JSON
45      var hotel Hotel
46      _ = json.Unmarshal(line, & hotel)
47      // Implememnt your logic with the hotel data
48      // ...
49      log.Printf("current hotel is %s", hotel.Name)
50    }
51    lastLine: = lines[len(lines) - 1]
52    previousLine = make([] byte, len(lastLine))
53    copy(previousLine, lastLine)
54  }
55}
56
57func main() {
58  parseDump("dump_en.json.zst")
59}
  1package main
  2import (
  3  "bytes"
  4  "context"
  5  "encoding/json"
  6  "io"
  7  "log"
  8  "math"
  9  "os"
 10  "golang.org/x/sync/semaphore"
 11  // Install the dependency
 12  "github.com/DataDog/zstd"
 13)
 14  
 15// The storage with raw data
 16type Raw struct {
 17  firstLine[] byte
 18  lastLine[] byte
 19}
 20
 21// Not full struct
 22type Hotel struct {
 23  Name string `json:"name"`
 24}
 25
 26// The function copies raw data without memory leak
 27func copySlice(slice[] byte)[] byte {
 28  copiedSlice: = make([] byte, len(slice))
 29  for i,
 30  v: = range slice {
 31    copiedSlice[i] = v
 32  }
 33  return copiedSlice
 34}
 35
 36// The function works with raw hotel byte data
 37func processHotel(hotelRaw[] byte) {
 38  // Unmarshal the hotel JSON
 39  var hotel Hotel
 40  err: = json.Unmarshal(hotelRaw, & hotel)
 41  if err != nil {
 42    log.Println(err)
 43  }
 44  // Implememnt your logic with the hotel data
 45  // ...
 46  log.Printf("current hotel is %s", hotel.Name)
 47}
 48
 49// The function works with with raw chunks
 50func processChunk(chunk[] byte, sem * semaphore.Weighted, rawChan chan Raw) {
 51  defer sem.Release(1)
 52  lines: = bytes.Split(chunk, [] byte("\n"))
 53  rawChan < -Raw {
 54    firstLine: copySlice(lines[0]),
 55    lastLine: copySlice(lines[len(lines) - 1]),
 56  }
 57  for _, line: = range lines[1: len(lines) - 1] {
 58    processHotel(line)
 59  }
 60}
 61
 62// The function processes raw hotel data
 63func processRawHotels(raws[] Raw) {
 64  for i, r: = range raws {
 65    if i == 0 {
 66      processHotel(r.firstLine)
 67      continue
 68    }
 69    data: = append(raws[i - 1].lastLine, r.firstLine...)
 70    processHotel(data)
 71  }
 72}
 73
 74// The function parses the dump
 75func parseDump(filename string) {
 76  // Open the dump
 77  file, err: = os.Open(filename)
 78  if err != nil {
 79    log.Fatal(err)
 80  }
 81  defer file.Close()
 82  reader: = zstd.NewReader(file)
 83  // Read the dump by 16 MB chunks
 84  bufferSize: = make([] byte, int(math.Pow(2, 24)))
 85  ctx: = context.Background()
 86  // Set the weighted semaphore to maximum 10 async goroutines
 87  var sem = semaphore.NewWeighted(int64(10))
 88  // Make the storage firstLine and lastLine lines from a chunk
 89  rawData: = make([] Raw, 0)
 90  rawChan: = make(chan Raw)
 91  isFinished: = false
 92  for {
 93    if isFinished {
 94      break
 95    }
 96    n, readErr: = reader.Read(bufferSize)
 97    if readErr != nil && readErr != io.EOF {
 98      log.Fatal(readErr)
 99    }
100    if readErr == io.EOF {
101      isFinished = true
102    }
103    rawReadData: = bufferSize[: n]
104    actualLine: = make([] byte, len(rawReadData))
105    copy(actualLine, rawReadData)
106    // Read the hotels’ JSONs one by one
107    _ = sem.Acquire(ctx, 1)
108    go processChunk(actualLine, sem, rawChan)
109    rawData = append(rawData, < -rawChan)
110  }
111  processRawHotels(rawData)
112}
113
114func main() {
115  parseDump("dump_en.json.zst")
116}