FAQ for Sybase CEP users - Trying to migrate to ESP
- What is the ESP equivalent of Sybase CEP’s GETTIMESTAMP()?
- What is the equivalent clause in ESP for CEP’s OUTPUT AT STARTUP?
- What is the equivalent in ESP for CEP’s OUTPUT EVERY?
- What is the paradigm difference between Sybase CEP and ESP?
- Can I view all streams in ESP as in Sybase CEP?
- Can a stream be connected to a stream higher up in the flow?
- What is the equivalent of public windows in ESP?
- What is the interaction between two different flex operators in the same flex stream?
- What is the equivalent in ESP of CEP’s OUTPUT AFTER clause?
- Can I join a stream/window without a common column?
- Is there a way to use an unnamed window in a stream as in Sybase CEP?
- How can a user test a project using message timestamp data to simulate a production environment and test the data at its arrival rate?
- What is an opcode?
- What is the ESP equivalent of Sybase CEP’s KEEP LAST PER clause?
- What is the difference between Sybase CEP parameters and ESP parameters?
- What is the equivalent of a CCS schema file in ESP?
- Does every input stream need to have a memory store and a primary key?
- Can I declare streams and windows anywhere in the project as in Sybase CEP?
- How can I make my project server timestamped or message timestamped in ESP?
- How do I delete the contents of a window in ESP, or what is the ESP equivalent of the ON DELETE clause in Sybase CEP?
- Is there a concept of variables in ESP as in Sybase CEP? How do I use them?
- I have an IF..ELSE query in Sybase CEP; how do I convert it to ESP?
- How do I perform a window lookup to see whether data is present or not present using subqueries in a WHERE clause? How do I port Sybase CEP CCL like INSERT INTO s1 SELECT * FROM s2 WHERE s2.symbol IN (SELECT * FROM s3)?
What is the ESP equivalent of Sybase CEP’s GETTIMESTAMP()?
In Sybase CEP every row has a timestamp, and GETTIMESTAMP() returns that value. ESP has an implicit column, BIGROWTIME, that holds the time the row was last modified. It can be referenced like any other column, as in this example:
CREATE INPUT WINDOW TestStream SCHEMA
(Ts bigdatetime , symbol string , Price money(4),volume integer )
PRIMARY KEY(Ts)
;
Create OUTPUT WINDOW Stream2 schema
(Ts bigdatetime,currentrow_time bigdatetime , symbol string , Price money(4),volume integer )
PRIMARY KEY(Ts)
AS
SELECT
T.Ts as Ts,
T.BIGROWTIME as currentrow_time,
T.symbol as symbol,
T.Price as Price,
T.volume as volume
FROM TestStream T
;
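To make the BIGROWTIME behavior concrete, here is a small Python model (not ESP code) of a keyed window that records when each row was last modified. The class names and the explicit `now` argument are illustrative assumptions; in ESP the engine sets BIGROWTIME itself.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class StoredRow:
    values: Dict[str, Any]
    bigrowtime: int  # time of the last insert/update of this row (integer time units here)


class KeyedWindow:
    """Toy model of a keyed window that remembers when each row was last modified."""

    def __init__(self, key: str) -> None:
        self.key = key
        self.rows: Dict[Any, StoredRow] = {}

    def upsert(self, row: Dict[str, Any], now: int) -> None:
        # Inserting or updating a row stamps it with the current time.
        self.rows[row[self.key]] = StoredRow(dict(row), now)

    def select_with_rowtime(self) -> List[Dict[str, Any]]:
        # Like selecting every column plus "T.BIGROWTIME as currentrow_time".
        return [dict(r.values, currentrow_time=r.bigrowtime) for r in self.rows.values()]


w = KeyedWindow(key="Ts")
w.upsert({"Ts": 1, "symbol": "IBM", "Price": 12.2, "volume": 100}, now=1000)
w.upsert({"Ts": 1, "symbol": "IBM", "Price": 13.0, "volume": 200}, now=5000)  # same key: update
print(w.select_with_rowtime())
```

The update with the same key replaces the row, so the window still holds one row, now stamped with the later time.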
What is the equivalent clause in ESP for CEP’s OUTPUT AT STARTUP?
ESP has no OUTPUT AT STARTUP clause. In Sybase CEP, OUTPUT AT STARTUP is generally used to load some information into a window, usually a lookup table, before the other queries in the project start processing. Something similar can be achieved in ESP with the ADAPTER START GROUPS statement, which lets one adapter start before another. For example, you can use adapter start groups to start a database input adapter that reads from the database into a window. Only after that do you start the CSV input adapter that brings in the data for the actual processing.
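The effect of ordering adapters with start groups can be sketched as a small Python model. It is not CCL and does not show the ADAPTER START GROUPS syntax. The adapter functions and sample data are hypothetical. The model assumes that a later group starts only after every adapter in the earlier group has finished, which is the ordering the workaround above relies on.

```python
from typing import Callable, Dict, List, Tuple

lookup: Dict[str, str] = {}                      # stands in for the lookup window
enriched: List[Tuple[str, object, float]] = []   # stands in for the processing queries' output


def database_adapter() -> None:
    # Hypothetical reference data that a database input adapter would load.
    lookup.update({"IBM": "International Business Machines", "SAP": "SAP SE"})


def csv_adapter() -> None:
    # Hypothetical trades that a CSV input adapter would stream in; each is enriched from the lookup.
    for symbol, price in [("IBM", 12.2), ("SAP", 14.0)]:
        enriched.append((symbol, lookup.get(symbol), price))


def run_start_groups(groups: List[List[Callable[[], None]]]) -> None:
    """Start groups strictly in order: a group runs only after the previous group has returned."""
    for group in groups:
        for start_adapter in group:
            start_adapter()


run_start_groups([[database_adapter], [csv_adapter]])
print(enriched)
```

If the two groups were swapped, every trade would be processed before the lookup was loaded, and the name column would be None. That is the situation OUTPUT AT STARTUP was used to avoid.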
What is the equivalent in ESP for CEP’s OUTPUT EVERY?
Before rewriting an OUTPUT EVERY clause, you need to understand the paradigm difference between Sybase CEP and ESP and decide whether the clause is needed at all. In Sybase CEP there is no difference between an insert row and an update row. There is no key field, so rows flow continuously, each with its own timestamp. In ESP there is a key field, and when a second row arrives with the same key, it is treated as an update and overwrites the previous row. In Sybase CEP, OUTPUT EVERY is generally used when there are too many updates and only one row every 10 seconds is wanted, so that downstream queries are not overwhelmed. Because ESP already collapses updates by key, understand the requirement before porting the clause; a solution designed around ESP's keyed windows usually makes better use of the engine. If periodic output is still required, here is one workaround using a flex stream:
CREATE SCHEMA sTrade (Ts bigdatetime, symbol string, Price money(4), volume integer);
CREATE INPUT STREAM TestStream SCHEMA sTrade;
CREATE FLEX TestFlex
IN TestStream
OUT OUTPUT STREAM Trades SCHEMA sTrade
BEGIN
    DECLARE
        // holds the most recent row received from TestStream
        typeof(TestStream) rec;
    END;
    ON TestStream
    {
        rec := TestStream;
    };
    // every 10 seconds, emit the latest row (only once a row has arrived)
    EVERY 10 seconds
    {
        if (not(isnull(rec))) {
            output setOpcode(rec, upsert);
        }
    };
END;
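The behavior of the flex above can be modelled in Python to show what reaches Trades. This is an illustration only. The (seconds, row) tuples are hypothetical, events are assumed sorted by arrival time, and a row arriving exactly on a tick is assumed to be handled before the tick fires.

```python
from typing import Any, List, Optional, Tuple


def output_every(events: List[Tuple[int, Any]], interval: int, end: int) -> List[Tuple[int, Any]]:
    """Remember the latest row (ON TestStream) and emit it on every tick (EVERY <interval>)."""
    emitted: List[Tuple[int, Any]] = []
    latest: Optional[Any] = None
    i = 0
    for tick in range(interval, end + 1, interval):
        # ON TestStream: each row that arrived by this tick overwrites rec.
        # Assumption: a row arriving exactly at a tick is handled before that tick fires.
        while i < len(events) and events[i][0] <= tick:
            latest = events[i][1]
            i += 1
        # EVERY block: output rec, but only once at least one row has arrived.
        if latest is not None:
            emitted.append((tick, latest))
    return emitted


trades = [(1, "IBM 12.2"), (4, "IBM 12.5"), (12, "IBM 13.0"), (13, "IBM 12.9")]
print(output_every(trades, interval=10, end=30))
```

Because rec is never cleared, the flex (and this model) re-emits the last row at every tick even when no new row has arrived, as at second 30. If that is not wanted, the row would need to be cleared after it is output.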
What is the paradigm difference between Sybase CEP and ESP?
In Sybase CEP every project runs as a single thread, and the engine processes messages in timestamp order. Messages pass through the queries according to strict processing rules. There is no key field, so there is no distinction between an insert and an update message. The result is a deterministic engine.
ESP is closer to a CRUD system, in which rows are identified as inserts, updates, and deletes. In ESP each stream/window runs in its own thread, and messages are pushed on as soon as they arrive in that thread's input queue. Because this is a multithreaded system, there is no control over which thread executes first. ESP is therefore not a deterministic engine: when messages arrive from different streams, there is no guarantee that message A will be processed before message B.
Can I view all streams in ESP as in Sybase CEP?
No. The ESP stream viewer uses a subscription mechanism that differs from the one in Sybase CEP, and at present it can display only input and output streams/windows. Local streams/windows cannot be viewed in the stream viewer.
Can a stream be connected to a stream higher up in the flow?
No. Unlike Sybase CEP, ESP does not allow a query to loop back to a stream higher up in the flow. You may need to rethink the logic of the project the ESP way to make use of ESP's features. Where the logic really does need a loop, the workaround is to use a flex stream.
What is the equivalent of public windows in ESP?
In ESP all input and output windows are public, meaning that they can be queried through the SQL port interface. Local windows cannot be queried, because they cannot be subscribed to.
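What is the interaction between two different flex operators in the same flex stream?
Consider a flex stream with two inputs, 1 and 2:
CREATE FLEX A
IN 1, 2
OUT OUTPUT WINDOW FlexOperator1
...
SCHEMA(...)
BEGIN
ON 1
{...} // and also
ON 2 ...
In the SPLASH code for the “ON 2 ...” part, is it possible to reference values from the “ON 1 ...” SPLASH code?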
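Yes. The SPLASH code in one ON block can access the data of the other input stream/window, as in the example below, where the ON Stream2 block iterates over the rows stored in the TestStream window. Keep in mind, however, that each ON block is triggered only when data arrives on its own input.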
CREATE INPUT WINDOW TestStream SCHEMA
(Ts bigdatetime , symbol string , Price money(4),volume integer )
PRIMARY KEY(Ts)
;
ATTACH INPUT ADAPTER Adapter1 TYPE dsv_in TO TestStream
PROPERTIES
blockSize=1,
dateFormat='%Y/%m/%d %H:%M:%S',
delimiter=',',
dir='C:\\Documents and Settings\\vchittar\\My Documents\\SybaseESP\\5.0\\workspace\\exampledata',
expectStreamNameOpcode=false,
fieldCount=0,
file='stock-trades.csv',
filePattern='*.csv',
hasHeader=true,
safeOps=false,
skipDels=false,
timestampFormat= '%Y/%m/%d %H:%M:%S';
Create INPUT WINDOW Stream2 schema
(Ts bigdatetime , symbol string , Price money(4),volume integer )
PRIMARY KEY(Ts)
;
CREATE FLEX TestFlex
In TestStream,Stream2
OUT OUTPUT Window Trades
Schema
(Ts bigdatetime , symbol string , Price money(4),volume integer )
Primary Key(Ts)
BEGIN
ON TestStream
{
};
ON Stream2
{
    // walk every row currently stored in the TestStream window
    TestStream_iterator := getIterator(TestStream_stream);
    TestStream := getNext(TestStream_iterator);
    while (not(isnull(TestStream)))
    {
        print('Symbol ', TestStream.symbol);
        TestStream := getNext(TestStream_iterator);
    }
    deleteIterator(TestStream_iterator);
};
END;
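A rough Python model of TestFlex shows the trigger rule from the answer. The ON Stream2 handler can read the rows held by the TestStream window, but nothing runs until a row arrives on Stream2. The class and method names are hypothetical. Iteration order here simply follows insertion order, and the order in which ESP returns window rows is not modelled.

```python
from typing import Dict, List


class TestFlexModel:
    """Toy model of TestFlex: two ON handlers, one of which reads the other input's window."""

    def __init__(self) -> None:
        self.test_stream: Dict[str, dict] = {}  # rows held by the TestStream input window, by key
        self.printed: List[str] = []

    def on_test_stream(self, row: dict) -> None:
        # The input window stores the row; the ON TestStream block itself is empty.
        self.test_stream[row["Ts"]] = row

    def on_stream2(self, row: dict) -> None:
        # Runs only when Stream2 receives a row, then walks every stored TestStream row.
        for stored in self.test_stream.values():
            self.printed.append("Symbol " + stored["symbol"])


flex = TestFlexModel()
flex.on_test_stream({"Ts": "09:30:00", "symbol": "IBM"})
flex.on_test_stream({"Ts": "09:30:01", "symbol": "SAP"})
before = list(flex.printed)  # nothing printed yet: only TestStream rows have arrived
flex.on_stream2({"Ts": "09:30:02", "symbol": "MSFT"})
print(before, flex.printed)
```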
What is the equivalent in ESP of CEP’s OUTPUT AFTER clause?
In Sybase CEP, the OUTPUT AFTER clause delays output by a time period. If a row arrives at 12:00:00, OUTPUT AFTER 10 SECONDS sends it 10 seconds later. First understand why the delay is required, and approach the problem with the ESP paradigm in mind: ESP does not process messages based on their timestamp values. To delay output in ESP you can use the aging (AGES) clause. In the sample below, W1 increments its aging column A1 every second, up to 10 times, and S2 passes a row on only when A1 has reached 10, roughly 10 seconds after the row arrived.
CREATE INPUT STREAM TestStream SCHEMA
(Ts bigdatetime , symbol string , Price money(4),volume integer )
;
ATTACH INPUT ADAPTER Adapter1 TYPE dsv_in TO TestStream
PROPERTIES
blockSize=1,
dateFormat='%Y/%m/%d %H:%M:%S',
delimiter=',',
dir='C:\\Documents and Settings\\vchittar\\My Documents\\SybaseESP\\5.0\\workspace\\exampledata',
expectStreamNameOpcode=false,
fieldCount=0,
file='stock-trades.csv',
filePattern='*.csv',
hasHeader=true,
safeOps=false,
skipDels=false,
timestampFormat= '%Y/%m/%d %H:%M:%S';
CREATE OUTPUT WINDOW W1 SCHEMA
(Ts bigdatetime,symbol string,Price money(4),volume integer,A1 integer)
PRIMARY KEY DEDUCED
AGES EVERY 1 second SET A1 10 times
AS
SELECT
TestStream.Ts as Ts,
TestStream.symbol as symbol,
TestStream.Price as Price,
TestStream.volume as volume,
0 as A1
FROM TestStream
GROUP BY TestStream.Ts;
CREATE OUTPUT STREAM S2 SCHEMA
(Ts bigdatetime,symbol string,Price money(4),volume integer)
AS
SELECT
W1.Ts as Ts,
W1.symbol as symbol,
W1.Price as Price,
W1.volume as volume
FROM W1
WHERE W1.A1=10;
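To show how the aging trick delays output, here is a Python simulation of W1 and S2 stepping one second at a time. It assumes that A1 starts at 0, as set in the SELECT, and is incremented once per second until it reaches 10. The exact moment ESP applies each aging step may differ, so treat the release times as approximate.

```python
from typing import Dict, List, Tuple


def simulate_aging(arrivals: Dict[int, List[str]], max_age: int, seconds: int) -> List[Tuple[int, str]]:
    """Model of W1 (AGES EVERY 1 second SET A1 <max_age> times) feeding S2 (WHERE A1 = max_age)."""
    aging: Dict[str, int] = {}  # row -> current value of A1
    released: List[Tuple[int, str]] = []
    for t in range(seconds + 1):
        # Rows already in the window age by one step, at most max_age steps in total.
        for row, a1 in list(aging.items()):
            if a1 < max_age:
                aging[row] = a1 + 1
                if aging[row] == max_age:
                    released.append((t, row))  # S2 passes the row on once A1 reaches max_age
        # Rows arriving in this second enter the window with A1 = 0.
        for row in arrivals.get(t, []):
            aging[row] = 0
    return released


print(simulate_aging({0: ["IBM"], 3: ["SAP"]}, max_age=10, seconds=20))
```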
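Can I join a stream/window without a common column?
Yes. Suppose stream S1 has columns (Column1, Column2) and window W1 has columns (Column3, Column4), and we want a stream or window S3 containing all four columns.
This can be done in ESP with a comma-separated join, that is, a join with no join condition: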
========================================
CREATE INPUT STREAM S1 SCHEMA (Column1 integer,Column2 integer);
CREATE INPUT Window W1 SCHEMA(Column3 integer,Column4 integer)
PRIMARY KEY(Column3);
CREATE OUTPUT Stream S3 SCHEMA(Column1 integer,Column2 integer,Column3 integer,Column4 integer)
AS
SELECT
S1.Column1 as Column1,
S1.Column2 as Column2,
W1.Column3 as Column3,
W1.Column4 as Column4
FROM S1,W1;
==================================================
This effectively produces a Cartesian product: each row arriving on S1 is paired with every row currently in W1, and the pairs are sent as output.
Note that the output here is a stream.
If the output is a window, it needs a primary key, and the query is written like this:
=====================================================
CREATE INPUT STREAM S1 SCHEMA (Column1 integer,Column2 integer);
CREATE INPUT Window W1 SCHEMA(Column3 integer,Column4 integer)
PRIMARY KEY(Column3);
CREATE OUTPUT Window S3 SCHEMA(Column1 integer,Column2 integer,Column3 integer,Column4 integer)
PRIMARY KEY DEDUCED
AS
SELECT
S1.Column1 as Column1,
S1.Column2 as Column2,
W1.Column3 as Column3,
W1.Column4 as Column4
FROM S1,W1
GROUP BY
W1.Column3
;
=================================================
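The two joins above can be modelled in Python to make the difference between the stream and window outputs visible. In this sketch W1 is a dict from Column3 (its key) to Column4. The window version assumes that, as in a grouped CCL query, the non-grouped S1 columns carry the values of the most recent pairing for each Column3.

```python
from typing import Dict, List, Tuple

Joined = Tuple[int, int, int, int]  # (Column1, Column2, Column3, Column4)


def join_to_stream(s1_row: Tuple[int, int], w1: Dict[int, int]) -> List[Joined]:
    """FROM S1, W1 with a stream output: pair the arriving S1 row with every W1 row."""
    c1, c2 = s1_row
    return [(c1, c2, c3, c4) for c3, c4 in w1.items()]


def join_to_window(s1_rows: List[Tuple[int, int]], w1: Dict[int, int]) -> Dict[int, Joined]:
    """Same join into a window keyed by Column3: each key keeps its latest pairing."""
    out: Dict[int, Joined] = {}
    for s1_row in s1_rows:
        for joined in join_to_stream(s1_row, w1):
            out[joined[2]] = joined  # GROUP BY W1.Column3: a later pairing updates the key
    return out


w1 = {1: 10, 2: 20}  # W1 rows: Column3 (key) -> Column4
print(join_to_stream((7, 8), w1))
print(join_to_window([(7, 8), (9, 9)], w1))
```

The stream output grows with every S1 row, while the window output never holds more rows than W1 has keys.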
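Is there a way to use an unnamed window in a stream as in Sybase CEP?
Unnamed windows cannot be used on streams. The workaround is to use an event cache and perform the aggregations over it. In this example, the cache holds the sMarket_ask events of the last 20 seconds for each Pair, and the window reports the sum of their rates:
// input stream read by the query (illustrative declaration; adjust the schema to your data)
CREATE INPUT STREAM sMarket_ask SCHEMA (Pair string, Rate float);
CREATE OUTPUT WINDOW wMarket_ask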
SCHEMA(Pair string,Rate FLOAT)
PRIMARY KEY DEDUCED
DECLARE
eventCache(sMarket_ask[Pair], 20 seconds) cache ;
END
AS
SELECT
sMarket_ask.Pair as Pair,
sum(cache.Rate) as Rate
FROM sMarket_ask
GROUP BY sMarket_ask.Pair;
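A Python model of the event cache makes the 20-second window concrete. It is a sketch under assumptions. Timestamps are plain seconds, and each Pair has its own bucket, as in sMarket_ask[Pair]. An event exactly 20 seconds old is treated as expired, and ESP's own boundary handling is not shown.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Tuple


class TimeEventCache:
    """Model of eventCache(sMarket_ask[Pair], 20 seconds) feeding sum(cache.Rate)."""

    def __init__(self, span_seconds: float) -> None:
        self.span = span_seconds
        self.buckets: Dict[str, Deque[Tuple[float, float]]] = defaultdict(deque)

    def on_event(self, ts: float, pair: str, rate: float) -> float:
        bucket = self.buckets[pair]
        bucket.append((ts, rate))
        # Evict this Pair's events that have fallen out of the time span.
        while bucket and bucket[0][0] <= ts - self.span:
            bucket.popleft()
        return sum(r for _, r in bucket)  # new value of the Rate column for this Pair


cache = TimeEventCache(20)
print(cache.on_event(0, "EURUSD", 1.5))
print(cache.on_event(5, "EURUSD", 2.5))
print(cache.on_event(22, "EURUSD", 0.5))  # the event at second 0 has expired
print(cache.on_event(22, "USDJPY", 4.0))  # a different Pair has its own bucket
```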
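How can a user test a project using message timestamp data to simulate a production environment and test the data at its arrival rate?
ESP provides the esp_playback command for this. esp_playback can replay CSV files paced by the values in a timestamp column, so the rows reach the project at a rate based on their recorded timestamps.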
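The pacing idea behind timestamp-based playback can be sketched in Python. This is not esp_playback and shows none of its options. It only illustrates how the gaps between consecutive timestamp values turn into waits. The send and sleep callbacks are injected so the sketch runs without waiting; in real use, sleep would be time.sleep.

```python
from datetime import datetime
from typing import Callable, List, Optional, Tuple

FMT = "%Y/%m/%d %H:%M:%S"  # same timestamp format as the dsv_in adapter examples above


def replay(rows: List[Tuple[str, str]], send: Callable[[str], None],
           sleep: Callable[[float], None]) -> None:
    """Send rows in order, waiting between them as long as their timestamps are apart."""
    previous: Optional[datetime] = None
    for ts_text, payload in rows:
        ts = datetime.strptime(ts_text, FMT)
        if previous is not None:
            sleep(max(0.0, (ts - previous).total_seconds()))
        send(payload)
        previous = ts


recorded = [("2012/01/01 09:30:00", "IBM"), ("2012/01/01 09:30:02", "SAP"),
            ("2012/01/01 09:30:07", "MSFT")]
sent: List[str] = []
waits: List[float] = []
replay(recorded, send=sent.append, sleep=waits.append)
print(sent, waits)
```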
What is an opcode?
Every row in ESP carries an opcode. The opcodes are insert (i), update (u), upsert (p), delete (d), and safedelete (s). Here is a sample CSV file in which each row gives the window name, the opcode, and the data:
W1,i,IBM,100,12.2
W1,i,MSFT,200,12
W1,u,IBM,200,13
W1,i,SAP,300,14
W1,u,SAP,222,10
Data can be sent to the engine with or without opcodes. If the input data has no opcode, a window treats a row with a new primary key as an insert and a row with an existing primary key as an update. Streams behave differently. They have no keys and treat every row as an insert, so a row with an update opcode is converted to an insert, and a row with a delete opcode is discarded.
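The opcode rules can be illustrated with a small Python model that applies the sample file to a keyed window and to a stream. It assumes the data columns are symbol (the key), volume, and price. It also assumes that an insert of an existing key, or an update or delete of a missing key, is rejected. The real engine's error handling is not modelled.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, int, float]  # (symbol, volume, price); symbol is the key


def apply_to_window(window: Dict[str, Row], opcode: Optional[str], row: Row) -> None:
    """Apply one row to a keyed window according to its opcode."""
    key = row[0]
    if opcode is None:            # no opcode in the input: behave as an upsert
        opcode = "p"
    if opcode == "i":
        if key in window:
            raise ValueError(f"insert of existing key {key!r}")  # assumption: rejected
        window[key] = row
    elif opcode == "u":
        if key not in window:
            raise ValueError(f"update of missing key {key!r}")   # assumption: rejected
        window[key] = row
    elif opcode == "p":           # upsert: insert or update
        window[key] = row
    elif opcode == "d":
        if key not in window:
            raise ValueError(f"delete of missing key {key!r}")   # assumption: rejected
        del window[key]
    elif opcode == "s":           # safedelete: delete if present, otherwise ignore
        window.pop(key, None)
    else:
        raise ValueError(f"unknown opcode {opcode!r}")


def apply_to_stream(opcode: Optional[str], row: Row) -> Optional[Tuple[str, Row]]:
    """Streams keep no keys: deletes are discarded, everything else becomes an insert."""
    if opcode in ("d", "s"):
        return None
    return ("i", row)


sample = """W1,i,IBM,100,12.2
W1,i,MSFT,200,12
W1,u,IBM,200,13
W1,i,SAP,300,14
W1,u,SAP,222,10"""

window: Dict[str, Row] = {}
stream_out: List[Optional[Tuple[str, Row]]] = []
for line in sample.splitlines():
    _name, op, symbol, volume, price = line.split(",")
    row = (symbol, int(volume), float(price))
    apply_to_window(window, op, row)
    stream_out.append(apply_to_stream(op, row))
print(window)
```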
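What is the ESP equivalent of Sybase CEP’s KEEP LAST PER clause?
ESP works with key fields, so a KEEP LAST PER clause can be rewritten as a window grouped by the column that KEEP LAST PER used:
// input stream read by the query (illustrative declaration; adjust the schema to your data)
CREATE INPUT STREAM NEWSTREAM SCHEMA (Ts bigdatetime, symbol string, Price money(4), volume integer);
CREATE OUTPUT WINDOW S1 schema(Ts bigdatetime,symbol string , Price money(4),volume integer)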
PRIMARY KEY DEDUCED
AS
select
NEWSTREAM.Ts as Ts,
NEWSTREAM.symbol as symbol,
NEWSTREAM.Price as Price,
NEWSTREAM.volume as volume
from NEWSTREAM
GROUP BY NEWSTREAM.symbol
;
This automatically gives you the last row for each value of the GROUP BY column.
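The GROUP BY rewrite behaves like a dictionary keyed by the grouping column, as this Python sketch shows. It is a model only. It assumes that non-aggregated columns take their values from the group's latest row, which is the behavior the answer relies on.

```python
from typing import Dict, Iterable


def keep_last_per(rows: Iterable[dict], column: str) -> Dict[object, dict]:
    """Model of a window with GROUP BY <column>: one row per group value, the latest one."""
    latest: Dict[object, dict] = {}
    for row in rows:
        # A later row with the same group value updates that group's row.
        latest[row[column]] = row
    return latest


trades = [
    {"Ts": 1, "symbol": "IBM", "Price": 12.2, "volume": 100},
    {"Ts": 2, "symbol": "SAP", "Price": 14.0, "volume": 300},
    {"Ts": 3, "symbol": "IBM", "Price": 13.0, "volume": 200},
]
print(keep_last_per(trades, "symbol"))
```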
What is the difference between Sybase CEP parameters and ESP parameters?
Parameters are set before a project starts and cannot be modified at run time. In this respect ESP parameters are very similar to Sybase CEP parameters. The major difference is how their values are changed. In Sybase CEP a parameter value is fixed in the project, so every change requires modifying the project, recompiling it, and redeploying it. In ESP, parameter values are stored in the project configuration (.ccr) file. You can simply edit the .ccr file and redeploy the project without recompiling. This is a big advantage for projects that need to run with different parameter values.
What is the equivalent of a CCS schema file in ESP?
There is no CCS schema file in ESP. Instead, you can declare a schema in a separate CCL file and import that file into the project's CCL file. For example, schemas.ccl contains:
CREATE SCHEMA TradesSchema(symbol string,volume integer,price float);
and the project file uses it with an IMPORT statement:
IMPORT 'schemas.ccl';
CREATE INPUT STREAM st1 SCHEMA TradesSchema;
Does every input stream need to have a memory store and a primary key?
No. A stream requires neither a primary key nor a memory store; only a window requires a primary key and a store. If you don’t declare a memory store for a window, one is created for you automatically.
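Can I declare streams and windows anywhere in the project as in Sybase CEP?
No. Consider this project:
IMPORT 'tradesschema.ccl';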
CREATE INPUT STREAM TopOrder
SCHEMA TradesSchema;
CREATE OUTPUT STREAM OutExecutionReport SCHEMA TradesSchema
AS
SELECT * from TopOrder;
CREATE LOCAL STREAM L1 SCHEMA TradesSchema
AS
SELECT * from TopOrder;
In ESP you cannot create a stream first and insert into it later with a separate statement, as you can in Sybase CEP. Each derived stream is declared together with the query that populates it, following the flow of the data, which is a different model from Sybase CEP. In the example above, you cannot declare the OutExecutionReport stream before the TopOrder stream, because OutExecutionReport reads from TopOrder. TopOrder must be declared first.
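The declaration-order rule can be expressed as a simple check over the statements in file order. This Python sketch is an illustration, not the ESP compiler. Each statement is a hypothetical (name, sources) pair taken from the example above.

```python
from typing import List, Set, Tuple


def check_declaration_order(statements: List[Tuple[str, List[str]]]) -> List[str]:
    """Return an error for every stream that reads from a stream not declared above it."""
    declared: Set[str] = set()
    errors: List[str] = []
    for name, sources in statements:
        for source in sources:
            if source not in declared:
                errors.append(f"{name} reads from {source}, which is not declared before it")
        declared.add(name)
    return errors


good = [("TopOrder", []), ("OutExecutionReport", ["TopOrder"]), ("L1", ["TopOrder"])]
bad = [("OutExecutionReport", ["TopOrder"]), ("TopOrder", [])]
print(check_declaration_order(good))
print(check_declaration_order(bad))
```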
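How can I make my project server timestamped or message timestamped in ESP?
There is no concept of server timestamping or message timestamping in ESP, because ESP does not process messages based on timestamps. Each message is processed as soon as it arrives in the queue of the stream or window that receives it.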
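How do I delete the contents of a window in ESP, or what is the ESP equivalent of the ON DELETE clause in Sybase CEP?
One way is a flex stream that remembers every row it writes to the window and, when a row arrives on a trigger stream, sends a safedelete for each of those rows:
//Input stream that fills the window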
CREATE INPUT Stream S1 SCHEMA (Col1 integer,Col2 integer,Col3 integer);
//Trigger stream to delete the content of the window
CREATE INPUT STREAM trigger SCHEMA(Col1 integer);
CREATE FLEX TestFlex
In trigger,S1
OUT OUTPUT Window W2
Schema
(Col1 integer , Col2 integer , Col3 integer)
Primary Key(Col1)
BEGIN
DECLARE
//Event cache that stores the stream data that comes from S1 as one whole bucket
eventcache(S1[]) events;
END;
ON S1
{
typeof(S1) rec;
rec:=S1;
output setOpcode(rec,upsert);
};
ON trigger
{
long idx := cacheSize(events) -1;
while(idx >=0){
//Get the information from the cache and send the row with delete opcode
//delete the cache data.
typeof(S1) rec := getCache(events,idx);
deleteCache(events,idx);
output setOpcode(rec,safedelete);
idx--;
}
};
END;
When the contents of the window are deleted, the deletions propagate to downstream queries. To stop the propagation, place a stream after the window. A stream discards rows with a delete opcode.
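Here is a Python model of the flex above, useful for seeing why safedelete is used. It is a sketch: rows are (Col1, Col2, Col3) tuples, the list stands in for the eventcache, and the dict stands in for W2. The cache can hold several rows with the same key while W2 holds only one, so some deletes target rows that are already gone. Safedelete ignores those instead of failing.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, int, int]  # (Col1, Col2, Col3); Col1 is the primary key of W2


class ClearableWindowFlex:
    """Model of TestFlex: upsert S1 rows into W2 and clear W2 when a trigger row arrives."""

    def __init__(self) -> None:
        self.events: List[Row] = []   # eventcache(S1[]): every S1 row, in arrival order
        self.w2: Dict[int, Row] = {}  # contents of the output window W2
        self.emitted: List[Tuple[str, Row]] = []

    def on_s1(self, row: Row) -> None:
        self.events.append(row)
        self.w2[row[0]] = row         # output setOpcode(rec, upsert)
        self.emitted.append(("upsert", row))

    def on_trigger(self) -> None:
        # Walk the cache from the newest entry to the oldest, as the SPLASH loop does.
        idx = len(self.events) - 1
        while idx >= 0:
            rec = self.events.pop(idx)  # getCache + deleteCache
            self.w2.pop(rec[0], None)   # safedelete: remove the key if it is still present
            self.emitted.append(("safedelete", rec))
            idx -= 1


flex = ClearableWindowFlex()
for r in [(1, 10, 100), (2, 20, 200), (1, 11, 101)]:  # key 1 arrives twice
    flex.on_s1(r)
flex.on_trigger()
print(flex.w2, flex.emitted)
```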
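Is there a concept of variables in ESP as in Sybase CEP? How do I use them?
Yes. ESP has two kinds of variables, global and local, and both can be modified from a query. Here is an example of a local variable whose value is modified in a query each time a row arrives on stream S1. Its initial value comes from the parameter Initial.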
DECLARE
PARAMETER float Initial := 12.0;
END;
CREATE SCHEMA sInput (Val float);
CREATE INPUT STREAM S1 SCHEMA sInput;
CREATE OUTPUT STREAM Change1 SCHEMA sInput
DECLARE
//variable whose value is modified based on the input stream S1
float Lastval:=Initial;
END
AS SELECT
(Lastval:=S1.Val-Lastval) as Val
FROM S1;
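Tracing the variable by hand is easier with a Python equivalent of Change1. It assumes, as the CCL does, that Lastval starts at the parameter value 12.0 and keeps its value from one row to the next.

```python
from typing import Iterable, List


def change1(values: Iterable[float], initial: float = 12.0) -> List[float]:
    """Model of Change1: Lastval := S1.Val - Lastval for each row, emitting the new Lastval."""
    lastval = initial  # float Lastval := Initial
    out: List[float] = []
    for val in values:
        lastval = val - lastval  # the assignment expression yields the new value
        out.append(lastval)
    return out


print(change1([20.0, 5.0, 30.0]))  # [8.0, -3.0, 33.0]
```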
I have an IF..ELSE query in Sybase CEP; how do I convert it to ESP?
ESP has a CASE WHEN ... THEN ... ELSE ... END conditional expression that does the equivalent of IF..ELSE.
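How do I perform a window lookup to see whether data is present or not present using subqueries in a WHERE clause?
Subqueries (a SELECT inside a WHERE clause) are not supported in ESP. To do a window lookup like this in ESP, use a flex stream. A simple way to do it is a key-value lookup inside the flex. In the example below, a tweet word is passed on only if it is not present in the iwStopwords window: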
CREATE SCHEMA schTweetWords
(
ID Long,
Word String
);
//Words that need to be stopped from tweeting. The window on which lookup needs
//to be performed
CREATE INPUT WINDOW iwStopwords SCHEMA ( Word string ) PRIMARY KEY ( Word ) ;
CREATE INPUT STREAM sTweetWords SCHEMA schTweetWords;
CREATE FLEX flexOp
IN iwStopwords, sTweetWords
// same schema as sTweetWords, so the incoming record can be passed through unchanged
OUT OUTPUT STREAM fStopWords SCHEMA schTweetWords
BEGIN
    ON sTweetWords
    {
        typeof(iwStopwords) iwStoprec;
        typeof(sTweetWords) sTweetrec;
        sTweetrec := sTweetWords;
        // Look up the incoming word by key in the iwStopwords window.
        // The result is null if the word is not present.
        iwStoprec := iwStopwords_stream [ [ Word = sTweetWords.Word ; | ] ];
        if (isnull(iwStoprec))
        {
            // Word is not a stop word: pass it on to fStopWords
            output setOpcode(sTweetrec, insert);
        }
    };
    ON iwStopwords
    {
    };
END;
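The lookup logic of flexOp can be modelled in Python to check which words get through. In this sketch a dict keyed by Word stands in for the iwStopwords window, and None stands in for the null result of a failed key lookup.

```python
from typing import Dict, Iterable, List, Tuple

TweetWord = Tuple[int, str]  # (ID, Word), as in schTweetWords


def filter_stop_words(iw_stopwords: Dict[str, dict],
                      tweet_words: Iterable[TweetWord]) -> List[TweetWord]:
    """Model of flexOp: keep a tweet word only if no iwStopwords row has that Word as its key."""
    out: List[TweetWord] = []
    for rec in tweet_words:
        stop_rec = iw_stopwords.get(rec[1])  # key lookup; None means "not in the window"
        if stop_rec is None:
            out.append(rec)                  # output setOpcode(sTweetrec, insert)
    return out


stopwords = {"the": {"Word": "the"}, "a": {"Word": "a"}}
print(filter_stop_words(stopwords, [(1, "the"), (2, "market"), (3, "a"), (4, "rallied")]))
```

To port the IN form from the question instead, which keeps only rows whose value is present in the lookup window, invert the test and output the row when the lookup finds a match.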
FAQ for SAP Sybase ESP users – Developing New Applications
- What is an eventcache in ESP?
- Can we use dictionaries in a CCL query?
- How do I use command line commands to start a project in ESP?
- Why does a query like this produce only one row in an output window?
- What are the ways in ESP to reduce memory usage in the project?
- Why do I see a memory increase when data is sent through a query like this?
What is an eventcache in ESP?
An eventcache can be described as a small window on a query's input stream, over which aggregations and other computations can be performed. It is a powerful ESP feature and can be used wherever data needs to be compared with earlier input from the stream. In the example below, the cache keeps the last 5 events for each ID, and nth(1, ec1.Val) gives the previous value for that ID:
CREATE INPUT STREAM S1 SCHEMA (ID integer, Val integer);
CREATE OUTPUT WINDOW Change1 SCHEMA (ID integer, lastVal integer, prevVal integer, chg integer)
PRIMARY KEY DEDUCED
DECLARE
eventCache(S1[ID], 5 events) ec1;
END
AS SELECT
S1.ID as ID,
S1.Val as lastVal,
nth(1,ec1.Val) as prevVal,
(S1.Val - nth(1,ec1.Val)) as chg
FROM S1
GROUP BY (S1.ID);
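The per-ID event cache can be modelled in Python to show what prevVal and chg contain. The sketch assumes, as the example does by naming the column prevVal, that nth(0, ...) is the current event and nth(1, ...) the one before it. It also assumes that nth returns null when the cache holds too few events for that ID.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Optional, Tuple


class PerKeyEventCache:
    """Model of eventCache(S1[ID], 5 events): the last N values for each ID."""

    def __init__(self, size: int) -> None:
        self.cache: Dict[int, Deque[int]] = defaultdict(lambda: deque(maxlen=size))

    def nth(self, key: int, n: int) -> Optional[int]:
        # Index 0 is the newest event (the current row), 1 the one before it, and so on.
        values = self.cache[key]
        return values[-1 - n] if n < len(values) else None

    def on_row(self, row_id: int, val: int) -> Tuple[int, int, Optional[int], Optional[int]]:
        self.cache[row_id].append(val)  # the current event is part of the cache
        prev = self.nth(row_id, 1)
        chg = None if prev is None else val - prev
        return (row_id, val, prev, chg)  # ID, lastVal, prevVal, chg


ec = PerKeyEventCache(5)
print(ec.on_row(1, 10))
print(ec.on_row(1, 15))
print(ec.on_row(2, 7))
```

The first row for each ID has no previous value, so prevVal and chg are null for it.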
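Can we use dictionaries in a CCL query?
Yes, the dictionary data type can also be used in CCL queries. This example keeps the previous value for each ID in a dictionary and computes the change: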
CREATE INPUT STREAM S1 SCHEMA (ID integer, Val integer);
CREATE OUTPUT WINDOW Change1 SCHEMA (ID integer, lastVal integer, chg integer)
PRIMARY KEY DEDUCED
DECLARE
dictionary(integer, integer) prev;
integer temp;
END
AS SELECT
S1.ID as ID,
last(S1.Val) as lastVal,
(temp:=prev[S1.ID];prev[S1.ID]:= S1.Val; S1.Val-temp) as chg
FROM S1
GROUP BY (S1.ID);
How do I use command line commands to start a project in ESP?
1) Start a cluster manager node:
$ esp_server --cluster-node=node1.xml
The node1.xml file can be found under $ESP_HOME/cluster/nodes/node1.
2) Log in to the cluster manager, create a workspace, and start projects under the cluster manager:
a. Log in to the cluster manager
$ esp_cluster_admin --uri esp://<hostname>:<port>
b. Add a workspace
> add workspace <workspace_name>
c. Add a project
> add project <workspace_name>/<project_name> <project>.ccx
d. Start the project
> start project <workspace_name>/<project_name>
e. Get details of the project
> get project <workspace_name>/<project_name>
f. (Optional) To use a static command port and SQL port from the project configuration (.ccr) file, instead of ports picked dynamically by the cluster manager, add the project together with its .ccr file:
> add project <workspace_name>/<project_name> <project>.ccx <project>.ccr
3) (Optional) Admin commands can also be issued from the shell as independent, non-interactive commands, for example:
$ esp_cluster_admin --uri esp://<hostname>:<port> --get_projectdetail --workspace-name <workspace_name> --project-name <project_name>
Why does a query like this produce only one row in an output window?
CREATE INPUT WINDOW W1 SCHEMA (symbol string,volume integer,price float)
PRIMARY KEY(symbol)
KEEP 1 ROW
;
CREATE OUTPUT WINDOW W2 SCHEMA(symbol string, volume integer,price float)
PRIMARY KEY(symbol)
KEEP ALL
As
SELECT * FROM W1;
The reason is that W1 has a KEEP 1 ROW policy. Every time a row with a new key is inserted into W1, the old row is deleted. The deletion propagates downstream, so the row is deleted from W2 as well, even though W2 has a KEEP ALL policy.
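This Python model traces rows through W1 and W2 to show why W2 ends up with a single row. Rows are hypothetical (symbol, volume, price) tuples. The model assumes that an update to the key already in W1 is kept as an update rather than evicting anything.

```python
from typing import Dict, List, Tuple

Row = Tuple[str, int, float]  # (symbol, volume, price); symbol is the key


def keep_one_row_pipeline(rows: List[Row]) -> Tuple[Dict[str, Row], Dict[str, Row]]:
    w1: Dict[str, Row] = {}  # KEEP 1 ROW
    w2: Dict[str, Row] = {}  # KEEP ALL, fed by SELECT * FROM W1
    for row in rows:
        key = row[0]
        # KEEP 1 ROW: a row with a different key evicts the row currently held in W1.
        for old_key in [k for k in w1 if k != key]:
            del w1[old_key]
            w2.pop(old_key, None)  # the eviction reaches W2 as a delete
        w1[key] = row
        w2[key] = row              # the insert or update also propagates to W2
    return w1, w2


_, w2 = keep_one_row_pipeline([("IBM", 100, 12.2), ("SAP", 300, 14.0), ("MSFT", 200, 12.0)])
print(w2)
```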
What are the ways in ESP to reduce memory usage in a project?
To minimize memory usage, you can use:
1. Streams
2. Windows with a KEEP policy
3. Delta streams
Streams have no storage and understand only the insert opcode. A row with an update opcode is converted to an insert, and a row with a delete opcode is ignored.
Windows with a KEEP policy store only the rows the policy allows. For example, with a KEEP 1 ROW policy, when a new row arrives the engine deletes the previous row and inserts the new one. The deletion propagates downstream, and the corresponding row is deleted in every downstream stream or window.
A delta stream is a new feature in ESP that was not present in Aleri. A delta stream does not store any rows, but it understands all opcodes. It relies on the storage of its input window to decide what to do with each arriving row. When used as a filter, it can also change the opcodes.
For example:
CREATE INPUT WINDOW W1 SCHEMA (Symbol string,Volume integer,Price Float)
PRIMARY KEY(Symbol)
;
CREATE OUTPUT DELTA STREAM S1 SCHEMA(Symbol string, Volume integer,Price Float)
PRIMARY KEY(Symbol)
AS
SELECT * from W1
where W1.Volume > 100;
CREATE OUTPUT WINDOW W2 SCHEMA(Symbol string, Volume integer,Price Float)
PRIMARY KEY(Symbol)
AS
SELECT * from S1;
If the data sent as input is:
i,SAP,90,3
u,SAP,120,3
u,SAP,200,3
then the first row is rejected because it does not satisfy the filter. The second row is converted to an insert and propagates downstream as an insert, and the third row propagates downstream as an update.
A delta stream can change opcodes by referring to the data of its input. To decide which opcode to output for an incoming event, it checks whether the row previously passed through it. Because it does not store any data itself, it uses the input window's data for this check. There are some restrictions on its usage, documented under delta streams in the CCL guide.
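The opcode conversion above can be modelled in Python as follows. This is an illustration under assumptions, not ESP's implementation. Rows are (Symbol, Volume, Price) tuples, and the previous version of a row is read from a dict standing in for W1's store. The update-to-delete branch, where a row stops matching the filter, is this model's assumption of the symmetric case, so check the CCL guide before relying on it.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, int, int]  # (Symbol, Volume, Price)


def passes(row: Optional[Row]) -> bool:
    return row is not None and row[1] > 100  # WHERE W1.Volume > 100


def delta_filter(inputs: List[Tuple[str, Row]]) -> List[Tuple[str, Row]]:
    """Model of delta stream S1 over window W1: output opcodes depend on the old and new row."""
    w1: Dict[str, Row] = {}  # W1's storage; S1 itself stores nothing
    out: List[Tuple[str, Row]] = []
    for opcode, row in inputs:
        old = w1.get(row[0])  # previous version of the row, read from W1
        if opcode == "d":
            w1.pop(row[0], None)
            if passes(old):
                out.append(("d", old))  # the row had passed the filter, so delete it downstream
            continue
        w1[row[0]] = row
        if passes(old) and passes(row):
            out.append(("u", row))      # matched before and after: stays an update
        elif passes(row):
            out.append(("i", row))      # first time through the filter: becomes an insert
        elif passes(old):
            out.append(("d", old))      # assumption: no longer matches, becomes a delete
        # neither version passes: nothing is emitted
    return out


data = [("i", ("SAP", 90, 3)), ("u", ("SAP", 120, 3)), ("u", ("SAP", 200, 3))]
print(delta_filter(data))
```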
Why do I see a memory increase when data is sent through a query like this?
Create input stream S1 schema(symbol string,volume integer,price float) ;
Create output window w1 schema(symbol string,volume integer,price float)
Primary key deduced
Keep 1 row
As
Select * from S1
Group by S1.symbol;
Although only a stream and a window with a KEEP 1 ROW policy are involved, the engine has to store all the input rows to compute the aggregation for the GROUP BY clause, and this makes memory grow. The KEEP 1 ROW policy applies only to the output of the query, while the aggregation is done on the input.
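A Python model makes the distinction between aggregation state and output concrete. It is a simplification. It keeps every input row per group, as the answer describes, and limits only the output to one row. It does not model ESP's internal data structures.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, int, float]  # (symbol, volume, price)


class GroupByKeepOne:
    """Model of w1: GROUP BY symbol with KEEP 1 ROW applied to the output."""

    def __init__(self) -> None:
        self.groups: Dict[str, List[Row]] = defaultdict(list)  # aggregation state: every input row
        self.output: Dict[str, Row] = {}                        # KEEP 1 ROW output

    def on_row(self, row: Row) -> None:
        symbol = row[0]
        self.groups[symbol].append(row)  # retained so the group's aggregates can be recomputed
        self.output.clear()              # KEEP 1 ROW: only the newest output row survives
        self.output[symbol] = row

    def stored_input_rows(self) -> int:
        return sum(len(rows) for rows in self.groups.values())


g = GroupByKeepOne()
for i in range(1000):
    g.on_row((["IBM", "SAP", "MSFT"][i % 3], i, 1.0))
print(len(g.output), g.stored_input_rows())  # 1 1000
```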
