/ / Gibt es eine ereignisgesteuerte JSON REST-Client-API für Java? - Java, Json, Ruhe

Gibt es eine ereignisgesteuerte JSON REST-Client-API für Java? - Java, Json, Ruhe

Ich habe eine Java-Anwendung, die die RestTemplate-API von Spring verwendet, um präzise, ​​lesbare Konsumenten von JSON-REST-Diensten zu schreiben:

Im Wesentlichen:

 RestTemplate rest = new RestTemplate(clientHttpRequestFactory);
ResponseEntity<ItemList> response = rest.exchange(url,
HttpMethod.GET,
requestEntity,
ItemList.class);

for(Item item : response.getBody().getItems()) {
handler.onItem(item);
}

Die JSON-Antwort enthält eine Liste von Elementen, und wie Sie sehen können, habe ich ein ereignisgesteuertes Design in meinem eigenen Code, um jedes Element der Reihe nach zu behandeln. Die gesamte Liste befindet sich jedoch als Teil von im Speicher response, welche RestTemplate.exchange() produziert.

Ich möchte, dass die Anwendung Antworten verarbeiten kann, die eine große Anzahl von Elementen enthalten - beispielsweise 50.000. In diesem Fall gibt es zwei Probleme mit der aktuellen Implementierung:

  1. Es wird kein einzelnes Element behandelt, bis die gesamte HTTP-Antwort übertragen wurde - was zu einer unerwünschten Latenz führt.
  2. Das riesige Antwortobjekt befindet sich im Speicher und kann erst dann "GC" sein, wenn das letzte Element bearbeitet wurde.

Gibt es eine einigermaßen ausgereifte Java JSON / REST-Client-API, die Antworten ereignisgesteuert verarbeitet?

Ich stelle mir vor, Sie könnten so etwas tun wie:

 RestStreamer rest = new RestStreamer(clientHttpRequestFactory);

// Tell the RestStreamer "when, while parsing a response, you encounter a JSON
// element matching JSONPath "$.items[*]" pass it to "handler" for processing.
rest.onJsonPath("$.items[*]").handle(handler);

// Tell the RestStreamer to make an HTTP request, parse it as a stream.
// We expect "handler" to get passed an object each time the parser encounters
// an item.
rest.execute(url, HttpMethod.GET, requestEntity);

Ich schätze, ich könnte meine eigene Implementierung rollenvon diesem Verhalten beim Streamen von JSON-APIs von Jackson, GSON usw. - aber ich würde gerne erfahren, dass es etwas gibt, das es zuverlässig mit einer präzisen, ausdrucksstarken API macht, die in den HTTP-Aspekt integriert ist.

Antworten:

2 für die Antwort № 1

Du kannst es versuchen JsonSurfer Dies ist darauf ausgelegt, JSON-Streams ereignisgesteuert zu verarbeiten.

JsonSurfer surfer = JsonSurfer.jackson();
Builder builder = config();
builder.bind("$.items[*]", new JsonPathListener() {
@Override
public void onValue(Object value, ParsingContext context) throws Exception {
// handle the value
}
});
surfer.surf(new InputStreamReader(response.getBody()), builder.build());

5 für die Antwort № 2

Ein paar Monate später; zurück, um meine eigene Frage zu beantworten.

Ich habe keine ausdrucksstarke API gefunden, um das zu tun, was ich will, aber ich konnte das gewünschte Verhalten erreichen, indem ich den HTTP-Body als Stream abgerufen und mit einem Jackson konsumiert habe JsonParser:

  ClientHttpRequest request =
clientHttpRequestFactory.createRequest(uri, HttpMethod.GET);
ClientHttpResponse response = request.execute();

return handleJsonStream(response.getBody(), handler);

... mit handleJsonStream für JSON, das so aussieht:

 { items: [
{ field: value; ... },
{ field: value, ... },
... thousands more ...
] }

... validiert die Token, die zum Start des Arrays führen; es schafft eine Item Objekt jedes Mal, wenn es auf ein Array-Element trifft, und gibt es an den Handler.

 // important that the JsonFactory comes from an ObjectMapper, or it won"t be
// able to do readValueAs()
static JsonFactory jsonFactory = new ObjectMapper().getFactory();

public static int handleJsonStream(InputStream stream, ItemHandler handler) throws IOException {

JsonParser parser = jsonFactory.createJsonParser(stream);

verify(parser.nextToken(), START_OBJECT, parser);
verify(parser.nextToken(), FIELD_NAME, parser);
verify(parser.getCurrentName(), "items", parser);
verify(parser.nextToken(), START_ARRAY, parser);
int count = 0;
while(parser.nextToken() != END_ARRAY) {
verify(parser.getCurrentToken(), START_OBJECT, parser);
Item item = parser.readValueAs(Item.class);
handler.onItem(item);
count++;
}
parser.close(); // hope it"s OK to ignore remaining closing tokens.
return count;
}

