Browse Source

Reorganization of the content.

master
Mattia Belletti 8 months ago
parent
commit
615777d630
12 changed files with 274 additions and 314 deletions
  1. +1
    -1
      .vscode/extensions.json
  2. +3
    -7
      .vscode/launch.json
  3. +1
    -2
      package.json
  4. +0
    -36
      src/events-navigator.ts
  5. +1
    -2
      src/index.ts
  6. +83
    -65
      src/initialize.spec.ts
  7. +43
    -29
      src/initialize.ts
  8. +96
    -30
      src/process.spec.ts
  9. +16
    -36
      src/process.ts
  10. +0
    -3
      src/reducer.ts
  11. +0
    -88
      src/test-helpers/events-navigator.ts
  12. +30
    -15
      src/test-helpers/streams.ts

+ 1
- 1
.vscode/extensions.json View File

@@ -3,7 +3,7 @@
// Extension identifier format: ${publisher}.${name}. Example: vscode.csharp

// List of extensions which should be recommended for users of this workspace.
"recommendations": ["esbenp.prettier-vscode"],
"recommendations": ["esbenp.prettier-vscode", "dbaeumer.vscode-eslint"],
// List of extensions recommended by VS Code that should not be recommended for users of this workspace.
"unwantedRecommendations": []
}

+ 3
- 7
.vscode/launch.json View File

@@ -8,15 +8,11 @@
"type": "node",
"request": "launch",
"name": "Debug unit tests",
"skipFiles": [
"<node_internals>/**"
],
"skipFiles": ["<node_internals>/**"],
"program": "${workspaceFolder}/node_modules/tape/bin/tape",
"runtimeArgs": ["--require", "ts-node/register"],
"args": [
"${workspaceFolder}/src/**/process.spec.ts"
],
"args": ["${workspaceFolder}/src/**/*.spec.ts"],
"console": "internalConsole"
}
]
}
}

+ 1
- 2
package.json View File

