0.4.2 • Published 5 months ago

nest-hyperpunch v0.4.2

Weekly downloads
-
License
MIT
Repository
-
Last release
5 months ago

Nest Hyperpunch

This library include the modules HyperswarmModule, DHTModule, CorestoreModule, LocaldriveModule and all the dependencies needed to integrate Hyperpunch in your nest application

How to use

  • app.service.js
import { Injectable, Dependencies, Logger } from '@nestjs/common';
import {
  DHT,
  goodbye,
  b4a,
  crypto,
  Hyperswarm,
  Corestore,
  Localdrive,
  Hyperdrive,
  debounce,
  Messages,
} from 'nest-hyperpunch';

import fs from 'fs'
import zlib from 'zlib'

const { Node } = Messages

@Injectable()
@Dependencies(DHT, Hyperswarm, Corestore)
export class AppService {
  constructor(dht, hyperswarm, corestore) {
    this.dht = dht;
    this.swarm = hyperswarm;
    this.store = corestore;
  }

  async startServer() {
    const keyPair = DHT.keyPair();

    goodbye(() => this.dht.close());

    const server = this.dht.createServer(conn => {
      Logger.log('Connection established');
      process.stdin.pipe(conn).pipe(process.stdout);
    });

    await server.listen(keyPair);

    return keyPair
  }

  async startClient() {
    goodbye(() => this.dht.close());
    Logger.log(`Connecting to: ${process.argv[3]}`);
    const publicKey = b4a.from(process.argv[3], 'hex');

    const conn = this.dht.connect(publicKey)
    conn.once('open', ()=> Logger.log('got connection'))

    process.stdin.pipe(conn).pipe(process.stdout);
  }

  async startPeer() {
    goodbye(() => this.swarm.destroy());
    const conns = []
    this.swarm.on('connection', conn => {
      const name = b4a.toString(conn.remotePublicKey, 'hex');
      Logger.log('* got a connection from:', name, '*');
      conns.push(conn);
      conn.once('close', () => conns.splice(conns.indexOf(conn), 1));
      conn.on('data', data => Logger.log(`${name}: ${data}`));
    });

    process.stdin.on('data', d => {
      for (const conn of conns) {
        conn.write(d);
      }
    })

    const topic = process.argv[3] ? b4a.from(process.argv[3], 'hex') : crypto.randomBytes(32);
    const discovery = this.swarm.join(topic, { client: true, server: true });

    await discovery.flushed();

    return topic
  }

  async startWriter() {
    goodbye(() => this.swarm.destroy());
    const core1 = this.store.get({name: 'core1', valueEncoding: 'json'});
    const core2 = this.store.get({name: 'core2'});
    const core3 = this.store.get({name: 'core3'});
    
    await Promise.all([core1.ready(), core2.ready(), core3.ready()])
    
    Logger.log(`main core key ${b4a.toString(core1.key, 'hex')}`);

    this.swarm.join(core1.discoveryKey);

    this.swarm.on('connection', conn =>{ 
      const name = b4a.toString(conn.remotePublicKey, 'hex');
      Logger.log(`Got a connection from: ${name}`);
      conn.once('close', () => Logger.log(`Close connection from: ${name}`));
      this.store.replicate(conn)
    });

    if (core1.length === 0) {
      await core1.append({
        otherKeys: [core2, core3].map(core => b4a.toString(core.key, 'hex')),
      });
    }

    process.stdin.on('data', d => {
      Logger.log(d);
      if (d.length < 5) {
        Logger.log("appending short data to core2");
        core2.append(d);
      } {
        Logger.log('appending long data to core3');
        core3.append(d);
      }
    });
  }

  async startReader() {
    goodbye(() => this.swarm.destroy());
    Logger.log(`Connecting to: ${process.argv[4]}`);
    const key = b4a.from(process.argv[4], 'hex');

    this.swarm.on('connection', conn => this.store.replicate(conn));

    const core = this.store.get({ key, valueEncoding: 'json'});

    await core.ready();

    const foundPeers = this.store.findingPeers();

    this.swarm.join(core.discoveryKey);
    this.swarm.flush().then(() => foundPeers());

    await core.update();

    if (core.length === 0) {
      console.log("Could not connect to the writer peer");
      process.exit(1);
    }

    const {otherKeys} = await core.get(0);

    for (const otherKey of otherKeys) {
      const core = this.store.get({ key: b4a.from(otherKey, 'hex')});
      core.on('append', () => {
        const seq = core.length -1;
        core.get(seq).then(block => {
          Logger.log(`Block ${seq} in Core ${otherKey}: ${block}`);
        });
      });
    }
  }

