Dump processing

Dump processing

  1. Request a dump by one of the following calls:

  2. Extract the value of the `url` field from the call response. This value is the download link for the dump.

    ℹ️
    The dump size can exceed 2 GB.

  3. Download the dump.

  4. Decompress the dump with the official Zstandard (zstd) tools.

    ℹ️
    The decompressed dump size can exceed 20 GB.

    Because the dump is large, we recommend implementing streaming decompression on your side instead of decompressing the whole file at once. Some implementation examples are presented below.

Sync code example in Python

 1# Install the dependency
 2from zstandard import ZstdDecompressor
 3import json
 4
 5
 6def parse_dump(filename: str) -> None:
 7    # Open the dump
 8    with open(filename, "rb") as fh:
 9        dctx = ZstdDecompressor()
10        with dctx.stream_reader(fh) as reader:
11            previous_line = ""
12            while True:
13                # Read the dump by 16 MB chunks
14                chunk = reader.read(2 ** 24)
15                if not chunk:
16                    break
17                raw_data = chunk.decode("utf-8")
18                # Read the hotels’ JSONs one by one
19                lines = raw_data.split("\n")
20                for i, line in enumerate(lines[:-1]):
21                    if i == 0:
22                        line = previous_line + line
23                    hotel_data = json.loads(line)
24                    # Implememnt your logic with the hotel data
25                    # ...
26                    print(f"current hotel is {hotel_data['name']}")
27                    previous_line = lines[-1]
28
29
if __name__ == "__main__":
    # Script entry point: parse the dump file given by a relative path
    parse_dump("dump_en.json.zst")

Async code example in Python

 1import asyncio
 2
 3# Install the dependency
 4from zstandard import ZstdDecompressor
 5import json
 6from asyncio import Semaphore
 7
 8
 9class Decoder:
10    def __init__(self, semaphore_value: int) -> None:
11        # Create the semaphore that restricts the number of active coroutines
12        self.sem = Semaphore(semaphore_value)
13        self._raw = []
14
15    # The function processes raw hotel data
16    async def _process_raw_hotels(self) -> None:
17        raw_hotels = self._raw[1:]
18        raw_hotels = [self._raw[0]] + [
19            "".join(t) for t in zip(raw_hotels[::2], raw_hotels[1::2])
20        ]
21        await self._process_hotel(*raw_hotels)
22
23    # The function works with raw hotel byte data
24    async def _process_hotel(self, *raw_hotels: str) -> None:
25        for h in raw_hotels:
26            hotel_data = json.loads(h)
27            # Implememnt your logic with the hotel data
28            # ...
29            print(f"current hotel is {hotel_data['name']}")
30
31    # The function works with with raw chunks
32    async def _process_chunk(self, chunk: bytes) -> None:
33        raw_data = chunk.decode("utf-8")
34        # Read the hotels’ JSONs one by one
35        lines = raw_data.split("\n")
36        for i, line in enumerate(lines[1:-1]):
37            if i == 0:
38                # Put the last line to the raw list
39                self._raw.append(lines[0])
40            await self._process_hotel(line)
41
42        # Put the last line to the raw list
43        self._raw.append(lines[-1])
44        # Increment the semaphore value
45        self.sem.release()
46
47    # The function parses the dump
48    async def parse_dump(self, filename: str) -> None:
49        # Open the dump
50        with open(filename, "rb") as fh:
51            dctx = ZstdDecompressor()
52            with dctx.stream_reader(fh) as reader:
53                while True:
54                    # Read the dump by 16 MB chunks
55                    chunk = reader.read(2 ** 24)
56                    if not chunk:
57                        await self._process_raw_hotels()
58                        break
59                    # Decrement the semaphore value
60                    await self.sem.acquire()
61                    # Run immediately
62                    asyncio.create_task(self._process_chunk(chunk))
63
64
async def main() -> None:
    """Create the decoder inside the running event loop and parse the dump."""
    # The decoder (and its semaphore) is created inside the coroutine so that
    # it belongs to the loop started by asyncio.run() on every Python version
    decoder = Decoder(semaphore_value=10)
    await decoder.parse_dump("dump_en.json.zst")


if __name__ == "__main__":
    # asyncio.run() creates the event loop, runs the coroutine and closes the
    # loop; calling asyncio.get_event_loop() without a running loop is deprecated
    asyncio.run(main())