verify() ist nur eine private statische Methode, die eine Ausnahme auslöst, wenn die ersten beiden Argumente nicht gleich sind.

Das Wichtigste an dieser Methode ist, dass unabhängig von der Anzahl der Elemente im Stream nur bei jeder Methode ein Verweis auf ein Element vorhanden ist.


4 für die Antwort № 3

Gibt es keine Möglichkeit, die Anfrage aufzubrechen? Es klingt so, als ob Sie Paging verwenden sollten. Stellen Sie sicher, dass Sie die ersten 100 Ergebnisse, die nächsten 100 Ergebnisse usw. anfordern können. Die Anfrage sollte einen Startindex und eine Zählnummer enthalten. Dies ist ein sehr häufiges Verhalten bei REST-Services und scheint die Lösung für Ihr Problem zu sein.

Der springende Punkt bei REST ist, dass es zustandslos ist. Es hört sich so an, als würden Sie versuchen, es zustandsbehaftet zu machen. Das ist ein Gräuel für REST, also werden Sie keine Bibliotheken finden, die auf diese Weise geschrieben wurden.

Die Transaktionsnatur von REST ist von Natur aus sehr beabsichtigt, und so kommen Sie nicht so einfach herum. Sie werden gegen den Strich kämpfen, wenn Sie es versuchen.


3 für die Antwort № 4

Nach dem, was ich gesehen habe, erleichtern das Umschließen von Frameworks (wie Sie sie verwenden) die Arbeit, indem sie die Antwort in ein Objekt deserialisieren. In Ihrem Fall eine Sammlung von Objekten.

Um Dinge jedoch auf Streaming-Weise zu verwenden, müssen Sie möglicherweise auf den zugrunde liegenden HTTP-Antwortstrom zugreifen. Ich bin am besten mit Jersey vertraut, das entlarvt https://jersey.java.net/nonav/apidocs/1.5/jersey/com/sun/jersey/api/client/ClientResponse.html#getEntityInputStream()

Es würde durch Aufrufen verwendet werden

Client client = Client.create();
WebResource webResource = client.resource("http://...");
ClientResponse response = webResource.accept("application/json")
.get(ClientResponse.class);
InputStream is = response.getEntityInputStream();

Auf diese Weise erhalten Sie den eingehenden Datenstrom. Der nächste Schritt besteht darin, den Streaming-Teil zu schreiben. Da Sie JSON verwenden, gibt es Optionen auf verschiedenen Ebenen, einschließlich http://wiki.fasterxml.com/JacksonStreamingApi oder http://argo.sourceforge.net/documentation.html. Sie können den InputStream verwenden.

Diese nutzen das Ganze nicht wirklich gut ausEine Deserialisierung ist möglich, aber Sie können sie verwenden, um ein Element eines JSON-Arrays zu analysieren und dieses Element an einen typischen JSON-Objekt-Mapper (wie Jackson, GSON usw.) zu übergeben. Dies wird zur Ereignisbehandlungslogik. Sie können hierfür neue Threads erstellen oder alles tun, was Ihr Anwendungsfall benötigt.


2 für die Antwort № 5

Ich werde nicht behaupten, alle anderen Frameworks da draußen zu kennen (oder sogar die Hälfte), aber ich werde mit der Antwort gehen

Wahrscheinlich nicht

Wie von anderen bemerkt, ist dies nicht der Weg RESTNormalerweise denkt man an die Interaktionen. REST ist ein großartiger Hammer, aber wenn Sie Streaming benötigen, befinden Sie sich (IMHO) im Gebiet der Schraubenzieher, und der Hammer funktioniert möglicherweise immer noch, aber es ist wahrscheinlich, dass er ein Chaos verursacht. Man kann argumentieren dass es den ganzen Tag mit REST übereinstimmt oder nicht, aber am Ende wäre ich sehr überrascht, ein Framework zu finden, das diese Funktion implementiert. Ich wäre noch mehr überrascht, wenn die Funktion ausgereift wäre (auch wenn das Framework vorhanden ist), da Ihr Anwendungsfall in Bezug auf REST bestenfalls ein ungewöhnlicher Eckfall ist.

Wenn sich jemand einen einfallen lässt, stehe ich gerne korrigiert da und lerne etwas Neues :)

