diff --git a/src/common/Protocol.ts b/src/common/Protocol.ts index 1a5e124..d1dd698 100644 --- a/src/common/Protocol.ts +++ b/src/common/Protocol.ts @@ -43,6 +43,11 @@ const EV_SERVER_INIT = 4 const EV_CLIENT_EVENT = 2 const EV_CLIENT_INIT = 3 +const EV_CLIENT_INIT_REPLAY = 5 +const EV_SERVER_INIT_REPLAY = 6 +const EV_CLIENT_REPLAY_EVENT = 7 +const EV_SERVER_REPLAY_EVENT = 8 + const LOG_HEADER = 1 const LOG_ADD_PLAYER = 2 const LOG_UPDATE_PLAYER = 4 @@ -70,6 +75,11 @@ export default { EV_CLIENT_EVENT, EV_CLIENT_INIT, + EV_CLIENT_INIT_REPLAY, + EV_SERVER_INIT_REPLAY, + EV_CLIENT_REPLAY_EVENT, + EV_SERVER_REPLAY_EVENT, + LOG_HEADER, LOG_ADD_PLAYER, LOG_UPDATE_PLAYER, diff --git a/src/frontend/Communication.ts b/src/frontend/Communication.ts index 5301faa..753ce6f 100644 --- a/src/frontend/Communication.ts +++ b/src/frontend/Communication.ts @@ -112,6 +112,54 @@ function connect( }) } + +let onReplayDataCallback = (logEntry: any[]) => {} +const onReplayData = (fn: any) => { + onReplayDataCallback = fn +} +function connectReplay( + address: string, + gameId: string, + clientId: string +): Promise { + clientSeq = 0 + events = {} + setConnectionState(CONN_STATE_CONNECTING) + return new Promise(resolve => { + ws = new WebSocket(address, clientId + '|' + gameId) + ws.onopen = () => { + setConnectionState(CONN_STATE_CONNECTED) + send([Protocol.EV_CLIENT_INIT_REPLAY]) + } + + ws.onmessage = (e: MessageEvent) => { + const msg: ServerEvent = JSON.parse(e.data) + const msgType = msg[0] + if (msgType === Protocol.EV_SERVER_INIT_REPLAY) { + const game = msg[1] + resolve(game) + } else if (msgType === Protocol.EV_SERVER_REPLAY_EVENT) { + onReplayDataCallback(msg[1]) + } else { + throw `[ 2021-06-05 invalid connectReplay msgType ${msgType} ]` + } + } + + ws.onerror = () => { + setConnectionState(CONN_STATE_DISCONNECTED) + throw `[ 2021-05-15 onerror ]` + } + + ws.onclose = (e: CloseEvent) => { + if (e.code === CODE_CUSTOM_DISCONNECT || e.code === CODE_GOING_AWAY) { + setConnectionState(CONN_STATE_CLOSED) + } else { + setConnectionState(CONN_STATE_DISCONNECTED) + } + } + }) +} + async function requestReplayData( gameId: string, offset: number, @@ -139,7 +187,17 @@ function sendClientEvent(evt: GameEvent): void { send([Protocol.EV_CLIENT_EVENT, clientSeq, events[clientSeq]]) } +function requestMoreReplayData(): void { + send([Protocol.EV_CLIENT_REPLAY_EVENT]) +} + export default { + connectReplay, + requestMoreReplayData, + onReplayData, + + + connect, requestReplayData, disconnect, diff --git a/src/frontend/game.ts b/src/frontend/game.ts index 7a14e42..15df76d 100644 --- a/src/frontend/game.ts +++ b/src/frontend/game.ts @@ -310,28 +310,6 @@ export async function main( HUD.setConnectionState(state) }) - const queryNextReplayBatch = async ( - gameId: string - ): Promise => { - const offset = REPLAY.dataOffset - REPLAY.dataOffset += REPLAY.dataSize - const replay: ReplayData = await Communication.requestReplayData( - gameId, - offset, - REPLAY.dataSize - ) - - // cut log that was already handled - REPLAY.log = REPLAY.log.slice(REPLAY.logPointer) - REPLAY.logPointer = 0 - REPLAY.log.push(...replay.log) - - if (replay.log.length < REPLAY.dataSize) { - REPLAY.final = true - } - return replay - } - let TIME: () => number = () => 0 const connect = async () => { if (MODE === MODE_PLAY) { @@ -344,15 +322,31 @@ export async function main( REPLAY.dataSize = 10000 REPLAY.speeds = [0.5, 1, 2, 5, 10, 20, 50, 100, 250, 500] REPLAY.speedIdx = 1 - const replay: ReplayData = await queryNextReplayBatch(gameId) - if (!replay.game) { + + Communication.onReplayData((logLines: any[][]) => { + if (logLines.length === 0) { + console.log('MUHHHHH FINAL!!!!') + REPLAY.final = true + } else { + if (REPLAY.logPointer === 0) { + REPLAY.log.push(...logLines) + } else { + REPLAY.log = REPLAY.log.slice(REPLAY.logPointer - 1) + REPLAY.logPointer = 1 + REPLAY.log.push(...logLines) + } + } + }) + + const game: any = await Communication.connectReplay(wsAddress, gameId, clientId) + if (!game) { throw '[ 2021-05-29 no game received ]' } - const gameObject: GameType = Util.decodeGame(replay.game) + const gameObject: GameType = Util.decodeGame(game) Game.setGame(gameObject.id, gameObject) REPLAY.lastRealTs = Time.timestamp() - REPLAY.gameStartTs = parseInt(replay.log[0][4], 10) + REPLAY.gameStartTs = Game.getStartTs(gameId) REPLAY.lastGameTs = REPLAY.gameStartTs REPLAY.paused = false REPLAY.skipNonActionPhases = false @@ -521,7 +515,7 @@ export async function main( } // // TODO: remove (make changable via interface) - // REPLAY.skipNonActionPhases = true + REPLAY.skipNonActionPhases = true if (MODE === MODE_PLAY) { Communication.onServerChange((msg: ServerEvent) => { @@ -580,8 +574,10 @@ export async function main( } const next = async () => { - if (REPLAY.logPointer + 1 >= REPLAY.log.length) { - await queryNextReplayBatch(gameId) + if (REPLAY.logPointer >= REPLAY.log.length) { + Communication.requestMoreReplayData() + to = setTimeout(next, 50) + return } const realTs = Time.timestamp() @@ -597,19 +593,18 @@ export async function main( if (REPLAY.paused) { break } - const nextIdx = REPLAY.logPointer + 1 - if (nextIdx >= REPLAY.log.length) { + if (REPLAY.logPointer >= REPLAY.log.length) { break } - const currLogEntry = REPLAY.log[REPLAY.logPointer] - const currTs: Timestamp = REPLAY.gameStartTs + currLogEntry[currLogEntry.length - 1] - const nextLogEntry = REPLAY.log[nextIdx] + const lastLogEntry = REPLAY.logPointer > 0 ? REPLAY.log[REPLAY.logPointer - 1] : null + const lastTs: Timestamp = REPLAY.gameStartTs + (lastLogEntry ? lastLogEntry[lastLogEntry.length - 1] : 0) + const nextLogEntry = REPLAY.log[REPLAY.logPointer] const nextTs: Timestamp = REPLAY.gameStartTs + nextLogEntry[nextLogEntry.length - 1] if (nextTs > maxGameTs) { // next log entry is too far into the future if (REPLAY.skipNonActionPhases && (maxGameTs + 50 < nextTs)) { - const skipInterval = nextTs - currTs + const skipInterval = nextTs - lastTs // lets skip to the next log entry // log.info('skipping non-action, from', maxGameTs, skipInterval) maxGameTs += skipInterval @@ -620,13 +615,15 @@ export async function main( if (handleLogEntry(nextLogEntry, nextTs)) { RERENDER = true } - REPLAY.logPointer = nextIdx + REPLAY.logPointer++ } while (true) REPLAY.lastRealTs = realTs REPLAY.lastGameTs = maxGameTs updateTimerElements() - if (!REPLAY.final) { + if (REPLAY.final && REPLAY.logPointer + 1 >= REPLAY.log.length) { + // done + } else { to = setTimeout(next, 50) } } diff --git a/src/server/GameLog.ts b/src/server/GameLog.ts index d764304..72d69ec 100644 --- a/src/server/GameLog.ts +++ b/src/server/GameLog.ts @@ -1,10 +1,12 @@ +import WebSocket from 'ws' import fs from 'fs' import readline from 'readline' import stream from 'stream' import Time from '../common/Time' -import { Timestamp } from '../common/Types' +import { Game as GameType, ScoreMode, ShapeMode, SnapMode, Timestamp } from '../common/Types' import { logger } from './../common/Util' import { DATA_DIR } from './../server/Dirs' +import Game from './Game' const log = logger('GameLog.js') @@ -81,7 +83,118 @@ const get = async ( }) } +interface LineReader { + readLine: () => Promise, +} + +const createLineReader = async ( + gameId: string +): Promise => { + const stream: fs.ReadStream = await new Promise(resolve => { + const file = filename(gameId) + if (!fs.existsSync(file)) { + return null + } + + const instream = fs.createReadStream(file) + instream.on('readable', () => { + resolve(instream) + }) + }) + + let line = '' + const readLine = async (): Promise => { + return new Promise(resolve => { + let chunk + let resolved = false + while (null !== (chunk = stream.read(1))) { + if (chunk.toString() === "\n") { + resolve(line) + line = '' + resolved = true + break + } + line += chunk + } + + if (!resolved) { + resolve('') + } + }) + } + + return { + readLine, + } +} + +interface SocketGameLog { + socket: WebSocket + rl: LineReader +} + +const connected: SocketGameLog[] = [] +const getSocketGameLog = ( + socket: WebSocket, +): SocketGameLog|null => { + for (const entry of connected) { + if (entry.socket === socket) { + return entry + } + } + return null +} + +const open = async (socket: WebSocket, gameId: string): Promise => { + const rl = await createLineReader(gameId) + if (!rl) { + return null + } + const socketGameLog = { + socket: socket, + rl: rl, + } + const line = await rl.readLine() + const log = JSON.parse(line) + connected.push(socketGameLog) + const g: GameType = await Game.createGameObject( + gameId, + log[2], + log[3], + log[4], + log[5] || ScoreMode.FINAL, + log[6] || ShapeMode.NORMAL, + log[7] || SnapMode.NORMAL, + ) + return g +} + +const getNextBySocket = async (socket: WebSocket): Promise => { + const socketGameLog = getSocketGameLog(socket) + if (!socketGameLog) { + return [] + } + + const lines = [] + for (let i = 0; i < 1000; i++) { + const line = await socketGameLog.rl.readLine() + if (line) { + try { + lines.push(JSON.parse(line)) + } catch (e) { + log.error(e) + log.error(line) + } + } else { + break + } + } + return lines +} + export default { + open, + getNextBySocket, shouldLog, create, exists, diff --git a/src/server/main.ts b/src/server/main.ts index 215ec6f..fd75379 100644 --- a/src/server/main.ts +++ b/src/server/main.ts @@ -247,6 +247,37 @@ wss.on('message', async ( const msg = JSON.parse(data as string) const msgType = msg[0] switch (msgType) { + case Protocol.EV_CLIENT_INIT_REPLAY: { + if (!GameLog.exists(gameId)) { + throw `[gamelog ${gameId} does not exist... ]` + } + + // should connect the socket with game log + // pseudo code + const game = await GameLog.open(socket, gameId) + if (!game) { + throw `[game not created :/ ]` + } + notify( + [Protocol.EV_SERVER_INIT_REPLAY, Util.encodeGame(game)], + [socket] + ) + } break + + case Protocol.EV_CLIENT_REPLAY_EVENT: { + if (!GameLog.exists(gameId)) { + throw `[gamelog ${gameId} does not exist... ]` + } + // should read next some lines from game log, using the game + // log connected with this socket + // pseudo code + const logEntries = await GameLog.getNextBySocket(socket) + notify( + [Protocol.EV_SERVER_REPLAY_EVENT, logEntries], + [socket] + ) + } break + case Protocol.EV_CLIENT_INIT: { if (!GameCommon.exists(gameId)) { throw `[game ${gameId} does not exist... ]`