Sync code example in Golang

 1package main
 2import (
 3  "bytes"
 4  "encoding/json"
 5  "io"
 6  "log"
 7  "math"
 8  "os"
 9  // Install the dependency
10  "github.com/DataDog/zstd"
11)
12
13// Not full struct
14type Hotel struct {
15  Name string `json:"name"`
16}
17
18func parseDump(filename string) {
19  // Open the dump
20  file, err: = os.Open(filename)
21  if err != nil {
22    log.Fatal(err)
23  }
24  defer file.Close()
25  reader: = zstd.NewReader(file)
26  previousLine: = make([] byte, 0)
27  // Read the dump by 16 MB chunks
28  bufferSize: = make([] byte, int(math.Pow(2, 24)))
29  for {
30    n, readErr: = reader.Read(bufferSize)
31    if readErr != nil && readErr != io.EOF {
32      log.Fatal(readErr)
33    }
34    if readErr == io.EOF {
35      break
36    }
37    rawReadData: = bufferSize[: n]\
38    // Read the hotels’ JSONs one by one
39    lines: = bytes.Split(rawReadData, [] byte("\n"))
40    for i, line: = range lines[: len(lines) - 1] {
41      if i == 0 {
42        line = append(previousLine, line...)
43      }
44      // Unmarshal the current hotel JSON
45      var hotel Hotel
46      _ = json.Unmarshal(line, & hotel)
47      // Implememnt your logic with the hotel data
48      // ...
49      log.Printf("current hotel is %s", hotel.Name)
50    }
51    lastLine: = lines[len(lines) - 1]
52    previousLine = make([] byte, len(lastLine))
53    copy(previousLine, lastLine)
54  }
55}
56
57func main() {
58  parseDump("dump_en.json.zst")
59}

Async code example in Golang

  1package main
  2import (
  3  "bytes"
  4  "context"
  5  "encoding/json"
  6  "io"
  7  "log"
  8  "math"
  9  "os"
 10  "golang.org/x/sync/semaphore"
 11  // Install the dependency
 12  "github.com/DataDog/zstd"
 13)
 14  
 15// The storage with raw data
 16type Raw struct {
 17  firstLine[] byte
 18  lastLine[] byte
 19}
 20
 21// Not full struct
 22type Hotel struct {
 23  Name string `json:"name"`
 24}
 25
 26// The function copies raw data without memory leak
 27func copySlice(slice[] byte)[] byte {
 28  copiedSlice: = make([] byte, len(slice))
 29  for i,
 30  v: = range slice {
 31    copiedSlice[i] = v
 32  }
 33  return copiedSlice
 34}
 35
 36// The function works with raw hotel byte data
 37func processHotel(hotelRaw[] byte) {
 38  // Unmarshal the hotel JSON
 39  var hotel Hotel
 40  err: = json.Unmarshal(hotelRaw, & hotel)
 41  if err != nil {
 42    log.Println(err)
 43  }
 44  // Implememnt your logic with the hotel data
 45  // ...
 46  log.Printf("current hotel is %s", hotel.Name)
 47}
 48
 49// The function works with with raw chunks
 50func processChunk(chunk[] byte, sem * semaphore.Weighted, rawChan chan Raw) {
 51  defer sem.Release(1)
 52  lines: = bytes.Split(chunk, [] byte("\n"))
 53  rawChan < -Raw {
 54    firstLine: copySlice(lines[0]),
 55    lastLine: copySlice(lines[len(lines) - 1]),
 56  }
 57  for _, line: = range lines[1: len(lines) - 1] {
 58    processHotel(line)
 59  }
 60}
 61
 62// The function processes raw hotel data
 63func processRawHotels(raws[] Raw) {
 64  for i, r: = range raws {
 65    if i == 0 {
 66      processHotel(r.firstLine)
 67      continue
 68    }
 69    data: = append(raws[i - 1].lastLine, r.firstLine...)
 70    processHotel(data)
 71  }
 72}
 73
 74// The function parses the dump
 75func parseDump(filename string) {
 76  // Open the dump
 77  file, err: = os.Open(filename)
 78  if err != nil {
 79    log.Fatal(err)
 80  }
 81  defer file.Close()
 82  reader: = zstd.NewReader(file)
 83  // Read the dump by 16 MB chunks
 84  bufferSize: = make([] byte, int(math.Pow(2, 24)))
 85  ctx: = context.Background()
 86  // Set the weighted semaphore to maximum 10 async goroutines
 87  var sem = semaphore.NewWeighted(int64(10))
 88  // Make the storage firstLine and lastLine lines from a chunk
 89  rawData: = make([] Raw, 0)
 90  rawChan: = make(chan Raw)
 91  isFinished: = false
 92  for {
 93    if isFinished {
 94      break
 95    }
 96    n, readErr: = reader.Read(bufferSize)
 97    if readErr != nil && readErr != io.EOF {
 98      log.Fatal(readErr)
 99    }
100    if readErr == io.EOF {
101      isFinished = true
102    }
103    rawReadData: = bufferSize[: n]
104    actualLine: = make([] byte, len(rawReadData))
105    copy(actualLine, rawReadData)
106    // Read the hotels’ JSONs one by one
107    _ = sem.Acquire(ctx, 1)
108    go processChunk(actualLine, sem, rawChan)
109    rawData = append(rawData, < -rawChan)
110  }
111  processRawHotels(rawData)
112}
113
114func main() {
115  parseDump("dump_en.json.zst")
116}