Vielleicht ist es am besten, für diese spezielle Operation in Kometen oder Websockets zu denken. Diese Frage kann hilfreich sein, da Sie bereits Frühling haben. (Websockets sind nicht wirklich lebensfähig Wenn Sie IE <10 unterstützen müssen, was die meisten kommerziellen Apps noch benötigen ... Leider habe ich in meiner persönlichen Arbeit einen Client mit einem Schlüsselkunden, der noch auf IE 7 ist.


1 für die Antwort № 6

Sie können darüber nachdenken Restlet.

http://restlet.org/discover/features

Unterstützt asynchrone Anforderungsverarbeitung,von IO-Operationen entkoppelt. Im Gegensatz zur Servlet-API haben die Restlet-Anwendungen keine direkte Kontrolle über den Ausgabestream, sondern bieten nur eine Ausgabedarstellung, die vom Server-Connector geschrieben werden kann.


1 für die Antwort № 7

Der beste Weg, dies zu erreichen, ist die Verwendung einer anderen Streaming-Laufzeit für JVM, die das Lesen von Antworten von Websockets ermöglicht, und mir ist eine mit dem Namen atmostphere bekannt

Auf diese Weise wird Ihr großer Datensatz auf beiden Seiten in Blöcken gesendet und empfangen und in Echtzeit auf dieselbe Weise gelesen, ohne auf die gesamte Antwort zu warten.

Dies hat einen guten POC: http://keaplogik.blogspot.in/2012/05/atmosphere-websockets-comet-with-spring.html

Server:

    @RequestMapping(value="/twitter/concurrency")
@ResponseBody
public void twitterAsync(AtmosphereResource atmosphereResource){
final ObjectMapper mapper = new ObjectMapper();

this.suspend(atmosphereResource);

final Broadcaster bc = atmosphereResource.getBroadcaster();

logger.info("Atmo Resource Size: " + bc.getAtmosphereResources().size());

bc.scheduleFixedBroadcast(new Callable<String>() {

//@Override
public String call() throws Exception {

//Auth using keaplogik application springMVC-atmosphere-comet-webso key
final TwitterTemplate twitterTemplate =
new TwitterTemplate("WnLeyhTMjysXbNUd7DLcg",
"BhtMjwcDi8noxMc6zWSTtzPqq8AFV170fn9ivNGrc",
"537308114-5ByNH4nsTqejcg5b2HNeyuBb3khaQLeNnKDgl8",
"7aRrt3MUrnARVvypaSn3ZOKbRhJ5SiFoneahEp2SE");

final SearchParameters parameters = new SearchParameters("world").count(5).sinceId(sinceId).maxId(0);
final SearchResults results = twitterTemplate.searchOperations().search(parameters);

sinceId = results.getSearchMetadata().getMax_id();

List<TwitterMessage> twitterMessages = new ArrayList<TwitterMessage>();

for (Tweet tweet : results.getTweets()) {
twitterMessages.add(new TwitterMessage(tweet.getId(),
tweet.getCreatedAt(),
tweet.getText(),
tweet.getFromUser(),
tweet.getProfileImageUrl()));
}

return mapper.writeValueAsString(twitterMessages);
}

}, 10, TimeUnit.SECONDS);
}

Klient: Atmosphere verfügt über eine eigene Javascript-Datei, um die verschiedenen Comet / Websocket-Transporttypen und -Anforderungen zu verarbeiten. Auf diese Weise können Sie den Endpunkt der Spring URL Controller-Methode auf die Anforderung festlegen. Sobald Sie den Controller abonniert haben, erhalten Sie Sendungen, die dies können Dies wird durch Hinzufügen einer request.onMessage-Methode behandelt. Hier ist eine Beispielanforderung für den Transport von Websockets.

       var request = new $.atmosphere.AtmosphereRequest();
request.transport = "websocket";
request.url = "<c:url value="/twitter/concurrency"/>";
request.contentType = "application/json";
request.fallbackTransport = "streaming";

request.onMessage = function(response){
buildTemplate(response);
};

var subSocket = socket.subscribe(request);

function buildTemplate(response){

if(response.state = "messageReceived"){

var data = response.responseBody;

if (data) {

try {
var result =  $.parseJSON(data);

$( "#template" ).tmpl( result ).hide().prependTo( "#twitterMessages").fadeIn();

} catch (error) {
console.log("An error ocurred: " + error);
}
} else {
console.log("response.responseBody is null - ignoring.");
}
}
}

Es unterstützt alle gängigen Browser und nativen mobilen Clients. Apple ist Pionier dieser Technologie:

Wie hier erwähnt, hervorragende Unterstützung für Bereitstellungsumgebungen in Web- und Enterprise-JEE-Containern:

http://jfarcand.wordpress.com/2012/04/19/websockets-or-comet-or-both-whats-supported-in-the-java-ee-land/