replay with ws

This commit is contained in:
Zutatensuppe 2021-06-05 15:14:36 +02:00
parent 22f5ce0065
commit 9a13838d33
5 changed files with 247 additions and 38 deletions

View file

@ -43,6 +43,11 @@ const EV_SERVER_INIT = 4
const EV_CLIENT_EVENT = 2
const EV_CLIENT_INIT = 3
const EV_CLIENT_INIT_REPLAY = 5
const EV_SERVER_INIT_REPLAY = 6
const EV_CLIENT_REPLAY_EVENT = 7
const EV_SERVER_REPLAY_EVENT = 8
const LOG_HEADER = 1
const LOG_ADD_PLAYER = 2
const LOG_UPDATE_PLAYER = 4
@ -70,6 +75,11 @@ export default {
EV_CLIENT_EVENT,
EV_CLIENT_INIT,
EV_CLIENT_INIT_REPLAY,
EV_SERVER_INIT_REPLAY,
EV_CLIENT_REPLAY_EVENT,
EV_SERVER_REPLAY_EVENT,
LOG_HEADER,
LOG_ADD_PLAYER,
LOG_UPDATE_PLAYER,

View file

@ -112,6 +112,54 @@ function connect(
})
}
let onReplayDataCallback = (logEntry: any[]) => {}
const onReplayData = (fn: any) => {
onReplayDataCallback = fn
}
function connectReplay(
address: string,
gameId: string,
clientId: string
): Promise<EncodedGame> {
clientSeq = 0
events = {}
setConnectionState(CONN_STATE_CONNECTING)
return new Promise(resolve => {
ws = new WebSocket(address, clientId + '|' + gameId)
ws.onopen = () => {
setConnectionState(CONN_STATE_CONNECTED)
send([Protocol.EV_CLIENT_INIT_REPLAY])
}
ws.onmessage = (e: MessageEvent) => {
const msg: ServerEvent = JSON.parse(e.data)
const msgType = msg[0]
if (msgType === Protocol.EV_SERVER_INIT_REPLAY) {
const game = msg[1]
resolve(game)
} else if (msgType === Protocol.EV_SERVER_REPLAY_EVENT) {
onReplayDataCallback(msg[1])
} else {
throw `[ 2021-06-05 invalid connectReplay msgType ${msgType} ]`
}
}
ws.onerror = () => {
setConnectionState(CONN_STATE_DISCONNECTED)
throw `[ 2021-05-15 onerror ]`
}
ws.onclose = (e: CloseEvent) => {
if (e.code === CODE_CUSTOM_DISCONNECT || e.code === CODE_GOING_AWAY) {
setConnectionState(CONN_STATE_CLOSED)
} else {
setConnectionState(CONN_STATE_DISCONNECTED)
}
}
})
}
async function requestReplayData(
gameId: string,
offset: number,
@ -139,7 +187,17 @@ function sendClientEvent(evt: GameEvent): void {
send([Protocol.EV_CLIENT_EVENT, clientSeq, events[clientSeq]])
}
function requestMoreReplayData(): void {
send([Protocol.EV_CLIENT_REPLAY_EVENT])
}
export default {
connectReplay,
requestMoreReplayData,
onReplayData,
connect,
requestReplayData,
disconnect,

View file

@ -310,28 +310,6 @@ export async function main(
HUD.setConnectionState(state)
})
const queryNextReplayBatch = async (
gameId: string
): Promise<ReplayData> => {
const offset = REPLAY.dataOffset
REPLAY.dataOffset += REPLAY.dataSize
const replay: ReplayData = await Communication.requestReplayData(
gameId,
offset,
REPLAY.dataSize
)
// cut log that was already handled
REPLAY.log = REPLAY.log.slice(REPLAY.logPointer)
REPLAY.logPointer = 0
REPLAY.log.push(...replay.log)
if (replay.log.length < REPLAY.dataSize) {
REPLAY.final = true
}
return replay
}
let TIME: () => number = () => 0
const connect = async () => {
if (MODE === MODE_PLAY) {
@ -344,15 +322,31 @@ export async function main(
REPLAY.dataSize = 10000
REPLAY.speeds = [0.5, 1, 2, 5, 10, 20, 50, 100, 250, 500]
REPLAY.speedIdx = 1
const replay: ReplayData = await queryNextReplayBatch(gameId)
if (!replay.game) {
Communication.onReplayData((logLines: any[][]) => {
if (logLines.length === 0) {
console.log('MUHHHHH FINAL!!!!')
REPLAY.final = true
} else {
if (REPLAY.logPointer === 0) {
REPLAY.log.push(...logLines)
} else {
REPLAY.log = REPLAY.log.slice(REPLAY.logPointer - 1)
REPLAY.logPointer = 1
REPLAY.log.push(...logLines)
}
}
})
const game: any = await Communication.connectReplay(wsAddress, gameId, clientId)
if (!game) {
throw '[ 2021-05-29 no game received ]'
}
const gameObject: GameType = Util.decodeGame(replay.game)
const gameObject: GameType = Util.decodeGame(game)
Game.setGame(gameObject.id, gameObject)
REPLAY.lastRealTs = Time.timestamp()
REPLAY.gameStartTs = parseInt(replay.log[0][4], 10)
REPLAY.gameStartTs = Game.getStartTs(gameId)
REPLAY.lastGameTs = REPLAY.gameStartTs
REPLAY.paused = false
REPLAY.skipNonActionPhases = false
@ -521,7 +515,7 @@ export async function main(
}
// // TODO: remove (make changable via interface)
// REPLAY.skipNonActionPhases = true
REPLAY.skipNonActionPhases = true
if (MODE === MODE_PLAY) {
Communication.onServerChange((msg: ServerEvent) => {
@ -580,8 +574,10 @@ export async function main(
}
const next = async () => {
if (REPLAY.logPointer + 1 >= REPLAY.log.length) {
await queryNextReplayBatch(gameId)
if (REPLAY.logPointer >= REPLAY.log.length) {
Communication.requestMoreReplayData()
to = setTimeout(next, 50)
return
}
const realTs = Time.timestamp()
@ -597,19 +593,18 @@ export async function main(
if (REPLAY.paused) {
break
}
const nextIdx = REPLAY.logPointer + 1
if (nextIdx >= REPLAY.log.length) {
if (REPLAY.logPointer >= REPLAY.log.length) {
break
}
const currLogEntry = REPLAY.log[REPLAY.logPointer]
const currTs: Timestamp = REPLAY.gameStartTs + currLogEntry[currLogEntry.length - 1]
const nextLogEntry = REPLAY.log[nextIdx]
const lastLogEntry = REPLAY.logPointer > 0 ? REPLAY.log[REPLAY.logPointer - 1] : null
const lastTs: Timestamp = REPLAY.gameStartTs + (lastLogEntry ? lastLogEntry[lastLogEntry.length - 1] : 0)
const nextLogEntry = REPLAY.log[REPLAY.logPointer]
const nextTs: Timestamp = REPLAY.gameStartTs + nextLogEntry[nextLogEntry.length - 1]
if (nextTs > maxGameTs) {
// next log entry is too far into the future
if (REPLAY.skipNonActionPhases && (maxGameTs + 50 < nextTs)) {
const skipInterval = nextTs - currTs
const skipInterval = nextTs - lastTs
// lets skip to the next log entry
// log.info('skipping non-action, from', maxGameTs, skipInterval)
maxGameTs += skipInterval
@ -620,13 +615,15 @@ export async function main(
if (handleLogEntry(nextLogEntry, nextTs)) {
RERENDER = true
}
REPLAY.logPointer = nextIdx
REPLAY.logPointer++
} while (true)
REPLAY.lastRealTs = realTs
REPLAY.lastGameTs = maxGameTs
updateTimerElements()
if (!REPLAY.final) {
if (REPLAY.final && REPLAY.logPointer + 1 >= REPLAY.log.length) {
// done
} else {
to = setTimeout(next, 50)
}
}

View file

@ -1,10 +1,12 @@
import WebSocket from 'ws'
import fs from 'fs'
import readline from 'readline'
import stream from 'stream'
import Time from '../common/Time'
import { Timestamp } from '../common/Types'
import { Game as GameType, ScoreMode, ShapeMode, SnapMode, Timestamp } from '../common/Types'
import { logger } from './../common/Util'
import { DATA_DIR } from './../server/Dirs'
import Game from './Game'
const log = logger('GameLog.js')
@ -81,7 +83,118 @@ const get = async (
})
}
interface LineReader {
readLine: () => Promise<string>,
}
const createLineReader = async (
gameId: string
): Promise<LineReader|null> => {
const stream: fs.ReadStream = await new Promise(resolve => {
const file = filename(gameId)
if (!fs.existsSync(file)) {
return null
}
const instream = fs.createReadStream(file)
instream.on('readable', () => {
resolve(instream)
})
})
let line = ''
const readLine = async (): Promise<string> => {
return new Promise(resolve => {
let chunk
let resolved = false
while (null !== (chunk = stream.read(1))) {
if (chunk.toString() === "\n") {
resolve(line)
line = ''
resolved = true
break
}
line += chunk
}
if (!resolved) {
resolve('')
}
})
}
return {
readLine,
}
}
interface SocketGameLog {
socket: WebSocket
rl: LineReader
}
const connected: SocketGameLog[] = []
const getSocketGameLog = (
socket: WebSocket,
): SocketGameLog|null => {
for (const entry of connected) {
if (entry.socket === socket) {
return entry
}
}
return null
}
const open = async (socket: WebSocket, gameId: string): Promise<GameType|null> => {
const rl = await createLineReader(gameId)
if (!rl) {
return null
}
const socketGameLog = {
socket: socket,
rl: rl,
}
const line = await rl.readLine()
const log = JSON.parse(line)
connected.push(socketGameLog)
const g: GameType = await Game.createGameObject(
gameId,
log[2],
log[3],
log[4],
log[5] || ScoreMode.FINAL,
log[6] || ShapeMode.NORMAL,
log[7] || SnapMode.NORMAL,
)
return g
}
const getNextBySocket = async (socket: WebSocket): Promise<any[]> => {
const socketGameLog = getSocketGameLog(socket)
if (!socketGameLog) {
return []
}
const lines = []
for (let i = 0; i < 1000; i++) {
const line = await socketGameLog.rl.readLine()
if (line) {
try {
lines.push(JSON.parse(line))
} catch (e) {
log.error(e)
log.error(line)
}
} else {
break
}
}
return lines
}
export default {
open,
getNextBySocket,
shouldLog,
create,
exists,

View file

@ -247,6 +247,37 @@ wss.on('message', async (
const msg = JSON.parse(data as string)
const msgType = msg[0]
switch (msgType) {
case Protocol.EV_CLIENT_INIT_REPLAY: {
if (!GameLog.exists(gameId)) {
throw `[gamelog ${gameId} does not exist... ]`
}
// should connect the socket with game log
// pseudo code
const game = await GameLog.open(socket, gameId)
if (!game) {
throw `[game not created :/ ]`
}
notify(
[Protocol.EV_SERVER_INIT_REPLAY, Util.encodeGame(game)],
[socket]
)
} break
case Protocol.EV_CLIENT_REPLAY_EVENT: {
if (!GameLog.exists(gameId)) {
throw `[gamelog ${gameId} does not exist... ]`
}
// should read next some lines from game log, using the game
// log connected with this socket
// pseudo code
const logEntries = await GameLog.getNextBySocket(socket)
notify(
[Protocol.EV_SERVER_REPLAY_EVENT, logEntries],
[socket]
)
} break
case Protocol.EV_CLIENT_INIT: {
if (!GameCommon.exists(gameId)) {
throw `[game ${gameId} does not exist... ]`