@@ -8,8 +8,7 @@
"license": "MIT",
"scripts": {
"build": "tsc",
"unit-tests": "nyc node_modules/tape/bin/tape src/**/*.spec.ts | faucet",
"unit-tests-one": "nyc node_modules/tape/bin/tape src/**/process.spec.ts"
"unit-tests": "nyc node_modules/tape/bin/tape src/**/*.spec.ts | faucet"
},
"nyc": {
"include": [


+ 0
- 36
src/events-navigator.ts View File

@@ -1,36 +0,0 @@
import { ZoresyEvent } from "./events";

/**
* A navigator inside a timeline of events chronologically ordered according to time at which they were received by the
* local server.
*/
export interface EventsNavigator {
/**
* Get the latest id the navigator knows of.
* @returns A promise containing the id of the latest event the navigator knows of.
*/
getLatestId(): Promise<string>;

/**
* Get the id of the event before the current event.
* @param id The id of the event to start from.
* @returns A promise containing the id of the event before the current one,
* or null if we are already at the first event.
*/
getPreviousId(id: string): Promise<string | null>;

/**
* Get the id of the event after the current event. Note well: this event will block until
* the next event is available, or return immediately if it's already present.
* @param id The id of the event to start from.
* @returns The id of the event after the current event.
*/
getNextId(id: string): Promise<string>;

/**
* Get an event by id.
* @param id The id of the event to get.
* @returns A promise containing the event with given id.
*/
getEventById(id: string): Promise<ZoresyEvent>;
}

+ 1
- 2
src/index.ts View File

@@ -1,4 +1,3 @@
export * from "./events";
export * from "./events-navigator";
export * from "./initialize";
export * from "./reducer";
export * from "./process";

+ 83
- 65
src/initialize.spec.ts View File

@@ -1,78 +1,96 @@
import * as test from "tape-promise/tape";

import { initialize, Result } from "./initialize";
import {
createEventsNavigator,
MyState,
ceProclaim,
ceClaim,
ce
} from "./test-helpers/events-navigator";
import { initialize, InitializeCursor } from "./initialize";
import { ZoresyEvent } from "./events";
import { fromPairs } from "ramda";

const initTest = async () => {
const eventsNavigator = await createEventsNavigator();
let resultContainer: { result: Result<MyState> | null } = { result: null };
const initializePromise = initialize<MyState>(eventsNavigator).then(r => {
resultContainer.result = r;
interface MyState {
x: number;
}

const ce = (id: string, type: string = "generic"): ZoresyEvent => ({
id,
type,
sender: "sender",
parents: [],
timestamp: 0,
});

const ceProclaim = (id: string, claimId: string) => ({
...ce(id, "proclaim"),
claimId,
});

const ceClaim = (id: string, state: MyState) => ({
...ce(id, "claim"),
state,
});

const initTest = (...eventList: ZoresyEvent[]) => {
const eventsById = fromPairs(eventList.map((e) => [e.id, e]));
const getCursor = (i: number): InitializeCursor => ({
event: eventList[i],
back: () => Promise.resolve(i === 0 ? null : getCursor(i - 1)),
});
return { resultContainer, eventsNavigator, initializePromise };
return initialize<MyState>(
Promise.resolve(getCursor(eventList.length - 1)),
(id: string) => Promise.resolve(eventsById[id])
);
};

test("initialize: an 'empty' timeline returns an empty state and the only event's id.", async t => {
const {
resultContainer,
eventsNavigator,
initializePromise
} = await initTest();
t.equals(resultContainer.result, null);
await eventsNavigator.finish();
await initializePromise;
t.deepEqual(resultContainer.result, {
test("initialize: an 'empty' timeline returns an empty state and the only event's id.", async (t) => {
t.deepEqual(await initTest(ce("create")), {
state: {},
id: "create"
id: "create",
});
});

test("initialize: a timeline with just a claim and proclaim event defines the given state.", async t => {
const {
resultContainer,
eventsNavigator,
initializePromise
} = await initTest();
t.equals(resultContainer.result, null);
await eventsNavigator.addPrevious(ceProclaim("proclaim", "claim"));
t.equals(resultContainer.result, null);
await eventsNavigator.addPrevious(ceClaim("claim", { x: 3 }));
await initializePromise;
t.deepEqual(resultContainer.result, {
state: {
x: 3
},
id: "claim"
});
test("initialize: a timeline with just a claim and proclaim event defines the given state.", async (t) => {
t.deepEqual(
await initTest(ceClaim("claim", { x: 3 }), ceProclaim("proclaim", "claim")),
{
state: {
x: 3,
},
id: "claim",
}
);
});

test("initialize: a timeline with a claim, a proclaim, and some more spurious events defines the given state.", async t => {
const {
resultContainer,
eventsNavigator,
initializePromise
} = await initTest();
t.equals(resultContainer.result, null);
await eventsNavigator.addPrevious(ce("a"));
await eventsNavigator.addPrevious(ce("b"));
await eventsNavigator.addPrevious(ceProclaim("proclaim", "claim"));
await eventsNavigator.addPrevious(ce("c"));
await eventsNavigator.addPrevious(ce("d"));
t.equals(resultContainer.result, null);
await eventsNavigator.addPrevious(ceClaim("claim", { x: 3 }));
await eventsNavigator.addPrevious(ce("e"));
await eventsNavigator.addPrevious(ce("f"));
await initializePromise;
t.deepEqual(resultContainer.result, {
state: {
x: 3
},
id: "claim"
});
test("initialize: a timeline with a claim, a proclaim, and some more spurious events defines the given state.", async (t) => {
t.deepEqual(
await initTest(
ce("f"),
ce("e"),
ceClaim("claim", { x: 3 }),
ce("d"),
ce("c"),
ceProclaim("proclaim", "claim"),
ce("a"),
ce("b")
),
{
state: {
x: 3,
},
id: "claim",
}
);
});

test("initialize: in a timeline with multiple claim/proclaim couples, only the last one is used.", async (t) => {
t.deepEqual(
await initTest(
ceClaim("claim", { x: 3 }),
ceProclaim("proclaim", "claim"),
ceClaim("claim2", { x: 4 }),
ceProclaim("proclaim2", "claim2")
),
{
state: {
x: 4,
},
id: "claim2",
}
);
});

+ 43
- 29
src/initialize.ts View File

@@ -1,6 +1,28 @@
import { EventsNavigator } from "./events-navigator";
import { ProclaimEvent, ClaimEvent, ZoresyEvent } from "./events";

/**
* A cursor to travel back through the history of messages.
*/
export interface InitializeCursor {
/**
* The event this cursor points to.
*/
event: ZoresyEvent;
/**
* Get back one step.
* @returns A promise containing the cursor that points one item back, or `null` if the cursor points at the
* beginning of the timeline.
*/
back: () => Promise<InitializeCursor | null>;
}

/**
* Get an event from an id.
* @param id The id of the event to retrieve.
* @returns A promise containing the requested event.
*/
export type GetEventById = (id: string) => Promise<ZoresyEvent>;

/**
* The result of an initialization process.
*/
@@ -10,52 +32,44 @@ export interface Result<State> {
*/
state: State | {};
/**
* The id of the event after which we get the state determined by the process.
* The id of the event where to start processing to obtain the next state.
*/
id: string;
}

/**
* Get the latest consistent state we can get, and the cursor at which that state was obtained.
* @param eventsNavigator The events navigator.
* @param startingCursor The cursor pointing to the current end of the timeline.
* @param getEventById A method to obtain an event from an id.
* @returns The result of the initialization process.
*/
export const initialize = <State>(
eventsNavigator: EventsNavigator
startingCursor: Promise<InitializeCursor>,
getEventById: GetEventById
): Promise<Result<State>> => {
const makeResult = (state: State | {}, id: string) =>
({
state,
id
id,
} as Result<State>);

const getResultFromProclaim = (event: ProclaimEvent) =>
eventsNavigator
.getEventById(event.claimId)
.then(claimEvent =>
makeResult((claimEvent as ClaimEvent<State>).state, claimEvent.id)
);

const getResultFromPreviousEvents = (eventId: string) =>
eventsNavigator
.getPreviousId(eventId)
.then(previousId =>
previousId === null
? makeResult({}, eventId)
: getResultFromEventOrPrevious(previousId)
);
const getResultFromProclaimEvent = (event: ProclaimEvent) =>
getEventById(event.claimId).then((claimEvent) =>
makeResult((claimEvent as ClaimEvent<State>).state, claimEvent.id)
);

const getResultFromEventOrPrevious = (
eventId: string
previousEventId: string,
cursorPromise: Promise<InitializeCursor | null>
): Promise<Result<State>> =>
eventsNavigator
.getEventById(eventId)
.then(event =>
event.type === "proclaim"
? getResultFromProclaim(event as ProclaimEvent)
: getResultFromPreviousEvents(eventId)
);
cursorPromise.then((cursor) =>
cursor === null
? makeResult({}, previousEventId)
: cursor.event.type === "proclaim"
? getResultFromProclaimEvent(cursor.event as ProclaimEvent)
: getResultFromEventOrPrevious(cursor.event.id, cursor.back())
);

// get the state starting from the latest events
return eventsNavigator.getLatestId().then(getResultFromEventOrPrevious);
return getResultFromEventOrPrevious("", startingCursor);
};

+ 96
- 30
src/process.spec.ts View File

@@ -1,33 +1,99 @@
import * as test from "tape-promise/tape";

import { process, isNonEmptyState } from "./process";
import {
createEventsNavigator,
MyState,
ceProclaim,
ceClaim,
ce
} from "./test-helpers/events-navigator";
import { streamToList } from "./test-helpers/streams";

test("process: processing an empty timeline returns a stream with an empty state", async t => {
try {
console.log("Create events navigator.");
const eventsNavigator = await createEventsNavigator();
console.log("Create stream.");
const stream = process<MyState>(eventsNavigator, x => x);
console.log("Produce event list.");
const events = await streamToList(
stream,
state => (isNonEmptyState(state) ? state.x : ""),
1
);
console.log("Check.");
t.deepEqual(events, [""]);
} catch (e) {
console.error(e);
throw e;
} finally {
console.log("Tests finished.");
}
import { process, isNonEmptyState, Reducer } from "./process";
import { streamToList, listToStream } from "./test-helpers/streams";
import { ZoresyEvent } from "./events";

type MyState = {
ids: (string | null)[];
};

export const e = (
id: string,
parents: string[],
timestamp: number = 0
): ZoresyEvent => ({
type: "generic",
id,
sender: "sender",
parents,
timestamp,
});

const reducer: Reducer<MyState> = (state, event) => ({
ids: (isNonEmptyState(state) ? state.ids : []).concat(event.id),
});

const runTest = (startingState: MyState | {}, input: ZoresyEvent[]) =>
streamToList(
process<MyState>(startingState, listToStream<ZoresyEvent>(input), reducer)
);

test("process: processing an empty timeline returns a stream with the starting state", async (t) => {
t.deepEquals(await runTest({}, []), [{}]);
t.deepEquals(await runTest({ ids: ["a", "b", "c"] }, []), [
{ ids: ["a", "b", "c"] },
]);
});

test("process: processing a non-empty linear timeline returns a stream with a linear progression of states", async (t) => {
t.deepEquals(await runTest({}, [e("a", []), e("b", ["a"]), e("c", ["b"])]), [
{},
{ ids: ["a"] },
{ ids: ["a", "b"] },
{ ids: ["a", "b", "c"] },
]);
t.deepEquals(
await runTest({ ids: ["x"] }, [e("a", []), e("b", ["a"]), e("c", ["b"])]),
[
{ ids: ["x"] },
{ ids: ["x", "a"] },
{ ids: ["x", "a", "b"] },
{ ids: ["x", "a", "b", "c"] },
]
);
});

test("process: processing a non-empty non-linear timeline returns a stream with a linear progression of states", async (t) => {
/*
a[0]
/ \
b[2] c[1]
\ /
d[2]
local timeline: a, b, c, d
final top. sorted timeline: [a, c, b, d]
*/
t.deepEquals(
await runTest({}, [
e("a", [], 0),
e("b", ["a"], 2),
e("c", ["a"], 1),
e("d", ["b", "c"], 2),
]),
[
{},
{ ids: ["a"] },
{ ids: ["a", "b"] },
{ ids: ["a", "c", "b"] },
{ ids: ["a", "c", "b", "d"] },
]
);

t.deepEquals(
await runTest({ ids: ["x"] }, [
e("a", [], 0),
e("b", ["a"], 2),
e("c", ["a"], 1),
e("d", ["b", "c"], 2),
]),
[
{ ids: ["x"] },
{ ids: ["x", "a"] },
{ ids: ["x", "a", "b"] },
{ ids: ["x", "a", "c", "b"] },
{ ids: ["x", "a", "c", "b", "d"] },
]
);
});

+ 16
- 36
src/process.ts View File

@@ -1,48 +1,28 @@
import { fromPromise, join, now, merge } from "@most/core";
import { Stream } from "@most/types";
import { scan, map } from "@most/core";

import { EventsNavigator } from "./events-navigator";
import { Reducer } from "./reducer";
import { initialize } from "./initialize";
import { ZoresyEvent } from "./events";
import { topologicalSort } from "./topological-sort";

export type Reducer<State> = (
state: State | {},
event: ZoresyEvent
) => State | {};

export const isNonEmptyState = <State>(state: State | {}): state is State =>
Object.getOwnPropertyNames(state).length !== 0;

export const process = <State>(
eventsNavigator: EventsNavigator,
startingState: State | {},
eventStream: Stream<ZoresyEvent>,
reducer: Reducer<State>
): Stream<State | {}> => {
const promiseOfStreamToStream = (
promise: Promise<Stream<State>>
): Stream<State> => join(fromPromise(promise));

const getEventStreamFrom = (
state: State | {},
prevEvents: ZoresyEvent[],
currentId: string
): Stream<State | {}> =>
promiseOfStreamToStream(
eventsNavigator
.getNextId(currentId)
.then(eventsNavigator.getEventById)
.then(event => {
const events = prevEvents.concat([event]);
const newState = topologicalSort(events).reduce<State | {}>(
reducer,
state
);
return merge(
now(newState),
getEventStreamFrom(newState, events, event.id)
);
})
);

return promiseOfStreamToStream(
initialize<State>(eventsNavigator).then<Stream<State>>(({ state, id }) =>
getEventStreamFrom(state, [], id)
): Stream<State | {}> =>
map(
(events: ZoresyEvent[]) =>
topologicalSort(events).reduce(reducer, startingState),
scan(
(events, event) => events.concat([event]),
[] as ZoresyEvent[],
eventStream
)
);
};

+ 0
- 3
src/reducer.ts View File

@@ -1,3 +0,0 @@
import { ZoresyEvent } from "./events";

export type Reducer<State> = (state: State | {}, event: ZoresyEvent) => State | {};

+ 0
- 88
src/test-helpers/events-navigator.ts View File

@@ -1,88 +0,0 @@
import { ZoresyEvent } from "../events";
import { EventsNavigator } from "../events-navigator";

export const ce = (id: string, type: string = "generic"): ZoresyEvent => ({
type,
id,
sender: "sender",
parents: [],
timestamp: 0
});

export interface MyState {
x: number;
}

export const ceProclaim = (id: string, claimId: string) => ({
...ce(id, "proclaim"),
claimId
});

export const ceClaim = (id: string, state: MyState) => ({
...ce(id, "claim"),
state
});

export interface TestEventNavigator extends EventsNavigator {
addPrevious: (event: ZoresyEvent) => Promise<void>;
finish: () => Promise<void>;
}

export const createEventsNavigator = async () => {
// reversed list of events
const events: ZoresyEvent[] = [];

const waitForIndex: { [index: string]: Function[] } = {};
const addResolve = (index: number) => (resolve: Function) => {
waitForIndex[index] = waitForIndex[index] || [];
waitForIndex[index].push(resolve);
};
const resolveIndex = (index: number | string) => {
waitForIndex[index] = waitForIndex[index] || [];
while (waitForIndex[index].length > 0) {
waitForIndex[index].splice(0, 1)[0](event);
}
delete waitForIndex[index];
};

const getIndex = (index: number) =>
index < events.length
? Promise.resolve(events[index])
: new Promise<ZoresyEvent>(addResolve(index));

const idsToIndex: { [id: string]: number } = {};

let finished = false;

const eventsNavigator: TestEventNavigator = {
getLatestId: () => getIndex(0).then(event => event.id),
getPreviousId: (id: string) => {
const index = idsToIndex[id];
return index === events.length - 1 && finished
? Promise.resolve(null)
: getIndex(index + 1).then(e => e.id);
},
getNextId: () => {
throw new Error("Not implemented.");
},
getEventById: (id: string) =>
Object.prototype.hasOwnProperty.call(idsToIndex, id)
? getIndex(idsToIndex[id])
: getIndex(events.length).then(() => eventsNavigator.getEventById(id)),
addPrevious: (event: ZoresyEvent) => {
events.push(event);
const index = events.length - 1;
idsToIndex[event.id] = index;
resolveIndex(index);
return new Promise<void>(resolve => resolve());
},
finish: () => {
finished = true;
Object.getOwnPropertyNames(waitForIndex).forEach(resolveIndex);
return new Promise<void>(resolve => resolve());
}
};
await eventsNavigator.addPrevious(ce("create"));
return eventsNavigator;
};


+ 30
- 15
src/test-helpers/streams.ts View File

@@ -1,18 +1,33 @@
import { Stream } from "@most/types";
import { tap, take, runEffects, map } from "@most/core";
import { Stream, Time, Disposable, Sink } from "@most/types";
import { withItems, periodic, run } from "@most/core";
import { newDefaultScheduler } from "@most/scheduler";

const id = <X>(x: X) => x;

export const streamToList = <A, B = A>(
stream: Stream<A>,
transform: (source: A) => B,
maxEvents: number | null = null
) => {
const lst: A[] = [];
const tappedStream = tap(x => lst.push(x), stream);
const mappedStream = map(transform, tappedStream);
const limitedStream =
maxEvents !== null ? take(maxEvents, mappedStream) : mappedStream;
return runEffects(limitedStream, newDefaultScheduler()).then(() => lst);
export const streamToList = <A>(stream: Stream<A>) => {
let disposable: Disposable | null = null;
return new Promise<A[]>((resolve, reject) => {
const lst: A[] = [];
const sink: Sink<A> = {
event: (_: Time, a: A) => {
lst.push(a);
},
error: (_: Time, error: any) => {
reject({
lst,
error,
});
},
end: (_: Time) => {
resolve(lst);
},
};
const scheduler = newDefaultScheduler();
disposable = run(sink, scheduler, stream);
}).finally(() => {
if (disposable) {
disposable.dispose();
}
});
};

export const listToStream = <A>(events: A[]) =>
withItems(events, periodic(0));

Loading…
Cancel
Save