  async startBeeWriter() {
    goodbye(() => this.swarm.destroy());

    this.swarm.on('connection', conn => {
      const name = b4a.toString(conn.remotePublicKey, 'hex');
      Logger.log(`Got a connection from: ${name}`);
      conn.once('close', () => Logger.log(`Close connection from: ${name}`));
      this.store.replicate(conn);
    });

    const core = this.store.get({ name: 'test-bee-core'});

    const bee = new Hyperbee(core, {
      keyEncoding: 'utf-8',
      valueEncoding: 'utf-8',
    });

    await core.ready();

    const discovery = this.swarm.join(core.discoveryKey);

    discovery.flushed().then(() => {
      Logger.log(`bee key: ${b4a.toString(core.key, 'hex')}`);
    })

    if (core.length <= 1) {
      Logger.log('importing dictionary...');
      const dict = await this.loadDictionary();
      const batch = bee.batch();
      for (const {key, value} of dict) {
        await batch.put(key, value);
      }
      await batch.flush();
    } else {
      Logger.log('seeding dictionary...');
    }
  }

  async startBeeReader() {
    goodbye(() => this.swarm.destroy());

    this.swarm.on('connection', conn => this.store.replicate(conn));

    const core = this.store.get({ key: b4a.from(process.argv[4], 'hex')});

    const bee = new Hyperbee(core, {
      keyEncoding: 'utf-8',
      valueEncoding: 'utf-8',
    });

    await core.ready();

    Logger.log(`core key here is ${core.key.toString('hex')}`);

    this.swarm.join(core.discoveryKey);

    process.stdin.setEncoding('utf-8');

    process.stdin.on('data', data => {
      const word = data.trim();

      if(!word.length) return;
      
      bee.get(word).then(node => {
        if(!node || !node.value) Logger.log(`No dictionary entry for data ${data}`);
        else Logger.log(`${data}: ${node.value}`);
      }, err => Logger.error(err));
    });
  }

  async startCoreReader() {
    goodbye(() => this.swarm.destroy());
    
    this.swarm.on('connection', conn => this.store.replicate(conn));

    const core = this.store.get({ key: b4a.from(process.argv[4], 'hex')});

    await core.ready();

    const foundPeers = this.store.findingPeers();

    this.swarm.join(core.discoveryKey);
    this.swarm.flush().then(() => foundPeers());

    await core.update();

    const seq = core.length -1;
    const lastBlock = await core.get(core.length - 1);

    Logger.log(`Raw Block ${seq}: ${lastBlock}`);
    Logger.log(`Decode Block ${seq}`, Node.decode(lastBlock));
  }

  async startDriveWriter() {
    goodbye(() => this.swarm.destroy());

    this.swarm.on('connection', conn => {
      const name = b4a.toString(conn.remotePublicKey, 'hex');
      Logger.log(`Got a connection from: ${name}`);
      conn.once('close', () => Logger.log(`Close connection from: ${name}`));
      this.store.replicate(conn);
    });

    const local = new Localdrive('./writer-dir');
    const drive = new Hyperdrive(this.store);

    await drive.ready();

    const mirrorDrive = async() => {
      Logger.log('started mirroring changes from \'./writer-dir\' into the drive...');
      const mirror = local.mirror(drive)
      await mirror.done()
      Logger.log('finished mirroring', mirror.count);
    }

    const mirror = debounce(mirrorDrive);
    const discovery = this.swarm.join(drive.discoveryKey);
    await discovery.flushed();

    Logger.log(`drive key: ${b4a.toString(drive.key, 'hex')}`)

    process.stdin.setEncoding('utf-8');
    process.stdin.on('data', data => {
      if(!data.match('\n')) return;
      mirror()
    });
  }

