# Decompress dump

URL: https://docs.emergingtravel.com/docs/how-tos/process-dump/decompress-dump/

Tags: how-tos

---


> [!WARNING]
> * The decompressed dump size can exceed 20 GB.
> * Use the [official tools](https://facebook.github.io/zstd/#other-languages) to decompress it.
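
If you just need the decompressed file, the official `zstd` command-line tool (assuming it is installed) can unpack the dump directly; the examples below show how to do the same, or to stream the data, from code.

```sh
# Writes dump_en.json next to the archive
zstd -d dump_en.json.zst
```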

**Node.js**

```js {linenos=inline}
import fs from 'fs';
import { pipeline } from 'stream/promises';
import zstd from 'simple-zstd';

// fileName is the name of the file to decompress
async function decompressFile(fileName) {
  // Strip the .zst extension: dump_en.json.zst -> dump_en.json
  const newFileName = fileName.split('.').slice(0, -1).join('.');
  // pipeline resolves once the decompressed data has been fully written
  await pipeline(
    fs.createReadStream(fileName),
    zstd.ZSTDDecompress(),
    fs.createWriteStream(newFileName)
  );
  return newFileName;
}

await decompressFile('dump_en.json.zst');
```

**Sync Python**

```python {linenos=inline}
# Install the dependency
from pyzstd import decompress_stream


# path is the file path from which to decompress
def decompress_dump(path):
    # Strip the .zst extension: dump_en.json.zst -> dump_en.json
    decompressed_file = ".".join(path.split(".")[:-1])
    with open(path, "rb") as f:
        with open(decompressed_file, "wb") as g:
            # Stream the decompression, so the whole dump is never held in memory
            decompress_stream(f, g)
    return decompressed_file


if __name__ == "__main__":
    decompressed_dump = decompress_dump("dump_en.json.zst")
```

**Async Python**

```python {linenos=inline}
import asyncio
import json
from asyncio import Semaphore

# Install the dependency
from zstandard import ZstdDecompressor


class Decoder:
    def __init__(self, semaphore_value: int) -> None:
        # Create the semaphore that restricts the number of active coroutines
        self.sem = Semaphore(semaphore_value)
        self._raw = []

    # The function reassembles the hotels that were split across chunk boundaries:
    # self._raw holds the first and last (possibly partial) line of every chunk,
    # so each chunk’s trailing fragment joined with the next chunk’s leading
    # fragment restores a complete hotel JSON
    async def _process_raw_hotels(self) -> None:
        raw_hotels = self._raw[1:]
        raw_hotels = [self._raw[0]] + [
            "".join(t) for t in zip(raw_hotels[::2], raw_hotels[1::2])
        ]
        await self._process_hotel(*raw_hotels)

    # The function works with raw hotel byte data
    async def _process_hotel(self, *raw_hotels: str) -> None:
        for h in raw_hotels:
            hotel_data = json.loads(h)
            # Implement your logic with the hotel data
            # ...
            print(f"current hotel is {hotel_data['name']}")

    # The function works with raw chunks
    async def _process_chunk(self, chunk: bytes) -> None:
        raw_data = chunk.decode("utf-8")
        # Read the hotels’ JSONs one by one
        lines = raw_data.split("\n")
        # Save the chunk’s first (possibly partial) line for later reassembly
        self._raw.append(lines[0])
        # Process the complete lines in the middle of the chunk
        for line in lines[1:-1]:
            await self._process_hotel(line)

        # Save the chunk’s last (possibly partial) line for later reassembly
        self._raw.append(lines[-1])
        # Increment the semaphore value
        self.sem.release()

    # The function parses the dump
    async def parse_dump(self, filename: str) -> None:
        # Open the dump
        with open(filename, "rb") as fh:
            dctx = ZstdDecompressor()
            with dctx.stream_reader(fh) as reader:
                while True:
                    # Read the dump in 16 MB chunks
                    chunk = reader.read(2 ** 24)
                    if not chunk:
                        await self._process_raw_hotels()
                        break
                    # Decrement the semaphore value
                    await self.sem.acquire()
                    # Schedule the chunk processing as a concurrent task
                    asyncio.create_task(self._process_chunk(chunk))


if __name__ == "__main__":
    d = Decoder(semaphore_value=10)
    asyncio.run(d.parse_dump("dump_en.json.zst"))
```
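
Because the dump is read in fixed-size chunks, a chunk boundary usually falls in the middle of a hotel record. The example therefore parses only the complete lines inside each chunk, saves every chunk's first and last fragments in `self._raw`, and joins the adjacent fragments in `_process_raw_hotels` after the whole file has been read. Note that this pairing assumes each chunk contains at least one newline, which holds as long as a single hotel record is much smaller than the 16 MB chunk.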

**Sync Golang**

```go {linenos=inline}
package main

import (
  "bytes"
  "encoding/json"
  "io"
  "log"
  "os"

  // Install the dependency
  "github.com/DataDog/zstd"
)

// Not the full struct
type Hotel struct {
  Name string `json:"name"`
}

func parseDump(filename string) {
  // Open the dump
  file, err := os.Open(filename)
  if err != nil {
    log.Fatal(err)
  }
  defer file.Close()
  reader := zstd.NewReader(file)
  defer reader.Close()
  previousLine := make([]byte, 0)
  // Read the dump in 16 MB chunks
  buffer := make([]byte, 1<<24)
  for {
    n, readErr := reader.Read(buffer)
    if readErr != nil && readErr != io.EOF {
      log.Fatal(readErr)
    }
    rawReadData := buffer[:n]
    // Read the hotels’ JSONs one by one
    lines := bytes.Split(rawReadData, []byte("\n"))
    for i, line := range lines[:len(lines)-1] {
      if i == 0 {
        // Join the previous chunk’s trailing fragment with this chunk’s first line
        line = append(previousLine, line...)
      }
      // Unmarshal the current hotel JSON
      var hotel Hotel
      _ = json.Unmarshal(line, &hotel)
      // Implement your logic with the hotel data
      // ...
      log.Printf("current hotel is %s", hotel.Name)
    }
    // Save the last (possibly partial) line for the next iteration
    lastLine := lines[len(lines)-1]
    previousLine = make([]byte, len(lastLine))
    copy(previousLine, lastLine)
    // Stop once the final chunk has been processed
    if readErr == io.EOF {
      break
    }
  }
}

func main() {
  parseDump("dump_en.json.zst")
}
```

**Async Golang**

```go {linenos=inline}
package main

import (
  "bytes"
  "context"
  "encoding/json"
  "io"
  "log"
  "os"

  "golang.org/x/sync/semaphore"
  // Install the dependency
  "github.com/DataDog/zstd"
)

// The storage for the first and last (possibly partial) lines of a chunk
type Raw struct {
  firstLine []byte
  lastLine  []byte
}

// Not the full struct
type Hotel struct {
  Name string `json:"name"`
}

// The function copies a slice, so the shared read buffer can be reused safely
func copySlice(slice []byte) []byte {
  copiedSlice := make([]byte, len(slice))
  copy(copiedSlice, slice)
  return copiedSlice
}

// The function works with raw hotel byte data
func processHotel(hotelRaw []byte) {
  // Unmarshal the hotel JSON
  var hotel Hotel
  err := json.Unmarshal(hotelRaw, &hotel)
  if err != nil {
    log.Println(err)
  }
  // Implement your logic with the hotel data
  // ...
  log.Printf("current hotel is %s", hotel.Name)
}

// The function works with raw chunks
func processChunk(chunk []byte, sem *semaphore.Weighted, rawChan chan Raw) {
  defer sem.Release(1)
  lines := bytes.Split(chunk, []byte("\n"))
  // Send the boundary fragments back to the reader for later reassembly
  rawChan <- Raw{
    firstLine: copySlice(lines[0]),
    lastLine:  copySlice(lines[len(lines)-1]),
  }
  // Process the complete lines in the middle of the chunk
  for _, line := range lines[1 : len(lines)-1] {
    processHotel(line)
  }
}

// The function reassembles the hotels that were split across chunk boundaries
func processRawHotels(raws []Raw) {
  for i, r := range raws {
    if i == 0 {
      processHotel(r.firstLine)
      continue
    }
    // Join the previous chunk’s trailing fragment with this chunk’s leading one
    data := append(raws[i-1].lastLine, r.firstLine...)
    processHotel(data)
  }
}

// The function parses the dump
func parseDump(filename string) {
  // Open the dump
  file, err := os.Open(filename)
  if err != nil {
    log.Fatal(err)
  }
  defer file.Close()
  reader := zstd.NewReader(file)
  defer reader.Close()
  // Read the dump in 16 MB chunks
  buffer := make([]byte, 1<<24)
  ctx := context.Background()
  // Set the weighted semaphore to a maximum of 10 concurrent goroutines
  var sem = semaphore.NewWeighted(10)
  // The storage for the firstLine and lastLine fragments of every chunk
  rawData := make([]Raw, 0)
  rawChan := make(chan Raw)
  isFinished := false
  for {
    if isFinished {
      break
    }
    n, readErr := reader.Read(buffer)
    if readErr != nil && readErr != io.EOF {
      log.Fatal(readErr)
    }
    if readErr == io.EOF {
      isFinished = true
    }
    rawReadData := buffer[:n]
    // Copy the chunk, so the next Read does not overwrite it
    chunk := make([]byte, len(rawReadData))
    copy(chunk, rawReadData)
    // Read the hotels’ JSONs one by one
    _ = sem.Acquire(ctx, 1)
    go processChunk(chunk, sem, rawChan)
    rawData = append(rawData, <-rawChan)
  }
  // Wait until all the goroutines have finished
  _ = sem.Acquire(ctx, 10)
  processRawHotels(rawData)
}

func main() {
  parseDump("dump_en.json.zst")
}
```
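
The Go version uses the same boundary-handling idea as the async Python example: `processChunk` sends each chunk's first and last (possibly partial) lines back over `rawChan` and parses only the complete lines in between, while the weighted semaphore caps the number of concurrent goroutines. Acquiring the full semaphore weight at the end waits for all of them to finish before `processRawHotels` joins the fragments.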
