0.4.2 • Published 5 months ago
nest-hyperpunch v0.4.2
Nest Hyperpunch
This library include the modules HyperswarmModule, DHTModule, CorestoreModule, LocaldriveModule and all the dependencies needed to integrate Hyperpunch in your nest application
How to use
app.service.js
import { Injectable, Dependencies, Logger } from '@nestjs/common';
import {
DHT,
goodbye,
b4a,
crypto,
Hyperswarm,
Corestore,
Localdrive,
Hyperdrive,
debounce,
Messages,
} from 'nest-hyperpunch';
import fs from 'fs'
import zlib from 'zlib'
const { Node } = Messages
@Injectable()
@Dependencies(DHT, Hyperswarm, Corestore)
export class AppService {
constructor(dht, hyperswarm, corestore) {
this.dht = dht;
this.swarm = hyperswarm;
this.store = corestore;
}
async startServer() {
const keyPair = DHT.keyPair();
goodbye(() => this.dht.close());
const server = this.dht.createServer(conn => {
Logger.log('Connection established');
process.stdin.pipe(conn).pipe(process.stdout);
});
await server.listen(keyPair);
return keyPair
}
async startClient() {
goodbye(() => this.dht.close());
Logger.log(`Connecting to: ${process.argv[3]}`);
const publicKey = b4a.from(process.argv[3], 'hex');
const conn = this.dht.connect(publicKey)
conn.once('open', ()=> Logger.log('got connection'))
process.stdin.pipe(conn).pipe(process.stdout);
}
async startPeer() {
goodbye(() => this.swarm.destroy());
const conns = []
this.swarm.on('connection', conn => {
const name = b4a.toString(conn.remotePublicKey, 'hex');
Logger.log('* got a connection from:', name, '*');
conns.push(conn);
conn.once('close', () => conns.splice(conns.indexOf(conn), 1));
conn.on('data', data => Logger.log(`${name}: ${data}`));
});
process.stdin.on('data', d => {
for (const conn of conns) {
conn.write(d);
}
})
const topic = process.argv[3] ? b4a.from(process.argv[3], 'hex') : crypto.randomBytes(32);
const discovery = this.swarm.join(topic, { client: true, server: true });
await discovery.flushed();
return topic
}
async startWriter() {
goodbye(() => this.swarm.destroy());
const core1 = this.store.get({name: 'core1', valueEncoding: 'json'});
const core2 = this.store.get({name: 'core2'});
const core3 = this.store.get({name: 'core3'});
await Promise.all([core1.ready(), core2.ready(), core3.ready()])
Logger.log(`main core key ${b4a.toString(core1.key, 'hex')}`);
this.swarm.join(core1.discoveryKey);
this.swarm.on('connection', conn =>{
const name = b4a.toString(conn.remotePublicKey, 'hex');
Logger.log(`Got a connection from: ${name}`);
conn.once('close', () => Logger.log(`Close connection from: ${name}`));
this.store.replicate(conn)
});
if (core1.length === 0) {
await core1.append({
otherKeys: [core2, core3].map(core => b4a.toString(core.key, 'hex')),
});
}
process.stdin.on('data', d => {
Logger.log(d);
if (d.length < 5) {
Logger.log("appending short data to core2");
core2.append(d);
} {
Logger.log('appending long data to core3');
core3.append(d);
}
});
}
async startReader() {
goodbye(() => this.swarm.destroy());
Logger.log(`Connecting to: ${process.argv[4]}`);
const key = b4a.from(process.argv[4], 'hex');
this.swarm.on('connection', conn => this.store.replicate(conn));
const core = this.store.get({ key, valueEncoding: 'json'});
await core.ready();
const foundPeers = this.store.findingPeers();
this.swarm.join(core.discoveryKey);
this.swarm.flush().then(() => foundPeers());
await core.update();
if (core.length === 0) {
console.log("Could not connect to the writer peer");
process.exit(1);
}
const {otherKeys} = await core.get(0);
for (const otherKey of otherKeys) {
const core = this.store.get({ key: b4a.from(otherKey, 'hex')});
core.on('append', () => {
const seq = core.length -1;
core.get(seq).then(block => {
Logger.log(`Block ${seq} in Core ${otherKey}: ${block}`);
});
});
}
}
async startBeeWriter() {
goodbye(() => this.swarm.destroy());
this.swarm.on('connection', conn => {
const name = b4a.toString(conn.remotePublicKey, 'hex');
Logger.log(`Got a connection from: ${name}`);
conn.once('close', () => Logger.log(`Close connection from: ${name}`));
this.store.replicate(conn);
});
const core = this.store.get({ name: 'test-bee-core'});
const bee = new Hyperbee(core, {
keyEncoding: 'utf-8',
valueEncoding: 'utf-8',
});
await core.ready();
const discovery = this.swarm.join(core.discoveryKey);
discovery.flushed().then(() => {
Logger.log(`bee key: ${b4a.toString(core.key, 'hex')}`);
})
if (core.length <= 1) {
Logger.log('importing dictionary...');
const dict = await this.loadDictionary();
const batch = bee.batch();
for (const {key, value} of dict) {
await batch.put(key, value);
}
await batch.flush();
} else {
Logger.log('seeding dictionary...');
}
}
async startBeeReader() {
goodbye(() => this.swarm.destroy());
this.swarm.on('connection', conn => this.store.replicate(conn));
const core = this.store.get({ key: b4a.from(process.argv[4], 'hex')});
const bee = new Hyperbee(core, {
keyEncoding: 'utf-8',
valueEncoding: 'utf-8',
});
await core.ready();
Logger.log(`core key here is ${core.key.toString('hex')}`);
this.swarm.join(core.discoveryKey);
process.stdin.setEncoding('utf-8');
process.stdin.on('data', data => {
const word = data.trim();
if(!word.length) return;
bee.get(word).then(node => {
if(!node || !node.value) Logger.log(`No dictionary entry for data ${data}`);
else Logger.log(`${data}: ${node.value}`);
}, err => Logger.error(err));
});
}
async startCoreReader() {
goodbye(() => this.swarm.destroy());
this.swarm.on('connection', conn => this.store.replicate(conn));
const core = this.store.get({ key: b4a.from(process.argv[4], 'hex')});
await core.ready();
const foundPeers = this.store.findingPeers();
this.swarm.join(core.discoveryKey);
this.swarm.flush().then(() => foundPeers());
await core.update();
const seq = core.length -1;
const lastBlock = await core.get(core.length - 1);
Logger.log(`Raw Block ${seq}: ${lastBlock}`);
Logger.log(`Decode Block ${seq}`, Node.decode(lastBlock));
}
async startDriveWriter() {
goodbye(() => this.swarm.destroy());
this.swarm.on('connection', conn => {
const name = b4a.toString(conn.remotePublicKey, 'hex');
Logger.log(`Got a connection from: ${name}`);
conn.once('close', () => Logger.log(`Close connection from: ${name}`));
this.store.replicate(conn);
});
const local = new Localdrive('./writer-dir');
const drive = new Hyperdrive(this.store);
await drive.ready();
const mirrorDrive = async() => {
Logger.log('started mirroring changes from \'./writer-dir\' into the drive...');
const mirror = local.mirror(drive)
await mirror.done()
Logger.log('finished mirroring', mirror.count);
}
const mirror = debounce(mirrorDrive);
const discovery = this.swarm.join(drive.discoveryKey);
await discovery.flushed();
Logger.log(`drive key: ${b4a.toString(drive.key, 'hex')}`)
process.stdin.setEncoding('utf-8');
process.stdin.on('data', data => {
if(!data.match('\n')) return;
mirror()
});
}
async startDriveReader() {
goodbye(() => this.swarm.destroy());
this.swarm.on('connection', conn => this.store.replicate(conn));
const local = new Localdrive('./reader-dir');
const drive = new Hyperdrive(this.store, b4a.from(process.argv[4], 'hex'));
await drive.ready();
const mirrorDrive = async() => {
Logger.log('started mirroring remote drive into \'./reader-dir\'...');
const mirror = drive.mirror(local)
await mirror.done()
Logger.log('finished mirroring', mirror.count);
}
const mirror = debounce(mirrorDrive);
drive.core.on('append', mirror);
const foundPeers = this.store.findingPeers();
this.swarm.join(drive.discoveryKey, { client: true, server: false});
this.swarm.flush().then(() => foundPeers());
mirror();
}
async startDriveBeeReader() {
goodbye(() => this.swarm.destroy());
this.swarm.on('connection', conn => this.store.replicate(conn));
const core = this.store.get({ key: b4a.from(process.argv[4], 'hex')});
const bee = new Hyperbee(core, {
keyEncoding: 'utf-8',
valueEncoding: 'utf-8',
});
await core.ready();
const foundPeers = this.store.findingPeers();
this.swarm.join(core.discoveryKey);
this.swarm.flush().then(() => foundPeers());
const listBee = async () => {
Logger.log('\n**************');
Logger.log('hyperbee contests are now:');
for await (const node of bee.createReadStream()) {
Logger.log(`${node.key} -> ${node.value}`);
}
}
core.on('append', listBee)
listBee()
}
async loadDictionary() {
const compressed = await fs.promises.readFile('./dict.json.gz');
return new Promise((resolve, reject) => {
zlib.gunzip(compressed, (err, dict) => {
if (err) {
reject(err);
} else {
return resolve(JSON.parse(b4a.toString(dict)));
}
});
});
}
}
app.module.js
import { Module } from '@nestjs/common';
import { AppService } from './app.service'
import { DHTModule, HyperswarmModule, CorestoreModule } from 'nest-hyperpunch'
@Module({
imports: [DHTModule, HyperswarmModule, CorestoreModule.foorRoot(process.argv[3])],
providers: [AppService],
})
export class AppModule {}
main.js
import { NestFactory } from '@nestjs/core';
import { Logger } from '@nestjs/common';
import { AppModule } from './app/app.module';
import { AppService } from './app/app.service'
import "reflect-metadata";
import { b4a } from 'nest-hyperpounch';
async function bootstrap() {
const app = await NestFactory.createApplicationContext(AppModule, {
logger: ['error', 'log', 'warn', 'debug'],
});
const appService = app.get(AppService);
if(process.argv[2] === 'server') {
appService.startServer().then((keyPair) => {
Logger.log(`listening on: ${b4a.toString(keyPair.publicKey, 'hex')}`)
});
}
if(process.argv[2] === 'client') {
appService.startClient()
}
if(process.argv[2] === 'peer') {
appService.startPeer().then((topic) => {
Logger.log(`listening topic: ${b4a.toString(topic, 'hex')}`)
})
}
if(process.argv[2] === 'writer') {
appService.startWriter()
}
if(process.argv[2] ==='reader') {
appService.startReader()
}
if(process.argv[2] === 'bee-writer') {
appService.startBeeWriter()
}
if(process.argv[2] === 'bee-reader') {
appService.startBeeReader()
}
if(process.argv[2] === 'core-reader') {
appService.startCoreReader()
}
if(process.argv[2] === 'drive-writer') {
appService.startDriveWriter()
}
if(process.argv[2] === 'drive-reader') {
appService.startDriveReader()
}
if(process.argv[2] === 'drive-bee-reader') {
appService.startDriveBeeReader()
}
}
bootstrap();
Types will be provided in future versions
License
Nest Hyperpunch is MIT licensed. See license