  async startDriveReader() {
    goodbye(() => this.swarm.destroy());

    this.swarm.on('connection', conn => this.store.replicate(conn));

    const local = new Localdrive('./reader-dir');
    const drive = new Hyperdrive(this.store, b4a.from(process.argv[4], 'hex'));

    await drive.ready();

    const mirrorDrive = async() => {
      Logger.log('started mirroring remote drive into \'./reader-dir\'...');
      const mirror = drive.mirror(local)
      await mirror.done()
      Logger.log('finished mirroring', mirror.count);
    }

    const mirror = debounce(mirrorDrive);
    drive.core.on('append', mirror);

    const foundPeers = this.store.findingPeers();

    this.swarm.join(drive.discoveryKey, { client: true, server: false});
    this.swarm.flush().then(() => foundPeers());

    mirror();
  }

  async startDriveBeeReader() {
    goodbye(() => this.swarm.destroy());

    this.swarm.on('connection', conn => this.store.replicate(conn));
    const core = this.store.get({ key: b4a.from(process.argv[4], 'hex')});

    const bee = new Hyperbee(core, {
      keyEncoding: 'utf-8',
      valueEncoding: 'utf-8',
    });

    await core.ready();

    const foundPeers = this.store.findingPeers();

    this.swarm.join(core.discoveryKey);
    this.swarm.flush().then(() => foundPeers());

    const listBee = async () => {
      Logger.log('\n**************');
      Logger.log('hyperbee contests are now:');
      for await (const node of bee.createReadStream()) {
        Logger.log(`${node.key} -> ${node.value}`);
      }
    }

    core.on('append', listBee)

    listBee()
  }

  async loadDictionary() {
    const compressed = await fs.promises.readFile('./dict.json.gz');
    return new Promise((resolve, reject) => {
      zlib.gunzip(compressed, (err, dict) => {
        if (err) {
          reject(err);
        } else {
          return resolve(JSON.parse(b4a.toString(dict)));
        }
      });
    });
  }
}
  • app.module.js
import { Module } from '@nestjs/common';
import { AppService } from './app.service'
import { DHTModule, HyperswarmModule, CorestoreModule } from 'nest-hyperpunch'

@Module({
  imports: [DHTModule, HyperswarmModule, CorestoreModule.foorRoot(process.argv[3])],
  providers: [AppService],
})
export class AppModule {}
  • main.js
import { NestFactory } from '@nestjs/core';
import { Logger } from '@nestjs/common';
import { AppModule } from './app/app.module';
import { AppService } from './app/app.service'
import "reflect-metadata";
import { b4a } from 'nest-hyperpounch';

async function bootstrap() {
  const app = await NestFactory.createApplicationContext(AppModule, {
    logger: ['error', 'log', 'warn', 'debug'],
  });

  const appService = app.get(AppService);

  if(process.argv[2] === 'server') {
    appService.startServer().then((keyPair) => {
      Logger.log(`listening on: ${b4a.toString(keyPair.publicKey, 'hex')}`)
    });
  }

  if(process.argv[2] === 'client') {
    appService.startClient()
  }

  if(process.argv[2] === 'peer') {
    appService.startPeer().then((topic) => {
      Logger.log(`listening topic: ${b4a.toString(topic, 'hex')}`)
    })
  }

  if(process.argv[2] === 'writer') {
    appService.startWriter()
  }

  if(process.argv[2] ==='reader') {
    appService.startReader()
  }

  if(process.argv[2] === 'bee-writer') {
    appService.startBeeWriter()
  }

  if(process.argv[2] === 'bee-reader') {
    appService.startBeeReader()
  }

  if(process.argv[2] === 'core-reader') {
    appService.startCoreReader()
  }

  if(process.argv[2] === 'drive-writer') {
    appService.startDriveWriter()
  }

  if(process.argv[2] === 'drive-reader') {
    appService.startDriveReader()
  }

  if(process.argv[2] === 'drive-bee-reader') {
    appService.startDriveBeeReader()
  }
}
bootstrap();

Types will be provided in future versions

License

Nest Hyperpunch is MIT licensed. See license