Projection management
The various gRPC client APIs include dedicated clients that allow you to manage projections.
WARNING
Currently not all clients fully expose all operations.
For a detailed explanation of projections, see the server documentation.
You can find the full sample code from this documentation page in the respective clients repositories.
Required packages
Install the client SDK package to your project.
# From Pypi
$ pip install esdbclient
# With Poetry
$ poetry add esdbclient
# Yarn
$ yarn add @eventstore/db-client
# NPM
$ npm install --save @eventstore/db-client
# TypeScript Declarations are included in the package.
# Yarn
$ yarn add @eventstore/db-client
# NPM
$ npm install --save @eventstore/db-client
# Maven
<dependency>
<groupId>com.eventstore</groupId>
<artifactId>db-client-java</artifactId>
<version>5.2.0</version>
</dependency>
# Gradle
implementation 'com.eventstore:db-client-java:5.2.0'
dotnet add package EventStore.Client.Grpc.ProjectionManagement --version 23.*
// This is currently not available in the Go client
No additional configuration is needed having Rust installed. Go check https://rustup.rs.
Creating a client
Projection management operations are exposed through a dedicated client.
client = EventStoreDBClient(
uri="{connectionString}"
)
const client = EventStoreDBClient.connectionString`
esdb+discover://${ADMIN}:${PASSWORD}@${ENDPOINT}?nodePreference=leader
const client = EventStoreDBClient.connectionString`
esdb+discover://${ADMIN}:${PASSWORD}@${ENDPOINT}?nodePreference=leader
EventStoreDBClientSettings settings = EventStoreDBConnectionString.parseOrThrow(connection);
EventStoreDBProjectionManagementClient client = EventStoreDBProjectionManagementClient.create(settings);
var settings = EventStoreClientSettings.Create(connection);
settings.ConnectionName = "Projection management client";
settings.DefaultCredentials = new UserCredentials("admin", "changeit");
var managementClient = new EventStoreProjectionManagementClient(settings);
// This is currently not available in the Go client
// This is currently not available in the Rust client
Create a projection
Creates a projection that runs until the last event in the store, and then continues processing new events as they are appended to the store. The query parameter contains the JavaScript you want created as a projection. Projections have explicit names, and you can enable or disable them via this name.
# This is currently not available in the Python client
const name = `countEvents_Create_${uuid()}`;
const projection = `
fromAll()
.when({
$init() {
return {
count: 0,
};
},
$any(s, e) {
s.count += 1;
}
})
.outputState();
await client.createProjection(name, projection);
const name = `countEvents_Create_${uuid()}`;
const projection = `
fromAll()
.when({
$init() {
return {
count: 0,
};
},
$any(s, e) {
s.count += 1;
}
})
.outputState();
await client.createProjection(name, projection);
String js =
"fromAll()" +
".when({" +
" $init: function() {" +
" return {" +
" count: 0" +
" };" +
" }," +
" $any: function(s, e) {" +
" s.count += 1;" +
" }" +
"})" +
".outputState();";
String name = "countEvents_Create_" + java.util.UUID.randomUUID();
client.create(name, js).get();
const string js = @"fromAll()
.when({
$init: function() {
return {
count: 0
};
},
$any: function(s, e) {
s.count += 1;
}
})
.outputState();";
var name = $"countEvents_Create_{Guid.NewGuid()}";
await managementClient.CreateContinuousAsync(name, js);
// This is currently not available in the Go client
// This is currently not available in the Rust client
Trying to create projections with the same name will result in an error:
# This is currently not available in the Python client
try {
await client.createProjection(name, projection);
} catch (err) {
if (!isCommandError(err) || !err.message.includes("Conflict"))
throw err;
console.log(`${name} already exists`);
}
try {
await client.createProjection(name, projection);
} catch (err) {
if (!isCommandError(err) || !err.message.includes("Conflict"))
throw err;
console.log(`${name} already exists`);
}
try {
client.create(name, js).get();
} catch (ExecutionException ex) {
if (ex.getMessage().contains("Conflict")) {
System.out.println(name + " already exists");
}
}
await managementClient.CreateContinuousAsync(name, js);
try {
await managementClient.CreateContinuousAsync(name, js);
}
catch (RpcException e) when (e.StatusCode is StatusCode.AlreadyExists) {
Console.WriteLine(e.Message);
}
catch (RpcException e) when (e.Message.Contains("Conflict")) { // will be removed in a future release
var format = $"{name} already exists";
Console.WriteLine(format);
}
// This is currently not available in the Go client
// This is currently not available in the Rust client
Restart the subsystem
Restarts the entire projection subsystem. The user must be in the $ops
or $admin
group to perform this operation
# This is currently not available in the Python client
await client.restartSubsystem();
await client.restartSubsystem();
client.restartSubsystem().get();
await managementClient.RestartSubsystemAsync();
// This is currently not available in the Go client
// This is currently not available in the Rust client
Enable a projection
Enables an existing projection by name. Once enabled, the projection will start to process events even after restarting the server or the projection subsystem. You must have access to a projection to enable it, see the ACL documentation
# This is currently not available in the Python client
await client.enableProjection("$by_category");
await client.enableProjection("$by_category");
client.enable("$by_category").get();
await managementClient.EnableAsync("$by_category");
// This is currently not available in the Go client
// This is currently not available in the Rust client
You can only enable an existing projection. When you try to enable a non-existing projection, you'll get an error:
# This is currently not available in the Python client
const projectionName = "projection that does not exist";
try {
await client.enableProjection(projectionName);
} catch (err) {
if (!isCommandError(err) || !err.message.includes("NotFound"))
throw err;
console.log(`${projectionName} does not exist`);
}
const projectionName = "projection that does not exist";
try {
await client.enableProjection(projectionName);
} catch (err) {
if (!isCommandError(err) || !err.message.includes("NotFound"))
throw err;
console.log(`${projectionName} does not exist`);
}
try {
client.disable("projection that does not exists").get();
} catch (ExecutionException ex) {
if (ex.getMessage().contains("NotFound")) {
System.out.println(ex.getMessage());
}
}
try {
await managementClient.EnableAsync("projection that does not exists");
}
catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) {
Console.WriteLine(e.Message);
}
catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release
Console.WriteLine(e.Message);
}
// This is currently not available in the Go client
// This is currently not available in the Rust client
Disable a projection
Disables a projection, this will save the projection checkpoint. Once disabled, the projection will not process events even after restarting the server or the projection subsystem. You must have access to a projection to disable it, see the ACL documentation.
# This is currently not available in the Python client
await client.disableProjection("$by_category");
await client.disableProjection("$by_category");
client.disable("$by_category").get();
await managementClient.DisableAsync("$by_category");
// This is currently not available in the Go client
// This is currently not available in the Rust client
You can only disable an existing projection. When you try to disable a non-existing projection, you'll get an error:
# This is currently not available in the Python client
const projectionName = "projection that does not exist";
try {
await client.disableProjection(projectionName);
} catch (err) {
if (!isCommandError(err) || !err.message.includes("NotFound"))
throw err;
console.log(`${projectionName} does not exist`);
}
const projectionName = "projection that does not exist";
try {
await client.disableProjection(projectionName);
} catch (err) {
if (!isCommandError(err) || !err.message.includes("NotFound"))
throw err;
console.log(`${projectionName} does not exist`);
}
try {
client.disable("projection that does not exists").get();
} catch (ExecutionException ex) {
if (ex.getMessage().contains("NotFound")) {
System.out.println(ex.getMessage());
}
}
try {
await managementClient.DisableAsync("projection that does not exists");
}
catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) {
Console.WriteLine(e.Message);
}
catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release
Console.WriteLine(e.Message);
}
// This is currently not available in the Go client
// This is currently not available in the Rust client
Delete a projection
Deletes a projection
# This is currently not available in the Python client
// A projection must be disabled to allow it to be deleted.
await client.disableProjection(name);
// The projection can now be deleted
await client.deleteProjection(name);
// A projection must be disabled to allow it to be deleted.
await client.disableProjection(name);
// The projection can now be deleted
await client.deleteProjection(name);
// A projection must be disabled to allow it to be deleted.
client.disable(name).get();
// The projection can now be deleted
client.delete(name).get();
// this is not yet available in the .net grpc client
// This is currently not available in the Go client
// This is currently not available in the Rust client
You can only delete an existing projection. When you try to delete a non-existing projection, you'll get an error:
# This is currently not available in the Python client
const projectionName = "projection that does not exist";
try {
await client.deleteProjection(projectionName);
} catch (err) {
if (!isCommandError(err) || !err.message.includes("NotFound"))
throw err;
console.log(`${projectionName} does not exist`);
}
const projectionName = "projection that does not exist";
try {
await client.deleteProjection(projectionName);
} catch (err) {
if (!isCommandError(err) || !err.message.includes("NotFound"))
throw err;
console.log(`${projectionName} does not exist`);
}
try {
client.delete("projection that does not exists").get();
} catch (ExecutionException ex) {
if (ex.getMessage().contains("NotFound")) {
System.out.println(ex.getMessage());
}
}
// This is currently not available in the .Net client
// This is currently not available in the Go client
// This is currently not available in the Rust client
Abort a projection
Aborts a projection, this will not save the projection's checkpoint.
# This is currently not available in the Python client
await client.abortProjection(name);
await client.abortProjection(name);
client.abort("$by_category").get();
// The .net clients prior to version 21.6 had an incorrect behavior: they will save the checkpoint.
await managementClient.AbortAsync("countEvents_Abort");
// This is currently not available in the Go client
// This is currently not available in the Rust client
You can only abort an existing projection. When you try to abort a non-existing projection, you'll get an error:
# This is currently not available in the Python client
const projectionName = "projection that does not exist";
try {
await client.abortProjection(projectionName);
} catch (err) {
if (!isCommandError(err) || !err.message.includes("NotFound"))
throw err;
console.log(`${projectionName} does not exist`);
}
const projectionName = "projection that does not exist";
try {
await client.abortProjection(projectionName);
} catch (err) {
if (!isCommandError(err) || !err.message.includes("NotFound"))
throw err;
console.log(`${projectionName} does not exist`);
}
try {
client.abort("projection that does not exists").get();
} catch (ExecutionException ex) {
if (ex.getMessage().contains("NotFound")) {
System.out.println(ex.getMessage());
}
}
try {
await managementClient.AbortAsync("projection that does not exists");
}
catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) {
Console.WriteLine(e.Message);
}
catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release
Console.WriteLine(e.Message);
}
// This is currently not available in the Go client
// This is currently not available in the Rust client
Reset a projection
Resets a projection. This will re-emit events. Streams that are written to from the projection will also be soft deleted.
# This is currently not available in the Python client
await client.resetProjection(name);
await client.resetProjection(name);
client.reset("$by_category").get();
// Checkpoint will be written prior to resetting the projection
await managementClient.ResetAsync("countEvents_Reset");
// This is currently not available in the Go client
// This is currently not available in the Rust client
Resetting a projection that does not exist will result in an error.
# This is currently not available in the Python client
const projectionName = "projection that does not exist";
try {
await client.resetProjection(projectionName);
} catch (err) {
if (!isCommandError(err) || !err.message.includes("NotFound"))
throw err;
console.log(`${projectionName} does not exist`);
}
const projectionName = "projection that does not exist";
try {
await client.resetProjection(projectionName);
} catch (err) {
if (!isCommandError(err) || !err.message.includes("NotFound"))
throw err;
console.log(`${projectionName} does not exist`);
}
try {
client.reset("projection that does not exists").get();
} catch (ExecutionException ex) {
if (ex.getMessage().contains("NotFound")) {
System.out.println(ex.getMessage());
}
}
try {
await managementClient.ResetAsync("projection that does not exists");
}
catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) {
Console.WriteLine(e.Message);
}
catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release
Console.WriteLine(e.Message);
}
// This is currently not available in the Go client
// This is currently not available in the Rust client
Update a projection
Updates a projection. The name parameter is the name of the projection to be updated. The query parameter contains the new JavaScript.
# This is currently not available in the Python client
const name = `countEvents_Update_${uuid()}`;
const projection = `
fromAll()
.when({
$init() {
return {
count: 0,
};
},
$any(s, e) {
s.count += 1;
}
})
.outputState();
await client.createProjection(name, "fromAll().when()");
await client.updateProjection(name, projection);
const name = `countEvents_Update_${uuid()}`;
const projection = `
fromAll()
.when({
$init() {
return {
count: 0,
};
},
$any(s, e) {
s.count += 1;
}
})
.outputState();
await client.createProjection(name, "fromAll().when()");
await client.updateProjection(name, projection);
String name = "countEvents_Update_" + java.util.UUID.randomUUID();
String js =
"fromAll()" +
".when({" +
" $init: function() {" +
" return {" +
" count: 0" +
" };" +
" }," +
" $any: function(s, e) {" +
" s.count += 1;" +
" }" +
"})" +
".outputState();";
client.create(name, "fromAll().when()").get();
client.update(name, js).get();
const string js = @"fromAll()
.when({
$init: function() {
return {
count: 0
};
},
$any: function(s, e) {
s.count += 1;
}
})
.outputState();";
var name = $"countEvents_Update_{Guid.NewGuid()}";
await managementClient.CreateContinuousAsync(name, "fromAll().when()");
await managementClient.UpdateAsync(name, js);
// This is currently not available in the Go client
// This is currently not available in the Rust client
You can only update an existing projection. When you try to update a non-existing projection, you'll get an error:
# This is currently not available in the Python client
const projectionName = "projection that does not exist";
try {
await client.updateProjection(projectionName, "fromAll().when()");
} catch (err) {
if (!isCommandError(err) || !err.message.includes("NotFound"))
throw err;
console.log(`${projectionName} does not exist`);
}
const projectionName = "projection that does not exist";
try {
await client.updateProjection(projectionName, "fromAll().when()");
} catch (err) {
if (!isCommandError(err) || !err.message.includes("NotFound"))
throw err;
console.log(`${projectionName} does not exist`);
}
try {
client.update("Update Not existing projection", "fromAll().when()").get();
} catch (ExecutionException ex) {
if (ex.getMessage().contains("NotFound")) {
System.out.println("'Update Not existing projection' does not exists and can not be updated");
}
}
try {
await managementClient.UpdateAsync("Update Not existing projection", "fromAll().when()");
}
catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) {
Console.WriteLine(e.Message);
}
catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release
Console.WriteLine("'Update Not existing projection' does not exists and can not be updated");
}
// This is currently not available in the Go client
// This is currently not available in the Rust client
List all projections
Returns a list of all projections, user defined & system projections. See the projection details section for an explanation of the returned values
# This is currently not available in the Python client
// This is currently not available in the nodejs client
// This is currently not available in the nodejs client
List<ProjectionDetails> details = client.list().get();
for (ProjectionDetails detail: details) {
System.out.println(
detail.getName() + ", " +
detail.getStatus() + ", " +
detail.getCheckpointStatus() + ", " +
detail.getMode() + ", " +
detail.getProgress());
}
var details = managementClient.ListAllAsync();
await foreach (var detail in details)
Console.WriteLine(
$@"{detail.Name}, {detail.Status}, {detail.CheckpointStatus}, {detail.Mode}, {detail.Progress}"
);
// This is currently not available in the Go client
// This is currently not available in the Rust client
List continuous projections
Returns a list of all continuous projections. See the projection details section for an explanation of the returned values
# This is currently not available in the Python client
const projections = await client.listProjections();
for (const { name, status, checkpointStatus, progress } of projections) {
console.log(name, status, checkpointStatus, progress);
}
const projections = await client.listProjections();
for (const { name, status, checkpointStatus, progress } of projections) {
console.log(name, status, checkpointStatus, progress);
}
List<ProjectionDetails> details = client.list().get();
for (ProjectionDetails detail: details) {
System.out.println(
detail.getName() + ", " +
detail.getStatus() + ", " +
detail.getCheckpointStatus() + ", " +
detail.getMode() + ", " +
detail.getProgress());
}
var details = managementClient.ListContinuousAsync();
await foreach (var detail in details)
Console.WriteLine(
$@"{detail.Name}, {detail.Status}, {detail.CheckpointStatus}, {detail.Mode}, {detail.Progress}"
);
// This is currently not available in the Go client
// This is currently not available in the Rust client
Get Status
Gets the status of a named projection. See the projection details section for an explanation of the returned values
# This is currently not available in the Python client
const projection = await client.getProjectionStatus(name);
console.log(
projection.name,
projection.status,
projection.checkpointStatus,
projection.progress
);
const projection = await client.getProjectionStatus(name);
console.log(
projection.name,
projection.status,
projection.checkpointStatus,
projection.progress
);
String name = "get_result_example";
String js =
"fromAll()" +
".when({" +
" $init() {" +
" return {" +
" count: 0," +
" };" +
" }," +
" $any(s, e) {" +
" s.count += 1;" +
" }" +
"})" +
".transformBy((state) => state.count)" +
".outputState();";
client.create(name, js).get();
Thread.sleep(500); //give it some time to process and have a state.
int result = client
.getResult(name, int.class)
.get();
System.out.println(result);
await managementClient.CreateContinuousAsync(name, js);
var status = await managementClient.GetStatusAsync(name);
Console.WriteLine(
$@"{status?.Name}, {status?.Status}, {status?.CheckpointStatus}, {status?.Mode}, {status?.Progress}"
);
// This is currently not available in the Go client
// This is currently not available in the Rust client
Get state
Retrieves the state of a projection.
# This is currently not available in the Python client
const name = `get_state_example`;
const projection = `
fromAll()
.when({
$init() {
return {
count: 0,
};
},
$any(s, e) {
s.count += 1;
}
})
.transformBy((state) => state.count)
.outputState();
`;
await client.createProjection(name, projection);
// Give it some time to count event
await delay(500);
const state = await client.getProjectionState(name);
console.log(`Counted ${state.count} events.`);
interface CountProjectionState {
count: number;
}
const name = `get_state_example`;
const projection = `
fromAll()
.when({
$init() {
return {
count: 0,
};
},
$any(s, e) {
s.count += 1;
}
})
.transformBy((state) => state.count)
.outputState();
await client.createProjection(name, projection);
// Give it some time to count event
await delay(500);
const state = await client.getProjectionState<CountProjectionState>(name);
console.log(`Counted ${state.count} events.`);
String name = "get_result_example";
String js =
"fromAll()" +
".when({" +
" $init() {" +
" return {" +
" count: 0," +
" };" +
" }," +
" $any(s, e) {" +
" s.count += 1;" +
" }" +
"})" +
".transformBy((state) => state.count)" +
".outputState();";
client.create(name, js).get();
Thread.sleep(500); //give it some time to process and have a state.
int result = client
.getResult(name, int.class)
.get();
System.out.println(result);
const string js =
"fromAll().when({$init:function(){return {count:0};},$any:function(s, e){s.count += 1;}}).outputState();";
var name = $"countEvents_State_{Guid.NewGuid()}";
await managementClient.CreateContinuousAsync(name, js);
//give it some time to process and have a state.
await Task.Delay(500);
var stateDocument = await managementClient.GetStateAsync(name);
var result = await managementClient.GetStateAsync<Result>(name);
Console.WriteLine(DocToString(stateDocument));
Console.WriteLine(result);
static async Task<string> DocToString(JsonDocument d) {
await using var stream = new MemoryStream();
var writer = new Utf8JsonWriter(stream, new JsonWriterOptions { Indented = false });
d.WriteTo(writer);
await writer.FlushAsync();
return Encoding.UTF8.GetString(stream.ToArray());
}
// This is currently not available in the Go client
// This is currently not available in the Rust client
Get result
Retrieves the result of the named projection and partition.
# This is currently not available in the Python client
const name = `get_result_example`;
const projection = `
fromAll()
.when({
$init() {
return {
count: 0,
};
},
$any(s, e) {
s.count += 1;
}
})
.transformBy((state) => state.count)
.outputState();
await client.createProjection(name, projection);
// Give it some time to have a result.
await delay(500);
const result = await client.getProjectionResult(name);
console.log(`Counted ${result} events.`);
const name = `get_result_example`;
const projection = `
fromAll()
.when({
$init() {
return {
count: 0,
};
},
$any(s, e) {
s.count += 1;
}
})
.transformBy((state) => state.count)
.outputState();
await client.createProjection(name, projection);
// Give it some time to have a result.
await delay(500);
const result = await client.getProjectionResult<number>(name);
console.log(`Counted ${result} events.`);
String name = "get_result_example";
String js =
"fromAll()" +
".when({" +
" $init() {" +
" return {" +
" count: 0," +
" };" +
" }," +
" $any(s, e) {" +
" s.count += 1;" +
" }" +
"})" +
".transformBy((state) => state.count)" +
".outputState();";
client.create(name, js).get();
Thread.sleep(500); //give it some time to process and have a state.
int result = client
.getResult(name, int.class)
.get();
System.out.println(result);
const string js = @"fromAll()
.when({
$init: function() {
return {
count: 0
};
},
$any: function(s, e) {
s.count += 1;
}
})
.outputState();";
var name = $"countEvents_Result_{Guid.NewGuid()}";
await managementClient.CreateContinuousAsync(name, js);
await Task.Delay(500); //give it some time to have a result.
// Results are retrieved either as JsonDocument or a typed result
var document = await managementClient.GetResultAsync(name);
var result = await managementClient.GetResultAsync<Result>(name);
Console.WriteLine(DocToString(document));
Console.WriteLine(result);
static string DocToString(JsonDocument d) {
using var stream = new MemoryStream();
using var writer = new Utf8JsonWriter(stream, new JsonWriterOptions { Indented = false });
d.WriteTo(writer);
writer.Flush();
return Encoding.UTF8.GetString(stream.ToArray());
}
// This is currently not available in the Go client
// This is currently not available in the Rust client
Projection Details
List all, list continuous and get status all return the details and statistics of projections
Field | Description |
---|---|
Name , EffectiveName | The name of the projection |
Status | A human readable string of the current statuses of the projection (see below) |
StateReason | A human readable string explaining the reason of the current projection state |
CheckpointStatus | A human readable string explaining the current operation performed on the checkpoint : requested , writing |
Mode | Continuous , OneTime , Transient |
CoreProcessingTime | The total time, in ms, the projection took to handle events since the last restart |
Progress | The progress, in %, indicates how far this projection has processed event, in case of a restart this could be -1% or some number. It will be updated as soon as a new event is appended and processed |
WritesInProgress | The number of write requests to emitted streams currently in progress, these writes can be batches of events |
ReadsInProgress | The number of read requests currently in progress |
PartitionsCached | The number of cached projection partitions |
Position | The Position of the last processed event |
LastCheckpoint | The Position of the last checkpoint of this projection |
EventsProcessedAfterRestart | The number of events processed since the last restart of this projection |
BufferedEvents | The number of events in the projection read buffer |
WritePendingEventsBeforeCheckpoint | The number of events waiting to be appended to emitted streams before the pending checkpoint can be written |
WritePendingEventsAfterCheckpoint | The number of events to be appended to emitted streams since the last checkpoint |
Version | This is used internally, the version is increased when the projection is edited or reset |
Epoch | This is used internally, the epoch is increased when the projection is reset |
The Status
string is a combination of the following values. The first 3 are the most common one, as the other one are transient values while the projection is initialised or stopped
Value | Description |
---|---|
Running | The projection is running and processing events |
Stopped | The projection is stopped and is no longer processing new events |
Faulted | An error occurred in the projection, StateReason will give the fault details, the projection is not processing events |
Initial | This is the initial state, before the projection is fully initialised |
Suspended | The projection is suspended and will not process events, this happens while stopping the projection |
LoadStateRequested | The state of the projection is being retrieved, this happens while the projection is starting |
StateLoaded | The state of the projection is loaded, this happens while the projection is starting |
Subscribed | The projection has successfully subscribed to its readers, this happens while the projection is starting |
FaultedStopping | This happens before the projection is stopped due to an error in the projection |
Stopping | The projection is being stopped |
CompletingPhase | This happens while the projection is stopping |
PhaseCompleted | This happens while the projection is stopping |